转载

【J2SE】java并发基础

并发简述

并发通常是用于提高运行在 单处理器 上的程序的性能。在单 CPU 机器上使用多任务的程序在任意时刻只在执行一项工作。

并发编程使得一个程序可以被划分为多个分离的、独立的任务。一个线程就是在进程中的一个单一的顺序控制流。 java的线程机制是抢占式。

线程的好处是提供了轻量级的执行上下文切换,只改变了程序的执行序列和局部变量。

多线程的主要缺陷:<!-- java编程思想 -->

  1. 等待共享资源的时候性能降低。
  2. 需要处理线程的额外 CPU 花费。
  3. 糟糕的程序设计导致不必要的复杂度。
  4. 有可能产生一些病态行为,若饿死、竞争、死锁和活锁。
  5. 不同平台导致的不一样。

volatile关键字

源来:

当程序运行,JVM会为每一个线程分配一个独立的缓存用于提高执行效率,每一个线程都在自己独立的缓存中操作各自的数据。一个线程在缓冲中对数据进行修改,写入到主存后,其他线程无法得知数据已被更改,仍在操作缓存中已过时的数据,为了解决这个问题,提供了volatile关键字,实现内存可见,一旦主存数据被修改,便致使其他线程缓存数据行无效,强制前往主存获取新数据。

Example:内存不可见,导致主线程无法结束。

class ThreadDemo implements Runnable {
    //添加volatile关键字可实现内存可见性 public volatile boolean flag = false;
    public boolean flag = Boolean.false;
    
    @Override
    public void run() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
        }
        flag = Boolean.true;
        System.out.println("ThreadDemo over");
    }
    
    public boolean isFlag() {
        return flag;
    }
}

public class TestVolatile {

    public static void main(String[] args) {
        ThreadDemo demo = new ThreadDemo();
        new Thread(demo).start();
        
        while (true) {
            if (demo.flag || demo.isFlag()) {
                System.out.println("Main over");
                break;
            }
        }
    }
}/*output:打印ThreadDemo over,主线程持续循环*/

作用:

当多个线程操作共享数据时,保证内存中的数据可见性。采用底层的内存栅栏,及时的将缓存中修改的数据刷新到主存中,并导致其他线程所缓存的数据无效,使得这些线程必须去主存中获取修改的数据。

优缺点:

  • 保证内存可见性,让各个线程能够彼此获取最新的内存数据。
  • 较传统synchronized加锁操作提高了效率,若有线程正在操作被synchronized修饰的代码块数据时,其他线程试图进行操作,发现已被其他线程占用,试图操作的线程必须挂起,等到下一次继续尝试操作。
  • 对volatile修饰的数据被修改后,其他线程必须前往主存中读取,若修改频繁,需要不断读取主存数据,效率将会降低。
  • 使用volatile,底层采用内存栅栏,JVM将不会对其提供指令重排序及其优化。
  • 不具备互斥性。多个线程可以同时对数据进行操作,只是由原来的在缓存操作转变成了直接在主存中操作。(synchronized是互斥的,一个线程正在执行,其他线程必须挂起等待)
  • 不保证变量的原子性。使用volatile仅仅是一个能保证可见性的轻量级同步策略。

原子变量与 CAS 算法

Example:使用volatile修饰,number自增问题。

class ThreadDemo implements Runnable {
    public volatile int number = 0;
    
    @Override
    public void run() {
        try {
            Thread.sleep(200);
        } catch (Exception e) {
        }
        System.out.print(getIncrementNumber() + " ");
    }
    
    public int getIncrementNumber() {
        return ++number;
    }
}

public class TestAtomic {
    public static void main(String[] args) {
        ThreadDemo demo = new ThreadDemo();
        for (int i = 0; i < 10; i++) {
            new Thread(demo).start();
        }
    }
}/*output: 1 5 4 7 3 9 2 1 8 6 */
//    ++number底层原理思想
int temp = number;        // ①
number = number + 1;    // ②
temp = number;            // ③
return temp;            // ④

