前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Callable接口实现多线程,生产者消费者问题,多线下载(复制)文件

Callable接口实现多线程,生产者消费者问题,多线下载(复制)文件

作者头像
IT架构圈
发布2018-06-01 12:07:51
6660
发布2018-06-01 12:07:51
举报
文章被收录于专栏:IT架构圈

一.通过Callable接口实现多线程

1.Callable接口介绍:

(1)java.util.concurrent.Callable是一个泛型接口,只有一个call()方法

(2)call()方法抛出异常Exception异常,且返回一个指定的泛型类对象

2.Callable接口实现多线程的应用场景

(1)当父线程想要获取子线程的运行结果时

3.使用Callable接口实现多线程的步骤

(1)第一步:创建Callable子类的实例化对象

(2)第二步:创建FutureTask对象,并将Callable对象传入FutureTask的构造方法中

(注意:FutureTask实现了Runnable接口和Future接口)

(3)第三步:实例化Thread对象,并在构造方法中传入FurureTask对象

(4)第四步:启动线程

例1(利用Callable接口实现线程):

利用Callable接口创建子线程类:

代码语言:javascript
复制
package call;
import java.util.concurrent.Callable;
/*
 * 实现Callable接口创建子线程,指明范型为返回的数据类型
 * */
public class CallDemo implements Callable<String> {
        @Override
        public String call() throws Exception {
            String th_name = Thread.currentThread().getName();
            System.out.println(th_name + "遭遇大规模敌军突袭...");
            System.out.println(th_name + "迅速变换阵型...");
            System.out.println(th_name + "极速攻杀敌军...");
            return "敌军损失惨重,我军大获全胜";
        }
}

实线程类:

代码语言:javascript
复制
package call;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class TestCallable {
    public static void main(String[] args) {
            CallDemo cl = new CallDemo();// 实例化Callable子类对象
            FutureTask<String> ft = new FutureTask<String>(cl);// 实例化FutureTask对象,并将Callable子类对象传入FutureTask的构造方法中
            new Thread(ft, "编程坑太多部队——>").start();// 启动线程
            Thread.currentThread().setName("李存勖部队——>");// 设置父线程名
    try {
            System.out.println(Thread.currentThread().getName() + "休整5000ms");
            Thread.sleep(5000);
    } catch (InterruptedException e) {
            e.printStackTrace();
    }
       System.out.println(Thread.currentThread().getName() + "休整完毕..");
    try {
            String str = ft.get();// 利用FutureTask对象调用get()方法获取子线程的返回值
            System.out.println(Thread.currentThread().getName() + "获取友军消息"
            + str);
    } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

代码语言:javascript
复制
李存勖部队——>休整5000ms

编程坑太多部队——>遭遇大规模敌军突袭...

编程坑太多部队——>迅速变换阵型...

编程坑太多部队——>极速攻杀敌军...

李存勖部队——>休整完毕..

李存勖部队——>获取友军消息敌军损失惨重,我军大获全胜

例2(匿名类部类实现Callable接口创建子线程):

匿名类部类实现Callable接口创建子线程类并实现:

代码语言:javascript
复制
package call;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
//匿名类部类实现Callable接口创建子线程
public class AnonyCallable {
    public static void main(String[] args) {
    Callable<String> cl = new Callable<String>() {
    @Override
    public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + "正在行军~~~");
            System.out.println(Thread.currentThread().getName() + "遭遇敌军~~~");
            System.out.println(Thread.currentThread().getName() + "奋勇杀敌!!!!");
            return "战斗胜利,俘虏敌军50000人";
            }
};
        FutureTask<String> ft = new FutureTask(cl);
        new Thread(ft, "编程坑太多部队").start();
try {
        Thread.currentThread().setName("李存勖部队");
        Thread.sleep(3000);
        System.out.println(Thread.currentThread().getName() + "休整3000ms");
} catch (InterruptedException e) {
    e.printStackTrace();
}
    System.out.println(Thread.currentThread().getName() + "整顿完毕,等待友军消息...");
try {
            String str = ft.get();
            System.out.println("李存勖部队得知友军消息为:" + str);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
    }
}
}

运行结果:

代码语言:javascript
复制
    编程坑太多部队正在行军~~~

    编程坑太多部队遭遇敌军~~~

    编程坑太多部队奋勇杀敌!!!!

    李存勖部队休整3000ms

    李存勖部队整顿完毕,等待友军消息...

    李存勖部队得知友军消息为:战斗胜利,俘虏敌军50000人

