带你深入了解Java线程中的那些事

引言

说到Thread大家都很熟悉,我们平常写并发代码的时候都会接触到,那么我们来看看下面这段代码是如何初始化以及执行的呢?

public class ThreadDemo  {

                     public  static voidmain(String[]args) {

                                newThread().start();

              }

}

初始化流程

代码就一行很简单,那么这行简单的代码背后做了那些事情呢?

初始化Thread这个类

首先JVM会去加载Thread的字节码,初始化这个类,这里即调用下面这段代码:

publicclassThreadimplementsRunnable{

/*Make sure registerNatives is the first thing does.*/

private static native void register Natives();

static{        

registerNatives();    

}

}

是个native方法,那么我们去看看内部实现是什么,具体的目录是openjdk/jdk/src/share/native/java/lang/Thread.c,

#defineARRAY_LENGTH(a) (sizeof(a)/sizeof(a[0]))

//JVM前缀开头的方法,具体实现在JVM中

staticJNINativeMethod methods[] = {   

{"start0","()V",        (void*)&JVM_StartThread},

{"stop0","("OBJ")V", (void*)&JVM_StopThread},    

{"isAlive","()Z",        (void*)&JVM_IsThreadAlive},    

{"suspend0","()V",        (void*)&JVM_SuspendThread},   

{"resume0","()V",        (void*)&JVM_ResumeThread},   

{"setPriority0","(I)V",      (void*)&JVM_SetThreadPriority},    

{"yield","()V",        (void*)&JVM_Yield},    

{"sleep","(J)V",      (void*)&JVM_Sleep},   

{"currentThread","()"THD,    (void*)&JVM_CurrentThread},    

{"countStackFrames","()I",        (void*)&JVM_CountStackFrames},   

{"interrupt0","()V",        (void*)&JVM_Interrupt},    

{"isInterrupted","(Z)Z",      (void*)&JVM_IsInterrupted},   

{"holdsLock","("OBJ")Z", (void*)&JVM_HoldsLock},    

{"getThreads","()["THD,  (void*)&JVM_GetAllThreads},   

{"dumpThreads","(["THD")[["STE, (void*)&JVM_DumpThreads},    

{"setNativeName","("STR")V",(void*)&JVM_SetNativeThreadName},};

#undefTHD

#undefOBJ

#undefSTE

#undefSTR

//jclass cls即为java.lang.Thread

JNIEXPORTvoidJNICALL

Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)

{   

(*env)->RegisterNatives(env, cls, methods,ARRAY_LENGTH(methods));

}

可以发现具体的实现都是由这些JVM开头的方法决定的,而这几个方法的具体实现都在hotspot\src\share\vm\prims\jvm.cpp文件中,而RegisterNatives我目前的理解其实类似一个方法表,从Java方法到native方法的一个映射,具体的原理后面再研究。

初始化Thread对象

其实就是一些赋值,名字、线程ID这些,这两个变量都是static,用synchronized修饰,保证线程安全性。

publicThread() {

//nextThreadNum就是变量的自增,用synchronized修饰保证可见性

init(null,null,"Thread-"+nextThreadNum(),0);    

}

private void init(ThreadGroupg,Runnabletarget,Stringname,

longstackSize) {        

init(g, target, name, stackSize,null); 

}

privatevoidinit(ThreadGroupg,Runnabletarget,Stringname,

longstackSize,AccessControlContextacc) {

if(name==null) {

thrownewNullPointerException("name cannot be null");        

}this.name=name;

Threadparent=currentThread();

//安全相关的一坨东西....

/*Stash the specified stack size in case the VM cares*/

this.stackSize=stackSize;

/*Set thread ID*/

tid=nextThreadID();    

}

privatestaticsynchronizedlongnextThreadID() {

return++threadSeqNumber;    

}

创建并启动线程

