Exchanger类是JDK5中的一个并发工具辅助类,这个类的主要作用是可以用于两个线程之间交换数据,以实际生活中的场景来讲,比如很多小区楼下都有自取的快递柜,如果快递员来了,就把东西直接放快递柜子就行了,然后我们直接从柜子中取走东西即可。这个柜子就起到了媒介的作用。也就说柜子可以是双向信息交换的一个媒介,比如我需要邮寄东西,我可以提前把东西放进柜子,然后快递来了把新买的东西放进去,然后把需要邮寄的东西拿走,这样就完成了交换,但大多数时候,我们可能买东西比较多,邮寄东西比较少。 Exchanger在遗传算法和管道设计比较有用。
下面先来看一个简单的交换例子:
public static void main(String[] args) {
Exchanger exchanger=new Exchanger();
ExchangeDemo1 demo1=new ExchangeDemo1();
Thread t1=new Thread(new Worker(exchanger,"A"));
Thread t2=new Thread(new Worker(exchanger,"B"));
t1.start();
t2.start();
}
static class Worker implements Runnable{
Exchanger exchanger;
Object object;
public Worker(Exchanger exchanger, Object object) {
this.exchanger = exchanger;
this.object = object;
}
public void run() {
Object privious=this.object;
try {
this.object=this.exchanger.exchange(this.object);
System.out.println(Thread.currentThread().getName()+" exchange "+privious+" for "+this.object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出结果:
Thread-1 exchange B for A
Thread-0 exchange A for B
这种场景可以理解为,快递员把我们买的东西,放进了柜子里面,同时取走了我们要邮寄的快递,这样就完成了交换。
如果我们一直买东西,而不邮寄东西,那么Exchanger类其实就变成了简化版本的生产者和消费者的模型。快递员就是生产者,我们本身就是消费者,而柜子就成为了我们媒介容器,看下面的一个例子:
static Exchanger<DataBuffer<Integer>> exchanger=new Exchanger<>();
static DataBuffer<Integer> intialEmptyBuffer=new DataBuffer<>();
static DataBuffer<Integer> intialFullBuffer=new DataBuffer<>();
static AtomicInteger countDown=new AtomicInteger(5);
static class ProducerWorker implements Runnable{
//每次睡眠时长,单位是秒
long sleep;
public ProducerWorker(long sleep) {
this.sleep = sleep;
}
@Override
public void run() {
DataBuffer currentBuffer=intialEmptyBuffer;
while (currentBuffer!=null&&countDown.get()>0){
try {
TimeUnit.SECONDS.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
currentBuffer.put(countDown.get());//每次放入数据
if(currentBuffer.isFull()){
try {
System.out.println(Thread.currentThread().getName()+" 放入了快递"+countDown.get());
currentBuffer=exchanger.exchange(currentBuffer);//交换后得到null
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDown.getAndDecrement();
}
}
}
static class ConsumerWorker implements Runnable{
//每次睡眠时长,单位是秒
long sleep;
public ConsumerWorker(long sleep) {
this.sleep = sleep;
}
@Override
public void run() {
DataBuffer<Integer> currentBuffer=intialFullBuffer;
while (currentBuffer!=null&&countDown.get()>0){
try {
TimeUnit.SECONDS.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
//如果为空就进行交换
if(currentBuffer.isEmpyt()){
try {
currentBuffer=exchanger.exchange(currentBuffer); //交换数据
Integer value=currentBuffer.get();//不断的从
System.out.println(Thread.currentThread().getName()+" 拿走了快递"+value);
System.out.println();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) {
new Thread(new ProducerWorker(1),"快递员").start(); //启动生产者,每次睡眠1秒,启动顺序不分先后
new Thread(new ConsumerWorker(3),"我 ").start(); //启动消费者,每次睡眠3秒,启动顺序不分先后
}
/***
* 数据交换类
* @param <T>
*/
static class DataBuffer<T>{
T data;
public boolean isFull(){
return data!=null;
}
public boolean isEmpyt(){
return data==null;
}
public T get(){
T d=data;
data=null;
return d;
}
public void put (T data){
this.data=data;
}
}
输出结果:
快递员 放入了快递5
我 拿走了快递5
快递员 放入了快递4
我 拿走了快递4
快递员 放入了快递3
我 拿走了快递3
快递员 放入了快递2
我 拿走了快递2
快递员 放入了快递1
我 拿走了快递1
Exchanger类有两个交换方法:
exchange(V x) //不能超时的交换方法
exchange(V x, long timeout, TimeUnit unit)//可以设置超时的方法
Exchanger类在处理交换时,如果只有一方到达,而另一方没有到达,先到的这一方会等待另一方到达,只有两方都到达,完成交换才能进行下一次的交换,这有点类似约会的场景,但如果另一方一直不来,那么先到的一方可能永远不会停止,一直在等待,正因为如此Exchanger也提供了可超时的等待策略,在指定的时间内如果另一方仍然不能赴约,自己可中断。
底层原理分析:
Exchanger类底层并不是太复杂,关键的技术有:
(1)使用CAS自旋指令完成数据交换
(2)使用LockSupport的park方法使交换线程进入休眠等待, 使用LockSupport的unpark方法唤醒等待线程。
(3)此外还声明了一个Node对象用于存储交换数据。
本文主要介绍了Java里面并发工具类Exchanger,使用Exchanger可以轻松的给两个线程进行数据交换,这里需要注意的是如果超过两个线程操作交换,那么则很有可能出现问题,这里想象一下我们的快递柜的例子就很容易想明白,这一点需要注意,不要使用多个线程操作交换。
文中代码,如果需要可以到我的github上下载运行。
https://github.com/qindongliang/Java-Note