Flink 实时写入数据到 ElasticSearch 性能调优

背景说明

线上业务反应使用Flink消费上游kafka topic里的轨迹数据出现backpressure,数据积压严重。单次bulk的写入量为:3000/50mb/30s,并行度为48。针对该问题,为了避免影响线上业务申请了一个与线上集群配置相同的ES集群。本着复现问题进行优化就能解决的思路进行调优测试。

测试环境

  • elasticsearch 2.3.3
  • flink 1.6.3
  • flink-connector-elasticsearch2_2.11
  • 八台SSD,56核 :3主5从

Rally分布式压测ES集群

  • 从压测结果来看,集群层面的平均写入性能大概在每秒10w+的doc。

Flink写入测试

  • 配置文件
config.put("cluster.name", ConfigUtil.getString(ES_CLUSTER_NAME, "flinktest"));
config.put("bulk.flush.max.actions", ConfigUtil.getString(ES_BULK_FLUSH_MAX_ACTIONS, "3000"));
config.put("bulk.flush.max.size.mb", ConfigUtil.getString(ES_BULK_FLUSH_MAX_SIZE_MB, "50"));
config.put("bulk.flush.interval.ms", ConfigUtil.getString(ES_BULK_FLUSH_INTERVAL, "3000"));
  • 执行代码片段
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
initEnv(env);
Properties properties = ConfigUtil.getProperties(CONFIG_FILE_PATH);
//从kafka中获取轨迹数据
FlinkKafkaConsumer010<String> flinkKafkaConsumer010 =
    new FlinkKafkaConsumer010<>(properties.getProperty("topic.name"), new SimpleStringSchema(), properties);
//从checkpoint最新处消费
flinkKafkaConsumer010.setStartFromLatest();
DataStreamSource<String> streamSource = env.addSource(flinkKafkaConsumer010);
//Sink2ES
streamSource.map(s -> JSONObject.parseObject(s, Trajectory.class))
    .addSink(EsSinkFactory.createSinkFunction(new TrajectoryDetailEsSinkFunction())).name("esSink");
env.execute("flinktest");
  • 运行时配置

任务容器数为24个container,一共48个并发。savepoint为15分钟

  • 运行现象
  • source和Map算子均出现较高的反压
  • ES集群层面,目标索引写入速度写入陡降

平均QPS为:12k左右

  • 对比取消sink算子后的QPS
streamSource.map(s -> JSONObject.parseObject(s, FurionContext.class)).name("withnosink");

平均QPS为:116k左右

有无sink参照实验的结论

取消sink2ES的操作后,QPS达到110k,是之前QPS的十倍。由此可以基本判定: ES集群写性能导致的上游反压

优化方向

  • 索引字段类型调整

bulk失败的原因是由于集群dynamic mapping自动监测,部分字段格式被识别为日期格式而遇到空字符串无法解析报错。 解决方案:关闭索引自动检测

效果: ES集群写入性能明显提高但flink operator 依然存在反压:

  • 降低副本数
curl -XPUT{集群地址}/{索引名称}/_settings?timeout=3m -H "Content-Type: application/json" -d'{"number_of_replicas":"0"}'
  • 提高refresh_interval

针对这种ToB、日志型、实时性要求不高的场景,我们不需要查询的实时性,通过加大甚至关闭refresh_interval的参数提高写入性能。

curl -XPUT{集群地址}/{索引名称}/_settings?timeout=3m -H "Content-Type: application/json" -d '{ "settings": {  "index": {"refresh_interval" : -1   }   }  }'
  • 检查集群各个节点CPU核数

在flink执行时,通过Grafana观测各个节点CPU 使用率以及通过linux命令查看各个节点CPU核数。发现CPU使用率高的节点CPU核数比其余节点少。为了排除这个短板效应,我们将在这个节点中的索引shard移动到CPU核数多的节点。

curl -XPOST {集群地址}/_cluster/reroute  -d'{"commands":[{"move":{"index":"{索引名称}","shard":5,"from_node":"源node名称","to_node":"目标node名称"}}]}' -H "Content-Type:application/json"

以上优化的效果:

经过以上的优化,我们发现写入性能提升有限。因此,需要深入查看写入的瓶颈点

  • 在CPU使用率高的节点使用Arthas观察线程:
  • 打印阻塞的线程堆栈
"elasticsearch[ES-077-079][bulk][T#3]" Id=247 WAITING on java.util.concurrent.LinkedTransferQueue@369223fa
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.LinkedTransferQueue@369223fa
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:737)
    at java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:647)
    at java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1269)
    at org.elasticsearch.common.util.concurrent.SizeBlockingQueue.take(SizeBlockingQueue.java:161)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

