我正在使用Spark streaming来处理Kafka队列中的大量数据,我需要将结果写入设置的两个ES集群。
我在我的项目中使用了"org.elasticsearch.elasticsearch- spark -XX“,并且能够通过使用EsSpark.saveJsonToEs()
方法写入一个ES集群,该方法在内部从不可变的spark上下文中获取ES的特定属性。
因此,如果spark上下文是不可变的,我如何在运行时更改属性并将结果写入多个ES集群。
请提个建议。
Ref - https://www.elastic.co/guide/en/elasticsearch/hadoop/6.7/spark.html
发布于 2020-01-10 01:39:05
我觉得下面的spark.driver.allowMultipleContexts
方法应该行得通……使用新的spark配置单独创建spark上下文会话。
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "XXXXXXXX")
conf.set("es.port", "9020")
conf.set("spark.driver.allowMultipleContexts", "true")
val sc1 = new SparkContext(conf)
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "yyyyyy")
conf.set("es.port", "9020")
conf.set("spark.driver.allowMultipleContexts", "true")
val sc2 = new SparkContext(conf)
https://stackoverflow.com/questions/59650766
复制相似问题