前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊puma的DefaultTaskExecutor

聊聊puma的DefaultTaskExecutor

作者头像
code4it
发布2020-06-09 13:10:11
5910
发布2020-06-09 13:10:11
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下puma的DefaultTaskExecutor

TaskExecutor

puma/puma/src/main/java/com/dianping/puma/taskexecutor/TaskExecutor.java

代码语言:javascript
复制
public interface TaskExecutor extends LifeCycle {

    boolean isStop();

    boolean isMerging();

    void stopUntil(long timestamp);

    void cancelStopUntil();

    void setContext(PumaContext context);

    void initContext();

    PumaContext getContext();

    String getTaskId();

    void setTaskId(String taskId);

    String getTaskName();

    void setTaskName(String taskName);

    String getDefaultBinlogFileName();

    void setDefaultBinlogFileName(String binlogFileName);

    Long getDefaultBinlogPosition();

    void setDefaultBinlogPosition(Long binlogFileName);

    void setInstanceStorageManager(InstanceStorageManager holder);

    List<Sender> getFileSender();

    DataHandler getDataHandler();

    void resume() throws Exception;

    void pause() throws Exception;

    PumaTaskStateEntity getTaskState();

    void setTaskState(PumaTaskStateEntity taskState);

    void setInstanceTask(InstanceTask instanceTask);

    InstanceTask getInstanceTask();

    TableSet getTableSet();
}
  • TaskExecutor继承了LifeCycle,定义了initContext、getContext等方法

AbstractTaskExecutor

puma/puma/src/main/java/com/dianping/puma/taskexecutor/AbstractTaskExecutor.java

代码语言:javascript
复制
@ThreadUnSafe
public abstract class AbstractTaskExecutor implements TaskExecutor {
    private PumaContext context;

    private String taskId;

    private long serverId;

    protected String taskName;

    protected Date beginTime;

    protected TableSet tableSet;

    private String defaultBinlogFileName;

    private Long defaultBinlogPosition;

    protected Parser parser;

    protected DataHandler dataHandler;

    protected Dispatcher dispatcher;

    private volatile boolean stop = true;

    protected InstanceStorageManager instanceStorageManager;

    protected PumaTaskStateEntity state;

    protected InstanceManager instanceManager;

    @Override
    public String getTaskId() {
        return taskId;
    }

    @Override
    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    @Override
    public String getTaskName() {
        return taskName;
    }

    @Override
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    /**
     * @param instanceStorageManager
     *           the binlogPositionHolder to set
     */
    public void setInstanceStorageManager(InstanceStorageManager instanceStorageManager) {
        this.instanceStorageManager = instanceStorageManager;
    }

    public void setContext(PumaContext context) {
        this.context = context;
    }

    public PumaContext getContext() {
        return context;
    }

    public String getDefaultBinlogFileName() {
        return defaultBinlogFileName;
    }

    public void setDefaultBinlogFileName(String binlogFileName) {
        this.defaultBinlogFileName = binlogFileName;
    }

    /**
     * @return the defaultBinlogPosition
     */
    public Long getDefaultBinlogPosition() {
        return defaultBinlogPosition;
    }

    /**
     * @param defaultBinlogPosition
     *           the defaultBinlogPosition to set
     */
    public void setDefaultBinlogPosition(Long defaultBinlogPosition) {
        this.defaultBinlogPosition = defaultBinlogPosition;
    }

    /**
     * @param parser
     *           the parser to set
     */
    public void setParser(Parser parser) {
        this.parser = parser;
    }

    /**
     * @param dataHandler
     *           the dataHandler to set
     */
    public void setDataHandler(DataHandler dataHandler) {
        this.dataHandler = dataHandler;
    }

    /**
     * @param dispatcher
     *           the dispatcher to set
     */
    public void setDispatcher(Dispatcher dispatcher) {
        this.dispatcher = dispatcher;
    }

    public long getServerId() {
        return serverId;
    }

    public void setServerId(long serverId) {
        this.serverId = serverId;
    }

    public boolean isStop() {
        return stop;
    }

    protected abstract void doStop() throws Exception;

    protected abstract void doStart() throws Exception;