从上面的线程堆栈我们可以看出线程处于等待状态。

关于这个问题的讨论详情查看https://discuss.elastic.co/t/thread-selection-and-locking/26051/3,这个issue讨论大致意思是:节点数不够,需要增加节点。于是我们又增加节点并通过设置索引级别的total_shards_per_node参数将索引shard的写入平均到各个节点上)

  • 线程队列优化

ES是将不同种类的操作(index、search…)交由不同的线程池执行,主要的线程池有三:index、search和bulk thread_pool。线程池队列长度配置按照官网默认值,我觉得增加队列长度而集群本身没有很高的处理能力线程还是会await(事实上实验结果也是如此在此不必赘述),因为实验节点机器是56核,对照官网,:

因此修改size数值为56。

经过以上的优化,我们发现在kafka中的topic积压有明显变少的趋势:

  • index buffer size的优化

参照官网:

indices.memory.index_buffer_size : 10%
  • translog优化:

索引写入ES的基本流程是:1.数据写入buffer缓冲和translog 2.每秒buffer的数据生成segment并进入内存,此时segment被打开并供search使用查询 3.buffer清空并重复上述步骤 4.buffer不断添加、清空translog不断累加,当达到某些条件触发commit操作,刷到磁盘。es默认的刷盘操作为request但容易部分操作比较耗时,在日志型集群、允许数据在刷盘过程中少量丢失可以改成异步async 另外一次commit操作是在translog达到某个阈值执行的,可以把大小(flush_threshold_size )调大,刷新间隔调大。

index.translog.durability : async
index.translog.flush_threshold_size : 1gb
index.translog.sync_interval : 30s

效果:

  • flink反压从打满100%降到40%(output buffer usage):
  • kafka 消费组里的积压明显减少:

总结

当ES写入性能遇到瓶颈时,我总结的思路应该是这样:

  • 看日志,是否有字段类型不匹配,是否有脏数据。
  • 看CPU使用情况,集群是否异构
  • 客户端是怎样的配置?使用的bulk 还是单条插入
  • 查看线程堆栈,查看耗时最久的方法调用
  • 确定集群类型:ToB还是ToC,是否允许有少量数据丢失?
  • 针对ToB等实时性不高的集群减少副本增加刷新时间
  • index buffer优化 translog优化,滚动重启集群

END

本文分享自微信公众号 - zhisheng(zhisheng_blog)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-10

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏信息化漫谈

用漫画看懂ElasticSearch分布式存储原理(四)

ElasticSearch自带存储,相当于是自己的数据仓库。在实践中,一般mysql等数据库与Elastic自己的库是不同的库,在mysql存入数据后,将数据自...

10720
来自专栏开发笔记

服务链路跟踪 && 服务监控

微服务以微出名,在实际的开发过程中,涉及到成百上千个服务,网络请求引起服务之间的调用极其复杂。

10020
来自专栏足球是圆的

elk安装和使用

启动成功后在控制台随便输入文字,此时会同步到elasticsearch中(前提是在运行中),elasticsearch中会添加elasticsearch-201...

10520
来自专栏upuptop的专栏

ELK的理论杂项知识

代表一个集群,集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的。es的一个概念就是去中心化,字面上理解就是无中...

8310
来自专栏信息化漫谈

用漫画看懂ElasticSearch的基本原理(三)

有了上二章对反向索引的掌握,今天我们回来本质,看一下ElasticSearch是如何工作的。

7110
来自专栏Linyb极客之路

ElasticSearch 性能优化实战,让你的 ES 飞起来!

英文原文:https://www.elastic.co/guide/en/elasticsearch/reference/current/how-to.html

24010
来自专栏信息化漫谈

用漫画看弹性搜索与baidu的关系(二)

ElasticSearch与搜索引擎其实是异曲同工的,搜索引擎baidu、google等基本原理也是采用了文本搜索技术。

7520
来自专栏Java3y

从另外一个角度看什么是数据库

或许你还能想到 Redis、Zookeeper,甚至是 Elasticsearch ……

7110
来自专栏信息化漫谈

用漫画看懂ElasticSearch弹性搜索(一)

在云计算的Paas层,经常有听到ElasticSearch,我最初的理解,该组件用来进行电商网页的模糊性查找最好了。例如在taobao的搜索栏查找“给爸爸的生日...

8920
来自专栏架构专题

LWP进程资源耗尽,Resource temporarily unavailable

服务器环境使用root账户运行应用程序是非常危险的,容易让人拿到shell变成肉鸡。所以有点意识的团队,都会建立一个低权限的普通用户用来运行java程序。

8010

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励