public synchronizedvoidstart() {

if(threadStatus!=0)

thrownewIllegalThreadStateException();        

group.add(this);

booleanstarted=false;

try{

            start0();

            started=true;        

}finally{try{if(!started) {

                    group.threadStartFailed(this);                

}            

}

catch(Throwableignore) {

/*do nothing. If start0 threw a Throwable then

it will be passed up the call stack*/

}        

}    

}

这里start0()是个native方法,对应jvm.cpp中的JVM_StartThread,我们看到很多方法都是JVM_ENTRY开头,JVM_END结尾,类似于{}的作用,这里是将很多公共的操作封装到了JVM_ENTRY里面.

JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))  JVMWrapper("JVM_StartThread");  

JavaThread *native_thread =NULL;

//We cannot hold the Threads_lock when we throw an exception,

//due to rank ordering issues. Example:  we might need to grab the

//Heap_lock while we construct the exception.

boolthrow_illegal_thread_state =false;

//We must release the Threads_lock before we can post a jvmti event

//in Thread::start.

{

//加锁

MutexLockermu(Threads_lock);

//自从JDK 5之后 java.lang.Thread#threadStatus可以用来阻止重启一个已经启动的线程,所以这里的JavaThread通常为空。然而对于一个和JNI关联的线程来说,在线程被创建和更新他的threadStatus之前会有一个小窗口,因此必须检查这种情况if(java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) !=NULL) {      throw_illegal_thread_state =true;    

}else{

//We could also check the stillborn flag to see if this thread was already stopped, but

//for historical reasons we let the thread detect that itself when it starts running

jlong size =

java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));

//为C++线程结构体分配内存并创建native线程。从java取出的stack size是有符号的,因此这里需要进行一次转换,避免传入负数导致创建一个非常大的栈。

size_tsz = size >0? (size_t) size :0;      

native_thread =newJavaThread(&thread_entry, sz);

//At this point it may be possible that no osthread was created for the

//JavaThread due to lack of memory. Check for this situation and throw

//an exception if necessary. Eventually we may want to change this so

//that we only grab the lock if the thread was created successfully -

//then we can also do this check and throw the exception in the

//JavaThread constructor.

if(native_thread->osthread() !=NULL) {

//Note: the current thread is not being used within "prepare"

.native_thread->prepare(jthread);      

}    

}  

}

if(throw_illegal_thread_state) {

THROW(vmSymbols::java_lang_IllegalThreadStateException());  

}

assert(native_thread !=NULL,"Starting null thread?");

if(native_thread->osthread() == NULL) {

//No one should hold a reference to the 'native_thread'.deletenative_thread;

if(JvmtiExport::should_post_resource_exhausted()) {

JvmtiExport::post_resource_exhausted(        JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,

"unable to create new native thread");    

}

THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),

"unable to create new native thread");  

}

Thread::start(native_thread);

JVM_END

基本上这里就是先加锁,做些检查,然后创建JavaThread,如果创建成功的话会调用prepare(),然后是一些异常处理,没有异常的话最后会启动线程,那么下面我们先来看看JavaThread是如何被创建的。

JavaThread::JavaThread(ThreadFunction entry_point,size_tstack_sz) :  

Thread()

#ifINCLUDE_ALL_GCS  

,_satb_mark_queue(&_satb_mark_queue_set),

_dirty_card_queue(&_dirty_card_queue_set)

#endif//INCLUDE_ALL_GCS{if(TraceThreadEvents) 

{    

tty->print_cr("creating thread %p",this);

 }

initialize();//这个方法其实就是一堆变量的初始化,不是Null就是0.

_jni_attach_state = _not_attaching_via_jni;

set_entry_point(entry_point);

//Create the native thread itself.

//%note runtime_23os::ThreadType thr_type = os::java_thread;

//根据传进来的entry_point判断要创建的线程的类型。

thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :                                                    os::java_thread;

os::create_thread(this, thr_type, stack_sz);

//_osthread可能是Null,因此我们耗尽了内存(太多的活跃线程)。我们需要抛出OOM,然而不能在这做,因为调用者可能还持有锁,而所有的锁都必须在抛出异常之前被释放。代码执行到这,线程还是suspended状态,因为线程必须被创建者直接启动。

}voidJavaThread::initialize() {

//Initialize fields

//...

//...set_thread_state(_thread_new);//线程的初始状态

//...

}

