专栏首页大数据成神之路Canal连接kafka实现实时同步mysql数据

Canal连接kafka实现实时同步mysql数据

canal-kafka是阿里云最近更新的一个新的安装包。主要功能是实现canal与kafka的对接,实现海量的消息传输同步。在canal-kafka中,消息是以ByteString进行传输的,并且用户只能通过配置来指定一些kafka的配置,从某种程度上有一定的局限性,所以我们使用canal来自定义客户端kafka,会有更好的灵活性,但维护成本会更大,所以如何选择根据实际情况而定。

构建maven依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.25</version>
</dependency> 
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>

SimpleCanalClient

package com.unigroup.client.canal;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.core.canal.CanalToKG;

/**   
* @Title: SimpleCanalClient.java 
* @Package com.unigroup.canal 
* @Description: canal单实例接口
*/
public class SimpleCanalClient {

    private CanalConnector connector=null;

    public SimpleCanalClient(String ip,String port,String instance) {

        // 创建链接
        connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, Integer.parseInt(port)),instance, "", "");
    }
    public List<Entry> execute(int batchSize,Class<?> clazz ) throws InstantiationException, IllegalAccessException, NoSuchMethodException, SecurityException {

        //int batchSize = 1;
        int emptyCount = 0;
        Object obj = clazz.newInstance();
        Method method = clazz.getMethod("send",Message.class);
        try {
            connector.connect();
            // connector.subscribe(".*\\..*");
            connector.subscribe("test.test1");

            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    method.invoke(obj, message);            
                }
                connector.ack(batchId); // 提交确认

                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } catch (IllegalAccessException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IllegalArgumentException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
        return null;
    }
}

CanalKafkaProducer

package com.unigroup.kafka.producer;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.protocol.Message;
import com.unigroup.kafka.producer.KafkaProperties.Topic;
import com.unigroup.utils.MessageSerializer;

/**   
* @Title: CanalKafkaProducer.java 
* @Package com.unigroup.kafka.producer 
* @version V1.0   
*/
public class CanalKafkaProducer {

    private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);

    private Producer<String, Message> producer;

    public void init(KafkaProperties kafkaProperties) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getServers());
        properties.put("acks", "all");
        properties.put("retries", kafkaProperties.getRetries());
        properties.put("batch.size", kafkaProperties.getBatchSize());
        properties.put("linger.ms", kafkaProperties.getLingerMs());
        properties.put("buffer.memory", kafkaProperties.getBufferMemory());
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", MessageSerializer.class.getName());
        producer = new KafkaProducer<String, Message>(properties);
    }

    public void stop() {
        try {
            logger.info("## stop the kafka producer");
            producer.close();
        } catch (Throwable e) {
            logger.warn("##something goes wrong when stopping kafka producer:", e);
        } finally {
            logger.info("## kafka producer is down.");
        }
    }

    public void send(Topic topic, Message message) throws IOException {

        ProducerRecord<String, Message> record;
        if (topic.getPartition() != null) {
            record = new ProducerRecord<String, Message>(topic.getTopic(), topic.getPartition(), null, message);
        } else {
            record = new ProducerRecord<String, Message>(topic.getTopic(), message);
        }
        producer.send(record);
        if (logger.isDebugEnabled()) {
            logger.debug("send message to kafka topic: {} \n {}", topic.getTopic(), message.toString());
        }
    }
}

canalToKafkaServer

package com.unigroup.kafka.server;

import com.unigroup.client.canal.SimpleCanalClient;
import com.unigroup.kafka.producer.CanalKafkaProducer;
import com.unigroup.utils.GetProperties;

/**   
* @Title: canal.java 
* @Package com.unigroup.kafka.server 
* @version V1.0   
*/
public class canalToKafkaServer {
    public static void execute() {
        SimpleCanalClient simpleCanalClient = new SimpleCanalClient(GetProperties.getValue("MYSQL_HOST"),
                GetProperties.getValue("MTSQL_PORT"), GetProperties.getValue("INSTANCE"));
        try {
            simpleCanalClient.execute(1,CanalKafkaProducer.class);
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
}

至此一个简单的canal到kafka的demo已经完成。这些都只是测试代码,实际应用中根据不同的情况,可以自己开发更多功能。

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-13

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 天天在用消息队列,却不知道为啥要用 MQ ,这就尴尬了

    一个用消息队列的人,不知道为啥用,有点尴尬。没有复习这点,很容易被问蒙,然后就开始胡扯了。

    芋道源码
  • Kafka发送消息时提示请求数据过大是怎么回事?

    然后我去服务器查看了下 producer 的配置,发现没有配置 max.request.size,默认值为 1048576,而他发送的消息大小为 1575543...

    张乘辉
  • spring boot整合kafka

    最近项目需求用到了kafka信息中间件,在此做一次简单的记录,方便以后其它项目用到。

    GreizLiao
  • 不仅会用@Async,我把源码也梳理了一遍(上)

    说起异步化,很多人会想起异步线程、消息队列等,消息队列不是文章的主题,今天我们来聊聊spring对异步化的支持@EnableAsync&@Async。

    java思维导图
  • 使用多线程增加kafka消费能力

    kafka通过一系列优化,写入和读取速度能够达到数万条/秒。通过增加分区数量,能够通过部署多个消费者增加并行消费能力。但还是有很多情况下,某些业务的执行速度实在...

    xjjdog
  • (知识短文)kafka中partition和消费者对应关系

    kafka 为了保证同一类型的消息顺序性(FIFO),一个partition只能被同一组的一个consumer消费,不同组的consumer可以消费同一个par...

    互扯程序
  • 深入理解 AMQP 协议

    原文链接:https://blog.csdn.net/weixin_37641832/article/details/...

    微风-- 轻许--
  • 剖析nsq消息队列(一) 简介及去中心化实现原理

    分布式消息队列nsq,简单易用,去中心化的设计使nsq更健壮,nsq充分利用了go语言的goroutine和channel来实现的消息处理,代码量也不大,读不了...

    lpxxn
  • Openstack Stein 部署遇到的问题

    OpenStack 的第 19 个版本 Stein,支持 5G 和边缘计算。 OpenStack Stein 强化裸机和网络管理性能,同时更快速的启动 Kube...

    后端云
  • 为什么要使用MQ消息中间件?这3个点让你彻底明白!

    一个用消息队列的人,不知道为啥用,有点尴尬。没有复习这点,很容易被问蒙,然后就开始胡扯了。

    程序员追风

扫码关注云+社区

领取腾讯云代金券