转载

Java并发—— CountDownLatch与CyclicBarrier

CountDownLatch闭锁相当于一扇门,在闭锁到达结束状态之前,这扇门 一直是关闭的 ,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态, 门永远保持打开状态

CountDownLatch实现原理

CountDownLatch通过内部类Sync实现方法,sync继承AQS重写模板中的方法。sync内部定义:

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
         
        Sync(int count) {
            setState(count);
        }
        /**
         * 获取同步状态
         */
        int getCount() {
            return getState();
        }
        /**
         * 获取同步状态
         */
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        /**
         * 释放同步状态
         */
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
复制代码

从源码中重写的方法可以得知,CountDownLatch中的sync采用 共享模式 。CountDownLatch示例:

public class TestHarness {
     
    public static long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);
         
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        startGate.await();
                        try {
                            System.out.println(Thread.currentThread().getName() + "开始执行");
                            task.run();
                        } finally {
                            endGate.countDown();
                            System.out.println(Thread.currentThread().getName() + "执行结束");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            t.start();
        }
        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        System.out.println("所有线程执行完毕,耗时:" + (end-start));
        return end - start;
    }
         
    public static void main(String[] args) throws InterruptedException {
        System.out.println(timeTasks(10, new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "————————work");
            }
        }));
    }
}
复制代码

运行结果:

Thread-0开始执行
Thread-3开始执行
Thread-0————————work
Thread-0执行结束
Thread-1开始执行
Thread-2开始执行
Thread-7开始执行
Thread-7————————work
Thread-7执行结束
Thread-9开始执行
Thread-9————————work
Thread-9执行结束
Thread-8开始执行
Thread-8————————work
Thread-8执行结束
Thread-2————————work
Thread-2执行结束
Thread-6开始执行
Thread-1————————work
Thread-6————————work
Thread-6执行结束
Thread-5开始执行
Thread-5————————work
Thread-5执行结束
Thread-3————————work
Thread-3执行结束
Thread-4开始执行
Thread-1执行结束
Thread-4————————work
Thread-4执行结束
所有线程执行完毕,耗时:2794976
2794976
复制代码

CyclicBarrier

相对于CountDownLatch是 一次性对象,一旦进入终止状态,就不能被重置 ,CyclicBarrier可以反复使用。CyclicBarrier类似于闭锁,与 闭锁的关键区别在于,闭锁用于等待事件,栅栏用于等待其他线程 ,其作用是让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

CyclicBarrier实现原理

CyclicBarrier构造方法

public CyclicBarrier(int parties) {
        this(parties, null);
    }
    
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
复制代码

参数parties指栅栏拦截的线程数量

参数barrierAction指当这些线程都到达栅栏时优先会执行的线程

await()方法

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        // 获取锁
        lock.lock();
        try {
            final Generation g = generation;
            // 若栅栏处于断开状态,抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
            // 若线程中断,断开CyclicBarrier
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            int index = --count;
            // count为0表明所有线程到达栅栏位置
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // 若初始化时指定了所有线程到达栅栏时的任务,执行它 
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 唤醒所有等待线程,开始新的generation
                    nextGeneration();
                    return 0;
                } finally {
                    // 若任务执行异常,断开CyclicBarrier
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // 循环所有线程到达栅栏或栅栏断开或线程中断或超时
            for (;;) {
                try {
                    // 一直等待
                    if (!timed)
                        trip.await();
                    // 限时等待    
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 若线程中断且栅栏没有断开,断开CyclicBarrier
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                 
                if (g.broken)
                    throw new BrokenBarrierException();
                
                if (g != generation)
                    return index;
                // 若等待超时,断开CyclicBarrier
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
复制代码

其主要逻辑:若有线程未到达栅栏位置,到达栅栏位置的线程一直等待状态,直至发生以下场景:

①. 所有线程都到达栅栏位置

②. 有线程被中断

③. 线程等待超时

④. 有线程调用reset()方法,断开当前栅栏,将栅栏重置为初始状态

reset方法:

public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 断开当前栅栏
            breakBarrier();   // break the current generation
            // 开始新的generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
复制代码

CyclicBarrier示例

public class CyclicBarrierTest {
     
    private static CyclicBarrier cyclicBarrier;
     
    static class CyclicBarrierThread extends Thread{
        public void run() {
            System.out.println("运动员:" + Thread.currentThread().getName() + "到场");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
     
    public static void main(String[] args){
        cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("运动员全部到齐,比赛开始");
            }
        });
         
        for(int i = 0 ; i < 5 ; i++){
            new CyclicBarrierThread().start();
        }
    }

}
复制代码

运行结果:

运动员:Thread-0到场
运动员:Thread-1到场
运动员:Thread-2到场
运动员:Thread-3到场
运动员:Thread-4到场
运动员全部到齐,比赛开始
复制代码

CountDownLatch与CyclicBarrier区别

①.CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次

②.CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断

③.CountDownLatch倾向于一个线程等多个线程,CyclicBarrier倾向于多个线程互相等待

原文  https://juejin.im/post/5b87f575e51d45389005bca9
正文到此结束
Loading...