由 ++number 可知,返回的是 temp 中存储的值,且自增是一个多步操作,当多个线程调用 incrementNumber方法时,方法去主存中获取 number 值放入 temp 中,根据 CPU 时间片切换,当 A 线程完成了 ③ 操作时,时间片到了被中断,A 线程开始执行 ① 时不幸被中断,接着 A 获取到了CPU执行权,继续执行完成 ④ 操作更新了主存中的值,紧接着 B 线程开始执行,但是 B 线程 temp中存储的值已经过时了。 注意:自增操作为四步,只有在第四步的时候才会刷新主存的值,而不是number = number + 1 操作就反映到主存中去。 如图所示:

【J2SE】java并发基础

源来:

volatile只能保证内存可见性,对多步操作的变量,无法保证其原子性,为了解决这个问题,提供了原子变量。

作用:

原子变量既含有volatile的内存可见性,又提供了对变量原子性操作的支持,采用底层硬件对并发操作共享数据的 CAS(Compare-And-Swap)算法,保证数据的原子性。

提供的原子类:

描述
AtomicBoolean 一个 boolean 值可以用原子更新。
AtomicInteger 可能原子更新的 int 值。
AtomicIntegerArray 一个 int 数组,其中元素可以原子更新。
AtomicIntegerFieldUpdater <T> 基于反射的实用程序,可以对指定类的指定的 volatile int 字段进行原子更新。
AtomicLong 一个 long 值可以用原子更新。
AtomicLongArray 可以 long 地更新元素的 long 数组。
AtomicLongFieldUpdater <T> 基于反射的实用程序,可以对指定类的指定的 volatile long 字段进行原子更新。
AtomicMarkableReference <V> AtomicMarkableReference 维护一个对象引用以及可以原子更新的标记位。
AtomicReference <V> 可以原子更新的对象引用。
AtomicReferenceArray <E> 可以以原子方式更新元素的对象引用数组。
AtomicReferenceFieldUpdater <T,V> 一种基于反射的实用程序,可以对指定类的指定的 volatile volatile引用原子更新。
AtomicStampedReference <V> AtomicStampedReference 维护对象引用以及可以原子更新的整数“印记”。
DoubleAccumulator 一个或多个变量一起维护使用提供的功能更新的运行的值 double
DoubleAdder 一个或多个变量一起保持初始为零 double 和。
LongAccumulator 一个或多个变量,它们一起保持运行 long 使用所提供的功能更新值。
LongAdder 一个或多个变量一起保持初始为零 long 总和。

CAS算法:

CAS(Compare-And-Swap)是底层硬件对于原子操作的一种算法,其包含了三个操作数:内存值(V),预估值(A),更新值(B)。当且仅当 V == A 时, 执行 V = B 操作;否则不执行任何结果。这里需要注意, A 和 B 两个操作数是原子性的,同一时刻只能有一个线程进行AB操作。

【J2SE】java并发基础

优缺点:

  • 操作失败时,直接放弃结果,并不释放对CPU的控制权,进而可以继续尝试操作,不必挂起等待。(synchronized会让出CPU)
  • 当多个线程并发的对主存中的数据进行操作时,有且只有一个会成功,其余均失败。
  • 原子变量中封装了用于对数据的原子操作,简化了代码的编写。

Collection并发类

HashMap 与 HashTable简述

HashMap是线程不安全的,而HashTable是线程安全的,因为HashTable所维护的Hash表存在着独占锁,当多个线程并发访问时,只能有一个线程可进行操作,但是对于复合操作时,HashTable仍然存在线程安全问题,不使用HashTable的主要原因还是效率低下。

// 功能:不包含obj,则添加
if (!hashTable.contains(obj)) {
    // 复合操作,执行此处时线程中断,obj被其他线程添加至容器中,此处继续执行将导致重复添加
    hashTable.put(obj);
}

可知上述两个操作需要 “原子性”,为了达到效果,还不是得对代码块进行同步

ConcurrentHashMap

