首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊SimpleCanalConnector的getWithoutAck

聊聊SimpleCanalConnector的getWithoutAck

原创
作者头像
code4it
修改2020-04-08 09:51:51
4840
修改2020-04-08 09:51:51
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下SimpleCanalConnector的getWithoutAck

getWithoutAck

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

public class SimpleCanalConnector implements CanalConnector {
​
    private static final Logger  logger                = LoggerFactory.getLogger(SimpleCanalConnector.class);
    private SocketAddress        address;
    private String               username;
    private String               password;
    private int                  soTimeout             = 60000;                                              // milliseconds
    private int                  idleTimeout           = 60 * 60 * 1000;                                     // client和server之间的空闲链接超时的时间,默认为1小时
    private String               filter;                                                                     // 记录上一次的filter提交值,便于自动重试时提交
​
    private final ByteBuffer     readHeader            = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
    private final ByteBuffer     writeHeader           = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
    private SocketChannel        channel;
    private ReadableByteChannel  readableChannel;
    private WritableByteChannel  writableChannel;
    private List<Compression>    supportedCompressions = new ArrayList<Compression>();
    private ClientIdentity       clientIdentity;
    private ClientRunningMonitor runningMonitor;                                                             // 运行控制
    private ZkClientx            zkClientx;
    private BooleanMutex         mutex                 = new BooleanMutex(false);
    private volatile boolean     connected             = false;                                              // 代表connected是否已正常执行,因为有HA,不代表在工作中
    private boolean              rollbackOnConnect     = true;                                               // 是否在connect链接成功后,自动执行rollback操作
    private boolean              rollbackOnDisConnect  = false;                                              // 是否在connect链接成功后,自动执行rollback操作
    private boolean              lazyParseEntry        = false;                                              // 是否自动化解析Entry对象,如果考虑最大化性能可以延后解析
    // 读写数据分别使用不同的锁进行控制,减小锁粒度,读也需要排他锁,并发度容易造成数据包混乱,反序列化失败
    private Object               readDataLock          = new Object();
    private Object               writeDataLock         = new Object();
​
    private volatile boolean     running               = false;
​
    //......
​
    public Message getWithoutAck(int batchSize) throws CanalClientException {
        return getWithoutAck(batchSize, null, null);
    }
​
    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        waitClientRunning();
        if (!running) {
            return null;
        }
        try {
            int size = (batchSize <= 0) ? 1000 : batchSize;
            long time = (timeout == null || timeout < 0) ? -1 : timeout; // -1代表不做timeout控制
            if (unit == null) {
                unit = TimeUnit.MILLISECONDS;
            }
​
            writeWithHeader(Packet.newBuilder()
                .setType(PacketType.GET)
                .setBody(Get.newBuilder()
                    .setAutoAck(false)
                    .setDestination(clientIdentity.getDestination())
                    .setClientId(String.valueOf(clientIdentity.getClientId()))
                    .setFetchSize(size)
                    .setTimeout(time)
                    .setUnit(unit.ordinal())
                    .build()
                    .toByteString())
                .build()
                .toByteArray());
            return receiveMessages();
        } catch (IOException e) {
            throw new CanalClientException(e);
        }
    }
​
    //......
​
}
  • getWithoutAck方法先执行writeWithHeader,然后在执行receiveMessages;writeWithHeader的Packet为GET类型,其body设置了autoAck为false,还设置了destination、clientId、fetchSize、timeout、unit

writeWithHeader

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

public class SimpleCanalConnector implements CanalConnector {
​
    //......
​
    private void writeWithHeader(byte[] body) throws IOException {
        writeWithHeader(writableChannel, body);
    }
​
    private void writeWithHeader(WritableByteChannel channel, byte[] body) throws IOException {
        synchronized (writeDataLock) {
            writeHeader.clear();
            writeHeader.putInt(body.length);
            writeHeader.flip();
            channel.write(writeHeader);
            channel.write(ByteBuffer.wrap(body));
        }
    }
​
    //......
}
  • writeWithHeader在header写入body的长度,然后写入header,再写入body

receiveMessages

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

public class SimpleCanalConnector implements CanalConnector {
​
    //......
​
    private Message receiveMessages() throws IOException {
        byte[] data = readNextPacket();
        return CanalMessageDeserializer.deserializer(data, lazyParseEntry);
    }
​
    private byte[] readNextPacket() throws IOException {
        return readNextPacket(readableChannel);
    }
​
    private byte[] readNextPacket(ReadableByteChannel channel) throws IOException {
        synchronized (readDataLock) {
            readHeader.clear();
            read(channel, readHeader);
            int bodyLen = readHeader.getInt(0);
            ByteBuffer bodyBuf = ByteBuffer.allocate(bodyLen).order(ByteOrder.BIG_ENDIAN);
            read(channel, bodyBuf);
            return bodyBuf.array();
        }
    }
​
    //......
}
  • receiveMessages方法执行的是readNextPacket方法,该方法先通过read方法读取header获取body长度,然后再通过read方法读取body,最后返回body

小结

getWithoutAck方法先执行writeWithHeader,然后在执行receiveMessages;writeWithHeader的Packet为GET类型,其body设置了autoAck为false,还设置了destination、clientId、fetchSize、timeout、unit;receiveMessages方法执行的是readNextPacket方法,该方法先通过read方法读取header获取body长度,然后再通过read方法读取body,最后返回body

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • getWithoutAck
  • writeWithHeader
  • receiveMessages
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档