RocketMQ 底层通信机制 源码分析

概述

RocketMQ 底层通讯是使用Netty来实现的。 下面我们通过源码分析下RocketMQ是怎么利用Netty进行通讯的。

本文分析的是RocketMQ 最新版本 4.3.2版本。

RocketMQ 项目结构

首先来看下 RocketMQ 模块构成。

通过 RocketMQ 项目结构可以看出,RocketMQ 分了好多模块。 broker、client、filter、namesrv、remoting 等。

大家比较熟悉的几个模块对应的源码如下: Broker Master 和 Slave 对应的 broker 模块。 Producer 和 Consumer 对应的是 client 模块。 NameSerer 服务对应的是 namesrv 模块。

而各个服务之间的通讯则使用的 remoting 模块。

Remoting 模块

通过romoting 的模块结构大概了解,RocketMQ 通讯使用了Netty进行传输通讯。并在 org.apache.rocketmq.remoting.protocol 包中自定义了通讯协议。

通信模块主要接口和类

RemotingService 接口

public interface RemotingService {
    //开启服务
    void start();
    //关闭服务
    void shutdown();
    //注册 hook (可以在调用之前和调用之后做一些扩展处理)
    void registerRPCHook(RPCHook rpcHook);
}

RemotingService 定义了服务端和客户端都需要的三个接口。 registerRPCHook() 方法可以注册一个 hook。可以在远程通信之前和通信之后,执行用户自定的一些处理。类似前置处理器和后置处理器。

RPCHook 接口

public interface RPCHook {
    void doBeforeRequest(final String remoteAddr, final RemotingCommand request);

    void doAfterResponse(final String remoteAddr, final RemotingCommand request,
        final RemotingCommand response);
}

在启动服务之前,可以把自己实现的 RPCHook 注册到服务中,执行远程调用的时候处理一些业务逻辑。比如打印请求和响应的日志信息。

RemotingServer 和 RemotingClient 接口

RemotingServer 和 RemotingClient 接口都继承了RemotingService 接口,并扩展了自己特有的方法。

RemotingServer 接口

public interface RemotingServer extends RemotingService {

    //注册一个处理请求的处理器, 根据requestCode, 获取处理器,处理请求
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);
    //注册一个默认的处理器,当根据requestCode匹配不到处理器,则使用这个默认的处理器
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
    //获取端口
    int localListenPort();
    //根据requestCode获取请求处理器
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
    //同步调用(同步发送消息)
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;
    //异步调用(异步发送消息)
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    //单向发送消息,只发送消息。不用处理发送的结果。
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;

}
  • 1、registerProcessor 方法 注册一个处理请求的处理器, 存放到 HashMap中,requestCode为 Map 的 key。 HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable
  • 2、registerDefaultProcessor 方法 注册一个默认的处理器,当根据requestCode匹配不到处理器,则使用这个默认的处理器
  • 3、invokeSync 方法 以同步的方式向客户端发送消息。
  • 4、invokeAsync 方法 以异步的方式向客户端发送消息。
  • 5、invokeOneway 方法 只向客户端发送消息,而不处理客户端返回的消息。该方法只是向socket中写入数据,而不需要处理客户端返回的消息。

RemotingClient 接口

public interface RemotingClient extends RemotingService {
    //更新 NameServer 地址
    void updateNameServerAddressList(final List<String> addrs);
    //获取 NameServer 地址
    List<String> getNameServerAddressList();
    //同步调用(同步发送消息)
    RemotingCommand invokeSync(final String addr, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingConnectException,
        RemotingSendRequestException, RemotingTimeoutException;
    //异步调用(异步发送消息)
    void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    //单向发送消息,只发送消息。不用处理发送的结果。
    void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
        RemotingTimeoutException, RemotingSendRequestException;
    //注册一个处理请求的处理器, 根据requestCode, 获取处理器,处理请求
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);
    //设置发送异步消息的线程池,如果不设置,则使用默认的
    void setCallbackExecutor(final ExecutorService callbackExecutor);
    //获取线程池
    ExecutorService getCallbackExecutor();
    //判断 channel 是否可写
    boolean isChannelWritable(final String addr);
}
  • 1、updateNameServerAddressList、getNameServerAddressList 方法 更新 NameServer 地址。 获取 NameServer 地址。
  • 2、invokeSync、invokeAsync、invokeOneway 方法 这三个方法参见 RemotingServer 接口中的方法。
  • 3、setCallbackExecutor 设置处理异步响应消息的线程池。