采用锁分段机制,分为 16 个段(并发级别),每一个段下有一张表,该表采用链表结构链接着各个元素,每个段都使用独立的锁。当多个线程并发操作的时候,根据各自的级别不同,操作不同的段,多个线程并行操作,明显提高了效率,其次还提供了复合操作的诸多方法。 注:jdk1.8由原来的数组+单向链表结构转换成数据+单向链表+红黑树结构。

ConcurrentSkipListMap和ConcurrentSkipListSet

有序的哈希表,通过跳表实现,不允许null作为键或值。 ConcurrentSkipListMap详解

CopyOnWriteArrayList 和 CopyOnWriteArraySet

对collection进行写入操作时,将导致创建整个底层数组的副本,而源数组将保留在原地,使得复制的数组在被修改时,读取操作可以安全的执行。当修改完成时,一个原子性的操作将把心的数组换人,使得新的读取操作可以看到新的修改。<!--Java编程思想-->

好处之一是当多个迭代器同时遍历和修改列表时,不会抛出ConcurrentModificationException。

小结:

  • 当期望许多线程访问一个给定 collection 时,ConcurrentHashMap 通常优于同步的 HashMap
  • ConcurrentSkipListMap 通常优于同步的 TreeMap。
  • 当期望的读数和遍历远远大于列表的更新数时,CopyOnWriteArrayList 优于同步的 ArrayList。
  • 并发迭代操作多时,可选择CopyOnWriteArrayList 和 CopyOnWriteArraySet。
  • 高并发情况下,可选择ConcurrentSkipListMap和ConcurrentSkipListSet

CountDownLatch闭锁

源由:

当一个修房子的 A 线程正在执行,需要砖头时,开启了一个线程 B 去拉砖头,此时 A 线程需要等待 B 线程的结果后才能继续执行时,但是线程之间都是并行操作的,为了解决这个问题,提供了CountDownLatch。

作用:

一个同步辅助类,为了保证执行某些操作时,“所有准备事项都已就绪”,仅当某些操作执行完毕后,才能执行后续的代码块,否则一直等待。

CountDownLatch中存在一个锁计数器,如果锁计数器不为 0 的话,它会阻塞任何一个调用 await() 方法的线程。也就是说,当一个线程调用 await() 方法时,如果锁计数器不等于 0,那么就会一直等待锁计数器为 0 的那一刻,这样就解决了需要等待其他线程执行完毕才执行的需求。

Example:

class ThreadDemo implements Runnable {
    private CountDownLatch latch = null;

    public ThreadDemo(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            System.out.println("execute over");
        } finally {
            latch.countDown();    // 必须保证计数器减一
        }
    }
}
public class TestCountDownLatch {

