前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用Python脚本实现ElasticSearch的在线数据迁移

使用Python脚本实现ElasticSearch的在线数据迁移

作者头像
保持热爱奔赴山海
发布2023-07-31 14:28:02
3590
发布2023-07-31 14:28:02
举报
文章被收录于专栏:饮水机管理员饮水机管理员

该脚本的功能,类似于 elasticsearch-dump ,二者都是基于scroll来实现的(包括reindex底层也是scroll)。

依赖包

代码语言:javascript
复制
# 我这里演示的ES是7.x的,如果下面的脚本运行报错,请考虑调整这里的python的elasticsearch包版本
pip install elasticsearch==7.13.1

配置文件

vim configs.py

代码语言:javascript
复制
# -*- coding: utf-8 -*-

# es数据源的信息
es_source_host = ['127.0.0.1:9200']  # 支持多个节点间用逗号分隔
es_source_index = "index-test1"

# es目标库的信息
es_dest_host = ['127.0.0.1:9200']
es_dest_index = "index-test2"

# 每次取的条数
batch_size = 2000

# 每轮休眠的时间(单位秒)
sleep_time = 0

主程序

vim run.py

代码语言:javascript
复制
# -*- coding: utf-8 -*-
import json
import time
import configs

from elasticsearch import Elasticsearch

src_es = Elasticsearch(hosts = configs.es_source_host,maxsize=16)
dest_es = Elasticsearch(hosts = configs.es_dest_host,maxsize=16)

start_ts = time.time()

scroll_time = '5m'  # 指定 Scroll 上下文的存活时间

src_index_name = configs.es_source_index
dest_index_name = configs.es_dest_index

def create_dest_index():
    try:
        dest_es.indices.create(
            index=configs.es_dest_index,
            body={"settings": {"index": {"number_of_shards": 4}}},
        )

    except Exception as e:
        print(str(e))

def update_dest_index_setting(time_dur,replicas):
    try:
        res = dest_es.indices.put_settings(
            index=configs.es_dest_index,
            body={"index.refresh_interval": time_dur, "number_of_replicas": replicas},
        )
        print(res)
    except Exception as e:
        print(str(e))


def update_dest_index_mapping():
    dest_mapping = src_es.indices.get_mapping(index=configs.es_source_index)[configs.es_source_index]["mappings"]
    try:
        res = dest_es.indices.put_mapping(body=dest_mapping, index=configs.es_dest_index)
        print(res)
    except Exception as e:
        print(str(e))



def migrate():
    query = {
        "query": {
            "match_all": {}  # 查询所有文档
        }
    }

    # 计数下,用于最后确认scroll的次数
    count = 0

    # 初始化 Scroll 上下文
    response = src_es.search(index=src_index_name, scroll=scroll_time, body=query,size=configs.batch_size)
    scroll_id = response['_scroll_id']
    hits = response['hits']['hits']

    # 处理第一批结果,拼装bulk需要的数据结构
    data_list1=[]
    for hit in hits:
        data1={}
        _id, _source = hit["_id"], hit["_source"]
        data1["index"]= {"index": {"_index": dest_index_name , "_id": _id }}
        data_list1.append(data1["index"])
        data_list1.append(_source)

    # 把第一次找出的数据,拼装好的结果写入目标ES
    dest_res = dest_es.bulk(index=dest_index_name, body=data_list1)
    if dest_res["errors"]:
        for item in response["items"]:
            if "error" in item["index"]:
                print(f"Failed operation: {item['index']}")
    else:
        print("Bulk operations completed successfully!")

    count += 1

    # 滚动获取剩余结果
    while True:
        if len(hits) < 0:
            break

        response = src_es.scroll(scroll_id=scroll_id, scroll=scroll_time)
        scroll_id = response['_scroll_id']
        # print("scroll_id ---> ", scroll_id )
        hits = response['hits']['hits']

        # 拼装bulk需要的数据结构
        data_list2=[]
        for hit in hits:
            data2={}
            _id, _source = hit["_id"], hit["_source"]
            data2["index"]= {"index": {"_index": dest_index_name , "_id": _id }}
            data_list2.append(data2["index"])
            data_list2.append(_source)

        if len(data_list2) <=0:
            break
        dest_res = dest_es.bulk(index=dest_index_name, body=data_list2)
        if dest_res["errors"]:
            for item in response["items"]:
                if "error" in item["index"]:
                    print(f"Failed operation: {item['index']}")
        else:
            print("Bulk operations completed successfully!")

        time.sleep(configs.sleep_time)

        count += 1


    stop_ts = time.time()
    print('scroll 遍历的次数: ', count, '耗时(秒):', int(stop_ts - start_ts))



if __name__ == '__main__':
    create_dest_index()  # 创建目标索引
    update_dest_index_setting("60s",0)  # 临时降低持久性,提升写入性能
    update_dest_index_mapping()  # 复制mapping
    migrate() # 数据同步
    update_dest_index_setting("1s",1)  # 提升持久性,确保数据安全性

执行

代码语言:javascript
复制
python run.py

效率

代码语言:javascript
复制
测试下来,速度还是很给力的。
测试数据集:
	docs: 639566
	primary size: 179.78MB

耗时:
elasticsearch-dump迁移耗时7分钟。
python脚本迁移耗时 4分钟(可能是因为我脚本里面的迁移前先调大refresh的功劳?)。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-07-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 依赖包
  • 配置文件
  • 主程序
  • 执行
  • 效率
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档