前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >java中断机制深入分析之Thread源码跟踪

java中断机制深入分析之Thread源码跟踪

作者头像
saintyyu
发布2021-11-22 09:55:08
7670
发布2021-11-22 09:55:08
举报
文章被收录于专栏:IT专栏

一、Thread.stop() VS Thread.interrupt()

在jdk1.0时代,要终止一个Java线程,可以使用Thread提供的stop()和destroy()等方法,但这些方法在jdk1.4之后就已经不推荐使用了,原因是这些方法会强行关闭当前线程,并解锁当前线程已经持有的所有监视器(互斥锁、共享锁),这会导致被这些监视器保护的数据对象处于不一致的状态,其它线程可以查看到这些不一致状态的数据对象,从而导致各种不可预知的错误。

在jdk1.4引入了另外一种结束线程的方式——中断。简单来说,就是每个线程有一个int类型的成员变量_interrupted(0表示没被中断,1表示被中断)。线程在执行过程中的适当位置,检查这个变量,来获知当前线程是否应该结束。

我们可以使用Thread.interrupt()方法将中断标记_interrupted设为1,也可以使用Thread.interrupted()方法将中断标记_interrupted重置为0。下面是从jdk的Thread.java类中截取的interrupt()源码:

代码语言:javascript
复制
// java.lang.Thread
public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();
        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }
    
    private native void interrupt0();  //native方法

可以看到,Thread.interrupt()方法的核心方法是Thread.interrupt0()方法,从注释上看,这个方法只会设置中断标记位(_interrupted变量)。在调用Thread.interrupt()方法之前,如果当前线程已经处于阻塞状态(比如调用了Thread.sleep()方法),那么调用该阻塞线程的Thread.interrupt()方法将导致当前线程从Thread.sleep()函数中醒来,并抛出InterruptedException。下面是Thread.sleep()和Thread.interrupt()的示例:

代码语言:javascript
复制
public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try {
                TimeUnit.HOURS.sleep(1);//当前线程sleep一个小时
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread.start();
        try {
            TimeUnit.SECONDS.sleep(5);//主线程sleep5秒钟,保证thread线程能够执行sleep方法
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        thread.interrupt();//主线程中断thread线程,导致sleep方法抛出InterruptedException,结束阻塞
        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("finish");
    }

在这个示例中,thread线程执行Thread.sleep()方法就标志着,该线程将在未来1个小时内不参与cpu的竞争,thread线程将处于阻塞状态。如果说Thread.interrupt()方法只是修改了中断标记_interrupted的值,那么已经放弃cpu、处于阻塞状态的thread线程如何能感知到这个变量已经被改变,从而立即抛出InterruptedException呢?为了回答这个问题,我们需要深入native方法Thread.interrupt0()。

二、Thread.interrupt0()源码跟踪

1、jdk源码(JNI注册)

代码语言:javascript
复制
// jdk/src/share/native/java/lang/Thread.c:43
static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},   //sleep方法名映射成的jni方法名
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},   //interrupt0方法名映射成jni方法名
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
    {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};

// jdk/src/share/javavm/export/jvm.h:254
JNIEXPORT void JNICALL
JVM_Interrupt(JNIEnv *env, jobject thread);   //JVM_Interrupt接口定义

2、java虚拟机(HotSpot实现)

代码语言:javascript
复制
// hotspot/src/share/prims/jvm.cpp:3289
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))  //JVM_Interrupt接口实现
  JVMWrapper("JVM_Interrupt");

  // Ensure that the C++ Thread and OSThread structures aren't freed before we operate
  oop java_thread = JNIHandles::resolve_non_null(jthread);
  MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
  // We need to re-resolve the java_thread, since a GC might have happened during the
  // acquire of the lock
  JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
  if (thr != NULL) {
    Thread::interrupt(thr);      //调用interrupt方法
  }
JVM_END

// hotspot/src/share/vm/runtime/thraed.cpp:634
//成员变量对应着不同的阻塞方法
ParkEvent * _ParkEvent ;   // for synchronized()
ParkEvent * _SleepEvent ; // for Thread.sleep
ParkEvent * _MutexEvent ; // for native internal Mutex/Monitor,互斥锁
ParkEvent * _MuxEvent ;   // for low-level muxAcquire-muxRelease,共享锁

