专栏首页一个会写诗的程序员的博客【Java 并发编程实战】使用 AQS 实现一个简单的互斥锁

【Java 并发编程实战】使用 AQS 实现一个简单的互斥锁

使用 AQS 实现一个简单的互斥锁

AQS 是什么?

参考[2]。

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire.  Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate.  The elements of the
     * returned collection are in no particular order.  This method is
     * designed to facilitate construction of subclasses that provide
     * more extensive monitoring facilities.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }


    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in exclusive mode. This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to an exclusive(独有的;排外的;专一的) acquire.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getExclusiveQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (!p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

CAS自旋

CAS 是Compare And Swap的简称,具有单一变量的原子操作特性,对比成功后进行交换操作,他是乐观操作,期间会无限循环操作,直到对比成功,然后进行后续交互操作

CAS 包含了三个操作数据,内存位置V、预期值A、新预期值B,如果当前内存V存放的数据和A一样,就认为比较成功,然后把当前V所在的位置设置为B。

if V==A (Compare)
then V=B (Swap)

因为会无限循环操作,所以可能导致CPU效率低下,而且运行中还会导致ABA问题,也就是A->B->A的问题,误以为此时数据未发生变化,其实中间已经发生变化。该问题在java中提供了类AtomicStampedReference解决该问题,先会查看当前引用值是否符合期望,ABA也会变成1A->2B->3A,这样很清楚的感知到发生变化了。

java.util.concurrent.locks static final class AbstractQueuedSynchronizer.Node extends Object

Wait queue node class. The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks(自旋锁). We instead use them for blocking synchronizers, but use the same basic tactic of holding some of the control information about a thread in the predecessor of its node. A "status" field in each node keeps track of whether a thread should block. A node is signalled when its predecessor releases. Each node of the queue otherwise serves as a specific-notification-style monitor holding a single waiting thread. The status field does NOT control whether threads are granted locks etc though. A thread may try to acquire if it is first in the queue. But being first does not guarantee success; it only gives the right to contend. So the currently released contender thread may need to rewait. To enqueue into a CLH lock, you atomically splice it in as new tail. To dequeue, you just set the head field.

Insertion into a CLH queue requires only a single atomic operation on "tail", so there is a simple atomic point of demarcation from unqueued to queued. Similarly, dequeuing involves only updating the "head". However, it takes a bit more work for nodes to determine who their successors are, in part to deal with possible cancellation due to timeouts and interrupts. The "prev" links (not used in original CLH locks), are mainly needed to handle cancellation. If a node is cancelled, its successor is (normally) relinked to a non-cancelled predecessor. For explanation of similar mechanics in the case of spin locks, see the papers by Scott and Scherer at http://www.cs.rochester.edu/u/scott/synchronization/ We also use "next" links to implement blocking mechanics. The thread id for each node is kept in its own node, so a predecessor signals the next node to wake up by traversing next link to determine which thread it is. Determination of successor must avoid races with newly queued nodes to set the "next" fields of their predecessors. This is solved when necessary by checking backwards from the atomically updated "tail" when a node's successor appears to be null. (Or, said differently, the next-links are an optimization so that we don't usually need a backward scan.) Cancellation introduces some conservatism to the basic algorithms. Since we must poll for cancellation of other nodes, we can miss noticing whether a cancelled node is ahead or behind us. This is dealt with by always unparking successors upon cancellation, allowing them to stabilize on a new predecessor, unless we can identify an uncancelled predecessor who will carry this responsibility. CLH queues need a dummy header node to get started. But we don't create them on construction, because it would be wasted effort if there is never contention. Instead, the node is constructed and head and tail pointers are set upon first contention. Threads waiting on Conditions use the same nodes, but use an additional link. Conditions only need to link nodes in simple (non-concurrent) linked queues because they are only accessed when exclusively held. Upon await, a node is inserted into a condition queue. Upon signal, the node is transferred to the main queue. A special value of status field is used to mark which queue a node is on. Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill Scherer and Michael Scott, along with members of JSR-166 expert group, for helpful ideas, discussions, and critiques on the design of this class. < 1.8 >

   /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }



    // Queuing utilities

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
    static final long spinForTimeoutThreshold = 1000L;

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

简单互斥锁实现代码

package com.light.sword

import java.util.concurrent.locks.AbstractQueuedSynchronizer

/**
 * @author: Jack
 * 2020-02-10 15:42
 * 使用 AQS 实现一个简单的互斥锁
 */

