前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >k8s 安装canal 注意事项

k8s 安装canal 注意事项

作者头像
iiopsd
发布2022-12-23 08:48:12
7070
发布2022-12-23 08:48:12
举报
文章被收录于专栏:iiopsd技术专栏iiopsd技术专栏

1. MySQL信息

代码语言:javascript
复制
vim /etc/my.cnf
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
service mysqld restart
代码语言:javascript
复制
CREATE USER canal IDENTIFIED BY 'canal!%123AD';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

2. canal操作

代码语言:javascript
复制
# 下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
mkdir -p /usr/local/canal
tar -zxv -f canal.deployer-1.1.4.tar.gz -C /usr/local/canal

# 修改连接数据库的配置文件 kafka 配置
vim canal/conf/canal.properties
# tcp, kafka, RocketMQ
canal.serverMode = kafka
canal.mq.servers = 127.0.0.1:9092


# 修改连接数据库的配置文件
cd /usr/local/canal
vim conf/example/instance.properties
	## mysql serverId
	canal.instance.mysql.slaveId = 123
	#position info,需要改成自己的数据库信息
	canal.instance.master.address = 127.0.0.1:3306 
	canal.instance.master.journal.name = 
	canal.instance.master.position = 
	canal.instance.master.timestamp = 
	#canal.instance.standby.address = 
	#canal.instance.standby.journal.name =
	#canal.instance.standby.position = 
	#canal.instance.standby.timestamp = 
	#username/password,需要改成自己的数据库信息
	canal.instance.dbUsername = canal  
	canal.instance.dbPassword = canal
	canal.instance.defaultDatabaseName =
	canal.instance.connectionCharset = UTF-8
	#table regex
	canal.instance.filter.regex = .\*\\\\..\*
 
 # kafka主题
 canal.mq.topic=canal_manager


# 启动
bash bin/startup.sh
# 关闭
bash bin/stop.sh

3. canal-adapter使用操作

代码语言:javascript
复制
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: kafka #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: root
  canalAdapters:
  - instance: canal_manager  # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
     - name: es7
        hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # or rest
#          # security.auth: test:123456 #  only used for rest mode
          cluster.name: docker-cluster
#        - name: kudu
#          key: kudu
#          properties:

编写 mer_index.yml (注意:sql 语句中每张表的主键都要出现在查询中)

代码语言:javascript
复制
dataSourceKey: defaultDS
destination: canal_manager
groupId: g1
esMapping:
  _index: mer_index
  _id: _id
  _type: _doc
  upsert: true
#  relations:
#    customer_order:
#      name: customer
  sql: "SELECT a.Role_No _id,a.Mer_Id merId,a.Role_Code roleCode,b.Mer_logo merlogo,b.Mer_Name merName,b.Mer_Status merStatus,c.Src_Tbl_No srcTblNo,c.Province_Code provinceCode,c.City_Code cityCode,c.County_Code countyCode,c.Addr addr,c.Latitude latitude,c.Longitude longitude FROM b_merchants_role a INNER JOIN b_merchants_info b ON a.Mer_Id=b.Mer_Id INNER JOIN s_address c ON b.Mer_Id=c.Src_Tbl_No"
  etlCondition: "where s.c_time>={}"
  commitBatch: 3000

使用startup.sh脚本启动canal-adapter服务。

4. es操作

在Elasticsearch中创建索引:

代码语言:javascript
复制
PUT activy_index
{
  "mappings": {
    "properties": {
      "channelTypeCode":{
        "type":"text"
      },
      "actvyId":{
        "type":"text"
      },
      "actvyName":{
        "type":"text"
      },
      "actvyStatusCd":{
        "type":"text"
      },
      "provinceShrCd":{
        "type":"text"
      },
      "cityCd":{
        "type":"text"
      },
      "districtCd":{
        "type":"text"
      },
      "competitionOrgId":{
        "type":"text"
      },
      "enrollStartTm":{
        "type":"date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "enrollEndTm":{
        "type":"date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "actvyStartTm":{
        "type":"date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "actvyEndTm":{
        "type":"date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "addr":{
        "type":"text"
      },
      "longitude":{
        "type":"text"
      },
      "latitude":{
        "type":"text"
      },
      "actvyAdvertisePicLink":{
        "type":"text"
      }
    }
  }
}
代码语言:javascript
复制
PUT info_index
{
  "mappings": {
    "properties": {
      "channelTypeCode":{
        "type":"text"
      },
      "infoId":{
        "type":"text"
      },
      "title":{
        "type":"text"
      },
      "infoTypeCd":{
        "type":"text"
      },
      "provinceShrCd":{
        "type":"text"
      },
      "cityCd":{
        "type":"text"
      },
      "picLink":{
        "type":"text"
      },
      "topFlag":{
        "type":"text",
        "fields":{
		        "keyword":{
		          "type":"keyword"
		        }
		      }
      },
      "validFlag":{
        "type":"text"
      },
      "relFlag":{
        "type":"text"
      },
      "relTm":{
        "type":"date",
        "format": "yyyy-MM-dd HH:mm:ss"

      },
      "infoTypeDesc":{
        "type":"text"
      }
    }
  }
}
代码语言:javascript
复制
PUT mer_index
{
  "mappings": {
    "properties": {
      "merId":{
        "type":"text"
      },
      "roleCode":{
        "type":"text"
      },
	  "merlogo":{
		"type":"text"
	  },
      "merName":{
        "type":"text"
      },
      "merStatus":{
        "type":"text"
      },
      "provinceCode":{
        "type":"text"
      },
      "cityCode":{
        "type":"text"
      },
      "countyCode":{
        "type":"text"
      },
      "addr":{
        "type":"text"
      },
      "latitude":{
        "type":"text"
      },
      "longitude":{
        "type":"text"
      }
    }
  }
}
代码语言:javascript
复制
PUT resource_index
{
  "mappings": {
    "properties": {
      "resGuid":{
        "type":"text"
      },
      "resLink":{
        "type":"text"
      },
      "validFlag":{
        "type":"text"
      }
    }
  }
}

5.数据同步:

curl -X POST http://127.0.0.1:8081/etl/es7/product.yml

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-10-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. MySQL信息
  • 2. canal操作
  • 3. canal-adapter使用操作
  • 4. es操作
  • 5.数据同步:
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档