服务端和客户端的实现

  • NettyRemotingServer 类实现了RemotingServer 接口
  • NettyRemotingClient 类实现了RemotingClient接口

这两个类使用Netty 来实现服务端和客户端服务的。

NettyRemotingServer 解析

通过 NettyRemotingServer类中的start() 方法开启一个 Netty 的服务端。 代码如下:

    @Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                            .addLast(defaultEventExecutorGroup,
                                //编码
                                new NettyEncoder(),
                                //解码
                                new NettyDecoder(),
                                //心跳检测
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                //连接管理handler,处理connect, disconnect, close等事件
                                new NettyConnectManageHandler(),
                                //处理接收到RemotingCommand消息后的事件, 收到服务器端响应后的相关操作
                                new NettyServerHandler()
                            );
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

从 start 方法中启动一个Netty 的服务端。

  • 通过设置的自定义的 NettyEncoder对发送的消息进行编码(序列化)。
  • 通过NettyDecoder 对接收的消息进行解码操作(反序列化)
  • 最后再把反序列化的对象交给 NettyServerHandler 进行处理。

NettyRemotingClient 解析

通过 NettyRemotingClient 类中的 start 方法开启一个 netty 客户端

@Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });

        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        //发送消息编码
                        new NettyEncoder(),
                        //接收消息解码
                        new NettyDecoder(),
                        //心跳监测
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        //连接管理handler,处理connect, disconnect, close等事件
                        new NettyConnectManageHandler(),
                       //处理接收到RemotingCommand消息后的事件, 收到服务器端响应后的相关操作
                        new NettyClientHandler());
                }
            });

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }

