Index Migration

The index migration tool esm

Download: https://github.com/medcl/esm. In my tests, running with --copy_settings and --copy_mappings together failed, and using --copy_mappings alone did not work either:

/bin/esm -s=http://192.168.3.206:9200 -d=http://localhost:9200 --copy_settings --copy_mappings -x=bestbuykaggle  

So I created the index manually and set the mappings and settings myself. The import/export itself worked fine, but it was slow, around 3,000 docs/s, probably because my individual documents are fairly large.
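For the manual creation step, here is a minimal sketch using the same ES 5.x TransportClient API as the Java program later in this post. It is only a fragment under stated assumptions: mappingJson stands for the mapping JSON fetched from the source index (the Python script at the end of this post collects it over REST), and the shard/replica counts are placeholders to be copied from the source settings.

import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;

// Sketch: create the destination index up front so bulk writes don't fall back
// to dynamic mapping. mappingJson is assumed to hold the source index's mapping
// for `type`; the shard/replica counts below are placeholders, to be replaced
// with the values read from the source index's settings.
static void createDestIndex(TransportClient destClient, String index, String type, String mappingJson) {
    CreateIndexResponse resp = destClient.admin().indices()
            .prepareCreate(index)
            .setSettings(Settings.builder()
                    .put("index.number_of_shards", 5)      // use the source's value
                    .put("index.number_of_replicas", 1))   // use the source's value
            .addMapping(type, mappingJson, XContentType.JSON)
            .get();
    System.out.println("create " + index + " acknowledged=" + resp.isAcknowledged());
}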

Testing reindex

ES 5 introduced a reindex API that can also copy data between clusters. For details see https://www.elastic.co/guide/en/elasticsearch/reference/5.6/docs-reindex.html

curl -XPOST 'localhost:9200/_reindex?pretty' -H 'Content-Type: application/json' -d'
{
  "source": {
    "remote": {
      "host": "http://xxx:9200",
    },
    "index": "source",
  },
  "dest": {
    "index": "dest"
  }
}
'
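Note that reindex from a remote cluster also requires the remote host to be whitelisted on the cluster that runs the reindex, via the reindex.remote.whitelist setting in elasticsearch.yml (e.g. reindex.remote.whitelist: "xxx:9200"); without it the request above is rejected.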

This approach, too, requires creating the index and setting the mappings and settings by hand. In testing, the data copied over correctly, at about 4,200 docs/s. The request is handled by a single worker by default, which is still too slow for tens of millions, let alone hundreds of millions, of documents. The next idea was reindex + slice to parallelize the copy, but the documentation is explicit: "Reindexing from remote clusters does not support manual or automatic slicing." In other words, when pulling data from another cluster, neither manual nor automatic slicing is available, so this road is a dead end.

Manual scroll + slice + bulk

Use scroll with slice to read from the source cluster in parallel, and write into the destination cluster in parallel via bulk:

package com.dump.core;

import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;

public class DumpIndex {

    public static String index="";
    public static String dest_cluster_name="";
    public static String source_cluster_name="";
    public static String dest_ips[]=null;
    public static String source_ips[]=null;
    public static String type="";
    public static TransportClient source_client = null;
    public static TransportClient dest_client = null;
    public static ExecutorService executor;
    public static int threadSize;

    // Build TransportClients for the source and destination clusters (transport port 9300).
    public static void setConfES() throws Exception {
        Settings settings = Settings.builder().put("cluster.name", source_cluster_name).build();
        int port = 9300;
        source_client = new PreBuiltTransportClient(settings);

        for (int i=0;i<source_ips.length;i++){
            source_client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(source_ips[i]), port));
        }

        settings = Settings.builder().put("cluster.name", dest_cluster_name).build();
        dest_client = new PreBuiltTransportClient(settings);
        for (int i=0;i<dest_ips.length;i++){
            dest_client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(dest_ips[i]), port));
        }
    }

    // Build a BulkProcessor for worker `id` that writes into the destination cluster.
    public static BulkProcessor getBulker(final int id){
        BulkProcessor bulkProcessor = BulkProcessor.builder(
                dest_client,
                new BulkProcessor.Listener() {
                    public void beforeBulk(long executionId,
                                           BulkRequest request) {
                    }

                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {
                        if (!response.hasFailures()) {
                            System.out.println("thread " + id + " indexed " + response.getItems().length + " documents");
                        } else {
                            System.out.println("thread " + id + " hit failures during bulk indexing");
                        }
                    }

                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {
                        failure.printStackTrace();
                    }
                })
                .setBulkActions(10000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(0)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
                .build();
        return bulkProcessor;
    }

    // Start one sliced scroll per worker thread; each slice covers 1/threadSize of the index.
    public static void parallel(){
        for(int i=0;i<threadSize;i++){
            QueryBuilder qb = matchAllQuery();
            SliceBuilder sliceBuilder = new SliceBuilder(i, threadSize);
            SearchResponse scrollResp = source_client.prepareSearch(index)
                    .setScroll(new TimeValue(60000))
                    .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC) // _doc order: cheapest way to scroll
                    .setQuery(qb)
                    .slice(sliceBuilder)
                    .setSize(5000).get();
            BulkProcessor bulkProcessor = getBulker(i);
            SliceCustomer customer =new SliceCustomer(scrollResp,bulkProcessor,source_client,index,type);
            executor.execute(customer);
        }
        executor.shutdown();
    }
    public static void main(String[] args) throws Exception {
        try {
            source_cluster_name = args[0];
            System.out.println("source_cluster_name is " + source_cluster_name);
            source_ips = args[1].split(",");
            System.out.println("source_ips is " + args[1]);
            dest_cluster_name = args[2];
            System.out.println("dest_cluster_name is " + dest_cluster_name);
            dest_ips = args[3].split(",");
            System.out.println("dest_ips is " + args[3]);
            index = args[4];
            System.out.println("index is " + index);
            type = index.split("-")[0]; // the type is derived from the index name
            System.out.println("type is " + type);
            threadSize = Integer.parseInt(args[5]);
            System.out.println("threadSize is " + threadSize);
        } catch (Exception e) {
            System.out.println("wrong arguments; expected: source_cluster_name source_ips dest_cluster_name dest_ips index threadSize");
            return;
        }
        executor = Executors.newFixedThreadPool(threadSize);
        setConfES(); // connect to both clusters once, after argument parsing succeeds
        parallel();
    }
}

// Worker that drains one scroll slice from the source cluster and feeds it to a BulkProcessor.
class SliceCustomer implements Runnable{
    public SearchResponse scrollResp;
    public BulkProcessor bulkProcessor;
    public TransportClient source_client;
    public String index;
    public String type;

    public SliceCustomer(SearchResponse scrollResp, BulkProcessor bulkProcessor, TransportClient source_client, String index, String type) {
        this.scrollResp = scrollResp;
        this.bulkProcessor = bulkProcessor;
        this.source_client = source_client;
        this.index = index;
        this.type = type;
    }

    @Override
    public void run() {
        String vals ="";
        do {
            for (SearchHit hit : scrollResp.getHits().getHits()) {
                vals=hit.sourceAsString();
                bulkProcessor.add(new IndexRequest(index,type).source(vals));
            }
            scrollResp = source_client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
        } while(scrollResp.getHits().getHits().length != 0); // Zero hits mark the end of the scroll and the while loop.

        bulkProcessor.flush();
        try {
            bulkProcessor.awaitClose(100, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Sorting by _doc (addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)) lets ES skip scoring and fetch documents in the cheapest possible order, and the slices are then written to the target ES in parallel through bulk. But yet another problem appeared: scroll + slice can drive node memory usage very high:

If the number of slices is bigger than the number of shards the slice filter is very slow on the first calls, it has a complexity of O(N) and a memory cost equals to N bits per slice where N is the total number of documents in the shard. After few calls the filter should be cached and subsequent calls should be faster but you should limit the number of sliced query you perform in parallel to avoid the memory explosion.

This can be worked around by specifying a doc_values field in the slice; see https://www.elastic.co/guide/en/elasticsearch/reference/5.6/search-request-scroll.html#sliced-scroll for details. In my tests with 4 threads, excluding ES startup time, indexing ran at about 8,000 docs/s. For multiple indices with hundreds of millions of documents each, that still falls short.
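Switching to a doc_values-based slice only changes the SliceBuilder construction in parallel() above. A minimal sketch, where my_id is a hypothetical stand-in for a suitable field in your index:

import org.elasticsearch.search.slice.SliceBuilder;

// Slice on a doc_values field instead of the default _uid, avoiding the
// N-bits-per-slice bitset described in the quote above.
// "my_id" is a hypothetical field name: it must be numeric, have doc_values
// enabled, and its value must not change while the scroll runs.
SliceBuilder sliceBuilder = new SliceBuilder("my_id", i, threadSize);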

The final approach

Re-import the data through Hive...

A script to copy the source index's settings and mappings

import os
import json
import sys

def get_json(cmd):
    val = os.popen(cmd).read()
    return json.loads(val)

def set_mappings(index, source, dest):
    # Fetch the mappings of the source index.
    cmd = "curl %s/%s/_mapping" % (source, index)
    print(cmd)
    result = get_json(cmd)
    mappings = json.dumps(result[index]["mappings"])

    # Fetch the settings and keep only the parts worth copying.
    cmd = "curl %s/%s/_settings" % (source, index)
    print(cmd)
    result = get_json(cmd)
    print(result)
    settings = result[index]["settings"]["index"]
    pri_num = settings["number_of_shards"]
    rep_num = settings["number_of_replicas"]
    # codec and routing are optional; copy them only if the source index has them.
    codec = settings.get("codec", '')
    routing = json.dumps(settings["routing"]) if "routing" in settings else ''

    settings = "{\"number_of_shards\":%s,\"number_of_replicas\":%s," % (pri_num, rep_num)
    if len(codec) > 0:
        settings += "\"codec\":\"%s\"," % codec
    if len(routing) > 0:
        settings += "\"routing\":%s," % routing
    settings = settings[:-1] + "}"

    print(mappings)
    print("--------------------")
    print(settings)
    print("--------------------")
    # Create the destination index with the copied settings and mappings.
    cmd = "curl -XPUT %s/%s -H 'Content-Type: application/json' -d '{\"settings\":%s,\"mappings\":%s}'" % (dest, index, settings, mappings)
    result = get_json(cmd)
    print(cmd)
    print("--------------------")
    print(result)
    if "error" in result:
        return 0
    if "acknowledged" in result:
        # json.loads parses the JSON booleans as Python True/False.
        if not result["acknowledged"] or not result.get("shards_acknowledged", True):
            return 0
    return 1

def run():
    args = sys.argv
    source = args[1]
    dest = args[2]
    index = args[3]
    print("start to move %s, creating mapping..." % index)
    set_mappings(index, source, dest)

run()
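Assuming the script is saved as copy_index_meta.py (the file name here is my own choice), it takes the source URL, destination URL, and index name, matching the hosts used earlier:

python copy_index_meta.py http://192.168.3.206:9200 http://localhost:9200 bestbuykaggle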
