前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python 的多线程与 GIL

Python 的多线程与 GIL

作者头像
CS实验室
发布2021-03-22 12:25:43
9480
发布2021-03-22 12:25:43
举报
文章被收录于专栏:CS实验室

Python的多线程与GIL

Python从0.9.8版就开始支持多线程( thread模块),1.5.1版引入了 threading高级模块,是对thread模块的封装。

在Python3中, thread模块被重命名为 _thread,强调其作为底层模块应该避免使用。

_thread模块对应源码在 Modules/_threadmodule.c中,我们看下 _thread模块提供的接口函数:

代码语言:javascript
复制
static PyMethodDef thread_methods[] = {
    {"start_new_thread",        (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    {"start_new",               (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    {"allocate_lock",           (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"allocate",                (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"exit_thread",             (PyCFunction)thread_PyThread_exit_thread,
     METH_NOARGS, exit_doc},
    {"exit",                    (PyCFunction)thread_PyThread_exit_thread,
     METH_NOARGS, exit_doc},
    {"interrupt_main",          (PyCFunction)thread_PyThread_interrupt_main,
     METH_NOARGS, interrupt_doc},
    {"get_ident",               (PyCFunction)thread_get_ident,
     METH_NOARGS, get_ident_doc},
    {"_count",                  (PyCFunction)thread__count,
     METH_NOARGS, _count_doc},
    {"stack_size",              (PyCFunction)thread_stack_size,
     METH_VARARGS, stack_size_doc},
    {"_set_sentinel",           (PyCFunction)thread__set_sentinel,
     METH_NOARGS, _set_sentinel_doc},
    {NULL,                      NULL}           /* sentinel */
};

0x00 线程的创建

代码语言:javascript
复制
static PyObject *
thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
    PyObject *func, *args, *keyw = NULL;
    struct bootstate *boot;
    unsigned long ident;

    ...

    // [1] 创建bootstate结构boot并初始化
    boot = PyMem_NEW(struct bootstate, 1);
    if (boot == NULL)
        return PyErr_NoMemory();
    boot->interp = PyThreadState_GET()->interp;
    boot->func = func;
    boot->args = args;
    boot->keyw = keyw;
    boot->tstate = _PyThreadState_Prealloc(boot->interp);
    if (boot->tstate == NULL) {
        PyMem_DEL(boot);
        return PyErr_NoMemory();
    }
    Py_INCREF(func);
    Py_INCREF(args);
    Py_XINCREF(keyw);
    // [2] 初始化多线程环境
    PyEval_InitThreads(); /* Start the interpreter's thread-awareness */
    // [3] 创建原生系统线程
    ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);
    ...
    return PyLong_FromUnsignedLong(ident);
}

[1] 创建bootstate结构boot并初始化。

bootstate结构体记录了线程的信息:

代码语言:javascript
复制
struct bootstate {
    PyInterpreterState *interp;
    PyObject *func;
    PyObject *args;
    PyObject *keyw;
    PyThreadState *tstate;
};

func, args, keyw记录线程执行的函数以及参数。 interptstate分别保存了进程状态 PyInterpreterState对象和线程状态 PyThreadState对象。

通过 PyThreadState_GET获取当前线程状态 PyThreadState对象,进而可以获取进程状态 PyInterpreterState对象。

PyThreadState定义在 Include/pystate.h

代码语言:javascript
复制
typedef struct _ts {
    /* See Python/ceval.c for comments explaining most fields */

    struct _ts *prev;
    struct _ts *next;
    PyInterpreterState *interp;

    struct _frame *frame;
    int recursion_depth;
    char overflowed; /* The stack has overflowed. Allow 50 more calls
                        to handle the runtime error. */
    char recursion_critical; /* The current calls must not cause
                                a stack overflow. */
    int stackcheck_counter;

    /* 'tracing' keeps track of the execution depth when tracing/profiling.
       This is to prevent the actual trace/profile code from being recorded in
       the trace/profile. */
    int tracing;
    int use_tracing;

    Py_tracefunc c_profilefunc;
    Py_tracefunc c_tracefunc;
    PyObject *c_profileobj;
    PyObject *c_traceobj;

    /* The exception currently being raised */
    PyObject *curexc_type;
    PyObject *curexc_value;
    PyObject *curexc_traceback;

    /* The exception currently being handled, if no coroutines/generators
     * are present. Always last element on the stack referred to be exc_info.
     */
    _PyErr_StackItem exc_state;

    /* Pointer to the top of the stack of the exceptions currently
     * being handled */
    _PyErr_StackItem *exc_info;

    PyObject *dict;  /* Stores per-thread state */

    int gilstate_counter;

    PyObject *async_exc; /* Asynchronous exception to raise */
    unsigned long thread_id; /* Thread id where this tstate was created */

    int trash_delete_nesting;
    PyObject *trash_delete_later;

    /* Called when a thread state is deleted normally, but not when it
     * is destroyed after fork().
     * Pain:  to prevent rare but fatal shutdown errors (issue 18808),
     * Thread.join() must wait for the join'ed thread's tstate to be unlinked
     * from the tstate chain.  That happens at the end of a thread's life,
     * in pystate.c.
     * The obvious way doesn't quite work:  create a lock which the tstate
     * unlinking code releases, and have Thread.join() wait to acquire that
     * lock.  The problem is that we _are_ at the end of the thread's life:
     * if the thread holds the last reference to the lock, decref'ing the
     * lock will delete the lock, and that may trigger arbitrary Python code
     * if there's a weakref, with a callback, to the lock.  But by this time
     * _PyThreadState_Current is already NULL, so only the simplest of C code
     * can be allowed to run (in particular it must not be possible to
     * release the GIL).
     * So instead of holding the lock directly, the tstate holds a weakref to
     * the lock:  that's the value of on_delete_data below.  Decref'ing a
     * weakref is harmless.
     * on_delete points to _threadmodule.c's static release_sentinel() function.
     * After the tstate is unlinked, release_sentinel is called with the
     * weakref-to-lock (on_delete_data) argument, and release_sentinel releases
     * the indirectly held lock.
     */
    void (*on_delete)(void *);
    void *on_delete_data;

    int coroutine_origin_tracking_depth;

    PyObject *coroutine_wrapper;
    int in_coroutine_wrapper;

    PyObject *async_gen_firstiter;
    PyObject *async_gen_finalizer;

    PyObject *context;
    uint64_t context_ver;

    /* Unique thread state id. */
    uint64_t id;

    /* XXX signal handlers should also be here */

} PyThreadState;

通过 _PyThreadState_Prealloc函数创建线程状态 PyThreadState对象,定义在 Python/pystate.c

代码语言:javascript
复制
PyThreadState *
_PyThreadState_Prealloc(PyInterpreterState *interp)
{
    return new_threadstate(interp, 0);
}

static PyThreadState *
new_threadstate(PyInterpreterState *interp, int init)
{
    PyThreadState *tstate = (PyThreadState *)PyMem_RawMalloc(sizeof(PyThreadState));

    if (_PyThreadState_GetFrame == NULL)
        _PyThreadState_GetFrame = threadstate_getframe;

    if (tstate != NULL) {
        tstate->interp = interp;

        tstate->frame = NULL;
        tstate->recursion_depth = 0;
        tstate->overflowed = 0;
        tstate->recursion_critical = 0;
        tstate->stackcheck_counter = 0;
        tstate->tracing = 0;
        tstate->use_tracing = 0;
        tstate->gilstate_counter = 0;
        tstate->async_exc = NULL;
        tstate->thread_id = PyThread_get_thread_ident();

        tstate->dict = NULL;

        tstate->curexc_type = NULL;
        tstate->curexc_value = NULL;
        tstate->curexc_traceback = NULL;

        tstate->exc_state.exc_type = NULL;
        tstate->exc_state.exc_value = NULL;
        tstate->exc_state.exc_traceback = NULL;
        tstate->exc_state.previous_item = NULL;
        tstate->exc_info = &tstate->exc_state;

        tstate->c_profilefunc = NULL;
        tstate->c_tracefunc = NULL;
        tstate->c_profileobj = NULL;
        tstate->c_traceobj = NULL;

        tstate->trash_delete_nesting = 0;
        tstate->trash_delete_later = NULL;
        tstate->on_delete = NULL;
        tstate->on_delete_data = NULL;

        tstate->coroutine_origin_tracking_depth = 0;

        tstate->coroutine_wrapper = NULL;
        tstate->in_coroutine_wrapper = 0;

        tstate->async_gen_firstiter = NULL;
        tstate->async_gen_finalizer = NULL;

        tstate->context = NULL;
        tstate->context_ver = 1;

        tstate->id = ++interp->tstate_next_unique_id;

        if (init)
            _PyThreadState_Init(tstate);

        HEAD_LOCK();
        tstate->prev = NULL;
        tstate->next = interp->tstate_head;
        if (tstate->next)
            tstate->next->prev = tstate;
        interp->tstate_head = tstate;
        HEAD_UNLOCK();
    }

    return tstate;
}

[2] 初始化多线程环境

当Python启动时,是并不支持多线程的。换句话说,Python中支持多线程的数据结构以及GIL都是没有创建的,Python之所以有这种行为是因为大多数的Python程序都不需要多线程的支持。 Python选择了让用户激活多线程机制的策略。在Python虚拟机启动时,多线程机制并没有被激活,它只支持单线程,一旦用户调用thread.startnewthread,明确指示Python虚拟机创建新的线程,Python就能意识到用户需要多线程的支持,这个时候,Python虚拟机会自动建立多线程机制需要的数据结构、环境以及那个至关重要的GIL。 摘自:《Python源码剖析》 — 陈儒

PyEval_InitThreads函数定义在 Python/ceval.c

代码语言:javascript
复制
void
PyEval_InitThreads(void)
{
    // [A] 检查是否存在GIL
    if (gil_created())
        return;
    // [B] 创建并初始化GIL
    create_gil();
    // [C] 主线程获取GIL
    take_gil(PyThreadState_GET());
    // [D] 检查和初始化原生系统线程环境
    _PyRuntime.ceval.pending.main_thread = PyThread_get_thread_ident();
    if (!_PyRuntime.ceval.pending.lock)
        _PyRuntime.ceval.pending.lock = PyThread_allocate_lock();
}
[A] 检查是否存在GIL

gil_created函数定义在 Python/ceval_gil.h

代码语言:javascript
复制
static int gil_created(void)
{
    return (_Py_atomic_load_explicit(&_PyRuntime.ceval.gil.locked,
                                     _Py_memory_order_acquire)
            ) >= 0;
}

_Py_atomic_*是一组原子操作,封装了不同的平台的原子接口。

_Py_atomic_load_explicit定义在 Include/pyatomic.h

代码语言:javascript
复制
#define _Py_atomic_load_explicit(ATOMIC_VAL, ORDER) \
    atomic_load_explicit(&(ATOMIC_VAL)->_value, ORDER)

atomic_load_explicit函数是c11标准的原子读操作,参考Atomic operations library。

_Py_memory_order_acquire是枚举元素:

代码语言:javascript
复制
typedef enum _Py_memory_order {
    _Py_memory_order_relaxed = memory_order_relaxed,
    _Py_memory_order_acquire = memory_order_acquire,
    _Py_memory_order_release = memory_order_release,
    _Py_memory_order_acq_rel = memory_order_acq_rel,
    _Py_memory_order_seq_cst = memory_order_seq_cst
} _Py_memory_order;

memory_order_*指定常规的非原子内存访问如何围绕原子操作排序,这边我们不细究这些原子操作排序的不同,感兴趣可以参考memory order。

我们只关心两种原子操作,一种 load原子读和一种 store原子写。

_PyRuntime是一个 _PyRuntimeState对象,表示Python运行状态对象。 ceval指向 _ceval_runtime_state对象,表示Python字节码执行器的运行状态对象,定义在 Include/internal/ceval.h

代码语言:javascript
复制
struct _ceval_runtime_state {
    int recursion_limit;
    /* Records whether tracing is on for any thread.  Counts the number
       of threads for which tstate->c_tracefunc is non-NULL, so if the
       value is 0, we know we don't have to check this thread's
       c_tracefunc.  This speeds up the if statement in
       PyEval_EvalFrameEx() after fast_next_opcode. */
    int tracing_possible;
    /* This single variable consolidates all requests to break out of
       the fast path in the eval loop. */
    _Py_atomic_int eval_breaker;
    /* Request for dropping the GIL */
    _Py_atomic_int gil_drop_request;
    struct _pending_calls pending;
    struct _gil_runtime_state gil;
};

ceval.gil指向 _gil_runtime_state对象,表示GIL的运行状态对象,定义在 Include/internal/gil.h

代码语言:javascript
复制
struct _gil_runtime_state {
    /* microseconds (the Python API uses seconds, though) */
    unsigned long interval;
    /* Last PyThreadState holding / having held the GIL. This helps us
       know whether anyone else was scheduled after we dropped the GIL. */
    _Py_atomic_address last_holder;
    /* Whether the GIL is already taken (-1 if uninitialized). This is
       atomic because it can be read without any lock taken in ceval.c. */
    _Py_atomic_int locked;
    /* Number of GIL switches since the beginning. */
    unsigned long switch_number;
    /* This condition variable allows one or several threads to wait
       until the GIL is released. In addition, the mutex also protects
       the above variables. */
    PyCOND_T cond;
    PyMUTEX_T mutex;
#ifdef FORCE_SWITCHING
    /* This condition variable helps the GIL-releasing thread wait for
       a GIL-awaiting thread to be scheduled and take the GIL. */
    PyCOND_T switch_cond;
    PyMUTEX_T switch_mutex;
#endif
};

locked就是传说中的GIL,是一个 _Py_atomic_int对象,定义在 Include/pyatomic.h

代码语言:javascript
复制
typedef struct _Py_atomic_int {
    atomic_int _value;
} _Py_atomic_int;

所以, gil_created函数通过原子读操作检查原子位 locked是否初始化。未初始化时, locked为-1。当GIL被线程获取时, locked为1,反之为0。

[B] 创建并初始化GIL
代码语言:javascript
复制
static void create_gil(void)
{
    MUTEX_INIT(_PyRuntime.ceval.gil.mutex);
#ifdef FORCE_SWITCHING
    MUTEX_INIT(_PyRuntime.ceval.gil.switch_mutex);
#endif
    COND_INIT(_PyRuntime.ceval.gil.cond);
#ifdef FORCE_SWITCHING
    COND_INIT(_PyRuntime.ceval.gil.switch_cond);
#endif
    _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder, 0);
    _Py_ANNOTATE_RWLOCK_CREATE(&_PyRuntime.ceval.gil.locked);
    _Py_atomic_store_explicit(&_PyRuntime.ceval.gil.locked, 0,
                              _Py_memory_order_release);
}

_gil_runtime_state结构体中的 condmutex用来保护 locked

The GIL is just a boolean variable (locked) whose access is protected by a mutex (gilmutex), and whose changes are signalled by a condition variable (gilcond). gil_mutex is taken for short periods of time, and therefore mostly uncontended.

GIL利用条件机制和互斥锁 <cond,mutex>保护一个锁变量 locked作为实现。

PyCOND_TPyMUTEX_T定义在 Include/internal/condvar.h,根据平台不一样具体实现也不一样,以POSIX Thread(pthread)为例:

代码语言:javascript
复制
#define PyMUTEX_T pthread_mutex_t
#define PyCOND_T pthread_cond_t

create_gil函数初始化锁变量 mutex和初始化条件变量 cond之后,通过 _Py_atomic_store_relaxed原子写初始化 last_holderlast_holder是一个 _Py_atomic_address对象,定义在 Include/pyatomic.h

代码语言:javascript
复制
typedef struct _Py_atomic_address {
    atomic_uintptr_t _value;
} _Py_atomic_address;

atomic_uintptr_t是一个原子类型指针,当一个线程获得GIL之后, _value将指向线程状态 PyThreadState对象。

并通过 _Py_atomic_store_explicit原子写初始化 locked

[C] 主线程获取GIL

take_gil函数定义在 Python/ceval_gil.h

代码语言:javascript
复制
static void take_gil(PyThreadState *tstate)
{
    int err;
    if (tstate == NULL)
        Py_FatalError("take_gil: NULL tstate");

    err = errno;
    MUTEX_LOCK(_PyRuntime.ceval.gil.mutex);

    if (!_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked))
        goto _ready;

    while (_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked)) {
        int timed_out = 0;
        unsigned long saved_switchnum;

        saved_switchnum = _PyRuntime.ceval.gil.switch_number;
        COND_TIMED_WAIT(_PyRuntime.ceval.gil.cond, _PyRuntime.ceval.gil.mutex,
                        INTERVAL, timed_out);
        /* If we timed out and no switch occurred in the meantime, it is time
           to ask the GIL-holding thread to drop it. */
        if (timed_out &&
            _Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked) &&
            _PyRuntime.ceval.gil.switch_number == saved_switchnum) {
            SET_GIL_DROP_REQUEST();
        }
    }
_ready:
#ifdef FORCE_SWITCHING
    /* This mutex must be taken before modifying
       _PyRuntime.ceval.gil.last_holder (see drop_gil()). */
    MUTEX_LOCK(_PyRuntime.ceval.gil.switch_mutex);
#endif
    /* We now hold the GIL */
    _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.locked, 1);
    _Py_ANNOTATE_RWLOCK_ACQUIRED(&_PyRuntime.ceval.gil.locked, /*is_write=*/1);

    if (tstate != (PyThreadState*)_Py_atomic_load_relaxed(
                    &_PyRuntime.ceval.gil.last_holder))
    {
        _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder,
                                 (uintptr_t)tstate);
        ++_PyRuntime.ceval.gil.switch_number;
    }

