前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS-Exchanger源码学习

AQS-Exchanger源码学习

作者头像
逍遥壮士
发布2023-02-28 16:21:43
1680
发布2023-02-28 16:21:43
举报
文章被收录于专栏:技术趋势技术趋势

上文:AQS-semaphore&CyclicBarrier&CountDownLatch源码学习

源码下载:https://gitee.com/hong99/jdk8

Exchanger是什么?

exchanger是一个极少使用到的交换类,主要用于线程阻塞或者因为阻塞引起但任务又急于执行,这里候就可以进行交换。但是有一个非常的复杂点就是两个并发任务执行过程中交换数据,这一点是非常厉害的,可以看下下面的一些基础实现。

基础功能的学习

代码语言:javascript
复制
package com.aqs;

import java.util.concurrent.Exchanger;

/**
 * @author: csh
 * @Date: 2022/12/17 10:43
 * @Description:Exchanger 学习
 */
public class ExchangerStudy {

    public static void main(String[] args) {
        Exchanger<Integer> exchanger = new Exchanger<Integer>();
        for (int i = 0; i < 10; i++) {
            final int index=i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程:Thread_"+Thread.currentThread().getName()+"下标为:"+index);
                    try {
                        int newIndex = exchanger.exchange(index);
                        //等待100毫秒
                        Thread.sleep(100);
                        System.out.println("当前线程:Thread_"+Thread.currentThread().getName()+"原来下标为:"+index+"交换后下标为:"+newIndex);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

    }
}

结果

代码语言:javascript
复制
当前线程:Thread_Thread-2下标为:2
当前线程:Thread_Thread-1下标为:1
当前线程:Thread_Thread-4下标为:4
当前线程:Thread_Thread-3下标为:3
当前线程:Thread_Thread-5下标为:5
当前线程:Thread_Thread-6下标为:6
当前线程:Thread_Thread-0下标为:0
当前线程:Thread_Thread-7下标为:7
当前线程:Thread_Thread-8下标为:8
当前线程:Thread_Thread-9下标为:9
当前线程:Thread_Thread-4原来下标为:4交换后下标为:3
当前线程:Thread_Thread-6原来下标为:6交换后下标为:5
当前线程:Thread_Thread-0原来下标为:0交换后下标为:7
当前线程:Thread_Thread-9原来下标为:9交换后下标为:8
当前线程:Thread_Thread-8原来下标为:8交换后下标为:9
当前线程:Thread_Thread-7原来下标为:7交换后下标为:0
当前线程:Thread_Thread-5原来下标为:5交换后下标为:6
当前线程:Thread_Thread-2原来下标为:2交换后下标为:1
当前线程:Thread_Thread-1原来下标为:1交换后下标为:2
当前线程:Thread_Thread-3原来下标为:3交换后下标为:4

可以看到两个线程可以交换执行的下标。是比较厉害。接下来我们看看底层的源码实现。

源码学习

java.util.concurrent.Exchanger 源码实现时

代码语言:javascript
复制
//交换机
public class Exchanger<V> {
    //避免伪共享 左移数据下标(获取内存偏移量)
    private static final int ASHIFT = 7;

    //节点最大的数组下标
    private static final int MMASK = 0xff;

    //用于递增,每次加一个seq
    private static final int SEQ = MMASK + 1;

    //获取cpu的核数
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    //实际组数长度(线程数)
    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;

    //自旋次数 当cpu为单核时,该参为禁用
    private static final int SPINS = 1 << 10;

    //用于提供给其他线程的交换对象
    private static final Object NULL_ITEM = new Object();

    //用于超时的传递对象
    private static final Object TIMED_OUT = new Object();

    //交换节点 保存交换的数据
    @sun.misc.Contended static final class Node {
        int index; // 多槽数据索引
        int bound; // 上一次的边界
        int collides; // 记录边界范围内cas失败次数
        int hash; // 代表hash值 用于自旋优化
        Object item; // 节点带的数据
        volatile Object match; // 未来 配对成功 交换的数据
        volatile Thread parked; // 匹配的线程
    }
    //本地线程类实现初始化值
    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }

    // 存放node节点 保障线程安全
    private final Participant participant;

   //多槽数组
    private volatile Node[] arena;

    /**
     * 交换的槽位
     */
    private volatile Node slot;

    //上次记录
    private volatile int bound;

    //多槽位的交换实现(带过期时间)
    private final Object arenaExchange(Object item, boolean timed, long ns) {
        //获取多槽位数组
        Node[] a = arena;
        //获取当前线程node对象
        Node p = participant.get();
        for (int i = p.index;;) { // access slot at i
            int b, m, c; long j; // 初始化相关的交换变量
            //获取交换节点信息(先获取偏移地址)
            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
            //如果不为空(证明已经有线程) 进行交换
            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                //获取交换q的内容
                Object v = q.item; // release
                //将当前线程的内容赋值给q的match
                q.match = item;
                //获取被交换线程
                Thread w = q.parked;
                //不为空进行唤醒
                if (w != null)
                    U.unpark(w);
                //这个是交换后的值
                return v;
            }
            //槽位还没被占的场景
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                //交换对象值
                p.item = item; // offer
                //cas交换
                if (U.compareAndSwapObject(a, j, null, p)) {
                    //计算超时时间
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    for (int h = p.hash, spins = SPINS;;) {
                        //用来携带交换线程的数
                        Object v = p.match;
                        //已被交换 则清标识
                        if (v != null) {
                            U.putOrderedObject(p, MATCH, null);
                            p.item = null; // clear for next use
                            p.hash = h;
                            //交换成功
                            return v;
                        }
                        //判断自旋是否大于0
                        else if (spins > 0) {
                            //太复杂了这里 反正就是各各自旋次数获取 然后释放线程资源
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0) // initialize hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 && // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield(); // two yields per wait
                        }
                        //已有交换线程 准备数据中
                        else if (U.getObjectVolatile(a, j) != p)
                            spins = SPINS; // releaser hasn't set match yet
                        // //线程不挂起 不是 多槽 时间没结束
                        else if (!t.isInterrupted() && m == 0 &&
                                 (!timed ||
                                  (ns = end - System.nanoTime()) > 0L)) {
                            U.putObject(t, BLOCKER, this); // emulate LockSupport
                            p.parked = t; // minimize window
                            //注意这里 park则挂起线程
                            if (U.getObjectVolatile(a, j) == p)
                                U.park(false, ns);
                            p.parked = null;
                            U.putObject(t, BLOCKER, null);
                        }
                        //换槽位(原因可能一直没有线程) 逻辑有点复杂
                        else if (U.getObjectVolatile(a, j) == p &&
                                 U.compareAndSwapObject(a, j, p, null)) {
                            if (m != 0) // try to shrink
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            i = p.index >>>= 1; // descend
                            if (Thread.interrupted())
                                return null;
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break; // expired; restart
                        }
                    }
                }
                else
                    //获取槽位失败,先清空数据
                    p.item = null; // clear offer
            }
            //不在有效范围内,或者已经被其它线程抢了~
            else {
                //更新bound
                if (p.bound != b) { // stale; reset
                    p.bound = b;
                    p.collides = 0;
                    i = (i != m || m == 0) ? m : m - 1;
                }
                else if ((c = p.collides) < m || m == FULL ||
                         !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                    p.collides = c + 1;
                    i = (i == 0) ? m : i - 1; // cyclically traverse
                }
                else
                    //往后挪动
                    i = m + 1; // grow
                //更新下标
                p.index = i;
            }
        }
    }

    //单槽位交换方法
    private final Object slotExchange(Object item, boolean timed, long ns) {
        //获取当前线程携带的数据
        Node p = participant.get();
        //获取当前线程
        Thread t = Thread.currentThread();
        //判断中断不为中断
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;
        //自旋
        for (Node q;;) {
            //赋给q
            if ((q = slot) != null) {
                //cas交换对象并将slot置为null
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;
                    //将当前的对象赋给交换对象的match
                    q.match = item;
                    //取出交换的线程
                    Thread w = q.parked;
                    //唤醒线程
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                // create arena on contention, but continue until slot null
                if (NCPU > 1 && bound == 0 &&
                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            //多槽交换不为空则退出
            else if (arena != null)
                return null; // caller must reroute to arenaExchange
            else {
                //把数据赋给当前node节点
                p.item = item;
                //进行交换数据 成功则退出
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                 //如果交换不成功则赋为空
                p.item = null;
            }
        }

        // await release
        //拿到节点hash
        int h = p.hash;
        //计算时间
        long end = timed ? System.nanoTime() + ns : 0L;
        //获取自旋次数
        int spins = (NCPU > 1) ? SPINS : 1;
        //返回值
        Object v;
        //判断不为空
        while ((v = p.match) == null) {
            //自旋次数
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    //让出线程权限
                    Thread.yield();
            }
            //优化操作 自旋线程准备中
            else if (slot != p)
                spins = SPINS;
            //线程不挂起 不是 多槽 时间没结束
            else if (!t.isInterrupted() && arena == null &&
                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
                //
                U.putObject(t, BLOCKER, this);
                p.parked = t;
                if (slot == p)
                    U.park(false, ns);
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        //
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        //返回这个v是线程交换线程的。
        return v;
    }

    //构造方法
    public Exchanger() {
        participant = new Participant();
    }

    //交换的数据
    @SuppressWarnings("unchecked")
    public V exchange(V x) throws InterruptedException {
        //当前线程用于交换的数据
        Object v;
        //获取item值
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        //不是多槽
        if ((arena != null ||
              //slotExchange 单槽位交换实现的方法
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    //
    @SuppressWarnings("unchecked")
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x;
        long ns = unit.toNanos(timeout);
        if ((arena != null ||
             (v = slotExchange(item, true, ns)) == null) &&
            ((Thread.interrupted() ||
              (v = arenaExchange(item, true, ns)) == null)))
            throw new InterruptedException();
        if (v == TIMED_OUT)
            throw new TimeoutException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    //字段偏移量信息
    private static final sun.misc.Unsafe U;
    private static final long BOUND;
    private static final long SLOT;
    private static final long MATCH;
    private static final long BLOCKER;
    private static final int ABASE;
    //初始化信息
    static {
        int s;
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> ek = Exchanger.class;
            Class<?> nk = Node.class;
            Class<?> ak = Node[].class;
            Class<?> tk = Thread.class;
            BOUND = U.objectFieldOffset
                (ek.getDeclaredField("bound"));
            SLOT = U.objectFieldOffset
                (ek.getDeclaredField("slot"));
            MATCH = U.objectFieldOffset
                (nk.getDeclaredField("match"));
            BLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            s = U.arrayIndexScale(ak);
            // ABASE absorbs padding in front of element 0
            ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);

        } catch (Exception e) {
            throw new Error(e);
        }
        if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
            throw new Error("Unsupported array scale");
    }

}

最后

exchanger默认也是使用了CAS+park/unpark进行实现的,我这里是基于jdk8,jdk7与jdk8是有区别的。本身解决的问题是通过两个线程进行交换执行值,没想到这个exchanger代码不多但是非常复杂,有些可能写得不好,但是有想深入同学可以看看下面两个文章。

参考资料:

https://www.bilibili.com/video/BV17P4y177SD/?spm_id_from=333.788&vd_source=7d0e42b081e08cb3cefaea55cc1fa8b7

https://blog.csdn.net/weixin_30612769/article/details/97769773

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-12-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 技术趋势 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档