BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。使用场景。
首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:
通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;在生产者消费者模式中,通过队列的方式可以很方便的实现两者之间的数据共享。强大的BlockingQueue使我们不用关心什么时候需要阻塞线程,什么时候需要唤醒线程。
BlockingQueue的核心方法:
放入数据:
offer(anObject) 如果BlockingQueue可以容纳,返回为true,否则返回false.
offer(E o,long timeout,TimeUnit unit),设置等待时间,如果指定时间内,还不能往队列中加入BlockingQueue,则返回失败。
put(anObject)把anObject加到BlockingQueue中,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续。
获取数据: poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间, 取不到时返回null; poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内, 队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。 take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到 BlockingQueue有新的数据被加入; drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数), 通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
测试代码:
package BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BlockingQueueTest {
public static void main(String args[]) throws InterruptedException{
BlockingQueue<String> queue = new ArrayBlockingQueue(10);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer);
Thread.sleep(10 * 1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(2000);
// 退出Executor
service.shutdown();
}
}
生产者:
package BlockingQueue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable{
private volatile boolean isRunning = true;
private BlockingQueue<String> queue;
private static AtomicInteger count = new AtomicInteger();
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
public Producer(BlockingQueue queue){
this.queue = queue;
}
public void run(){
String data = null;
Random r = new Random();
System.out.println("启动生产者线程");
try{
while(isRunning){
System.out.println("正在生产数据.....");
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
data = "data:" + count.incrementAndGet();
System.out.println("将数据:" + data + "放入队列...");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.out.println("放入数据失败:" + data);
}
}
}catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
finally{
System.out.println("退出生产者线程!");
}
}
public void stop(){
isRunning = false;
}
}
消费者:
package BlockingQueue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class Consumer implements Runnable{
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
public Consumer(BlockingQueue<String> queue){
this.queue = queue;
}
public void run(){
System.out.println("启动消费者线程:");
Random r = new Random();
boolean isRunning = true;
try{
while(isRunning){
System.out.println("正从队列获取数据...");
String data = queue.poll(2,TimeUnit.SECONDS);
if(null != data){
System.out.println("拿到数据:" + data);
System.out.println("正在消费数据:" + data);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
}else{
isRunning = false;
}
}
}catch(InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}finally{
System.out.println("退出消费者线程!");
}
}
}
参考:http://wsmajunfeng.iteye.com/blog/1629354