#ifdef FORCE_SWITCHING
    COND_SIGNAL(_PyRuntime.ceval.gil.switch_cond);
    MUTEX_UNLOCK(_PyRuntime.ceval.gil.switch_mutex);
#endif
    if (_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil_drop_request)) {
        RESET_GIL_DROP_REQUEST();
    }
    if (tstate->async_exc != NULL) {
        _PyEval_SignalAsyncExc();
    }

    MUTEX_UNLOCK(_PyRuntime.ceval.gil.mutex);
    errno = err;
}

首先获取mutex互斥锁,获取mutex互斥锁之后检查是否有线程持有GIL( locked为1),如果GIL被占有的话,设置cond信号量等待。等待的时候线程就会挂起,释放mutex互斥锁。cond信号被唤醒时,mutex互斥锁会自动加锁。while语句避免了意外唤醒时条件不满足,大多数情况下都是条件满足被唤醒。

如果等待超时并且这期间没有发生线程切换,就通过 SET_GIL_DROP_REQUEST请求持有GIL的线程释放GIL。反之获得GIL就将 locked置为1,并将当前线程状态 PyThreadState对象保存到 last_holderswitch_number切换次数加1。

[D] 检查和初始化原生系统线程环境

pending是一个 _pending_calls结构对象:

代码语言:javascript
复制
struct _pending_calls {
    unsigned long main_thread;
    pythread_type_lock lock;
    /* request for running pending calls. */
    _py_atomic_int calls_to_do;
    /* request for looking at the `async_exc` field of the current
       thread state.
       guarded by the gil. */
    int async_exc;
#define npendingcalls 32
    struct {
        int (*func)(void *);
        void *arg;
    } calls[npendingcalls];
    int first;
    int last;
};