从 start 方法中启动一个Netty 客户端服务。

  • 通过设置的自定义的 NettyEncoder对发送的消息进行编码(序列化)。
  • 通过NettyDecoder对接收的消息进行解码操作(反序列化)
  • 最后再把反序列化的对象交给 NettyServerHandler` 进行处理。

序列化反序列化

通过分析 RemotingServerRemotingClient 接口及实现可以发现,发送消息和接收到的消息都是 RemotingCommand 对象。 经过分析 NettyEncoderNettyDecoder 发现,序列化和反序列化调用的是 RemotingCommand 对象的 encodedecode 方法

消息格式

  • 第一部分是消息的长度,占用4个字节。等于第二、三、四部分长度的总和。
  • 第二部分是消息头的长度,占用4个字节。等于第三部分长度大小。
  • 第三部分是通过Json序列化的消息头的数据。
  • 第四部分是序列化的消息数据。

具体的消息格式我们通过 RemotingCommand类的 encodedecode 方法进行分析。

RemotingCommand.encode() 方法

    public ByteBuffer encode() {
        // 1> header length size
        int length = 4;

        // 2> header data length
        byte[] headerData = this.headerEncode();
        length += headerData.length;

        // 3> body data length
        if (this.body != null) {
            length += body.length;
        }

        ByteBuffer result = ByteBuffer.allocate(4 + length);

        // length
        result.putInt(length);

        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // header data
        result.put(headerData);

        // body data;
        if (this.body != null) {
            result.put(this.body);
        }

        result.flip();

        return result;
    }

1、定义消息头的长度为 length = 4 2、通过 this.headerEncode() 获取序列化的 header data。 3、然后申请一个长度为 length + header length + header data +body 大小的ByteBuffer。 ByteBuffer result = ByteBuffer.allocate(4 + length); 4、然后向 ByteBuffer result 中填充数据

headerEncode 方法

该方法主要是实现了消息头的序列化。

private byte[] headerEncode() {
        this.makeCustomHeaderToNet();
        if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
            return RocketMQSerializable.rocketMQProtocolEncode(this);
        } else {
            return RemotingSerializable.encode(this);
        }
    }

序列化消息头有两种方式SerializeType.ROCKETMQ 和 SerializeType.JSON。 如果是SerializeType.JSON方式序列化比较简单。

RemotingSerializable.encode 方法

SerializeType.JSON 类型序列化。

    public static byte[] encode(final Object obj) {
        final String json = toJson(obj, false);
        if (json != null) {
            return json.getBytes(CHARSET_UTF8);
        }
        return null;
    }

直接把对象转换成json字符串,然后转换成 byte[] 数组

RocketMQSerializable.rocketMQProtocolEncode 方法

SerializeType.ROCKETMQ 类型序列化。

    public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
        // String remark
        byte[] remarkBytes = null;
        int remarkLen = 0;
        if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
            remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
            remarkLen = remarkBytes.length;
        }

        // HashMap<String, String> extFields
        byte[] extFieldsBytes = null;
        int extLen = 0;
        if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
            extFieldsBytes = mapSerialize(cmd.getExtFields());
            extLen = extFieldsBytes.length;
        }

        int totalLen = calTotalLen(remarkLen, extLen);

        ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
        // int code(~32767)
        headerBuffer.putShort((short) cmd.getCode());
        // LanguageCode language
        headerBuffer.put(cmd.getLanguage().getCode());
        // int version(~32767)
        headerBuffer.putShort((short) cmd.getVersion());
        // int opaque
        headerBuffer.putInt(cmd.getOpaque());
        // int flag
        headerBuffer.putInt(cmd.getFlag());
        // String remark
        if (remarkBytes != null) {
            headerBuffer.putInt(remarkBytes.length);
            headerBuffer.put(remarkBytes);
        } else {
            headerBuffer.putInt(0);
        }
        // HashMap<String, String> extFields;
        if (extFieldsBytes != null) {
            headerBuffer.putInt(extFieldsBytes.length);
            headerBuffer.put(extFieldsBytes);
        } else {
            headerBuffer.putInt(0);
        }

        return headerBuffer.array();
    }

可以看到 代码把 RemotingCommand 对象中的数据按照一定的顺序转换成字节存储到ByteBuffer 中。

从代码中可以看出消息头中包括,request code、请求端实现语言、版本等信息。

RemotingCommand.decode() 方法

    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        int length = byteBuffer.limit();
        int oriHeaderLen = byteBuffer.getInt();
        int headerLength = getHeaderLength(oriHeaderLen);

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }

这里的byteBuffer中的数据包含 header length + header data +body data

为什么不是包含 length+header length + header data +body data呢? 因为netty在获取这条消息的时候是通过io.netty.handler.codec.LengthFieldBasedFrameDecoder进行拆包的。该拆包的原理就是通过 消息的 length长度进行拆分的。所以拆分出来的数据就是header length + header data +body data这部分。

1、从byteBuffer中获取header length 长度。 2、然后再通过header length 长度从 byteBuffer 获取 header data。 3、剩下的byteBuffer数据就是body的数据。 把解析出来的数据转换成 RemotingCommand 对象。

    private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
        switch (type) {
            case JSON:
                RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
                resultJson.setSerializeTypeCurrentRPC(type);
                return resultJson;
            case ROCKETMQ:
                RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
                resultRMQ.setSerializeTypeCurrentRPC(type);
                return resultRMQ;
            default:
                break;
        }

        return null;
    }

判断该数据是通过 SerializeType.ROCKETMQ 序列化还是 SerializeType.JSON 序列化的。 然后根据类型进行反序列化操作。

RemotingSerializable.decode 方法

SerializeType.JSON 反序列化。

    public static <T> T decode(final byte[] data, Class<T> classOfT) {
        final String json = new String(data, CHARSET_UTF8);
        return fromJson(json, classOfT);
    }

直接把 json 数据反序列化成对象。

RocketMQSerializable.rocketMQProtocolDecode 方法

SerializeType.ROCKETMQ 反序列化。

    public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
        RemotingCommand cmd = new RemotingCommand();
        ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
        // int code(~32767)
        cmd.setCode(headerBuffer.getShort());
        // LanguageCode language
        cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
        // int version(~32767)
        cmd.setVersion(headerBuffer.getShort());
        // int opaque
        cmd.setOpaque(headerBuffer.getInt());
        // int flag
        cmd.setFlag(headerBuffer.getInt());
        // String remark
        int remarkLength = headerBuffer.getInt();
        if (remarkLength > 0) {
            byte[] remarkContent = new byte[remarkLength];
            headerBuffer.get(remarkContent);
            cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
        }

        // HashMap<String, String> extFields
        int extFieldsLength = headerBuffer.getInt();
        if (extFieldsLength > 0) {
            byte[] extFieldsBytes = new byte[extFieldsLength];
            headerBuffer.get(extFieldsBytes);
            cmd.setExtFields(mapDeserialize(extFieldsBytes));
        }
        return cmd;
    }

根据 encode 的顺序进行反序列化操作。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏偏前端工程师的驿站

asp.net 解码gb2312下urlencode后的字符串

公司网站前期的网页用了gb2312保存用户数据,而我负责的部分用的是utf8,今天恰好要获取前期录入的数据于是毫无悬念地出现乱码问题,经过一番网上的搜索还是找不...

2095
来自专栏JavaEdge

高性能队列——Disruptor总论1 背景2 Java内置队列3 ArrayBlockingQueue的问题4 Disruptor的设计方案代码样例性能等待策略Log4j 2应用场景

这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列 Disruptor特性限于3.3.4

2073
来自专栏从零开始学自动化测试

python笔记9-多线程Threading之阻塞(join)和守护线程(setDaemon)

前言 今天小编YOYO请xiaoming和xiaowang吃火锅,吃完火锅的时候会有以下三种场景: - 场景一:小编(主)先吃完了,xiaoming(客)和xi...

3746
来自专栏JadePeng的技术博客

RPC框架原理与实现

RPC,全称 Remote Procedure Call(远程过程调用),即调用远程计算机上的服务,就像调用本地服务一样。那么RPC的原理是什么呢?了解一个技术...

7477
来自专栏你不就像风一样

深入理解[Master-Worker模式]原理与技术

Master-Worker模式是常用的并行模式之一。它的核心思想是,系统由两类进程协作工作:Master进程和Worker进程。Master进程负责接收和分配任...

2215
来自专栏Java编程技术

使用数据库悲观锁实现不可重入的分布式锁

在同一个jvm进程中时,可以使用JUC提供的一些锁来解决多个线程竞争同一个共享资源时候的线程安全问题,但是当多个不同机器上的不同jvm进程共同竞争同一个共享资源...

751
来自专栏积累沉淀

Java批处理

批处理 JDBC对批处理的操作,首先简单说一下JDBC操作sql语句的简单机制。 JDBC执行数据库操作语句,首先需要将sql语句打包成为网络字...

4415
来自专栏coolblog.xyz技术专栏

MyBatis 源码分析 - 缓存原理

在 Web 应用中,缓存是必不可少的组件。通常我们都会用 Redis 或 memcached 等缓存中间件,拦截大量奔向数据库的请求,减轻数据库压力。作为一个重...

1501
来自专栏JackieZheng

RabbitMQ入门-Routing直连模式

Hello World模式,告诉我们如何一对一发送和接收消息; Work模式,告诉我们如何多管齐下高效的消费消息; Publish/Subscribe模式,告...

27310
来自专栏大史住在大前端

webpack4.0各个击破(5)—— Module篇

使用webpack对脚本进行合并是非常方便的,因为webpack实现了对各种不同模块规范的兼容处理,对前端开发者来说,理解这种实现方式比学习如何配置webpac...

1272

扫码关注云+社区

领取腾讯云代金券