前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Elasticsearch 通过Scroll遍历索引,构造pandas dataframe 【Python多进程实现】

Elasticsearch 通过Scroll遍历索引,构造pandas dataframe 【Python多进程实现】

作者头像
NaughtyCat
发布2020-10-09 16:28:24
1.5K0
发布2020-10-09 16:28:24
举报
文章被收录于专栏:开心的平凡酱开心的平凡酱

首先,python 多线程不能充分利用多核CPU的计算资源(只能共用一个CPU),所以得用多进程。笔者从3.7亿数据的索引,取200多万的数据,从取数据到构造pandas dataframe总共大概用时14秒左右。每个分片用一个进程查询数据,最后拼接出完整的结果。

由于返回的json数据量较大,每次100多万到200多万,如何快速根据json构造pandas 的dataframe是个问题 — 笔者测试过read_json()、json_normalize()、DataFrame(eval(pandas_json))及DataFrame.from_dict(),from_dict()速度最快

转载请注明出处:https://www.cnblogs.com/NaughtyCat/p/how-to-get-all-results-from-es-by-scroll-python-version.html

  • Elasticsearch scroll取数据— python版

源码如下:

代码语言:javascript
复制
def es_scroll(index, min_timestamp, max_timestamp, slice_no):
    es = Elasticsearch('http://localhost:9200', timeout = 30, max_retries=10, retry_on_timeout=True)
    page = es.search(
            index = index,
            doc_type = "tls_book",
            scroll = '1m',
            body={
            "slice": {
                "id": slice_no,
                "max": SLICES
            },
            "_source": [
            "SrcIP" 
            ],
            "sort": [
            "_doc"
            ],
            "query": {
                    "range" : {
                        "@timestamp" : {
                            "gte" : min_timestamp,
                            "lte" : max_timestamp,
                            "boost" : 2.0
                        }
                    }
                }
            },
            version = False,
            size = 10000)
    sid = page['_scroll_id']
    scroll_size = page['hits']['total']

    # Start scrolling
    df = pd.DataFrame()
    appended_data = []

    while (scroll_size > 0):
        frame = pd.DataFrame.from_dict([document['_source'] for document in page["hits"]["hits"]])
        appended_data.append(frame)
        page = es.scroll(scroll_id = sid, scroll = '1m', request_timeout = 30)
        # Update the scroll ID
        sid = page['_scroll_id']
        # Get the number of results that we returned in the last scroll
        scroll_size = len(page['hits']['hits'])
    if len(appended_data) > 0: 
        df = pd.concat(appended_data, ignore_index=True, sort = False)
    del appended_data
    gc.collect() 
    es.clear_scroll(body={'scroll_id': sid})
    return df            

 注:

 (1)通过 "_source" 关键字,指定要取的字段,可减少不必要的字段,提高查询速度

(2)官方文档指出,通过 "sort": "_doc" —即__按照_doc排序,可提高查询效率

(3)根据自己的环境,测试合理的 size ,效率会有数倍的差距。笔者环境(128G, 32核)一次取10000性能最好,网上大多测试,size取2000或者1000似乎较佳

(4)clear_scroll及时清理用完的scroll_id

(5)如果数据量较大,设置超时和重试次数(默认是10秒,否则超时会取不到数据),具体如下

代码语言:javascript
复制
 timeout = 30, max_retries=10, retry_on_timeout=True

(6)Sliced scroll

如果返回的数据量特别大,可通过slice让多个分片独自来处理请求,如下(id从0开始):

代码语言:javascript
复制
            "slice": {
                "id": slice_no,
                "max": SLICES
            },
代码语言:javascript
复制
参考: https://www.elastic.co/guide/en/elasticsearch/reference/5.1/search-request-scroll.html#sliced-scroll
  • python 多进程如何个函数传多个参数

python多进程或者多线程要向调用的函数传递多个参数,需要构造参数元组集合,代码如下(本示例每个进程不同的只有es的slice_id):

代码语言:javascript
复制
def build_parameters(index, min_timestamp, max_timestamp):
    parmeters =[]
    for num in range(0, SLICES): 
        tuple_paremeter = (index, min_timestamp, max_timestamp, num)
        parmeters.append(tuple_paremeter)
    return parmeters
  • python多进程实例

 示例使用进程池,及starmap  传递调用的函数及参数 (with相当于try, excepion, finallly的集合,会自动做资源的释放或关闭等)

代码语言:javascript
复制
            with multiprocessing.Pool(processes = SLICES) as pool:
                result = pool.starmap(es_scroll, parameters)

然后,拼接返回的dataframe 集合即可构造一个完整的dataframe,如下:

代码语言:javascript
复制
frame = pd.concat(result, ignore_index=True, sort = False)

*******************************************************************************************

精力有限,想法太多,专注做好一件事就行

  • 我只是一个程序猿。5年内把代码写好,技术博客字字推敲,坚持零拷贝和原创
  • 写博客的意义在于打磨文笔,训练逻辑条理性,加深对知识的系统性理解;如果恰好又对别人有点帮助,那真是一件令人开心的事

*******************************************************************************************

代码语言:javascript
复制
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-06-29 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档