    public static void main(String[] args) {
        final int count = 10;
        final CountDownLatch latch = new CountDownLatch(count);
        ThreadDemo demo = new ThreadDemo(latch);
        for (int i = 0; i < count; ++i) {
            new Thread(demo).start();
        }
        
        try {
            latch.await();    // 等待计数器为 0
            System.out.println("其他线程结束,继续往下执行...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}/**output:
    execute over
    ...
    其他线程结束,继续往下执行...
*/

细节:

  • 子线程完毕后,必须调用 countDown() 方法使得 锁计数器减一,否则将会导致调用 await() 方法的线程持续等待,尽可能的放置在 finally 中。
  • 锁计数器的个数与子线程数最好相等,只要计数器等于 0,不论是否还存在子线程,await() 方法将得到响应,继续执行后续代码。

Callable接口

源由:

当开启一个线程执行运算时,可能会需要该线程的计算结果,之前的 implements Runnableextends Thread 的 run() 方法并没有提供可以返回的功能,因此提供了 Callable接口。 Callable 的运行结果, 需要使用 FutureTask 类来接受。

Example:

class ThreadDemo implements Callable<Integer> {
    private Integer cycleValue;
    
    public ThreadDemo(Integer cycleValue) {
        this.cycleValue = cycleValue;
    }
    
    @Override
    public Integer call() throws Exception {
        int result = 0;
        for (int i=0; i<cycleValue; ++i) {
            result += i;
        }
        return result;
    }
    
}
public class TestCallable {

    public static void main(String[] args) throws Exception {
        ThreadDemo demo = new ThreadDemo(Integer.MAX_VALUE);
        //    使用FutureTask接受结果
        FutureTask<Integer> task = new FutureTask<>(demo);
        new Thread(task).start();
        
        Integer result = task.get();    // 等待计算结果返回, 闭锁
        System.out.println(result);
    }
}/*output:1073741825 */

Lock同步锁和Condition线程通信控制对象

Lock: 在进行性能测试时,使用Lock通常会比使用synchronized要高效许多,并且synchronized的开销变化范围很大,而Lock相对稳定。只有在性能调优时才使用Lock对象。<!--Java编程思想-->

Condition:替代了 Object 监视器方法的使用,描述了可能会与锁有关的条件标量,相比 Object 的 notifyAll() ,Condition 的 signalAll() 更安全。Condition 实质上被绑定到一个锁上,使用newCondition() 方法为 Lock 实例获取 Condition。

Lock和Condition对象只有在困难的多线程问题中才是必须的。 <!--Java编程思想-->

synchonized与Lock的区别:

synchonized Lock
隐式锁 显示锁
JVM底层实现,由JVM维护 由程序员手动维护
灵活控制(也有风险)

“虚假唤醒”:当一个线程A在等待时,被另一个线程唤醒,被唤醒的线程不一定满足了可继续向下执行的条件,如果被唤醒的线程未满足条件,而又向下执行了,那么称这个现象为 “虚假唤醒”。

//    安全的方式,保证退出等待循环前,一定能满足条件
while (条件) {
    wait();
}

Example:生产消费者<!--参考Java编程思想 P712-->

// 产品car
class Car {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private boolean available = false; // false:无货;true有货

    public void put(){
        lock.lock();
        try {
            while (available) {        // 有货等待
                condition.await();
            }
            System.out.println(Thread.currentThread().getName() + "put():    进货");
            available = true;
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void get() {
        lock.lock();
        try {
            while (!available) {    // 无货等待
                condition.await();
            }
            System.out.println(Thread.currentThread().getName() + "get():出货");
            available = false;
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}
// 消费者
class Consume implements Runnable {
    private Car car;
    
    public Consume(Car car) {
        this.car = car;
    }

    @Override
    public void run() {
        for (int i=0; i<TestProduceAndConsume.LOOP_SIZE; ++i) {
            car.get();
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
            }
        }
    }
    
}
// 生产者
class Produce implements Runnable {
    private Car car;
    
    public Produce(Car car) {
        this.car = car;
    }
    
    @Override
    public void run() {
        for (int i=0; i<TestProduceAndConsume.LOOP_SIZE; i++) {
            car.put();
        }
    }
    
}
public class TestProduceAndConsume {
    public static final int LOOP_SIZE = 10;
    
    public static void main(String[] args) {
        Car car = new Car();
        for (int i=0; i<5; ++i) {
            Consume consume = new Consume(car);
            Produce produce = new Produce(car);
            new Thread(consume, i + "--").start();
            new Thread(produce, i + "--").start();
        }
    }
    
}

每一个 对 lock() 的调用都必须紧跟着一个 try-finally 子句,用以保证可以在任何情况下都能释放锁,任务在调用 await()signal()signalAll() 之前,必须拥有锁。

lock.lock();
try {
    ...    // 业务代码
} finally {
    lock.unlock();
}

ReadWriteLock读写锁

源由:

上述讲解的锁都是读写一把锁,不论是读或写,都是一把锁解决,当多线程访问数据时,若发生了一千次操作,其中的写操作只执行了一次,数据的更新率非常低,那么每次进行读操作时,都要加锁读取”不会更改的“数据,显然是不必要的开销,因此出现了 ReadWriteLock 读写锁,该对象提供读锁和写锁。

作用:

ReadWriteLock 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 write写入操作,那么多个线程可以同时进行持有读锁。而写入锁是独占的,当执行写操作时,其他线程不可写,也不可读。

性能的提升取决于读写操作期间读取数据相对于修改数据的频率,如果读取操作远远大于写入操作时,便能增强并发性。

Example:

class Demo {
    private int value = 0;
    private ReadWriteLock lock = new ReentrantReadWriteLock();
    
    public void read() {
        lock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " : " + value);
        } finally {
            lock.readLock().unlock();
        }
    }
    
    public void write(int value) {
        lock.writeLock().lock();
        try {
            this.value = value;
            System.out.println("write(" + value + ")");
        } finally {
            lock.writeLock().unlock();
        }
    }
}
class ReadLock implements Runnable {
    private Demo demo = null;
    
    public ReadLock(Demo demo) {
        this.demo = demo;
    }
    
    @Override
    public void run() {
        for (int i=0; i<20; ++i) {
            demo.read();
            try {
                Thread.sleep(320);
            } catch (InterruptedException e) {
            }
        }
    }
    
}
class WriteLock implements Runnable {
    private Demo demo = null;
    
    public WriteLock(Demo demo) {
        this.demo = demo;
    }
    
    @Override
    public void run() {
        for (int i=0; i<10; ++i) {
            demo.write(i);
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
            }
        }
    }
    
}
public class TestReadWriteLock {
    
    public static void main(String[] args) {
        Demo demo = new Demo();
        ReadLock readLock = new ReadLock(demo);
        WriteLock writeLock = new WriteLock(demo);
        for (int i=0; i<3; ++i) {
            new Thread(readLock, i + "--").start();
        }
        new Thread(writeLock).start();
    }
}/**output:
    0-- : 0
    1-- : 0
    2-- : 0
    write(0)
    write(1)
    1-- : 1
    2-- : 1
    0-- : 1
    write(2)
    write(3)
    1-- : 3
    0-- : 3
    ...
*/

线程池与线程调度

源来:

在传统操作中(如连接数据库),当我们需要使用一个线程的时候,就 直接创建一个线程,线程完毕后被垃圾收集器回收。每一次需要线程的时候,不断的创建与销毁,大大增加了资源的开销。

作用:

线程池维护着一个线程队列,该队列中保存着所有等待着的线程,避免了重复的创建与销毁而带来的开销。

体系结构:

Execuotr:负责线程的使用与调度的根接口。
    |- ExecutorService:线程池的主要接口。
        |- ForkJoinPool:采用分而治之技术将任务分解。
        |- ThreadPoolExecutor:线程池的实现类。
        |- ScheduledExecutorService:负责线程调度的子接口。
            |- ScheduledThreadPoolExecutor:负责线程池的调度。继承ThreadPoolExecutor并实现ScheduledExecutorService接口

Executors 工具类API描述:

方法 描述
ExecutorService newFixedThreadPool(int nThreads) 创建一个可重用固定数量的无界队列线程池。使用了有限的线程集来执行所提交的所有任务。创建的时候可以一次性预先进行代价高昂的线程分配。
ExecutorService newWorkStealingPool(int parallelism) 创建一个维护足够的线程以支持给定的parallelism并行级别的线程池。
ExecutorService newSingleThreadExecutor() 创建一个使用单个线程运行的无界队列的执行程序。
ExecutorService newCachedThreadPool() 创建一个根据需要创建新线程的线程池,当有可用线程时将重新使用以前构造的线程。
ScheduledExecutorService newSingleThreadScheduledExecutor() 创建一个单线程执行器,可以调度命令在给定的延迟之后运行,或定期执行。
ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 创建一个线程池,可以调度命令在给定的延迟之后运行,或定期执行。
ThreadFactory privilegedThreadFactory() 返回一个用于创建与当前线程具有相同权限的新线程的线程工厂。

补充:

ExecutorService.shutdown():防止新任务被提交,并继续运行被调用之前所提交的所有任务,待任务都完成后退出。

CachedThreadPoo在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收旧线程时停止创建新线程,是Executor的首选。仅当这个出现问题时,才需切换 FixedThreadPool。

SingleThreadExecutor: 类似于线程数量为 1 的FixedThreadPool,但它提供了不会存在两个及以上的线程被并发调用的并发。

Example:线程池

public class TestThreadPool {

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 10; ++i) {
            Future<String> future = pool.submit(new Callable<String>() {

                @Override
                public String call() throws Exception {
                    return Thread.currentThread().getName();
                }
                
            });

            String threadName = future.get();
            System.out.println(threadName);
        }
        pool.shutdown();    // 拒绝新任务并等待正在执行的线程完成当前任务后关闭。
    }
}/**output:
    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-1
    pool-1-thread-2
    ...
*/

Example:线程调度

public class TestThreadPool {

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        for (int i = 0; i < 5; ++i) {
            ScheduledFuture<String> future = pool.schedule(new Callable<String>() {
    
                @Override
                public String call() throws Exception {
                    return Thread.currentThread().getName() + " : " + Instant.now();
                }
            }, 1, TimeUnit.SECONDS);    // 延迟执行单位为 1秒的任务
            
            String result = future.get();
            System.out.println(result);
        }
        pool.shutdown();
    }
}/**output:
    pool-1-thread-1 : 2019-03-18T12:10:31.260Z
    pool-1-thread-1 : 2019-03-18T12:10:32.381Z
    pool-1-thread-2 : 2019-03-18T12:10:33.382Z
    pool-1-thread-1 : 2019-03-18T12:10:34.383Z
    pool-1-thread-2 : 2019-03-18T12:10:35.387Z
*/

<span style="color: red">注意:若没有执行 shutdown() 方法,则线程会一直等待而不停止。</span>

ForkJoinPool分支/合并框架

源由:

在一个线程队列中,假如队头的线程由于某种原因导致了阻塞,那么在该队列中的后继线程需要等待队头线程结束,只要队头一直阻塞,这个队列中的所有线程都将等待。此时,可能其他线程队列都已经完成了任务而空闲,这种情况下,就大大减少了吞吐量。

ForkJoin的“工作窃取”模式:

当执行一个新任务时,采用分而治之的思想,将其分解成更小的任务执行,并将分解的任务加入到线程队列中,当某一个线程队列没有任务时,会随机从其他线程队列中“偷取”一个任务,放入自己的队列中执行。

Example:

// 求次方: value为底,size为次方数
class CountPower extends RecursiveTask<Long> {
    private static final long serialVersionUID = 1L;
    public Long value = 0L;
    public int size = 0;
    public static final Long CRITICAL = 10L;     // 阈值
    
