前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >我发现了 高性能异步编程 和 实时流模型 那千丝万缕的联系!

我发现了 高性能异步编程 和 实时流模型 那千丝万缕的联系!

作者头像
kk大数据
发布2021-03-15 14:51:21
3630
发布2021-03-15 14:51:21
举报
文章被收录于专栏:kk大数据kk大数据

本次我们来实现一个支持百万并发连接数的采集服务器,并且找到异步+ NIO 的编程方式和实时流模型的那些千丝万缕的联系。

一、需求说明

简单的需求功能如下:

  • 数据采集是一个 web 服务器,可以接收 http 请求传上来的事件,事件是 json 格式的;
  • 收到之后,进行解码,校验字符串是否可以解码成 json 对象;
  • 对消息进行抽取、清洗、转化;
  • 最后发送到 kafka 中。

性能要求:

  • 支持百万并发连接数;
  • 对 CPU 资源和 IO 资源充分利用;

二、实现方案剖析

网页和手机端会产生一些埋点文件,通过 http 方式发送给采集服务器。涉及到网络连接,第一个想到的就是 Socket。

1、初始版本:使用 BIO 实现的客户端和服务端通信

我们很容易就可以用多线程快速实现一个 web 端服务器,模型图如下(为了节省篇幅,代码我就不写了,很简单,但不实用,百度有很多)。

简单描述一下:每次来一个请求,都创建一个线程来执行。如下图:

但是弊端也显而易见,随随便便就能列出三点:

1.1 创建和销毁线程动作代价昂贵

Java 中的线程模型是基于操作系统原生线程模型实现的,也就是说 Java 中的线程其实是基于内核线程实现的,线程的创建,析构与同步都需要进行系统调用,而系统调用需要在用户态与内核中来回切换,代价相对较高;

1.2 线程本身占用大量的内存

Java 中,默认一个线程,线程栈大小是 1 M。一旦线程数过千,恐怕整个 Jvm 内存都会被吃掉一半;

1.3 线程切换成本是很高

操作系统在切换线程的时候,需要保留线程的上下文,然后再执行系统调用。如果线程数过多,可能执行线程的时间都会大于线程执行的时间,使系统陷于几乎不可用的状态;

2、改造版本:异步 + NIO 实现的高性能网络通信

这是 Java 中的 NIO 模型,如下图:

可能你一下无法接受同时出现这么多陌生的概念(Acceptor,Selector,Channel),没关系,NIO 再慢慢学,这里我们只要抓住它的核心:

  • 使用了队列将请求接收器和工作线程隔开,让请求接收器和工作线程各自尽其所能的工作,更加充分的利用 IO 和 CPU 资源;
  • 现在,NIO 连接器能够保持的并发连接数,不再受限于工作线程数量,无需分配大量线程,就能支持大量并发连接了。
3、进阶版本,如何充分利用 CPU 和 网络 IO 资源

在第二步,我们解决了高并发连接数的问题,但是还远远不够。

在一个采集系统中,我们需要做这三件事情,解码,清洗转化,发送。

其中,解码和清洗转化过程纯粹是 CPU 计算,占用 CPU 资源,而发送会大量占用 IO 资源。

如果让一个线程串行的执行这三件事,前面两件事,CPU 会很快做完,势必最后会等在 IO 操作上,这个线程就被操作系统挂起了,在那里空等,直到 IO 操作完成。

如何解决,只能增加工作线程数量,但是增加工作线程数量,会导致过多的线程调度和上下文切换,是另一种形式的 CPU 浪费。

如何解决,我们可以使用异步的方式。何谓异步,比如做饭过程就是异步,先把饭放电饭煲煮着,趁着这个时间去做菜,这就是“异步”。如果一直等到米饭煮熟,再去烧菜,这就是“同步”。

可能你就已经知道了,上次我们讲过 CompletableFuture,这是一个异步编程框架,可以将不同的线程编排起来。

并且可以指定线程池,让不同的事情,在不同的线程池里面完成。

看下面的代码:

代码语言:javascript
复制
// 解码线程池
Executor decoderExecutor = ExecutorHelper.createExecutor(2, "decoder");
// 转换线程池
Executor ectExecutor = ExecutorHelper.createExecutor(8, "ect");
// 发送线程池
Executor senderExecutor = ExecutorHelper.createExecutor(2, "sender");

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req) throws Exception {
    CompletableFuture
   // 解码过程
            .supplyAsync(() -> this.decode(ctx, req), this.decoderExecutor)
   // 转换过程
            .thenApplyAsync(e -> this.doExtractCleanTransform(ctx, req, e), this.ectExecutor)
   // IO 过程
            .thenApplyAsync(e -> this.send(ctx, req, e), this.senderExecutor);
}

其中 channelRead0 是 Netty 框架的请求方法,每次请求都会到这个方法中。每次请求进来时,三个动作分别异步执行,但是 CompletableFuture 框架会保证每个请求三个动作执行的先后顺序。

这样,CPU 资源和 IO 资源,就可以得到充分的利用了。

三、进阶版本中存在的问题

1、问题描述

上述的异步模型可以用下面的图来表示

在上面的异步编程代码中,我们把不同类型的任务提交到不同的线程池中,而线程池是需要队列的,图上的队列就是线程池的队列。

其中,解码过程和转换过程,都是比较快速的过程,而发送的 I/O 过程则比较慢。

那么前面的消息会一直积压在发送过程的线程池队列中,等待执行。如果队列选择的是无界队列,那么越来越多的任务会积压,最终会用光所有的虚拟机所有的内存,导致 OOM。