    @Override
    public void start() {
        try {
            stop = false;

            parser.start();
            dataHandler.start();
            dispatcher.start();
            doStart();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void stop() {
        try {
            stop = true;

            parser.stop();
            dataHandler.stop();
            dispatcher.stop();

            doStop();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void resume() throws Exception {
        stop = false;
    }

    public void pause() throws Exception {
        stop = true;
    }

    @Override
    public List<Sender> getFileSender() {
        return dispatcher.getSenders();
    }

    @Override
    public DataHandler getDataHandler() {
        return this.dataHandler;
    }

    public PumaTaskStateEntity getTaskState() {
        return state;
    }

    public void setTaskState(PumaTaskStateEntity state) {
        this.state = state;
    }

    public Date getBeginTime() {
        return beginTime;
    }

    public void setBeginTime(Date beginTime) {
        this.beginTime = beginTime;
    }

    public TableSet getTableSet() {
        return tableSet;
    }

    public void setTableSet(TableSet tableSet) {
        this.tableSet = tableSet;
    }

    public InstanceManager getInstanceManager() {
        return instanceManager;
    }

    public void setInstanceManager(InstanceManager instanceManager) {
        this.instanceManager = instanceManager;
    }
}
  • AbstractTaskExecutor声明实现TaskExecutor接口,它定义了context、defaultBinlogFileName、defaultBinlogPosition、parser、dataHandler、dispatcher等属性;其start方法执行parser、dataHandler、dispatcher的start方法及doStart方法;其stop方法执行parser、dataHandler、dispatcher的stop方法及doStop方法

DefaultTaskExecutor

puma/puma/src/main/java/com/dianping/puma/taskexecutor/DefaultTaskExecutor.java

代码语言:javascript
复制
@ThreadUnSafe
public class DefaultTaskExecutor extends AbstractTaskExecutor {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultTaskExecutor.class);

    private SrcDbEntity currentSrcDbEntity;

    private DefaultTableMetaInfoFetcher tableMetaInfoFetcher;

    private String encoding = "utf-8";

    private Socket mysqlSocket;

    private InputStream is;

    private OutputStream os;

    private InstanceTask instanceTask;

    private boolean merging = false;

    private long runUntilTimestamp;

    @Override
    public void doStart() throws Exception {
        Thread.currentThread().setName("DefaultTaskExecutor-" + taskName);
        long failCount = 0;
        merging = false;
        SystemStatusManager.addServer(getTaskName(), "", 0, tableSet);

        do {
            try {
                loadServerId(instanceManager.getUrlByCluster(instanceTask.getInstance()));

                // 读position/file文件
                BinlogInfo binlogInfo = instanceStorageManager.getBinlogInfo(getContext().getPumaServerName());

                if (binlogInfo == null) {
                    this.currentSrcDbEntity = initSrcDbByServerId(-1);
                    if (beginTime != null) {
                        binlogInfo = getBinlogByTimestamp(beginTime.getTime() / 1000);
                    }
                } else {
                    this.currentSrcDbEntity = initSrcDbByServerId(binlogInfo.getServerId());

                    if (binlogInfo.getServerId() != currentSrcDbEntity.getServerId()) {
                        BinlogInfo oldBinlogInfo = binlogInfo;
                        binlogInfo = getBinlogByTimestamp(oldBinlogInfo.getTimestamp() - 60);
                        if (binlogInfo == null) {
                            throw new IOException("Switch Binlog Failed!");
                        } else {
                            Cat.logEvent("BinlogSwitch", taskName, Message.SUCCESS,
                                    oldBinlogInfo.toString() + " -> " + binlogInfo.toString());
                        }
                    }
                }

                updateTableMetaInfoFetcher();
                getContext().setMasterUrl(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());

                if (!connect()) {
                    throw new IOException("Connection failed.");
                }

                initConnect();

                initBinlogPosition(binlogInfo);

                if (dumpBinlog()) {
                    processBinlog();
                } else {
                    throw new IOException("Binlog dump failed.");
                }
            } catch (Exception e) {
                if (++failCount % 3 == 0) {
                    this.currentSrcDbEntity = chooseNextSrcDb();
                    updateTableMetaInfoFetcher();
                    failCount = 0;
                }
                String msg = "Exception occurs. taskName: " + getTaskName() + " dbServerId: " + (currentSrcDbEntity == null ? 0 : currentSrcDbEntity.getServerId())
                        + ". Reconnect...";
                LOG.error(msg, e);
                Cat.logError(msg, e);

                Thread.sleep(((failCount % 10) + 1) * 2000);
            }
        } while (!isStop() && !Thread.currentThread().isInterrupted());

    }

    protected void doStop() throws Exception {
        LOG.info("TaskName: " + getTaskName() + ", Stopped.");
        closeTransport();
        SystemStatusManager.deleteServer(getTaskName());
    }

    //......

}
  • DefaultTaskExecutor继承了AbstractTaskExecutor,其doStart方法通过instanceStorageManager.getBinlogInfo获取binlogInfo,若为null且beginTime不为null则从getBinlogByTimestamp获取binlogInfo,之后执行updateTableMetaInfoFetcher、connect、initConnect、initBinlogPosition、dumpBinlog、processBinlog方法;其doStop方法主要执行closeTransport、SystemStatusManager.deleteServer(getTaskName())方法

getBinlogByTimestamp

代码语言:javascript
复制
    protected BinlogInfo getBinlogByTimestamp(long time) throws IOException {
        BinlogInfo binlogResult = null;
        Transaction t = Cat.newTransaction("BinlogFindByTime", taskName);

        Cat.logEvent("BinlogFindByTime.Time", String.valueOf(time));

        try {
            if (!connect()) {
                throw new IOException("Connection failed.");
            }
            initConnect();
            List<BinlogInfo> binaryLogs = getBinaryLogs();

            Cat.logEvent("BinlogFindByTime.BinaryLogs", currentSrcDbEntity.toString(), Message.SUCCESS, Joiner.on(",").join(binaryLogs));

            BinlogInfo closestBinlogInfo = null;

            for (int k = binaryLogs.size() - 1; k >= 0; k--) {
                if (binlogResult != null) {
                    break;
                }

                BinlogInfo newBinlogInfo = binaryLogs.get(k);

                Cat.logEvent("BinlogFindByTime.Start", newBinlogInfo.toString());

                getContext().setDBServerId(currentSrcDbEntity.getServerId());
                getContext().setBinlogFileName(newBinlogInfo.getBinlogFile());
                getContext().setBinlogStartPos(4);
                getContext().setMasterUrl(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());

                if (!connect()) {
                    throw new IOException("Connection failed.");
                }
                initConnect();

                if (dumpBinlog()) {
                    while (!isStop()) {
                        BinlogPacket binlogPacket = (BinlogPacket) PacketFactory.parsePacket(is,
                                PacketType.BINLOG_PACKET,
                                getContext());

                        if (!binlogPacket.isOk()) {
                            LOG.error("TaskName: " + getTaskName() + ", Binlog packet response error.");
                            throw new IOException("TaskName: " + getTaskName() + ", Binlog packet response error.");
                        } else {
                            BinlogEvent binlogEvent = parser.parse(binlogPacket.getBinlogBuf(), getContext());

                            try {
                                getContext().setNextBinlogPos(binlogEvent.getHeader().getNextPosition());

                                if (binlogEvent.getHeader().getEventType() == BinlogConstants.ROTATE_EVENT) {
                                    if (closestBinlogInfo == null) {
                                        break;
                                    } else {
                                        continue;
                                    }
                                }

                                if (binlogEvent.getHeader().getTimestamp() >= time) {
                                    if (closestBinlogInfo != null) {
                                        binlogResult = closestBinlogInfo;
                                    }
                                    break;
                                }

                                if (binlogEvent.getHeader().getEventType() == BinlogConstants.XID_EVENT
                                        && binlogEvent.getHeader().getTimestamp() < time) {
                                    closestBinlogInfo = new BinlogInfo(
                                            currentSrcDbEntity.getServerId(),
                                            getContext().getBinlogFileName(),
                                            binlogEvent.getHeader().getNextPosition(),
                                            0, binlogEvent.getHeader().getTimestamp());
                                }
                            } finally {
                                if (binlogEvent.getHeader().getEventType() == BinlogConstants.ROTATE_EVENT) {
                                    RotateEvent rotateEvent = (RotateEvent) binlogEvent;
                                    getContext().setBinlogFileName(rotateEvent.getNextBinlogFileName());
                                    getContext().setBinlogStartPos(rotateEvent.getFirstEventPosition());
                                } else {
                                    getContext().setBinlogStartPos(binlogEvent.getHeader().getNextPosition());
                                }
                            }
                        }
                    }
                } else {
                    throw new IOException("Binlog dump failed.");
                }
            }

            Cat.logEvent("BinlogFindByTime.Success", taskName, Message.SUCCESS,
                    time + " -> " + (binlogResult == null ? "null" : binlogResult.toString()));
            t.setStatus(Message.SUCCESS);
            t.complete();
            return binlogResult;
        } catch (IOException e) {
            t.setStatus(e);
            t.complete();
            throw e;
        }
    }
  • getBinlogByTimestamp方法先执行connect、initConnect,然后通过getBinaryLogs获取binaryLogs,之后遍历binaryLogs执行dumpBinlog,获取binlogEvent.getHeader().getTimestamp()大于等于指定time的BinlogInfo

connect

代码语言:javascript
复制
    private boolean connect() {
        try {
            closeTransport();
            this.mysqlSocket = new Socket();
            this.mysqlSocket.setTcpNoDelay(false);
            this.mysqlSocket.setKeepAlive(true);
            this.mysqlSocket.connect(new InetSocketAddress(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort()));
            this.is = new BufferedInputStream(mysqlSocket.getInputStream());
            this.os = new BufferedOutputStream(mysqlSocket.getOutputStream());
            PacketFactory.parsePacket(is, PacketType.CONNECT_PACKET, getContext());

            LOG.info("TaskName: " + getTaskName() + ", Connection db success.");

            return true;
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + ", Connect failed. Reason: " + e.getMessage());

            return false;
        }
    }
  • connect方法先执行closeTransport,然后创建mysqlSocket进行connect

initConnect

代码语言:javascript
复制
    protected void initConnect() throws IOException {
        if (!auth()) {
            throw new IOException("Login failed.");
        }

        if (getContext().isCheckSum()) {
            if (!updateSetting()) {
                throw new IOException("Update setting command failed.");
            }
        }

        if (!queryBinlogFormat()) {
            throw new IOException("Query config binlogformat failed.");
        }
        if (!queryBinlogImage()) {
            throw new IOException("Query config binlog row image failed.");
        }

        if (queryServerId() != currentSrcDbEntity.getServerId()) {
            throw new IOException("Server Id Changed.");
        }
    }

    private boolean auth() {
        try {
            LOG.info("server logining taskName: " + getTaskName() + " host: " + currentSrcDbEntity.getHost() + " port: " + currentSrcDbEntity.getPort() + " username: "
                    + currentSrcDbEntity.getUsername() + " dbServerId: " + currentSrcDbEntity.getServerId());
            AuthenticatePacket authPacket = (AuthenticatePacket) PacketFactory.createCommandPacket(
                    PacketType.AUTHENTICATE_PACKET, getContext());

            authPacket.setPassword(currentSrcDbEntity.getPassword());
            authPacket.setUser(currentSrcDbEntity.getUsername());
            authPacket.buildPacket(getContext());
            authPacket.write(os, getContext());

            OKErrorPacket okErrorPacket = (OKErrorPacket) PacketFactory.parsePacket(is, PacketType.OKERROR_PACKET,
                    getContext());
            boolean isAuth;

            if (okErrorPacket.isOk()) {
                LOG.info("TaskName: " + getTaskName() + ", Server login success.");
                isAuth = true;
            } else {
                isAuth = false;
                LOG.error("TaskName: " + getTaskName() + ", Login failed. Reason: " + okErrorPacket.getMessage());
            }

            return isAuth;
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + ", Login failed. Reason: " + e.getMessage());

            return false;
        }
    }

