A Look at Sentinel's NettyHttpCommandCenter

This article takes a look at Sentinel's NettyHttpCommandCenter.

NettyHttpCommandCenter

com/alibaba/csp/sentinel/transport/command/NettyHttpCommandCenter.java

public class NettyHttpCommandCenter implements CommandCenter {

    private final HttpServer server = new HttpServer();

    private final ExecutorService pool = Executors.newSingleThreadExecutor(
        new NamedThreadFactory("sentinel-netty-command-center-executor"));

    @Override
    public void start() throws Exception {
        pool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    server.start();
                } catch (Exception ex) {
                    RecordLog.info("Start netty server error", ex);
                    ex.printStackTrace();
                    System.exit(-1);
                }
            }
        });
    }

    @Override
    public void stop() throws Exception {
        server.close();
        pool.shutdownNow();
    }

    @Override
    public void beforeStart() throws Exception {
        // Register handlers
        Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
        server.registerCommands(handlers);
    }
}
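  • A dedicated single-thread executor (newSingleThreadExecutor) is created to start the HttpServer asynchronously, since HttpServer.start() blocks until the channel is closed
  • Before startup, beforeStart() calls CommandHandlerProvider.getInstance().namedHandlers(), which triggers the collection of commands and their handlers
  • It then calls server.registerCommands(handlers) to register those handlers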
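
The asynchronous start described above can be sketched without Netty. The following is only a minimal illustration, not Sentinel's code: BlockingServer is a hypothetical stand-in for HttpServer whose start() blocks until close() is called, and AsyncStartSketch and startThenStop are names invented for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AsyncStartSketch {

    /** Hypothetical stand-in for HttpServer: start() blocks until close() is called. */
    static final class BlockingServer {
        private final CountDownLatch started = new CountDownLatch(1);
        private final CountDownLatch closed = new CountDownLatch(1);

        void start() throws InterruptedException {
            started.countDown();
            closed.await(); // blocks, much like channel.closeFuture().sync()
        }

        void close() {
            closed.countDown();
        }

        boolean awaitStarted(long timeoutMillis) throws InterruptedException {
            return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Starts the server on a dedicated thread, then stops it; true if both steps completed in time. */
    static boolean startThenStop() {
        BlockingServer server = new BlockingServer();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // The blocking start() runs on the pool thread, so the caller is not blocked.
            pool.submit(() -> {
                try {
                    server.start();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            boolean started = server.awaitStarted(1000);
            server.close();  // releases the blocked start()
            pool.shutdown(); // lets the finished task drain
            return started && pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("started and stopped cleanly: " + startThenStop());
    }
}
```

Calling start() directly on the caller's thread would never return while the server is running, which is presumably why the command center hands it to its own executor.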

HttpServer

com/alibaba/csp/sentinel/transport/command/netty/HttpServer.java

public final class HttpServer {

    private static final int DEFAULT_PORT = 8719;

    private Channel channel;

    final static Map<String, CommandHandler> handlerMap = new ConcurrentHashMap<String, CommandHandler>();

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new HttpServerInitializer());
            int port;
            try {
                if (StringUtil.isEmpty(TransportConfig.getPort())) {
                    CommandCenterLog.info("Port not configured, using default port: " + DEFAULT_PORT);
                    port = DEFAULT_PORT;
                } else {
                    port = Integer.parseInt(TransportConfig.getPort());
                }
            } catch (Exception e) {
                throw new IllegalArgumentException("Illegal port: " + TransportConfig.getPort());
            }
            channel = b.bind(port).sync().channel();
            channel.closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public void close() {
        channel.close();
    }

    public void registerCommand(String commandName, CommandHandler handler) {
        if (StringUtil.isEmpty(commandName) || handler == null) {
            return;
        }

        if (handlerMap.containsKey(commandName)) {
            CommandCenterLog.info("Register failed (duplicate command): " + commandName);
            return;
        }

        handlerMap.put(commandName, handler);
    }

    public void registerCommands(Map<String, CommandHandler> handlerMap) {
        if (handlerMap != null) {
            for (Entry<String, CommandHandler> e : handlerMap.entrySet()) {
                registerCommand(e.getKey(), e.getValue());
            }
        }
    }
}
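  • Netty is used here to build an NIO server whose child handler is HttpServerInitializer; the server binds to the port from TransportConfig.getPort() and falls back to the default port 8719 when none is configured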
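
The port fallback and the duplicate-command check in the listing above can be illustrated in isolation. This is a hedged sketch under simplifying assumptions: HttpServerConfigSketch, resolvePort and lookup are hypothetical names, handlers are plain strings rather than CommandHandler instances, and putIfAbsent is swapped in for the original containsKey followed by put so that the duplicate check is atomic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class HttpServerConfigSketch {

    static final int DEFAULT_PORT = 8719;

    // Hypothetical registry: handlers are plain strings here, not CommandHandler.
    private final Map<String, String> handlerMap = new ConcurrentHashMap<>();

    /** Uses the default port when nothing is configured; rejects non-numeric values. */
    static int resolvePort(String configured) {
        if (configured == null || configured.isEmpty()) {
            return DEFAULT_PORT;
        }
        try {
            return Integer.parseInt(configured);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Illegal port: " + configured, e);
        }
    }

    /** Registers a handler; the first registration for a name wins. Returns false if ignored. */
    boolean registerCommand(String commandName, String handler) {
        if (commandName == null || commandName.isEmpty() || handler == null) {
            return false;
        }
        // putIfAbsent performs the duplicate check and the insert as one atomic step.
        return handlerMap.putIfAbsent(commandName, handler) == null;
    }

    String lookup(String commandName) {
        return handlerMap.get(commandName);
    }

    public static void main(String[] args) {
        HttpServerConfigSketch server = new HttpServerConfigSketch();
        System.out.println(resolvePort(null));
        System.out.println(server.registerCommand("getRules", "first"));
        System.out.println(server.registerCommand("getRules", "second"));
        System.out.println(server.lookup("getRules"));
    }
}
```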

HttpServerInitializer

com/alibaba/csp/sentinel/transport/command/netty/HttpServerInitializer.java

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline p = socketChannel.pipeline();

        p.addLast(new HttpRequestDecoder());
        p.addLast(new HttpObjectAggregator(1024 * 1024));
        p.addLast(new HttpResponseEncoder());

        p.addLast(new HttpServerHandler());
    }
}
  • The pipeline is set up with HttpRequestDecoder, HttpObjectAggregator, HttpResponseEncoder and HttpServerHandler; the first three are components that ship with Netty

HttpServerHandler

com/alibaba/csp/sentinel/transport/command/netty/HttpServerHandler.java

public class HttpServerHandler extends SimpleChannelInboundHandler<Object> {

    private final CodecRegistry codecRegistry = new CodecRegistry();

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        FullHttpRequest httpRequest = (FullHttpRequest)msg;
        try {
            CommandRequest request = parseRequest(httpRequest);
            if (StringUtil.isBlank(HttpCommandUtils.getTarget(request))) {
                writeErrorResponse(BAD_REQUEST.code(), "Invalid command", ctx);
                return;
            }
            handleRequest(request, ctx, HttpUtil.isKeepAlive(httpRequest));

        } catch (Exception ex) {
            writeErrorResponse(INTERNAL_SERVER_ERROR.code(), SERVER_ERROR_MESSAGE, ctx);
            CommandCenterLog.warn("Internal error", ex);
        }
    }

    private void handleRequest(CommandRequest request, ChannelHandlerContext ctx, boolean keepAlive)
        throws Exception {
        String commandName = HttpCommandUtils.getTarget(request);
        // Find the matching command handler.
        CommandHandler<?> commandHandler = getHandler(commandName);
        if (commandHandler != null) {
            CommandResponse<?> response = commandHandler.handle(request);
            writeResponse(response, ctx, keepAlive);
        } else {
            // No matching command handler.
            writeErrorResponse(BAD_REQUEST.code(), String.format("Unknown command \"%s\"", commandName), ctx);
        }
    }

    private Encoder<?> pickEncoder(Class<?> clazz) {
        if (clazz == null) {
            throw new IllegalArgumentException("Bad class metadata");
        }
        for (Encoder<?> encoder : codecRegistry.getEncoderList()) {
            if (encoder.canEncode(clazz)) {
                return encoder;
            }
        }
        return null;
    }

    private void writeErrorResponse(int statusCode, String message, ChannelHandlerContext ctx) {
        FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
            HttpResponseStatus.valueOf(statusCode),
            Unpooled.copiedBuffer(message, Charset.forName(SentinelConfig.charset())));

        httpResponse.headers().set("Content-Type", "text/plain; charset=" + SentinelConfig.charset());
        ctx.write(httpResponse);

        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    private void writeResponse(CommandResponse response, ChannelHandlerContext ctx, boolean keepAlive)
        throws Exception {
        byte[] body;
        if (response.isSuccess()) {
            if (response.getResult() == null) {
                body = new byte[] {};
            } else {
                Encoder encoder = pickEncoder(response.getResult().getClass());
                if (encoder == null) {
                    writeErrorResponse(INTERNAL_SERVER_ERROR.code(), SERVER_ERROR_MESSAGE, ctx);
                    CommandCenterLog.warn("Error when encoding object",
                        new IllegalStateException("No compatible encoder"));
                    return;
                }
                body = encoder.encode(response.getResult());
            }
        } else {
            body = response.getException().getMessage().getBytes(SentinelConfig.charset());
        }

        HttpResponseStatus status = response.isSuccess() ? OK : BAD_REQUEST;

        FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
            Unpooled.copiedBuffer(body));

        httpResponse.headers().set("Content-Type", "text/plain; charset=" + SentinelConfig.charset());

        //if (keepAlive) {
        //    httpResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, httpResponse.content().readableBytes());
        //    httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        //}
        //ctx.write(httpResponse);
        //if (!keepAlive) {
        //    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        //}
        httpResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, httpResponse.content().readableBytes());
        httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
        ctx.write(httpResponse);
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    private CommandRequest parseRequest(FullHttpRequest request) {
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.uri());
        CommandRequest serverRequest = new CommandRequest();
        Map<String, List<String>> paramMap = queryStringDecoder.parameters();
        // Parse request parameters.
        if (!paramMap.isEmpty()) {
            for (Entry<String, List<String>> p : paramMap.entrySet()) {
                if (!p.getValue().isEmpty()) {
                    serverRequest.addParam(p.getKey(), p.getValue().get(0));
                }
            }
        }
        // Parse command name.
        String target = parseTarget(queryStringDecoder.rawPath());
        serverRequest.addMetadata(HttpCommandUtils.REQUEST_TARGET, target);
        // Parse body.
        if (request.content().readableBytes() <= 0) {
            serverRequest.setBody(null);
        } else {
            serverRequest.setBody(request.content().array());
        }
        return serverRequest;
    }

    private String parseTarget(String uri) {
        if (StringUtil.isEmpty(uri)) {
            return "";
        }
        String[] arr = uri.split("/");
        if (arr.length < 2) {
            return "";
        }
        return arr[1];
    }

    private CommandHandler getHandler(String commandName) {
        if (StringUtil.isEmpty(commandName)) {
            return null;
        }
        return HttpServer.handlerMap.get(commandName);
    }

    private void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.write(response);
    }

    private static final String SERVER_ERROR_MESSAGE = "Command server error";
}
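  • It extends SimpleChannelInboundHandler
  • channelRead0 parses the incoming request into a CommandRequest, extracts the corresponding command name, invokes the matching CommandHandler, and finally writes the result back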
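
The parse-then-dispatch flow of channelRead0 can be sketched in plain Java. This is an illustration under assumptions, not the Sentinel implementation: CommandDispatchSketch, parseParams and dispatch are hypothetical names, handlers are modelled as java.util.function.Function, the result is returned as a "status body" string instead of a Netty response, and percent-decoding of query parameters is omitted.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class CommandDispatchSketch {

    /** Mirrors parseTarget in the listing: the command name is the first path segment. */
    static String parseTarget(String path) {
        if (path == null || path.isEmpty()) {
            return "";
        }
        String[] arr = path.split("/");
        return arr.length < 2 ? "" : arr[1];
    }

    /** Keeps only the first value of each parameter (percent-decoding omitted for brevity). */
    static Map<String, String> parseParams(String query) {
        Map<String, String> params = new LinkedHashMap<>();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            if (!key.isEmpty()) {
                params.putIfAbsent(key, value);
            }
        }
        return params;
    }

    /** Returns "status body": 400 for a missing or unknown command, 500 if the handler throws. */
    static String dispatch(String uri, Map<String, Function<Map<String, String>, String>> handlers) {
        int q = uri.indexOf('?');
        String path = q < 0 ? uri : uri.substring(0, q);
        String query = q < 0 ? "" : uri.substring(q + 1);
        String target = parseTarget(path);
        if (target.trim().isEmpty()) {
            return "400 Invalid command";
        }
        Function<Map<String, String>, String> handler = handlers.get(target);
        if (handler == null) {
            return "400 Unknown command \"" + target + "\"";
        }
        try {
            return "200 " + handler.apply(parseParams(query));
        } catch (RuntimeException ex) {
            return "500 Command server error";
        }
    }

    public static void main(String[] args) {
        Map<String, Function<Map<String, String>, String>> handlers = new HashMap<>();
        handlers.put("echo", params -> params.get("msg"));
        System.out.println(dispatch("/echo?msg=hello", handlers));
        System.out.println(dispatch("/missing", handlers));
    }
}
```

As in the listing, only the first path segment selects the command, so a request for /echo/anything would still be routed to the echo handler.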

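Summary

NettyHttpCommandCenter is the Netty-based HTTP implementation of the command center. sentinel-transport also provides SimpleHttpCommandCenter, which is built on blocking I/O (BIO) over plain Java sockets plus a worker thread pool.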

doc

  • NettyHttpCommandCenter
  • A Look at Sentinel's SimpleHttpCommandCenter

Originally published on the WeChat official account 码匠的流水账 (geek_luandun)

Original publication date: 2018-08-20
