专栏首页码匠的流水账聊聊SimpleCanalConnector的getWithoutAck
原创

聊聊SimpleCanalConnector的getWithoutAck

本文主要研究一下SimpleCanalConnector的getWithoutAck

getWithoutAck

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

public class SimpleCanalConnector implements CanalConnector {
​
    private static final Logger  logger                = LoggerFactory.getLogger(SimpleCanalConnector.class);
    private SocketAddress        address;
    private String               username;
    private String               password;
    private int                  soTimeout             = 60000;                                              // milliseconds
    private int                  idleTimeout           = 60 * 60 * 1000;                                     // client和server之间的空闲链接超时的时间,默认为1小时
    private String               filter;                                                                     // 记录上一次的filter提交值,便于自动重试时提交
​
    private final ByteBuffer     readHeader            = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
    private final ByteBuffer     writeHeader           = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
    private SocketChannel        channel;
    private ReadableByteChannel  readableChannel;
    private WritableByteChannel  writableChannel;
    private List<Compression>    supportedCompressions = new ArrayList<Compression>();
    private ClientIdentity       clientIdentity;
    private ClientRunningMonitor runningMonitor;                                                             // 运行控制
    private ZkClientx            zkClientx;
    private BooleanMutex         mutex                 = new BooleanMutex(false);
    private volatile boolean     connected             = false;                                              // 代表connected是否已正常执行,因为有HA,不代表在工作中
    private boolean              rollbackOnConnect     = true;                                               // 是否在connect链接成功后,自动执行rollback操作
    private boolean              rollbackOnDisConnect  = false;                                              // 是否在connect链接成功后,自动执行rollback操作
    private boolean              lazyParseEntry        = false;                                              // 是否自动化解析Entry对象,如果考虑最大化性能可以延后解析
    // 读写数据分别使用不同的锁进行控制,减小锁粒度,读也需要排他锁,并发度容易造成数据包混乱,反序列化失败
    private Object               readDataLock          = new Object();
    private Object               writeDataLock         = new Object();
​
    private volatile boolean     running               = false;
​
    //......
​
    public Message getWithoutAck(int batchSize) throws CanalClientException {
        return getWithoutAck(batchSize, null, null);
    }
​
    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        waitClientRunning();
        if (!running) {
            return null;
        }
        try {
            int size = (batchSize <= 0) ? 1000 : batchSize;
            long time = (timeout == null || timeout < 0) ? -1 : timeout; // -1代表不做timeout控制
            if (unit == null) {
                unit = TimeUnit.MILLISECONDS;
            }
​
            writeWithHeader(Packet.newBuilder()
                .setType(PacketType.GET)
                .setBody(Get.newBuilder()
                    .setAutoAck(false)
                    .setDestination(clientIdentity.getDestination())
                    .setClientId(String.valueOf(clientIdentity.getClientId()))
                    .setFetchSize(size)
                    .setTimeout(time)
                    .setUnit(unit.ordinal())
                    .build()
                    .toByteString())
                .build()
                .toByteArray());
            return receiveMessages();
        } catch (IOException e) {
            throw new CanalClientException(e);
        }
    }
​
    //......
​
}
  • getWithoutAck方法先执行writeWithHeader,然后在执行receiveMessages;writeWithHeader的Packet为GET类型,其body设置了autoAck为false,还设置了destination、clientId、fetchSize、timeout、unit

writeWithHeader

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

public class SimpleCanalConnector implements CanalConnector {
​
    //......
​
    private void writeWithHeader(byte[] body) throws IOException {
        writeWithHeader(writableChannel, body);
    }
​
    private void writeWithHeader(WritableByteChannel channel, byte[] body) throws IOException {
        synchronized (writeDataLock) {
            writeHeader.clear();
            writeHeader.putInt(body.length);
            writeHeader.flip();
            channel.write(writeHeader);
            channel.write(ByteBuffer.wrap(body));
        }
    }
​
    //......
}
  • writeWithHeader在header写入body的长度,然后写入header,再写入body

receiveMessages

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

public class SimpleCanalConnector implements CanalConnector {
​
    //......
​
    private Message receiveMessages() throws IOException {
        byte[] data = readNextPacket();
        return CanalMessageDeserializer.deserializer(data, lazyParseEntry);
    }
​
    private byte[] readNextPacket() throws IOException {
        return readNextPacket(readableChannel);
    }
​
    private byte[] readNextPacket(ReadableByteChannel channel) throws IOException {
        synchronized (readDataLock) {
            readHeader.clear();
            read(channel, readHeader);
            int bodyLen = readHeader.getInt(0);
            ByteBuffer bodyBuf = ByteBuffer.allocate(bodyLen).order(ByteOrder.BIG_ENDIAN);
            read(channel, bodyBuf);
            return bodyBuf.array();
        }
    }
​
    //......
}
  • receiveMessages方法执行的是readNextPacket方法,该方法先通过read方法读取header获取body长度,然后再通过read方法读取body,最后返回body

小结

getWithoutAck方法先执行writeWithHeader,然后在执行receiveMessages;writeWithHeader的Packet为GET类型,其body设置了autoAck为false,还设置了destination、clientId、fetchSize、timeout、unit;receiveMessages方法执行的是readNextPacket方法,该方法先通过read方法读取header获取body长度,然后再通过read方法读取body,最后返回body

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊SimpleCanalConnector的getWithoutAck

    本文主要研究一下SimpleCanalConnector的getWithoutAck

    codecraft
  • 聊聊openjdk的BufferPoolMXBean

    java.management/java/lang/management/PlatformManagedObject.java

    codecraft
  • 聊聊openjdk的BufferPoolMXBean

    java.management/java/lang/management/PlatformManagedObject.java

    codecraft
  • 聊聊SimpleCanalConnector的getWithoutAck

    本文主要研究一下SimpleCanalConnector的getWithoutAck

    codecraft
  • 既然Java反射可以访问和修改私有成员变量,那封装成private还有意义么?

    安全是指不让代码被非法看到/访问。但是只要人能拿到代码,总会有办法去查看和改变代码。其他答案提到反射可以用SecurityManager来防止private被访...

    大宽宽
  • RocketMQ学习-NameServer-1

    NameServer在RocketMQ中的角色是配置中心,主要有两个功能:Broker管理、路由管理。因此NameServer上存放的主要信息也包括两类:Bro...

    阿杜
  • NullException、Token的作用、Mapstruct用法

    token主要用在会话管理,防止表单提交和防止CSRF攻击,同时token支持跨域访问,无状态,不存储session信息。

    关忆北.
  • Android实现轮询的三种方式

    砸漏
  • springboot 根据实体类生成数据库中表BaseEntity(公共实体)配置文件application.yml 子类实体:

    1.标注为@MappedSuperclass的类将不是一个完整的实体类,他将不会映射到数据库表,但是他的属性都将映射到其子类的数据库字段中。

    用户5899361
  • 广告系统设计与实现(八) -广告检索系统的设计与实现 - 下

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    chenchenchen

扫码关注云+社区

领取腾讯云代金券