JavaThreadState记录了线程记录了线程正在执行的代码在哪一部分,这个信息可能会被安全点使用到(GC),最核心的有四种:

  1. _thread_new刚开始启动,但还没执行初始化代码,更可能还在OS初始化的层面

2 ._thread_in_native在native代码中

  1. _thread_in_vm在vm中执行
  2. _thread_in_Java执行在解释或者编译后的Java代码中

每个状态都会对应一个中间的转换状态,这些额外的中间状态使得安全点的代码能够更快的处理某一线程状态而不用挂起线程。

enumJavaThreadState {  

_thread_uninitialized    =0,//should never happen (missing initialization)

_thread_new              =2,//just starting up, i.e., in process of being initialized

_thread_new_trans        =3,//corresponding transition state (not used, included for completness)

_thread_in_native        =4,//running in native code

_thread_in_native_trans  =5,//corresponding transition state

_thread_in_vm            =6,//running in VM

_thread_in_vm_trans      =7,//corresponding transition state

_thread_in_Java          =8,//running in Java or in stub code

_thread_in_Java_trans    =9,//corresponding transition state (not used, included for completness)

_thread_blocked          =10,//blocked in vm

_thread_blocked_trans    =11,//corresponding transition state

_thread_max_state        =12//maximum thread state+1 - used for statistics allocation

};

我们看到os::create_thread(this, thr_type, stack_sz);这行代码会去实际的创建线程,首先我们知道Java宣传的是一次编译,到处运行,那么究竟是怎么做到在不同的CPU、操作系统上还能够保持良好的可移植性呢?

//平台相关的东东

#ifdefTARGET_OS_FAMILY_linux

#include"os_linux.hpp"

#include"os_posix.hpp"

#endif

#ifdefTARGET_OS_FAMILY_solaris

#include"os_solaris.hpp"

#include"os_posix.hpp"

#endif

#ifdefTARGET_OS_FAMILY_windows

#include"os_windows.hpp"

#endif

#ifdefTARGET_OS_FAMILY_aix

#include"os_aix.hpp"

#include"os_posix.hpp"

#endif

#ifdefTARGET_OS_FAMILY_bsd

#include"os_posix.hpp"

#include"os_bsd.hpp"

#endif

#ifdefTARGET_OS_ARCH_linux_x86

#include"os_linux_x86.hpp"

#endif

#ifdefTARGET_OS_ARCH_linux_sparc

#include"os_linux_sparc.hpp"

#endif

#ifdefTARGET_OS_ARCH_linux_zero

#include"os_linux_zero.hpp"

#endif

#ifdefTARGET_OS_ARCH_solaris_x86

#include"os_solaris_x86.hpp"

#endif

#ifdefTARGET_OS_ARCH_solaris_sparc

#include"os_solaris_sparc.hpp"

#endif

#ifdefTARGET_OS_ARCH_windows_x86

#include"os_windows_x86.hpp"

#endif

#ifdefTARGET_OS_ARCH_linux_arm

#include"os_linux_arm.hpp"

#endif

#ifdefTARGET_OS_ARCH_linux_ppc

#include"os_linux_ppc.hpp"

#endif

#ifdefTARGET_OS_ARCH_aix_ppc

#include"os_aix_ppc.hpp"

#endif

#ifdefTARGET_OS_ARCH_bsd_x86

#include"os_bsd_x86.hpp"

#endif

#ifdefTARGET_OS_ARCH_bsd_zero

#include"os_bsd_zero.hpp"

#endif

我们看到os.hpp中有这样一段代码,能够根据不同的操作系统选择include不同的头文件,从而将平台相关的逻辑封装到对应的库文件中,我们这里以linux为例,create_thread最终会调用os_linux.cpp中的create_thread方法。

