聊聊sentinel的SimpleHttpCommandCenter

本文主要研究一下sentinel的SimpleHttpCommandCenter

SimpleHttpCommandCenter

sentinel-transport-simple-http-0.1.1-sources.jar!/com/alibaba/csp/sentinel/transport/command/SimpleHttpCommandCenter.java

/***
 * The simple command center provides service to exchange information.
 *
 * @author youji.zj
 */
public class SimpleHttpCommandCenter implements CommandCenter {

    private static final int PORT_UNINITIALIZED = -1;

    private static final int DEFAULT_SERVER_SO_TIMEOUT = 3000;
    private static final int DEFAULT_PORT = 8719;

    private static final Map<String, CommandHandler> handlerMap = new HashMap<String, CommandHandler>();

    private ExecutorService executor = Executors.newSingleThreadExecutor(
        new NamedThreadFactory("sentinel-command-center-executor"));
    private ExecutorService bizExecutor;

    private ServerSocket socketReference;

    @Override
    public void beforeStart() throws Exception {
        // Register handlers
        Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
        registerCommands(handlers);
    }

    @Override
    public void start() throws Exception {
        int nThreads = Runtime.getRuntime().availableProcessors();
        this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(10),
            new NamedThreadFactory("sentinel-command-center-service-executor"),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    CommandCenterLog.info("EventTask rejected");
                    throw new RejectedExecutionException();
                }
            });

        Runnable serverInitTask = new Runnable() {
            int port;

            {
                try {
                    port = Integer.parseInt(TransportConfig.getPort());
                } catch (Exception e) {
                    port = DEFAULT_PORT;
                }
            }

            @Override
            public void run() {
                int repeat = 0;

                int tmpPort = port;
                boolean success = false;
                while (true) {
                    ServerSocket serverSocket = null;
                    try {
                        serverSocket = new ServerSocket(tmpPort);
                    } catch (IOException ex) {
                        CommandCenterLog.info(
                            String.format("IO error occurs, port: %d, repeat times: %d", tmpPort, repeat), ex);

                        tmpPort = adjustPort(repeat);
                        try {
                            TimeUnit.SECONDS.sleep(5);
                        } catch (InterruptedException e1) {
                            break;
                        }
                        repeat++;
                    }

                    if (serverSocket != null) {
                        CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
                        socketReference = serverSocket;
                        executor.submit(new ServerThread(serverSocket));
                        success = true;
                        break;
                    }
                }

                if (!success) {
                    tmpPort = PORT_UNINITIALIZED;
                }
                TransportConfig.setRuntimePort(tmpPort);
                executor.shutdown();
            }

            /**
             * Adjust the port to settle down.
             */
            private int adjustPort(int repeat) {
                int mod = repeat / 5;
                return port + mod;
            }
        };

        new Thread(serverInitTask).start();
    }

    @Override
    public void stop() throws Exception {
        if (socketReference != null) {
            try {
                socketReference.close();
            } catch (IOException e) {
                CommandCenterLog.warn("Error when releasing the server socket", e);
            }
        }
        bizExecutor.shutdownNow();
        executor.shutdownNow();
        TransportConfig.setRuntimePort(PORT_UNINITIALIZED);
        handlerMap.clear();
    }

    /**
     * Get the name set of all registered commands.
     */
    public static Set<String> getCommands() {
        return handlerMap.keySet();
    }

    public static CommandHandler getHandler(String commandName) {
        return handlerMap.get(commandName);
    }

    public static void registerCommands(Map<String, CommandHandler> handlerMap) {
        if (handlerMap != null) {
            for (Entry<String, CommandHandler> e : handlerMap.entrySet()) {
                registerCommand(e.getKey(), e.getValue());
            }
        }
    }

    public static void registerCommand(String commandName, CommandHandler handler) {
        if (StringUtil.isEmpty(commandName)) {
            return;
        }

        if (handlerMap.containsKey(commandName)) {
            CommandCenterLog.info("Register failed (duplicate command): " + commandName);
            return;
        }

        handlerMap.put(commandName, handler);
    }

    /**
     * Avoid server thread hang, 3 seconds timeout by default.
     */
    private void setSocketSoTimeout(Socket socket) throws SocketException {
        if (socket != null) {
            socket.setSoTimeout(DEFAULT_SERVER_SO_TIMEOUT);
        }
    }
}
  • 这里直接使用的是java的ServerSocket(bio)构建的tcp服务
  • 启动之前先调用CommandHandlerProvider.getInstance().namedHandlers()获取支持的命令及handler,然后调用registerCommands进行注册
  • 之后创建bizExecutor以及异步serverInitTask线程,里头再异步启动ServerThread

