1 package com.thread.test.thread;
2 import java.util.Random;
3 import java.util.concurrent.*;
4
5 /**
6 * Created by windwant on 2016/5/26.
7 */
8 public class MyBlockingQueue {
9 public static void main(String[] args) throws InterruptedException {
10 testArrayBlockingQueue();
11 }
12
13 /**
14 * 公平性 构造函数 true
15 */
16 public static void testArrayBlockingQueue(){
17 BlockingQueue<String> abq = new ArrayBlockingQueue<String>(5);
18 ExecutorService es = Executors.newCachedThreadPool();
19 es.execute(new MyPro(abq, 1000));
20 es.execute(new MyCus(abq, 5000));
21 es.shutdown();
22 }
23
24 /**
25 * 基于链表节点的可设置容量的队列,先进先出,队尾插入元素,队首获取元素。
26 * 链表队列比基于数据的队列有更高的存取效率,但是在并发应用中效率无法预测。
27 */
28 public static void testLinkedBlockingQueue(){
29 BlockingQueue<String> abq = new LinkedBlockingQueue<String>(5);
30 ExecutorService es = Executors.newCachedThreadPool();
31 es.execute(new MyPro(abq, 20));
32 es.execute(new MyCus(abq, 2000));
33 es.shutdown();
34 }
35
36 /**
37 * DelayQueue
38 * 无容量限制的阻塞队列,元素包含延迟时限,只有到达时限,元素才能被取出。
39 * 队列顶部是距离到期时间最远的元素。
40 * 如果所有的元素都未到期,将会返回null。
41 * 元素在执行getDelay()方法返回值小于等于0时过期,即使没有被通过take或者poll执行提取,它们也会被当作一般元素对待。
42 * 队列size方法返回所有元素的数量。
43 * 队列不能包含null元素。
44 */
45 public static void testDelayQueue() throws InterruptedException {
46 DelayQueue<MyDelayItem> dq = new DelayQueue<MyDelayItem>();
47 ExecutorService es = Executors.newFixedThreadPool(5);
48 es.execute(new MyDelayPro(dq, 1000));
49 es.execute(new MyDelayCus(dq, 10000));
50 es.shutdown();
51 }
52
53 /**
54 * 无容量限制的阻塞队列,元素顺序维持策略同PriorityQueue一样,支持阻塞获取
55 * 不允许添加null元素
56 * 元素必须支持排序
57 * 支持集合遍历,排序
58 */
59 public static void testPriorityBlockingQueue() throws InterruptedException {
60 PriorityBlockingQueue<MyPriorityItem> pbq = new PriorityBlockingQueue<MyPriorityItem>();
61 ExecutorService es = Executors.newFixedThreadPool(5);
62 es.execute(new MyPriorityBlockingQueuePro(pbq, 1000));
63 es.execute(new MyPriorityBlockingQueueCus(pbq, 10000));
64 es.shutdown();
65 }
66
67 /**
68 * 阻塞队列,插入元素和提取元素必须同步。
69 * 异步队列没有容量的概念。
70 * 无法使用peek,因为只有当你尝试移除时,元素才会存在。
71 * 无法插入元素,除非有另外一个线程同时尝试获取元素。
72 * 不支持遍历操作,因为队列中根本没有元素。
73 * 队列的顶部就是尝试插入元素的线程要插入的元素。
74 * 如果没有尝试插入元素的线程,那么就不存在能够提取的元素,poll会返回null。
75 * 集合操作contains返null
76 * 不允许插入null元素
77 * */
78 public static void testSynchronousQueue() throws InterruptedException {
79 SynchronousQueue<String> sq = new SynchronousQueue<String>();
80 ExecutorService es = Executors.newFixedThreadPool(5);
81 es.execute(new MySynchronousQueuePro(sq, 1000));
82 es.execute(new MySynchronousQueueCus(sq, 2000));
83 es.shutdown();
84 }
85 }
86
87 /**
88 * 测试生产者
89 */
90 class MyPro implements Runnable{
91
92 private BlockingQueue<String> bq;
93
94 private int period = 1000;
95
96 private Random r = new Random();
97 MyPro(BlockingQueue bq, int period){
98 this.bq = bq;
99 this.period = period;
100 }
101
102 public void run() {
103 try{
104 while (true){
105 Thread.sleep(period);
106 String value = String.valueOf(r.nextInt(100));
107 if(bq.offer(value)){ //offer 能够插入就返回true,否则返回false
108 System.out.println("pro make value: " + value + " queue : " + bq.toString());
109 System.out.println("******************************************************");
110 }
111 }
112 }catch (InterruptedException e){
113 e.printStackTrace();
114 }
115 }
116 }
117
118 /**
119 * 测试消费者
120 */
121 class MyCus implements Runnable{
122
123 private BlockingQueue<String> bq;
124
125 private int period = 1000;
126
127 private Random r = new Random();
128 MyCus(BlockingQueue bq, int period){
129 this.bq = bq;
130 this.period = period;
131 }
132
133 public void run() {
134 try{
135 while (true){
136 Thread.sleep(period);
137 String value = bq.take(); //获取队列头部元素,无元素则阻塞
138 System.out.println("cus take value: " + value + " queue : " + bq.toString());
139 System.out.println("======================================================");
140 }
141 }catch (InterruptedException e){
142 e.printStackTrace();
143 }
144 }
145 }
146
147 /**
148 * 延迟队列元素 实现排序
149 */
150 class MyDelayItem implements Delayed{
151
152 private long liveTime;
153
154 private long removeTime;
155
156 MyDelayItem(long liveTime, long removeTime){
157 this.liveTime = liveTime;
158 this.removeTime = TimeUnit.MILLISECONDS.convert(liveTime, TimeUnit.MILLISECONDS) + System.nanoTime();
159 }
160
161 public long getDelay(TimeUnit unit) {
162 return unit.convert(removeTime - System.nanoTime(), unit);
163 }
164
165 public int compareTo(Delayed o) {
166 if(o == null) return -1;
167 if(o == this) return 0;
168 if(o instanceof MyDelayItem){
169 MyDelayItem tmp = (MyDelayItem) o;
170 if(liveTime > tmp.liveTime){
171 return 1;
172 }else if(liveTime == tmp.liveTime){
173 return 0;
174 }else{
175 return -1;
176 }
177 }
178 long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
179 return diff > 0 ? 1 : diff == 0 ? 0 : -1;
180 }
181
182 public String toString(){
183 return "{livetime: " + String.valueOf(liveTime) + ", removetime: " + String.valueOf(removeTime) + "}";
184 }
185 }
186
187 /**
188 * 延迟队列测试生产者
189 */
190 class MyDelayPro implements Runnable{
191
192 private DelayQueue<MyDelayItem> dq;
193
194 private int period = 1000;
195
196 private Random r = new Random();
197
198 MyDelayPro(DelayQueue dq, int period){
199 this.dq = dq;
200 this.period = period;
201 }
202 public void run() {
203 try{
204 while (true){
205 Thread.sleep(period);
206 if(dq.size() > 5){
207 continue;
208 }
209 MyDelayItem di = new MyDelayItem(r.nextInt(10), r.nextInt(10));
210 dq.offer(di);
211 System.out.println("delayqueue: add---" + di.toString() + "size: " + dq.size());
212 System.out.println("*************************************");
213 }
214 }catch (InterruptedException e){
215 e.printStackTrace();
216 }
217 }
218 }
219
220 /**
221 * 延迟队列测试消费者
222 */
223 class MyDelayCus implements Runnable{
224
225 private DelayQueue<MyDelayItem> dq;
226
227 private int period = 1000;
228
229 MyDelayCus(DelayQueue dq, int period){
230 this.dq = dq;
231 this.period = period;
232 }
233 public void run() {
234 try{
235 while (true){
236 Thread.sleep(period);
237 MyDelayItem di = dq.take();
238 System.out.println("delayqueue: remove---" + di.toString());
239 System.out.println("delayqueue: ---" + dq.toString());
240 System.out.println("======================================");
241 }
242 }catch (InterruptedException e){
243 e.printStackTrace();
244 }
245 }
246 }
247
248 /**
249 * 延迟队列元素 时限排序对比延迟队列
250 */
251 class MyPriorityItem implements Comparable<MyPriorityItem> {
252
253 private int priority;
254
255 MyPriorityItem(int priority){
256 this.priority = priority;
257 }
258
259 /**
260 * 数字大优先级高
261 * @param o
262 * @return
263 */
264 public int compareTo(MyPriorityItem o) {
265 if(o == null) return -1;
266 if(o == this) return 0;
267 if(priority > o.priority){
268 return -1;
269 }else if(priority == o.priority){
270 return 0;
271 }else{
272 return 1;
273 }
274 }
275
276 public String toString(){
277 return "{priority: " + String.valueOf(priority) + "}";
278 }
279 }
280
281 /**
282 * 优先队列测试生产者
283 */
284 class MyPriorityBlockingQueuePro implements Runnable{
285
286 private PriorityBlockingQueue<MyPriorityItem> pbq;
287
288 private int period = 1000;
289
290 private Random r = new Random();
291
292 MyPriorityBlockingQueuePro(PriorityBlockingQueue pbq, int period){
293 this.pbq = pbq;
294 this.period = period;
295 }
296 public void run() {
297 try{
298 while (true){
299 Thread.sleep(period);
300 if(pbq.size() > 5){
301 continue;
302 }
303 MyPriorityItem pi = new MyPriorityItem(r.nextInt(10));
304 pbq.offer(pi);
305 System.out.println("PriorityBlockingQueue: add---" + pi.toString() + " size: " + pbq.size());
306 System.out.println("PriorityBlockingQueue: " + pbq.toString());
307 System.out.println("*************************************");
308 }
309 }catch (InterruptedException e){
310 e.printStackTrace();
311 }
312 }
313 }
314
315 /**
316 * 优先队列测试消费者
317 */
318 class MyPriorityBlockingQueueCus implements Runnable{
319
320 private PriorityBlockingQueue<MyPriorityItem> pbq;
321
322 private int period = 1000;
323
324 private Random r = new Random();
325
326 MyPriorityBlockingQueueCus(PriorityBlockingQueue pbq, int period){
327 this.pbq = pbq;
328 this.period = period;
329 }
330 public void run() {
331 try{
332 while (true){
333 Thread.sleep(period);
334 MyPriorityItem di = pbq.take();
335 System.out.println("PriorityBlockingQueue: remove---" + di.toString());
336 System.out.println("PriorityBlockingQueue: ---" + pbq.toString());
337 System.out.println("======================================");
338 }
339 }catch (InterruptedException e){
340 e.printStackTrace();
341 }
342 }
343 }
344
345 /**
346 * 阻塞队列测试生产者
347 */
348 class MySynchronousQueuePro implements Runnable{
349
350 private SynchronousQueue<String> sq;
351
352 private int period = 1000;
353
354 private Random r = new Random();
355 MySynchronousQueuePro(SynchronousQueue sq, int period){
356 this.sq = sq;
357 this.period = period;
358 }
359
360 public void run() {
361 try{
362 while (true){
363 Thread.sleep(period);
364 String value = String.valueOf(r.nextInt(100));
365 if(sq.offer(value)) {
366 System.out.println("pro make value: " + value + " synchronous :" + sq.toString());
367 System.out.println("******************************************************");
368 }
369 }
370 }catch (InterruptedException e){
371 e.printStackTrace();
372 }
373 }
374 }
375
376 /**
377 * 阻塞队列测试消费者
378 */
379 class MySynchronousQueueCus implements Runnable{
380
381 private BlockingQueue<String> sq;
382
383 private int period = 1000;
384
385 MySynchronousQueueCus(BlockingQueue sq, int period){
386 this.sq = sq;
387 this.period = period;
388 }
389
390 public void run() {
391 try{
392 while (true){
393 Thread.sleep(period);
394 String value = sq.take();
395 System.out.println("cus take value: " + value + " synchronous :" + sq.toString());
396 System.out.println("======================================================");
397 }
398 }catch (InterruptedException e){
399 e.printStackTrace();
400 }
401 }
402 }
项目地址:https://github.com/windwant/windwant-demo/tree/master/thread-demo