pending cells实现了一个机制:

Mechanism whereby asynchronously executing callbacks (e.g. UNIX signal handlers or Mac I/O completion routines) can schedule calls to a function to be called synchronously.

也就是主线程执行一些被注册的函数, pending.main_thread记录了主线程id:

代码语言:javascript
复制
unsigned long
PyThread_get_thread_ident(void)
{
    volatile pthread_t threadid;
    if (!initialized)
        PyThread_init_thread();
    threadid = pthread_self();
    return (unsigned long) threadid;
}

获得线程id之前会检查 initialized变量,如果说GIL指示着Python的多线程环境是否已经建立,那么这个 initialized变量就指示着为了使用底层平台所提供的原生thread,必须的初始化动作是否完成。

initializedPyThread_init_thread定义在 Python/thread.c

代码语言:javascript
复制
static int initialized;

void
PyThread_init_thread(void)
{
#ifdef Py_DEBUG
    const char *p = Py_GETENV("PYTHONTHREADDEBUG");

    if (p) {
        if (*p)
            thread_debug = atoi(p);
        else
            thread_debug = 1;
    }
#endif /* Py_DEBUG */
    if (initialized)
        return;
    initialized = 1;
    dprintf(("PyThread_init_thread called\n"));
    PyThread__init_thread();
}