ServerThread

    class ServerThread extends Thread {

        private ServerSocket serverSocket;

        ServerThread(ServerSocket s) {
            this.serverSocket = s;
            setName("sentinel-courier-server-accept-thread");
        }

        @Override
        public void run() {
            while (true) {
                Socket socket = null;
                try {
                    socket = this.serverSocket.accept();
                    setSocketSoTimeout(socket);
                    HttpEventTask eventTask = new HttpEventTask(socket);
                    bizExecutor.submit(eventTask);
                } catch (Exception e) {
                    CommandCenterLog.info("Server error", e);
                    if (socket != null) {
                        try {
                            socket.close();
                        } catch (Exception e1) {
                            CommandCenterLog.info("Error when closing an opened socket", e1);
                        }
                    }
                    try {
                        // In case of infinite log.
                        Thread.sleep(10);
                    } catch (InterruptedException e1) {
                        // Indicates the task should stop.
                        break;
                    }
                }
            }
        }
    }
  • ServerThread采取的是worker线程池模式,接收一个请求之后,包装为HttpEventTask,然后提交给bizExecutor

HttpEventTask

sentinel-transport-simple-http-0.1.1-sources.jar!/com/alibaba/csp/sentinel/transport/command/http/HttpEventTask.java

public class HttpEventTask implements Runnable {

    private final Socket socket;

    private boolean writtenHead = false;

    public HttpEventTask(Socket socket) {
        this.socket = socket;
    }

    public void close() throws Exception {
        socket.close();
    }