// hotspot/src/share/vm/runtime/thraed.cpp:804
void Thread::interrupt(Thread* thread) {    //调用interrupt方法
  trace("interrupt", thread);
  debug_only(check_for_dangling_thread_pointer(thread);)
  os::interrupt(thread);      //调用os::interrupt方法
}

// hotspot/src/hotspot/os/linux/vm/os_linux.cpp:4192
void os::interrupt(Thread* thread) {     //调用os::interrupt方法
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");

  OSThread* osthread = thread->osthread();

  if (!osthread->interrupted()) {//如果当前线程没被中断
    osthread->set_interrupted(true);//设置中断标记
    // More than one thread can get here with the same value of osthread,
    // resulting in multiple notifications.  We do, however, want the store
    // to interrupted() to be visible to other threads before we execute unpark().
    OrderAccess::fence();
    ParkEvent * const slp = thread->_SleepEvent ;//获取sleep方法对应的ParkEvent,slp不为空则说明当前线程阻塞在sleep方法上
    // Thread.sleep方法继续运行
    if (slp != NULL) slp->unpark() ;//如果当前线程阻塞在sleep方法上,则调用unpark唤醒线程
  }

  // For JSR166. Unpark event if interrupt status already was set
  if (thread->is_Java_thread())
    ((JavaThread*)thread)->parker()->unpark();//如果当前线程是阻塞在互斥锁或者共享锁上,则唤醒线程

  ParkEvent * ev = thread->_ParkEvent ;//获取synchronized方法对应的ParkEvent
  if (ev != NULL) ev->unpark() ;//如果当前线程是阻塞在synchronized上,则唤醒线程
}

// hotspot/src/share/vm/runtime/osThread.hpp:
volatile jint _interrupted; // 中断标记是存储在osThread中,而不是java的Thread中    

volatile bool interrupted() const{ 
    return _interrupted != 0;//查看中断标记
}

void set_interrupted(bool z){ 
    _interrupted = z ? 1 : 0; //设置中断标记
}

由源码可知,Thread.interrupt0()方法实际上并不是像jdk注释中说的那样,只设置中断标记位,而是在设置完中断标记后通过调用类型为ParkEvent的成员变量的unpark()方法,将被阻塞的线程唤醒。由上述源码可知,当前线程可以阻塞在sleep方法、synchronized修饰的方法、获取共享锁/互斥锁的等方法上,不同的类型对应了不同的ParkEvent类型的成员变量。以Thread.sleep()方法为例,该方法在hotspot的实现中最终是调用ParkEvent的park()方法将自己阻塞住,当该线程因为中断被唤醒之后,会立即检查当前线程的中断标记,如果中断标记为1,则会抛出InterruptedException:

代码语言:javascript
复制
JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))    //sleep方法名映射成的jni方法名
  JVMWrapper("JVM_Sleep");
 
  if (millis < 0) {
    //参数异常
    THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
  }
 
  if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
    //如果已经被中断则抛出异常
    THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
  }
 
  //增加线程sleep的次数,并启动计入sleep耗时的计时器
  JavaThreadSleepState jtss(thread);
 
  EventThreadSleep event;
 
  if (millis == 0) {//相当于调用sleep(0),因而本质上当前线程不会被中断
    //x86下ConvertSleepToYield为true
    if (ConvertSleepToYield) {
      os::yield();
    } else {
      ThreadState old_state = thread->osthread()->get_state();
      thread->osthread()->set_state(SLEEPING);
      os::sleep(thread, MinSleepInterval, false);
      thread->osthread()->set_state(old_state);
    }
  } else {//类似于调用sleep(1000),参数值大于0,会被阻塞直到超时或者发生中断
    ThreadState old_state = thread->osthread()->get_state();
    //将osthread的状态置为SLEEPING
    thread->osthread()->set_state(SLEEPING);
    if (os::sleep(thread, millis, true) == OS_INTRPT) {//调用阻塞方法os::sleep方法被中断了!!!
      if (!HAS_PENDING_EXCEPTION) {//如果没有待处理异常
        if (event.should_commit()) {
          //发布事件
          event.set_time(millis);
          event.commit();
        }
        //抛出异常
        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");//抛出InterruptedException异常
      }
    }
    //恢复原来的状态
    thread->osthread()->set_state(old_state);
  }
  if (event.should_commit()) {
    event.set_time(millis);
    event.commit();
  }
JVM_END
 
