前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >理解Java并发工具类Exchanger

理解Java并发工具类Exchanger

作者头像
我是攻城师
发布2018-09-30 11:34:28
1.1K0
发布2018-09-30 11:34:28
举报
文章被收录于专栏:我是攻城师我是攻城师

Exchanger类是JDK5中的一个并发工具辅助类,这个类的主要作用是可以用于两个线程之间交换数据,以实际生活中的场景来讲,比如很多小区楼下都有自取的快递柜,如果快递员来了,就把东西直接放快递柜子就行了,然后我们直接从柜子中取走东西即可。这个柜子就起到了媒介的作用。也就说柜子可以是双向信息交换的一个媒介,比如我需要邮寄东西,我可以提前把东西放进柜子,然后快递来了把新买的东西放进去,然后把需要邮寄的东西拿走,这样就完成了交换,但大多数时候,我们可能买东西比较多,邮寄东西比较少。 Exchanger在遗传算法和管道设计比较有用。

下面先来看一个简单的交换例子:

代码语言:javascript
复制
public static void main(String[] args) {

        Exchanger exchanger=new Exchanger();
        ExchangeDemo1 demo1=new ExchangeDemo1();

        Thread t1=new Thread(new Worker(exchanger,"A"));
        Thread t2=new Thread(new Worker(exchanger,"B"));

        t1.start();
        t2.start();

    }


   static class  Worker implements Runnable{

        Exchanger exchanger;
        Object object;

        public Worker(Exchanger exchanger, Object object) {
            this.exchanger = exchanger;
            this.object = object;
        }

        public void run() {

            Object privious=this.object;
            try {
                this.object=this.exchanger.exchange(this.object);
                System.out.println(Thread.currentThread().getName()+" exchange "+privious+"  for "+this.object);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }


        }
    }

输出结果:

代码语言:javascript
复制
Thread-1 exchange B  for A
Thread-0 exchange A  for B

这种场景可以理解为,快递员把我们买的东西,放进了柜子里面,同时取走了我们要邮寄的快递,这样就完成了交换。

如果我们一直买东西,而不邮寄东西,那么Exchanger类其实就变成了简化版本的生产者和消费者的模型。快递员就是生产者,我们本身就是消费者,而柜子就成为了我们媒介容器,看下面的一个例子:

代码语言:javascript
复制
static Exchanger<DataBuffer<Integer>>  exchanger=new Exchanger<>();
   static DataBuffer<Integer> intialEmptyBuffer=new DataBuffer<>();
   static DataBuffer<Integer> intialFullBuffer=new DataBuffer<>();
   static AtomicInteger countDown=new AtomicInteger(5);


    static class ProducerWorker implements Runnable{
        //每次睡眠时长,单位是秒
        long sleep;

        public ProducerWorker(long sleep) {
            this.sleep = sleep;
        }

        @Override
        public void run() {
            DataBuffer currentBuffer=intialEmptyBuffer;
            while (currentBuffer!=null&&countDown.get()>0){
                try {
                    TimeUnit.SECONDS.sleep(sleep);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                currentBuffer.put(countDown.get());//每次放入数据
                if(currentBuffer.isFull()){
                    try {
                        System.out.println(Thread.currentThread().getName()+"  放入了快递"+countDown.get());
                        currentBuffer=exchanger.exchange(currentBuffer);//交换后得到null
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                countDown.getAndDecrement();

            }

        }


    }

    static class ConsumerWorker implements Runnable{

        //每次睡眠时长,单位是秒
        long sleep;

        public ConsumerWorker(long sleep) {
            this.sleep = sleep;
        }

        @Override
        public void run() {
            DataBuffer<Integer> currentBuffer=intialFullBuffer;

            while (currentBuffer!=null&&countDown.get()>0){
                try {
                    TimeUnit.SECONDS.sleep(sleep);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                //如果为空就进行交换
                if(currentBuffer.isEmpyt()){
                    try {
                        currentBuffer=exchanger.exchange(currentBuffer); //交换数据
                        Integer value=currentBuffer.get();//不断的从
                        System.out.println(Thread.currentThread().getName()+"  拿走了快递"+value);
                        System.out.println();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }


            }

        }
    }


    public static void main(String[] args) {
        new Thread(new ProducerWorker(1),"快递员").start(); //启动生产者,每次睡眠1秒,启动顺序不分先后
        new Thread(new ConsumerWorker(3),"我   ").start(); //启动消费者,每次睡眠3秒,启动顺序不分先后


    }

    /***
     * 数据交换类
     * @param <T>
     */
   static class DataBuffer<T>{
        T data;
        public boolean isFull(){
            return data!=null;
        }

        public boolean isEmpyt(){
            return  data==null;
        }

        public T get(){
            T d=data;
            data=null;
            return d;
        }

        public void  put (T data){
            this.data=data;
        }



    }

输出结果:

代码语言:javascript
复制
快递员  放入了快递5
我     拿走了快递5

快递员  放入了快递4
我     拿走了快递4

快递员  放入了快递3
我     拿走了快递3

快递员  放入了快递2
我     拿走了快递2

快递员  放入了快递1
我     拿走了快递1

Exchanger类有两个交换方法:

代码语言:javascript
复制
exchange(V x) //不能超时的交换方法
exchange(V x, long timeout, TimeUnit unit)//可以设置超时的方法

Exchanger类在处理交换时,如果只有一方到达,而另一方没有到达,先到的这一方会等待另一方到达,只有两方都到达,完成交换才能进行下一次的交换,这有点类似约会的场景,但如果另一方一直不来,那么先到的一方可能永远不会停止,一直在等待,正因为如此Exchanger也提供了可超时的等待策略,在指定的时间内如果另一方仍然不能赴约,自己可中断。

底层原理分析:

Exchanger类底层并不是太复杂,关键的技术有:

(1)使用CAS自旋指令完成数据交换

(2)使用LockSupport的park方法使交换线程进入休眠等待, 使用LockSupport的unpark方法唤醒等待线程。

(3)此外还声明了一个Node对象用于存储交换数据。

本文主要介绍了Java里面并发工具类Exchanger,使用Exchanger可以轻松的给两个线程进行数据交换,这里需要注意的是如果超过两个线程操作交换,那么则很有可能出现问题,这里想象一下我们的快递柜的例子就很容易想明白,这一点需要注意,不要使用多个线程操作交换。

文中代码,如果需要可以到我的github上下载运行。

https://github.com/qindongliang/Java-Note

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-08-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 我是攻城师 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档