canal是阿里开源的一个同步mysql到其他存储的一个中间件,它的原理如下:首先伪装成一个mysql的slave服务器消费mysql的binlog,然后在本地根据需要提供tcp服务供下游消费,或者转发到kafka等消息队列中,供下游使用。
canal收到binlog后会解析成entry,一条sql 增删改命令对应一个entry,entry的内容包括表名,操作类型等信息,rowdata是具体的数据内容,由于一个更新语句可能影响多行,所以它是一个list。通过解析entry我们可以自己进行过滤,转化等操作然后存储到下游系统。本文操作的同步mysql到es的系统架构是:mysql->canal->kafka->elasticsearch.
首先搭建我们的服务,在踩了很多坑以后,我整理成了docker-compose.yml文件,方便快速启动,它和k8s的deployment很像。
version: "2"
services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:3.3
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      # broker address advertised to clients
      - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
      # connect to zookeeper by port
      #- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      # each container is one broker; set its ID
      #- KAFKA_BROKER_ID=0
      # external clients only see the container name; with network isolation
      # the name alone cannot reach kafka, so advertise an IP-based listener
      # that external connections can resolve
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      # listener telling external connections which protocol/host/port the
      # Kafka service is exposed on
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
      # Kafka defaults to -Xmx1G -Xms1G; shrink the heap on small servers.
      # Adjust to your situation; with enough resources this can be omitted.
      #- KAFKA_HEAP_OPTS=-Xmx256M -Xms128M
      # kafka log directory
      #- KAFKA_LOG_DIRS=/kafka/logs
    #volumes:
    #  - /var/run/docker.sock:/var/run/docker.sock
    #  # mount kafka logs: host path before the colon, container path after
    #  - ~/data/docker/kafka/logs:/kafka/logs
    depends_on:
      - zookeeper
  mysql:
    image: docker.io/mysql:5.7
    ports:
      - "3306:3306"
    volumes:
      - "~/learn/canal-server/mysql/log:/var/log/mysql"
      - "~/learn/canal-server/mysql/data:/var/lib/mysql"
      - "~/learn/canal-server/mysql/conf/my.cnf:/etc/mysql/my.cnf"
      - "~/learn/canal-server/mysql/mysql-files:/var/lib/mysql-files"
    environment:
      - MYSQL_ROOT_PASSWORD=canal
  canal-server:
    # NOTE(review): no tag pinned — pulls :latest; pin a version for
    # reproducible deployments
    image: docker.io/canal/canal-server
    ports:
      - "11111:11111"
    volumes:
      - "~/learn/canal-server/instance.properties:/home/admin/canal-server/conf/example/instance.properties"
      - "~/learn/canal-server/canal.properties:/home/admin/canal-server/conf/canal.properties"
      - "~/learn/canal-server/meta.dat:/home/admin/canal-server/conf/example/meta.dat"
      - "~/learn/canal-server/logs/:/home/admin/canal-server/logs/"
    depends_on:
      - mysql
      - kafka
  elasticsearch:
    image: docker.io/elasticsearch:7.17.6
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - "~/learn/elasticsearch/config/config:/usr/share/elasticsearch/config"
      - "~/learn/elasticsearch/data/node1/:/usr/share/elasticsearch/data/"
    environment:
      - "discovery.type=single-node"
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  kibana:
    image: docker.io/kibana:7.17.6
    ports:
      - "5601:5601"
    depends_on:
      # was "kafka" — kibana talks to elasticsearch, not kafka
      - elasticsearch
