
Index Migration

Author: YG
Published 2018-05-23 17:14:59
From the column: YG小书屋
Index migration tool: esm

Download: https://github.com/medcl/esm. In my tests, running with both --copy_settings and --copy_mappings failed, and --copy_mappings on its own had no effect either.

/bin/esm -s=http://192.168.3.206:9200 -d=http://localhost:9200 --copy_settings --copy_mappings -x=bestbuykaggle  

So I created the index by hand and set the mapping and settings myself; with that, the data export/import worked correctly. But it was slow, only about 3,000 docs/s, possibly because my individual documents are fairly large.
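Since the flags don't copy the settings and mappings reliably, the destination index has to be created up front. A minimal sketch of building the create-index body; the settings values, the type name `doc`, and the `title` field are placeholders, in practice they come from the source index:

```python
import json

# Hypothetical settings and mappings; in practice these are copied from
# GET <source>/<index>/_settings and GET <source>/<index>/_mapping.
settings = {"number_of_shards": 5, "number_of_replicas": 1}
mappings = {"doc": {"properties": {"title": {"type": "text"}}}}

# PUT this body to http://<dest>:9200/<index> before running esm.
body = json.dumps({"settings": settings, "mappings": mappings})
print(body)
```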

Testing reindex

Since ES 5, the reindex API can also copy data between clusters. See https://www.elastic.co/guide/en/elasticsearch/reference/5.6/docs-reindex.html for details.

curl -XPOST 'localhost:9200/_reindex?pretty' -H 'Content-Type: application/json' -d'
{
  "source": {
    "remote": {
      "host": "http://xxx:9200"
    },
    "index": "source"
  },
  "dest": {
    "index": "dest"
  }
}
'

This approach also requires creating the destination index and setting its mapping and settings by hand. In testing, the data copied over correctly at about 4,200 docs/s. reindex runs single-process by default, which is still too slow for tens of millions or even hundreds of millions of documents. I then planned to parallelize with reindex + slice, but per the docs, "Reindexing from remote clusters does not support manual or automatic slicing": when pulling data from another cluster, slicing is simply unavailable, so that road is closed.
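For reference, within a single cluster a sliced scroll splits the scan into independent partitions, one per worker. A sketch of the per-slice search bodies (the slice count and page size here are arbitrary):

```python
import json

def sliced_scroll_bodies(num_slices, size=5000):
    """Build one search body per slice for a sliced scroll.

    Each worker would send its own body to POST /<index>/_search?scroll=1m
    and then page through its slice independently.
    """
    bodies = []
    for i in range(num_slices):
        bodies.append({
            "slice": {"id": i, "max": num_slices},
            "size": size,
            "sort": ["_doc"],              # fastest order for a full scan
            "query": {"match_all": {}},
        })
    return bodies

for body in sliced_scroll_bodies(4):
    print(json.dumps(body))
```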

Manual scroll + slice + bulk

Use scroll with slice to read from the source cluster in parallel, then write to the destination cluster in parallel via bulk.

package com.dump.core;

import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;

public class DumpIndex {

    public static String index="";
    public static String dest_cluster_name="";
    public static String source_cluster_name="";
    public static String dest_ips[]=null;
    public static String source_ips[]=null;
    public static String type="";
    public static TransportClient source_client = null;
    public static TransportClient dest_client = null;
    public static ExecutorService executor;
    public static int threadSize;

    public static void setConfES() throws Exception {
        Settings settings = Settings.builder().put("cluster.name", source_cluster_name).build();
        int port = 9300;
        source_client = new PreBuiltTransportClient(settings);

        for (int i=0;i<source_ips.length;i++){
            source_client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(source_ips[i]), port));
        }

        settings = Settings.builder().put("cluster.name", dest_cluster_name).build();
        dest_client = new PreBuiltTransportClient(settings);
        for (int i=0;i<dest_ips.length;i++){
            dest_client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(dest_ips[i]), port));
        }
    }

    public static BulkProcessor getBulker(final int id){
        BulkProcessor bulkProcessor = BulkProcessor.builder(
                dest_client,
                new BulkProcessor.Listener() {
                    public void beforeBulk(long executionId,
                                           BulkRequest request) {
                    }

                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {
                        if (!response.hasFailures()) {
                            System.out.println("Thread " + id + " indexed " + response.getItems().length + " documents");
                        } else {
                            System.out.println("Thread " + id + " hit some failures while indexing");
                        }
                    }

                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {
                        failure.printStackTrace();
                    }
                })
                .setBulkActions(10000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(0)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
                .build();
        return bulkProcessor;
    }

    public static void parallel(){
        for(int i=0;i<threadSize;i++){
            QueryBuilder qb = matchAllQuery();
            SliceBuilder sliceBuilder = new SliceBuilder(i, threadSize);
            SearchResponse scrollResp = source_client.prepareSearch(index)
                    .setScroll(new TimeValue(60000))
                    .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
                    .setQuery(qb)
                    .slice(sliceBuilder)
                    .setSize(5000).get();
            BulkProcessor bulkProcessor = getBulker(i);
            SliceCustomer customer =new SliceCustomer(scrollResp,bulkProcessor,source_client,index,type);
            executor.execute(customer);
        }
        executor.shutdown();
    }
    public static void main(String []args) throws Exception {
        try {
            source_cluster_name= args[0];
            System.out.println("source_cluster_name is "+ source_cluster_name);
            source_ips = args[1].split(",");
            System.out.println("source_ips is "+args[1]);
            dest_cluster_name = args[2];
            System.out.println("dest_cluster_name is " + dest_cluster_name);
            dest_ips = args[3].split(",");
            System.out.println("dest_ips is " + args[3]);
            index = args[4];
            System.out.println("index is " + index);
            type = index.split("-")[0];
            System.out.println("type is " + type);
            threadSize = Integer.parseInt(args[5]);
            System.out.println("threadSize is " + threadSize);
            setConfES();
        } catch (Exception e) {
            System.out.println("Wrong arguments. Expected, in order: source_cluster_name source_ips dest_cluster_name dest_ips index threadSize");
            return;
        }
        executor = Executors.newFixedThreadPool(threadSize);
        parallel();
    }
}

class SliceCustomer implements Runnable{
    public SearchResponse scrollResp;
    public BulkProcessor bulkProcessor;
    public TransportClient source_client;
    public String index;
    public String type;

    public SliceCustomer(SearchResponse scrollResp, BulkProcessor bulkProcessor, TransportClient source_client, String index, String type) {
        this.scrollResp = scrollResp;
        this.bulkProcessor = bulkProcessor;
        this.source_client = source_client;
        this.index = index;
        this.type = type;
    }

    @Override
    public void run() {
        String vals ="";
        do {
            for (SearchHit hit : scrollResp.getHits().getHits()) {
                vals=hit.sourceAsString();
                bulkProcessor.add(new IndexRequest(index,type).source(vals));
            }
            scrollResp = source_client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
        } while(scrollResp.getHits().getHits().length != 0); // Zero hits mark the end of the scroll and the while loop.

        bulkProcessor.flush();
        try {
            bulkProcessor.awaitClose(100, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Sorting on _doc (addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)) lets ES take its optimized fast path for a full scan, and the hits are then written to the target cluster in parallel via bulk. But yet another problem appeared: scroll + slice drives node memory usage very high:

If the number of slices is bigger than the number of shards the slice filter is very slow on the first calls, it has a complexity of O(N) and a memory cost equals to N bits per slice where N is the total number of documents in the shard. After few calls the filter should be cached and subsequent calls should be faster but you should limit the number of sliced query you perform in parallel to avoid the memory explosion.

This can be avoided by specifying a doc_values field in the slice. See https://www.elastic.co/guide/en/elasticsearch/reference/5.6/search-request-scroll.html#sliced-scroll for details. In testing with 4 threads, and excluding ES startup time, indexing ran at roughly 8,000 docs/s. For several indices of hundreds of millions of documents each, that still doesn't meet the requirement.
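A sketch of what the slice clause looks like with a doc_values field; `timestamp` is only a placeholder, any numeric doc_values field that spreads documents evenly would do:

```python
def sliced_scroll_body(slice_id, max_slices, field="timestamp"):
    # "field" must be a doc_values field; "timestamp" is a placeholder name.
    # Slicing on a field avoids the per-shard O(N)-bit filter that slicing
    # on the default _uid builds, which is what blew up node memory above.
    return {
        "slice": {"field": field, "id": slice_id, "max": max_slices},
        "size": 5000,
        "sort": ["_doc"],
        "query": {"match_all": {}},
    }

print(sliced_scroll_body(0, 4))
```

Even with the fix, at roughly 8,000 docs/s an index of 300 million documents needs about 300,000,000 / 8,000 ≈ 37,500 s, i.e. over 10 hours.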

Final solution

Re-import the data through Hive instead.

Script to copy the source index's settings and mapping
import os
import json
import sys

def get_json(cmd):
    val = os.popen(cmd).read()
    result = json.loads(val)
    return result

def set_mappings(index,source,dest):
    cmd = "curl %s/%s/_mapping" % (source,index)
    print cmd
    result = get_json(cmd)
    mappings = json.dumps(result[index]["mappings"])
    cmd = "curl %s/%s/_settings" % (source,index)
    print cmd
    result = get_json(cmd)
    print result
    settings = result[index]["settings"]["index"]
    pri_num = settings["number_of_shards"]
    rep_num = settings["number_of_replicas"]
    codec = ''
    routing =''
    try:
        codec = settings["codec"]
    except:
        codec = ''
    try:
        routing = json.dumps(settings["routing"])
    except:
        routing =''
    settings = "{\"number_of_shards\":%s,\"number_of_replicas\":%s," % (pri_num,rep_num)
    if len(codec)>0:
        tmp = "\"codec\":\"%s\"," % codec
        settings = settings + tmp 
    if len(routing)>0:
        tmp = "\"routing\":%s," % routing
        settings = settings + tmp
    settings = settings[:-1] +"}"
    
    print mappings
    print "--------------------"
    print settings
    print "--------------------"
    cmd = "curl -XPUT %s/%s -d '{\"settings\":%s,\"mappings\":%s}'" % (dest,index,settings,mappings)
    result = get_json(cmd)
    print cmd
    print "--------------------"
    print result
    if "error" in result:
        return 0
    if "acknowledged" in result:
        # json.loads decodes these as booleans, never the string 'false'
        if not result["acknowledged"] or not result.get("shards_acknowledged", True):
            return 0
    return 1

def run():
    args = sys.argv
    source = args[1]
    dest = args[2]
    index = args[3]
    print "start to move %s, create mapping..." % index
    set_mappings(index,source,dest)

run()
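The string concatenation above is easy to get wrong (note the trailing-comma trimming). A variant that builds the same body with json.dumps, under the same assumption that only number_of_shards, number_of_replicas, codec and routing should be carried over (fields such as uuid, version and creation_date must not be copied into a new index):

```python
import json

def build_create_body(settings_index, mappings):
    """Build the create-index body from a source index's settings blob.

    settings_index is result[index]["settings"]["index"] as fetched by the
    script above; only a whitelist of keys is carried over.
    """
    new_settings = {
        "number_of_shards": settings_index["number_of_shards"],
        "number_of_replicas": settings_index["number_of_replicas"],
    }
    for key in ("codec", "routing"):
        if key in settings_index:
            new_settings[key] = settings_index[key]
    return json.dumps({"settings": new_settings, "mappings": mappings})
```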
Originally published 2017-11-01 on the author's personal site/blog; shared via the Tencent Cloud self-media sharing plan.