A Look at MaxwellKafkaProducer

Original
code4it
Last modified 2020-05-06 15:34:55

This article takes a look at Maxwell's MaxwellKafkaProducer.

MaxwellKafkaProducer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java

public class MaxwellKafkaProducer extends AbstractProducer {
    // Bounded hand-off buffer between the caller of push() and the worker thread.
    private final ArrayBlockingQueue<RowMap> queue;
    private final MaxwellKafkaProducerWorker worker;

    public MaxwellKafkaProducer(MaxwellContext context, Properties kafkaProperties, String kafkaTopic) {
        super(context);
        this.queue = new ArrayBlockingQueue<>(100);
        this.worker = new MaxwellKafkaProducerWorker(context, kafkaProperties, kafkaTopic, this.queue);
        // The worker consumes the queue on its own daemon thread.
        Thread thread = new Thread(this.worker, "maxwell-kafka-worker");
        thread.setDaemon(true);
        thread.start();
    }

    @Override
    public void push(RowMap r) throws Exception {
        // put() blocks while the queue is full.
        this.queue.put(r);
    }

    @Override
    public StoppableTask getStoppableTask() {
        return this.worker;
    }

    @Override
    public KafkaProducerDiagnostic getDiagnostic() {
        return new KafkaProducerDiagnostic(worker, context.getConfig(), context.getPositionStoreThread());
    }
}
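  • MaxwellKafkaProducer extends AbstractProducer. Its constructor creates an ArrayBlockingQueue with a capacity of 100 and a MaxwellKafkaProducerWorker, then starts the worker on a daemon thread named maxwell-kafka-worker. Its push method puts the RowMap onto the queue, blocking while the queue is full.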
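
The hand-off between push and the worker thread is a bounded producer/consumer pattern. The following is a minimal sketch of that pattern using only java.util.concurrent. BoundedHandoffSketch, its String rows, and the STOP sentinel are hypothetical stand-ins and not part of Maxwell; the real worker is stopped through its StoppableTaskState, not through a sentinel row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for the producer/worker pair; not Maxwell code.
final class BoundedHandoffSketch {
    // Sentinel row used only by this sketch to shut the worker down.
    private static final String STOP = "__stop__";

    private final BlockingQueue<String> queue;
    private final List<String> delivered = new ArrayList<>();
    private final Thread workerThread;

    BoundedHandoffSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.workerThread = new Thread(this::drain, "sketch-worker");
        this.workerThread.setDaemon(true);
        this.workerThread.start();
    }

    // Caller side: blocks while the queue already holds `capacity` rows.
    void push(String row) throws InterruptedException {
        queue.put(row);
    }

    // Worker side: take() blocks until a row is available.
    private void drain() {
        try {
            while (true) {
                String row = queue.take();
                if (STOP.equals(row)) {
                    return;
                }
                delivered.add(row); // stands in for sending to Kafka
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Enqueues the sentinel, waits for the worker to exit, then returns what it saw.
    List<String> stopAndGetDelivered() throws InterruptedException {
        queue.put(STOP);
        workerThread.join();
        return delivered;
    }

    // Pushes `rows` rows through a queue of the given capacity.
    static List<String> demo(int capacity, int rows) {
        BoundedHandoffSketch sketch = new BoundedHandoffSketch(capacity);
        try {
            for (int i = 0; i < rows; i++) {
                sketch.push("row-" + i);
            }
            return sketch.stopAndGetDelivered();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(2, 5));
    }
}
```

With a capacity of 2 and 5 rows, the caller has to wait for the worker several times, yet the rows still arrive in the order they were pushed, because a single producer and a single consumer share one FIFO queue.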

MaxwellKafkaProducerWorker

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/MaxwellKafkaProducer.java

class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask {
    static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class);

    private final Producer<String, String> kafka;
    private final String topic;
    private final String ddlTopic;
    private final MaxwellKafkaPartitioner partitioner;
    private final MaxwellKafkaPartitioner ddlPartitioner;
    private final KeyFormat keyFormat;
    private final boolean interpolateTopic;
    // Rows handed over by MaxwellKafkaProducer.push().
    private final ArrayBlockingQueue<RowMap> queue;
    private Thread thread;
    private StoppableTaskState taskState;
    private String deadLetterTopic;
    // Records waiting to be re-sent; drained at the start of every loop iteration.
    private final ConcurrentLinkedQueue<Pair<ProducerRecord<String,String>, KafkaCallback>> deadLetterQueue;

    // DDL events are partitioned by table (falling back to database) when the
    // partition key is "table", and by database otherwise.
    public static MaxwellKafkaPartitioner makeDDLPartitioner(String partitionHashFunc, String partitionKey) {
        if ( partitionKey.equals("table") ) {
            return new MaxwellKafkaPartitioner(partitionHashFunc, "table", null, "database");
        } else {
            return new MaxwellKafkaPartitioner(partitionHashFunc, "database", null, null);
        }
    }

    public MaxwellKafkaProducerWorker(MaxwellContext context, String kafkaTopic, ArrayBlockingQueue<RowMap> queue,
        Producer<String,String> producer)
    {
        super(context);

        // Fall back to the default topic name when none is configured.
        if ( kafkaTopic == null ) {
            this.topic = "maxwell";
        } else {
            this.topic = kafkaTopic;
        }

        // Topics containing "%{" are flagged for interpolation.
        this.interpolateTopic = this.topic.contains("%{");
        this.kafka = producer;

        String hash = context.getConfig().kafkaPartitionHash;
        String partitionKey = context.getConfig().producerPartitionKey;
        String partitionColumns = context.getConfig().producerPartitionColumns;
        String partitionFallback = context.getConfig().producerPartitionFallback;
        this.partitioner = new MaxwellKafkaPartitioner(hash, partitionKey, partitionColumns, partitionFallback);

        this.ddlPartitioner = makeDDLPartitioner(hash, partitionKey);
        this.ddlTopic =  context.getConfig().ddlKafkaTopic;
        this.deadLetterTopic = context.getConfig().deadLetterTopic;
        this.deadLetterQueue = new ConcurrentLinkedQueue<>();

        if ( context.getConfig().kafkaKeyFormat.equals("hash") )
            keyFormat = KeyFormat.HASH;
        else
            keyFormat = KeyFormat.ARRAY;

        this.queue = queue;
        this.taskState = new StoppableTaskState("MaxwellKafkaProducerWorker");
    }

    // Convenience constructor: builds a KafkaProducer with String keys and values.
    public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProperties, String kafkaTopic,
        ArrayBlockingQueue<RowMap> queue)
    {
        this(context, kafkaTopic, queue,
            new KafkaProducer<String,String>(kafkaProperties, new StringSerializer(), new StringSerializer()));
    }

    @Override
    public void run() {
        this.thread = Thread.currentThread();
        while ( true ) {
            try {
                // Re-send parked records before taking the next row.
                drainDeadLetterQueue();
                // take() blocks until a row is available.
                RowMap row = queue.take();
                if (!taskState.isRunning()) {
                    taskState.stopped();
                    return;
                }
                this.push(row);
            } catch ( Exception e ) {
                // Any failure stops the task and is passed to context.terminate(e).
                taskState.stopped();
                context.terminate(e);
                return;
            }
        }
    }

    void drainDeadLetterQueue() {
        Pair<ProducerRecord<String, String>, KafkaCallback> pair;
        // poll() returns null once the queue is empty.
        while ((pair = deadLetterQueue.poll()) != null) {
            sendAsync(pair.getLeft(), pair.getRight());
        }
    }

    //......

}
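  • MaxwellKafkaProducerWorker extends AbstractAsyncProducer and implements Runnable and StoppableTask. Its run method loops until stopped: each iteration calls drainDeadLetterQueue, takes the next row with queue.take(), and passes it to this.push(row). If the task is no longer running when take() returns, the loop marks the task stopped and exits; any exception marks the task stopped and calls context.terminate(e). drainDeadLetterQueue polls deadLetterQueue until it is empty and re-sends each record through sendAsync.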
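
The ordering inside one loop iteration (retries first, then the next row) can be illustrated with a small sketch. RetryFirstWorkerSketch, parkForRetry, step, and the String records are hypothetical names made up for this example. How records reach deadLetterQueue in Maxwell is in the part of the listing that is elided, so in this sketch the caller parks them explicitly, and a list stands in for Kafka.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the worker loop; not Maxwell code.
final class RetryFirstWorkerSketch implements Runnable {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
    private final ConcurrentLinkedQueue<String> retryQueue = new ConcurrentLinkedQueue<>();
    private final List<String> sent = new CopyOnWriteArrayList<>(); // stands in for Kafka
    private volatile boolean running = true;

    void push(String row) throws InterruptedException {
        queue.put(row);
    }

    // Assumption of this sketch: the caller decides what needs a retry.
    void parkForRetry(String record) {
        retryQueue.add(record);
    }

    void requestStop() {
        running = false;
    }

    List<String> sent() {
        return sent;
    }

    // Re-sends every parked record; poll() returns null once the queue is empty.
    void drainRetryQueue() {
        String record;
        while ((record = retryQueue.poll()) != null) {
            sent.add("retry:" + record);
        }
    }

    // One loop iteration; returns false once a stop has been requested.
    boolean step() throws InterruptedException {
        drainRetryQueue();
        String row = queue.take(); // blocks until a row is available
        if (!running) {
            return false; // a row taken after the stop request is not sent
        }
        sent.add(row);
        return true;
    }

    @Override
    public void run() {
        try {
            while (step()) {
                // keep looping until a stop is requested
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Single-threaded walk through two iterations, so the order is deterministic.
    static List<String> demo() {
        RetryFirstWorkerSketch worker = new RetryFirstWorkerSketch();
        try {
            worker.push("a");
            worker.push("b");
            worker.parkForRetry("x");
            worker.step(); // sends retry:x, then a
            worker.parkForRetry("y");
            worker.step(); // sends retry:y, then b
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return worker.sent();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [retry:x, a, retry:y, b]
    }
}
```

One consequence of this loop shape is that retries are only drained when the loop comes around again, so a parked record waits until the blocking take() has returned for the current iteration.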

AbstractAsyncProducer

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java

public abstract class AbstractAsyncProducer extends AbstractProducer {

    // Passed to sendAsync; the sender calls markCompleted() once the message is done.
    public class CallbackCompleter {
        private InflightMessageList inflightMessages;
        private final MaxwellContext context;
        private final MaxwellConfig config;
        private final Position position;
        private final boolean isTXCommit;
        private final long messageID;

        public CallbackCompleter(InflightMessageList inflightMessages, Position position, boolean isTXCommit, MaxwellContext context, long messageID) {
            this.inflightMessages = inflightMessages;
            this.context = context;
            this.config = context.getConfig();
            this.position = position;
            this.isTXCommit = isTXCommit;
            this.messageID = messageID;
        }

        public void markCompleted() {
            // Release the slot acquired by waitForSlot() in push().
            inflightMessages.freeSlot(messageID);
            if(isTXCommit) {
                InflightMessageList.InflightMessage message = inflightMessages.completeMessage(position);

                // The position is only updated when completeMessage returns a message.
                if (message != null) {
                    context.setPosition(message.position);
                    long currentTime = System.currentTimeMillis();
                    long age = currentTime - message.sendTimeMS;

                    messagePublishTimer.update(age, TimeUnit.MILLISECONDS);
                    messageLatencyTimer.update(Math.max(0L, currentTime - message.eventTimeMS - 500L), TimeUnit.MILLISECONDS);

                    if (age > config.metricsAgeSlo) {
                        messageLatencySloViolationCount.inc();
                    }
                }
            }
        }
    }

    private InflightMessageList inflightMessages;

    public AbstractAsyncProducer(MaxwellContext context) {
        super(context);

        this.inflightMessages = new InflightMessageList(context);

        // Expose the number of in-flight messages as a gauge.
        Metrics metrics = context.getMetrics();
        String gaugeName = metrics.metricName("inflightmessages", "count");
        metrics.register(gaugeName, (Gauge<Long>) () -> (long) inflightMessages.size());
    }

    public abstract void sendAsync(RowMap r, CallbackCompleter cc) throws Exception;

    @Override
    public final void push(RowMap r) throws Exception {
        Position position = r.getNextPosition();
        // Rows that do not get sent to a target will be automatically marked as complete.
        // We will attempt to commit a checkpoint up to the current row.
        if(!r.shouldOutput(outputConfig)) {
            if ( position != null ) {
                inflightMessages.addMessage(position, r.getTimestampMillis(), 0L);

                InflightMessageList.InflightMessage completed = inflightMessages.completeMessage(position);
                if (completed != null) {
                    context.setPosition(completed.position);
                }
            }
            return;
        }

        // back-pressure from slow producers

        long messageID = inflightMessages.waitForSlot();

        // Only transaction-commit rows are tracked for position updates.
        if(r.isTXCommit()) {
            inflightMessages.addMessage(position, r.getTimestampMillis(), messageID);
        }

        CallbackCompleter cc = new CallbackCompleter(inflightMessages, position, r.isTXCommit(), context, messageID);

        sendAsync(r, cc);
    }
}
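  • AbstractAsyncProducer extends AbstractProducer. Its final push method handles two cases. A row that should not be output is added to inflightMessages (when it has a position) and completed immediately, so the stored position can still advance. Otherwise push waits for a free slot via inflightMessages.waitForSlot() (back-pressure), registers the message with inflightMessages.addMessage if the row is a transaction commit, builds a CallbackCompleter, and calls sendAsync. CallbackCompleter.markCompleted frees the slot and, for transaction commits, completes the message, updates the position, and records the timing metrics.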
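
The listing does not show InflightMessageList, but the null check around completeMessage suggests that completing a message does not always yield a position to store. One plausible reading, sketched below, is that the position may only advance over a prefix of messages that have all completed, so an out-of-order acknowledgement has to wait for the earlier ones. InflightSketch and its long positions are hypothetical and greatly simplified; this is not Maxwell's InflightMessageList, and slots (waitForSlot and freeSlot) are not modelled.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of in-flight tracking; not Maxwell code.
final class InflightSketch {
    // position -> completed flag, kept in the order the messages were added.
    private final LinkedHashMap<Long, Boolean> inflight = new LinkedHashMap<>();

    synchronized void addMessage(long position) {
        inflight.put(position, false);
    }

    // Marks `position` as completed. Returns the highest position that is now
    // safe to store, or null if an earlier message is still in flight.
    synchronized Long completeMessage(long position) {
        if (inflight.replace(position, true) == null) {
            return null; // unknown position
        }
        Long safe = null;
        Iterator<Map.Entry<Long, Boolean>> it = inflight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Boolean> entry = it.next();
            if (!entry.getValue()) {
                break; // first message that has not completed yet
            }
            safe = entry.getKey();
            it.remove();
        }
        return safe;
    }

    synchronized int size() {
        return inflight.size();
    }

    public static void main(String[] args) {
        InflightSketch list = new InflightSketch();
        list.addMessage(1);
        list.addMessage(2);
        list.addMessage(3);
        System.out.println(list.completeMessage(2)); // null: 1 is still in flight
        System.out.println(list.completeMessage(1)); // 2: both 1 and 2 are done
        System.out.println(list.completeMessage(3)); // 3
    }
}
```

Under this model a crash never loses an unacknowledged message, because the stored position always trails the oldest message that is still in flight.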

Summary

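MaxwellKafkaProducer extends AbstractProducer. Its constructor creates an ArrayBlockingQueue and a MaxwellKafkaProducerWorker and starts the worker on a daemon thread; its push method puts rows onto the queue. MaxwellKafkaProducerWorker extends AbstractAsyncProducer and implements Runnable and StoppableTask. Its run method loops, calling drainDeadLetterQueue, queue.take(), and this.push(row) in turn; drainDeadLetterQueue polls records from deadLetterQueue and re-sends them through sendAsync.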

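Original statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.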