POSIX Thread(pthread) 下 PyThread__init_thread会根据编译器和平台来确认是否要进行初始化动作,定义在 Python/thread_pthread.h

代码语言:javascript
复制
static void
PyThread__init_thread(void)
{
#if defined(_AIX) && defined(__GNUC__)
    extern void pthread_init(void);
    pthread_init();
#endif
}

pending.lock用来确保线程安全,是一个Python级别的线程锁。

[3] 创建原生系统线程

POSIX Thread(pthread) 下的 PyThread_start_new_thread定义在 Python/thread_pthread.h

代码语言:javascript
复制
unsigned long
PyThread_start_new_thread(void (*func)(void *), void *arg)
{
    pthread_t th;
    int status;
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
    pthread_attr_t attrs;
#endif
#if defined(THREAD_STACK_SIZE)
    size_t      tss;
#endif

    dprintf(("PyThread_start_new_thread called\n"));
    if (!initialized)
        PyThread_init_thread();

#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
    if (pthread_attr_init(&attrs) != 0)
        return PYTHREAD_INVALID_THREAD_ID;
#endif
#if defined(THREAD_STACK_SIZE)
    PyThreadState *tstate = PyThreadState_GET();
    size_t stacksize = tstate ? tstate->interp->pythread_stacksize : 0;
    tss = (stacksize != 0) ? stacksize : THREAD_STACK_SIZE;
    if (tss != 0) {
        if (pthread_attr_setstacksize(&attrs, tss) != 0) {
            pthread_attr_destroy(&attrs);
            return PYTHREAD_INVALID_THREAD_ID;
        }
    }
#endif
#if defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
    pthread_attr_setscope(&attrs, PTHREAD_SCOPE_SYSTEM);
#endif

    status = pthread_create(&th,
#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
                             &attrs,
#else
                             (pthread_attr_t*)NULL,
#endif
                             (void* (*)(void *))func,
                             (void *)arg
                             );

#if defined(THREAD_STACK_SIZE) || defined(PTHREAD_SYSTEM_SCHED_SUPPORTED)
    pthread_attr_destroy(&attrs);
#endif
    if (status != 0)
        return PYTHREAD_INVALID_THREAD_ID;

    pthread_detach(th);

#if SIZEOF_PTHREAD_T <= SIZEOF_LONG
    return (unsigned long) th;
#else
    return (unsigned long) *(unsigned long *) &th;
#endif
}

