目 录CONTENT

文章目录

CyclicBarrier - 同步屏障

FatFish1
2024-10-23 / 0 评论 / 0 点赞 / 57 阅读 / 0 字 / 正在检测是否收录...

含义

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞直到最后一个线程到达屏障时,屏障才会开门所有被屏障拦截的线程才会继续运行

CountDownLatch的区别

  • CountDownLatch只能用一次,CyclicBarrier可以使用reset()方法重置

  • CountDownLatch强调的是一等多,即一个线程阻塞,等一堆子线程执行完,CyclicBarrier强调的是多等一,即多个线程阻塞,等最后一个线程执行完再一起执行

CyclicBarrier源码分析

核心成员变量

private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();

基于AQS的ReentrantLock和Condition

private int count;

计数

private Generation generation = new Generation();

代际标志,因为CyclicBarrier是可以重置的,因此需要一个generation对象标志是不是还在这一代内。

构造函数

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    ……
    this.barrierCommand = barrierAction;
}

提供了两个构造函数,一个带Runnable,一个不带

将Runnable类型对象barrierAction给到barrierCommand变量,用于后续激活阻塞线程,如果有这个变量,优先激活barrierAction,用于一些特殊处理

await/dowait - 阻塞方法

int index = --count;
if (index == 0) {  // tripped
    Runnable command = barrierCommand;
    if (command != null) {
        try {
            command.run();
        } catch (Throwable ex) {
            breakBarrier();
            throw ex;
        }
    }
    nextGeneration();
    return 0;
}

可以看到,每个线程执行await时,都会让计数index减1,直到最后一个线程过来,index减为0,执行if语句段的内容

if语句内其实是调用barrierCommand的run方法,因此由最后一个到达屏障的线程执行barrierCommand,这里有两个结果

  • 如果barrierCommand抛异常了,调用breakBarrier方法,唤醒前面阻塞的线程,同时重置屏障计数,同时将当前这一个generation设置为broken

  • 如果barrierCommand正常完成,调用nextGeneration方法,更新代际标志,重置计数器,唤醒前面阻塞的线程

 如果来的不是最后一个线程呢,又会做什么?

跳过上面的if语句,执行阻塞流程:

for (;;) {
    try {
        if (!timed)
            trip.await();
        else if (nanos > 0L)
            nanos = trip.awaitNanos(nanos);
    } catch (InterruptedException ie) {
        // part1.
        if (g == generation && ! g.broken) {
            breakBarrier();
            throw ie;
        } else {
            // We're about to finish waiting even if we had not
            // been interrupted, so this interrupt is deemed to
            // "belong" to subsequent execution.
            Thread.currentThread().interrupt();
        }
    }
    // part2.
    if (g.broken)
        throw new BrokenBarrierException();
    if (g != generation)
        return index;
    if (timed && nanos <= 0L) {
        breakBarrier();
        throw new TimeoutException();
    }
}

因为刚刚判断count非0,因此自己不是最后一个到达屏障的,先执行trip.await()阻塞自己,并且释放锁。等最后一个线程来了唤醒所有线程,拿到锁,继续跑,如果正常没有InterruptedException,走到part2

if (g.broken)
    throw new BrokenBarrierException();
if (g != generation)
    return index;

首先判断自己所处的代际是不是已经被改成broken了,如果是,自己也不处理了,直接抛异常;如果不是,判断当前generation是不是和自己是一个代际,如果不是,说明最后一个线程已经跑完了开始执行下一个代际了,自己就正常返回,结束阻塞即可

如果自己在阻塞过程中出现了InterruptedException,走到part1

if (g == generation && ! g.broken) {
    breakBarrier();
    throw ie;
} else {
    Thread.currentThread().interrupt();
}

这里看下自己所处的代际是不是当前代际,且自己所处的代际不能broken,这时判断是因为自己导致的当前代际失败,因此要主动调用breakBarrier方法,释放broken信号。否则,直接中断自己就可以了。

注意以下场景:

// 场景1
CyclicBarrier cb = new CyclicBarrier(2)
new Thread({ cb.await() }).start();
cb.await()
// 场景2
CyclicBarrier cb = new CyclicBarrier(2, new Runnable({}))
new Thread({ cb.await() }).start();
cb.await()

场景1中,CyclicBarrier没有action,因此最后一个到达屏障的线程直接唤醒前面阻塞的,主线程和子线程谁先谁后,完全由CPU分配时间片决定

场景2中,有action,最后一个达到屏障的线程需要执行Runnable中的方法,具体谁最后一个达到,由CPU分配时间片决定,因此最终顺序是Runnable最先完成,然后最后到达的线程执行完,然后第一个到达的执行完成

0

评论区