
Using `canal` in Docker to sync data to `elasticsearch`

Author: 用户1215919
Published: 2021-12-28 12:43:08
Column: 大大的微笑

Prerequisites:

  • mysql:v5.7
  • canal-server:v1.1.4
  • elasticsearch:v7.5.1

Create the network:

docker network create net

Create the volumes:

docker volume create elasticsearch
docker volume create mysql

Create the containers:

# MySQL, with row-based binlog enabled
docker run -d --name mysql -p 3306:3306 --privileged=true -v mysql:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123 --net net mysql:5.7 --log-bin=mysql-bin --binlog-format=ROW --server_id=101

# canal server (connects as root here; the canal account created later can be used instead)
docker run -d --name canal-server --net net -p 11111:11111 -e canal.instance.master.address=mysql:3306 \
         -e canal.instance.dbUsername=root \
         -e canal.instance.dbPassword=123 \
         -e canal.instance.connectionCharset=UTF-8 canal/canal-server:v1.1.4

# Elasticsearch (single node), with the named volume mounted at the image's data directory
docker run -d --name elasticsearch -p 9200:9200 -v elasticsearch:/usr/share/elasticsearch/data -e "discovery.type=single-node" --network net elasticsearch:7.5.1

Verify that the MySQL configuration took effect:

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)

mysql> show variables like '%binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)

mysql> show variables like '%server_id';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| server_id     | 101   |
+---------------+-------+
1 row in set (0.00 sec)

mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      884 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.01 sec)

Grant the MySQL account that canal connects with the privileges it needs to act as a MySQL slave (replica):

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

Elasticsearch client dependencies:

<properties>
    <elasticsearch.version>7.5.1</elasticsearch.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
</dependencies>

Entity class:

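public class Person {
    private Long id;
    private String name;
    // Accessors are needed by the sync code and by JSON serialization.
    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}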

Basic create, update, and delete operations:

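import com.alibaba.fastjson.JSON; // fastjson; not listed in the dependencies above
import org.apache.http.HttpHost;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;

public class SearchIndex {
    private RestHighLevelClient client;

    public SearchIndex() {
        init();
    }

    private void init() {
        client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")));
    }

    /**
     * Release the underlying HTTP client. Call once, after all operations
     * (including pending async ones) have finished. Closing after every
     * request would break the next call on the same instance.
     */
    public void close() throws IOException {
        client.close();
    }

    /**
     * Delete the document that belongs to the given person.
     * generateIdx(...) resolves the index name; it is not shown in this article.
     *
     * @param person entity whose id identifies the document
     * @throws IOException if the request fails
     */
    public void deleteIdx(Person person) throws IOException {
        DeleteRequest request = new DeleteRequest(
                generateIdx(person.getClass()),
                Long.toString(person.getId()));
        DeleteResponse deleteResponse = client.delete(
                request, RequestOptions.DEFAULT);
        System.out.println(deleteResponse.toString());
    }

    /**
     * Update the document that belongs to the given person.
     */
    public void updateIdx(Person person) throws IOException {
        UpdateRequest request = new UpdateRequest(
                generateIdx(person.getClass()),
                Long.toString(person.getId()));
        String jsonStr = JSON.toJSONString(person);
        request.doc(jsonStr, XContentType.JSON);
        request.timeout(TimeValue.timeValueSeconds(5));
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        String[] includes = new String[]{"name", "birthday"};
        String[] excludes = new String[]{"id", "gender"}; // has no effect here
        request.fetchSource(
                new FetchSourceContext(true, includes, excludes));
        UpdateResponse updateResponse = client.update(
                request, RequestOptions.DEFAULT);
    }

    /**
     * Create the document for the given person (fails if the id already exists).
     *
     * @throws IOException if serialization or the request fails
     */
    public void createIdx(Person person) throws IOException {
        IndexRequest request = new IndexRequest(generateIdx(person.getClass()));
        request.id(Long.toString(person.getId()));

        String jsonString = JSON.toJSONString(person);
        System.out.println(jsonString);
        request.source(jsonString, XContentType.JSON);
        request.versionType(VersionType.INTERNAL);
        request.opType(DocWriteRequest.OpType.CREATE);
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        request.timeout(TimeValue.timeValueSeconds(5));
        // EsIndexListener (an ActionListener<IndexResponse>) is not shown in this article.
        client.indexAsync(request, RequestOptions.DEFAULT, new EsIndexListener(client));
    }
}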
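
The methods above call `generateIdx(...)`, which the article never shows. As a minimal sketch of one plausible implementation, assume the index is simply named after the entity class. The class name `IndexNames` and this naming rule are assumptions made for illustration, not the author's code. Elasticsearch requires index names to be lowercase, so the simple class name is lowercased:

```java
import java.util.Locale;

public class IndexNames {
    // Hypothetical helper: derives an index name from the entity class.
    // Elasticsearch index names must be lowercase, hence toLowerCase.
    public static String generateIdx(Class<?> type) {
        return type.getSimpleName().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(generateIdx(StringBuilder.class)); // prints "stringbuilder"
    }
}
```

Under this assumption, `generateIdx(Person.class)` would resolve to the index `person`.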

canal client dependency:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

Client code:

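import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

public class SyncData {
    public static void main(String[] args) {
        // Create the connection. The destination ("topic" here) must match an
        // instance configured on the canal server; a stock server typically
        // names its default instance "example".
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost",
                11111), "topic", "", "");

        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // all schemas, all tables
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop polling.
                        Thread.currentThread().interrupt();
                        break;
                    }
                    continue;
                }

                // Note: the batch is acknowledged right after the worker starts,
                // so a failure inside DataProcessor will not be retried.
                new Thread(new DataProcessor(message.getEntries())).start();
                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on failure, roll back the batch
            }

        } finally {
            connector.disconnect();
        }
    }
}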
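
The loop above acknowledges a batch as soon as the worker thread is started, so a failure inside `DataProcessor` would not be retried. The following sketch shows the alternative hinted at by the commented-out `rollback` call: process the batch first, then acknowledge on success or roll back on failure. `BatchSource` is a hypothetical stand-in for the `ack`/`rollback` calls of `CanalConnector`, so the example runs without a canal server; it is an illustration under that assumption, not the author's implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class AckAfterProcessing {
    // Hypothetical stand-in for the two CanalConnector calls used here.
    interface BatchSource {
        void ack(long batchId);
        void rollback(long batchId);
    }

    // Runs the handler synchronously; acks on success, rolls back on any runtime exception.
    static boolean handleBatch(BatchSource source, long batchId, Runnable handler) {
        try {
            handler.run();
            source.ack(batchId);
            return true;
        } catch (RuntimeException e) {
            source.rollback(batchId);
            return false;
        }
    }

    // Records calls so the behaviour can be observed without a canal server.
    static class RecordingSource implements BatchSource {
        final List<String> calls = new ArrayList<>();
        public void ack(long batchId) { calls.add("ack:" + batchId); }
        public void rollback(long batchId) { calls.add("rollback:" + batchId); }
    }

    public static void main(String[] args) {
        RecordingSource source = new RecordingSource();
        handleBatch(source, 1L, () -> { });
        handleBatch(source, 2L, () -> { throw new IllegalStateException("index failed"); });
        System.out.println(source.calls); // [ack:1, rollback:2]
    }
}
```

Processing synchronously also keeps batches in binlog order, which a new thread per batch does not guarantee. The trade-off is lower throughput.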

Business logic:

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

import java.io.IOException;
import java.util.List;

public class DataProcessor implements Runnable {
    private final List<Entry> entries;
    private final SearchIndex idx = new SearchIndex();

    public DataProcessor(List<Entry> entries) {
        this.entries = entries;
    }

    @Override
    public void run() {
        try {
            processor();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Walk the binlog entries and mirror each row change into Elasticsearch.
     */
    private void processor() throws IOException {
        for (Entry entry : entries) {
            // Transaction markers carry no row data.
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                Person person = new Person();
                List<CanalEntry.Column> columns = null;
                switch (eventType) {
                    case DELETE:
                        // A deleted row only has a "before" image.
                        columns = rowData.getBeforeColumnsList();
                        break;
                    case UPDATE:
                    case INSERT:
                        columns = rowData.getAfterColumnsList();
                        break;
                    default:
                        break;
                }
                if (columns == null) {
                    // Other event types are not synced.
                    continue;
                }
                columns.forEach(data -> {
                    if ("id".equals(data.getName())) {
                        person.setId(Long.parseLong(data.getValue()));
                    } else if ("name".equals(data.getName())) {
                        person.setName(data.getValue());
                    }
                });
                if (person.getId() == null) {
                    continue;
                }
                switch (eventType) {
                    case DELETE:
                        idx.deleteIdx(person);
                        break;
                    case INSERT:
                        idx.createIdx(person);
                        break;
                    case UPDATE:
                        idx.updateIdx(person);
                        break;
                    default:
                        break;
                }
            }
        }
    }
}
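
Because the client subscribes to `.*\\..*`, the processor receives row changes from every table, and any row with an `id` column ends up treated as a `Person`. One way to guard against that on the client side is to check the schema and table name from the entry header before mapping the row. The sketch below is an illustration only: `TableFilter` is a hypothetical helper, and `test` and `person` are placeholder names for the schema and table.

```java
import java.util.regex.Pattern;

public class TableFilter {
    private final Pattern pattern;

    // The regex is matched against the full "schema.table" name.
    public TableFilter(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    public boolean accepts(String schemaName, String tableName) {
        return pattern.matcher(schemaName + "." + tableName).matches();
    }

    public static void main(String[] args) {
        // "test" and "person" are placeholder names for illustration.
        TableFilter filter = new TableFilter("test\\.person");
        System.out.println(filter.accepts("test", "person")); // true
        System.out.println(filter.accepts("mysql", "user"));  // false
    }
}
```

In the processor this would be called with `entry.getHeader().getSchemaName()` and `entry.getHeader().getTableName()`. Narrowing the pattern passed to `connector.subscribe(...)` should have a similar effect on the server side.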

Alternatively, you can use canal-adapter directly.

Note that using the latest MySQL release (8.x) may prevent canal server from starting.