int os::sleep(Thread* thread, jlong millis, bool interruptible) {   //阻塞方法
  assert(thread == Thread::current(),  "thread consistency check");
 
  ParkEvent * const slp = thread->_SleepEvent ;//类型为ParkEvent的SleepEvent
  slp->reset() ;
  OrderAccess::fence() ;
 
  if (interruptible) {//默认是可中断的
    jlong prevtime = javaTimeNanos();
 
    for (;;) {
      if (os::is_interrupted(thread, true)) {
        //如果线程已被中断
        return OS_INTRPT;
      }
 
      jlong newtime = javaTimeNanos();
 
      if (newtime - prevtime < 0) {
        //linux不支持monotonic_clock
        assert(!Linux::supports_monotonic_clock(), "time moving backwards");
      } else {
        millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
      }
 
      if(millis <= 0) {
        //休眠的时间已过
        return OS_OK;
      }
 
      prevtime = newtime;
 
      {
        assert(thread->is_Java_thread(), "sanity check");
        JavaThread *jt = (JavaThread *) thread;
        //修改线程状态为_thread_blocked,等代码块退出将其恢复成_thread_in_vm,转换过程中会检查安全点
        ThreadBlockInVM tbivm(jt);
        //将OSThread的状态修改为CONDVAR_WAIT,代码块退出时恢复原来的状态
        OSThreadWaitState osts(jt->osthread(), false /* not Object.wait() */);
 
        jt->set_suspend_equivalent();
        //让当前线程休眠,如果线程被唤醒了则继续for循环
        slp->park(millis);        //调用ParkEvent类的park方法阻塞当前线程,park方法的阻塞原理见后文
 
        jt->check_and_wait_while_suspended();
      }
    }
  } else {
    OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
    jlong prevtime = javaTimeNanos();
    
    //逻辑同上只是不需要检查目标线程是否中断,使用与sleep时间特别短的情形,如sleep(0)
    for (;;) {
      
      jlong newtime = javaTimeNanos();
 
      if (newtime - prevtime < 0) {
        assert(!Linux::supports_monotonic_clock(), "time moving backwards");
      } else {
        millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
      }
 
      if(millis <= 0) break ;
 
      prevtime = newtime;
      slp->park(millis);
    }
    return OS_OK ;
  }
}

//返回当前线程是否已经被中断
bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
  trace("is_interrupted", thread);
  debug_only(check_for_dangling_thread_pointer(thread);)
  return os::is_interrupted(thread, clear_interrupted);
}
 
bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");
 
  OSThread* osthread = thread->osthread();
  bool interrupted = osthread->interrupted();
  if (interrupted && clear_interrupted) {
    //如果需要清除interrupted标识
    osthread->set_interrupted(false);
  }
 
  return interrupted;
}

Thread、JavaThread、OSThread、ParkEvent和Parker之间的关系

上述源码跟踪过程中,涉及到几个类:Thread、JavaThread、OSThread、ParkEvent和Parker等对象。这里介绍下他们之间的关系:

代码语言:javascript
复制
|                  |              |           |
|      java        |     vm       |     os    |
|                  |              |           |
 java.lang.Thread <-> JavaThread ->  OSThread
                          |--ParkEvent * _ParkEvent ;   // 用于 synchronized()方法的阻塞和唤醒
                          |--ParkEvent * _SleepEvent ; // 用于 Thread.sleep方法的阻塞和唤醒
                          |--ParkEvent * _MutexEvent ; // for native internal Mutex/Monitor,互斥锁
                          |--ParkEvent * _MuxEvent ;   // for low-level muxAcquire-muxRelease,共享锁
                          |--Parker*    _parker ;      // for Unsafe.park(),Unsafe.unpark(Thread thread)
  • java.lang.Thread 实例表示一个Java层的线程,虚拟机内部则用JavaThread类的对象来表示一个java.lang.Thread实例。它包含了一些附加信息来追踪线程的状态。JavaThread持有一个与之相关联的java.lang.Thread对象(oop表示)的引用,java.lang.Thread对象也保存了对应的JavaThread(原生int类型表示)的引用。JavaThread同时也持有了相关联的OSThread实例的引用。
  • OSThread实例表示了一个操作系统内核线程,它包含了一些操作系统级别的附加信息,用于追踪线程状态,“_interrupted”变量就存储在该实例中。
  • ParkEvent和Parker则是JavaThread的成员变量,每个线程都拥有这些成员变量的实例,不同线程之间不共享。JavaThread使用这两种类对应的成员变量来实现Java线程的阻塞和唤醒。

