在我们的项目中,我们使用Apache Spark来写入ES。我们运行多个并行写入ES的spark作业。我们处理的数据量太大,导致写入吞吐量高达约5K写入/秒。
我们希望限制ES写入,以便将其降低到500到1000写入/秒的范围。我们遇到过像es.batch.size.bytes
和es.batch.size.entries
这样的ES配置,但我们不确定这些配置如何与Apache Spark一起工作。
发布于 2020-07-20 14:45:08
使用repartition()
对Spark中的数据进行重新分区,或者更好地使用coalesce()
(如果您正在减少分区数量),是一种控制ES索引速度的简单方法。
如果要在pyspark中设置属性
esconf={}
esconf["es.mapping.id"] = "_id"
esconf["es.nodes"] = "localhost"
esconf["es.port"] = "9200"
esconf["es.batch.size.bytes"] = "1000000" //default 1mb for bulk request
esconf["es.batch.size.entries"] = "1000" //default 1000 for bulk request
df.write.format("org.elasticsearch.spark.sql").options(**esconf).mode("append").save("index_name")
注意:批量大小和条目是为每个任务实例分配的。总是乘以Hadoop作业中的任务数,以在运行时点击Elasticsearch得到总批量大小/条目。这就是您获得5K写入/秒的原因
https://stackoverflow.com/questions/62989672
复制相似问题