
A look at BinaryLogClient's connect

Original article by code4it, last modified 2020-04-29 10:03:18. Part of the column 码匠的流水账.

This article walks through the connect methods of BinaryLogClient.

connect(timeout)

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

public class BinaryLogClient implements BinaryLogClientMXBean {
    
    //......
​
    public void connect(final long timeout) throws IOException, TimeoutException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        AbstractLifecycleListener connectListener = new AbstractLifecycleListener() {
            @Override
            public void onConnect(BinaryLogClient client) {
                countDownLatch.countDown();
            }
        };
        registerLifecycleListener(connectListener);
        final AtomicReference<IOException> exceptionReference = new AtomicReference<IOException>();
        Runnable runnable = new Runnable() {
​
            @Override
            public void run() {
                try {
                    setConnectTimeout(timeout);
                    connect();
                } catch (IOException e) {
                    exceptionReference.set(e);
                    countDownLatch.countDown(); // making sure we don't end up waiting whole "timeout"
                }
            }
        };
        newNamedThread(runnable, "blc-" + hostname + ":" + port).start();
        boolean started = false;
        try {
            started = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, e.getMessage());
            }
        }
        unregisterLifecycleListener(connectListener);
        if (exceptionReference.get() != null) {
            throw exceptionReference.get();
        }
        if (!started) {
            try {
                terminateConnect();
            } finally {
                throw new TimeoutException("BinaryLogClient was unable to connect in " + timeout + "ms");
            }
        }
    }
​
    //......
​
}
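  • connect(timeout) registers a temporary lifecycle listener whose onConnect counts down a CountDownLatch, then starts a new thread (named "blc-" + hostname + ":" + port) that calls setConnectTimeout(timeout) followed by the no-arg connect(). The calling thread waits on the latch for at most timeout milliseconds.
  • If the worker thread catches an IOException, it stores the exception in exceptionReference and counts the latch down immediately, so the caller does not wait out the whole timeout. After the wait, the caller unregisters the listener and rethrows exceptionReference.get() if it is set.
  • If the latch was not counted down in time, the caller invokes terminateConnect() and throws a TimeoutException.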
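The latch-plus-AtomicReference handoff described above can be isolated into a small sketch. This is not the library's code: `ConnectWithTimeoutSketch` and `BlockingConnect` are hypothetical names used only for illustration, and the sketch leaves out the `terminateConnect()` cleanup that the real method performs on timeout.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

public class ConnectWithTimeoutSketch {

    /** Hypothetical stand-in for the blocking, no-arg connect(). */
    public interface BlockingConnect {
        // Implementations call onConnected.run() once the handshake is done
        // and may keep blocking afterwards, as the real connect() does.
        void connect(Runnable onConnected) throws IOException;
    }

    public static void connect(final BlockingConnect target, long timeoutMs)
            throws IOException, TimeoutException {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<IOException> failure = new AtomicReference<IOException>();

        Thread worker = new Thread(() -> {
            try {
                // The callback plays the role of the onConnect lifecycle listener.
                target.connect(latch::countDown);
            } catch (IOException e) {
                failure.set(e);
                latch.countDown(); // fail fast instead of waiting out the timeout
            }
        }, "connect-sketch");
        worker.setDaemon(true); // sketch only: do not keep the JVM alive
        worker.start();

        boolean started = false;
        try {
            started = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Unlike the quoted source, this sketch restores the interrupt flag.
            Thread.currentThread().interrupt();
        }
        if (failure.get() != null) {
            throw failure.get();
        }
        if (!started) {
            throw new TimeoutException("unable to connect in " + timeoutMs + "ms");
        }
    }
}
```

Because the exception is stored before the latch is counted down, the waiting thread is guaranteed to see it once `await` returns.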

connect

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

public class BinaryLogClient implements BinaryLogClientMXBean {
    
    //......
​
    public void connect() throws IOException {
        if (!connectLock.tryLock()) {
            throw new IllegalStateException("BinaryLogClient is already connected");
        }
        boolean notifyWhenDisconnected = false;
        try {
            Callable cancelDisconnect = null;
            try {
                try {
                    long start = System.currentTimeMillis();
                    channel = openChannel();
                    if (connectTimeout > 0 && !isKeepAliveThreadRunning()) {
                        cancelDisconnect = scheduleDisconnectIn(connectTimeout -
                            (System.currentTimeMillis() - start));
                    }
                    if (channel.getInputStream().peek() == -1) {
                        throw new EOFException();
                    }
                } catch (IOException e) {
                    throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
                        ". Please make sure it's running.", e);
                }
                GreetingPacket greetingPacket = receiveGreeting();
                authenticate(greetingPacket);
                connectionId = greetingPacket.getThreadId();
                if ("".equals(binlogFilename)) {
                    synchronized (gtidSetAccessLock) {
                        if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) {
                            gtidSet = new GtidSet(fetchGtidPurged());
                        }
                    }
                }
                if (binlogFilename == null) {
                    fetchBinlogFilenameAndPosition();
                }
                if (binlogPosition < 4) {
                    if (logger.isLoggable(Level.WARNING)) {
                        logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4);
                    }
                    binlogPosition = 4;
                }
                ChecksumType checksumType = fetchBinlogChecksum();
                if (checksumType != ChecksumType.NONE) {
                    confirmSupportOfChecksum(checksumType);
                }
                if (heartbeatInterval > 0) {
                    enableHeartbeat();
                }
                gtid = null;
                tx = false;
                requestBinaryLogStream();
            } catch (IOException e) {
                disconnectChannel();
                throw e;
            } finally {
                if (cancelDisconnect != null) {
                    try {
                        cancelDisconnect.call();
                    } catch (Exception e) {
                        if (logger.isLoggable(Level.WARNING)) {
                            logger.warning("\"" + e.getMessage() +
                                "\" was thrown while canceling scheduled disconnect call");
                        }
                    }
                }
            }
            connected = true;
            notifyWhenDisconnected = true;
            if (logger.isLoggable(Level.INFO)) {
                String position;
                synchronized (gtidSetAccessLock) {
                    position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition;
                }
                logger.info("Connected to " + hostname + ":" + port + " at " + position +
                    " (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")");
            }
            for (LifecycleListener lifecycleListener : lifecycleListeners) {
                lifecycleListener.onConnect(this);
            }
            if (keepAlive && !isKeepAliveThreadRunning()) {
                spawnKeepAliveThread();
            }
            ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
            synchronized (gtidSetAccessLock) {
                if (gtidSet != null) {
                    ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
                    ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class);
                }
            }
            listenForEventPackets();
        } finally {
            connectLock.unlock();
            if (notifyWhenDisconnected) {
                for (LifecycleListener lifecycleListener : lifecycleListeners) {
                    lifecycleListener.onDisconnect(this);
                }
            }
        }
    }
​
    //......
​
}
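  • connect() first calls connectLock.tryLock() and throws IllegalStateException if the lock is already held, so only one connect() can be in progress at a time.
  • It then calls openChannel(). If connectTimeout is greater than 0 and the keep-alive thread is not running, scheduleDisconnectIn arms a forced disconnect for the remaining time (connectTimeout minus the time spent opening the channel).
  • Next it calls receiveGreeting() to obtain the greetingPacket, runs authenticate(greetingPacket), and takes connectionId from greetingPacket.getThreadId().
  • If binlogFilename is the empty string and gtidSet is non-null but empty with gtidSetFallbackToPurged enabled, gtidSet is replaced with the result of fetchGtidPurged(). If binlogFilename is null, fetchBinlogFilenameAndPosition() is called. A binlogPosition below 4 is raised to 4, since a binlog file begins with a 4-byte magic header.
  • It then calls fetchBinlogChecksum() (followed by confirmSupportOfChecksum when the type is not NONE), calls enableHeartbeat() when heartbeatInterval is greater than 0, and finally requestBinaryLogStream().
  • An IOException during these steps triggers disconnectChannel() and is rethrown; the finally block cancels the scheduled disconnect in either case.
  • On success, connected is set to true, the onConnect callback of every lifecycleListener is invoked, and spawnKeepAliveThread() is called when keepAlive is enabled and the thread is not already running. The last step is listenForEventPackets(), which blocks until the stream ends; the outer finally then releases connectLock and fires onDisconnect.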
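The tryLock guard matters because connect() keeps the lock for the whole session rather than only for the handshake. A minimal sketch of that behaviour follows; `ConnectGuardSketch` is a hypothetical class, and it assumes connectLock is a ReentrantLock (the quoted source does not show the field declaration).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConnectGuardSketch {
    // Assumption: the real connectLock is also a ReentrantLock.
    private final Lock connectLock = new ReentrantLock();

    /** Runs the session while holding the lock; a concurrent caller is rejected. */
    public void connect(Runnable session) {
        if (!connectLock.tryLock()) {
            throw new IllegalStateException("already connected");
        }
        try {
            session.run(); // stands in for the handshake plus listenForEventPackets()
        } finally {
            connectLock.unlock();
        }
    }

    /** Returns true if a second connect() is rejected while the first is still in session. */
    public static boolean secondConnectRejected() throws InterruptedException {
        final ConnectGuardSketch client = new ConnectGuardSketch();
        final CountDownLatch inSession = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);

        Thread first = new Thread(() -> client.connect(() -> {
            inSession.countDown();
            try {
                release.await(); // simulate a long-running event loop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        first.start();
        inSession.await(); // the first session now holds the lock

        boolean rejected = false;
        try {
            client.connect(() -> { });
        } catch (IllegalStateException e) {
            rejected = true;
        } finally {
            release.countDown();
            first.join();
        }
        return rejected;
    }
}
```

This also suggests why connect(timeout) runs connect() on its own thread: the call does not return until the session is over.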

scheduleDisconnectIn

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

public class BinaryLogClient implements BinaryLogClientMXBean {
    
    //......
​
    private Callable scheduleDisconnectIn(final long timeout) {
        final BinaryLogClient self = this;
        final CountDownLatch connectLatch = new CountDownLatch(1);
        final Thread thread = newNamedThread(new Runnable() {
            @Override
            public void run() {
                try {
                    connectLatch.await(timeout, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (logger.isLoggable(Level.WARNING)) {
                        logger.log(Level.WARNING, e.getMessage());
                    }
                }
                if (connectLatch.getCount() != 0) {
                    if (logger.isLoggable(Level.WARNING)) {
                        logger.warning("Failed to establish connection in " + timeout + "ms. " +
                            "Forcing disconnect.");
                    }
                    try {
                        self.disconnectChannel();
                    } catch (IOException e) {
                        if (logger.isLoggable(Level.WARNING)) {
                            logger.log(Level.WARNING, e.getMessage());
                        }
                    }
                }
            }
        }, "blc-disconnect-" + hostname + ":" + port);
        thread.start();
        return new Callable() {
​
            public Object call() throws Exception {
                connectLatch.countDown();
                thread.join();
                return null;
            }
        };
    }
​
    private void disconnectChannel() throws IOException {
        connected = false;
        if (channel != null && channel.isOpen()) {
            channel.close();
        }
    }
​
    //......
​
}
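  • scheduleDisconnectIn starts a watchdog thread that waits on a CountDownLatch for up to timeout milliseconds. If the latch has not been counted down by then, it logs a warning and calls disconnectChannel(), which sets connected to false and closes the channel if it is open.
  • The returned Callable is the cancel handle: it counts the latch down and joins the watchdog thread. connect() invokes it in its finally block.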
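The cancellable watchdog can be sketched on its own as follows. `DisconnectWatchdogSketch` and `scheduleIn` are hypothetical names, and the sketch uses the return value of `await` where the quoted source checks `getCount()`; the two are equivalent for a latch of count 1.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DisconnectWatchdogSketch {

    /** Starts a watchdog that runs onTimeout unless the returned handle is called first. */
    public static Callable<Void> scheduleIn(final long timeoutMs, final Runnable onTimeout) {
        final CountDownLatch cancelled = new CountDownLatch(1);
        final Thread watchdog = new Thread(() -> {
            boolean wasCancelled = false;
            try {
                // true if countDown() happened before the timeout elapsed
                wasCancelled = cancelled.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (!wasCancelled) {
                onTimeout.run(); // the real code calls disconnectChannel() here
            }
        }, "watchdog-sketch");
        watchdog.start();

        // Cancel handle: release the watchdog, then wait for it to finish.
        return () -> {
            cancelled.countDown();
            watchdog.join();
            return null;
        };
    }
}
```

Joining the thread in the cancel handle means that once the handle returns, the watchdog can no longer fire.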

receiveGreeting

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

public class BinaryLogClient implements BinaryLogClientMXBean {
    
    //......
​
    private GreetingPacket receiveGreeting() throws IOException {
        byte[] initialHandshakePacket = channel.read();
        if (initialHandshakePacket[0] == (byte) 0xFF /* error */) {
            byte[] bytes = Arrays.copyOfRange(initialHandshakePacket, 1, initialHandshakePacket.length);
            ErrorPacket errorPacket = new ErrorPacket(bytes);
            throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
                    errorPacket.getSqlState());
        }
        return new GreetingPacket(initialHandshakePacket);
    }
​
    private void fetchBinlogFilenameAndPosition() throws IOException {
        ResultSetRowPacket[] resultSet;
        channel.write(new QueryCommand("show master status"));
        resultSet = readResultSet();
        if (resultSet.length == 0) {
            throw new IOException("Failed to determine binlog filename/position");
        }
        ResultSetRowPacket resultSetRow = resultSet[0];
        binlogFilename = resultSetRow.getValue(0);
        binlogPosition = Long.parseLong(resultSetRow.getValue(1));
    }
​
    private ChecksumType fetchBinlogChecksum() throws IOException {
        channel.write(new QueryCommand("show global variables like 'binlog_checksum'"));
        ResultSetRowPacket[] resultSet = readResultSet();
        if (resultSet.length == 0) {
            return ChecksumType.NONE;
        }
        return ChecksumType.valueOf(resultSet[0].getValue(1).toUpperCase());
    }
​
    private void enableHeartbeat() throws IOException {
        channel.write(new QueryCommand("set @master_heartbeat_period=" + heartbeatInterval * 1000000));
        byte[] statementResult = channel.read();
        if (statementResult[0] == (byte) 0xFF /* error */) {
            byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length);
            ErrorPacket errorPacket = new ErrorPacket(bytes);
            throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
                errorPacket.getSqlState());
        }
    }
​
    private void requestBinaryLogStream() throws IOException {
        long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178
        Command dumpBinaryLogCommand;
        synchronized (gtidSetAccessLock) {
            if (gtidSet != null) {
                dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId,
                    useBinlogFilenamePositionInGtidMode ? binlogFilename : "",
                    useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,
                    gtidSet);
            } else {
                dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition);
            }
        }
        channel.write(dumpBinaryLogCommand);
    }    
​
    private void spawnKeepAliveThread() {
        final ExecutorService threadExecutor =
            Executors.newSingleThreadExecutor(new ThreadFactory() {
​
                @Override
                public Thread newThread(Runnable runnable) {
                    return newNamedThread(runnable, "blc-keepalive-" + hostname + ":" + port);
                }
            });
        try {
            keepAliveThreadExecutorLock.lock();
            threadExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    while (!threadExecutor.isShutdown()) {
                        try {
                            Thread.sleep(keepAliveInterval);
                        } catch (InterruptedException e) {
                            // expected in case of disconnect
                        }
                        if (threadExecutor.isShutdown()) {
                            return;
                        }
                        boolean connectionLost = false;
                        if (heartbeatInterval > 0) {
                            connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval;
                        } else {
                            try {
                                channel.write(new PingCommand());
                            } catch (IOException e) {
                                connectionLost = true;
                            }
                        }
                        if (connectionLost) {
                            if (logger.isLoggable(Level.INFO)) {
                                logger.info("Trying to restore lost connection to " + hostname + ":" + port);
                            }
                            try {
                                terminateConnect();
                                connect(connectTimeout);
                            } catch (Exception ce) {
                                if (logger.isLoggable(Level.WARNING)) {
                                    logger.warning("Failed to restore connection to " + hostname + ":" + port +
                                        ". Next attempt in " + keepAliveInterval + "ms");
                                }
                            }
                        }
                    }
                }
            });
            keepAliveThreadExecutor = threadExecutor;
        } finally {
            keepAliveThreadExecutorLock.unlock();
        }
    }
​
    //......
​
}
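  • receiveGreeting reads the initialHandshakePacket; if its first byte is 0xFF it throws a ServerException built from the ErrorPacket, otherwise it returns a GreetingPacket.
  • fetchBinlogFilenameAndPosition sends show master status and reads binlogFilename and binlogPosition from the first row. fetchBinlogChecksum sends show global variables like 'binlog_checksum' and returns ChecksumType.NONE when the result set is empty.
  • enableHeartbeat sends set @master_heartbeat_period with heartbeatInterval multiplied by 1000000, and throws a ServerException if the server answers with an error.
  • requestBinaryLogStream sends a DumpBinaryLogGtidCommand when gtidSet is not null, and a DumpBinaryLogCommand otherwise.
  • spawnKeepAliveThread submits a task to a single-thread executor. The task loops while the executor is not shut down, sleeping keepAliveInterval between rounds. When heartbeatInterval is greater than 0 it treats the connection as lost if no event has been seen for longer than keepAliveInterval; otherwise it sends a PingCommand and treats an IOException as a lost connection. On connectionLost it calls terminateConnect() and then connect(connectTimeout).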
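Two pieces of arithmetic in these methods are easy to misread, so here is a small sketch of them. `KeepAliveSketch` and its method names are hypothetical; the unit reading (heartbeatInterval in milliseconds, master_heartbeat_period in nanoseconds) is an assumption inferred from the 1000000 multiplier rather than something the quoted code states.

```java
public class KeepAliveSketch {

    /** Builds the statement sent by enableHeartbeat(), scaling the interval by 1,000,000. */
    public static String heartbeatStatement(long heartbeatIntervalMs) {
        // Multiplication binds tighter than string concatenation.
        return "set @master_heartbeat_period=" + heartbeatIntervalMs * 1000000;
    }

    /** Mirrors the staleness check used when heartbeatInterval > 0. */
    public static boolean connectionLost(long nowMs, long eventLastSeenMs, long keepAliveIntervalMs) {
        return nowMs - eventLastSeenMs > keepAliveIntervalMs;
    }
}
```

Under this check an idle but healthy connection is only kept if heartbeats arrive more often than keepAliveInterval, which suggests configuring heartbeatInterval to be smaller than keepAliveInterval.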

listenForEventPackets

mysql-binlog-connector-java-0.20.1/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

public class BinaryLogClient implements BinaryLogClientMXBean {
    
    //......
​
    private void listenForEventPackets() throws IOException {
        ByteArrayInputStream inputStream = channel.getInputStream();
        boolean completeShutdown = false;
        try {
            while (inputStream.peek() != -1) {
                int packetLength = inputStream.readInteger(3);
                inputStream.skip(1); // 1 byte for sequence
                int marker = inputStream.read();
                if (marker == 0xFF) {
                    ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));
                    throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
                        errorPacket.getSqlState());
                }
                if (marker == 0xFE && !blocking) {
                    completeShutdown = true;
                    break;
                }
                Event event;
                try {
                    event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
                        new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
                        inputStream);
                    if (event == null) {
                        throw new EOFException();
                    }
                } catch (Exception e) {
                    Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
                    if (cause instanceof EOFException || cause instanceof SocketException) {
                        throw e;
                    }
                    if (isConnected()) {
                        for (LifecycleListener lifecycleListener : lifecycleListeners) {
                            lifecycleListener.onEventDeserializationFailure(this, e);
                        }
                    }
                    continue;
                }
                if (isConnected()) {
                    eventLastSeen = System.currentTimeMillis();
                    updateGtidSet(event);
                    notifyEventListeners(event);
                    updateClientBinlogFilenameAndPosition(event);
                }
            }
        } catch (Exception e) {
            if (isConnected()) {
                for (LifecycleListener lifecycleListener : lifecycleListeners) {
                    lifecycleListener.onCommunicationFailure(this, e);
                }
            }
        } finally {
            if (isConnected()) {
                if (completeShutdown) {
                    disconnect(); // initiate complete shutdown sequence (which includes keep alive thread)
                } else {
                    disconnectChannel();
                }
            }
        }
    }
​
    //......
​
}
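  • listenForEventPackets reads from channel.getInputStream() in a loop. For each packet it reads the 3-byte length, skips the 1-byte sequence number, and reads a marker byte: 0xFF raises a ServerException, and 0xFE in non-blocking mode ends the loop and triggers a complete shutdown.
  • Otherwise eventDeserializer.nextEvent parses the packet into an event. Deserialization failures that are not caused by an EOFException or SocketException are reported through onEventDeserializationFailure and the loop continues.
  • For each event parsed while connected, it updates eventLastSeen, calls updateGtidSet, passes the event to the eventListeners via notifyEventListeners, and calls updateClientBinlogFilenameAndPosition.
  • Any other exception is reported through onCommunicationFailure. The finally block calls disconnect() on complete shutdown and disconnectChannel() otherwise.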
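The header handling at the top of the loop can be sketched against a plain byte array. `PacketHeaderSketch` is a hypothetical helper, not part of the library; it assumes the 3-byte length is little-endian, as in the MySQL client/server protocol.

```java
public class PacketHeaderSketch {

    public static final class Header {
        public final int packetLength; // payload length, including the marker byte
        public final int sequence;
        public final int marker;

        Header(int packetLength, int sequence, int marker) {
            this.packetLength = packetLength;
            this.sequence = sequence;
            this.marker = marker;
        }
    }

    /** Parses the 3-byte length, 1-byte sequence and 1-byte marker from the start of buf. */
    public static Header parse(byte[] buf) {
        if (buf.length < 5) {
            throw new IllegalArgumentException("need at least 5 bytes");
        }
        int length = (buf[0] & 0xFF) | ((buf[1] & 0xFF) << 8) | ((buf[2] & 0xFF) << 16);
        int sequence = buf[3] & 0xFF;
        int marker = buf[4] & 0xFF;
        return new Header(length, sequence, marker);
    }

    /** Mirrors the branch order of listenForEventPackets. */
    public static String classify(int marker, boolean blocking) {
        if (marker == 0xFF) {
            return "ERROR";
        }
        if (marker == 0xFE && !blocking) {
            return "EOF";
        }
        return "EVENT";
    }
}
```

Since the marker is part of the payload, the error branch in the quoted code reads packetLength - 1 further bytes for the ErrorPacket.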

Summary

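connect() opens the channel with openChannel() and, when connectTimeout is greater than 0 and the keep-alive thread is not running, uses scheduleDisconnectIn to force a disconnect if the handshake takes too long. It then performs the handshake (receiveGreeting, authenticate), resolves the starting point (gtidSet, or binlogFilename and binlogPosition via fetchBinlogFilenameAndPosition), calls fetchBinlogChecksum, calls enableHeartbeat when heartbeatInterval is greater than 0, and issues requestBinaryLogStream. An IOException along the way leads to disconnectChannel() and is rethrown. After a successful connection it invokes onConnect on the lifecycleListeners, starts the keep-alive thread via spawnKeepAliveThread when keepAlive is enabled, and finally blocks in listenForEventPackets.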

doc

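Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.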