boolos::create_thread(Thread* thread, ThreadType thr_type,size_tstack_size) {

assert(thread->osthread() ==NULL,"caller responsible");

//Allocate the OSThread object

OSThread* osthread =newOSThread(NULL,NULL);

if (osthread ==NULL) 

{

return false;  

}

//set the correct thread state

osthread->set_thread_type(thr_type);

//初始状态为ALLOCATED,而不是INITIALIZED

osthread->set_state(ALLOCATED);  

thread->set_osthread(osthread);

//init  thread  attributes

pthread_attr_tattr;

pthread_attr_init(&attr);

pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

//stack size

if(os::Linux::supports_variable_stack_size()) {

//如果上层未传递则计算stack_size

if(stack_size ==0) {

//如果为compiler_thread,则分配4M,否则默认会分配1M

stack_size =os::Linux::default_stack_size(thr_type);

switch(thr_type) {

case  os::java_thread:

//Java线程用ThreadStackSize,这个值可以通过-Xss指定assert(JavaThread::stack_size_at_create() >0,"this should be set");        

stack_size =JavaThread::stack_size_at_create();

break;

case  os::compiler_thread:

if(CompilerThreadStackSize >0) {          

stack_size = (size_t)(CompilerThreadStackSize * K);

break;        

}//else fall through:

//use VMThreadStackSize if CompilerThreadStackSize is not defined

caseos::vm_thread:

caseos::pgc_thread:

caseos::cgc_thread:

caseos::watcher_thread:

if(VMThreadStackSize >0) stack_size = (size_t)(VMThreadStackSize * K);

break;      

}    

}

//用两者较大的那个,min_stack_allowed默认为128K

stack_size =MAX2(stack_size, os::Linux::min_stack_allowed);

pthread_attr_setstacksize(&attr, stack_size);  

}else{

//let pthread_create() pick the default value.

}

//glibc guard page

pthread_attr_setguardsize(&attr,os::Linux::default_guard_size(thr_type));  

ThreadState state;  

{

//检查是否需要加锁

bool lock =os::Linux::is_LinuxThreads() && !os::Linux::is_floating_stack();

if(lock) {

os::Linux::createThread_lock()->lock_without_safepoint_check();    

}

pthread_ttid;

//Linux用于创建线程的函数,这个线程通过执行java_start来启动,其中thread是作为java_start的参数传递进来的

int ret =pthread_create(&tid, &attr, (void* (*)(void*)) java_start, thread);

pthread_attr_destroy(&attr);

if(ret !=0) {

//创建失败,将_osthread置为空,还记得在jvm.cpp的JVM_StartThread中会根据_osthread是否为空来判断

//是否创建成功

if(PrintMiscellaneous && (Verbose || WizardMode)) {

perror("pthread_create()");     

 }

//清理资源,并解锁

thread->set_osthread(NULL);

dele  teosthread;

if(lock)os::Linux::createThread_lock()->unlock();

return  false;    

}

//创建成功会将底层线程的ID保存在tid中

osthread->set_pthread_id(tid);

//等待子线程创建完成或者终止

{

Monitor* sync_with_child = osthread->startThread_lock();      MutexLockerExml(sync_with_child, Mutex::_no_safepoint_check_flag);

while((state = osthread->get_state()) == ALLOCATED) {        

sync_with_child->wait(Mutex::_no_safepoint_check_flag);      

}    

}

if(lock) {

os::Linux::createThread_lock()->unlock();    

}  

}

//线程的数目达到极限了if(state == ZOMBIE) {      

thread->set_osthread(NULL);

delete osthread;

return false;  

}

//The thread is returned suspended (in state INITIALIZED),

//and is started higher up in the call chain

assert(state == INITIALIZED,"race condition");

return true;

}

下面我们来看看pthread_create会执行的回调函数java_start,这个方法是所有新创建的线程必走的流程。