PyThread_start_new_thread函数通过 pthread_create函数创建原生的系统线程,然后调用 pthread_detach函数将线程状态改为 unjoinable状态,确保线程运行结束后资源的释放,最后返回线程id。

0x01 线程的执行

传给了 pthread_create函数用来创建线程的func参数是 t_bootstrap函数,arg参数包装了线程信息的boot对象,也就是说主线程创建的子线程将会执行 t_bootstrap(boot)

t_bootstrap定义在 Modules/_threadmodule.c

代码语言:javascript
复制
static void
t_bootstrap(void *boot_raw)
{
    struct bootstate *boot = (struct bootstate *) boot_raw;
    PyThreadState *tstate;
    PyObject *res;

    tstate = boot->tstate;
    tstate->thread_id = PyThread_get_thread_ident();
    _PyThreadState_Init(tstate);
    PyEval_AcquireThread(tstate);
    tstate->interp->num_threads++;
    res = PyObject_Call(boot->func, boot->args, boot->keyw);
    ...
    Py_DECREF(boot->func);
    Py_DECREF(boot->args);
    Py_XDECREF(boot->keyw);
    PyMem_DEL(boot_raw);
    tstate->interp->num_threads--;
    PyThreadState_Clear(tstate);
    PyThreadState_DeleteCurrent();
    PyThread_exit_thread();
}