    private boolean queryBinlogFormat() throws IOException {
        try {
            QueryExecutor executor = new QueryExecutor(is, os);
            String cmd = "show global variables like 'binlog_format'";
            ResultSet rs = executor.query(cmd, getContext());
            List<String> columnValues = rs.getFiledValues();
            boolean isQuery = true;
            if (columnValues == null || columnValues.size() != 2 || columnValues.get(1) == null) {
                LOG.error("TaskName: " + getTaskName()
                        + ", QueryConfig failed Reason:unexcepted binlog format query result.");
                isQuery = false;
            }
            BinlogFormat binlogFormat = BinlogFormat.valuesOf(columnValues.get(1));
            String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());
            if (binlogFormat == null || !binlogFormat.isRow()) {
                isQuery = false;
                LOG.error("TaskName: " + getTaskName() + ", Unexcepted binlog format: " + binlogFormat.value);
            }

            Cat.logEvent("Slave.dbBinlogFormat", eventName, isQuery ? Message.SUCCESS : "1", "");
            if (isQuery) {
                LOG.info("TaskName: " + getTaskName() + ", Query config binlogformat is legal.");
            }
            return isQuery;
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + ", QueryConfig failed Reason: " + e.getMessage());
            return false;
        }
    }

    private boolean queryBinlogImage() throws IOException {
        try {
            QueryExecutor executor = new QueryExecutor(is, os);
            String cmd = "show variables like 'binlog_row_image'";
            ResultSet rs = executor.query(cmd, getContext());
            List<String> columnValues = rs.getFiledValues();
            boolean isQuery = true;
            if (columnValues == null || columnValues.size() == 0) {// 5.1
                isQuery = true;
            } else if (columnValues != null && columnValues.size() == 2 && columnValues.get(1) != null) {// 5.6
                BinlogRowImage binlogRowImage = BinlogRowImage.valuesOf(columnValues.get(1));
                isQuery = true;
                if (binlogRowImage == null || !binlogRowImage.isFull()) {
                    isQuery = false;
                    LOG.error("TaskName: " + getTaskName() + ", Unexcepted binlog row image: " + binlogRowImage.value);
                }
            } else {
                LOG.error("TaskName: " + getTaskName()
                        + ", QueryConfig failed Reason:unexcepted binlog row image query result.");
                isQuery = false;
            }
            String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());
            Cat.logEvent("Slave.dbBinlogRowImage", eventName, isQuery ? Message.SUCCESS : "1", "");
            if (isQuery) {
                LOG.info("TaskName: " + getTaskName() + ", Query config binlog row image is legal.");
            }
            return isQuery;
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + ", QueryConfig failed Reason: " + e.getMessage());
            return false;
        }
    }
  • initConnect方法依次执行auth、queryBinlogFormat、queryBinlogImage方法;auth方法进行账号密码校验;queryBinlogFormat主要执行show global variables like 'binlog_format'命令;queryBinlogImage主要执行show variables like 'binlog_row_image'

