前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ 底层通信机制 源码分析

RocketMQ 底层通信机制 源码分析

作者头像
java404
发布2018-12-17 10:37:31
9630
发布2018-12-17 10:37:31
举报
文章被收录于专栏:java 成神之路java 成神之路

概述

RocketMQ 底层通讯是使用Netty来实现的。 下面我们通过源码分析下RocketMQ是怎么利用Netty进行通讯的。

本文分析的是RocketMQ 最新版本 4.3.2版本。

RocketMQ 项目结构

首先来看下 RocketMQ 模块构成。

通过 RocketMQ 项目结构可以看出,RocketMQ 分了好多模块。 broker、client、filter、namesrv、remoting 等。

大家比较熟悉的几个模块对应的源码如下: Broker Master 和 Slave 对应的 broker 模块。 Producer 和 Consumer 对应的是 client 模块。 NameSerer 服务对应的是 namesrv 模块。

而各个服务之间的通讯则使用的 remoting 模块。

Remoting 模块

通过romoting 的模块结构大概了解,RocketMQ 通讯使用了Netty进行传输通讯。并在 org.apache.rocketmq.remoting.protocol 包中自定义了通讯协议。

通信模块主要接口和类

RemotingService 接口
代码语言:javascript
复制
public interface RemotingService {
    //开启服务
    void start();
    //关闭服务
    void shutdown();
    //注册 hook (可以在调用之前和调用之后做一些扩展处理)
    void registerRPCHook(RPCHook rpcHook);
}

RemotingService 定义了服务端和客户端都需要的三个接口。 registerRPCHook() 方法可以注册一个 hook。可以在远程通信之前和通信之后,执行用户自定的一些处理。类似前置处理器和后置处理器。

RPCHook 接口
代码语言:javascript
复制
public interface RPCHook {
    void doBeforeRequest(final String remoteAddr, final RemotingCommand request);

    void doAfterResponse(final String remoteAddr, final RemotingCommand request,
        final RemotingCommand response);
}

在启动服务之前,可以把自己实现的 RPCHook 注册到服务中,执行远程调用的时候处理一些业务逻辑。比如打印请求和响应的日志信息。

RemotingServer 和 RemotingClient 接口

RemotingServer 和 RemotingClient 接口都继承了RemotingService 接口,并扩展了自己特有的方法。

RemotingServer 接口
代码语言:javascript
复制
public interface RemotingServer extends RemotingService {

    //注册一个处理请求的处理器, 根据requestCode, 获取处理器,处理请求
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);
    //注册一个默认的处理器,当根据requestCode匹配不到处理器,则使用这个默认的处理器
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
    //获取端口
    int localListenPort();
    //根据requestCode获取请求处理器
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
    //同步调用(同步发送消息)
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;
    //异步调用(异步发送消息)
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    //单向发送消息,只发送消息。不用处理发送的结果。
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;

}
  • 1、registerProcessor 方法 注册一个处理请求的处理器, 存放到 HashMap中,requestCode为 Map 的 key。 HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable
  • 2、registerDefaultProcessor 方法 注册一个默认的处理器,当根据requestCode匹配不到处理器,则使用这个默认的处理器
  • 3、invokeSync 方法 以同步的方式向客户端发送消息。
  • 4、invokeAsync 方法 以异步的方式向客户端发送消息。
  • 5、invokeOneway 方法 只向客户端发送消息,而不处理客户端返回的消息。该方法只是向socket中写入数据,而不需要处理客户端返回的消息。
RemotingClient 接口
代码语言:javascript
复制
public interface RemotingClient extends RemotingService {
    //更新 NameServer 地址
    void updateNameServerAddressList(final List<String> addrs);
    //获取 NameServer 地址
    List<String> getNameServerAddressList();
    //同步调用(同步发送消息)
    RemotingCommand invokeSync(final String addr, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingConnectException,
        RemotingSendRequestException, RemotingTimeoutException;
    //异步调用(异步发送消息)
    void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    //单向发送消息,只发送消息。不用处理发送的结果。
    void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
        RemotingTimeoutException, RemotingSendRequestException;
    //注册一个处理请求的处理器, 根据requestCode, 获取处理器,处理请求
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);
    //设置发送异步消息的线程池,如果不设置,则使用默认的
    void setCallbackExecutor(final ExecutorService callbackExecutor);
    //获取线程池
    ExecutorService getCallbackExecutor();
    //判断 channel 是否可写
    boolean isChannelWritable(final String addr);
}
  • 1、updateNameServerAddressList、getNameServerAddressList 方法 更新 NameServer 地址。 获取 NameServer 地址。
  • 2、invokeSync、invokeAsync、invokeOneway 方法 这三个方法参见 RemotingServer 接口中的方法。
  • 3、setCallbackExecutor 设置处理异步响应消息的线程池。