staticvoid*java_start(Thread *thread)

 {

//尝试随机化热栈帧高速缓存行的索引,这有助于优化拥有相同栈帧线程去互相驱逐彼此的缓存行时,线程

//可以是同一个JVM实例或者不同的JVM实例,这尤其有助于拥有超线程技术的处理器。staticintcounter =0;

intpid =os::current_process_id();

alloca(((pid ^ counter++) &7) *128);

ThreadLocalStorage::set_thread(thread);  

OSThread* osthread = thread->osthread();  

Monitor*sync= osthread->startThread_lock();

//non floating stack LinuxThreads needs extra check, see above

if(!_thread_safety_check(thread)) {

//notify parent thread

MutexLockerExml(sync, Mutex::_no_safepoint_check_flag);    

osthread->set_state(ZOMBIE);

sync->notify_all();

returnNULL;  

}

//thread_id is kernel thread id (similar to Solaris LWP id)

osthread->set_thread_id(os::Linux::gettid());

if(UseNUMA) {

intlgrp_id =os::numa_get_group_id();

if(lgrp_id != -1) {      

thread->set_lgrp_id(lgrp_id);    

}  

}

//initialize signal mask for this thread

os::Linux::hotspot_sigmask(thread);

//initialize floating point control register

os::Linux::init_thread_fpu_state();

//handshaking with parent thread

{    

MutexLockerExml(sync, Mutex::_no_safepoint_check_flag);

//设置为已经初始化完成,并notify父线程

osthread->set_state(INITIALIZED);

sync->notify_all();

//wait until os::start_thread()

while(osthread->get_state() == INITIALIZED) {

sync->wait(Mutex::_no_safepoint_check_flag);    

}  

}

    //这里上层传递过来的是JavaThread,因此会调用JavaThread#run()方法

thread->run();

return0;

}

voidJavaThread::run() {

//初始化本地线程分配缓存(TLAB)相关的属性

this->initialize_tlab();

//used to test validitity of stack trace backs

this->record_base_of_stack_pointer();

//Record real stack base and size.

this->record_stack_base_and_size();

//Initialize thread local storage; 

set before calling MutexLocker

this->initialize_thread_local_storage();

this->create_stack_guard_pages();

this->cache_global_variables();

//将线程的状态更改为_thread_in_vm,线程已经可以被VM中的安全点相关的代码处理了,也就是说必须JVM如果线程在执行native里面的代码,是搞不了安全点的,待确认

ThreadStateTransition::transition_and_fence(this, _thread_new, _thread_in_vm);

assert(JavaThread::current() ==this,"sanity check");

assert(!Thread::current()->owns_locks(),"sanity check");

DTRACE_THREAD_PROBE(start,this);

//This operation might block. We call that after all safepoint checks for a new thread has

//been completed.

this>set_active_handles(JNIHandleBlock::allocate_block());

if(JvmtiExport::should_post_thread_life()) {

JvmtiExport::post_thread_start(this); 

}  

EventThreadStart event;

if(event.should_commit()) {   

event.set_javalangthread(java_lang_Thread::thread_id(this->threadObj()));    

 event.commit();  }

//We call another function to do the rest so we are sure that the stack addresses used

//from there will be lower than the stack base just computedthread_main_inner();

//Note, thread is no longer valid at this point!

}

voidJavaThread::thread_main_inner() {

assert(JavaThread::current() ==this,"sanity check");

assert(this->threadObj() !=NULL,"just checking");

//Execute thread entry point unless this thread has a pending exception

//or has been stopped before starting.

//Note: Due to JVM_StopThread we can have pending exceptions already!

if(!this->has_pending_exception() &&      

!java_lang_Thread::is_stillborn(this->threadObj())) {    

{

ResourceMarkrm(this);

this->set_native_thread_name(this->get_thread_name());    

}    

HandleMarkhm(this);

//这个entry_point就是JVM_StartThread中传递过来的那个,也就是thread_entry

this->entry_point()(this,this);

}

DTRACE_THREAD_PROBE(stop,this);

this->exit(false);

delete this;

}

我们最后再看thread_entry的代码