initBinlogPosition

代码语言:javascript
复制
    protected void initBinlogPosition(BinlogInfo binlogInfo) throws IOException {
        if (binlogInfo == null) {
            List<BinlogInfo> binaryLogs = getBinaryLogs();
            BinlogInfo begin = beginTime == null ? binaryLogs.get(binaryLogs.size() - 1) : binaryLogs.get(0);
            binlogInfo = new BinlogInfo(currentSrcDbEntity.getServerId(), begin.getBinlogFile(), 4l, 0, begin.getTimestamp());
        }

        getContext().setDBServerId(currentSrcDbEntity.getServerId());
        getContext().setBinlogFileName(binlogInfo.getBinlogFile());
        getContext().setBinlogStartPos(binlogInfo.getBinlogPosition());
        setBinlogInfo(binlogInfo);

        SystemStatusManager.addServer(getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort(), tableSet);
        SystemStatusManager.updateServerBinlog(getTaskName(), binlogInfo);
    }
  • initBinlogPosition主要是将binlogInfo信息设置到PumaContext中

dumpBinlog

代码语言:javascript
复制
    private boolean dumpBinlog() {
        try {
            ComBinlogDumpPacket dumpBinlogPacket = (ComBinlogDumpPacket) PacketFactory.createCommandPacket(
                    PacketType.COM_BINLOG_DUMP_PACKET, getContext());
            dumpBinlogPacket.setBinlogFileName(getContext().getBinlogFileName());
            dumpBinlogPacket.setBinlogFlag(0);
            dumpBinlogPacket.setBinlogPosition(getContext().getBinlogStartPos());
            dumpBinlogPacket.setServerId(getServerId());
            dumpBinlogPacket.buildPacket(getContext());

            dumpBinlogPacket.write(os, getContext());

            OKErrorPacket dumpCommandResultPacket = (OKErrorPacket) PacketFactory.parsePacket(is,
                    PacketType.OKERROR_PACKET, getContext());

            if (dumpCommandResultPacket.isOk()) {
                LOG.info("TaskName: " + getTaskName() + ", Dump binlog command success.");

                return true;
            } else {
                LOG.error("TaskName: " + getTaskName() + ", Dump binlog failed. Reason: "
                        + dumpCommandResultPacket.getMessage());

                return false;
            }
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + " Dump binlog failed. Reason: " + e.getMessage());

            return false;
        }

    }
  • dumpBinlog方法主要是发送COM_BINLOG_DUMP_PACKET

