Java阻塞队列线程集控制的实现方法

Java阻塞队列线程集控制的实现方法

队列以一种先进先出的方式管理数据。如果你试图向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞。在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可以定期的把中间结果存到阻塞队列中。而其他工作者线程把中间结果取出并在将来修改它们。队列会自动平衡负载。如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞。如果第一个线程集运行的快,那么它将等待第二个线程集赶上来。

下面的程序展示了如何使用阻塞队列来控制线程集。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。

java.util.concurrent包提供了阻塞队列的4个变种:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。我们用的是ArrayBlockingQueue。ArrayBlockingQueue在构造时需要给定容量,并可以选择是否需要公平性。如果公平参数被设置了,等待时间最长的线程会优先得到处理。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。

生产者线程枚举在所有子目录下的所有文件并把它们放到一个阻塞队列中。这个操作很快,如果队列没有设上限的话,很快它就包含了没有找到的文件。

我们同时还启动了大量的搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印出包含关键字的所有行,然后取出下一个文件。我们使用了一个小技巧来在工作结束后终止线程。为了发出完成信号,枚举线程把一个虚拟对象放入队列。(这类似于在行李输送带上放一个写着“最后一个包”的虚拟包。)当搜索线程取到这个虚拟对象时,就将其放回并终止。

注意,这里不需要人任何显示的线程同步。在这个程序中,我们使用队列数据结构作为一种同步机制。

import java.io.*;  
import java.util.*;  
import java.util.concurrent.*;  

public class BlockingQueueTest  
{  
   public static void main(String[] args)  
   {  
      Scanner in = new Scanner(System.in);  
      System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");  
      String directory = in.nextLine();  
      System.out.print("Enter keyword (e.g. volatile): ");  
      String keyword = in.nextLine();  

      final int FILE_QUEUE_SIZE = 10;  
      final int SEARCH_THREADS = 100;  

      BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);  

      FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));  
      new Thread(enumerator).start();  
      for (int i = 1; i <= SEARCH_THREADS; i++)  
         new Thread(new SearchTask(queue, keyword)).start();  
   }  
}  

/**  
 * This task enumerates all files in a directory and its subdirectories.  
 */ 
class FileEnumerationTask implements Runnable  
{  
   /**  
    * Constructs a FileEnumerationTask.  
    * @param queue the blocking queue to which the enumerated files are added  
    * @param startingDirectory the directory in which to start the enumeration  
    */ 
   public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)  
   {  
      this.queue = queue;  
      this.startingDirectory = startingDirectory;  
   }  

   public void run()  
   {  
      try 
      {  
         enumerate(startingDirectory);  
         queue.put(DUMMY);  
      }  
      catch (InterruptedException e)  
      {  
      }  
   }  

   /**  
    * Recursively enumerates all files in a given directory and its subdirectories  
    * @param directory the directory in which to start  
    */ 
   public void enumerate(File directory) throws InterruptedException  
   {  
      File[] files = directory.listFiles();  
      for (File file : files)  
      {  
         if (file.isDirectory()) enumerate(file);  
         else queue.put(file);  
      }  
   }  

   public static File DUMMY = new File("");  

   private BlockingQueue<File> queue;  
   private File startingDirectory;  
}  

/**  
 * This task searches files for a given keyword.  
 */ 
class SearchTask implements Runnable  
{  
   /**  
    * Constructs a SearchTask.  
    * @param queue the queue from which to take files  
    * @param keyword the keyword to look for  
    */ 
   public SearchTask(BlockingQueue<File> queue, String keyword)  
   {  
      this.queue = queue;  
      this.keyword = keyword;  
   }  

   public void run()  
   {  
      try 
      {  
         boolean done = false;  
         while (!done)  
         {  
            File file = queue.take();  
            if (file == FileEnumerationTask.DUMMY)  
            {  
               queue.put(file);  
               done = true;  
            }  
            else search(file);              
         }  
      }  
      catch (IOException e)  
      {  
         e.printStackTrace();  
      }  
      catch (InterruptedException e)  
      {  
      }        
   }  

   /**  
    * Searches a file for a given keyword and prints all matching lines.  
    * @param file the file to search  
    */ 
   public void search(File file) throws IOException  
   {  
      Scanner in = new Scanner(new FileInputStream(file));  
      int lineNumber = 0;  
      while (in.hasNextLine())  
      {  
         lineNumber++;  
         String line = in.nextLine().trim();  
         if (line.contains(keyword)) System.out.printf("%s:%d    %s%n", file.getPath(), lineNumber, line);  
      }  
      in.close();  
   }  

   private BlockingQueue<File> queue;  
   private String keyword;  
}

原文发布于微信公众号 - Java学习网(javalearns)

原文发表时间:2015-05-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏我是攻城师

浅谈Lucene中的DocValues

3733
来自专栏奔跑的蛙牛技术博客

并发知识5

锁和条件不能解决线程中的所有问题 账户1:200; 账户2:300; 线程1:从账户1转移300到账户2 线程2: 从账户2转移400到账户1 线程一和线...

982
来自专栏Java 源码分析

NioEventLoopGroup 源码分析

NioEventLoopGroup 源码分析 1. 在阅读源码时做了一定的注释,并且做了一些测试分析源码内的执行流程,由于博客篇幅有限。为了方便 IDE 查看...

3817
来自专栏小樱的经验随笔

堆和栈的区别

一、预备知识—程序的内存分配 一个由c/C++编译的程序占用的内存分为以下几个部分 1、栈区(stack)— 由编译器自动分配释放 ,存放函数的参数值,局部变量...

3589
来自专栏chenssy

【追光者系列】HikariCP源码分析之ConcurrentBag

HikariCP contains a custom lock-free collection called a ConcurrentBag. The idea...

1952
来自专栏用户2442861的专栏

2014 360校园招聘技术类面试题

821
来自专栏青玉伏案

iOS逆向工程之Hopper中的ARM指令

虽然前段时间ARM被日本软银收购了,但是科技是无国界的,所以呢ARM相关知识该学的学。现在看ARM指令集还是倍感亲切的,毕竟大学里开了ARM这门课,并且做了不少...

3247
来自专栏机器学习从入门到成神

java.lang.StackOverflowError异常解决

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/sinat_35512245/articl...

1.3K2
来自专栏Java开发者杂谈

分布式改造剧集2---DIY分布式锁

1637
来自专栏進无尽的文章

简述OC语言

对于一门语言的学习是需要时间领悟的,而对于一些原理性的问题,我们需要清楚其核心思想,知其然而知其所以然,这样才能有利于自己的后续发展。本文只是简述,没有面面具到...

2152

扫码关注云+社区

领取腾讯云代金券