引言
说到Thread大家都很熟悉,我们平常写并发代码的时候都会接触到,那么我们来看看下面这段代码是如何初始化以及执行的呢?
public class ThreadDemo {
public static voidmain(String[]args) {
newThread().start();
}
}
初始化流程
代码就一行很简单,那么这行简单的代码背后做了那些事情呢?
初始化Thread这个类
首先JVM会去加载Thread的字节码,初始化这个类,这里即调用下面这段代码:
publicclassThreadimplementsRunnable{
/*Make sure registerNatives is the first thing does.*/
private static native void register Natives();
static{
registerNatives();
}
}
是个native方法,那么我们去看看内部实现是什么,具体的目录是openjdk/jdk/src/share/native/java/lang/Thread.c,
#defineARRAY_LENGTH(a) (sizeof(a)/sizeof(a[0]))
//JVM前缀开头的方法,具体实现在JVM中
staticJNINativeMethod methods[] = {
{"start0","()V", (void*)&JVM_StartThread},
{"stop0","("OBJ")V", (void*)&JVM_StopThread},
{"isAlive","()Z", (void*)&JVM_IsThreadAlive},
{"suspend0","()V", (void*)&JVM_SuspendThread},
{"resume0","()V", (void*)&JVM_ResumeThread},
{"setPriority0","(I)V", (void*)&JVM_SetThreadPriority},
{"yield","()V", (void*)&JVM_Yield},
{"sleep","(J)V", (void*)&JVM_Sleep},
{"currentThread","()"THD, (void*)&JVM_CurrentThread},
{"countStackFrames","()I", (void*)&JVM_CountStackFrames},
{"interrupt0","()V", (void*)&JVM_Interrupt},
{"isInterrupted","(Z)Z", (void*)&JVM_IsInterrupted},
{"holdsLock","("OBJ")Z", (void*)&JVM_HoldsLock},
{"getThreads","()["THD, (void*)&JVM_GetAllThreads},
{"dumpThreads","(["THD")[["STE, (void*)&JVM_DumpThreads},
{"setNativeName","("STR")V",(void*)&JVM_SetNativeThreadName},};
#undefTHD
#undefOBJ
#undefSTE
#undefSTR
//jclass cls即为java.lang.Thread
JNIEXPORTvoidJNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
(*env)->RegisterNatives(env, cls, methods,ARRAY_LENGTH(methods));
}
可以发现具体的实现都是由这些JVM开头的方法决定的,而这几个方法的具体实现都在hotspot\src\share\vm\prims\jvm.cpp文件中,而RegisterNatives我目前的理解其实类似一个方法表,从Java方法到native方法的一个映射,具体的原理后面再研究。
初始化Thread对象
其实就是一些赋值,名字、线程ID这些,这两个变量都是static,用synchronized修饰,保证线程安全性。
publicThread() {
//nextThreadNum就是变量的自增,用synchronized修饰保证可见性
init(null,null,"Thread-"+nextThreadNum(),0);
}
private void init(ThreadGroupg,Runnabletarget,Stringname,
longstackSize) {
init(g, target, name, stackSize,null);
}
privatevoidinit(ThreadGroupg,Runnabletarget,Stringname,
longstackSize,AccessControlContextacc) {
if(name==null) {
thrownewNullPointerException("name cannot be null");
}this.name=name;
Threadparent=currentThread();
//安全相关的一坨东西....
/*Stash the specified stack size in case the VM cares*/
this.stackSize=stackSize;
/*Set thread ID*/
tid=nextThreadID();
}
privatestaticsynchronizedlongnextThreadID() {
return++threadSeqNumber;
}
创建并启动线程
public synchronizedvoidstart() {
if(threadStatus!=0)
thrownewIllegalThreadStateException();
group.add(this);
booleanstarted=false;
try{
start0();
started=true;
}finally{try{if(!started) {
group.threadStartFailed(this);
}
}
catch(Throwableignore) {
/*do nothing. If start0 threw a Throwable then
it will be passed up the call stack*/
}
}
}
这里start0()是个native方法,对应jvm.cpp中的JVM_StartThread,我们看到很多方法都是JVM_ENTRY开头,JVM_END结尾,类似于{}的作用,这里是将很多公共的操作封装到了JVM_ENTRY里面.
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread)) JVMWrapper("JVM_StartThread");
JavaThread *native_thread =NULL;
//We cannot hold the Threads_lock when we throw an exception,
//due to rank ordering issues. Example: we might need to grab the
//Heap_lock while we construct the exception.
boolthrow_illegal_thread_state =false;
//We must release the Threads_lock before we can post a jvmti event
//in Thread::start.
{
//加锁
MutexLockermu(Threads_lock);
//自从JDK 5之后 java.lang.Thread#threadStatus可以用来阻止重启一个已经启动的线程,所以这里的JavaThread通常为空。然而对于一个和JNI关联的线程来说,在线程被创建和更新他的threadStatus之前会有一个小窗口,因此必须检查这种情况if(java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) !=NULL) { throw_illegal_thread_state =true;
}else{
//We could also check the stillborn flag to see if this thread was already stopped, but
//for historical reasons we let the thread detect that itself when it starts running
jlong size =
java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
//为C++线程结构体分配内存并创建native线程。从java取出的stack size是有符号的,因此这里需要进行一次转换,避免传入负数导致创建一个非常大的栈。
size_tsz = size >0? (size_t) size :0;
native_thread =newJavaThread(&thread_entry, sz);
//At this point it may be possible that no osthread was created for the
//JavaThread due to lack of memory. Check for this situation and throw
//an exception if necessary. Eventually we may want to change this so
//that we only grab the lock if the thread was created successfully -
//then we can also do this check and throw the exception in the
//JavaThread constructor.
if(native_thread->osthread() !=NULL) {
//Note: the current thread is not being used within "prepare"
.native_thread->prepare(jthread);
}
}
}
if(throw_illegal_thread_state) {
THROW(vmSymbols::java_lang_IllegalThreadStateException());
}
assert(native_thread !=NULL,"Starting null thread?");
if(native_thread->osthread() == NULL) {
//No one should hold a reference to the 'native_thread'.deletenative_thread;
if(JvmtiExport::should_post_resource_exhausted()) {
JvmtiExport::post_resource_exhausted( JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
"unable to create new native thread");
}
THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
"unable to create new native thread");
}
Thread::start(native_thread);
JVM_END
基本上这里就是先加锁,做些检查,然后创建JavaThread,如果创建成功的话会调用prepare(),然后是一些异常处理,没有异常的话最后会启动线程,那么下面我们先来看看JavaThread是如何被创建的。
JavaThread::JavaThread(ThreadFunction entry_point,size_tstack_sz) :
Thread()
#ifINCLUDE_ALL_GCS
,_satb_mark_queue(&_satb_mark_queue_set),
_dirty_card_queue(&_dirty_card_queue_set)
#endif//INCLUDE_ALL_GCS{if(TraceThreadEvents)
{
tty->print_cr("creating thread %p",this);
}
initialize();//这个方法其实就是一堆变量的初始化,不是Null就是0.
_jni_attach_state = _not_attaching_via_jni;
set_entry_point(entry_point);
//Create the native thread itself.
//%note runtime_23os::ThreadType thr_type = os::java_thread;
//根据传进来的entry_point判断要创建的线程的类型。
thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread : os::java_thread;
os::create_thread(this, thr_type, stack_sz);
//_osthread可能是Null,因此我们耗尽了内存(太多的活跃线程)。我们需要抛出OOM,然而不能在这做,因为调用者可能还持有锁,而所有的锁都必须在抛出异常之前被释放。代码执行到这,线程还是suspended状态,因为线程必须被创建者直接启动。
}voidJavaThread::initialize() {
//Initialize fields
//...
//...set_thread_state(_thread_new);//线程的初始状态
//...
}
JavaThreadState记录了线程记录了线程正在执行的代码在哪一部分,这个信息可能会被安全点使用到(GC),最核心的有四种:
2 ._thread_in_native在native代码中
每个状态都会对应一个中间的转换状态,这些额外的中间状态使得安全点的代码能够更快的处理某一线程状态而不用挂起线程。
enumJavaThreadState {
_thread_uninitialized =0,//should never happen (missing initialization)
_thread_new =2,//just starting up, i.e., in process of being initialized
_thread_new_trans =3,//corresponding transition state (not used, included for completness)
_thread_in_native =4,//running in native code
_thread_in_native_trans =5,//corresponding transition state
_thread_in_vm =6,//running in VM
_thread_in_vm_trans =7,//corresponding transition state
_thread_in_Java =8,//running in Java or in stub code
_thread_in_Java_trans =9,//corresponding transition state (not used, included for completness)
_thread_blocked =10,//blocked in vm
_thread_blocked_trans =11,//corresponding transition state
_thread_max_state =12//maximum thread state+1 - used for statistics allocation
};
我们看到os::create_thread(this, thr_type, stack_sz);这行代码会去实际的创建线程,首先我们知道Java宣传的是一次编译,到处运行,那么究竟是怎么做到在不同的CPU、操作系统上还能够保持良好的可移植性呢?
//平台相关的东东
#ifdefTARGET_OS_FAMILY_linux
#include"os_linux.hpp"
#include"os_posix.hpp"
#endif
#ifdefTARGET_OS_FAMILY_solaris
#include"os_solaris.hpp"
#include"os_posix.hpp"
#endif
#ifdefTARGET_OS_FAMILY_windows
#include"os_windows.hpp"
#endif
#ifdefTARGET_OS_FAMILY_aix
#include"os_aix.hpp"
#include"os_posix.hpp"
#endif
#ifdefTARGET_OS_FAMILY_bsd
#include"os_posix.hpp"
#include"os_bsd.hpp"
#endif
#ifdefTARGET_OS_ARCH_linux_x86
#include"os_linux_x86.hpp"
#endif
#ifdefTARGET_OS_ARCH_linux_sparc
#include"os_linux_sparc.hpp"
#endif
#ifdefTARGET_OS_ARCH_linux_zero
#include"os_linux_zero.hpp"
#endif
#ifdefTARGET_OS_ARCH_solaris_x86
#include"os_solaris_x86.hpp"
#endif
#ifdefTARGET_OS_ARCH_solaris_sparc
#include"os_solaris_sparc.hpp"
#endif
#ifdefTARGET_OS_ARCH_windows_x86
#include"os_windows_x86.hpp"
#endif
#ifdefTARGET_OS_ARCH_linux_arm
#include"os_linux_arm.hpp"
#endif
#ifdefTARGET_OS_ARCH_linux_ppc
#include"os_linux_ppc.hpp"
#endif
#ifdefTARGET_OS_ARCH_aix_ppc
#include"os_aix_ppc.hpp"
#endif
#ifdefTARGET_OS_ARCH_bsd_x86
#include"os_bsd_x86.hpp"
#endif
#ifdefTARGET_OS_ARCH_bsd_zero
#include"os_bsd_zero.hpp"
#endif
我们看到os.hpp中有这样一段代码,能够根据不同的操作系统选择include不同的头文件,从而将平台相关的逻辑封装到对应的库文件中,我们这里以linux为例,create_thread最终会调用os_linux.cpp中的create_thread方法。
boolos::create_thread(Thread* thread, ThreadType thr_type,size_tstack_size) {
assert(thread->osthread() ==NULL,"caller responsible");
//Allocate the OSThread object
OSThread* osthread =newOSThread(NULL,NULL);
if (osthread ==NULL)
{
return false;
}
//set the correct thread state
osthread->set_thread_type(thr_type);
//初始状态为ALLOCATED,而不是INITIALIZED
osthread->set_state(ALLOCATED);
thread->set_osthread(osthread);
//init thread attributes
pthread_attr_tattr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
//stack size
if(os::Linux::supports_variable_stack_size()) {
//如果上层未传递则计算stack_size
if(stack_size ==0) {
//如果为compiler_thread,则分配4M,否则默认会分配1M
stack_size =os::Linux::default_stack_size(thr_type);
switch(thr_type) {
case os::java_thread:
//Java线程用ThreadStackSize,这个值可以通过-Xss指定assert(JavaThread::stack_size_at_create() >0,"this should be set");
stack_size =JavaThread::stack_size_at_create();
break;
case os::compiler_thread:
if(CompilerThreadStackSize >0) {
stack_size = (size_t)(CompilerThreadStackSize * K);
break;
}//else fall through:
//use VMThreadStackSize if CompilerThreadStackSize is not defined
caseos::vm_thread:
caseos::pgc_thread:
caseos::cgc_thread:
caseos::watcher_thread:
if(VMThreadStackSize >0) stack_size = (size_t)(VMThreadStackSize * K);
break;
}
}
//用两者较大的那个,min_stack_allowed默认为128K
stack_size =MAX2(stack_size, os::Linux::min_stack_allowed);
pthread_attr_setstacksize(&attr, stack_size);
}else{
//let pthread_create() pick the default value.
}
//glibc guard page
pthread_attr_setguardsize(&attr,os::Linux::default_guard_size(thr_type));
ThreadState state;
{
//检查是否需要加锁
bool lock =os::Linux::is_LinuxThreads() && !os::Linux::is_floating_stack();
if(lock) {
os::Linux::createThread_lock()->lock_without_safepoint_check();
}
pthread_ttid;
//Linux用于创建线程的函数,这个线程通过执行java_start来启动,其中thread是作为java_start的参数传递进来的
int ret =pthread_create(&tid, &attr, (void* (*)(void*)) java_start, thread);
pthread_attr_destroy(&attr);
if(ret !=0) {
//创建失败,将_osthread置为空,还记得在jvm.cpp的JVM_StartThread中会根据_osthread是否为空来判断
//是否创建成功
if(PrintMiscellaneous && (Verbose || WizardMode)) {
perror("pthread_create()");
}
//清理资源,并解锁
thread->set_osthread(NULL);
dele teosthread;
if(lock)os::Linux::createThread_lock()->unlock();
return false;
}
//创建成功会将底层线程的ID保存在tid中
osthread->set_pthread_id(tid);
//等待子线程创建完成或者终止
{
Monitor* sync_with_child = osthread->startThread_lock(); MutexLockerExml(sync_with_child, Mutex::_no_safepoint_check_flag);
while((state = osthread->get_state()) == ALLOCATED) {
sync_with_child->wait(Mutex::_no_safepoint_check_flag);
}
}
if(lock) {
os::Linux::createThread_lock()->unlock();
}
}
//线程的数目达到极限了if(state == ZOMBIE) {
thread->set_osthread(NULL);
delete osthread;
return false;
}
//The thread is returned suspended (in state INITIALIZED),
//and is started higher up in the call chain
assert(state == INITIALIZED,"race condition");
return true;
}
下面我们来看看pthread_create会执行的回调函数java_start,这个方法是所有新创建的线程必走的流程。
staticvoid*java_start(Thread *thread)
{
//尝试随机化热栈帧高速缓存行的索引,这有助于优化拥有相同栈帧线程去互相驱逐彼此的缓存行时,线程
//可以是同一个JVM实例或者不同的JVM实例,这尤其有助于拥有超线程技术的处理器。staticintcounter =0;
intpid =os::current_process_id();
alloca(((pid ^ counter++) &7) *128);
ThreadLocalStorage::set_thread(thread);
OSThread* osthread = thread->osthread();
Monitor*sync= osthread->startThread_lock();
//non floating stack LinuxThreads needs extra check, see above
if(!_thread_safety_check(thread)) {
//notify parent thread
MutexLockerExml(sync, Mutex::_no_safepoint_check_flag);
osthread->set_state(ZOMBIE);
sync->notify_all();
returnNULL;
}
//thread_id is kernel thread id (similar to Solaris LWP id)
osthread->set_thread_id(os::Linux::gettid());
if(UseNUMA) {
intlgrp_id =os::numa_get_group_id();
if(lgrp_id != -1) {
thread->set_lgrp_id(lgrp_id);
}
}
//initialize signal mask for this thread
os::Linux::hotspot_sigmask(thread);
//initialize floating point control register
os::Linux::init_thread_fpu_state();
//handshaking with parent thread
{
MutexLockerExml(sync, Mutex::_no_safepoint_check_flag);
//设置为已经初始化完成,并notify父线程
osthread->set_state(INITIALIZED);
sync->notify_all();
//wait until os::start_thread()
while(osthread->get_state() == INITIALIZED) {
sync->wait(Mutex::_no_safepoint_check_flag);
}
}
//这里上层传递过来的是JavaThread,因此会调用JavaThread#run()方法
thread->run();
return0;
}
voidJavaThread::run() {
//初始化本地线程分配缓存(TLAB)相关的属性
this->initialize_tlab();
//used to test validitity of stack trace backs
this->record_base_of_stack_pointer();
//Record real stack base and size.
this->record_stack_base_and_size();
//Initialize thread local storage;
set before calling MutexLocker
this->initialize_thread_local_storage();
this->create_stack_guard_pages();
this->cache_global_variables();
//将线程的状态更改为_thread_in_vm,线程已经可以被VM中的安全点相关的代码处理了,也就是说必须JVM如果线程在执行native里面的代码,是搞不了安全点的,待确认
ThreadStateTransition::transition_and_fence(this, _thread_new, _thread_in_vm);
assert(JavaThread::current() ==this,"sanity check");
assert(!Thread::current()->owns_locks(),"sanity check");
DTRACE_THREAD_PROBE(start,this);
//This operation might block. We call that after all safepoint checks for a new thread has
//been completed.
this>set_active_handles(JNIHandleBlock::allocate_block());
if(JvmtiExport::should_post_thread_life()) {
JvmtiExport::post_thread_start(this);
}
EventThreadStart event;
if(event.should_commit()) {
event.set_javalangthread(java_lang_Thread::thread_id(this->threadObj()));
event.commit(); }
//We call another function to do the rest so we are sure that the stack addresses used
//from there will be lower than the stack base just computedthread_main_inner();
//Note, thread is no longer valid at this point!
}
voidJavaThread::thread_main_inner() {
assert(JavaThread::current() ==this,"sanity check");
assert(this->threadObj() !=NULL,"just checking");
//Execute thread entry point unless this thread has a pending exception
//or has been stopped before starting.
//Note: Due to JVM_StopThread we can have pending exceptions already!
if(!this->has_pending_exception() &&
!java_lang_Thread::is_stillborn(this->threadObj())) {
{
ResourceMarkrm(this);
this->set_native_thread_name(this->get_thread_name());
}
HandleMarkhm(this);
//这个entry_point就是JVM_StartThread中传递过来的那个,也就是thread_entry
this->entry_point()(this,this);
}
DTRACE_THREAD_PROBE(stop,this);
this->exit(false);
delete this;
}
我们最后再看thread_entry的代码
staticvoidthread_entry(JavaThread* thread, TRAPS) {
HandleMarkhm(THREAD);
Handleobj(THREAD, thread->threadObj());
JavaValueresult(T_VOID);
JavaCalls::call_virtual(&result,
obj,
KlassHandle(THREAD,SystemDictionary::Thread_klass()),
vmSymbols::run_method_name(),
vmSymbols::void_method_signature(),
THREAD);}
vmSymbols,这个是JVM对于那些需要特殊处理的类、方法等的声明,我的理解就是一个方法表,根据下面这行代码可以看出来,其实调用的就是run()方法.
/*common method and field names*
/template(run_method_name, "run")
然后我们回到JVM_StartThread方法中,这里会接着调用prepare()方法,设置线程优先级(将Java中的优先级映射到os中),然后添加到线程队列中去.最后会调用Thread::start(native_thread);
启动线程。
voidThread::start(Thread* thread) {
trace("start", thread);
//start和resume不一样,start被synchronized修饰
if(!DisableStartThread) {
if(thread->is_Java_thread()) {
//在启动线程之前初始化线程的状态为RUNNABLE,为啥不能在之后设置呢?因为启动之后可能是
//等待或者睡眠等其他状态,具体是什么我们不知道
java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(), java_lang_Thread::RUNNABLE);
}os::start_thread(thread);
}
}
总结