processBinlog

代码语言:javascript
复制
    private void processBinlog() throws IOException {
        while (!isStop()) {
            BinlogPacket binlogPacket = (BinlogPacket) PacketFactory.parsePacket(is, PacketType.BINLOG_PACKET,
                    getContext());

            if (!binlogPacket.isOk()) {
                LOG.error("TaskName: " + getTaskName() + ", Binlog packet response error.");
                throw new IOException("TaskName: " + getTaskName() + ", Binlog packet response error.");
            } else {
                processBinlogPacket(binlogPacket);
            }
        }
    }

    protected void processBinlogPacket(BinlogPacket binlogPacket) throws IOException {
        BinlogEvent binlogEvent = parser.parse(binlogPacket.getBinlogBuf(), getContext());

        if (merging) {
            if (binlogEvent.getHeader().getTimestamp() >= runUntilTimestamp) {
                stop();
            }
        }

        SystemStatusManager.incServerParsedCounter(getTaskName());

        if (binlogEvent.getHeader().getEventType() == BinlogConstants.INTVAR_EVENT
                || binlogEvent.getHeader().getEventType() == BinlogConstants.RAND_EVENT
                || binlogEvent.getHeader().getEventType() == BinlogConstants.USER_VAR_EVENT) {
            LOG.error("TaskName: " + getTaskName() + ", Binlog_format is MIXED or STATEMENT ,System is not support.");
            String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());
            Cat.logEvent("Slave.dbBinlogFormat", eventName, "1", "");
            Cat.logError("Puma.server.mixedorstatement.format", new IllegalArgumentException("TaskName: "
                    + getTaskName() + ", Binlog_format is MIXED or STATEMENT ,System is not support."));
            stopTask();
        }

        if (binlogEvent.getHeader().getEventType() != BinlogConstants.FORMAT_DESCRIPTION_EVENT) {
            getContext().setNextBinlogPos(binlogEvent.getHeader().getNextPosition());
        }

        if (binlogEvent.getHeader().getEventType() == BinlogConstants.ROTATE_EVENT) {
            processRotateEvent(binlogEvent);
        } else {
            processDataEvent(binlogEvent);
        }
    }

    protected void processRotateEvent(BinlogEvent binlogEvent) {
        RotateEvent rotateEvent = (RotateEvent) binlogEvent;
        getContext().setBinlogFileName(rotateEvent.getNextBinlogFileName());
        getContext().setBinlogStartPos(rotateEvent.getFirstEventPosition());
    }

    protected void processDataEvent(BinlogEvent binlogEvent) {
        DataHandlerResult dataHandlerResult = null;
        // 一直处理一个binlogEvent的多行,处理完每行马上分发,以防止一个binlogEvent包含太多ChangedEvent而耗费太多内存
        int eventIndex = 0;
        do {
            dataHandlerResult = dataHandler.process(binlogEvent, getContext());
            if (dataHandlerResult != null && !dataHandlerResult.isEmpty()) {
                ChangedEvent changedEvent = dataHandlerResult.getData();

                changedEvent.getBinlogInfo().setEventIndex(eventIndex++);

                updateOpsCounter(changedEvent);

                dispatch(changedEvent);
            }
        } while (dataHandlerResult != null && !dataHandlerResult.isFinished());

        if (binlogEvent.getHeader().getEventType() != BinlogConstants.FORMAT_DESCRIPTION_EVENT) {
            getContext().setBinlogStartPos(binlogEvent.getHeader().getNextPosition());
            setBinlogInfo(new BinlogInfo(getBinlogInfo().getServerId(), getBinlogInfo().getBinlogFile(), binlogEvent
                    .getHeader().getNextPosition(), 0, 0));
        }

        BinlogInfo binlogInfo = new BinlogInfo(getContext().getDBServerId(), getContext()
                .getBinlogFileName(), binlogEvent.getHeader().getNextPosition(), 0, binlogEvent.getHeader().getTimestamp());
        SystemStatusManager.updateServerBinlog(getTaskName(), binlogInfo);

        if (binlogEvent.getHeader().getNextPosition() != 0
                && StringUtils.isNotBlank(getContext().getBinlogFileName())
                && dataHandlerResult != null
                && !dataHandlerResult.isEmpty()
                && (dataHandlerResult.getData() instanceof DdlEvent || (dataHandlerResult.getData() instanceof RowChangedEvent && ((RowChangedEvent) dataHandlerResult
                .getData()).isTransactionCommit()))) {


            instanceStorageManager.setBinlogInfo(getTaskName(), binlogInfo);
        }
    }