服务端和客户端的实现

  • NettyRemotingServer 类实现了RemotingServer 接口
  • NettyRemotingClient 类实现了RemotingClient接口

这两个类使用Netty 来实现服务端和客户端服务的。

NettyRemotingServer 解析

通过 NettyRemotingServer类中的start() 方法开启一个 Netty 的服务端。 代码如下:

代码语言:javascript
复制
    @Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                            .addLast(defaultEventExecutorGroup,
                                //编码
                                new NettyEncoder(),
                                //解码
                                new NettyDecoder(),
                                //心跳检测
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                //连接管理handler,处理connect, disconnect, close等事件
                                new NettyConnectManageHandler(),
                                //处理接收到RemotingCommand消息后的事件, 收到服务器端响应后的相关操作
                                new NettyServerHandler()
                            );
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

从 start 方法中启动一个Netty 的服务端。

  • 通过设置的自定义的 NettyEncoder对发送的消息进行编码(序列化)。
  • 通过NettyDecoder 对接收的消息进行解码操作(反序列化)
  • 最后再把反序列化的对象交给 NettyServerHandler 进行处理。
NettyRemotingClient 解析

通过 NettyRemotingClient 类中的 start 方法开启一个 netty 客户端

代码语言:javascript
复制
@Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });

        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        //发送消息编码
                        new NettyEncoder(),
                        //接收消息解码
                        new NettyDecoder(),
                        //心跳监测
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        //连接管理handler,处理connect, disconnect, close等事件
                        new NettyConnectManageHandler(),
                       //处理接收到RemotingCommand消息后的事件, 收到服务器端响应后的相关操作
                        new NettyClientHandler());
                }
            });

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }

从 start 方法中启动一个Netty 客户端服务。

  • 通过设置的自定义的 NettyEncoder对发送的消息进行编码(序列化)。
  • 通过NettyDecoder对接收的消息进行解码操作(反序列化)
  • 最后再把反序列化的对象交给 NettyServerHandler` 进行处理。

序列化反序列化

通过分析 RemotingServerRemotingClient 接口及实现可以发现,发送消息和接收到的消息都是 RemotingCommand 对象。 经过分析 NettyEncoderNettyDecoder 发现,序列化和反序列化调用的是 RemotingCommand 对象的 encodedecode 方法

消息格式
  • 第一部分是消息的长度,占用4个字节。等于第二、三、四部分长度的总和。
  • 第二部分是消息头的长度,占用4个字节。等于第三部分长度大小。
  • 第三部分是通过Json序列化的消息头的数据。
  • 第四部分是序列化的消息数据。

具体的消息格式我们通过 RemotingCommand类的 encodedecode 方法进行分析。

RemotingCommand.encode() 方法
代码语言:javascript
复制
    public ByteBuffer encode() {
        // 1> header length size
        int length = 4;

        // 2> header data length
        byte[] headerData = this.headerEncode();
        length += headerData.length;

        // 3> body data length
        if (this.body != null) {
            length += body.length;
        }

        ByteBuffer result = ByteBuffer.allocate(4 + length);

        // length
        result.putInt(length);

        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // header data
        result.put(headerData);

        // body data;
        if (this.body != null) {
            result.put(this.body);
        }

        result.flip();

        return result;
    }

1、定义消息头的长度为 length = 4 2、通过 this.headerEncode() 获取序列化的 header data。 3、然后申请一个长度为 length + header length + header data +body 大小的ByteBuffer。 ByteBuffer result = ByteBuffer.allocate(4 + length); 4、然后向 ByteBuffer result 中填充数据

headerEncode 方法

该方法主要是实现了消息头的序列化。

代码语言:javascript
复制
private byte[] headerEncode() {
        this.makeCustomHeaderToNet();
        if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
            return RocketMQSerializable.rocketMQProtocolEncode(this);
        } else {
            return RemotingSerializable.encode(this);
        }
    }

序列化消息头有两种方式SerializeType.ROCKETMQ 和 SerializeType.JSON。 如果是SerializeType.JSON方式序列化比较简单。

RemotingSerializable.encode 方法

SerializeType.JSON 类型序列化。

代码语言:javascript
复制
    public static byte[] encode(final Object obj) {
        final String json = toJson(obj, false);
        if (json != null) {
            return json.getBytes(CHARSET_UTF8);
        }
        return null;
    }

直接把对象转换成json字符串,然后转换成 byte[] 数组

RocketMQSerializable.rocketMQProtocolEncode 方法

SerializeType.ROCKETMQ 类型序列化。

代码语言:javascript
复制
    public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
        // String remark
        byte[] remarkBytes = null;
        int remarkLen = 0;
        if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
            remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
            remarkLen = remarkBytes.length;
        }

        // HashMap<String, String> extFields
        byte[] extFieldsBytes = null;
        int extLen = 0;
        if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
            extFieldsBytes = mapSerialize(cmd.getExtFields());
            extLen = extFieldsBytes.length;
        }

        int totalLen = calTotalLen(remarkLen, extLen);

        ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
        // int code(~32767)
        headerBuffer.putShort((short) cmd.getCode());
        // LanguageCode language
        headerBuffer.put(cmd.getLanguage().getCode());
        // int version(~32767)
        headerBuffer.putShort((short) cmd.getVersion());
        // int opaque
        headerBuffer.putInt(cmd.getOpaque());
        // int flag
        headerBuffer.putInt(cmd.getFlag());
        // String remark
        if (remarkBytes != null) {
            headerBuffer.putInt(remarkBytes.length);
            headerBuffer.put(remarkBytes);
        } else {
            headerBuffer.putInt(0);
        }
        // HashMap<String, String> extFields;
        if (extFieldsBytes != null) {
            headerBuffer.putInt(extFieldsBytes.length);
            headerBuffer.put(extFieldsBytes);
        } else {
            headerBuffer.putInt(0);
        }

        return headerBuffer.array();
    }

可以看到 代码把 RemotingCommand 对象中的数据按照一定的顺序转换成字节存储到ByteBuffer 中。

从代码中可以看出消息头中包括,request code、请求端实现语言、版本等信息。

RemotingCommand.decode() 方法

代码语言:javascript
复制
    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        int length = byteBuffer.limit();
        int oriHeaderLen = byteBuffer.getInt();
        int headerLength = getHeaderLength(oriHeaderLen);

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }

这里的byteBuffer中的数据包含 header length + header data +body data

为什么不是包含 length+header length + header data +body data呢? 因为netty在获取这条消息的时候是通过io.netty.handler.codec.LengthFieldBasedFrameDecoder进行拆包的。该拆包的原理就是通过 消息的 length长度进行拆分的。所以拆分出来的数据就是header length + header data +body data这部分。

1、从byteBuffer中获取header length 长度。 2、然后再通过header length 长度从 byteBuffer 获取 header data。 3、剩下的byteBuffer数据就是body的数据。 把解析出来的数据转换成 RemotingCommand 对象。

代码语言:javascript
复制
    private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
        switch (type) {
            case JSON:
                RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
                resultJson.setSerializeTypeCurrentRPC(type);
                return resultJson;
            case ROCKETMQ:
                RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
                resultRMQ.setSerializeTypeCurrentRPC(type);
                return resultRMQ;
            default:
                break;
        }

        return null;
    }

判断该数据是通过 SerializeType.ROCKETMQ 序列化还是 SerializeType.JSON 序列化的。 然后根据类型进行反序列化操作。

RemotingSerializable.decode 方法

SerializeType.JSON 反序列化。

代码语言:javascript
复制
    public static <T> T decode(final byte[] data, Class<T> classOfT) {
        final String json = new String(data, CHARSET_UTF8);
        return fromJson(json, classOfT);
    }

直接把 json 数据反序列化成对象。

RocketMQSerializable.rocketMQProtocolDecode 方法

SerializeType.ROCKETMQ 反序列化。

代码语言:javascript
复制
    public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
        RemotingCommand cmd = new RemotingCommand();
        ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
        // int code(~32767)
        cmd.setCode(headerBuffer.getShort());
        // LanguageCode language
        cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
        // int version(~32767)
        cmd.setVersion(headerBuffer.getShort());
        // int opaque
        cmd.setOpaque(headerBuffer.getInt());
        // int flag
        cmd.setFlag(headerBuffer.getInt());
        // String remark
        int remarkLength = headerBuffer.getInt();
        if (remarkLength > 0) {
            byte[] remarkContent = new byte[remarkLength];
            headerBuffer.get(remarkContent);
            cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
        }

        // HashMap<String, String> extFields
        int extFieldsLength = headerBuffer.getInt();
        if (extFieldsLength > 0) {
            byte[] extFieldsBytes = new byte[extFieldsLength];
            headerBuffer.get(extFieldsBytes);
            cmd.setExtFields(mapDeserialize(extFieldsBytes));
        }
        return cmd;
    }

根据 encode 的顺序进行反序列化操作。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.11.24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • RocketMQ 项目结构
  • Remoting 模块
  • 通信模块主要接口和类
    • RemotingService 接口
      • RPCHook 接口
        • RemotingServer 和 RemotingClient 接口
          • RemotingServer 接口
          • RemotingClient 接口
          • NettyRemotingServer 解析
          • NettyRemotingClient 解析
          • 消息格式
          • RemotingCommand.encode() 方法
          • headerEncode 方法
          • RemotingSerializable.encode 方法
          • RocketMQSerializable.rocketMQProtocolEncode 方法
          • RemotingSerializable.decode 方法
          • RocketMQSerializable.rocketMQProtocolDecode 方法
      • 服务端和客户端的实现
      • 序列化反序列化
      • RemotingCommand.decode() 方法
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档