t_bootstrap函数先对线程状态对象设置子线程的id,并初始化线程状态对象。

调用 PyEval_AcquireThread获取GIL,定义在 Python/ceval.c

代码语言:javascript
复制
void
PyEval_AcquireThread(PyThreadState *tstate)
{
    if (tstate == NULL)
        Py_FatalError("PyEval_AcquireThread: NULL new thread state");
    /* Check someone has called PyEval_InitThreads() to create the lock */
    assert(gil_created());
    take_gil(tstate);
    if (PyThreadState_Swap(tstate) != NULL)
        Py_FatalError(
            "PyEval_AcquireThread: non-NULL old thread state");
}

PyEval_AcquireThread实际上就是调用了 take_gil来获取GIL。

获取GIL之后对进程状态对象( interp)累加线程数,并调用 PyObject_Call执行子线程的函数。

在子线程的全部计算完成之后,Python将销毁子线程。

0x02 线程的销毁

PyThreadState_Clear清除当前线程对应的线程状态对象,所谓清理,实际上比较简单,就是对线程状态对象中维护的东西进行引用计数的维护。

PyThreadState_DeleteCurrent释放线程状态对象并释放GIL。

代码语言:javascript
复制
void
PyThreadState_DeleteCurrent()
{
    PyThreadState *tstate = GET_TSTATE();
    if (tstate == NULL)
        Py_FatalError(
            "PyThreadState_DeleteCurrent: no current tstate");
    tstate_delete_common(tstate);
    if (_PyRuntime.gilstate.autoInterpreterState &&
        PyThread_tss_get(&_PyRuntime.gilstate.autoTSSkey) == tstate)
    {
        PyThread_tss_set(&_PyRuntime.gilstate.autoTSSkey, NULL);
    }
    SET_TSTATE(NULL);
    PyEval_ReleaseLock();
}

static void
tstate_delete_common(PyThreadState *tstate)
{
    PyInterpreterState *interp;
    if (tstate == NULL)
        Py_FatalError("PyThreadState_Delete: NULL tstate");
    interp = tstate->interp;
    if (interp == NULL)
        Py_FatalError("PyThreadState_Delete: NULL interp");
    HEAD_LOCK();
    if (tstate->prev)
        tstate->prev->next = tstate->next;
    else
        interp->tstate_head = tstate->next;
    if (tstate->next)
        tstate->next->prev = tstate->prev;
    HEAD_UNLOCK();
    if (tstate->on_delete != NULL) {
        tstate->on_delete(tstate->on_delete_data);
    }
    PyMem_RawFree(tstate);
}

PyThreadState_DeleteCurrent中,首先会删除清理后的当前线程状态对象,然后通过 PyEval_ReleaseLock释放GIL。

PyEval_ReleaseLock定义在 Python/ceval.c

代码语言:javascript
复制
void
PyEval_ReleaseLock(void)
{
    /* This function must succeed when the current thread state is NULL.
       We therefore avoid PyThreadState_GET() which dumps a fatal error
       in debug mode.
    */
    drop_gil((PyThreadState*)_Py_atomic_load_relaxed(
        &_PyThreadState_Current));
}

PyEval_ReleaseLock调用了 drop_gil函数,定义在 Python/ceval_gil.h

代码语言:javascript
复制
static void drop_gil(PyThreadState *tstate)
{
    if (!_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil.locked))
        Py_FatalError("drop_gil: GIL is not locked");
    /* tstate is allowed to be NULL (early interpreter init) */
    if (tstate != NULL) {
        /* Sub-interpreter support: threads might have been switched
           under our feet using PyThreadState_Swap(). Fix the GIL last
           holder variable so that our heuristics work. */
        _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.last_holder,
                                 (uintptr_t)tstate);
    }

    MUTEX_LOCK(_PyRuntime.ceval.gil.mutex);
    _Py_ANNOTATE_RWLOCK_RELEASED(&_PyRuntime.ceval.gil.locked, /*is_write=*/1);
    _Py_atomic_store_relaxed(&_PyRuntime.ceval.gil.locked, 0);
    COND_SIGNAL(_PyRuntime.ceval.gil.cond);
    MUTEX_UNLOCK(_PyRuntime.ceval.gil.mutex);

