A Look at RocketMQ's LitePullConsumer

code4it
Published 2020-02-10 11:52:35
Collected in the column 码匠的流水账

This article takes a look at RocketMQ's LitePullConsumer.

LitePullConsumer

rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java

public interface LitePullConsumer {

    void start() throws MQClientException;

    void shutdown();

    void subscribe(final String topic, final String subExpression) throws MQClientException;

    void subscribe(final String topic, final MessageSelector selector) throws MQClientException;

    void unsubscribe(final String topic);

    void assign(Collection<MessageQueue> messageQueues);

    List<MessageExt> poll();

    List<MessageExt> poll(long timeout);

    void seek(MessageQueue messageQueue, long offset) throws MQClientException;

    void pause(Collection<MessageQueue> messageQueues);

    void resume(Collection<MessageQueue> messageQueues);

    boolean isAutoCommit();

    void setAutoCommit(boolean autoCommit);

    Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException;

    Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException;

    void commitSync();

    Long committed(MessageQueue messageQueue) throws MQClientException;

    void registerTopicMessageQueueChangeListener(String topic,
        TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException;
}
  • The LitePullConsumer interface defines the methods start, shutdown, subscribe, unsubscribe, assign, poll, seek, pause, resume, isAutoCommit, setAutoCommit, fetchMessageQueues, offsetForTimestamp, commitSync, committed, and registerTopicMessageQueueChangeListener.
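To make the call order concrete, here is a minimal sketch of a subscribe-then-poll loop. It does not use the RocketMQ client: `InMemoryPullConsumer`, its `deliver` helper, and `PollLoopSketch` are hypothetical stand-ins that only borrow the method names `subscribe`, `start`, `poll`, and `shutdown` from the interface above. Messages are plain strings, and the "empty list on timeout" behavior is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-in; it mirrors four method names from the
// LitePullConsumer interface but is NOT the RocketMQ implementation.
class InMemoryPullConsumer {
    private final BlockingQueue<List<String>> batches = new LinkedBlockingQueue<>();
    private final List<String> topics = new ArrayList<>();
    private boolean running;

    void subscribe(String topic, String subExpression) { topics.add(topic); }

    void start() { running = true; }

    void shutdown() { running = false; }

    // Helper for this sketch only (not in the real interface): simulates a batch arriving.
    void deliver(List<String> batch) { batches.add(batch); }

    // Waits up to timeoutMillis for a batch; returns an empty list on timeout (assumed behavior).
    List<String> poll(long timeoutMillis) {
        if (!running) throw new IllegalStateException("consumer not started");
        try {
            List<String> batch = batches.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            return batch == null ? Collections.<String>emptyList() : batch;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.<String>emptyList();
        }
    }
}

class PollLoopSketch {
    // One plausible call order: subscribe, start, poll in a loop, shutdown.
    static List<String> drain(InMemoryPullConsumer consumer, int maxPolls) {
        List<String> seen = new ArrayList<>();
        consumer.subscribe("TopicTest", "*");
        consumer.start();
        try {
            for (int i = 0; i < maxPolls; i++) {
                seen.addAll(consumer.poll(50)); // empty list when nothing arrived in time
            }
        } finally {
            consumer.shutdown();
        }
        return seen;
    }
}
```

With the real client the loop would have the same shape, with `List<MessageExt>` in place of `List<String>`.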

DefaultLitePullConsumer

rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java

public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {

    private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;

    private String consumerGroup;

    private long brokerSuspendMaxTimeMillis = 1000 * 20;

    private long consumerTimeoutMillisWhenSuspend = 1000 * 30;

    private long consumerPullTimeoutMillis = 1000 * 10;

    private MessageModel messageModel = MessageModel.CLUSTERING;

    private MessageQueueListener messageQueueListener;

    private OffsetStore offsetStore;

    private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();

    private boolean unitMode = false;

    private boolean autoCommit = true;

    private int pullThreadNums = 20;

    private long autoCommitIntervalMillis = 5 * 1000;

    private int pullBatchSize = 10;

    private long pullThresholdForAll = 10000;

    private int consumeMaxSpan = 2000;

    private int pullThresholdForQueue = 1000;

    private int pullThresholdSizeForQueue = 100;

    private long pollTimeoutMillis = 1000 * 5;

    private long topicMetadataCheckIntervalMillis = 30 * 1000;

    public DefaultLitePullConsumer() {
        this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
    }

    public DefaultLitePullConsumer(final String consumerGroup) {
        this(null, consumerGroup, null);
    }

    public DefaultLitePullConsumer(RPCHook rpcHook) {
        this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
    }

    public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
        this(null, consumerGroup, rpcHook);
    }

    public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
        this.namespace = namespace;
        this.consumerGroup = consumerGroup;
        defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
    }

    @Override
    public void start() throws MQClientException {
        this.defaultLitePullConsumerImpl.start();
    }

    @Override
    public void shutdown() {
        this.defaultLitePullConsumerImpl.shutdown();
    }

    @Override
    public void subscribe(String topic, String subExpression) throws MQClientException {
        this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
    }

    @Override
    public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
        this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector);
    }

    @Override
    public void unsubscribe(String topic) {
        this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
    }

    @Override
    public void assign(Collection<MessageQueue> messageQueues) {
        defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
    }

    @Override
    public List<MessageExt> poll() {
        return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis());
    }

    @Override
    public List<MessageExt> poll(long timeout) {
        return defaultLitePullConsumerImpl.poll(timeout);
    }

    @Override
    public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
        this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset);
    }

    @Override
    public void pause(Collection<MessageQueue> messageQueues) {
        this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues));
    }

    @Override
    public void resume(Collection<MessageQueue> messageQueues) {
        this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues));
    }

    @Override
    public Collection<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
        return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
    }

    @Override
    public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException {
        return this.defaultLitePullConsumerImpl.searchOffset(queueWithNamespace(messageQueue), timestamp);
    }

    @Override
    public void registerTopicMessageQueueChangeListener(String topic,
        TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException {
        this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(withNamespace(topic), topicMessageQueueChangeListener);
    }

    @Override
    public void commitSync() {
        this.defaultLitePullConsumerImpl.commitSync();
    }

    @Override
    public Long committed(MessageQueue messageQueue) throws MQClientException {
        return this.defaultLitePullConsumerImpl.committed(messageQueue);
    }

    @Override
    public boolean isAutoCommit() {
        return autoCommit;
    }

    @Override
    public void setAutoCommit(boolean autoCommit) {
        this.autoCommit = autoCommit;
    }

    //......
}
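  • DefaultLitePullConsumer extends ClientConfig and implements the LitePullConsumer interface. Its constructor creates a DefaultLitePullConsumerImpl, and the methods defined by the interface delegate to that object, in most cases after wrapping the topic or message queue with the namespace. The exceptions are isAutoCommit and setAutoCommit, which read and write the local autoCommit field directly.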
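The delegation pattern described above can be sketched in a few lines. Everything here is hypothetical: `RecordingImpl` stands in for the inner implementation and only records calls, `FacadeSketch` stands in for the public class, and the `namespace%topic` wrapping rule is an assumption made for illustration rather than a statement of what `withNamespace` does in RocketMQ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the inner implementation; it only records the calls it receives.
class RecordingImpl {
    final List<String> calls = new ArrayList<>();

    void subscribe(String topic, String subExpression) {
        calls.add("subscribe(" + topic + ", " + subExpression + ")");
    }

    void unsubscribe(String topic) {
        calls.add("unsubscribe(" + topic + ")");
    }
}

// Facade in the style of DefaultLitePullConsumer: it holds configuration
// and forwards the actual work to an inner object created in its constructor.
class FacadeSketch {
    private final String namespace;
    private final RecordingImpl impl;
    private boolean autoCommit = true;

    FacadeSketch(String namespace) {
        this.namespace = namespace;
        this.impl = new RecordingImpl();
    }

    // Assumed wrapping rule: "<namespace>%<topic>"; the topic is unchanged when no namespace is set.
    String withNamespace(String topic) {
        if (namespace == null || namespace.isEmpty()) {
            return topic;
        }
        return namespace + "%" + topic;
    }

    // Delegated calls: wrap the topic first, then forward.
    void subscribe(String topic, String subExpression) {
        impl.subscribe(withNamespace(topic), subExpression);
    }

    void unsubscribe(String topic) {
        impl.unsubscribe(withNamespace(topic));
    }

    // Non-delegated calls: plain accessors on the facade's own field.
    boolean isAutoCommit() { return autoCommit; }

    void setAutoCommit(boolean autoCommit) { this.autoCommit = autoCommit; }

    List<String> calls() { return impl.calls; }
}
```

Keeping configuration on the facade and behavior in the inner object means callers only ever see the small public surface, while the inner class is free to change.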

Summary

RocketMQ 4.6.0 introduced LitePullConsumer, resolving issue #1388 (Add lite pull consumer support for RocketMQ). It provides the following features:

(1) Support consume messages in subscribe way with auto rebalance.
(2) Support consume messages in assign way with no auto rebalance support.
(3) Add seek/commit offset for a specified message queue.
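Features (2) and (3) can be illustrated with a small in-memory model of assign-mode consumption. This is a sketch under stated assumptions, not RocketMQ code: `AssignModeSketch` is hypothetical, queues are plain strings, each "message" is just `queue@offset`, and the `-1` return for an uncommitted queue is a convention of this sketch only.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of assign mode with seek and manual commit.
class AssignModeSketch {
    private final Map<String, Integer> queueSizes;                       // simulated broker: queue -> message count
    private final Map<String, Long> nextOffset = new LinkedHashMap<>();  // where the next poll starts
    private final Map<String, Long> committed = new HashMap<>();         // last committed position
    private boolean autoCommit = true;

    AssignModeSketch(Map<String, Integer> queueSizes) { this.queueSizes = queueSizes; }

    void setAutoCommit(boolean autoCommit) { this.autoCommit = autoCommit; }

    // Assign mode: the caller picks the queues explicitly; nothing is rebalanced.
    void assign(Collection<String> queues) {
        nextOffset.clear();
        for (String q : queues) nextOffset.put(q, 0L);
    }

    // Moves the read position of one assigned queue.
    void seek(String queue, long offset) {
        if (!nextOffset.containsKey(queue)) {
            throw new IllegalArgumentException("queue not assigned: " + queue);
        }
        nextOffset.put(queue, offset);
    }

    // Returns up to batchSize entries per assigned queue and advances the read position.
    List<String> poll(int batchSize) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : nextOffset.entrySet()) {
            long start = e.getValue();
            long size = queueSizes.getOrDefault(e.getKey(), 0);
            long end = Math.max(start, Math.min(start + batchSize, size));
            for (long o = start; o < end; o++) out.add(e.getKey() + "@" + o);
            e.setValue(end);
        }
        if (autoCommit) commitSync();
        return out;
    }

    // Records the current read positions as committed.
    void commitSync() { committed.putAll(nextOffset); }

    // Returns -1 when nothing has been committed for the queue (sketch convention).
    long committed(String queue) { return committed.getOrDefault(queue, -1L); }
}
```

With auto-commit turned off, a caller would typically `assign` its queues, `seek` where needed, process each polled batch, and only then call `commitSync`, so the committed position never runs ahead of the work actually done.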

doc

  • Add lite pull consumer support for RocketMQ #1388
  • [[ISSUE #1388]Add lite pull consumer support for RocketMQ #1386](https://github.com/apache/rocketmq/pull/1386)
  • LitePullConsumer
Originally published on 2019-12-29 on the WeChat official account 码匠的流水账.
