前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty协议-Rocket MQ之NettyRemotingClient/Server

Netty协议-Rocket MQ之NettyRemotingClient/Server

作者头像
关忆北.
发布2022-05-05 15:10:36
3810
发布2022-05-05 15:10:36
举报
文章被收录于专栏:关忆北.关忆北.

RocketMQ性能强劲依赖于Netty通讯协议和特定的通讯协议。

通信流程

image-20220323165900443
image-20220323165900443

NettyRemotingAbstract类的内部有一个NettyServerHandler内部类,该类中封装了客户端与服务端交互的基础代码。

processMessageReceived根据cmd类型处理msg信息

代码语言:javascript
复制
    @ChannelHandler.Sharable
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }


public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

processResponseCommand

代码语言:javascript
复制
 /**
     * Process incoming request command issued by remote peer.
     *
     * @param ctx channel handler context.
     * @param cmd request command.
     */
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {

                final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();
        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        //解析远程(服务端)channel
                        String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                        //RPC调用前执行钩子函数
                        doBeforeRpcHooks(remoteAddr, cmd);
                        final RemotingResponseCallback callback = new RemotingResponseCallback() {
                            @Override
                            public void callback(RemotingCommand response) {
                                doAfterRpcHooks(remoteAddr, cmd, response);
                                //判断是否为单向通信
                                if (!cmd.isOnewayRPC()) {
                                    if (response != null) {
                                        response.setOpaque(opaque);
                                        response.markResponseType();
                                        try {
                                            //消息追加到内存后刷入硬盘中
                                            ctx.writeAndFlush(response);
                                        } catch (Throwable e) {
                                        }
                                    } else {
                                    }
                                }
                            }
                        };
                        if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                            AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
                            //异步处理请求
                            processor.asyncProcessRequest(ctx, cmd, callback);
                        } else {
                            NettyRequestProcessor processor = pair.getObject1();
                            RemotingCommand response = processor.processRequest(ctx, cmd);
                            callback.callback(response);
                        }
                    } catch (Throwable e) {
                        log.error("process request exception", e);
                        log.error(cmd.toString());

                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };


            try {
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }

                if (!cmd.isOnewayRPC()) {
                    //非oneWay方式消息发送,构造发送消息响应信息
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                    
    ...
    }

processResponseCommand

代码语言:javascript
复制
/**
 * Process response from remote peer to the previous issued requests.
 *
 * @param ctx channel handler context.
 * @param cmd response command instance.
 */
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    //opaque被赋值为requestId
    final int opaque = cmd.getOpaque();
    //从map缓存获取正在进行的其中一个请求
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        responseFuture.setResponseCommand(cmd);
        //移除本次请求
        responseTable.remove(opaque);

        if (responseFuture.getInvokeCallback() != null) {
            //执行回调(排序、执行完成)
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}
代码语言:javascript
复制
 protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
 new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
 protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
 new ConcurrentHashMap<Integer, ResponseFuture>(256);
 

processorTable :记录全部请求、以及处理器。 responseTable记录所有响应。

Rocket MQ消息交互流程
Rocket MQ消息交互流程

更清晰的文件请下载: 点我下载

通讯协议

RocketMQ自定义的私有协议栈都是基于TCP/IP协议,使用Netty的NIO TCP协议栈进行私有协议栈的定制和开发。

RocketMQ协议共分为四个部分:

  • Header data:协议头,数据是序列化【fastjosn】后的json,json的每个key字段都是固定的,
  • body data:请求的二进制实际数据,例如发送消息的网络请求中,Body传输实际的消息内容。
  • Length:消息总长度
  • Header length:序列化类型&消息头长度,第一个字节表示序列化类型,后面三个自己表示消息头长度。

RemotingCommand类封装了通讯消息、编码、解码方式,这些组成了RocketMQ的通讯协议。

代码语言:javascript
复制
//根据自定义协议解析消息头、消息体
    public static RemotingCommand decode(final ByteBuffer byteBuffer) throws RemotingCommandException {
        int length = byteBuffer.limit();
        int oriHeaderLen = byteBuffer.getInt();
        //获取头部报文长度
        int headerLength = getHeaderLength(oriHeaderLen);  
//获取头部报文数据
    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    //反序列化解析Header data和RemotingCommand类
    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    //获取body长度
    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
        //获取Body数据
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }

    cmd.body = bodyData;

    return cmd;
}

//根据自定义协议编码消息头、消息体
    public ByteBuffer encode() {
        // 1> header length size
        int length = 4;

        // 2> header data length
        byte[] headerData = this.headerEncode();
        length += headerData.length;

        // 3> body data length
        if (this.body != null) {
            length += body.length;
        }

        ByteBuffer result = ByteBuffer.allocate(4 + length);

        // length
        result.putInt(length);

        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // header data
        result.put(headerData);

        // body data;
        if (this.body != null) {
            result.put(this.body);
        }

        result.flip();

        return result;
    }
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-03-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档