#ifdef FORCE_SWITCHING
    if (_Py_atomic_load_relaxed(&_PyRuntime.ceval.gil_drop_request) &&
        tstate != NULL)
    {
        MUTEX_LOCK(_PyRuntime.ceval.gil.switch_mutex);
        /* Not switched yet => wait */
        if (((PyThreadState*)_Py_atomic_load_relaxed(
                    &_PyRuntime.ceval.gil.last_holder)
            ) == tstate)
        {
        RESET_GIL_DROP_REQUEST();
            /* NOTE: if COND_WAIT does not atomically start waiting when
               releasing the mutex, another thread can run through, take
               the GIL and drop it again, and reset the condition
               before we even had a chance to wait for it. */
            COND_WAIT(_PyRuntime.ceval.gil.switch_cond,
                      _PyRuntime.ceval.gil.switch_mutex);
    }
        MUTEX_UNLOCK(_PyRuntime.ceval.gil.switch_mutex);
    }
#endif
}

drop_gil函数获取mutex互斥锁之后将 locked置为0,释放GIL,并通知条件变量 cond

清除和释放工作完成后,子线程就调用 PyThread_exit_thread退出了。

代码语言:javascript
复制
void
PyThread_exit_thread(void)
{
    dprintf(("PyThread_exit_thread called\n"));
    if (!initialized)
        exit(0);
    pthread_exit(0);
}

PyThread_exit_thread是一个平台相关的操作,完成各个平台上不同的销毁原生线程的工作。在POSIX Thread(pthread) 下,实际上就是调用 pthread_exit函数。

0x03 线程的调度

时间调度

当然,子线程是不会一直执行 t_bootstrap到释放GIL,Python中持有GIL的线程会在某个时间释放GIL。

In the GIL-holding thread, the main loop (PyEvalEvalFrameEx) must be able to release the GIL on demand by another thread. A volatile boolean variable (gildrop_request) is used for that purpose, which is checked at every turn of the eval loop. That variable is set after a wait ofinterval microseconds on gil_cond has timed out. [Actually, another volatile boolean variable (eval_breaker) is used which ORs several conditions into one. Volatile booleans are sufficient as inter-thread signalling means since Python is run on cache-coherent architectures only.] A thread wanting to take the GIL will first let pass a given amount of time ( interval microseconds) before setting gildroprequest. This encourages a defined switching period, but doesn't enforce it since opcodes can take an arbitrary time to execute. The interval value is available for the user to read and modify using the Python API sys.{get,set}switchinterval().

我们看 PyEval_EvalFrameEx函数,定义在 Python/ceval.c

代码语言:javascript
复制
PyObject *
PyEval_EvalFrameEx(PyFrameObject *f, int throwflag)
{
    PyThreadState *tstate = PyThreadState_GET();
    return tstate->interp->eval_frame(f, throwflag);
}

PyObject* _Py_HOT_FUNCTION
_PyEval_EvalFrameDefault(PyFrameObject *f, int throwflag)
{
    ...
main_loop:
    for (;;) {
        ...

        if (_Py_atomic_load_relaxed(&_PyRuntime.ceval.eval_breaker)) {
            ...

            if (_Py_atomic_load_relaxed(
                        &_PyRuntime.ceval.gil_drop_request))
            {
                /* Give another thread a chance */
                if (PyThreadState_Swap(NULL) != tstate)
                    Py_FatalError("ceval: tstate mix-up");
                drop_gil(tstate);

                /* Other threads may run now */

                take_gil(tstate);

                /* Check if we should make a quick exit. */
                if (_Py_IsFinalizing() &&
                    !_Py_CURRENTLY_FINALIZING(tstate))
                {
                    drop_gil(tstate);
                    PyThread_exit_thread();
                }

                if (PyThreadState_Swap(tstate) != NULL)
                    Py_FatalError("ceval: orphan tstate");
            }
            ...
        }
        ...
    }
}

_PyEval_EvalFrameDefault是虚拟机执行字节码的主入口函数,当满足 _Py_atomic_load_relaxed(&_PyRuntime.ceval.gil_drop_request)时,当前线程会释放GIL,其他等待GIL的线程会尝试获取GIL并执行。

gil_drop_request会在 cond等待超时被设置。而超时时间被设置在 Python/ceval_gil.h

代码语言:javascript
复制
#define INTERVAL (_PyRuntime.ceval.gil.interval >= 1 ? _PyRuntime.ceval.gil.interval : 1)

