我想在flink上使用elastic producer,但我在身份验证方面遇到了一些麻烦:我的elastic search集群前面有nginx,而我在Nginx中使用basic auth。
但是使用弹性搜索连接器,我无法在url中添加基本身份验证(因为InetSocketAddress)
你有没有想过在基本身份验证中使用elasticsearch连接器?
耽误您时间,实在对不起。
下面是我的代码:
val configur = new java.util.HashMap[String, String]
configur.put("cluster.name", "cluster")
configur.put("bulk.flush.max.actions", "1000")
val transportAddresses = new java.util.ArrayList[InetSocketAddress]
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("cluster.com"), 9300))
jsonOutput.filter(_.nonEmpty).addSink(new ElasticsearchSink(configur,
transportAddresses,
new ElasticsearchSinkFunction[String] {
def createIndexRequest(element: String): IndexRequest = {
val jsonMap = parse(element).values.asInstanceOf[java.util.HashMap[String, String]]
return Requests.indexRequest()
.index("flinkTest")
.source(jsonMap);
}
override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
indexer.add(createIndexRequest(element))
}
}))
发布于 2017-11-25 13:06:31
Flink使用Elasticsearch传输客户端,该客户端在端口9300上使用二进制协议进行连接。您的nginx代理位于端口9200上的HTTP接口前面。
Flink不会使用您的代理,因此不需要提供身份验证。
发布于 2017-11-30 08:05:38
如果你需要使用超文本传输协议客户端来连接Flink和Elasticsearch,一种解决方案是使用Jest Library。
您必须创建一个自定义的SinkFunction,就像下面这样的基本java类:
package fr.gfi.keenai.streaming.io.sinks.elasticsearch5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.Index;
public class ElasticsearchJestSinkFunction<T> extends RichSinkFunction<T> {
private static final long serialVersionUID = -7831614642918134232L;
private JestClient client;
@Override
public void invoke(T value) throws Exception {
String document = convertToJsonDocument(value);
Index index = new Index.Builder(document).index("YOUR_INDEX_NAME").type("YOUR_DOCUMENT_TYPE").build();
client.execute(index);
}
@Override
public void open(Configuration parameters) throws Exception {
// Construct a new Jest client according to configuration via factory
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(new HttpClientConfig.Builder("http://localhost:9200")
.multiThreaded(true)
// Per default this implementation will create no more than 2 concurrent
// connections per given route
.defaultMaxTotalConnectionPerRoute(2)
// and no more 20 connections in total
.maxTotalConnection(20)
// Basic username and password authentication
.defaultCredentials("YOUR_USER", "YOUR_PASSWORD")
.build());
client = factory.getObject();
}
private String convertToJsonDocument(T value) {
//TODO
return "{}";
}
}
请注意,您还可以使用批量操作来提高速度。
在本post的“将Flink连接到Amazon RS”部分描述了Flink的Jest实现示例
https://stackoverflow.com/questions/47441610
复制相似问题