    @Override
    public void run() {
        if (socket == null) {
            return;
        }

        BufferedReader in = null;
        PrintWriter printWriter = null;
        try {
            long start = System.currentTimeMillis();
            in = new BufferedReader(new InputStreamReader(socket.getInputStream(), SentinelConfig.charset()));
            OutputStream outputStream = socket.getOutputStream();

            printWriter = new PrintWriter(
                new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));

            String line = in.readLine();
            CommandCenterLog.info("[CommandCenter] socket income:" + line
                + "," + socket.getInetAddress());
            CommandRequest request = parseRequest(line);

            // Validate the target command.
            String commandName = HttpCommandUtils.getTarget(request);
            if (StringUtil.isBlank(commandName)) {
                badRequest(printWriter, "Invalid command");
                return;
            }

            // Find the matching command handler.
            CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
            if (commandHandler != null) {
                CommandResponse<?> response = commandHandler.handle(request);
                handleResponse(response, printWriter, outputStream);
            } else {
                // No matching command handler.
                badRequest(printWriter, "Unknown command `" + commandName + '`');
            }
            printWriter.flush();

            long cost = System.currentTimeMillis() - start;
            CommandCenterLog.info("[CommandCenter] deal a socket task:" + line
                + "," + socket.getInetAddress() + ", time cost=" + cost + " ms");
        } catch (Throwable e) {
            CommandCenterLog.info("CommandCenter error", e);
            try {
                if (printWriter != null) {
                    String errorMessage = SERVER_ERROR_MESSAGE;
                    if (!writtenHead) {
                        internalError(printWriter, errorMessage);
                    } else {
                        printWriter.println(errorMessage);
                    }
                    printWriter.flush();
                }
            } catch (Exception e1) {
                CommandCenterLog.info("CommandCenter close serverSocket failed", e);
            }
        } finally {
            closeResource(in);
            closeResource(printWriter);
            closeResource(socket);
        }
    }

    private void closeResource(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Exception e) {
                CommandCenterLog.info("CommandCenter close resource failed", e);
            }
        }
    }

    private void handleResponse(CommandResponse response, /*@NonNull*/ final PrintWriter printWriter,
        /*@NonNull*/ final OutputStream rawOutputStream) throws Exception {
        if (response.isSuccess()) {
            if (response.getResult() == null) {
                writeOkStatusLine(printWriter);
                return;
            }
            // Write 200 OK status line.
            writeOkStatusLine(printWriter);
            // Here we directly use `toString` to encode the result to plain text.
            byte[] buffer = response.getResult().toString().getBytes(SentinelConfig.charset());
            rawOutputStream.write(buffer);
            rawOutputStream.flush();
        } else {
            String msg = SERVER_ERROR_MESSAGE;
            if (response.getException() != null) {
                msg = response.getException().getMessage();
            }
            badRequest(printWriter, msg);
        }
    }

    /**
     * Write `400 Bad Request` HTTP response status line and message body, then flush.
     */
    private void badRequest(/*@NonNull*/ final PrintWriter out, String message) {
        out.print("HTTP/1.1 400 Bad Request\r\n"
            + "Connection: close\r\n\r\n");
        out.print(message);
        out.flush();
        writtenHead = true;
    }

    /**
     * Write `500 Internal Server Error` HTTP response status line and message body, then flush.
     */
    private void internalError(/*@NonNull*/ final PrintWriter out, String message) {
        out.print("HTTP/1.1 500 Internal Server Error\r\n"
            + "Connection: close\r\n\r\n");
        out.print(message);
        out.flush();
        writtenHead = true;
    }

    /**
     * Write `200 OK` HTTP response status line and flush.
     */
    private void writeOkStatusLine(/*@NonNull*/ final PrintWriter out) {
        out.print("HTTP/1.1 200 OK\r\n"
            + "Connection: close\r\n\r\n");
        out.flush();
        writtenHead = true;
    }

    /**
     * Parse raw HTTP request line to a {@link CommandRequest}.
     *
     * @param line HTTP request line
     * @return parsed command request
     */
    private CommandRequest parseRequest(String line) {
        CommandRequest request = new CommandRequest();
        if (StringUtil.isBlank(line)) {
            return request;
        }
        int start = line.indexOf('/');
        int ask = line.indexOf('?') == -1 ? line.lastIndexOf(' ') : line.indexOf('?');
        int space = line.lastIndexOf(' ');
        String target = line.substring(start != -1 ? start + 1 : 0, ask != -1 ? ask : line.length());
        request.addMetadata(HttpCommandUtils.REQUEST_TARGET, target);
        if (ask == -1 || ask == space) {
            return request;
        }
        String parameterStr = line.substring(ask != -1 ? ask + 1 : 0, space != -1 ? space : line.length());
        for (String parameter : parameterStr.split("&")) {
            if (StringUtil.isBlank(parameter)) {
                continue;
            }

            String[] keyValue = parameter.split("=");
            if (keyValue.length != 2) {
                continue;
            }

            String value = StringUtil.trim(keyValue[1]);
            try {
                value = URLDecoder.decode(value, SentinelConfig.charset());
            } catch (UnsupportedEncodingException e) {
            }

            request.addParam(StringUtil.trim(keyValue[0]), value);
        }
        return request;
    }

    private static final String SERVER_ERROR_MESSAGE = "Command server error";

}
  • 这里直接readLine,然后解析为CommandRequest,然后调用SimpleHttpCommandCenter.getHandler获取处理程序,执行返回结果
  • 可以看到这里是手工解析http协议

CommandHandlerProvider

sentinel-transport-common-0.1.1-sources.jar!/com/alibaba/csp/sentinel/command/CommandHandlerProvider.java

/**
 * Provides and filters command handlers registered via SPI.
 *
 * @author Eric Zhao
 */
public class CommandHandlerProvider implements Iterable<CommandHandler> {

    private final ServiceLoader<CommandHandler> serviceLoader = ServiceLoader.load(CommandHandler.class);

