[Java Concurrency in Practice] Implementing a Simple Mutex with AQS

Author: 一个会写诗的程序员
Published: 2020-02-24 08:17:36

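Implementing a simple mutex with AQS

What is AQS?

AQS is short for AbstractQueuedSynchronizer, the base class in java.util.concurrent.locks for building locks and synchronizers on top of a single int state and a FIFO wait queue. See [2] for a detailed walkthrough. The two methods below, taken from its source, show how the wait queue is traversed.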

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire.  Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate.  The elements of the
     * returned collection are in no particular order.  This method is
     * designed to facilitate construction of subclasses that provide
     * more extensive monitoring facilities.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }


    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in exclusive mode. This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to an exclusive acquire.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getExclusiveQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (!p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }
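
As the Javadoc above notes, these methods exist so that subclasses can offer monitoring. The following is a minimal sketch of that idea, not code from the original article: the class name QueuedThreadsDemo and the helper countQueuedWaiters are made up for illustration, and the tiny Sync mutex is an assumption that mirrors the one built later in this article. The calling thread holds the lock so that every worker has to queue, then the queue is inspected.

```java
import java.util.Collection;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class QueuedThreadsDemo {
    /** Minimal non-reentrant mutex for the demo: state 0 = unlocked, 1 = locked. */
    static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            return compareAndSetState(0, 1);
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            return true;
        }
    }

    /** Holds the lock, lets `waiters` threads block on it, and returns how many were reported as queued. */
    static int countQueuedWaiters(int waiters) {
        Sync sync = new Sync();
        sync.acquire(1); // the calling thread owns the lock, so every worker must queue
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                sync.acquire(1);
                sync.release(1);
            });
            threads[i].start();
        }
        // getQueueLength() is an estimate too, so poll until all workers are enqueued.
        while (sync.getQueueLength() < waiters) {
            Thread.yield();
        }
        Collection<Thread> queued = sync.getExclusiveQueuedThreads();
        int seen = queued.size();
        sync.release(1); // let the workers drain one by one
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("queued waiters: " + countQueuedWaiters(3));
    }
}
```

The result is stable here only because the lock is held for the whole measurement; in a live system the returned collection is a snapshot that may already be stale.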

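CAS spinning

CAS stands for Compare And Swap. It is an atomic operation on a single variable: the swap happens only if the comparison succeeds. It is an optimistic technique: a caller whose comparison fails typically retries in a loop (spins) until the comparison succeeds, and only then continues with its follow-up work.

A CAS takes three operands: a memory location V, an expected value A, and a new value B. If the value currently stored at V equals A, the comparison succeeds and V is set to B.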

if V==A (Compare)
then V=B (Swap)
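
The pseudocode above can be sketched in Java with AtomicInteger.compareAndSet wrapped in a retry loop. This is an illustrative example, not code from the original article; the class name CasSpinDemo and its helper methods are made up for the demonstration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasSpinDemo {
    /** Adds 1 to v with an explicit CAS retry loop and returns the new value. */
    static int casIncrement(AtomicInteger v) {
        for (;;) {
            int expected = v.get();      // A: the value we last saw at V
            int updated = expected + 1;  // B: the value we want to store
            if (v.compareAndSet(expected, updated)) { // if V == A then V = B
                return updated;
            }
            // Another thread changed V in between; loop and try again.
        }
    }

    /** Runs casIncrement from several threads and returns the final value. */
    static int incrementConcurrently(int threads, int perThread) {
        AtomicInteger value = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    casIncrement(value);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return value.get();
    }

    public static void main(String[] args) {
        System.out.println(incrementConcurrently(4, 10_000)); // prints 40000
    }
}
```

No update is lost even without a lock, because a failed compareAndSet simply triggers another attempt with a fresh expected value.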

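Because the retry loop is unbounded, heavy contention can waste CPU cycles. CAS is also subject to the ABA problem: the value goes A->B->A, and a thread that compares against A concludes that nothing has changed even though it has. Java provides the AtomicStampedReference class for this: besides checking that the current reference matches the expected one, it also checks a stamp (version number), so A->B->A becomes 1A->2B->3A and the change is clearly visible.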
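
The difference can be seen in a small single-threaded simulation. This is a sketch for illustration only: the class name AbaDemo and the two helper methods are made up, and the A->B->A change that another thread would make is performed inline to keep the example deterministic.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

class AbaDemo {
    /** Plain CAS: the A->B->A change in the middle goes unnoticed. */
    static boolean plainCasAfterAba() {
        AtomicReference<String> ref = new AtomicReference<>("A");
        String seen = ref.get();           // we read A
        ref.compareAndSet("A", "B");       // meanwhile: A -> B
        ref.compareAndSet("B", "A");       // meanwhile: B -> A
        return ref.compareAndSet(seen, "C"); // succeeds, value looks unchanged
    }

    /** Stamped CAS: the stamp went 1 -> 2 -> 3, so the stale stamp 1 is rejected. */
    static boolean stampedCasAfterAba() {
        AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 1);
        int[] stampHolder = new int[1];
        String seen = ref.get(stampHolder); // we read A with stamp 1
        int seenStamp = stampHolder[0];
        ref.compareAndSet("A", "B", 1, 2);  // meanwhile: 1A -> 2B
        ref.compareAndSet("B", "A", 2, 3);  // meanwhile: 2B -> 3A
        return ref.compareAndSet(seen, "C", seenStamp, seenStamp + 1); // fails
    }

    public static void main(String[] args) {
        System.out.println(plainCasAfterAba());   // prints true
        System.out.println(stampedCasAfterAba()); // prints false
    }
}
```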

java.util.concurrent.locks static final class AbstractQueuedSynchronizer.Node extends Object

Wait queue node class.

The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks. We instead use them for blocking synchronizers, but use the same basic tactic of holding some of the control information about a thread in the predecessor of its node. A "status" field in each node keeps track of whether a thread should block. A node is signalled when its predecessor releases. Each node of the queue otherwise serves as a specific-notification-style monitor holding a single waiting thread. The status field does NOT control whether threads are granted locks etc though. A thread may try to acquire if it is first in the queue. But being first does not guarantee success; it only gives the right to contend. So the currently released contender thread may need to rewait.

To enqueue into a CLH lock, you atomically splice it in as new tail. To dequeue, you just set the head field.

Insertion into a CLH queue requires only a single atomic operation on "tail", so there is a simple atomic point of demarcation from unqueued to queued. Similarly, dequeuing involves only updating the "head". However, it takes a bit more work for nodes to determine who their successors are, in part to deal with possible cancellation due to timeouts and interrupts.

The "prev" links (not used in original CLH locks), are mainly needed to handle cancellation. If a node is cancelled, its successor is (normally) relinked to a non-cancelled predecessor. For explanation of similar mechanics in the case of spin locks, see the papers by Scott and Scherer at http://www.cs.rochester.edu/u/scott/synchronization/

We also use "next" links to implement blocking mechanics. The thread id for each node is kept in its own node, so a predecessor signals the next node to wake up by traversing next link to determine which thread it is. Determination of successor must avoid races with newly queued nodes to set the "next" fields of their predecessors. This is solved when necessary by checking backwards from the atomically updated "tail" when a node's successor appears to be null. (Or, said differently, the next-links are an optimization so that we don't usually need a backward scan.)

Cancellation introduces some conservatism to the basic algorithms. Since we must poll for cancellation of other nodes, we can miss noticing whether a cancelled node is ahead or behind us. This is dealt with by always unparking successors upon cancellation, allowing them to stabilize on a new predecessor, unless we can identify an uncancelled predecessor who will carry this responsibility.

CLH queues need a dummy header node to get started. But we don't create them on construction, because it would be wasted effort if there is never contention. Instead, the node is constructed and head and tail pointers are set upon first contention.

Threads waiting on Conditions use the same nodes, but use an additional link. Conditions only need to link nodes in simple (non-concurrent) linked queues because they are only accessed when exclusively held. Upon await, a node is inserted into a condition queue. Upon signal, the node is transferred to the main queue. A special value of status field is used to mark which queue a node is on.

Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill Scherer and Michael Scott, along with members of JSR-166 expert group, for helpful ideas, discussions, and critiques on the design of this class.

(JDK 1.8)
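
For comparison, the original spinning form of a CLH lock that this Javadoc refers to can be sketched as follows. This is a textbook-style illustration and not how AQS itself is implemented (AQS blocks threads instead of spinning and adds prev/next links); the class name ClhSpinLock, its fields, and the countWithLock helper are all made up for the example. Note how each thread waits on a flag stored in its predecessor's node.

```java
import java.util.concurrent.atomic.AtomicReference;

class ClhSpinLock {
    static final class Node {
        volatile boolean locked; // true while the owning thread holds or waits for the lock
    }

    // The queue starts with an unlocked dummy node, so the first thread does not wait.
    private final AtomicReference<Node> tail = new AtomicReference<>(new Node());
    private final ThreadLocal<Node> myNode = ThreadLocal.withInitial(Node::new);
    private final ThreadLocal<Node> myPred = new ThreadLocal<>();

    void lock() {
        Node node = myNode.get();
        node.locked = true;
        Node pred = tail.getAndSet(node); // the single atomic step that enqueues us
        myPred.set(pred);
        while (pred.locked) {             // spin on the predecessor's status
            Thread.yield();
        }
    }

    void unlock() {
        Node node = myNode.get();
        node.locked = false;              // releases the successor spinning on this field
        myNode.set(myPred.get());         // recycle the predecessor's node for the next lock()
    }

    /** Increments a plain int under the lock from several threads and returns the total. */
    static int countWithLock(int threads, int perThread) {
        ClhSpinLock lock = new ClhSpinLock();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithLock(4, 2_000)); // prints 8000
    }
}
```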

   /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }



    // Queuing utilities

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
    static final long spinForTimeoutThreshold = 1000L;

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
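
The enqueue loop above can be tried out in isolation. The following is a simplified, standalone sketch rather than the JDK code: it replaces the internal compareAndSetHead/compareAndSetTail with AtomicReference fields, and the names EnqSketch, sizeFromTail and enqueueConcurrently are assumptions made for the demonstration. It keeps the two key points: the dummy head is created lazily on first use, and prev is set before the tail CAS, so a backward walk from tail always sees every enqueued node.

```java
import java.util.concurrent.atomic.AtomicReference;

class EnqSketch {
    static final class Node {
        volatile Node prev;
        volatile Node next;
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    /** Inserts node at the tail, creating the dummy head first if needed; returns the predecessor. */
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {                       // queue not initialized yet
                Node dummy = new Node();
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);               // only the CAS winner publishes the tail
                }
            } else {
                node.prev = t;                     // link backwards before publishing
                if (tail.compareAndSet(t, node)) { // the atomic "queued" point
                    t.next = node;                 // forward link is set afterwards
                    return t;
                }
            }
        }
    }

    /** Counts real nodes by walking prev links from the tail down to the dummy head. */
    int sizeFromTail() {
        int n = 0;
        Node h = head.get();
        for (Node p = tail.get(); p != null && p != h; p = p.prev) {
            n++;
        }
        return n;
    }

    /** Enqueues from several threads at once and returns the resulting queue size. */
    static int enqueueConcurrently(int threads, int perThread) {
        EnqSketch queue = new EnqSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    queue.enq(new Node());
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return queue.sizeFromTail();
    }

    public static void main(String[] args) {
        System.out.println(enqueueConcurrently(4, 1_000)); // prints 4000
    }
}
```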

Simple mutex implementation

package com.light.sword

import java.util.concurrent.locks.AbstractQueuedSynchronizer

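/**
 * @author: Jack
 * 2020-02-10 15:42
 * A simple mutex implemented with AQS
 */

class MutexDemo {
    private val sync = Sync()

    // Blocks until the lock is acquired. The argument is ignored by tryAcquire.
    fun lock() {
        sync.acquire(0)
    }

    // Releases the lock and wakes the next queued thread, if any.
    fun unlock() {
        sync.release(0)
    }

    class Sync : AbstractQueuedSynchronizer() {
        private val LOCKED = 1
        private val UNLOCKED = 0

        init {
            state = UNLOCKED
        }

        // Only the thread whose CAS flips state from UNLOCKED to LOCKED gets the lock.
        override fun tryAcquire(arg: Int): Boolean {
            return compareAndSetState(UNLOCKED, LOCKED)
        }

        // No ownership check: any thread can release. The lock is also not reentrant.
        override fun tryRelease(arg: Int): Boolean {
            state = UNLOCKED
            return true
        }
    }

}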
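
The mutex above lets any thread release the lock. A possible variation, sketched here in Java, records the owner so that only the owning thread can unlock. This is not the author's code: the class name OwnedMutex and its helper methods are assumptions for illustration, while setExclusiveOwnerThread and getExclusiveOwnerThread are inherited from AbstractOwnableSynchronizer. Like the original, it is not reentrant, so a thread that calls lock() twice blocks itself.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class OwnedMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread()); // remember who holds the lock
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException("not the lock owner");
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write, publishes the critical section
            return true;
        }
    }

    private final Sync sync = new Sync();

    void lock() {
        sync.acquire(1);
    }

    void unlock() {
        sync.release(1);
    }

    /** Increments a plain int under the mutex from several threads and returns the total. */
    static int countWithLock(int threads, int perThread) {
        OwnedMutex mutex = new OwnedMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock(); // always release, even on exceptions
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            join(t);
        }
        return counter[0];
    }

    /** Returns true if unlock() from a thread that does not hold the lock is rejected. */
    static boolean unlockByNonOwnerFails() {
        OwnedMutex mutex = new OwnedMutex();
        mutex.lock();
        boolean[] threw = {false};
        Thread intruder = new Thread(() -> {
            try {
                mutex.unlock();
            } catch (IllegalMonitorStateException e) {
                threw[0] = true;
            }
        });
        intruder.start();
        join(intruder);
        mutex.unlock();
        return threw[0];
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(countWithLock(4, 10_000));  // prints 40000
        System.out.println(unlockByNonOwnerFails());   // prints true
    }
}
```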

How to use the mutex

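fun main() {
    // Without a lock: all 11 threads run task() at the same time.
    for (i in 0..10) {
        Thread {
            task(Thread.currentThread().name)
        }.start()
    }

    // With the mutex: only one thread at a time runs task().
    val mutex = MutexDemo()
    for (i in 0..10) {
        Thread {
            mutex.lock()
            try {
                task(Thread.currentThread().name)
            } finally {
                // Always release the lock, even if task() throws.
                mutex.unlock()
            }
        }.start()
    }
}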

fun task(threadName: String) {
    println("-------------")
    println("${System.currentTimeMillis()} : $threadName")
    Thread.sleep(1000)
}


// Output:
-------------
-------------
1581489100026 : Thread-1
-------------
-------------
1581489100026 : Thread-2
-------------
1581489100026 : Thread-5
-------------
1581489100026 : Thread-0
-------------
1581489100026 : Thread-6
-------------
1581489100027 : Thread-10
-------------
-------------
1581489100027 : Thread-9
1581489100026 : Thread-4
-------------
1581489100027 : Thread-3
1581489100026 : Thread-8
1581489100027 : Thread-7
-------------
1581489100027 : Thread-11
-------------
1581489101028 : Thread-12
-------------
1581489102029 : Thread-13
-------------
1581489103030 : Thread-14
-------------
1581489104031 : Thread-15
-------------
1581489105032 : Thread-16
-------------
1581489106034 : Thread-17
-------------
1581489107036 : Thread-18
-------------
1581489108039 : Thread-19
-------------
1581489109042 : Thread-20
-------------
1581489110046 : Thread-21

References:

[1] https://www.jianshu.com/p/282bdb57e343

[2] Java Concurrency in Practice: AQS source code, a detailed illustrated walkthrough with line-by-line comments: https://blog.csdn.net/universsky2015/article/details/95813887
