
A look at Sentinel's NettyHttpCommandCenter

Author: code4it
Published: 2018-09-17 17:21:10
Column: 码匠的流水账

This article takes a look at Sentinel's NettyHttpCommandCenter.

NettyHttpCommandCenter

com/alibaba/csp/sentinel/transport/command/NettyHttpCommandCenter.java

public class NettyHttpCommandCenter implements CommandCenter {

    private final HttpServer server = new HttpServer();

    private final ExecutorService pool = Executors.newSingleThreadExecutor(
        new NamedThreadFactory("sentinel-netty-command-center-executor"));

    @Override
    public void start() throws Exception {
        pool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    server.start();
                } catch (Exception ex) {
                    RecordLog.info("Start netty server error", ex);
                    ex.printStackTrace();
                    System.exit(-1);
                }
            }
        });
    }

    @Override
    public void stop() throws Exception {
        server.close();
        pool.shutdownNow();
    }

    @Override
    public void beforeStart() throws Exception {
        // Register handlers
        Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
        server.registerCommands(handlers);
    }
}
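  • A dedicated single-thread executor (newSingleThreadExecutor) is created here to start the HttpServer asynchronously: HttpServer.start() blocks on channel.closeFuture().sync(), so it cannot run on the caller's thread
  • Before startup, beforeStart() calls CommandHandlerProvider.getInstance().namedHandlers(), which triggers the collection of commands and their handlers
  • It then calls server.registerCommands(handlers) to register those handlers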
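
The asynchronous start described above can be illustrated with a small sketch that uses only the JDK. BlockingServerSketch and AsyncStartSketch are hypothetical names, not Sentinel classes; the latch merely stands in for the blocking channel.closeFuture().sync() call, under the assumption that the real server behaves the same way from the caller's point of view.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for HttpServer: start() blocks until close() is called.
class BlockingServerSketch {
    private final CountDownLatch started = new CountDownLatch(1);
    private final CountDownLatch closed = new CountDownLatch(1);

    void start() throws InterruptedException {
        started.countDown();
        closed.await(); // plays the role of channel.closeFuture().sync()
    }

    void close() {
        closed.countDown();
    }

    // Waits up to the given number of milliseconds for start() to be entered.
    boolean awaitStarted(long millis) {
        try {
            return started.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

// Hypothetical command center that starts the blocking server off-thread.
class AsyncStartSketch {
    final BlockingServerSketch server = new BlockingServerSketch();
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    // Returns immediately; the blocking start() runs on the pool's only thread.
    void start() {
        pool.submit(() -> {
            try {
                server.start();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // Unblocks the server, then shuts the pool down.
    // Returns true if the worker thread exited within the timeout.
    boolean stop(long millis) {
        server.close();
        pool.shutdownNow();
        try {
            return pool.awaitTermination(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Calling start() on an AsyncStartSketch returns at once, while the worker thread stays parked inside the server until stop() is called. This is the same division of labour as between NettyHttpCommandCenter.start() and stop().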

HttpServer

com/alibaba/csp/sentinel/transport/command/netty/HttpServer.java

public final class HttpServer {

    private static final int DEFAULT_PORT = 8719;

    private Channel channel;

    final static Map<String, CommandHandler> handlerMap = new ConcurrentHashMap<String, CommandHandler>();

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new HttpServerInitializer());
            int port;
            try {
                if (StringUtil.isEmpty(TransportConfig.getPort())) {
                    CommandCenterLog.info("Port not configured, using default port: " + DEFAULT_PORT);
                    port = DEFAULT_PORT;
                } else {
                    port = Integer.parseInt(TransportConfig.getPort());
                }
            } catch (Exception e) {
                throw new IllegalArgumentException("Illegal port: " + TransportConfig.getPort());
            }
            channel = b.bind(port).sync().channel();
            channel.closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public void close() {
        channel.close();
    }

    public void registerCommand(String commandName, CommandHandler handler) {
        if (StringUtil.isEmpty(commandName) || handler == null) {
            return;
        }

        if (handlerMap.containsKey(commandName)) {
            CommandCenterLog.info("Register failed (duplicate command): " + commandName);
            return;
        }

        handlerMap.put(commandName, handler);
    }

    public void registerCommands(Map<String, CommandHandler> handlerMap) {
        if (handlerMap != null) {
            for (Entry<String, CommandHandler> e : handlerMap.entrySet()) {
                registerCommand(e.getKey(), e.getValue());
            }
        }
    }
}
  • Netty is used here to build the NIO server; the child handler is HttpServerInitializer
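
Two details of the listing above are easy to miss: the port falls back to 8719 when nothing is configured, and a command name that is already registered is not overwritten. The sketch below reproduces both rules with the JDK only. CommandRegistrySketch, resolvePort and register are hypothetical names, the handler type is simplified to Function<String, String>, and putIfAbsent is swapped in for the original containsKey/put pair (the first registration still wins).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry mirroring HttpServer's port and registration rules.
class CommandRegistrySketch {
    static final int DEFAULT_PORT = 8719;

    private final Map<String, Function<String, String>> handlers = new ConcurrentHashMap<>();

    // Empty or missing configuration falls back to the default port;
    // anything that is not an integer is rejected.
    static int resolvePort(String configured) {
        if (configured == null || configured.isEmpty()) {
            return DEFAULT_PORT;
        }
        try {
            return Integer.parseInt(configured);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Illegal port: " + configured);
        }
    }

    // Returns false for invalid input or a duplicate name; the first handler is kept.
    boolean register(String name, Function<String, String> handler) {
        if (name == null || name.isEmpty() || handler == null) {
            return false;
        }
        return handlers.putIfAbsent(name, handler) == null;
    }

    // Returns the registered handler, or null if the name is unknown.
    Function<String, String> lookup(String name) {
        return name == null ? null : handlers.get(name);
    }
}
```

For example, resolvePort("") yields the default port, and registering a second handler under an existing name leaves the first one in place.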

HttpServerInitializer

com/alibaba/csp/sentinel/transport/command/netty/HttpServerInitializer.java

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline p = socketChannel.pipeline();

        p.addLast(new HttpRequestDecoder());
        p.addLast(new HttpObjectAggregator(1024 * 1024));
        p.addLast(new HttpResponseEncoder());

        p.addLast(new HttpServerHandler());
    }
}
  • The pipeline is set up with HttpRequestDecoder, HttpObjectAggregator, HttpResponseEncoder and HttpServerHandler; the first three are components that ship with Netty

HttpServerHandler

com/alibaba/csp/sentinel/transport/command/netty/HttpServerHandler.java

public class HttpServerHandler extends SimpleChannelInboundHandler<Object> {

    private final CodecRegistry codecRegistry = new CodecRegistry();

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        FullHttpRequest httpRequest = (FullHttpRequest)msg;
        try {
            CommandRequest request = parseRequest(httpRequest);
            if (StringUtil.isBlank(HttpCommandUtils.getTarget(request))) {
                writeErrorResponse(BAD_REQUEST.code(), "Invalid command", ctx);
                return;
            }
            handleRequest(request, ctx, HttpUtil.isKeepAlive(httpRequest));

        } catch (Exception ex) {
            writeErrorResponse(INTERNAL_SERVER_ERROR.code(), SERVER_ERROR_MESSAGE, ctx);
            CommandCenterLog.warn("Internal error", ex);
        }
    }

    private void handleRequest(CommandRequest request, ChannelHandlerContext ctx, boolean keepAlive)
        throws Exception {
        String commandName = HttpCommandUtils.getTarget(request);
        // Find the matching command handler.
        CommandHandler<?> commandHandler = getHandler(commandName);
        if (commandHandler != null) {
            CommandResponse<?> response = commandHandler.handle(request);
            writeResponse(response, ctx, keepAlive);
        } else {
            // No matching command handler.
            writeErrorResponse(BAD_REQUEST.code(), String.format("Unknown command \"%s\"", commandName), ctx);
        }
    }

    private Encoder<?> pickEncoder(Class<?> clazz) {
        if (clazz == null) {
            throw new IllegalArgumentException("Bad class metadata");
        }
        for (Encoder<?> encoder : codecRegistry.getEncoderList()) {
            if (encoder.canEncode(clazz)) {
                return encoder;
            }
        }
        return null;
    }

    private void writeErrorResponse(int statusCode, String message, ChannelHandlerContext ctx) {
        FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
            HttpResponseStatus.valueOf(statusCode),
            Unpooled.copiedBuffer(message, Charset.forName(SentinelConfig.charset())));

        httpResponse.headers().set("Content-Type", "text/plain; charset=" + SentinelConfig.charset());
        ctx.write(httpResponse);

        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    private void writeResponse(CommandResponse response, ChannelHandlerContext ctx, boolean keepAlive)
        throws Exception {
        byte[] body;
        if (response.isSuccess()) {
            if (response.getResult() == null) {
                body = new byte[] {};
            } else {
                Encoder encoder = pickEncoder(response.getResult().getClass());
                if (encoder == null) {
                    writeErrorResponse(INTERNAL_SERVER_ERROR.code(), SERVER_ERROR_MESSAGE, ctx);
                    CommandCenterLog.warn("Error when encoding object",
                        new IllegalStateException("No compatible encoder"));
                    return;
                }
                body = encoder.encode(response.getResult());
            }
        } else {
            body = response.getException().getMessage().getBytes(SentinelConfig.charset());
        }

        HttpResponseStatus status = response.isSuccess() ? OK : BAD_REQUEST;

        FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
            Unpooled.copiedBuffer(body));

        httpResponse.headers().set("Content-Type", "text/plain; charset=" + SentinelConfig.charset());

        //if (keepAlive) {
        //    httpResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, httpResponse.content().readableBytes());
        //    httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        //}
        //ctx.write(httpResponse);
        //if (!keepAlive) {
        //    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        //}
        httpResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, httpResponse.content().readableBytes());
        httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
        ctx.write(httpResponse);
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    private CommandRequest parseRequest(FullHttpRequest request) {
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.uri());
        CommandRequest serverRequest = new CommandRequest();
        Map<String, List<String>> paramMap = queryStringDecoder.parameters();
        // Parse request parameters.
        if (!paramMap.isEmpty()) {
            for (Entry<String, List<String>> p : paramMap.entrySet()) {
                if (!p.getValue().isEmpty()) {
                    serverRequest.addParam(p.getKey(), p.getValue().get(0));
                }
            }
        }
        // Parse command name.
        String target = parseTarget(queryStringDecoder.rawPath());
        serverRequest.addMetadata(HttpCommandUtils.REQUEST_TARGET, target);
        // Parse body.
        if (request.content().readableBytes() <= 0) {
            serverRequest.setBody(null);
        } else {
            serverRequest.setBody(request.content().array());
        }
        return serverRequest;
    }

    private String parseTarget(String uri) {
        if (StringUtil.isEmpty(uri)) {
            return "";
        }
        String[] arr = uri.split("/");
        if (arr.length < 2) {
            return "";
        }
        return arr[1];
    }

    private CommandHandler getHandler(String commandName) {
        if (StringUtil.isEmpty(commandName)) {
            return null;
        }
        return HttpServer.handlerMap.get(commandName);
    }

    private void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.write(response);
    }

    private static final String SERVER_ERROR_MESSAGE = "Command server error";
}
  • It extends SimpleChannelInboundHandler
  • channelRead0 parses the request into a CommandRequest, extracts the corresponding commandName, invokes the matching commandHandler, and finally writes the result
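
The parse-then-dispatch flow can be traced without Netty in a simplified sketch. CommandDispatchSketch and its Response class are hypothetical; parseTarget follows the listing above, while parseParams is only a rough stand-in for QueryStringDecoder (it keeps the first value of a repeated parameter, as the handler does, but skips percent-decoding). The status codes mirror the Invalid command and Unknown command branches; collapsing every handler failure into a 500 is a simplification.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical, Netty-free model of HttpServerHandler's request flow.
class CommandDispatchSketch {
    // Minimal response holder: HTTP status code plus plain-text body.
    static final class Response {
        final int status;
        final String body;

        Response(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    private final Map<String, Function<Map<String, String>, String>> handlers =
        new ConcurrentHashMap<>();

    void register(String name, Function<Map<String, String>, String> handler) {
        handlers.putIfAbsent(name, handler);
    }

    // Same rule as the listing: the command name is the first path segment.
    static String parseTarget(String path) {
        if (path == null || path.isEmpty()) {
            return "";
        }
        String[] arr = path.split("/");
        return arr.length < 2 ? "" : arr[1];
    }

    // Simplified query parsing: first value wins, no percent-decoding.
    static Map<String, String> parseParams(String uri) {
        Map<String, String> params = new LinkedHashMap<>();
        int q = uri.indexOf('?');
        if (q < 0 || q == uri.length() - 1) {
            return params;
        }
        for (String pair : uri.substring(q + 1).split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            params.putIfAbsent(key, value);
        }
        return params;
    }

    // Parse the target, look up the handler, run it, and map the outcome to a status.
    Response dispatch(String uri) {
        int q = uri.indexOf('?');
        String target = parseTarget(q < 0 ? uri : uri.substring(0, q));
        if (target.trim().isEmpty()) {
            return new Response(400, "Invalid command");
        }
        Function<Map<String, String>, String> handler = handlers.get(target);
        if (handler == null) {
            return new Response(400, String.format("Unknown command \"%s\"", target));
        }
        try {
            return new Response(200, handler.apply(parseParams(uri)));
        } catch (RuntimeException ex) {
            return new Response(500, "Command server error");
        }
    }
}
```

With a handler registered under getRules, a request for /getRules?type=flow reaches that handler with type=flow, whereas / and /nope are both answered with status 400.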

Summary

NettyHttpCommandCenter is the Netty-based HTTP implementation. sentinel-transport also provides a SimpleHttpCommandCenter, an implementation based on blocking I/O (BIO) over Java sockets plus a worker thread pool.

doc

  • NettyHttpCommandCenter
  • A look at Sentinel's SimpleHttpCommandCenter
Originally published on 2018-08-20 on the WeChat official account 码匠的流水账.