# equivalent manual docker run commands, kept for reference:
#docker run -d --name kibana --net canal -p 5601:5601 kibana:8.4.3
# docker run -p 3306:3306 --name mysql --network canal -v ~/learn/canal-server/mysql/log:/var/log/mysql -v ~/learn/canal-server/mysql/data:/var/lib/mysql -v ~/learn/canal-server/mysql/conf/my.cnf:/etc/mysql/my.cnf -v ~/learn/canal-server/mysql/mysql-files:/var/lib/mysql-files -e MYSQL_ROOT_PASSWORD=canal mysql:5.7
# docker run --privileged --name canal-server --network host -p 11111:11111 \
#   -v ~/learn/canal-server/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
#   -v ~/learn/canal-server/canal.properties:/home/admin/canal-server/conf/canal.properties \
#   -v ~/learn/canal-server/meta.dat:/home/admin/canal-server/conf/example/meta.dat \
#   -v ~/learn/canal-server/logs/:/home/admin/canal-server/logs/ \
#   canal/canal-server
# docker run -d --name elasticsearch --net canal -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:8.4.3
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
首先部署依赖的kafka,但是kafka依赖zookeeper,然后是es和kibana,刚开始尝试8.4.3的时候发现跑不通改为低版本了。最后部署mysql 5.7和canal最新版本,中间也尝试过mysql 8.0但是支持不太完善,中间有坑。
由于我们需要修改这些中间件的配置文件,所以将这些配置文件挂载在宿主机防止重启后丢失。接着我们开始配置。首先修改mysql的binlog格式为row(row,statement,mixed),基于声明的最省空间,但是部分函数会导致主从不一致,所以改成行格式,每次dml操作会产生最多两条数据,对于插入是更新后的,删除是更新前的,修改是前后各一条。
learn/canal-server/mysql/conf/my.cnf
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
设置账号信息
mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.04 sec)
mysql>
grant all privileges on *.* to 'canal'@'%' IDENTIFIED BY 'canal';
Query OK, 0 rows affected, 1 warning (0.02 sec)
mysql>
flush privileges;
Query OK, 0 rows affected (0.01 sec)
重启mysql检查下
select * from mysql.user where User="canal"\G
*************************** 1. row ***************************
Host: %
User: canal
Select_priv: Y
Insert_priv: Y
Update_priv: Y
Delete_priv: Y
Create_priv: Y
Drop_priv: Y
Reload_priv: Y
Shutdown_priv: Y
Process_priv: Y
File_priv: Y
Grant_priv: N
References_priv: Y
Index_priv: Y
Alter_priv: Y
Show_db_priv: Y
Super_priv: Y
Create_tmp_table_priv: Y
Lock_tables_priv: Y
Execute_priv: Y
Repl_slave_priv: Y
Repl_client_priv: Y
Create_view_priv: Y
Show_view_priv: Y
Create_routine_priv: Y
Alter_routine_priv: Y
Create_user_priv: Y
Event_priv: Y
Trigger_priv: Y
Create_tablespace_priv: Y
ssl_type:
ssl_cipher: 0x
x509_issuer: 0x
x509_subject: 0x
max_questions: 0
max_updates: 0
max_connections: 0
max_user_connections: 0
plugin: mysql_native_password
authentication_string: *E3619321C1A937C46A0D8BD1DAC39F93B27D4458
password_expired: N
password_last_changed: 2022-10-16 01:38:41
password_lifetime: NULL
account_locked: N
1 row in set (0.01 sec)
show variables like 'binlog_rows%';
+------------------------------+-------+
| Variable_name | Value |
+------------------------------+-------+
| binlog_rows_query_log_events | OFF |
+------------------------------+-------+
1 row in set (0.08 sec)
接着配置canal将serverMode改成kafka,如果我们想通过sdk访问可以使用默认的tcp
learn/canal-server/canal.properties
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = kafka
learn/canal-server/instance.properties
canal.instance.mysql.slaveId=10
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=example
配置完毕后我们启动服务,在mysql里编辑内容就可以看到kafka已经可以消费了
mysql> insert into orders values(11,'c');
Query OK, 1 row affected (0.00 sec)
% docker exec -it 19b6f42ad805 kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic example
{"data":[{"id":"5","name":"b"},{"id":"5","name":"b"},{"id":"5","name":"b"},{"id":"5","name":"b"},{"id":"5","name":"b"},{"id":"5","name":"b"},{"id":"5","name":"b"},{"id":"5","name":"b"},{"id":"5","name":"b"},{"id":"5","name":"b"},{"id":"5","name":"b"},{"id":"5","name":"b"},{"id":"5","name":"b"},{"id":"5","name":"b"},{"id":"5","name":"b"},{"id":"5","name":"b"}],"database":"test","es":1665905655000,"id":66,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":[{"id":"4"},{"id":"4"},{"id":"4"},{"id":"4"},{"id":"4"},{"id":"4"},{"id":"4"},{"id":"4"},{"id":"4"},{"id":"4"},{"id":"4"},{"id":"4"},{"id":"4"},{"id":"4"},{"id":"4"},{"id":"4"}],"pkNames":null,"sql":"","sqlType":{"id":4,"name":12},"table":"orders","ts":1665908723520,"type":"UPDATE"}
至此完成了一半工作了,接下来就是比较熟悉的消费kafka写入es的工作
package main
import (
"encoding/json"
"fmt"
"learn/learn/elasticsearch/es"
"learn/learn/elasticsearch/kafka"
"strconv"
)
// main wires the pipeline together: a Kafka consumer group reads canal's
// row-change messages from the "example" topic and each row is indexed into
// Elasticsearch via the es wrapper. The consumer loop runs in a goroutine;
// Consume() blocks until SIGINT/SIGTERM.
func main() {
	var brokers, topics, group string
	brokers = "127.0.0.1:9092" //"docker.for.mac.host.internal:9092"
	topics = "example"
	group = "canal-es-4"
	consumer := kafka.NewConsumer(brokers, topics, group, true)
	esClient := es.NewClient(topics)
	go func() {
		for message := range consumer.GetMessages() {
			fmt.Println(string(message.Value))
			if message.Topic != topics {
				continue
			}
			canal := &Canal{}
			// canal is already a pointer; passing &canal (a **Canal) to
			// Unmarshal also worked but was needlessly indirect.
			if err := json.Unmarshal(message.Value, canal); err != nil {
				fmt.Println(err)
				// was: fell through and processed the zero-value struct
				continue
			}
			for _, v := range canal.Data {
				// was: ParseInt(v.ID, 10, 10) — bitSize 10 limits ids to
				// [-512, 511] and the range error was silently discarded,
				// so larger ids became garbage; parse as a full int64.
				id, err := strconv.ParseInt(v.ID, 10, 64)
				if err != nil {
					fmt.Println(err)
					continue
				}
				esClient.Insert(&es.Orders{Id: id, Name: v.Name})
			}
		}
	}()
	consumer.Consume()
}
package kafka
import (
"context"
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"github.com/Shopify/sarama"
)
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready                  chan bool                    // closed by Setup() once a group session is established; recreated per session in Consume()
	brokers, topics, group string                       // comma-separated broker/topic lists and the consumer group id
	oldest                 bool                         // NOTE(review): never assigned — NewConsumer applies the flag to the config directly; confirm before removing
	client                 sarama.ConsumerGroup         // underlying consumer group client created by NewConsumer
	messages               chan *sarama.ConsumerMessage // buffered hand-off from ConsumeClaim to GetMessages callers
}
// NewConsumer builds a Sarama consumer-group client for the given
// comma-separated broker list, topics and group id, using round-robin
// partition assignment. When oldest is true, the group starts from the
// earliest available offset the first time it runs.
// It panics if the consumer group client cannot be created.
func NewConsumer(brokers, topics, group string, oldest bool) *Consumer {
	config := sarama.NewConfig()
	// was: a switch over a hard-coded assignor = "roundrobin" — the
	// "sticky"/"range" branches and the Panicf default were unreachable
	// dead code, so assign the round-robin strategy directly.
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin}
	if oldest {
		config.Consumer.Offsets.Initial = sarama.OffsetOldest
	}
	consumer := &Consumer{
		ready:    make(chan bool),
		brokers:  brokers,
		topics:   topics,
		group:    group,
		messages: make(chan *sarama.ConsumerMessage, 10),
	}
	client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}
	consumer.client = client
	return consumer
}
// GetMessages returns the buffered channel that ConsumeClaim publishes
// claimed messages to; callers range over it to process records.
func (c *Consumer) GetMessages() chan *sarama.ConsumerMessage {
	return c.messages
}
// Consume runs the consumer-group session loop until the process receives
// SIGINT or SIGTERM; SIGUSR1 toggles pausing/resuming all partitions.
// It blocks until the first session is set up (via the ready channel closed
// in Setup), then until shutdown, and finally closes the underlying client.
func (c *Consumer) Consume() {
	ctx, cancel := context.WithCancel(context.Background())
	consumptionIsPaused := false
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop, when a
			// server-side rebalance happens, the consumer session will need to be
			// recreated to get the new claims
			log.Println(strings.Split(c.topics, ","), c.brokers, c.topics)
			if err := c.client.Consume(ctx, strings.Split(c.topics, ","), c); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				return
			}
			// Setup() closed the previous ready channel; make a fresh one so
			// the next session can signal readiness again.
			c.ready = make(chan bool)
		}
	}()
	<-c.ready // Await till the consumer has been set up
	log.Println("Sarama consumer up and running!...")
	sigusr1 := make(chan os.Signal, 1)
	signal.Notify(sigusr1, syscall.SIGUSR1)
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	keepRunning := true
	for keepRunning {
		select {
		case <-ctx.Done():
			log.Println("terminating: context cancelled")
			keepRunning = false
		case <-sigterm:
			log.Println("terminating: via signal")
			keepRunning = false
		case <-sigusr1:
			// flip between PauseAll and ResumeAll on the whole group
			toggleConsumptionFlow(c.client, &consumptionIsPaused)
		}
	}
	cancel()
	wg.Wait()
	if err := c.client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}