二.生产者——消费者问题

生产者线程不断生产,消费者线程不断取走生产者生产的产品

Object中的几个方法支持:

(1)wait():线程等待,当前线程进入调用对象的线程——等待池

(2)Notify():唤醒一个等待线程

(3)notifyAll():唤醒全部的等到线程

注意:以上三个方法都必须在同步机制中调用

例3(生产者消费者问题(一对一)):

早餐基础类:

代码语言:javascript
复制
package one2one.producer;
// 早餐基础类
public class Breakfast {
   private String food;   // 吃的
   private String drink;  // 喝的
   private boolean flag=false;
   public synchronized void makeBreakfast(String food,String drink){
   if(flag){
   try {
   wait();   // 生产者线程进入同步对象维护的“线程等待池”
   } catch (InterruptedException e) {
   e.printStackTrace();
   }
   }
   this.food=food;
   try {
   Thread.sleep(1000);   // 休眠,但不释放“锁”
   } catch (InterruptedException e) {
   e.printStackTrace();
   }
   this.drink=drink;
   flag=true;
   notify();
   }
   public synchronized void eatBreakfast(){
   if(!flag){
   try {
   wait();   // 消费者线程进入同步对象维护的“线程等待池”,而且当前线程释放"锁"
   } catch (InterruptedException e) {
   e.printStackTrace();
   }
   }
   try {
   Thread.sleep(1000);
   } catch (InterruptedException e) {
   e.printStackTrace();
   }
   System.out.println(this.food+"=============>"+this.drink);
   flag=false;
   notify();
   }
}

生产者线程类:

代码语言:javascript
复制
package one2one.producer;
// 生产者线程
public class Producer implements Runnable{
    private Breakfast bf;
    public Producer(Breakfast bf){
     this.bf=bf;
    }
@Override
public void run() {
        for (int i = 1; i <=7; i++) {
if(i%2==0){
this.bf.makeBreakfast("bread","milk");
}else{
this.bf.makeBreakfast("馒头","稀饭");
}
}  
}
}

消费者线程类:

代码语言:javascript
复制
package one2one.producer;
// 消费者线程
public class Consumer implements Runnable{
    private Breakfast bf;
    public Consumer(Breakfast bf){
     this.bf=bf;
    }
@Override
public void run() {
for (int i = 1; i <=7; i++) {
System.out.println("星期"+i+"早餐种类:food======>drink");
this.bf.eatBreakfast();
}
}
}
测试类:
package one2one.producer;
public class Test {
public static void main(String[] args) {
Breakfast bf=new Breakfast();
new Thread(new Producer(bf)).start();   // 启动生产者线程
new Thread(new Consumer(bf)).start();   // 启动消费者线程
}
}

运行结果:

代码语言:javascript
复制
星期1早餐种类:food======>drink
馒头=============>稀饭
星期2早餐种类:food======>drink
bread=============>milk
星期3早餐种类:food======>drink
馒头=============>稀饭
星期4早餐种类:food======>drink
bread=============>milk
星期5早餐种类:food======>drink
馒头=============>稀饭
星期6早餐种类:food======>drink
bread=============>milk
星期7早餐种类:food======>drink
馒头=============>稀饭
(生产者消费者问题(many2many))

生产消费基础类:

