
canal-adapter consumes MySQL binlog data from Kafka but does not sync the updates to Elasticsearch

iiopsd · Published 2022-12-23

Background

In a project that synchronizes MySQL data into ES, the canal component was chosen to do the synchronization.

[Figure: data synchronization flow diagram]

Problem Description

During synchronization we found that when the sql statement in the canal-adapter configuration file canal-adapter/conf/es7/product.yml joins several tables, the data in Elasticsearch sometimes fails to be updated, and the runtime log shows no exception (although an error is logged when the adapter is started from IDEA), which was quite baffling.

Problem Analysis

My initial guess was that something went wrong while the yml was parsed internally, but the only way to find the exact cause was to debug the source code.

Download the Source Code

GitHub repository

Import the Code into IDEA

The project structure is shown below:

[Figure: project structure]

Configuration file: application.yml (the ## comments mark the places that need attention or modification). A small Kafka verification sketch follows the config.

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  ## consumer mode
  mode: kafka #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    ## Kafka address; use the internal (container) IP
    kafka.bootstrap.servers: 192.168.0.107:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  ## source database configuration
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.0.107:3306/test?useUnicode=true
      username: root
      password: root
  canalAdapters:
  ## Kafka topic name
  - instance: canal_manager # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      ## Elasticsearch configuration; from canal 1.1.5 on use name: es7 (name: es may cause problems)
      - name: es7
        hosts: http://192.168.0.107:9200 # 127.0.0.1:9200 for rest mode
        properties:
          ## client mode: rest
          mode: rest # transport or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address
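
With mode set to kafka, the adapter can only sync what actually arrives on the topic. Before blaming the es7 mapping, it is worth confirming that row-change messages reach the canal_manager topic at all. The following is a minimal sketch (not from the original setup), assuming kafka-clients 2.x is on the classpath and the broker address from application.yml is reachable; the group id canal-topic-check is just a throwaway name.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CanalTopicCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.107:9092"); // same broker as in application.yml
        props.put("group.id", "canal-topic-check");           // throwaway consumer group (hypothetical name)
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("canal_manager")); // topic from application.yml
            // Poll a few times; with flatMessage: true each value should be a JSON row-change message
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}

If nothing shows up here while rows are being changed in MySQL, the problem is upstream of canal-adapter (canal-server or its MQ producer settings) rather than in the es7 mapping.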

customer.yml (the ## comments mark the places that need attention or modification)

dataSourceKey: defaultDS
## Kafka topic
destination: canal_manager
groupId: g1
esMapping:
  ## Elasticsearch index
  _index: product
  ## primary key
  _id: _id
  _type: _doc
  upsert: true
  #  relations:
  #    customer_order:
  #      name: customer
  ## correct SQL
  #sql: "SELECT s.Sales_No _id, s.Sales_Name salesName, r.Pro_No proNo, p.Pro_Type proType FROM p_sales s INNER JOIN p_salespro_rela r ON r.Sales_No = s.Sales_No LEFT JOIN p_pro_info p ON r.Pro_No = p.Pro_No"
  ## problematic SQL
  sql: "SELECT s.Sales_No _id, s.Sales_Name salesName, p.Pro_Type proType FROM p_sales s INNER JOIN p_salespro_rela r ON r.Sales_No = s.Sales_No LEFT JOIN p_pro_info p ON r.Pro_No = p.Pro_No"
  etlCondition: "where p.c_time>={}"
  commitBatch: 3000

Startup entry point: CanalAdapterApplication. The full synchronization (ETL) endpoint class is CommonRest. Example request (a Java sketch of the same call follows below):

// post
http://127.0.0.1:8081/etl/es7/customer.yml
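
The same full import can also be triggered programmatically. Here is a minimal sketch (not part of the original article) using the JDK 11 HttpClient, assuming the adapter is running locally on port 8081 as configured above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerFullEtl {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // POST /etl/{adapter}/{mapping yml} asks the adapter to re-import all rows for that mapping
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:8081/etl/es7/customer.yml"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}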

Locating the Problem

After starting the application, an error shows up in the log.

2022-03-18 09:10:56.537 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2022-03-18 09:10:56.742 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ... 
2022-03-18 09:10:56.874 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2022-03-18 09:11:00.028 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserException
	at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
	at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [classes/:na]
	at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [classes/:na]
	at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_271]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_271]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_271]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_271]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1694) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:579) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:501) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$1(AbstractBeanFactory.java:353) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.cloud.context.scope.GenericScope$BeanLifecycleWrapper.getBean(GenericScope.java:390) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at org.springframework.cloud.context.scope.GenericScope.get(GenericScope.java:184) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:350) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1089) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.cloud.context.scope.refresh.RefreshScope.eagerlyInitialize(RefreshScope.java:126) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at org.springframework.cloud.context.scope.refresh.RefreshScope.start(RefreshScope.java:117) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_271]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_271]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_271]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_271]
	at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:264) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:182) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:144) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:400) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:354) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:888) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.__refresh(AbstractApplicationContext.java:553) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.jrLockAndRefresh(AbstractApplicationContext.java:40002) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:41008) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication.main(CanalAdapterApplication.java:19) ~[classes/:na]
Caused by: java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserException
	at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:83) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
	at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:52) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
	... 44 common frames omitted
Caused by: com.alibaba.druid.sql.parser.ParserException: null
	at com.alibaba.otter.canal.client.adapter.es.core.config.SqlParser.parse(SqlParser.java:71) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
	at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.addSyncConfigToCache(ESAdapter.java:143) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
	at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:75) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
	... 45 common frames omitted
2022-03-18 09:11:00.046 [main] INFO  c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /Users/Desktop/workspace/canal-canal-1.1.5/client-adapter/launcher/target/canal-adapter/plugin
2022-03-18 09:11:00.101 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: canal_manager-g1 succeed
2022-03-18 09:11:00.101 [Thread-35] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: canal_manager <=============
2022-03-18 09:11:00.101 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2022-03-18 09:11:00.113 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2022-03-18 09:11:00.126 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2022-03-18 09:11:00.128 [Thread-35] INFO  o

The log suggests a SQL parsing exception: java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserException. However, the same SQL runs fine in Navicat and other tools, so canal evidently applies parsing rules of its own.

Setting a breakpoint based on the error log and debugging shows that the failure happens while the ES7xAdapter is initialized. The exception message is: "Relation condition column must in select columns.", i.e. the columns used in the join conditions have to appear in the select list.

[Screenshot: debugging the ParserException in IDEA]

// SqlParser.java: parses the SQL specified for the ES synchronization mapping

/**
     * Parse the mapping SQL
     *
     * @param sql the SQL statement
     * @return the parsed schema (view) object
     */
    public static SchemaItem parse(String sql) {
        try {
            SQLStatementParser parser = new MySqlStatementParser(sql);
            SQLSelectStatement statement = (SQLSelectStatement) parser.parseStatement();
            MySqlSelectQueryBlock sqlSelectQueryBlock = (MySqlSelectQueryBlock) statement.getSelect().getQuery();

            SchemaItem schemaItem = new SchemaItem();
            schemaItem.setSql(SQLUtils.toMySqlString(sqlSelectQueryBlock));
            SQLTableSource sqlTableSource = sqlSelectQueryBlock.getFrom();
            List<TableItem> tableItems = new ArrayList<>();
            SqlParser.visitSelectTable(schemaItem, sqlTableSource, tableItems, null);
            tableItems.forEach(tableItem -> schemaItem.getAliasTableItems().put(tableItem.getAlias(), tableItem));

            List<FieldItem> fieldItems = collectSelectQueryFields(sqlSelectQueryBlock);
            fieldItems.forEach(fieldItem -> schemaItem.getSelectFields().put(fieldItem.getFieldName(), fieldItem));

            schemaItem.init();

            if (schemaItem.getAliasTableItems().isEmpty() || schemaItem.getSelectFields().isEmpty()) {
                throw new ParserException("Parse sql error");
            }
            return schemaItem;
        } catch (Exception e) {
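            // Note: the original exception (and its message) is discarded here, so the startup
            // log only shows "ParserException: null"; the real cause is only visible in a debugger.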
            throw new ParserException();
        }
    }
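
The ParserException in the log is somewhat misleading, because Druid itself has no trouble with the statement. The sketch below (not from the article; it only needs the Druid dependency that canal already ships with) feeds the "problematic" mapping SQL to the same MySqlStatementParser used above and prints the normalized statement, which suggests the rejection comes from canal's own checks inside parse(), apparently during schemaItem.init() where the "Relation condition column must in select columns." validation happens, rather than from Druid's grammar.

import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;

public class DruidParseCheck {

    public static void main(String[] args) {
        // The mapping SQL that the adapter rejects (the join key r.Pro_No is not selected)
        String badSql = "SELECT s.Sales_No _id, s.Sales_Name salesName, p.Pro_Type proType "
                + "FROM p_sales s INNER JOIN p_salespro_rela r ON r.Sales_No = s.Sales_No "
                + "LEFT JOIN p_pro_info p ON r.Pro_No = p.Pro_No";

        // Druid parses and re-prints the statement without complaint
        SQLSelectStatement statement =
                (SQLSelectStatement) new MySqlStatementParser(badSql).parseStatement();
        System.out.println(SQLUtils.toMySqlString(statement));
    }
}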

Solution

From the exception message and some further testing, we know that canal requires the mapping SQL, when it joins tables, to also select the columns used in the join conditions; in other words, the join-key columns of both table A and table B must appear in the select list. See the pre-check sketch after the examples below.

-- Correct SQL: the join-key columns of both related tables are selected
SELECT a.id, b.id FROM a INNER JOIN c ON a.id = c.a_id LEFT JOIN b ON c.b_id = b.id

-- Incorrect SQL: b.id, the join key of table b, is missing from the select list
SELECT a.id FROM a INNER JOIN c ON a.id = c.a_id LEFT JOIN b ON c.b_id = b.id
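
To check a candidate mapping SQL without restarting the whole adapter, the statement can be run through canal's own parser first. This is a minimal sketch (not from the article), assuming the client-adapter es core module is on the classpath; the package names are taken from the stack trace above, and SchemaItem is assumed to live in the same config package as SqlParser.

import com.alibaba.druid.sql.parser.ParserException;
import com.alibaba.otter.canal.client.adapter.es.core.config.SchemaItem;
import com.alibaba.otter.canal.client.adapter.es.core.config.SqlParser;

public class MappingSqlPreCheck {

    public static void main(String[] args) {
        // Candidate mapping SQL: note that the join key r.Pro_No is included in the select list
        String candidateSql = "SELECT s.Sales_No _id, s.Sales_Name salesName, r.Pro_No proNo, "
                + "p.Pro_Type proType FROM p_sales s "
                + "INNER JOIN p_salespro_rela r ON r.Sales_No = s.Sales_No "
                + "LEFT JOIN p_pro_info p ON r.Pro_No = p.Pro_No";
        try {
            SchemaItem item = SqlParser.parse(candidateSql);
            System.out.println("Accepted, tables: " + item.getAliasTableItems().keySet());
        } catch (ParserException e) {
            // SqlParser.parse swallows the cause, so debug inside parse() to see the real message
            System.out.println("Rejected by canal's SQL parser");
        }
    }
}

With the problematic SQL from customer.yml this should hit the catch branch, while the corrected statement should pass, matching the behavior observed above.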

Summary

Some problems can only be found in the source code. In this case the log only reported a SQL parsing exception even though the SQL itself looked perfectly fine, and only by stepping through the code logic could the root cause be found. This is a summary of my own experience; corrections are welcome.
