非阻塞并发队列提供并刷新

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (2)
  • 关注 (0)
  • 查看 (11)

非阻塞并发队列提供并刷新

我需要一个无限制的非阻塞并发队列,基本上只有2个操作:

  • offer:原子地插入指定的项目在这个队列的尾部;
  • flush:将队列中当前存在的所有项目按照插入顺序逐个处理。更具体地说,必须是原子的只是这个“takeAll”操作,这将是刷新的第一个操作。takeAll后提供给队列的所有项目将被插入,然后仅通过另一个后续刷新进行处理。

目标是消费者在takeAll上有一个CAS操作,然后可以遍历列表中的元素,而无需通过每次读取的CAS操作。此外,我们已经拥有Node(Entry),因为需要存储其他不可变状态。新节点可以将HEAD作为构造函数参数,创建单向链接列表。

文学中是否存在具有这些特征的队列?

提问于
用户回答回答于

需要一个CAS两个offer()takeAll()

问题:长takeAll()执行,因为它需要完全遍历相反方向的单链表。

解决方案:在节点上创建其他跳过级别。对于所提到的数字(N〜100K)来说,两个级别就足够了,因此将步数减少takeAll()到〜150。

基于提到的实现,Node类:

public static final class Node<T> {

    private final T value;
    private Node<T> prev, prevL1, prevL2;
    private Node<T> next, nextL1, nextL2;

    private Node(T obj, Node<T> prev, long c) {
        value = obj;
        this.prev = prev;  
        // level 1 to skip 64 nodes, level 2 to skip 64^2 nodes
        // c is a value from some global addition counter, that
        // is not required to be atomic with `offer()`
        prevL1 = (c & (64 - 1) == 0) ? prev : prev.prevL1;
        prevL2 = (c & (64 * 64 - 1) == 0) ? prev : prev.prevL2;
    }

    public T get() {
        return value;
    }

    public Node<T> findHead() {
        // see below
    }

    public Node<T> next() {
        // see below
    }
}

FunkyQueue#offer() 方法:

public void offer(T t) {
    long c = counter.incrementAndGet();  
    for(;;) {
        Node<T> oldTail = tail.get();
        Node<T> newTail = new Node<T>(t, oldTail, c);
        if (tail.compareAndSet(oldTail, newTail)) 
            break;
    }
}

FunkyQueue#getAll() 现在将返回列表的头部:

public List<T> takeAll() {
    return tail.getAndSet(null).findHead();
}

它调用Node#findHead(),现在可以使用跳过级别加快向后遍历:

private Node<T> findHead() {

     Node<T> n = this;
     while (n.prevL2 != null) {  // <- traverse back on L2, assigning `next` nodes
         n.prevL2.nextL2 = n;
         n = n.prevL2; 
     }
     while (n.prevL1 != null) {  // <- the same for L1
         n.prevL1.nextL1 = n;
         n = n.prev1;
     }
     while (n.prev != null) {    // <- the same for L0
         n.prev.next = n;
         n = n.prev;
     }
     return n;
}

最后,Node#next()

public Node<T> next() {

    if (this.next == null && this.nextL1 == null && this.nextL2 == null)       
        throw new IllegalStateException("No such element");

    Node<T> n;
    if (this.next == null) {         // L0 is not traversed yet
        if (this.nextL1 == null) {   // the same for L1
            n = this.nextL2;         // step forward on L2
            while (n != this) {      // traverse on L1
                n.prevL1.nextL1 = n;
                n = n.prevL1;
            }
        }  
        n = this.nextL1;             // step forward on L1
        while (n != this) {          // traverse on L0
            n.prev.next = n;
            n = n.prev;
        }
    }
    return this.next;
}

我想主要的想法很清楚。施加一些重构,能够使Node#findHead(),因此FunkyQueue#getAll()在为O(log N)操作,并且Node#next()在O(1)在平均。

用户回答回答于

public class FunkyQueue<T> { private final AtomicReference<Node<T>> _tail = new AtomicReference<Node<T>>(); public void offer(T t) { while(true) { Node<T> tail = _tail.get(); Node<T> newTail = new Node<T>(t, tail); if(_tail.compareAndSet(tail, newTail)) { break; } } } public List<T> takeAll() { Node<T> tail = _tail.getAndSet(null); LinkedList<T> list = new LinkedList<T>(); while(tail != null) { list.addFirst(tail.get()); tail = tail.getPrevious(); } return list; } private static final class Node<T> { private final T _obj; private Node<T> _prev; private Node(T obj, Node<T> prev) { _obj = obj; _prev = prev; } public T get() { return _obj; } public Node<T> getPrevious() { return _prev; } } }

扫码关注云+社区