前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析

RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析

原创
作者头像
用户2031163
发布2023-10-20 17:57:58
2450
发布2023-10-20 17:57:58
举报

RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析

RocketMQ的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的 ,具体来说Broker端是Netty服务器用来负责与客户端的连接请求处理,而Producer/Consumer端是Netty客户端用来负责与Netty服务器的通信及请求响应处理。

Tip:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。

代码语言:javascript
复制
// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq

1 Netty客户端/服务端运行

Netty客户端/服务端运行
Netty客户端/服务端运行

Netty客户端/服务端运行

1.1 Broker端Netty服务器

  • BrokerController引导启动Netty服务器

创建NettyRemotingServer对象并调用start()方法来启动Netty服务器

代码语言:javascript
复制
public class BrokerController {
    protected void initializeRemotingServer() {
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        ...
    }
    protected void startBasicService() throws Exception {
        ...
        if (this.remotingServer != null) {
            this.remotingServer.start();
        }
        ...
    }
}
  • Netty服务端启动

这里就是我们熟悉的Netty的ServerBootstrap服务端引导类,通过设置EventLoopGroup然后绑定端口接着添加一系列的ChannelHandler启动服务器; 对于RocketMQ来说主要的Handler是NettyServerHandler这个处理类,它主要负责接收客户端请求并进行处理。

代码语言:javascript
复制
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    // sharable handlers
    private NettyServerHandler serverHandler;
    
    @Override
    public void start() {
        ...
        // 用于处理客户端请求命令
        serverHandler = new NettyServerHandler();
        ...
        serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                ...
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline()
                                ...
                                .addLast(defaultEventExecutorGroup, ..., serverHandler);
                    }
                });
        try {
            ChannelFuture sync = serverBootstrap.bind().sync();
            ...
        } catch (Exception e) {
            ...
        }
        ...
    }
}

1.2 Producer/Consumer端Netty客户端

Producer和Consumer的启动最终都会调用mQClientFactory.start()来创建Netty客户端

代码语言:javascript
复制
// Producer
public class DefaultMQProducerImpl implements MQProducerInner {
    
    private MQClientInstance mQClientFactory;
    
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                ...
                // 这里创建Netty客户端
                mQClientFactory.start();
                ...
            case RUNNING:
            ...
            default:
                break;
        }
        ...
    }
}
// Consumer
public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    private MQClientInstance mQClientFactory;

    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                ...
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                ...
                // 这里创建Netty客户端
                mQClientFactory.start();
                ...
            case RUNNING:
            ...
            default:
                break;
        }
        ...
    }
}
  • 创建Netty客户端对象

执行mQClientFactory.start() --> this.mQClientAPIImpl.start() --> this.remotingClient.start(),最终NettyRemotingClient对象调用start()方法来创建Netty客户端; Netty客户端由Bootstrap引导程序创建,之后请求/响应通过Netty客户端处理。

代码语言:javascript
复制
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
    @Override
    public void start() {
        this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
                ...
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ...
                        ch.pipeline().addLast(..., new NettyClientHandler());
                    }
                });
        ...
    }
}

2 消息的生产及保存

消息的生产及保存
消息的生产及保存

2.1 Producer生产消息到Broker

我们调用producer.send发送消息时,程序会使用RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE)把消息封装为自定义的通信协议RemotingCommand, 之后NettyRemotingClient会找到Broker地址并建立连接生成Channel对象调用writeAndFlush方法将请求(RemotingCommand)发送到Netty服务器

  • 使用producer.send发送同步消息
代码语言:javascript
复制
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        producer.start();
        Message msg = new Message(TOPIC /* Topic */,
                TAG /* Tag */,
                ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
        SendResult sendResult = producer.send(msg);
    }
}
  • 封装为RemotingCommand消息协议进一步执行发送逻辑
代码语言:javascript
复制
public class MQClientAPIImpl implements NameServerUpdateCallback {
    public SendResult sendMessage(final Message msg, final SendMessageRequestHeader requestHeader, ...) {
        // 构建请求命令 RequestCode.SEND_MESSAGE
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        request.setBody(msg.getBody());
        switch (communicationMode) {
            ...
            case SYNC:
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            ...
        }
        return null;
    }
}
  • 调用与服务端建立的Channel的writeAndFlush方法将请求RemotingCommand发送到Broker
代码语言:javascript
复制
public abstract class NettyRemotingAbstract {
    public RemotingCommand invokeSyncImpl(Channel channel, RemotingCommand request) {
        ...
        try {
            ...
            // 向服务端Broker通道发送消息
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
                ...
            });
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            ...
            return responseCommand;
        } 
        ...
    }
}

2.2 Broker接收消息并保存

在开始介绍说Broker中NettyServer的启动时会添加NettyServerHandler一个处理器,这个handler负责处理client发过来的请求指令。

上面当客户端Producer发送RemotingCommand(RequestCode.SEND_MESSAGE)这个指令的请求时,Broker收到请求后通过RequestCode 找到对应的SendMessageProcessor处理器执行processRequest方法去处理消息接收后的逻辑。

  • SendMessageProcessor处理器来接收消息调用MessageStore保存消息
