前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >利用java线程池技术,从MySQL往Elasticsearch导入海量数据

利用java线程池技术,从MySQL往Elasticsearch导入海量数据

作者头像
猿芯
发布2020-07-06 17:59:58
5440
发布2020-07-06 17:59:58
举报
文章被收录于专栏:Wooola的技术博客

前言

近期接到一个任务,需要改造现有从mysql往Elasticsearch导入数据MTE(mysqlToEs)小工具,由于之前采用单线程导入,千亿数据需要两周左右的时间才能导入完成,导入效率非常低。所以楼主花了3天的时间,利用java线程池框架Executors中的FixedThreadPool线程池重写了MTE导入工具,单台服务器导入效率提高十几倍(合理调整线程数据,效率更高)。

关键技术栈

  • Elasticsearch
  • jdbc
  • ExecutorService\Thread
  • sql

工具说明

maven依赖

代码语言:javascript
复制
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>

java线程池设置

默认线程池大小为21个,可调整。其中POR为处理流程已办数据线程池,ROR为处理流程已阅数据线程池。

代码语言:javascript
复制
private static int THREADS = 21;
public static ExecutorService POR = Executors.newFixedThreadPool(THREADS);
public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS);

定义已办生产者线程/已阅生产者线程:lPendProducer/ZlReadProducer

public class ZlPendProducer implements Runnable {

代码语言:javascript
复制
...
@Override
public void run() {
System.out.println(threadName + "::启动...");
for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++)
try {
....
int size = 1000;
for (int i = 0; i < count; i += size) {
if (i + size > count) {
//作用为size最后没有100条数据则剩余几条newList中就装几条
size = count - i;
}
String sql = "select * from " + tableName + " limit " + i + ", " + size;
System.out.println(tableName + "::sql::" + sql);
rs = statement.executeQuery(sql);
List<HistPendingEntity> lst = new ArrayList<>();
while (rs.next()) {
HistPendingEntity p = PendUtils.getHistPendingEntity(rs);
lst.add(p);
}
MteExecutor.POR.submit(new ZlPendConsumer(lst));
Thread.sleep(2000);
}
....
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ZlReadProducer implements Runnable {
...已阅生产者处理逻辑同已办生产者
}

定义已办消费者线程/已阅生产者线程:ZlPendConsumer/ZlReadConsumer

代码语言:javascript
复制
public class ZlPendConsumer implements Runnable {
private String threadName;
private List<HistPendingEntity> lst;
public ZlPendConsumer(List<HistPendingEntity> lst) {
this.lst = lst;
}
@Override
public void run() {
...
lst.forEach(v -> {
try {
String json = new Gson().toJson(v);
EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null);
Const.COUNTER.LD_P.incrementAndGet();
} catch (Exception e) {
e.printStackTrace();
System.out.println("err::PendingId::" + v.getPendingId());
}
});
...
}
}
public class ZlReadConsumer implements Runnable {
//已阅消费者处理逻辑同已办消费者
}

定义导入Elasticsearch数据监控线程:Monitor,监控线程-Monitor为了计算每分钟导入Elasticsearch的数据总条数,利用监控线程,可以调整线程池的线程数的大小,以便利用多线程更快速的导入数据。

代码语言:javascript
复制
public void monitorToES() {
new Thread(() -> {
while (true) {
StringBuilder sb = new StringBuilder();
sb.append("已办表数::").append(Const.TBL.TBL_PEND_COUNT)
.append("::已办总数::").append(Const.COUNTER.LD_P_TOTAL)
.append("::已办入库总数::").append(Const.COUNTER.LD_P);
sb.append("~~~~已阅表数::").append(Const.TBL.TBL_READ_COUNT);
sb.append("::已阅总数::").append(Const.COUNTER.LD_R_TOTAL)
.append("::已阅入库总数::").append(Const.COUNTER.LD_R);
if (ldPrevPendCount == 0 && ldPrevReadCount == 0) {
ldPrevPendCount = Const.COUNTER.LD_P.get();
ldPrevReadCount = Const.COUNTER.LD_R.get();
start = System.currentTimeMillis();
} else {
long end = System.currentTimeMillis();
if ((end - start) / 1000 >= 60) {
start = end;
sb.append("\n#########################################\n");
sb.append("已办每分钟TPS::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + "条");
sb.append("::已阅每分钟TPS::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + "条");
ldPrevPendCount = Const.COUNTER.LD_P.get();
ldPrevReadCount = Const.COUNTER.LD_R.get();
}
}
System.out.println(sb.toString());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}

初始化Elasticsearch:EsClient

代码语言:javascript
复制
String cName = meta.get("cName");//es集群名字
String esNodes = meta.get("esNodes");//es集群ip节点
Settings esSetting = Settings.builder()
.put("cluster.name", cName)
.put("client.transport.sniff", true)//增加嗅探机制,找到ES集群
.put("thread_pool.search.size", 5)//增加线程池个数,暂时设为5
.build();
String[] nodes = esNodes.split(",");
client = new PreBuiltTransportClient(esSetting);
for (String node : nodes) {
if (node.length() > 0) {
String[] hostPort = node.split(":");
client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
}
}

初始化数据库连接

代码语言:javascript
复制
conn = DriverManager.getConnection(url, user, password);

启动脚本

代码语言:javascript
复制
nohup java -jar mte.jar ES-Cluster2019 node1:9300,node2:9300,node3:9300 root 123456! jdbc:mysql://ip:3306/mte 130 130 >> ./mte.log 2>&1 &

参数说明

ES-Cluster2019 为 Elasticsearch 集群名字 node1:9300,node2:9300,node3:9300为es的节点IP 130 130为已办已阅分表的数据

程序入口:MteMain

代码语言:javascript
复制
// 监控线程
Monitor monitorService = new Monitor();
monitorService.monitorToES();
// 已办生产者线程
Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer"));
pendProducerThread.start();
// 已阅生产者线程
Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer"));
readProducerThread.start();
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-06-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构荟萃 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 关键技术栈
  • 工具说明
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档