// Cleanup runs once at the end of a session, after all ConsumeClaim
// goroutines have exited. Nothing needs releasing here.
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
// Setup runs at the beginning of a new session, before any ConsumeClaim
// goroutine starts; closing the ready channel unblocks Consume(), which
// waits on it.
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
	close(c.ready)
	return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Each claimed message is logged, marked, forwarded to the internal messages
// channel (consumed via GetMessages) and its offset committed synchronously.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29
	for {
		select {
		case message, ok := <-claim.Messages():
			// was: `message := <-claim.Messages()` — when the claim channel
			// closes during a rebalance or shutdown the receive yields nil,
			// and message.Value below panics; return cleanly instead.
			if !ok {
				return nil
			}
			log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
			session.MarkMessage(message, "")
			consumer.messages <- message
			session.Commit() //
		// Should return when `session.Context()` is done.
		// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
		// https://github.com/Shopify/sarama/issues/1192
		case <-session.Context().Done():
			return nil
		}
	}
}
// toggleConsumptionFlow pauses the whole consumer group if it is currently
// running, or resumes it if it is paused, and flips the flag accordingly.
func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
	paused := *isPaused
	if paused {
		client.ResumeAll()
		log.Println("Resuming consumption")
	} else {
		client.PauseAll()
		log.Println("Pausing consumption")
	}
	*isPaused = !paused
}
然后就是写入es的逻辑
package es
import (
"context"
"fmt"
"log"
"os"
"strconv"
elastic "github.com/olivere/elastic/v7"
)
// Client wraps the olivere elastic client together with the name of the
// index that Insert writes to.
type Client struct {
	*elastic.Client        // embedded ES client; its API methods are promoted
	index           string // target index name, fixed at construction in NewClient
}
// NewClient connects to the local Elasticsearch node (sniffing disabled),
// verifies it is reachable by pinging it and fetching the version, and
// returns a wrapper bound to the given index name.
// It panics on any connection failure.
func NewClient(index string) *Client {
	appLog := log.New(os.Stdout, "APP ", log.LstdFlags)
	// Obtain a client. You can also provide your own HTTP client here.
	// Trace request and response details like this:
	// cli, err := elastic.NewClient(elastic.SetTraceLog(log.New(os.Stdout, "", 0)))
	cli, err := elastic.NewClient(elastic.SetErrorLog(appLog), elastic.SetSniff(false))
	if err != nil {
		panic(err)
	}
	// Ping the Elasticsearch server to get e.g. the version number.
	info, code, err := cli.Ping("http://127.0.0.1:9200").Do(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
	// Getting the ES version number is quite common, so there's a shortcut.
	version, err := cli.ElasticsearchVersion("http://127.0.0.1:9200")
	if err != nil {
		panic(err)
	}
	fmt.Printf("Elasticsearch version %s\n", version)
	return &Client{Client: cli, index: index}
}
// Orders is the document shape stored in Elasticsearch; the JSON tags match
// the keys of the canal row data being synced.
type Orders struct {
	Id   int64  `json:"id"`
	Name string `json:"name"`
}
// Insert indexes a single order document, creating the index with an
// explicit mapping on first use. The document id is the order id, so
// re-inserting the same id overwrites the previous document (making the
// sync idempotent for updates). It panics on any Elasticsearch error.
func (c *Client) Insert(orders *Orders) {
	// Use the IndexExists service to check if a specified index exists.
	exists, err := c.Client.IndexExists(c.index).Do(context.Background())
	if err != nil {
		panic(err)
	}
	if !exists {
		// Create a new index.
		mapping := `
{
	"settings":{
		"number_of_shards":1,
		"number_of_replicas":0
	},
	"mappings":{
		"properties":{
			"id":{
				"type":"integer"
			},
			"name":{
				"type":"text",
				"store": true,
				"fielddata": true
			}
		}
	}
}
`
		createIndex, err := c.Client.CreateIndex(c.index).Body(mapping).Do(context.Background())
		if err != nil {
			panic(err)
		}
		if !createIndex.Acknowledged {
			// Not acknowledged; the Index call below will surface a real failure.
		}
	}
	// was: `put1, err := ...` with put1 never used — a compile error in Go
	// ("declared and not used"); discard the IndexResponse explicitly.
	_, err = c.Client.Index().
		Index(c.index).
		Id(strconv.FormatInt(orders.Id, 10)).
		BodyJson(orders).
		Do(context.Background())
	if err != nil {
		panic(err)
	}
}
至此我们完成了简单的mysql同步到es的部署。
本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!