代码语言:javascript
复制
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        ...
        response = this.sendMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
                (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
        ...
    }

    public RemotingCommand sendMessage(..., final RemotingCommand request, ...) throws RemotingCommandException {
        ...
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        ...
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        ...
    }
}
  • MessageStore保存消息到CommitLog中
代码语言:javascript
复制
public class DefaultMessageStore implements MessageStore {
    
    @Override
    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
        return waitForPutResult(asyncPutMessages(messageExtBatch));
    }

    @Override
    public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
        ...
        CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessages(messageExtBatch);
        ...
    }
}
public class CommitLog implements Swappable {
    public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
        ...
        result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
        ...
    }
}

3 消息的拉取及消费

消息的拉取及消费
消息的拉取及消费

3.1 Consumer向Broker发送拉取请求

创建Consumer实例,订阅Topic并注册MessageListener后调用start方法启动程序; 接着开启一个PullMessageService任务去向Broker发送消息拉取请求,通过DefaultMQPushConsumerImpl.pullMessage方法设置请求回调逻辑(如:获取到消息则使用MessageListener去消费), 接着继续执行MQClientAPIImpl().pullMessage将请求信息封装为RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE),最终由NettyRemotingClient发送请求到Broker。

  • 创建DefaultMQPushConsumer调用start()启动消费者程序,PullMessageService任务开启
代码语言:javascript
复制
public class Consumer {
    
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.subscribe(TOPIC, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}

public class MQClientInstance {

    public void start() throws MQClientException {
        ...
        // Start pull service
        this.pullMessageService.start();
        ...
    }
}
  • 这里会设置回调逻辑等并继续调用底层实现发起请求,开始注册的MessageListener会在这里调用的
代码语言:javascript
复制
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    
    public void pullMessage(final PullRequest pullRequest) {
        ...
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    switch (pullResult.getPullStatus()) {
                        // 获取到消息
                        case FOUND:
                            ...
                            // 这个里面会执行注册的MessageListener
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), ...);
                            ...
                            // 继续拉取消息
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            ...
                        ...
                    }
                }
            }
            ...
        };
        ...
        try {
            this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), ..., pullRequest.getNextOffset(), ..., pullCallback);
        } catch (Exception e) {
            ...
        }
    }
}

上面执行到pullAPIWrapper.pullKernelImpl会调用MQClientAPIImpl().pullMessage来封装请求报文最终交由NettyRemotingClient.invokeSyncImpl真正发出请求。

  • 封装请求报文RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE)
代码语言:javascript
复制
public class MQClientAPIImpl implements NameServerUpdateCallback {
    public PullResult pullMessage(final String addr, final PullMessageRequestHeader requestHeader, ..., final PullCallback pullCallback) {
        ...
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
        ...
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        ...
    }
}
  • RemotingClient调用channel.writeAndFlush(request)发出拉取请求
代码语言:javascript
复制
public abstract class NettyRemotingAbstract {
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
        ...
        try {
            ...
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
                ...
            });
            ...
        } 
        ...
    }
}

3.2 Broker接收拉取请求匹配返回消息

同样Broker中NettyServerHandler收到RemotingCommand(RequestCode.PULL_MESSAGE)这个指令找到PullMessageProcessor调用processRequest方法去处理。

  • PullMessageProcessor.processRequest处理时,调用messageStore.getMessageAsync去队列里查找消息,之后写回客户端
代码语言:javascript
复制
public class PullMessageProcessor implements NettyRequestProcessor {
    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, ...) {
        ...
        messageStore.getMessageAsync(group, topic, queueId, requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter)
                .thenApply(result -> {
                    ...
                })
                // 写回客户端
                .thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result));
        ...
    }
}
  • PullMessageProcessor.processRequest处理时,调用messageStore.getMessageAsync去队列里查找消息,之后写回客户端
代码语言:javascript
复制
public class DefaultMessageStore implements MessageStore {
    @Override
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, ...) {
        ...
        GetMessageResult getResult = new GetMessageResult();
        ...
        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
        ...
        getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
        ...
        return getResult;
    }
}
  • 从commitLog中获取消息
代码语言:javascript
复制
public class CommitLog implements Swappable {
    public SelectMappedBufferResult getMessage(final long offset, final int size) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
        if (mappedFile != null) {
            int pos = (int) (offset % mappedFileSize);
            SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(pos, size);
            if (null != selectMappedBufferResult) {
                selectMappedBufferResult.setInCache(coldDataCheckService.isDataInPageCache(offset));
                return selectMappedBufferResult;
            }
        }
        return null;
    }
}

最后

至此我们把RocketMQ中Broker与生产者/消费者基于Netty简单的通信调用链路讲完了,大家有什么问题可以下面留言哦,一起学习进步啊。

Tip:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。

代码语言:javascript
复制
// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析
    • 1 Netty客户端/服务端运行
      • 1.1 Broker端Netty服务器
      • 1.2 Producer/Consumer端Netty客户端
    • 2 消息的生产及保存
      • 2.1 Producer生产消息到Broker
      • 2.2 Broker接收消息并保存
    • 3 消息的拉取及消费
      • 3.1 Consumer向Broker发送拉取请求
      • 3.2 Broker接收拉取请求匹配返回消息
    • 最后
    相关产品与服务
    消息队列 TDMQ
    消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档