目 录CONTENT

文章目录

Exchanger - 线程通信器

FatFish1
2024-10-23 / 0 评论 / 0 点赞 / 70 阅读 / 0 字 / 正在检测是否收录...

Exchanger是什么

Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

几个关键点:

  • Exchanger必须是两两一组互相阻塞,比如第1次执行的等第2次执行的,第3次执行的等第4次的

  • Exchanger只能是两个线程交互,不能有多个参与

使用案例

public class MyTest {
    private static final Exchanger<String> exgr = new Exchanger<String>();
    private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "银行流水A"; // A录入银行流水数据
                    String exchange = exgr.exchange(A);
                    System.out.println(exchange);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "银行流水B"; // B录入银行流水数据
                    String A = exgr.exchange("B");
                    System.out.println(A);
                    System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"
                            + A + ",B录入是:" + B);
                } catch (InterruptedException e) {
                }
            }
        });
    }
}

线程1、2分别接收对方的消息

源码

核心成员变量

private final Participant participant;

static final class Participant extends ThreadLocal<Node>

participant继承的ThreadLocal,里面塞的是Node

@jdk.internal.vm.annotation.Contended static final class Node {
    ……
    Object item;            // This thread's current item
    volatile Object match;  // Item provided by releasing thread
    volatile Thread parked; // Set to this thread when parked, else null
}

Node里面存的是消息内容

private static final VarHandle SLOT;
 
SLOT = l.findVarHandle(Exchanger.class, "slot", Node.class);

类变量SLOT是做线程间数据传递的载体,通过VarHandle的能力控制其CAS特性,同时映射到Exchanger对象的中Node对象中的slot上

构造函数

public Exchanger() {
    participant = new Participant();
}

只有一个无参构造,因此Exchanger只能是两个线程传递

exchange - 交互方法

第一部分,取值:

for (Node q;;) {
    if ((q = slot) != null) {
        if (SLOT.compareAndSet(this, q, null)) {  // part1
            Object v = q.item;
            q.match = item;
            Thread w = q.parked;
            if (w != null)
                LockSupport.unpark(w);
            return v;
        }
        // create arena on contention, but continue until slot null
        if (NCPU > 1 && bound == 0 &&
            BOUND.compareAndSet(this, 0, SEQ))
            arena = new Node[(FULL + 2) << ASHIFT];
    }
    else if (arena != null)
        return null; // caller must reroute to arenaExchange
    else {   // part2
        p.item = item;
        if (SLOT.compareAndSet(this, null, p))  // 这里操作成功,说明SLOT还没有值
            break;
        p.item = null;
    }
}

这个自旋是第一部分,取值部分

因为初始状态SLOT、arena都是null,直接走到part2部分,走一个CAS操作设置SLOT的值,如果这里设置成功了,直接break,跳出取值自旋,如果这里设置失败了,重新自旋回到part1,开始消费。

消费的时候要先把SLOT设置回null状态,意味着又可以有新线程发事件了

消费的时候,取出历史数据,然后把在exhanger对象中记录的park线程给唤醒,即意味着消费了这个线程发的消息。在消费的时候,设置q.match=item,这里实际上是通知了消息对端,我消费了你,你也要消费我

第二部分,阻塞等待对端通知:

while ((v = p.match) == null) {
    if (spins > 0) {  // part3
    ……
}
    else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) {
        p.parked = t;
        if (slot == p) {
            if (ns == 0L)
                LockSupport.park(this);  // part4
    ……
}
MATCH.setRelease(p, null);
p.item = null;
p.hash = h;
return v;

当第一部分取消息的时候做CAS操作成功了,直接break,这个场景说明本线程是第一个来的,就走阻塞这个步骤

第一次循环match没有值,首先走part3部分等待cpu时钟,这里是为了减少cpu切换导致的性能损耗,在周期内还等不到,就去走part4休眠,等待唤醒

被唤醒之后再走一次while,调用v = p.match,因为这时对端已经消费了自己的消息并且给match赋了值,再次判断就发现match已经有值了,跳出循环

最后把v=match返回去,结束阻塞

0

评论区