gil.interval会在 _PyRuntimeState初始化的时候会被设置。

代码语言:javascript
复制
#define DEFAULT_INTERVAL 5000

static void _gil_initialize(struct _gil_runtime_state *state)
{
    _Py_atomic_int uninitialized = {-1};
    state->locked = uninitialized;
    state->interval = DEFAULT_INTERVAL;
}

默认是0.005s,可以通过sys.getswitchinterval来查看 interval时间间隔:

代码语言:javascript
复制
> import sys
> sys.getswitchinterval()
> 0.005

阻塞调度

Python3中除了时间调度之外,还有一种阻塞调度:当线程执行I/O操作,或者sleep时,线程将会挂起,释放GIL。

time.sleep为例,实现在 Modules/timemodule.c

代码语言:javascript
复制
/* Implement pysleep() for various platforms.
   When interrupted (or when another error occurs), return -1 and
   set an exception; else return 0. */

static int
pysleep(_PyTime_t secs)
{
    _PyTime_t deadline, monotonic;

    _PyTime_t millisecs;
    unsigned long ul_millis;
    DWORD rc;
    HANDLE hInterruptEvent;

    deadline = _PyTime_GetMonotonicClock() + secs;

    do {
        if (_PyTime_AsTimeval(secs, &timeout, _PyTime_ROUND_CEILING) < 0)
            return -1;

        Py_BEGIN_ALLOW_THREADS
        err = select(0, (fd_set *)0, (fd_set *)0, (fd_set *)0, &timeout);
        Py_END_ALLOW_THREADS

        if (err == 0)
            break;

        if (errno != EINTR) {
            PyErr_SetFromErrno(PyExc_OSError);
            return -1;
        }

        /* sleep was interrupted by SIGINT */
        if (PyErr_CheckSignals())
            return -1;

        monotonic = _PyTime_GetMonotonicClock();
        secs = deadline - monotonic;
        if (secs < 0)
            break;
        /* retry with the recomputed delay */
    } while (1);

    return 0;
}

pysleep函数是跨平台实现,上面只保留非windows平台的代码。

Python在这里使用select来实现sleep阻塞,可以看到select前后有两个宏定义:

  • Py_BEGIN_ALLOW_THREADS
  • Py_END_ALLOW_THREADS

定义在 Include/ceval.h

代码语言:javascript
复制
#define Py_BEGIN_ALLOW_THREADS { \
                        PyThreadState *_save; \
                        _save = PyEval_SaveThread();
#define Py_BLOCK_THREADS        PyEval_RestoreThread(_save);
#define Py_UNBLOCK_THREADS      _save = PyEval_SaveThread();
#define Py_END_ALLOW_THREADS    PyEval_RestoreThread(_save); \
                 }

Py_BEGIN_ALLOW_THREADSPy_END_ALLOW_THREADS宏分别调用了 PyEval_SaveThreadPyEval_RestoreThread函数:

代码语言:javascript
复制
/* Functions save_thread and restore_thread are always defined so
   dynamically loaded modules needn't be compiled separately for use
   with and without threads: */

PyThreadState *
PyEval_SaveThread(void)
{
    PyThreadState *tstate = PyThreadState_Swap(NULL);
    if (tstate == NULL)
        Py_FatalError("PyEval_SaveThread: NULL tstate");
    assert(gil_created());
    drop_gil(tstate);
    return tstate;
}

void
PyEval_RestoreThread(PyThreadState *tstate)
{
    if (tstate == NULL)
        Py_FatalError("PyEval_RestoreThread: NULL tstate");
    assert(gil_created());

    int err = errno;
    take_gil(tstate);
    /* _Py_Finalizing is protected by the GIL */
    if (_Py_IsFinalizing() && !_Py_CURRENTLY_FINALIZING(tstate)) {
        drop_gil(tstate);
        PyThread_exit_thread();
        Py_UNREACHABLE();
    }
    errno = err;

    PyThreadState_Swap(tstate);
}

PyThreadState_Swap定义在 Python/pystate.c

代码语言:javascript
复制
PyThreadState *
PyThreadState_Swap(PyThreadState *newts)
{
    PyThreadState *oldts = GET_TSTATE();

    SET_TSTATE(newts);
    ...
    return oldts;
}
  • PyEval_SaveThread:设置当前线程状态对象为NULL,释放GIL,并保存到 _save变量。
  • PyEval_RestoreThread:获取GIL,重新设置当前线程状态对象。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-07-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 CS实验室 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Python的多线程与GIL
    • 0x00 线程的创建
      • [1] 创建bootstate结构boot并初始化。
      • [2] 初始化多线程环境
      • [3] 创建原生系统线程
    • 0x01 线程的执行
      • 0x02 线程的销毁
        • 0x03 线程的调度
          • 时间调度
          • 阻塞调度
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档