目 录CONTENT

文章目录

ConcurrentLinkedQueue - 基于链表的非阻塞无界队列

FatFish1
2024-10-24 / 0 评论 / 0 点赞 / 59 阅读 / 0 字 / 正在检测是否收录...

简介

ConcurrentLinkedQueue是一个典型的非阻塞、无边界的线程安全队列,基于链接节点,采用CAS算法实现。CoucurrentLinkedQueue规定了如下几个不变形:

  • 在入队的最后一个元素的next为null;

  • 队列中所有未删除的节点的item都不能为null且都能从head节点遍历到;

  • 对于要删除的节点,不能直接将其设置为null,而是先将其item域设置为null,迭代器会跳过item为null的节点;

  • 允许head和tail更新滞后,就是说head、tail不总是指向第一个元素和最后一个元素;比如当前一个线程取到tail节点后,在操作前,另一个线程把tail更新了,此时第一个线程的tail就不是指向最后一个元素,也就是说tail更新滞后了

head的不可变和可变性:

  • 不可变

    • 所有未删除的节点都可以通过head节点遍历到;

    • head不能为null;

    • head节点的next不能指向自身;

  • 可变性

    • head的item可能为null,也可能不为null;

    • 允许tail滞后head;也就是调用succ()方法

源码分析

Node内部类

无界队列的节点抽象

volatile E item;
volatile Node<E> next;

两个核心成员变量都是使用volatile声明的,说明其具备线程安全性

Node(E item) {
    UNSAFE.putObject(this, itemOffset, item);
}

jdk8版本构造函数使用UNSAFE包进行约束,17版本已经切换到VarHandle

核心成员变量

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

一头一尾两个指针

ConcurrentLinkedQueue就是通过头尾指针移动进行链表的维护和遍历

offer - 添加方法

因为ConcurrentLinkedQueue是非阻塞的,因此添加失败是没有走park那套流程的,因此肯定不是基于AQS实现的,没有锁,也不存在唤醒

为了保证线程安全,采用的是CAS操作+无限自旋

for (Node<E> t = tail, p = t;;) {
    Node<E> q = p.next;
    ……
}

自旋条件初始化是取tail尾,赋值给p,没有判断条件,没有增长算法,意味着除非CAS执行成功,否则永不返回。

if (q == null) {
    if (p.casNext(null, newNode)) {
        if (p != t) // hop two nodes at a time
            casTail(t, newNode);  // Failure is OK.
        return true;
    }

第一个对象入队,tail的next一定为null,做CAS操作,让p.next指向新节点。结果:head=tail,tail.next=newNode。因为这时p指向tail即p=t,不执行尾节点的更新操作,直接返回return true

补充图片concurrentqueue入队

第二个对象入队,这时q是p的next,肯定非null,且q != p,走最后一个else

else
    p = (p != t && t != (t = tail)) ? t : q;

这一步是找队尾。核心是这个判断条件p != t && t != (t = tail)

  • 首先p != t

    • 初始的条件是p=t,这里就一定不成立,使p=q即往后移动一位

    • p做过移动,这里就成立

  • t != (t = tail)这个条件比较的是t和tail值,即t的初始值和tail,比完把tail赋值给t。但是初始化的时候t=tail,什么情况下t的初始值会不等于tail呢?其实是多线程,别的线程把tail改了,本线程的t还是老tail,这里其实就是判断多线程并发

11: getstatic #10 // Field t:I
14: getstatic #10 // Field t:I
17: if_icmpeq 24

可见,这里是两次把t压入栈顶,比较两个栈顶元素,因此t = tail虽然执行了,但是也改变不了第一次入栈的t的值

综合来看,这个判断的逻辑是:

  • 如果别的线程并发修改了tail,那么将tail赋值给p

  • 如果没有并发修改,判断p和t,如果是第二次入队,p与t相等,就将q赋值给p

然后再走下一次循环执行q=p.next,如果这时p移动到了队尾,这里q就又是null了,执行第一个if,做CAS插入,如果不是队尾,再判断p != t && t != (t = tail),这一轮判断p!=t就成立了,在没有并发的情况下,后者也成立,就把t赋值给p,如果有并发,也把t赋值给p

然后再走下一次循环,继续判断q=p.next是否为null,直到找到队尾,走CAS插入

if (q == null) {
    if (p.casNext(null, newNode)) {
        if (p != t) // hop two nodes at a time
            casTail(t, newNode);  // Failure is OK.
        return true;
    }

插入后p!=t成立,因为这时p是真队尾,tail是假的,然后执行CAS操作修改tail指针。

补充图片concurrentqueue入队

可见入队操作,两个元素为1圈,tail不恒指向队尾

为什么这样?

假如保证tail恒指向队尾,代码可以修改为

for (;;) {
  Node<E> t = tail;
  if (t.casNext(null, n) && casTail(t, n)) {
    return true;
  }
}

如果让tail恒指向队尾,每次插入就都需要指向两个操作:casNext、casTail,两个cas操作的性能势必不如1次/2次间隔,这样性能提升几乎50%

poll - 出队

出队因为没有阻塞,也是通过for循环死循环自旋实现的

首先假设一个场景,四个元素,一个head哨兵:

补充图片concurrentqueue出队

restartFromHead: for (;;) {
    for (Node<E> h = head, p = h, q;; p = q) {

最外层是死循环强制自旋,第二层是指针移动的循环,初始条件是head值,让p=head,无循环条件,每次执行循环让p的指针指向q

if ((item = p.item) != null && p.casItem(item, null)) {
    if (p != h) // hop two nodes at a time
        updateHead(h, ((q = p.next) != null) ? q : p);
    return item;
}

第一个if,即判断p指向的节点不是空节点(不是head哨兵),同时使用CAS设置p.item为null能成功,即取出了队头元素。如果这个时候h=head不指向对头,这里重置队头到下一个实际节点

补充图片concurrentqueue出队

但如果是第一圈循环,p一开始指向的是哨兵,这里就不成立,走第二个if

else if ((q = p.next) == null) {
    updateHead(h, p);
    return null;
}

第二个if让q=p.next,然后判空,如果成立,即p没有下一个节点了,此时场景哨兵后面啥也没有,所以这里刷信头节点,返回null

else if (p == q)
    continue restartFromHead;

如果判空不成立,走第三个if,这时肯定是不成立的,因为q=p.next,这时就不从外圈循环开始,而是从内圈开始。循环一次,让p=q。如果此时再跑第一个if成功取出了,这时p!=h成立,更新头节点并返回即可。

补充图片concurrentqueue出队

下次再弹节点,因为第二个for循环中,p=h指向一个实际节点,而非哨兵,第一个if判断p.item!=null能成立,直接就可以取出去了,这时p=h也成立,因此不需要再更新head直接返回。

从这个流程可以看出,两次弹出,移动一次head

0

评论区