Exchanger是什么
Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
几个关键点:
Exchanger必须是两两一组互相阻塞,比如第1次执行的等第2次执行的,第3次执行的等第4次的
Exchanger只能是两个线程交互,不能有多个参与
使用案例
public class MyTest {
private static final Exchanger<String> exgr = new Exchanger<String>();
private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "银行流水A"; // A录入银行流水数据
String exchange = exgr.exchange(A);
System.out.println(exchange);
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "银行流水B"; // B录入银行流水数据
String A = exgr.exchange("B");
System.out.println(A);
System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"
+ A + ",B录入是:" + B);
} catch (InterruptedException e) {
}
}
});
}
}
线程1、2分别接收对方的消息
源码
核心成员变量
private final Participant participant;
static final class Participant extends ThreadLocal<Node>
participant继承的ThreadLocal,里面塞的是Node
@jdk.internal.vm.annotation.Contended static final class Node {
……
Object item; // This thread's current item
volatile Object match; // Item provided by releasing thread
volatile Thread parked; // Set to this thread when parked, else null
}
Node里面存的是消息内容
private static final VarHandle SLOT;
SLOT = l.findVarHandle(Exchanger.class, "slot", Node.class);
类变量SLOT是做线程间数据传递的载体,通过VarHandle的能力控制其CAS特性,同时映射到Exchanger对象的中Node对象中的slot上
构造函数
public Exchanger() {
participant = new Participant();
}
只有一个无参构造,因此Exchanger只能是两个线程传递
exchange - 交互方法
第一部分,取值:
for (Node q;;) {
if ((q = slot) != null) {
if (SLOT.compareAndSet(this, q, null)) { // part1
Object v = q.item;
q.match = item;
Thread w = q.parked;
if (w != null)
LockSupport.unpark(w);
return v;
}
// create arena on contention, but continue until slot null
if (NCPU > 1 && bound == 0 &&
BOUND.compareAndSet(this, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
else if (arena != null)
return null; // caller must reroute to arenaExchange
else { // part2
p.item = item;
if (SLOT.compareAndSet(this, null, p)) // 这里操作成功,说明SLOT还没有值
break;
p.item = null;
}
}
这个自旋是第一部分,取值部分
因为初始状态SLOT、arena都是null,直接走到part2部分,走一个CAS操作设置SLOT的值,如果这里设置成功了,直接break,跳出取值自旋,如果这里设置失败了,重新自旋回到part1,开始消费。
消费的时候要先把SLOT设置回null状态,意味着又可以有新线程发事件了
消费的时候,取出历史数据,然后把在exhanger对象中记录的park线程给唤醒,即意味着消费了这个线程发的消息。在消费的时候,设置q.match=item,这里实际上是通知了消息对端,我消费了你,你也要消费我
第二部分,阻塞等待对端通知:
while ((v = p.match) == null) {
if (spins > 0) { // part3
……
}
else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) {
p.parked = t;
if (slot == p) {
if (ns == 0L)
LockSupport.park(this); // part4
……
}
MATCH.setRelease(p, null);
p.item = null;
p.hash = h;
return v;
当第一部分取消息的时候做CAS操作成功了,直接break,这个场景说明本线程是第一个来的,就走阻塞这个步骤
第一次循环match没有值,首先走part3部分等待cpu时钟,这里是为了减少cpu切换导致的性能损耗,在周期内还等不到,就去走part4休眠,等待唤醒
被唤醒之后再走一次while,调用v = p.match
,因为这时对端已经消费了自己的消息并且给match赋了值,再次判断就发现match已经有值了,跳出循环
最后把v=match
返回去,结束阻塞
评论区