- processBinlog方法循环接收binlogPacket,然后执行processBinlogPacket;该方法通过parser.parse获取binlogEvent,对于FORMAT_DESCRIPTION_EVENT,则更新binlogEvent.getHeader().getNextPosition()到context中;对于ROTATE_EVENT则执行processRotateEvent,否则执行processDataEvent;processRotateEvent主要是更新binlogFileName及binlogStartPos;processDataEvent则主要是通过dataHandler.process(binlogEvent, getContext())处理,然后执行dispatch(changedEvent)

closeTransport

puma/puma/src/main/java/com/dianping/puma/taskexecutor/DefaultTaskExecutor.java

代码语言:javascript
复制
    private void closeTransport() {
        // Close in.
        try {
            if (this.is != null) {
                this.is.close();
            }
        } catch (IOException ioEx) {
            LOG.warn("Server " + this.getTaskName() + ", Failed to close the input stream.");
        } finally {
            this.is = null;
        }

        // Close os.
        try {
            if (this.os != null) {
                this.os.close();
            }
        } catch (IOException ioEx) {
            LOG.warn("Server " + this.getTaskName() + ", Failed to close the output stream");
        } finally {
            this.os = null;
        }

        // Close socket.
        try {
            if (this.mysqlSocket != null) {
                this.mysqlSocket.close();
            }
        } catch (IOException ioEx) {
            LOG.warn("Server " + this.getTaskName() + ", Failed to close the socket", ioEx);
        } finally {
            this.mysqlSocket = null;
        }
    }
  • closeTransport主要是关闭InputStream、OutputStream及mysqlSocket

小结

DefaultTaskExecutor继承了AbstractTaskExecutor,其doStart方法通过instanceStorageManager.getBinlogInfo获取binlogInfo,若为null且beginTime不为null则从getBinlogByTimestamp获取binlogInfo,之后执行updateTableMetaInfoFetcher、connect、initConnect、initBinlogPosition、dumpBinlog、processBinlog方法;其doStop方法主要执行closeTransport、SystemStatusManager.deleteServer(getTaskName())方法

doc

  • DefaultTaskExecutor
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-06-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • TaskExecutor
  • AbstractTaskExecutor
  • DefaultTaskExecutor
    • getBinlogByTimestamp
      • connect
        • initConnect
          • initBinlogPosition
            • dumpBinlog
              • processBinlog
                • closeTransport
                • 小结
                • doc
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档