作者 | Ali Naqvi
译者 | 平川
策划 | 丁晓昀
在这篇文章中,我们将介绍 Pipy,一个开源的云原生网络流处理器。本文将首先描述它的模块化设计,然后介绍如何快速构建一个高性能的网络代理来满足特定的需求。Pipy 经过了实战检验,已经被多个商业客户所使用。
Pipy 是一个 开源、轻量级、高性能、模块化、可编程的云原生网络流处理器。它适用于众多应用场景,包括但不限于边缘路由器、负载均衡器 & 代理解决方案、API 网关、静态 HTTP 服务器、服务网格挎斗等。
事实证明,这些属性对 Pipy 来说都有特定的意义,让我们逐项来看一下。
轻量级
编译后的 Pipy 可执行文件大小约为 6MB,只需要很小的内存就能运行。
高性能
Pipy 是用 C++ 编写的,以 Asio 异步 I/O 库为基础构建。
模块化
Pipy 的内核采用了模块化设计,有许多可重用的小模块(过滤器),把它们连接在一起就可以形成一个管道,网络数据在这个管道中流动并被处理。
流处理器
Pipy 使用一个事件驱动的管道来操作网络流,它消耗输入流,执行用户提供的转换,并输出流。Pipy 流将数据字节抽象为以下其中一类事件:
可编程
Pipy 通过其定制开发的组件 PipyJS 提供内置的 JavaScript 支持,该组件是 Pipy 代码库的一部分,但不依赖于该代码库。PipyJS 具有高度的可定制性,性能可预测,而且没有垃圾收集开销。将来,PipyJS 可能会转移到单独的包中。
Pipy 的设计
Pipy 的内部工作原理类似于 Unix 管道),但不同的是,Unix 管道处理的是离散的字节,而 Pipy 处理的是事件流。
Pipy 通过一个过滤器链来处理传入的数据流,过滤器 负责处理请求记录、认证、SSL 卸载、请求转发等常规问题。每个过滤器都从其输入中读取事件流并写入输出,一个过滤器的输出与下一个过滤器的输入相连。
管道
一条过滤器链即一个 管道,Pipy 根据其输入源将管道分为 3 个不同的类别。
从一个网络端口读入 数据 事件,处理它们,然后将结果写回同一端口。这就是最常用的请求和响应模式。
例如,当 Pipy 作为 HTTP 服务器时,端口管道的输入是来自客户端的 HTTP 请求,而管道的输出则是发回客户端的 HTTP 响应。
周期性地获取 MessageStart 和 MessageEnd 事件对作为输入。当需要 类似于 cron job 的功能 时很有用。
它与 连接过滤器(例如 link)协同工作,它从前面的管道接收事件,将其送入子管道进行处理,然后再从子管道读回输出,并将其传递给下一个过滤器。
理解 子管道 和 连接过滤器 的最好方法是,把它们看成过程编程中子程序的被调用者和调用者。连接过滤器的输入是子程序的参数,连接过滤器的输出是其返回值。
注意:像 端口 & 计时器 这样的 根 管道不能从 连接过滤器 调用。
上下文
Pipy 另一个重要的概念是上下文。上下文 是隶属于一个管道的一组变量。在 Pipy 实例中,每条管道都可以访问相同的变量集。换句话说,上下文具有相同的形状。当启动一个 Pipy 实例时,所做的第一件事就是通过定义变量和它们的初始值来定义上下文的形状。
每个 根 管道都会克隆你在开始时定义的初始上下文。当一个子管道启动时,它要么共享要么克隆其父管道的上下文,这取决于你使用了哪一个连接过滤器。例如,link 过滤器共享其父管道的上下文,而 demux 过滤器则克隆它。
对于嵌入管道的脚本来说,这些上下文变量就是它们的全局变量,也就是说,只要这些变量存在于同一个脚本文件中,这些脚本就可以从任何地方访问它们。
对于一名经验丰富的程序员来说,这可能显得很奇怪,因为全局变量通常意味着它们是全局唯一的。这些变量只能有一组,但在 Pipy 中,我们可以有很多组这样的变量(又称上下文),这取决于针对传入的网络连接开放了多少管道,以及有多少子管道克隆了其父管道的上下文。
PipyJS
PipyJS 是一个嵌入式的小型 JavaScript 引擎,其设计旨在实现高性能,消除垃圾收集开销。它支持 ECMAScript 标准的一个子集,某些方面有偏离。目前,它支持 JavaScript 表达式、函数,并实现了 JavaScript 标准 API,如 String、Array 等。
如上所述,上下文是 PipyJS 一个非常关键的特性,Pipy 用一种特定的方式对其进行了扩展,以满足代理服务器的特殊需求,后者需要为每个连接提供多组全局变量。每个上下文状态对其他上下文都是不可见的,因此,它是唯一的,只有它的定义者才能访问。
如果你熟悉多线程编程的概念,那么你也可以把 上下文 看作是 TLS(线程本地存储),其中全局变量在不同的线程中具有不同的值。
兼容性
Pipy 的设计旨在跨不同的操作系统和 CPU 架构实现高度兼容。Pipy 已经在以下这些平台上进行了全面测试:
在生产环境中,建议使用 CentOS7/REHL7 或 FreeBSD。
快速入门
对于那些缺乏耐心的读者,可以使用 docker 运行 Pipy 的生产版本,使用 Pipy 官方 GitHub 仓库提供的一个教程脚本即可。这里,让我们遵循经典示例Hello World!的规范,但把这句话改为Hi there!
Pipy 的 Docker 镜像可以通过几个环境变量来配置:
$ docker run --rm -e PIPY_CONFIG_FILE=\
https://raw.githubusercontent.com/flomesh-io/pipy/main/tutorial/01-hello/hello.js \
-e PIPY_SPAWN=1 -p 8080:8080 flomesh/pipy-pjs:latest
上述命令会用提供的脚本启动 Pipy 服务器。敏锐的用户可能已经注意到,我们通过环境变量PIPY_CONFIG_FILE提供了一个远程 Pipy 脚本的链接,而不是一个本地文件,Pipy 足够智能,可以处理这种情况。
下面是tutorial/01-hello/hello.js文件的内容,供参考:
pipy()
.listen(8080)
.serveHTTP(
new Message('Hi, there!\n')
)
在这个脚本中,我们定义了一个 端口管道,它监听 8080 端口,并为从监听端口收到的每个 HTTP 请求返回“Hi, there!”。
既然我们已经通过上面的docker run命令暴露了本地 8080 端口,那么我们可以在同一端口上进行测试了:
$ curl http://localhost:8080
执行上述命令,控制台中应该显示“Hi, there!”。
如果是出于学习、开发或调试的目的,建议在本地安装 Pipy(从源代码构建 Pipy 或针对你的操作系统下载一个预构建版本),因为它提供了 Web 管理控制台以及相关的文档和教程。
安装到本地后,运行pipy,不需要任何参数,就可以在6060端口启动管理控制台,但如果要监听不同的端口,可以通过--admin-port=参数配置。
监听 6060 端口的 Pipy 管理控制台
要从源代码构建 Pipy 或针对你的操作系统安装预编译的二进制文件,请参考 PipyGithub 库的 README.md 文件。
通过 CLI 运行
要启动 Pipy 代理,可以用一个 PipyJS 脚本文件运行 Pipy。例如,如果需要一个简单的回显服务器,针对每个传入的请求都用所接收到的消息体进行响应,那么就用脚本tutorial/01-hello/hello.js:
$ pipy tutorial/01-hel lo/hello.js
另外,在开发和调试时,可以启动带有内置 Web UI 的 Pipy:
$ pipy tutorial/01-hello/hello.js --admin-port=6060
显示命令行选项
$ pipy --help
列出内置过滤器及其参数
$ pipy --list-filters
$ pipy --help-filters
前文从概念和技术上对 Pipy 做了一个简短的介绍,这些内容也是我们实现一个支持缓存和负载均衡的网络代理所需要了解的,这一点我们在下一节会看到。
编写一个网络代理
假设我们正在运行不同服务的单独实例,我们想要添加一个代理,根据请求的 URL 路径将流量转发到相关服务。这样做的好处是,我们只需要提供一个 URL,并在后端扩展我们的服务,而用户不需要分别记住不同服务的 URL。在正常情况下,服务会在不同的节点上运行,每个服务可以有多个实例在运行。假设在这个例子中,我们正在运行下面的服务,我们希望根据 URI 将流量分配给它们。
Pipy 的脚本是用 JavaScript 编写的,你可以用任何文本编辑器来编辑它们。另外,如果你在本地安装了 Pipy,就可以使用 Pipy 提供的 Web 端管理 UI,它提供了语法高亮、自动完成、提示等特性,你甚至可以运行脚本,所有这些都在同一个控制台上。
好了,让我们启动一个 Pipy 实例,不需要任何参数,这样,Pipy 管理控制台将在 6060 端口启动。现在,打开你喜欢的 Web 浏览器,导航到 http://localhost:6060,就会看到 Pipy 内置的 Web 端管理 UI(如图 1)。
创建一个 Pipy 程序
将代码和配置分开是一种很好的设计实践。Pipy 通过 插件(你可以把它想成是 JavaScript 模块)来支持这种模块化设计。也就是说,我们将把配置数据存储在 config 文件夹下,把编码逻辑存储在 plugins 文件夹下不同的文件中。主代理服务器脚本将存储在根目录下,主代理脚本(proxy.js)将包含并组合这些单独的模块所定义的功能。一旦我们完成了下述步骤,最终的文件夹结构将是下面这个样子:
├── config
│ ├── balancer.json
│ ├── proxy.json
│ └── router.json
├── plugins
│ ├── balancer.js
│ ├── default.js
│ └── router.js
└── proxy.js
让我们开始吧:
{
"listen": 8000,
"plugins": [
"plugins/router.js",
"plugins/balancer.js",
"plugins/default.js"
]
}
重复步骤 2 和 3,创建另一个文件/config/router.json,它将存储路由信息,配置数据如下:
{
"routes": {
"/hi/*": "service-hi",
"/echo": "service-echo",
"/ip/*": "service-tell-ip"
}
}
重复步骤 2 和 3,创建另一个文件/config/balancer.json,它将存储服务到目标的映射信息,内容如下:
{
"services": {
"service-hi" : ["127.0.0.1:8080", "127.0.0.1:8082"],
"service-echo" : ["127.0.0.1:8081"],
"service-tell-ip" : ["127.0.0.1:8082"]
}
}
现在,我们编写第一个 Pipy 脚本,当我们收到一个没有配置任何目标(端点 /url)的请求时,它将被用作默认的后备选项。重复上述步骤,创建文件/plugins/default.js。使用 default 作为文件名只是一个习惯做法,并不是 Pipy 的要求,你可以选择任何你喜欢的名字。该脚本将包含如下代码,返回 HTTP 状态代码 404,信息为 No handler found:
pipy()
.pipeline('request')
.replaceMessage(
new Message({ status: 404 }, 'No handler found')
)
创建/plugins/router.js文件,存储路由逻辑:
(config =>
pipy({
_router: new algo.URLRouter(config.routes),
})
.export('router', {
__serviceID: '',
})
.pipeline('request')
.handleMessageStart(
msg => (
__serviceID = _router.find(
msg.head.headers.host,
msg.head.path,
)
)
)
)(JSON.decode(pipy.load('config/router.json')))
创建文件/plugins/balancer.js,存储了我们的负载均衡逻辑。顺便说明一下,Pipy 提供了多种负载均衡算法,但简单起见,我们这里将使用 Round Robin 算法。
(config =>
pipy({
_services: (
Object.fromEntries(
Object.entries(config.services).map(
([k, v]) => [
k, new algo.RoundRobinLoadBalancer(v)
]
)
)
),
_balancer: null,
_balancerCache: null,
_target: '',
})
.import({
__turnDown: 'proxy',
__serviceID: 'router',
})
.pipeline('session')
.handleStreamStart(
() => (
_balancerCache = new algo.Cache(
// k is a balancer, v is a target
(k ) => k.select(),
(k,v) => k.deselect(v),
)
)
)
.handleStreamEnd(
() => (
_balancerCache.clear()
)
)
.pipeline('request')
.handleMessageStart(
() => (
_balancer = _services[__serviceID],
_balancer && (_target = _balancerCache.get(_balancer)),
_target && (__turnDown = true)
)
)
.link(
'forward', () => Boolean(_target),
''
)
.pipeline('forward')
.muxHTTP(
'connection',
() => _target
)
.pipeline('connection')
.connect(
() => _target
)
)(JSON.decode(pipy.load('config/balancer.json')))
现在,我们来编写入口点或代理服务器脚本,它会使用上述插件。创建一个新的代码库(步骤 1),这个过程会创建一个默认的main.js文件作为入口点。我们可以用它作为我们的主入口点,或者如果你希望换个名字,可以随时删除main.js
,然后用你选的名字新建一个文件。让我们删除它并新建一个名为/proxy.js的文件。务必点下顶部的旗标,将其设置为主入口点,这可以确保在你点击运行按钮(右侧的箭头图标)时开始执行脚本:
(config =>
pipy()
.export('proxy', {
__turnDown: false,
})
.listen(config.listen)
.use(config.plugins, 'session')
.demuxHTTP('request')
.pipeline('request')
.use(
config.plugins,
'request',
'response',
() => __turnDown
)
)(JSON.decode(pipy.load('config/proxy.json')))
如果你已经按照上面的步骤进行了操作,就可以看到类似于以下截图的东西:
现在,我们点击 播放 图标按钮(右起第四个)来运行我们的脚本。如果脚本没有任何错误,我们将看到 Pipy 运行我们的代理脚本,输出类似下面这样:
这表明我们的代理服务器正在监听 8000 端口(这是在/config/proxy.json中配置的)。我们用 curl 来运行一个测试:
$ curl -i http://localhost:8000
HTTP/1.1 404 Not Found
content-length: 10
connection: keep-alive
No handler found
这没问题,因为我们没有为 root 配置任何目标。让我们试下配置过的路由,如 /hi:
$ curl -i http://localhost:8000/hi
HTTP/1.1 502 Connection Refused
content-length: 0
connection: keep-alive
我们看到了 502 Connection Refused 这个消息,因为我们没有在配置的目标端口上运行服务。
你可以更新/config/balancer.json,加入你已经运行的服务的主机、端口等细节,以匹配你的实际情况,或者我们在 Pipy 中编写一个脚本,监听我们配置的端口,并返回简单的消息。
将以下代码片段保存到你本地计算机上的一个文件中,命名为mock-proxy.js,并记住文件的存储位置。
pipy()
.listen(8080)
.serveHTTP(
new Message('Hi, there!\n')
)
.listen(8081)
.serveHTTP(
msg => new Message(msg.body)
)
.listen(8082)
.serveHTTP(
msg => new Message(
`You are requesting ${msg.head.path} from ${__inbound.remoteAddress}\n`
)
)
打开一个新的终端窗口,通过 Pipy 运行这个脚本(其中/path/to是存储该脚本文件的位置):
$ pipy /path/to/mock-proxy.js
2022-01-11 18:56:31 [INF] [config]
2022-01-11 18:56:31 [INF] [config] Module /mock-proxy.js
2022-01-11 18:56:31 [INF] [config] ================
2022-01-11 18:56:31 [INF] [config]
2022-01-11 18:56:31 [INF] [config] [Listen on :::8080]
2022-01-11 18:56:31 [INF] [config] ----->|
2022-01-11 18:56:31 [INF] [config] |
2022-01-11 18:56:31 [INF] [config] serveHTTP
2022-01-11 18:56:31 [INF] [config] |
2022-01-11 18:56:31 [INF] [config] <-----|
2022-01-11 18:56:31 [INF] [config]
2022-01-11 18:56:31 [INF] [config] [Listen on :::8081]
2022-01-11 18:56:31 [INF] [config] ----->|
2022-01-11 18:56:31 [INF] [config] |
2022-01-11 18:56:31 [INF] [config] serveHTTP
2022-01-11 18:56:31 [INF] [config] |
2022-01-11 18:56:31 [INF] [config] <-----|
2022-01-11 18:56:31 [INF] [config]
2022-01-11 18:56:31 [INF] [config] [Listen on :::8082]
2022-01-11 18:56:31 [INF] [config] ----->|
2022-01-11 18:56:31 [INF] [config] |
2022-01-11 18:56:31 [INF] [config] serveHTTP
2022-01-11 18:56:31 [INF] [config] |
2022-01-11 18:56:31 [INF] [config] <-----|
2022-01-11 18:56:31 [INF] [config]
2022-01-11 18:56:31 [INF] [listener] Listening on port 8080 at ::
2022-01-11 18:56:31 [INF] [listener] Listening on port 8081 at ::
2022-01-11 18:56:31 [INF] [listener] Listening on port 8082 at ::
现在,我们已经模拟了监听 8080、8081 和 8082 端口的服务。让我们在代理服务器上再做一次测试,你会看到,模拟服务返回了正确的响应。
小 结
我们使用了 Pipy 的许多特性,包括变量声明、导入 / 导出变量、插件、管道、子管道、过滤器链、handleMessageStart、handleStreamStart和link等 Pipy 过滤器,以及JSON、algo.URLRouter、algo.RoundRobinLoadBalancer和algo.Cache等 Pipy 类。彻底解释所有这些概念超出了本文的范围,如果你希望了解更多信息,请阅读 Pipy 的文档。你可以通过 Pipy 的 Web 端管理 UI 查看这些文档,并按照入门教程一步步操作。
结 语
来自 Flomesh 的 Pipy 是一个开源、高性能、轻量级的网络流量处理器,适用于多种场景,包括边缘路由器、负载平衡 & 代理(正向 / 反向)、API 网关、静态 HTTP 服务器、服务网格挎斗等。Pipy 仍在积极开发之中,并由全职的提交者和贡献者维护,虽然仍是早期版本,但已有多个商业客户完成了测试并投入生产应用。它的创建者和维护者 Flomesh.cn 提供的商用解决方案就是以 Pipy 为核心。
这篇文章对 Pipy 做了一个非常简要的介绍和概述。GitHub 上提供了入门教程和文档,你也可以通过 Pipy 管理控制台的 Web UI 查看。社区非常欢迎大家为 Pipy 的发展做贡献,也欢迎大家在自己特定的场景下进行试用,或者提供反馈和意见。
作者简介:
Ali Naqvi 是一位拥有超过 20 年 IT 行业经验的专业人士。他非常热衷于开发以及为开源软件做贡献。他主要关注开发、软件架构、DevOps 等领域。他经常发表演讲,是当地社区 / 分会的活跃成员,致力于传播 OSS、DevOps 和 Agile 理念和知识。
原文链接:
https://www.infoq.com/articles/network-proxy-stream-processor-pipy/