2、如何控制事件的速度

我们可以直接想到,严格控制上游的发送速度,比如控制上游每秒钟只能发送 1000 条消息。这种方法虽然可行,但是非常不优雅。

如果下游遇到特殊原因,每秒只能处理 500 条,那仍然还是会 OOM,我们没法去估出一个合适的值的。

3、反向压力

有一种更加优雅的方案,叫做反向压力。

上面我们的方案出问题,主要原因还是在于队列是无界的,消息一直在积压,并且是非阻塞的。

要实现反向压力,只需要从两个方面来控制:

  • 执行器的任务队列,它的容量必须是有限的;
  • 当执行器的任务队列满了的时候,就阻止上游继续提交任务,直到任务队列中有新的空间为止。

如上图,可以看到,如果发送的线程池队列满了之后,就阻止上游的转换任务继续提交任务。过了一会,转换过程的队列也会满,同样的它也会阻止解码过程提交任务。

对于我们这种数据处理场景的话,可以通过横向增加服务器来解决 TPS 低的问题;如果是流式处理场景,那么最上游应该是主动从 Kafka 拉取消息,这个时候,它就放缓自己拉取消息的速度,从而做到流量控制。

当一段时间后,发送线程池队列有空闲了,又会继续处理消息。

4、实现反压

如何来实现反压?其实很简单,当队列满了之后,会进入线程池的拒绝策略中,在拒绝策略中,我们使用 while 循环来重复提交任务,直到任务提交成功,看下面的代码:

代码语言:javascript
复制
private final List<ExecutorService> executors;
private final Partitioner partitioner;
private Long rejectSleepMills = 1L;

public BackPressureExecutor(String name, int executorNumber, int coreSize, int maxSize, int capacity, long rejectSleepMills) {
    this.rejectSleepMills = rejectSleepMills;
    this.executors = new ArrayList<>(executorNumber);
    for (int i = 0; i < executorNumber; i++) {
        ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(capacity);
        this.executors.add(new ThreadPoolExecutor(
                coreSize, maxSize, 0L, TimeUnit.MILLISECONDS,
                queue,
                new ThreadFactoryBuilder().setNameFormat(name + "-" + i + "-%d").build(),
                new ThreadPoolExecutor.AbortPolicy()));
    }
    this.partitioner = new RoundRobinPartitionSelector(executorNumber);
}

@Override
public void execute(Runnable command) {
    boolean rejected;
    do {
        try {
           rejected = false;
           executors.get(partitioner.getPartition()).execute(command);
        } catch (RejectedExecutionException e) {
            rejected = true;
            try {
                TimeUnit.MILLISECONDS.sleep(rejectSleepMills);
            } catch (InterruptedException e1) {
                logger.warn("Reject sleep has been interrupted.", e1);
            }
        }
    } while (rejected);
}

可以看到上面的代码,关键点有两个:

第一个是,在创建 ThreadPoolExecutor 对象时,采用 ArrayBlockingQueue。这是一个容量有限的阻塞队列。因此,当任务队列已经满了时,就会停止继续往队列里添加新的任务,从而避免内存无限大,造成 OOM 问题。

第二个是,将 ThreadPoolExecutor 拒绝任务时,采用的策略设置为 AbortPolicy。这就意味着,在任务队列已经满了的时候,如果再向任务队列提交任务,就会抛出 RejectedExecutionException 异常。之后,我们再通过一个 while 循环,在循环体内,捕获 RejectedExecutionException 异常,并不断尝试,重新提交任务,直到成功为止。

这样,经过上面的改造,当下游的步骤执行较慢时,它的任务队列就会占满。这个时候,如果上游继续往下游提交任务,它就会不停重试。这样,自然而然地降低了上游步骤的处理速度,从而起到了流量控制的作用。

四、这不就是一个实时流模型!

上面的那个图,是不是似曾相识?没错,它就是实时流模型啊。

并且反压,已经成为流计算领域的共识,并且已经形成了反向压力相关的标准。

Flink 中是通过 Netty 的网络模型的阻塞来把压力一层层往上游传递的,和我们实现的这个有异曲同工之妙。

在如今的异步编程模型中,无处不存在着队列的影子,甚至在操作系统底层,也会使用队列来对性能做极致的优化,比如大名鼎鼎的 epoll。

而“队列”正是流计算系统最重要的组成结构。有“队列”的系统,它注定会是一个异步执行的过程,这也意味着“流”这种模式注定了是“异步”的。

五、总结

如今的分布式系统,都是几百甚至上千机器在一起协同工作,那不同的机器的不同进程一定会通信。

像 Spark,使用了 Netty 作为通信框架,Flink 也有在使用 Netty (还有Akka)作为通信框架。而要去了解一个分布式框架,第一步就是要了解它的通信框架,不然进程和进程的通信部分就没法看懂,整个框架核心逻辑也就无法透彻看懂。

而且在分布式系统中,异步编程的代码也是非常多,像 Flink 提交任务的过程,就使用了 CompletableFuture 异步编程框架来提交任务。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-03-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、需求说明
  • 二、实现方案剖析
    • 1、初始版本:使用 BIO 实现的客户端和服务端通信
      • 2、改造版本:异步 + NIO 实现的高性能网络通信
        • 3、进阶版本,如何充分利用 CPU 和 网络 IO 资源
        • 三、进阶版本中存在的问题
          • 1、问题描述
            • 2、如何控制事件的速度
              • 3、反向压力
                • 4、实现反压
                • 四、这不就是一个实时流模型!
                • 五、总结
                相关产品与服务
                大数据
                全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档