对于并发编程这块知识点的掌控一直不是很好,基本都是停留在使用synchronized阶段,于是决定开一博客专题记录知识点。
在 Java 中,有多种方式来实现多线程。继承 Thread 类、 实现 Runnable 接口、使用 ExecutorService、Callable、 Future实现带返回结果的多线程。
Thread类本质上是实现了Runnable接口的一个实例,代 表一个线程的实例。启动线程的唯一方法就是通过Thread 类的start()实例方法。start()方法是一个native方法,它会 启动一个新线程,并执行run()方法。这种方式实现多线程 很简单,通过自己的类直接extend Thread,并复写run() 方法,就可以启动新线程并执行自己定义的run()方法。
public class MyThread extends Thread {
public void run() {
System.out.println("MyThread.run()");
}
}
MyThread myThread1 = new MyThread();
MyThread myThread2 = new MyThread();
myThread1.start();
myThread2.start();
如果自己的类已经extends另一个类,就无法直接extends Thread,此时,可以实现一个Runnable接口 。
public class MyThread extends OtherClass implements Runnable {
public void run() {
System.out.println("MyThread.run()");
}
}
有的时候,我们可能需要让一步执行的线程在执行完成以 后,提供一个返回值给到当前的主线程,主线程需要依赖 这个值进行后续的逻辑处理,那么这个时候,就需要用到 带返回值的线程了。Java中提供了这样的实现方式。
public class CallableDemo implements Callable<String> {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
CallableDemo callableDemo = new CallableDemo();
Future<String> future = executorService.submit(callableDemo);
System.out.println(future.get());
executorService.shutdown();
}
@Override
public String call() throws Exception {
int a = 1;
int b = 2;
System.out.println(a + b);
return "执行结果:" + (a + b);
}
}
其实大家在工作中应该很少有场景能够应用多线程了,因 为基于业务开发来说,很多使用异步的场景我们都通过分 布式消息队列来做了。但并不是说多线程就不会被用到, 之前我应用得比较多的场景是在做文件跑批,每天会有一些比如收益文件、对账文件,我们会有一个定时任务去拿 到数据然后通过线程去处理。在zookeeper中有一个比较有意思的异步责任链模式。
Request
public class Request {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Request{" + "name='" + name + '\'' +
'}';
}
}
RequestProcessor
public interface RequestProcessor {
void processRequest(Request request);
}
PrintProcessor
public class PrintProcessor extends Thread implements RequestProcessor {
LinkedBlockingQueue<Request> requests = new LinkedBlockingQueue<Request>();
private final RequestProcessor nextProcessor;
public PrintProcessor(RequestProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
}
@Override
public void run() {
while (true) {
try {
Request request = requests.take();
System.out.println("print data:" + request.getName());
nextProcessor.processRequest(request);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 处理请求
public void processRequest(Request request) {
requests.add(request);
}
}
public class SaveProcessor extends Thread implements IRequestProcessor{
//阻塞队列
LinkedBlockingQueue<Request> requests=new LinkedBlockingQueue<>();
private IRequestProcessor nextProcessor;
private volatile boolean isFinish=false;
public SaveProcessor() {
}
public SaveProcessor(IRequestProcessor nextProcessor) {
this.nextProcessor = nextProcessor;
}
public void shutdown(){ //对外提供关闭的方法
isFinish=true;
}
@Override
public void run() {
while(!isFinish){ //不建议这么写
try {
Request request=requests.take();//阻塞式获取数据
//真正的处理逻辑; store to mysql 。
System.out.println("SaveProcessor:"+request);
//交给下一个责任链
nextProcessor.process(request);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void process(Request request) {
//TODO 根据实际需求去做一些处理
requests.add(request);
}
}
Main
public class Main {
PrintProcessor printProcessor;
protected Main() {
SaveProcessor saveProcessor = new SaveProcessor();
saveProcessor.start();
printProcessor = new PrintProcessor(saveProcessor);
printProcessor.start();
}
private void doTest(Request request) {
printProcessor.processRequest(request);
}
public static void main(String[] args) {
Request request = new Request();
request.setName("Mic");
new Main().doTest(request);
}
}
基本应用搞清楚以后,我们再来基于Java线程的基础切入, 来逐步去深入挖掘线程的整体模型。
Java 线程既然能够创建,那么也势必会被销毁,所以线程 是存在生命周期的,那么我们接下来从线程的生命周期开 始去了解线程。
线程一共有 6 种状态(NEW、RUNNABLE、BLOCKED、 WAITING、TIME_WAITING、TERMINATED) 。
初始状态,线程被构建,但是还没有调用start方法 。
运行状态,JAVA线程把操作系统中的就绪和运行两种状态统一称为“运行中” 。
阻塞状态,表示线程进入等待状态,也就是线程 因为某种原因放弃了CPU使用权,阻塞也分为几种情况 :
➢ 等待阻塞:运行的线程执行 wait 方法,jvm 会把当前 线程放入到等待队列
➢ 同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被其他线程锁占用了,那么 jvm 会把当前的线程 放入到锁池中
➢ 其他阻塞:运行的线程执行Thread.sleep或者t.join方 法,或者发出了 I/O 请求时,JVM 会把当前线程设置 为阻塞状态,当sleep结束、join线程终止、io处理完 毕则线程恢复
超时等待状态,超时以后自动返回。
终止状态,表示当前线程执行完毕
public class ThreadStatusDemo {
public static void main(String[] args) {
new Thread(()->{
while(true){
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"Time_Waiting_Thread").start();
new Thread(()->{
while(true){
synchronized (ThreadStatusDemo.class) {
try {
ThreadStatusDemo.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"Wating_Thread").start();
//BLOCKED
new Thread(new BlockedDemo(),"Blocke01_Thread").start();
new Thread(new BlockedDemo(),"Blocke02_Thread").start();
}
static class BlockedDemo extends Thread{
@Override
public void run() {
synchronized (BlockedDemo.class){
while(true){
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
启动一个线程前,最好为这个线程设置线程名称,因为这 样在使用jstack分析程序或者进行问题排查时,就会给开 发人员提供一些提示 显示线程的状态 ➢ 运行该示例,打开终端或者命令提示符,键入“jps”, (JDK1.5 提供的一个显示当前所有 java 进程 pid 的命 令)
➢ 根据上一步骤获得的pid,继续输入jstack pid(jstack 是 java 虚拟机自带的一种堆栈跟踪工具。jstack 用于 打印出给定的 java 进程 ID 或 core file 或远程调试服 务的Java堆栈信息) 通过上面的分析,我们了解到了线程的生命周期,现在在整个生命周期中并不是固定的处于某个状态,而是随着代 码的执行在不同的状态之间进行切换
前面我们通过一些案例演示了线程的启动,也就是调用 start()方法去启动一个线程,当run方法中的代码执行完毕 以后,线程的生命周期也将终止。调用start方法的语义是 当前线程告诉JVM,启动调用start方法的线程。
最早学习线程的时候会比较疑惑,启动一个线程 为什么是调用start方法,而不是run方法,这做一个简单 的分析,先简单看一下start方法的定义 。
我们看到调用 start 方法实际上是调用一个 native 方法 start0()来启动一个线程,首先 start0()这个方法是在 Thread的静态块中来注册的,代码如下
registerNatives 的 本 地 方 法 的 定 义 在 文 件 Thread.c,Thread.c定义了各个操作系统平台要用的关于线 程的公共数据和操作,以下是Thread.c的全部内容 :
http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c 2b5/src/share/native/java/lang/Thread.c
从 这 段 代 码 可 以 看 出 , start0() , 实 际 会 执 行 JVM_StartThread方法,这个方法是干嘛的呢? 从名字上 来看,似乎是在JVM层面去启动一个线程,如果真的是这 样,那么在 JVM 层面,一定会调用 Java 中定义的 run 方法。那接下来继续去找找答案。我们找到 jvm.cpp这个文 件;这个文件需要下载hotspot的源码才能找到.
线程的启动过程大家都非常熟悉,但是如何终止一个线程呢?
线程的终止,并不是简单的调用stop命令去。虽然api仍 然可以调用,但是和其他的线程控制方法如 suspend、 resume 一样都是过期了的不建议使用,就拿 stop 来说, stop方法在结束一个线程时并不会保证线程的资源正常释 放,因此会导致程序可能出现一些不确定的状态。 要优雅的去中断一个线程,在线程中提供了一个 interrupt 方法 。
当其他线程通过调用当前线程的interrupt方法,表示向当 前线程打个招呼,告诉他可以中断线程的执行了,至于什 么时候中断,取决于当前线程自己。 线程通过检查自身是否被中断来进行相应可以通过 isInterrupted()来判断是否被中断。 通过下面这个例子,来实现了线程终止的逻辑
public class InterruptDemo {
private static int i;
public static void main(String[] args) throws InterruptedException {
Thread thread=new Thread(()->{
while(!Thread.currentThread().isInterrupted()){//默认是false _interrupted state?
i++;
}
System.out.println("i:"+i);
});
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt(); //把isInterrupted设置成true
}
}
这种通过标识位或者中断操作的方式能够使线程在终止时 有机会去清理资源,而不是武断地将线程停止,因此这种 终止线程的做法显得更加安全和优雅。
上面的案例中,通过 interrupt,设置了一个标识告诉线程 可 以 终 止 了 , 线 程 中 还 提 供 了 静 态 方 法 Thread.interrupted()对设置中断标识的线程复位。比如在 上面的案例中,外面的线程调用thread.interrupt来设置中 断标识,而在线程里面,又通过 Thread.interrupted 把线程的标识又进行了复位。
public class InterruptDemo {
private static int i;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()){
System.out.println("before:" + Thread.currentThread().isInterrupted());
Thread.interrupted(); // 对线程进行复 位,由 true 变成 false
System.out.println("after:" + Thread.currentThread().isInterrupted());
}
}
}, "interruptDemo"); thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt();
}
}
除了通过 Thread.interrupted 方法对线程中断标识进行复 位以外,还有一种被动复位的场景,就是对抛出 InterruptedException 异 常 的 方 法 , 在 InterruptedException 抛出之前,JVM 会先把线程的中断 标识位清除,然后才会抛出InterruptedException,这个时 候如果调用isInterrupted方法,将会返回false 分别通过下面两个demo来演示复位的效果 。
public class ExceptionThreadDemo {
private static int i;
public static void main(String[] args) throws InterruptedException {
Thread thread=new Thread(()->{
while(!Thread.currentThread().isInterrupted()){//默认是false _interrupted state?
try {
TimeUnit.SECONDS.sleep(10); //中断一个处于阻塞状态的线程。join/wait/queue.take..
System.out.println("demo");
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
System.out.println("i:"+i);
});
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt(); //把isInterrupted设置成true
System.out.println(thread.isInterrupted()); //true
}
}
这里给大家普及一个知识点,为什么 Object.wait 、 Thread.sleep 和 Thread.join 都 会 抛 出 InterruptedException? 你会发现这几个方法有一个共同 点,都 是属于阻塞的方法 而阻塞方法的释放会取决于一些外部的事件 , 但是阻塞方 法可能因为等不到外部的触发事件而导致无法终止,所以 它允许一个线程请求 自己 来停止它正在做的事情。当一个 方法抛出 InterruptedException 时,它是在告诉调用者如 果执行该方法的线程被中断,它会尝试停止正在做的事情 并且通过抛出 InterruptedException 表示提前返回。 所以 ,这 个 异 常 的 意 思 是 表 示 一 个 阻 塞 被 其 他 线 程 中 断 了 。 然后,由于线程调用了 interrupt() 中断方法,那么 Object.wait 、 Thread.sleep 等被阻塞的线程被唤醒以后会 通过 is_interrupted 方 法 判 断 中 断 标 识 的 状 态 变 化 ,如 果 发 现中断标识为 true ,则先清除中断标识,然后抛出 InterruptedException 。Thread.interrupted()是属于当前线程的,是当前线程对外 界中断信号的一个响应,表示自己已经得到了中断信号, 但不会立刻中断自己,具体什么时候中断由自己决定,让 外界知道在自身中断前,他的中断状态仍然是false,这就 是复位的原因。
需要注意的是,InterruptedException异常的抛出并不意味 着线程必须终止,而是提醒当前线程有中断的操作发生, 至于接下来怎么处理取决于线程本身,比如
1. 直接捕获异常不做任何处理
2. 将异常往外抛出
3. 停止当前线程,并打印异常信息