
A look at Spring for Kafka's wrapping and integration of the producer

code4it · 码匠的流水账
Published 2018-09-17 (originally published 2017-10-08)

This article walks through how Spring for Apache Kafka wraps and integrates the native Kafka client producer.

Producer factory

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, Lifecycle, DisposableBean {
    @Override
    public void destroy() throws Exception { //NOSONAR
        CloseSafeProducer<K, V> producer = this.producer;
        this.producer = null;
        if (producer != null) {
            producer.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
        }
    }

    @Override
    public void start() {
        this.running = true;
    }

    @Override
    public void stop() {
        try {
            destroy();
        }
        catch (Exception e) {
            logger.error("Exception while stopping producer", e);
        }
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public Producer<K, V> createProducer() {
        if (this.producer == null) {
            synchronized (this) {
                if (this.producer == null) {
                    this.producer = new CloseSafeProducer<K, V>(createKafkaProducer());
                }
            }
        }
        return this.producer;
    }
}

The first step of the integration is handing the producer over to the Spring container, so that it is started and destroyed along with the container's lifecycle. The factory creates a CloseSafeProducer, which delegates the actual operations to the underlying Kafka producer.
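To see how this factory typically gets into the container, here is a minimal configuration sketch; the bootstrap address, serializers and bean names are illustrative assumptions, not taken from the original post:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        // broker address and serializers are illustrative assumptions
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // the factory implements Lifecycle/DisposableBean, so the container
        // closes the underlying KafkaProducer on shutdown via destroy()
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}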

KafkaTemplate

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/KafkaTemplate.java implements the following interface:

public interface KafkaOperations<K, V> {

    /**
     * Send the data to the default topic with no key or partition.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> sendDefault(V data);

    /**
     * Send the data to the default topic with the provided key and no partition.
     * @param key the key.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

    /**
     * Send the data to the default topic with the provided key and partition.
     * @param partition the partition.
     * @param key the key.
     * @param data the data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> sendDefault(int partition, K key, V data);

    /**
     * Send the data to the provided topic with no key or partition.
     * @param topic the topic.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, V data);

    /**
     * Send the data to the provided topic with the provided key and no partition.
     * @param topic the topic.
     * @param key the key.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

    /**
     * Send the data to the provided topic with the provided partition and no key.
     * @param topic the topic.
     * @param partition the partition.
     * @param data The data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data);

    /**
     * Send the data to the provided topic with the provided key and partition.
     * @param topic the topic.
     * @param partition the partition.
     * @param key the key.
     * @param data the data.
     * @return a Future for the {@link SendResult}.
     */
    ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data);

    /**
     * Send a message with routing information in message headers. The message payload
     * may be converted before sending.
     * @param message the message to send.
     * @return a Future for the {@link SendResult}.
     * @see org.springframework.kafka.support.KafkaHeaders#TOPIC
     * @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID
     * @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY
     */
    ListenableFuture<SendResult<K, V>> send(Message<?> message);

    /**
     * See {@link Producer#partitionsFor(String)}.
     * @param topic the topic.
     * @return the partition info.
     * @since 1.1
     */
    List<PartitionInfo> partitionsFor(String topic);

    /**
     * See {@link Producer#metrics()}.
     * @return the metrics.
     * @since 1.1
     */
    Map<MetricName, ? extends Metric> metrics();

    /**
     * Execute some arbitrary operation(s) on the producer and return the result.
     * @param callback the callback.
     * @param <T> the result type.
     * @return the result.
     * @since 1.1
     */
    <T> T execute(ProducerCallback<K, V, T> callback);

    /**
     * Flush the producer.
     */
    void flush();

    /**
     * A callback for executing arbitrary operations on the {@link Producer}.
     * @param <K> the key type.
     * @param <V> the value type.
     * @param <T> the return type.
     * @since 1.1
     */
    interface ProducerCallback<K, V, T> {

        T doInKafka(Producer<K, V> producer);

    }

}
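As a quick usage sketch of this interface (the topic name, key and payload are made up for illustration), the returned ListenableFuture can be given a callback from spring-core's concurrent utilities:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class SendExample {

    public void sendAsync(KafkaTemplate<String, String> template) {
        // topic, key and value are made-up example data
        ListenableFuture<SendResult<String, String>> future =
                template.send("demo-topic", "key-1", "hello");
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                // SendResult carries both the ProducerRecord and the broker-assigned RecordMetadata
                System.out.println("sent to offset " + result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                // on failure the future is completed with a KafkaProducerException (see doSend below)
                System.err.println("send failed: " + ex.getMessage());
            }
        });
    }
}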

The core doSend method is shown below; this is where Spring's main wrapping of the producer happens:

    /**
     * Send the producer record.
     * @param producerRecord the producer record.
     * @return a Future for the {@link RecordMetadata}.
     */
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        final Producer<K, V> producer = getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }
        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
        producer.send(producerRecord, new Callback() {

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                try {
                    if (exception == null) {
                        future.set(new SendResult<>(producerRecord, metadata));
                        if (KafkaTemplate.this.producerListener != null
                                && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                            KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
                                    producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                        }
                    }
                    else {
                        future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                        if (KafkaTemplate.this.producerListener != null) {
                            KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
                                    producerRecord.partition(),
                                    producerRecord.key(),
                                    producerRecord.value(),
                                    exception);
                        }
                    }
                }
                finally {
                    producer.close();
                }
            }

        });
        if (this.autoFlush) {
            flush();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }
        return future;
    }

Don't be misled by the producer.close() call in the finally block above: CloseSafeProducer's close method is empty, a no-op.
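Conceptually, CloseSafeProducer amounts to something like the following sketch; this is a paraphrase for illustration, not the actual spring-kafka source. Every operation is forwarded to the delegate, while close() is left empty, so the shared producer survives the close() in doSend and is only really closed by the factory's destroy().

import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Illustrative paraphrase only -- the real CloseSafeProducer inside
// DefaultKafkaProducerFactory implements Producer<K, V> and forwards every method like this.
class CloseSafeProducerSketch<K, V> {

    final Producer<K, V> delegate;  // the actual KafkaProducer

    CloseSafeProducerSketch(Producer<K, V> delegate) {
        this.delegate = delegate;
    }

    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        return this.delegate.send(record, callback);  // real work is delegated
    }

    void close() {
        // deliberately empty: the close() in doSend's finally block is a no-op;
        // only DefaultKafkaProducerFactory.destroy() closes the delegate
    }
}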

  • Spring wraps the send call and returns a SettableListenableFuture that completes with a SendResult
  • Exceptions are wrapped as well, into the Spring-defined KafkaProducerException (a Spring KafkaException)
  • A ProducerListener is supported and is invoked synchronously from the send callback (see the sketch after this list)
  • A MessagingMessageConverter is built in (used by the send(Message<?>) variant)
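For the listener point above, a minimal sketch of registering a ProducerListener on the template, assuming the ProducerListenerAdapter base class available in spring-kafka 1.x; the logging is illustrative only:

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListenerAdapter;

public class ListenerExample {

    public void register(KafkaTemplate<String, String> template) {
        template.setProducerListener(new ProducerListenerAdapter<String, String>() {

            @Override
            public void onSuccess(String topic, Integer partition, String key,
                    String value, RecordMetadata recordMetadata) {
                // called synchronously from the producer callback in doSend
                System.out.println("sent " + key + " to " + topic + "-" + recordMetadata.partition());
            }

            @Override
            public void onError(String topic, Integer partition, String key,
                    String value, Exception exception) {
                System.err.println("failed to send " + key + ": " + exception.getMessage());
            }

            @Override
            public boolean isInterestedInSuccess() {
                // doSend only notifies onSuccess when this returns true
                return true;
            }
        });
    }
}

Since doSend invokes the listener synchronously inside the producer's callback, listener code should return quickly so it does not block the producer's I/O thread.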