
MySQL to ES by Flink-CDC

Original · by 沈小翊 · published 2023-11-27

This article shows how to build near-real-time CDC data synchronization from MySQL to Elasticsearch (ES) with Flink.

CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (INSERT, UPDATE and DELETE on rows or tables), record every change in the order it occurred, and write the changes out, for example to a message queue, for downstream services to subscribe to and consume.

Most current Flink-CDC-to-ES approaches use the Flink SQL Client: create a source table mirroring the MySQL table, create a sink table mapped to the target ES index, and submit a sync job such as:

insert into es_table select * from mysql_table;

This synchronizes MySQL to ES in real time, relies entirely on the Flink runtime, and is very convenient. But if the data needs any processing along the way, the CDC pipeline has to be built by hand with the DataStream API, which is what the rest of this article does.

1. Environment Preparation

MySQL 8.0

Elasticsearch 7.16

Flink 1.14.4

JDK 1.8

One prerequisite on the MySQL side: the binlog must be enabled in ROW format (the MySQL 8.0 default), and the connecting user needs replication privileges (REPLICATION SLAVE / REPLICATION CLIENT).

pom.xml dependencies:

<properties>
    <flink.version>1.14.4</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <!-- assumed value, not stated in the original post; any recent Log4j 2.x release works -->
    <log4j.version>2.17.1</log4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7</artifactId>
        <version>1.15.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.27</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
2. Connect to MySQL and Obtain a binlog DataStream
// selected imports assumed by the snippets below (Flink 1.14 / mysql-cdc 2.4 / ES 7 REST client)
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import java.util.ArrayList;
import java.util.List;

// REST client used later to look up and delete documents in ES
RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(new HttpHost("es-ip", 9200, "http")));

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("ip")
        .port(3306)
        .databaseList("database_name")
        .tableList("table_name")  // qualify as "database_name.table_name"
        .username("root")
        .password("password")
        .deserializer(new JsonDebeziumDeserializationSchema())  // emits Debezium-style JSON strings
        .build();

The environment checkpoints every three seconds; binlog changes stream in continuously, and each checkpoint records the source's current binlog position:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);  // checkpoint interval: 3 seconds
DataStream<String> input = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
        .setParallelism(1);     // a single reader keeps events in binlog order
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("es-ip", 9200, "http"));
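
For reference, JsonDebeziumDeserializationSchema emits Debezium-style JSON change events as plain strings. A simplified insert event looks roughly like this (the row fields id and name are illustrative; before, after, source, op and ts_ms are the Debezium envelope):

{"before":null,"after":{"id":1,"name":"alice"},"source":{"db":"database_name","table":"table_name"},"op":"c","ts_ms":1700000000000}

For an update, before and after are both non-null; for a delete, after is null. The string checks in the next section rely on exactly this shape.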
3. Parse the binlog and Sink the Corresponding Changes to ES
// Additional imports used here: com.alibaba.fastjson.{JSON, JSONObject},
// org.elasticsearch.action.index.IndexRequest, org.elasticsearch.action.search.{SearchRequest, SearchResponse},
// org.elasticsearch.client.{Requests, RequestOptions}, org.elasticsearch.index.query.{BoolQueryBuilder, QueryBuilders},
// org.elasticsearch.search.SearchHits, org.elasticsearch.search.builder.SearchSourceBuilder,
// org.apache.flink.api.common.functions.RuntimeContext, and the Flink elasticsearch connector classes
// ElasticsearchSink, ElasticsearchSinkFunction and RequestIndexer.

// Build the ElasticsearchSink that applies each binlog event to ES. Note: the anonymous
// ElasticsearchSinkFunction is serialized and shipped to the workers, so the RestHighLevelClient
// it uses for lookups and deletes must be reachable there (e.g. created lazily inside the
// function); capturing a live local variable only works for single-JVM local runs.
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<String>() {

            // build an index request that writes one row image into ES
            public IndexRequest createIndexRequest(String index, HashMap<String, Object> map) {
                return Requests.indexRequest().index(index).source(map);
            }

            // delete the ES document matching the given row image
            public void delete(String index, String element, String before_after) throws IOException {
                String id = getID(element, before_after);
                if (id.isEmpty()) {
                    System.out.println("No matching document found, skipping delete");
                    return;
                }
                System.out.println("Deleting document " + id);
                client.delete(Requests.deleteRequest(index).id(id), RequestOptions.DEFAULT);
            }

            // find the ES document ID by matching every field of the row image
            public String getID(String element, String before_after) throws IOException {
                JSONObject object = JSON.parseObject(element);
                JSONObject json_value = object.getJSONObject(before_after);
                if (json_value == null) {
                    System.out.println("Row image \"" + before_after + "\" is absent, nothing to look up");
                    return "";
                }
                // match all fields of the row at once with a bool/must query
                BoolQueryBuilder query = QueryBuilders.boolQuery();
                for (Map.Entry<String, Object> entry : json_value.entrySet()) {
                    query.must(QueryBuilders.matchQuery(entry.getKey(), entry.getValue().toString()));
                }
                SearchRequest searchRequest = new SearchRequest();
                searchRequest.source(new SearchSourceBuilder().query(query));
                // read the matching document's ID out of the response
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                SearchHits hits = searchResponse.getHits();
                return hits.getHits().length == 0 ? "" : hits.getHits()[0].getId();
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                // inspect the binlog event and apply the matching insert / delete / update to ES
                String index = "mysql_es";
                if (element.contains("\"before\":null")) {
                    // insert event: index the "after" row image
                    JSONObject json_value = JSON.parseObject(element).getJSONObject("after");
                    HashMap<String, Object> map = new HashMap<>(json_value);
                    indexer.add(createIndexRequest(index, map));
                } else if (element.contains("\"after\":null")) {
                    // delete event: remove the document matching the "before" row image
                    try {
                        delete(index, element, "before");
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                } else if (!element.contains("\"after\":null") && !element.contains("\"before\":null")) {
                    // update event: delete the old document, then index the "after" row image
                    try {
                        delete(index, element, "before");
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    JSONObject json_value = JSON.parseObject(element).getJSONObject("after");
                    HashMap<String, Object> map = new HashMap<>(json_value);
                    indexer.add(createIndexRequest(index, map));
                } else {
                    System.out.println("Unrecognized binlog event, skipped");
                }
            }
        }
);
4. Configure the ES Sink
// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
        esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration on the internally created REST client
        esSinkBuilder.setRestClientFactory(
                restClientBuilder -> {}
        );

// finally, build and add the sink to the job's pipeline
        input.addSink(esSinkBuilder.build());

        env.execute();
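
Flushing after every single action is handy while testing but slow under load. The builder exposes further batching and retry knobs; a hedged sketch with illustrative, untuned values (ElasticsearchSinkBase lives in org.apache.flink.streaming.connectors.elasticsearch):

// batch up to 1000 actions, 5 MB or 1 second, whichever comes first (illustrative values)
esSinkBuilder.setBulkFlushMaxActions(1000);
esSinkBuilder.setBulkFlushMaxSizeMb(5);
esSinkBuilder.setBulkFlushInterval(1000);
// retry transient bulk failures with exponential backoff
esSinkBuilder.setBulkFlushBackoff(true);
esSinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
esSinkBuilder.setBulkFlushBackoffRetries(3);
esSinkBuilder.setBulkFlushBackoffDelay(100);

One caveat specific to this article's pipeline: deletes are issued immediately through the standalone REST client while index actions are buffered in the sink, so batching more than one action can let an update's delete overtake the still-buffered insert it is meant to replace. Keep the flush at one action unless deletes are also routed through the RequestIndexer, as in the sketch under Limitations below.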
Limitations
  1. Unsuitable when the table contains fully duplicate rows; the MySQL table needs a primary key (see the sketch below).
  2. No resuming: every launch re-synchronizes from scratch (see the note below the sketch).
  3. Nested fields (multi-level JSON) are not handled.
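
The first two limitations can be softened. If each row's primary key is used as the ES document _id, inserts, updates and deletes all become idempotent single requests and the search round-trip disappears. A minimal sketch of such a process() method, assuming a primary-key column named id (the column name is an assumption, not from the original code):

@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
    JSONObject event = JSON.parseObject(element);
    JSONObject before = event.getJSONObject("before");
    JSONObject after = event.getJSONObject("after");
    String index = "mysql_es";
    if (after != null) {
        // insert or update: index by primary key, overwriting any previous version
        indexer.add(Requests.indexRequest().index(index).id(after.getString("id")).source(after));
    } else if (before != null) {
        // delete: remove the document by primary key, no lookup needed
        indexer.add(Requests.deleteRequest(index).id(before.getString("id")));
    }
}

As for restarts: the MySQL source stores its binlog offset in Flink state, so resuming from a retained checkpoint or savepoint (flink run -s <path>) continues where the job stopped instead of re-snapshotting. MySqlSource.builder() also accepts startupOptions(...) (e.g. StartupOptions.latest()) to control where a fresh start begins.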


Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission. For infringement concerns, contact cloudcommunity@tencent.com for removal.
