首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >在没有GIL的情况下,spacy-io如何使用多线程?

在没有GIL的情况下,spacy-io如何使用多线程?
EN

Stack Overflow用户
提问于 2016-05-05 10:11:34
回答 2查看 2K关注 0票数 7

这个职位 带Spacy管的多线程NLP提到了这个问题,

这里来自https://spacy.io/

代码语言:javascript
运行
复制
from spacy.attrs import *
# All strings mapped to integers, for easy export to numpy
np_array = doc.to_array([LOWER, POS, ENT_TYPE, IS_ALPHA])

from reddit_corpus import RedditComments
reddit = RedditComments('/path/to/reddit/corpus')
# Parse a stream of documents, with multi-threading (no GIL!)
# Processes over 100,000 tokens per second.
for doc in nlp.pipe(reddit.texts, batch_size=10000, n_threads=4):
    # Multi-word expressions, such as names, dates etc
    # can be merged into single tokens
    for ent in doc.ents:
        ent.merge(ent.root.tag_, ent.text, ent.ent_type_)
    # Efficient, lossless serialization --- all annotations
    # saved, same size as uncompressed text
    byte_string = doc.to_bytes()
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-05-05 10:47:40

我需要写一篇关于这个的博客文章。tl;dr是用Cython实现的,Cython是一种类似Python的语言,可以转换成C或C++,并最终生成一个Python扩展。您可以在这里阅读更多关于使用Cython发布GIL的信息:

http://docs.cython.org/src/userguide/parallelism.html

下面是.pipe方法在spaCy中的实现:

https://github.com/spacy-io/spaCy/blob/master/spacy/syntax/parser.pyx#L135

代码语言:javascript
运行
复制
def pipe(self, stream, int batch_size=1000, int n_threads=2):
    cdef Pool mem = Pool()
    cdef TokenC** doc_ptr = <TokenC**>mem.alloc(batch_size, sizeof(TokenC*))
    cdef int* lengths = <int*>mem.alloc(batch_size, sizeof(int))
    cdef Doc doc
    cdef int i
    cdef int nr_class = self.moves.n_moves
    cdef int nr_feat = self.model.nr_feat
    cdef int status
    queue = []
    for doc in stream:
        doc_ptr[len(queue)] = doc.c
        lengths[len(queue)] = doc.length
        queue.append(doc)
        if len(queue) == batch_size:
            with nogil:
                for i in cython.parallel.prange(batch_size, num_threads=n_threads):
                    status = self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
                    if status != 0:
                        with gil:
                            sent_str = queue[i].text
                            raise ValueError("Error parsing doc: %s" % sent_str)
            PyErr_CheckSignals()
            for doc in queue:
                self.moves.finalize_doc(doc)
                yield doc
            queue = []
    batch_size = len(queue)
    with nogil:
        for i in cython.parallel.prange(batch_size, num_threads=n_threads):
            status = self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
            if status != 0:
                with gil:
                    sent_str = queue[i].text
                    raise ValueError("Error parsing doc: %s" % sent_str)
    PyErr_CheckSignals()
    for doc in queue:
        self.moves.finalize_doc(doc)
        yield doc

多线程的实际机制非常简单,因为NLP (经常)是(经常)令人尴尬的并行--每个文档都是独立解析的,所以我们只需要在一个文本流上做一个prange循环。

然而,以多线程的方式实现解析器是相当困难的。为了有效地使用多线程,您需要释放GIL,而不是重新获得它。这意味着不使用Python对象,不引发异常等等。

当你创建一个Python对象--比如说一个列表--你需要增加它的引用计数,它是全局存储的。这意味着获得吉尔。那是没有办法的。但是,如果您是在C扩展中,并且只想在堆栈上放置一个整数,或者调用malloc或free,则不需要获取GIL。因此,如果您只使用C和C++构造在该级别编写程序,则可以释放GIL。

我已经用Cython写了几年的统计解析器。(在spaCy之前,我已经为我的学术研究做了一个实现。)要在没有GIL的情况下编写整个解析循环是很困难的。到2015年末,我已经完成了机器学习、哈希表、外部解析循环和大部分特征提取作为nogil代码。但是state对象有一个复杂的接口,并且被实现为一个cdef类。如果不获取GIL,我就无法创建这个对象或将其存储在容器中。

当我想出一种在Cython中编写C++类的无文档方法时,就出现了突破。这允许我将控制解析器状态的现有cdef类挖空。我用方法代理了它与内部C++类的接口。这样,我就可以保持代码正常工作,并确保我没有在特性计算中引入任何细微的bug。

您可以在这里看到内部类:state.pxd

如果您浏览此文件的git历史记录,您可以看到我实现.pipe方法的补丁。

票数 16
EN

Stack Overflow用户

发布于 2016-05-05 10:45:42

据推测,它在C级而不是python级别进行解析。进入C之后,如果不需要访问任何python对象,就可以安全地释放GIL。在最低的读写级别上,CPython也会发布GIL。原因是,如果有其他线程正在运行,并且我们即将调用一个阻塞的C函数,那么我们应该在函数调用期间释放GIL。

您可以在CPython的最低实现中看到这一点。

代码语言:javascript
运行
复制
    if (gil_held) {
        do {
            Py_BEGIN_ALLOW_THREADS
            errno = 0;
#ifdef MS_WINDOWS
            n = write(fd, buf, (int)count);
#else
            n = write(fd, buf, count);
#endif
            /* save/restore errno because PyErr_CheckSignals()
             * and PyErr_SetFromErrno() can modify it */
            err = errno;
            Py_END_ALLOW_THREADS
        } while (n < 0 && err == EINTR &&
                !(async_err = PyErr_CheckSignals()));
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37047872

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档