    /**
     * Get all command handlers annotated with {@link CommandMapping} with command name.
     *
     * @return list of all named command handlers
     */
    public Map<String, CommandHandler> namedHandlers() {
        Map<String, CommandHandler> map = new HashMap<String, CommandHandler>();
        for (CommandHandler handler : serviceLoader) {
            String name = parseCommandName(handler);
            if (!StringUtil.isEmpty(name)) {
                map.put(name, handler);
            }
        }
        return map;
    }

    private String parseCommandName(CommandHandler handler) {
        CommandMapping commandMapping = handler.getClass().getAnnotation(CommandMapping.class);
        if (commandMapping != null) {
            return commandMapping.name();
        } else {
            return null;
        }
    }

    @Override
    public Iterator<CommandHandler> iterator() {
        return serviceLoader.iterator();
    }

    private static final CommandHandlerProvider INSTANCE = new CommandHandlerProvider();

    public static CommandHandlerProvider getInstance() {
        return INSTANCE;
    }
}
  • 这个类采用单例模式,以及SPI机制,来动态加载CommandHandler的实现,然后namedHandlers方法提供一个命令名及handler的map

com.alibaba.csp.sentinel.command.CommandHandler

sentinel-transport-common-0.1.1.jar!/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler

com.alibaba.csp.sentinel.command.handler.BasicInfoCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchActiveRuleCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeByIdCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeHumanCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchJsonTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchOriginCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSimpleClusterNodeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSystemStatusCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffGetCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffSetCommandHandler
com.alibaba.csp.sentinel.command.handler.SendMetricCommandHandler
com.alibaba.csp.sentinel.command.handler.VersionCommandHandler
  • 这里在META-INF的services目录,使用SPI机制,在com.alibaba.csp.sentinel.command.CommandHandler文件下记录了各种实现

小结

  • sentinel的transport部分,有一个common基础包,然后就是simple-http以及netty的实现。
  • simple-http采用的是bio外加工作线程池模式,来一个请求,往线程池丢一个HttpEventTask。
  • HttpEventTask解析请求的命令然后获取相应的handler执行返回
  • commandHandler采用的是SPI的模式进行动态加载

doc

  • SimpleHttpCommandCenter

原文发布于微信公众号 - 码匠的流水账(geek_luandun)

原文发表时间:2018-08-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏GIS讲堂

shape文件上传与展示

1402
来自专栏机器人网

中英文对照,瞬间理解西门子PLC指令

指令( 英文全称意思 ) :指令含义 1、LD ( Load 装载 ) :动合触点 2、LDN ( Load Not 不装载 ) : 动断触点 3、A ( A...

3587
来自专栏我杨某人的青春满是悔恨

封装一个 Swift-Style 的网络模块

Swift 跟 OC 有着完全不同的设计哲学,它鼓励你使用 protocol 而不是 super class,使用 enum 和 struct 而不是 clas...

993
来自专栏架构之路

Spring 数据库连接(Connection)绑定线程(Thread)的实现

最近在看spring事务的时候在想一个问题:spring中的很多bean都是单例的,是非状态的,而数据库连接是一种有状态的对象,所以spring一定在创建出co...

3853
来自专栏服务端技术杂谈

dubbo源码学习笔记----monitor

核心类 public abstract class AbstractMonitorFactory implements MonitorFactory { ...

3448
来自专栏Spark学习技巧

textFile构建RDD的分区及compute计算策略

1,textFile A),第一点,就是输入格式,key,value类型及并行度的意义。 def textFile( path: String, mi...

2557
来自专栏wannshan(javaer,RPC)

dubbo消费方服务调用过程源码分析

dubbo PRC服务调用过程很复杂,这里准备通过分析一个典型rpc方法调用的调用栈来说明调用过程。说它典型,是因为本次分析的调用场景很典型简单 先定义一个接...

2.7K8
来自专栏码匠的流水账

聊聊eureka server的response cache

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/resources/ApplicationResource....

1143
来自专栏xdecode

Java高并发之无锁与Atomic源码分析

2124
来自专栏Spark生态圈

[spark] Task执行流程

在文章TaskScheduler 任务提交与调度源码解析 中介绍了Task在executor上的逻辑分配,调用TaskSchedulerImpl的resourc...

1881

扫码关注云+社区

领取腾讯云代金券