    public CountPower(Long value, int size) {
        this.value = value;
        this.size = size;
    }

    @Override
    protected Long compute() {
        // 当要开方的此时 小于 阈值,则计算 (视为最小的任务单元)
        if(size <= CRITICAL) {
            Long sum = 1L;
            for (int i=0; i<size; ++i) {
                sum *= value;
            }
            return sum;
        } else {
            int mid = size / 2;
            // 拆分任务,并压入线程队列
            CountPower leftPower = new CountPower(value, mid);
            leftPower.fork();
            
            CountPower rightPower = new CountPower(value, size - mid);
            rightPower.fork();
            
            // 将当前两个任务返回的执行结果再相乘
            return leftPower.join() * rightPower.join();
        }
    }
    
}
public class TestForkJoinPool {

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool();
        CountPower task = new CountPower(2L, 11);
        Long result = pool.invoke(task);
        System.out.println(result);
    }
}/**output: 2048*/

根据分而治之的思想进行分解,需要一个结束递归的条件,该条件内的代码就是被分解的最小单元。使用 fork()在当前任务正在运行的池中异步执行此任务,即将该任务压入线程队列。调用 join()`返回计算结果。RecursiveTask是有返回值的task,RecursiveAction则是没有返回值的。

参考

  • 尚硅谷JUC视频教程
  • 《java编程思想》第 21 章 并发
原文  https://segmentfault.com/a/1190000018564479
正文到此结束
Loading...