前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊puma的Sender

聊聊puma的Sender

原创
作者头像
code4it
修改2020-06-08 14:25:57
4100
修改2020-06-08 14:25:57
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下puma的Sender

Sender

puma/puma/src/main/java/com/dianping/puma/sender/Sender.java

public interface Sender extends LifeCycle {
​
    String getName();
​
    void send(ChangedEvent event, PumaContext context) throws SenderException;
}
  • Sender定义了getName、send方法

AbstractSender

puma/puma/src/main/java/com/dianping/puma/sender/AbstractSender.java

public abstract class AbstractSender implements Sender {
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractSender.class);
​
    private String name;
​
    private int maxTryTimes = 3;
​
    private boolean canMissEvent = false;
​
    private volatile boolean stopped = true;
​
    private final String MSG_SKIP = "[Miss]Send event failed for %d times. [servername=%s; current binlogfile=%s; current binlogpos=%d; next binlogpos=%d] ";
​
    private final String MSG_LOOP_FAILED = "[Can't Miss]Send event failed for %d times. [servername=%s; current binlogfile=%s; current binlogpos=%d; next binlogpos=%d] ";
​
    /**
     * @return the stop
     */
    public boolean isStop() {
        return stopped;
    }
​
    /**
     * @return the maxTryTimes
     */
    public int getMaxTryTimes() {
        return maxTryTimes;
    }
​
    /**
     * @param maxTryTimes the maxTryTimes to set
     */
    public void setMaxTryTimes(int maxTryTimes) {
        this.maxTryTimes = maxTryTimes;
    }
​
    /**
     * @return the canMissEvent
     */
    public boolean isCanMissEvent() {
        return canMissEvent;
    }
​
    /**
     * @param canMissEvent the canMissEvent to set
     */
    public void setCanMissEvent(boolean canMissEvent) {
        this.canMissEvent = canMissEvent;
    }
​
    /*
     * (non-Javadoc)
     *
     * @see com.dianping.puma.common.LifeCycle#start()
     */
    @Override
    public void start() {
        stopped = false;
    }
​
    /*
     * (non-Javadoc)
     *
     * @see com.dianping.puma.common.LifeCycle#stop()
     */
    @Override
    public void stop() {
        stopped = true;
    }
​
    /*
     * (non-Javadoc)
     *
     * @see com.dianping.puma.sender.Sender#getName()
     */
    @Override
    public String getName() {
        return name;
    }
​
    public void setName(String name) {
        this.name = name;
    }
​
    @Override
    public void send(ChangedEvent event, PumaContext context) throws SenderException {
        long retryCount = 0;
​
        while (true) {
            if (isStop()) {
                break;
            }
​
            try {
                doSend(event, context);
                break;
            } catch (Exception e) {
                LOG.error("Send error!", e);
​
                if (retryCount++ > maxTryTimes) {
                    if (canMissEvent) {
                        LOG.error(String.format(MSG_SKIP, maxTryTimes, context.getPumaServerName(),
                                context.getBinlogFileName(), context.getBinlogStartPos(), context.getNextBinlogPos()));
                        return;
                    } else {
                        if (retryCount % 100 == 0) {
                            LOG.error(String.format(MSG_LOOP_FAILED, maxTryTimes, context.getPumaServerName(),
                                    context.getBinlogFileName(), context.getBinlogStartPos(), context.getNextBinlogPos()));
                        }
                    }
                }
​
                try {
                    Thread.sleep(((retryCount % 15) + 1) * 300);
                } catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                    throw new SenderException("Interrupted", e1);
                }
            }
        }
    }
​
    protected abstract void doSend(ChangedEvent event, PumaContext context) throws SenderException;
}
  • AbstractSender声明实现了Sender接口,其send方法通过while循环执行doSend(event, context)方法,出现Exception时,在retryCount没有大于maxTryTimes时则sleep((retryCount % 15) + 1) * 300之后再次重试

FileDumpSender

puma/puma/src/main/java/com/dianping/puma/sender/FileDumpSender.java

public class FileDumpSender extends AbstractSender {
    private Map<String, WriteChannel> writeChannels = new ConcurrentHashMap<String, WriteChannel>();
​
    private ChangedEvent transactionBegin;
​
    private EventFilterChain storageEventFilterChain;
​
    @Override
    public void start() {
        super.start();
    }
​
    @Override
    public void stop() {
        for (WriteChannel channel : writeChannels.values()) {
            channel.stop();
        }
        super.stop();
    }
​
    @Override
    protected void doSend(ChangedEvent event, PumaContext context) throws SenderException {
        // Storage filter.
        storageEventFilterChain.reset();
        if (!storageEventFilterChain.doNext(event)) {
            return;
        }
​
        try {
            String database = event.getDatabase();
​
            if (database != null && database.length() > 0) {
                WriteChannel writeChannel = this.writeChannels.get(database);
​
                if (writeChannel == null) {
                    writeChannel = buildEventStorage(database);
                    this.writeChannels.put(database, writeChannel);
                }
​
                boolean isTransactionBegin = false;
​
                if (event instanceof RowChangedEvent) {
                    isTransactionBegin = ((RowChangedEvent) event).isTransactionBegin();
                }
​
                if (transactionBegin != null && !isTransactionBegin) {
                    //readChannel.store(transactionBegin);
                    transactionBegin = null;
                }
​
​
                writeChannel.append(event);
            } else {
                if (event instanceof RowChangedEvent) {
                    if (((RowChangedEvent) event).isTransactionBegin()) {
                        transactionBegin = event;
                    } else {
                        Cat.logEvent("Puma", "RowChangeEvent-Has-No-Database");
                        LOG.error(String.format("RowChangeEvent[%s] has no database", event.toString()));
                    }
                } else {
                    Cat.logEvent("Puma", "ChangeEvent-Has-No-Database");
                    LOG.error(String.format("ChangeEvent[%s] has no database", event.toString()));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
​
    private WriteChannel buildEventStorage(String database) {
        WriteChannel writeChannel = ChannelFactory.newWriteChannel(database);
        writeChannel.start();
        return writeChannel;
    }
​
    public void setStorageEventFilterChain(EventFilterChain storageEventFilterChain) {
        this.storageEventFilterChain = storageEventFilterChain;
    }
}
  • FileDumpSender继承了AbstractSender,它定义了writeChannels属性,其stop方法会遍历writeChannels,挨个执行channel.stop();其doSend方法先执行storageEventFilterChain.doNext(event),返回false的话则直接返回,之后获取或者创建指定database的writeChannel,执行writeChannel.append(event);对于database为null的则判断是否是RowChangedEvent,是的话,在RowChangedEvent的isTransactionBegin时设置transactionBegin

小结

Sender定义了getName、send方法;AbstractSender声明实现了Sender接口,其send方法通过while循环执行doSend(event, context)方法,出现Exception时,在retryCount没有大于maxTryTimes时则sleep((retryCount % 15) + 1) * 300之后再次重试

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Sender
  • AbstractSender
  • FileDumpSender
  • 小结
  • doc
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档