class MutexDemo {
    private val sync = Sync()
    fun lock() {
        sync.acquire(0)
    }

    fun unlock() {
        sync.release(0)
    }

    class Sync : AbstractQueuedSynchronizer() {
        private val LOCKED = 1
        private val UNLOCKED = 0

        init {
            state = UNLOCKED
        }

        override fun tryAcquire(arg: Int): Boolean {
            return compareAndSetState(UNLOCKED, LOCKED)
        }

        override fun tryRelease(arg: Int): Boolean {
            state = UNLOCKED
            return true
        }
    }

}

怎样使用互斥锁

fun main() {
    for (i in 0..10) {
        Thread {
            task(Thread.currentThread().name)
        }.start()
    }

    val mutex = MutexDemo()
    for (i in 0..10) {
        Thread {
            mutex.lock()
            task(Thread.currentThread().name)
            mutex.unlock()
        }.start()
    }
}

fun task(threadName: String) {
    println("-------------")
    println("${System.currentTimeMillis()} : $threadName")
    Thread.sleep(1000)
}


// 输出:
-------------
-------------
1581489100026 : Thread-1
-------------
-------------
1581489100026 : Thread-2
-------------
1581489100026 : Thread-5
-------------
1581489100026 : Thread-0
-------------
1581489100026 : Thread-6
-------------
1581489100027 : Thread-10
-------------
-------------
1581489100027 : Thread-9
1581489100026 : Thread-4
-------------
1581489100027 : Thread-3
1581489100026 : Thread-8
1581489100027 : Thread-7
-------------
1581489100027 : Thread-11
-------------
1581489101028 : Thread-12
-------------
1581489102029 : Thread-13
-------------
1581489103030 : Thread-14
-------------
1581489104031 : Thread-15
-------------
1581489105032 : Thread-16
-------------
1581489106034 : Thread-17
-------------
1581489107036 : Thread-18
-------------
1581489108039 : Thread-19
-------------
1581489109042 : Thread-20
-------------
1581489110046 : Thread-21

参考资料:

[1] https://www.jianshu.com/p/282bdb57e343

[2] Java并发编程实战: AQS 源码 史上最详尽图解+逐行注释: https://blog.csdn.net/universsky2015/article/details/95813887

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Paul Graham:梦寐以求的编程语言

    这是一篇2001年发表的博文,距今超过10年。但是,好的文章是不会随时间流逝而贬值滴。 作者 Paul Graham 是硅谷大牛。对许多问题(包括:编程技术、...

    一个会写诗的程序员
  • 【Java 并发编程实战】信号量 (Semaphore)

    1965年,荷兰学者Edsger Dijkstra提出的信号量(Semaphores)机制是一种卓有成效的进程同步工具,在长期广泛的应用中,信号量机制得到了极大...

    一个会写诗的程序员
  • “MR模型-关系思考法” (光剑)【中英文对照】

    阅读本文,你将了解如何运用 “MR模型-关系思考法” 进行思考、理解我们生活的这个复杂世界。

    一个会写诗的程序员
  • uva-----11292 The Dragon of Loowater

    Problem C: The Dragon of Loowater Once upon a time, in the Kingdom of Loowater, ...

    Gxjun
  • UVa 11292 - Dragon of Loowater(排序贪心)

    Once upon a time, in the Kingdom of Loowater, a minor nuisance turned into a maj...

    谙忆
  • 在不破坏加密的情况下检测WhatsApp的错误信息(CS CY)

    诸如WhatsApp之类的智能手机通信App的普及,正在彻底改变很多用户与Internet通信和交互。直接发送到用户手机的信息的即时性和通过端到端加密进行的安全...

    小童
  • UVA 11292 Dragon of Loowater(简单贪心)

    Problem C: The Dragon of Loowater Once upon a time, in the Kingdom of Loowater, ...

    Angel_Kitty
  • 亚马逊扫清了FCC进入卫星网络的障碍

    亚马逊在寻求向地球上没有服务和服务不足的地区提供互联网服务的过程中,扫清了一个主要的监管障碍。美国联邦通信委员会(U.S.FederalCommunicatio...

    用户8054111
  • R Programming-week1 Reading Data

    There are a few principal functions readingdata into R.

    统计学家
  • HDOJ(HDU) 2401 Baskets of Gold Coins(数列、)

    Problem Description You are given N baskets of gold coins. The baskets are num...

    谙忆

扫码关注云+社区

领取腾讯云代金券