当我们执行Thread thread = new Thread()方法实际上只创建了Thread实例;当我们执行thread.start()方法时,虚拟机内部会先创建JavaThread实例,然后Thread实例和JavaThread实例相互引用,最后会调用pthread_create方法创建OSThread,并让JavaThread持有该内核线程的引用。当我们执行thread的各种方法时,实际上会通过JavaThread最终影响到OSThread。

ParkEvent和Parker阻塞原理源码跟踪

ParkEvent和Parker底层实现阻塞的机制非常相似,本质上都是利用了操作系统提供的原子操作Atomic::xchg(cas命令就是基于此)、互斥锁mutex和条件变量condition等机制来实现的,阻塞和唤醒原理可以表示为下图:

具体源码的跟踪和分析可以参见参考博客3参考博客4

不支持中断的BIO和支持中断的NIO

早期基于BIO模型的Socket和ServerSocket的阻塞方法accept、readLine等方法不支持中断,因而当线程因等待服务端返回而阻塞时,即使该线程被标记为中断,也不会对当前线程产生任何影响。下面给出了一个客户端通过Socket请求服务端并输出服务端响应的示例:

代码语言:javascript
复制
public class client {
    //客户端 
    public static void main(String[] args) {
        try {
            //1.创建客户端Socket,指定服务器地址和端口号 
            Socket socket=new Socket("127.0.0.1", 8888);
            //2.获取输出流,用来向服务器发送信息 
            OutputStream os=socket.getOutputStream();
            //字节输出流 
            //转换为打印流 
            PrintWriter pw=new PrintWriter(os);
            pw.write("用户名:admin;密码:admin");
            pw.flush();
            //刷新缓存,向服务器端输出信息 
            //关闭输出流 
            socket.shutdownOutput();
            //3.获取输入流,用来读取服务器端的响应信息 
            InputStream is=socket.getInputStream();
            BufferedReader br=new BufferedReader(new InputStreamReader(is));
            String info=null;
            while((info=br.readLine())!=null){//br.readLine方法为阻塞方法,且不支持中断,所以调用interrupt()方法并不会使得readLine方法从阻塞状态醒来
                System.out.println("我是客户端,服务器端返回的信息是:"+info);
            }
            //4.关闭资源 
            br.close();
            is.close();
            pw.close();
            os.close();
            socket.close();
        }
        catch (IOException ex) {
            Logger.getLogger(client.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

所以,在微服务架构中,使用Hystrix等熔断降级组件时,需要确保阻塞方法要能响应中断。因为Hystrix的超时中断机制使用的就是线程中断。PS:基于NIO的netty框架的阻塞方法支持中断。

参考博客:

1、https://www.jianshu.com/p/f75b77bdf389 Java 并发之线程中断

2、https://www.dazhuanlan.com/2020/01/02/5e0d9b848a3b7/  hotspot Thread JavaThread OSThread

3、https://blog.csdn.net/qq_31865983/article/details/105184585 Hotspot Parker和ParkEvent 源码解析

4、https://blog.csdn.net/Saintyyu/article/details/107426428 Unsafe类park和unpark方法源码深入分析(mutex+cond)

5、https://blog.csdn.net/a7980718/article/details/83661613 jdk1.8 Unsafe类 park和unpark方法解析

6、https://www.cnblogs.com/linzhanfly/p/11258496.html Thread.interrupt()源码跟踪

7、https://blog.csdn.net/qq_31865983/article/details/105174567 Hotspot Thread本地方法实现 源码解析

8、https://www.jb51.net/article/131547.htm Java Socket编程服务器响应客户端实例代码

9、https://blog.csdn.net/iter_zc/article/details/41843595 聊聊JVM(五)从JVM角度理解线程

10、https://www.cnblogs.com/xy-nb/p/6769586.html   参照Openjdk源码分析Object.getClass()方法

11、https://blog.csdn.net/summer_fish/article/details/108263390 java线程和os线程

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/08/02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Thread.stop() VS Thread.interrupt()
  • 二、Thread.interrupt0()源码跟踪
    • 1、jdk源码(JNI注册)
      • 2、java虚拟机(HotSpot实现)
        • Thread、JavaThread、OSThread、ParkEvent和Parker之间的关系
          • ParkEvent和Parker阻塞原理源码跟踪
            • 不支持中断的BIO和支持中断的NIO
              • 参考博客:
              相关产品与服务
              云服务器
              云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档