在jdk1.0时代,要终止一个Java线程,可以使用Thread提供的stop()和destroy()等方法,但这些方法在jdk1.4之后就已经不推荐使用了,原因是这些方法会强行关闭当前线程,并解锁当前线程已经持有的所有监视器(互斥锁、共享锁),这会导致被这些监视器保护的数据对象处于不一致的状态,其它线程可以查看到这些不一致状态的数据对象,从而导致各种不可预知的错误。
在jdk1.4引入了另外一种结束线程的方式——中断。简单来说,就是每个线程有一个int类型的成员变量_interrupted(0表示没被中断,1表示被中断)。线程在执行过程中的适当位置,检查这个变量,来获知当前线程是否应该结束。
我们可以使用Thread.interrupt()方法将中断标记_interrupted设为1,也可以使用Thread.interrupted()方法将中断标记_interrupted重置为0。下面是从jdk的Thread.java类中截取的interrupt()源码:
// java.lang.Thread
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
private native void interrupt0(); //native方法
可以看到,Thread.interrupt()方法的核心方法是Thread.interrupt0()方法,从注释上看,这个方法只会设置中断标记位(_interrupted变量)。在调用Thread.interrupt()方法之前,如果当前线程已经处于阻塞状态(比如调用了Thread.sleep()方法),那么调用该阻塞线程的Thread.interrupt()方法将导致当前线程从Thread.sleep()函数中醒来,并抛出InterruptedException。下面是Thread.sleep()和Thread.interrupt()的示例:
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try {
TimeUnit.HOURS.sleep(1);//当前线程sleep一个小时
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();
try {
TimeUnit.SECONDS.sleep(5);//主线程sleep5秒钟,保证thread线程能够执行sleep方法
} catch (InterruptedException e) {
e.printStackTrace();
}
thread.interrupt();//主线程中断thread线程,导致sleep方法抛出InterruptedException,结束阻塞
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("finish");
}
在这个示例中,thread线程执行Thread.sleep()方法就标志着,该线程将在未来1个小时内不参与cpu的竞争,thread线程将处于阻塞状态。如果说Thread.interrupt()方法只是修改了中断标记_interrupted的值,那么已经放弃cpu、处于阻塞状态的thread线程如何能感知到这个变量已经被改变,从而立即抛出InterruptedException呢?为了回答这个问题,我们需要深入native方法Thread.interrupt0()。
// jdk/src/share/native/java/lang/Thread.c:43
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep}, //sleep方法名映射成的jni方法名
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt}, //interrupt0方法名映射成jni方法名
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
{"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};
// jdk/src/share/javavm/export/jvm.h:254
JNIEXPORT void JNICALL
JVM_Interrupt(JNIEnv *env, jobject thread); //JVM_Interrupt接口定义
// hotspot/src/share/prims/jvm.cpp:3289
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread)) //JVM_Interrupt接口实现
JVMWrapper("JVM_Interrupt");
// Ensure that the C++ Thread and OSThread structures aren't freed before we operate
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
// We need to re-resolve the java_thread, since a GC might have happened during the
// acquire of the lock
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr); //调用interrupt方法
}
JVM_END
// hotspot/src/share/vm/runtime/thraed.cpp:634
//成员变量对应着不同的阻塞方法
ParkEvent * _ParkEvent ; // for synchronized()
ParkEvent * _SleepEvent ; // for Thread.sleep
ParkEvent * _MutexEvent ; // for native internal Mutex/Monitor,互斥锁
ParkEvent * _MuxEvent ; // for low-level muxAcquire-muxRelease,共享锁
// hotspot/src/share/vm/runtime/thraed.cpp:804
void Thread::interrupt(Thread* thread) { //调用interrupt方法
trace("interrupt", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
os::interrupt(thread); //调用os::interrupt方法
}
// hotspot/src/hotspot/os/linux/vm/os_linux.cpp:4192
void os::interrupt(Thread* thread) { //调用os::interrupt方法
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {//如果当前线程没被中断
osthread->set_interrupted(true);//设置中断标记
// More than one thread can get here with the same value of osthread,
// resulting in multiple notifications. We do, however, want the store
// to interrupted() to be visible to other threads before we execute unpark().
OrderAccess::fence();
ParkEvent * const slp = thread->_SleepEvent ;//获取sleep方法对应的ParkEvent,slp不为空则说明当前线程阻塞在sleep方法上
// Thread.sleep方法继续运行
if (slp != NULL) slp->unpark() ;//如果当前线程阻塞在sleep方法上,则调用unpark唤醒线程
}
// For JSR166. Unpark event if interrupt status already was set
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();//如果当前线程是阻塞在互斥锁或者共享锁上,则唤醒线程
ParkEvent * ev = thread->_ParkEvent ;//获取synchronized方法对应的ParkEvent
if (ev != NULL) ev->unpark() ;//如果当前线程是阻塞在synchronized上,则唤醒线程
}
// hotspot/src/share/vm/runtime/osThread.hpp:
volatile jint _interrupted; // 中断标记是存储在osThread中,而不是java的Thread中
volatile bool interrupted() const{
return _interrupted != 0;//查看中断标记
}
void set_interrupted(bool z){
_interrupted = z ? 1 : 0; //设置中断标记
}
由源码可知,Thread.interrupt0()方法实际上并不是像jdk注释中说的那样,只设置中断标记位,而是在设置完中断标记后通过调用类型为ParkEvent的成员变量的unpark()方法,将被阻塞的线程唤醒。由上述源码可知,当前线程可以阻塞在sleep方法、synchronized修饰的方法、获取共享锁/互斥锁的等方法上,不同的类型对应了不同的ParkEvent类型的成员变量。以Thread.sleep()方法为例,该方法在hotspot的实现中最终是调用ParkEvent的park()方法将自己阻塞住,当该线程因为中断被唤醒之后,会立即检查当前线程的中断标记,如果中断标记为1,则会抛出InterruptedException:
JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis)) //sleep方法名映射成的jni方法名
JVMWrapper("JVM_Sleep");
if (millis < 0) {
//参数异常
THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
}
if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
//如果已经被中断则抛出异常
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
//增加线程sleep的次数,并启动计入sleep耗时的计时器
JavaThreadSleepState jtss(thread);
EventThreadSleep event;
if (millis == 0) {//相当于调用sleep(0),因而本质上当前线程不会被中断
//x86下ConvertSleepToYield为true
if (ConvertSleepToYield) {
os::yield();
} else {
ThreadState old_state = thread->osthread()->get_state();
thread->osthread()->set_state(SLEEPING);
os::sleep(thread, MinSleepInterval, false);
thread->osthread()->set_state(old_state);
}
} else {//类似于调用sleep(1000),参数值大于0,会被阻塞直到超时或者发生中断
ThreadState old_state = thread->osthread()->get_state();
//将osthread的状态置为SLEEPING
thread->osthread()->set_state(SLEEPING);
if (os::sleep(thread, millis, true) == OS_INTRPT) {//调用阻塞方法os::sleep方法被中断了!!!
if (!HAS_PENDING_EXCEPTION) {//如果没有待处理异常
if (event.should_commit()) {
//发布事件
event.set_time(millis);
event.commit();
}
//抛出异常
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");//抛出InterruptedException异常
}
}
//恢复原来的状态
thread->osthread()->set_state(old_state);
}
if (event.should_commit()) {
event.set_time(millis);
event.commit();
}
JVM_END
int os::sleep(Thread* thread, jlong millis, bool interruptible) { //阻塞方法
assert(thread == Thread::current(), "thread consistency check");
ParkEvent * const slp = thread->_SleepEvent ;//类型为ParkEvent的SleepEvent
slp->reset() ;
OrderAccess::fence() ;
if (interruptible) {//默认是可中断的
jlong prevtime = javaTimeNanos();
for (;;) {
if (os::is_interrupted(thread, true)) {
//如果线程已被中断
return OS_INTRPT;
}
jlong newtime = javaTimeNanos();
if (newtime - prevtime < 0) {
//linux不支持monotonic_clock
assert(!Linux::supports_monotonic_clock(), "time moving backwards");
} else {
millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
}
if(millis <= 0) {
//休眠的时间已过
return OS_OK;
}
prevtime = newtime;
{
assert(thread->is_Java_thread(), "sanity check");
JavaThread *jt = (JavaThread *) thread;
//修改线程状态为_thread_blocked,等代码块退出将其恢复成_thread_in_vm,转换过程中会检查安全点
ThreadBlockInVM tbivm(jt);
//将OSThread的状态修改为CONDVAR_WAIT,代码块退出时恢复原来的状态
OSThreadWaitState osts(jt->osthread(), false /* not Object.wait() */);
jt->set_suspend_equivalent();
//让当前线程休眠,如果线程被唤醒了则继续for循环
slp->park(millis); //调用ParkEvent类的park方法阻塞当前线程,park方法的阻塞原理见后文
jt->check_and_wait_while_suspended();
}
}
} else {
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
jlong prevtime = javaTimeNanos();
//逻辑同上只是不需要检查目标线程是否中断,使用与sleep时间特别短的情形,如sleep(0)
for (;;) {
jlong newtime = javaTimeNanos();
if (newtime - prevtime < 0) {
assert(!Linux::supports_monotonic_clock(), "time moving backwards");
} else {
millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
}
if(millis <= 0) break ;
prevtime = newtime;
slp->park(millis);
}
return OS_OK ;
}
}
//返回当前线程是否已经被中断
bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
trace("is_interrupted", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
return os::is_interrupted(thread, clear_interrupted);
}
bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
bool interrupted = osthread->interrupted();
if (interrupted && clear_interrupted) {
//如果需要清除interrupted标识
osthread->set_interrupted(false);
}
return interrupted;
}
上述源码跟踪过程中,涉及到几个类:Thread、JavaThread、OSThread、ParkEvent和Parker等对象。这里介绍下他们之间的关系:
| | | |
| java | vm | os |
| | | |
java.lang.Thread <-> JavaThread -> OSThread
|--ParkEvent * _ParkEvent ; // 用于 synchronized()方法的阻塞和唤醒
|--ParkEvent * _SleepEvent ; // 用于 Thread.sleep方法的阻塞和唤醒
|--ParkEvent * _MutexEvent ; // for native internal Mutex/Monitor,互斥锁
|--ParkEvent * _MuxEvent ; // for low-level muxAcquire-muxRelease,共享锁
|--Parker* _parker ; // for Unsafe.park(),Unsafe.unpark(Thread thread)
当我们执行Thread thread = new Thread()方法实际上只创建了Thread实例;当我们执行thread.start()方法时,虚拟机内部会先创建JavaThread实例,然后Thread实例和JavaThread实例相互引用,最后会调用pthread_create方法创建OSThread,并让JavaThread持有该内核线程的引用。当我们执行thread的各种方法时,实际上会通过JavaThread最终影响到OSThread。
ParkEvent和Parker底层实现阻塞的机制非常相似,本质上都是利用了操作系统提供的原子操作Atomic::xchg(cas命令就是基于此)、互斥锁mutex和条件变量condition等机制来实现的,阻塞和唤醒原理可以表示为下图:
早期基于BIO模型的Socket和ServerSocket的阻塞方法accept、readLine等方法不支持中断,因而当线程因等待服务端返回而阻塞时,即使该线程被标记为中断,也不会对当前线程产生任何影响。下面给出了一个客户端通过Socket请求服务端并输出服务端响应的示例:
public class client {
//客户端
public static void main(String[] args) {
try {
//1.创建客户端Socket,指定服务器地址和端口号
Socket socket=new Socket("127.0.0.1", 8888);
//2.获取输出流,用来向服务器发送信息
OutputStream os=socket.getOutputStream();
//字节输出流
//转换为打印流
PrintWriter pw=new PrintWriter(os);
pw.write("用户名:admin;密码:admin");
pw.flush();
//刷新缓存,向服务器端输出信息
//关闭输出流
socket.shutdownOutput();
//3.获取输入流,用来读取服务器端的响应信息
InputStream is=socket.getInputStream();
BufferedReader br=new BufferedReader(new InputStreamReader(is));
String info=null;
while((info=br.readLine())!=null){//br.readLine方法为阻塞方法,且不支持中断,所以调用interrupt()方法并不会使得readLine方法从阻塞状态醒来
System.out.println("我是客户端,服务器端返回的信息是:"+info);
}
//4.关闭资源
br.close();
is.close();
pw.close();
os.close();
socket.close();
}
catch (IOException ex) {
Logger.getLogger(client.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
所以,在微服务架构中,使用Hystrix等熔断降级组件时,需要确保阻塞方法要能响应中断。因为Hystrix的超时中断机制使用的就是线程中断。PS:基于NIO的netty框架的阻塞方法支持中断。
1、https://www.jianshu.com/p/f75b77bdf389 Java 并发之线程中断
2、https://www.dazhuanlan.com/2020/01/02/5e0d9b848a3b7/ hotspot Thread JavaThread OSThread
3、https://blog.csdn.net/qq_31865983/article/details/105184585 Hotspot Parker和ParkEvent 源码解析
4、https://blog.csdn.net/Saintyyu/article/details/107426428 Unsafe类park和unpark方法源码深入分析(mutex+cond)
5、https://blog.csdn.net/a7980718/article/details/83661613 jdk1.8 Unsafe类 park和unpark方法解析
6、https://www.cnblogs.com/linzhanfly/p/11258496.html Thread.interrupt()源码跟踪
7、https://blog.csdn.net/qq_31865983/article/details/105174567 Hotspot Thread本地方法实现 源码解析
8、https://www.jb51.net/article/131547.htm Java Socket编程服务器响应客户端实例代码
9、https://blog.csdn.net/iter_zc/article/details/41843595 聊聊JVM(五)从JVM角度理解线程
10、https://www.cnblogs.com/xy-nb/p/6769586.html 参照Openjdk源码分析Object.getClass()方法
11、https://blog.csdn.net/summer_fish/article/details/108263390 java线程和os线程