staticvoidthread_entry(JavaThread* thread, TRAPS) {  

HandleMarkhm(THREAD);

Handleobj(THREAD, thread->threadObj()); 

JavaValueresult(T_VOID);

JavaCalls::call_virtual(&result,                          

obj,

KlassHandle(THREAD,SystemDictionary::Thread_klass()),

vmSymbols::run_method_name(),

vmSymbols::void_method_signature(),                          

THREAD);}

vmSymbols,这个是JVM对于那些需要特殊处理的类、方法等的声明,我的理解就是一个方法表,根据下面这行代码可以看出来,其实调用的就是run()方法.

/*common method and field names*

/template(run_method_name,                                                       "run")

然后我们回到JVM_StartThread方法中,这里会接着调用prepare()方法,设置线程优先级(将Java中的优先级映射到os中),然后添加到线程队列中去.最后会调用Thread::start(native_thread);

启动线程。

voidThread::start(Thread* thread) {

trace("start", thread);

//start和resume不一样,start被synchronized修饰

if(!DisableStartThread) {

if(thread->is_Java_thread()) {

//在启动线程之前初始化线程的状态为RUNNABLE,为啥不能在之后设置呢?因为启动之后可能是
//等待或者睡眠等其他状态,具体是什么我们不知道
java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(),                                          java_lang_Thread::RUNNABLE);    

}os::start_thread(thread);  

}

}

总结

  1. 一个Java线程对应一个JavaThread->OSThread -> Native Thread
  2. 在调用java.lang.Thread#start()方法之前不会启动线程,仅仅调用run()方法只是会在当前线程运行而已
  3. //todo

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏菩提树下的杨过

ActiveMQ笔记(5):JMX监控

系统上线运行后,及时监控报警是很必要的手段,对于ActiveMQ而言,主要监控的指标有:MQ本身的健康状况、每个队列的生产者数量、消费者数量、队列的当前消息数等...

2508
来自专栏大内老A

IoC+AOP的简单实现

对EnterLib有所了解的人应该知道,其中有一个名叫Policy Injection的AOP框架;而整个EnterLib完全建立在另一个叫作Unity的底层框...

2109
来自专栏Charlie's Road

<Solidity学习系列四>使用编译器

Solidity存储库的一个构建目标是solc,solidity命令行编译器。 使用solc --help为您提供所有选项的解释。 编译器可以生成各种输出,范围...

1542
来自专栏函数式编程语言及工具

FunDA(10)- 用户功能函数模式:User Function Model

   前面我们提过:FunDA就像一个管道(PipeLine)。管道内流动着一串数据(Data)或者运算指令(Action)。管道的源头就是能产生纯数据的数据源...

2005
来自专栏IT杂记

通过Java程序提交通用Mapreduce任务并获取Job信息

背景 我们的一个业务须要有对MR任务的提交和状态跟踪的功能,须要通过Java代码提交一个通用的MR任务(包括mr的jar、配置文件、依赖的第三方jar包),并且...

1.1K5
来自专栏小灰灰

SpringMVC返回图片的几种方式

主要借助的是 HttpServletResponse这个对象,实现case如下

2557
来自专栏博岩Java大讲堂

Java日志体系(commons-logging)Java日志系统学习

6405
来自专栏坚毅的PHP

my php & mysql FAQ

php中文字符串长度及定长截取问题使用str_len("中国") 结果为6,php系统默认一个中文字符长度为3,可改用mb_strlen函数获得长度,mb_su...

3846
来自专栏SDNLAB

Open vSwitch系列之openflow版本兼容

众所周知Open vSwitch支持的openflow版本从1.0到1.5版本(当前Open vSwitch版本是2.3.2)通过阅读代码,处理openflow...

56513
来自专栏haifeiWu与他朋友们的专栏

造个轮子之基于 Netty 实现自己的 RPC 框架

服务端开发都会或多或少的涉及到 RPC 的使用,当然如果止步于会用,对自己的成长很是不利,所以楼主今天本着知其然,且知其所以然的精神来探讨一下 RPC 这个东西...

1523

扫码关注云+社区

领取腾讯云代金券