代码语言:javascript
复制
package manytomany.product;
public class Product {
private int count = 0;// 商品数量
private int MAX = 10;// 最大库存
// 生产商品
public synchronized void makeProduct() {
String thread_name = Thread.currentThread().getName();// 获取生产者线程名
if (count > MAX) {
System.out.println("货物已满" + thread_name + "停止生产...");
try {
notifyAll(); // 唤醒所有的消费者线程
wait(); // 生产者线程停止生产
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++; // 生产者线程生产商品
System.out.println(thread_name + "生产了产品,目前商品总量:" + count);
notifyAll();// 唤醒所有消费者线程,模拟消费
}
}
// 消费商品
public synchronized void buyProduct() {
String thread_name = Thread.currentThread().getName();// 获取线程名
if (count <= 0) {
System.out.println("已无货," + thread_name + "停住消费...");
try {
notifyAll();// 唤醒生产者线程 生产商品
wait();// 消费者线程休眠,停止消费
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
count--;// 开始 消费商品
System.out.println(thread_name + "消费了一个商品,目前商品数量为:" + count);
}
}
}

生产者线程:

代码语言:javascript
复制
package manytomany.product;
public class Producer implements Runnable {
private Product product; // 获取Product对象
public Producer(Product product) {
this.product = product;
}
@Override
public void run() {
while (true) {
product.makeProduct();// 调用生产方法
}
}
}
消费者线程:
package manytomany.product;
public class Consumer implements Runnable{
private Product product;
public Consumer(Product product) {
this.product = product;
}
@Override
public void run() {
while(true){
product.buyProduct();//调用消费方法
}
}
}
测试类:
package manytomany.product;
public class Test {
public static void main(String[] args) {
Product pro = new Product();
new Thread(new Producer(pro), "生产者1号——>").start();
new Thread(new Producer(pro), "生产者2号——>").start();
new Thread(new Producer(pro), "生产者3号——>").start();
new Thread(new Producer(pro), "生产者4号——>").start();
new Thread(new Consumer(pro), "消费者A——>").start();
new Thread(new Consumer(pro), "消费者B——>").start();
new Thread(new Consumer(pro), "消费者C——>").start();
new Thread(new Consumer(pro), "消费者D——>").start();
new Thread(new Consumer(pro), "消费者E——>").start();
}
}

运行结果(截取部分):

代码语言:javascript
复制
消费者E——>消费了一个商品,目前商品数量为:7

消费者E——>消费了一个商品,目前商品数量为:6

消费者E——>消费了一个商品,目前商品数量为:5

消费者A——>消费了一个商品,目前商品数量为:4

消费者A——>消费了一个商品,目前商品数量为:3

消费者A——>消费了一个商品,目前商品数量为:2

消费者A——>消费了一个商品,目前商品数量为:1

消费者A——>消费了一个商品,目前商品数量为:0

已无货,消费者A——>停住消费...

已无货,消费者C——>停住消费...

已无货,消费者D——>停住消费...

已无货,消费者B——>停住消费...

生产者1号——>生产了产品,目前商品总量:1

生产者1号——>生产了产品,目前商品总量:2

生产者1号——>生产了产品,目前商品总量:3

生产者1号——>生产了产品,目前商品总量:4

生产者1号——>生产了产品,目前商品总量:5

多线程下载(复制)文件

1.使用RandomAccessFile与InputStream的skip(long n)方法使每个线程负责文件的每一部分读写。

例(开启6个线程断点下载(复制)电影).

下载复制线程:

代码语言:javascript
复制
package download;
import java.io.*;
public class DownloadRunnable implements Runnable{
private File srcFile;   // 源文件路径
private long startPos;   // 每个线程的开始下载位置
private long partTask;   // 每个线程的下载任务
private RandomAccessFile raf;  // 用来写入
public DownloadRunnable(File srcFile,long startPos,long partTask,RandomAccessFile raf){
this.srcFile=srcFile;
this.startPos=startPos;
this.partTask=partTask;
this.raf=raf;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"准备从第"+startPos+"个字节开始读...");
InputStream input=null;
try {
input=new FileInputStream(srcFile);
input.skip(startPos);  // 跳过输入流的startPos个字节
byte[] b=new byte[1024*1024*10];
int len=0;
int count=0;   // 用来记录已经读写的字节数
while((len=input.read(b))!=-1 && count<partTask){
raf.write(b, 0, len);
count+=len;
}
System.out.println(Thread.currentThread().getName()+"已经写入了"+count+"个字节");
} catch (Exception e) {
e.printStackTrace();
}finally{
try {
input.close();
raf.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

开启线程类:

代码语言:javascript
复制
package dowload;
import java.io.*;
public class TestDown {
public static void main(String[] args) {
File srcFile = new File("e:" + File.separator + "哈利波特" + File.separator
+ "哈利波特与死亡圣器上.mkv");
long pathTask = srcFile.length() / 6;
for (int i = 0; i < 6; i++) {
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile("d:" + File.separator + "Movies.mkv", "rw");
long startPos = pathTask * i;
raf.seek(startPos);
new Thread(
new DownloadRunnable(srcFile, startPos, pathTask, raf),
"第" + (i + 1) + "条下载线程——>").start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-03-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程坑太多 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档