前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink-kafka源-esSink

Flink-kafka源-esSink

作者头像
用户5927264
发布2023-03-16 19:45:12
2620
发布2023-03-16 19:45:12
举报
文章被收录于专栏:OSChina
代码语言:xml
复制
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the Flink sample job: reads from Kafka and writes to Elasticsearch 7.
  All Flink artifacts share ${flink.version}; Scala-suffixed artifacts share ${scala.binary.version}.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.shi</groupId>
    <artifactId>flinkTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <!-- Flink core, streaming API and local/cluster client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

        <!-- Kafka source connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Elasticsearch 7 sink connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--
          fastjson parses the Kafka payloads. Early 1.2.x releases have publicly known
          autoType deserialization vulnerabilities, so use the patched 1.2.83 release.
        -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>1.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compile for the Java level declared in ${java.version}; plugin version pinned for reproducible builds -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

代码语言:java
复制
package com.shi.kafkaTest;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Sample Flink streaming job: consumes string records from a Kafka topic and
 * indexes each record, parsed as a JSON object, into an Elasticsearch index.
 *
 * <p>Records that cannot be parsed into a JSON object are reported on stderr and
 * skipped, so a single bad message does not fail the whole job.
 *
 * @author shiye
 * @create 2023-02-20 17:06
 */
public class KafkSourceEsSinkTest {

    /** Kafka topic the job reads from. */
    private static final String KAFKA_TOPIC = "sungrow_cdc";

    /** Elasticsearch index every document is written to. */
    private static final String ES_INDEX = "temp_cdc_test";

    /** Port and scheme shared by all Elasticsearch nodes listed in {@link #main}. */
    private static final int ES_PORT = 9201;
    private static final String ES_SCHEME = "http";

    // NOTE(review): credentials are hard-coded in source; move them to external
    // configuration before using this outside a throwaway test environment.
    private static final String ES_USERNAME = "elastic";
    private static final String ES_PASSWORD = "123456";

    /**
     * Builds the Kafka-to-Elasticsearch pipeline and runs it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final DataStreamSource<String> stream = env.addSource(
                new FlinkKafkaConsumer<String>(KAFKA_TOPIC, new SimpleStringSchema(), getKafkaPropertes()));

        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("10.0.81.167", ES_PORT, ES_SCHEME));
        httpHosts.add(new HttpHost("10.0.81.168", ES_PORT, ES_SCHEME));
        httpHosts.add(new HttpHost("10.0.81.169", ES_PORT, ES_SCHEME));

        ElasticsearchSink.Builder<String> esBuilder = new ElasticsearchSink.Builder<>(httpHosts, getEsSinkFunction());
        // Maximum number of buffered actions before a flush. A value of 1 flushes after
        // every record, so the size and interval limits below will rarely be the trigger.
        esBuilder.setBulkFlushMaxActions(1);
        // Maximum size of buffered data before a flush, in megabytes.
        esBuilder.setBulkFlushMaxSizeMb(100);
        // Flush interval, applied regardless of how many actions or bytes are buffered.
        esBuilder.setBulkFlushInterval(10000);
        // Configure username/password authentication on the REST client.
        esBuilder.setRestClientFactory(getRestClientFactory());
        stream.addSink(esBuilder.build());

        env.execute();
    }

    /**
     * Builds the Kafka consumer configuration: broker list, consumer group,
     * deserializers and the offset-reset policy.
     *
     * @return a new {@link Properties} instance holding the consumer settings
     */
    public static Properties getKafkaPropertes() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.0.81.158:9092,10.0.81.159:9092,10.0.81.160:9092");
        properties.setProperty("group.id", "sungrow_cdc_shiye_test_group4");
        // NOTE(review): the consumer in main() decodes values with SimpleStringSchema; these two
        // deserializer settings are presumably ignored by the Flink connector - confirm.
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }

    /**
     * Creates the Elasticsearch sink function.
     *
     * <p>Each incoming record is printed to stdout, parsed as a JSON object and queued
     * as an index request against the target index. Records that fail to parse, or that
     * parse to {@code null}, are reported on stderr and dropped.
     *
     * @return the sink function used by the Elasticsearch sink builder
     */
    public static ElasticsearchSinkFunction<String> getEsSinkFunction() {
        return new ElasticsearchSinkFunction<String>() {
            @Override
            public void process(String event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                System.out.println("------> " + event);

                final JSONObject document;
                try {
                    document = JSONObject.parseObject(event);
                } catch (RuntimeException ex) {
                    // The parser signals bad input with unchecked exceptions. Letting one escape
                    // would fail the job on a single bad message, so report it and move on.
                    System.err.println("Skipping record that is not a JSON object: " + event + " (" + ex + ")");
                    return;
                }
                if (document == null) {
                    // Nothing to index (e.g. an empty payload).
                    System.err.println("Skipping record with no JSON content: " + event);
                    return;
                }

                IndexRequest request = Requests.indexRequest()
                        .index(ES_INDEX)
                        .source(document);

                requestIndexer.add(request);
            }
        };
    }

    /**
     * Creates the REST client factory that installs basic-auth credentials
     * (username and password) on the Elasticsearch HTTP client and disables
     * auth caching.
     *
     * @return the factory passed to the Elasticsearch sink builder
     */
    public static RestClientFactory getRestClientFactory() {
        return new RestClientFactory() {
            @Override
            public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(
                        AuthScope.ANY, new UsernamePasswordCredentials(ES_USERNAME, ES_PASSWORD));
                restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                        httpAsyncClientBuilder.disableAuthCaching();
                        return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });
            }
        };
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-02-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档