首页
学习
活动
专区
工具
TVP
发布

StreamNative 宣布开源 Function Mesh: 简化云上的复杂流任务

StreamNative 郑重宣布开源 Function Mesh。Function Mesh 是为事件流应用程序构建的无服务框架,为在 Kubernetes 上运行的复杂事件流任务管理 Pulsar FunctionsPulsar I/O connector,增强应用程序的事件流功能。

Function Mesh 简介

Function Mesh 是一种 Kubernetes operator,助力用户在 Kubernetes 上原生使用 Pulsar Functionsconnectors,解锁 Kubernetes 的全部特性,包括部署、扩缩容、管理应用程序等。例如,Function Mesh 依赖 Kubernetes 的调度能力,确保 Functions 的故障恢复能力,并且可以在任意时间适当调度 Functions。

Function Mesh 采用无服务架构,用于管理 Pulsar Functions 和 connectors,简化了创建复杂流任务的流程。对于寻求云原生无服务流解决方案的用户而言,Function Mesh 是最佳选择。Function Mesh 的主要优势如下:

  • 便于管理多个 function 和 connector。
  • 充分使用 Kubernetes 调度器的全部功能,包括重平衡、重调度、容错等。
  • 完全解锁 Pulsar Functions 和 connectors 在云环境中的能力。
  • 支持在云上的多个消息系统中使用 Pulsar Functions,或与云环境中的现有工具进行集成(Function Mesh 与 Pulsar 相互独立使用 Pulsar Functions 和 connectors)。

Function Mesh 适用于常见的轻量化流使用场景(如 ETL 任务),但不适合作为流引擎单独使用。

为什么使用 Function Mesh

在 2.0 版本,Pulsar 引入 Pulsar Functions 和 Pulsar I/O connector。

Pulsar Functions 是 Apache Pulsar 原生支持的无服务事件流架构。Pulsar Functions 支持用户基于消息创建事件处理逻辑、简化搭建事件流应用程序的操作、为事件流引入无服务概念,从而避免部署单独的系统。Pulsar Functions 的常见使用场景包括 ETL 任务、实时聚合、微服务、响应式服务、事件路由等。

Pulsar I/O connector 通过现有 Pulsar Functions 传递数据。Pulsar I/O connector 包含两个组件:source connector(即 source) 和 sink connector(即 sink)。Source 将外部系统中的数据写入到 Pulsar;sink 则将 Pulsar 中的数据输出到外部系统。

Pulsar Functions 和 Pulsar I/O connector 简化搭建事件流应用程序的操作。Pulsar Functions 支持在 Kubernetes 上运行 function 和 connector,但现有实现仍有一些不足。

  1. Function 的元数据存储在 Pulsar 中,但 function 的运行状态由 Kubernetes 管理。这导致元数据和运行状态之间可能出现不一致的情况,用户管理 Pulsar Functions 变得困难。例如,当用户从 Kuberbetes 中删除运行 Pulsar Functions 的 StatefulSet 时,Pulsar 不会感知到该操作。
  2. 现有的实现使用 Pulsar topic 存储 function 元数据。如果存储 function 元数据的 topic 临时不可用,可能会造成 broker 故障循环。
  3. Functions 与特定 Pulsar 集群绑定,因此很难跨集群使用 function。
  4. 目前,在 Kubernetes 上部署 Pulsar Functions 并实现特定功能(如自动扩缩容)有一定难度。

越来越多的团队选择使用 Pulsar Functions 和 Pulsar I/O connector 搭建无服务事件流应用程序,实现复杂的事件流能力。如果不借助 Function Mesh 等平台,需要耗费大量人力来管理流任务中的多个 function 和 Pulsar I/O connector。

为了解决上述痛点问题,并使 Kubernetes 原生支持 Pulsar Functions 和 connector,简化搭建复杂事件流任务的操作,我们开发了 Function Mesh。

主要概念

Function Mesh 支持用户基于 Apache Pulsar 和其他流处理技术搭建事件流应用程序,其中的三个基本组件为流、function 和 connector。

流是不可更改、仅追加的分区序列,用于存储事件的历史操作。例如,可以通过流事件为金融交易建模,如“甲向乙发送 100 元”,然后“乙向丙发送 50 元”。流连接了 function 和 connector。Function Mesh 借助 Apache Pulsar topic 实现流。

Function

Pulsar Functions 是轻量级事件处理器,用于消费来自输入流的消息。Pulsar Functions 将用户提供的处理逻辑应用于接收到的消息,并将处理结果发送到其他流。Function Mesh 基于 Pulsar Functions 实现 function。

Connector

Connector 用于为流传输事件。Function Mesh 上有两类 connector:

  • source connector(即 source):将外部系统中的事件写入到流。
  • sink connector(即 sink):将流中的事件输出到外部系统。

Function Mesh 基于 Pulsar I/O connector 实现 connector。访问 StreamNative Hub 官网,查看全部可用的 Pulsar I/O connector。

FunctionMesh

FunctionMesh(即 Mesh)是多个流支撑的 function 和 connector 的集合(可以是有向无环图(Directed Acyclic Graph,DAG),也可以是循环图),可以实现强大的流处理逻辑。Function Mesh 支持同时创建、更新和终止多个 function 和 connector。同一个 Mesh 中, function 和 connector 的生命周期相同。Function Mesh 支持长期运行 function 和 connector,并且可以根据工作负载自动扩缩容 function 和 connector。

Function Mesh 工作原理

Function Mesh API 基于现有 Kubernetes API 实现,因此 Function Mesh 资源与其他 Kubernetes 原生资源兼容,集群管理员可以使用现有 Kubernetes 工具管理 Function Mesh 资源。Function Mesh 采用 Kubernetes Custom Resource Definition(CRD),集群管理员可以通过 CRD 自定义资源,开发事件流应用程序。

用户可以使用 kubectl CLI 工具将 CRD 直接提交到 Kubernetes 集群,无需使用 pulsar-admin CLI 工具向 Pulsar 集群发送 function 请求。Function Mesh 控制器监测 CRD 并创建 Kubernetes 资源,运行自定义的 function、source、sink 或 Mesh。这种方法的优势在于 Kubernetes 直接存储并管理 function 元数据和运行状态,从而避免 Pulsar 现有方案中可能存在的元数据与运行状态不一致问题。

下图为 Function Mesh 使用流程示意图。

Function Mesh 架构

Function Mesh 主要由 Kubernetes operator 和 Function Runner 两个组件组成。Kubernetes operator 监测 Function Mesh CRD、创建 Kubernetes 资源(即 StatefulSet),从而在 Kubernetes 上运行 function、connector 和 Mesh。Function Runner 负责调用 function 和 connector 逻辑,处理从输入流中接收的事件,并将处理结果发送到输出流。目前,Function Runner 基于 Pulsar Functions runner 实现。

下图为 Function Mesh 的整体架构。当用户创建 Function Mesh CRD 时,Function Mesh 控制器从 Kubernetes API 服务器接收已提交的 CRD,然后处理 CRD 并生成相应的 Kubernetes 资源。例如,Function Mesh 控制器在处理 Function CRD 时,会创建 StatefulSet,其上的每个 pod 都会启动一个 Runner 来调用对应的 function。

如何使用 Function Mesh

如需使用 Function Mesh,首先需要在 Kubernetes 集群中安装 Function Mesh operator 和 CRD。安装 Function Mesh 的详细信息,参阅安装指南

安装 Function Mesh operator 并部署 Pulsar 集群后,用户需要打包 function 和 connector,并为 function、connector 和 Mesh 定义 CRD,再使用以下命令将 CRD 提交到 Kubernetes 集群。

$ kubectl apply -f /path/to/custom-crd.yaml 

当 Kubernetes 集群接收到 CRD 后,Function Mesh operator 逐一调度这些 CRD,并将 function 与其他必须的资源对象作为 StatefulSet 运行。

下面我们将举例解释如何运行 function、connector 和 Mesh。

如何使用 Function Mesh 运行 function

Function Mesh 不会影响在云上运行的 Pulsar Functions 的开发流程,但提交 function 时应使用 yaml 文件而非 pulsar-admin CLI 工具。在后台,我们开发了用于 Pulsar Functions 的 CRD 资源和相应的控制器。

开发并测试 function 后,用户需要打包 function 并将其提交到 Pulsar 集群;或将其构建为 Docker 镜像并上传到 image registry,再提交 function CRD 到 Pulsar 集群。详情参阅使用 Function Mesh 运行 Pulsar Functions

本示例使用 Function CRD 在 Kubernetes 集群启动 ExclamationFunction,同时启用自动扩缩容,并使用 Java 运行时与 Pulsar 消息系统进行交互。

apiVersion: compute.functionmesh.io/v1alpha1
kind: Function
metadata:
  name: function-sample
  namespace: default
spec:
  className: org.apache.pulsar.functions.api.examples.ExclamationFunction
  replicas: 1
  maxReplicas: 5
  image: streamnative/function-mesh-example:latest
  logTopic: persistent://public/default/logging-function-logs
  input:
    topics:
    - persistent://public/default/source-topic
    typeClassName: java.lang.String
  output:
    topic: persistent://public/default/sink-topic
    typeClassName: java.lang.String
  resources:
    requests:
      cpu: "0.1"
      memory: 1G
    limits:
      cpu: "0.2"
      memory: 1.1G
  pulsar:
    pulsarConfig: "test-pulsar"
  java:
    jar:  "/pulsar/examples/api-examples.jar"

如何使用 Function Mesh 运行 connector

Source 和 sink 是特定的 function。StreamNative 为 Pulsar 内置和 StreamNative 托管的 connector 提供 Docker 镜像。如果想要创建 Pulsar 内置或 StreamNative 托管的 connector,可以在 CRD 中指定 source 或 sink 的 Docker 镜像。用户可以在 Docker Hub 中找到相关的 Docker 镜像,名称格式为 streamnative/pulsar-io-CONNECTOR-NAME:TAG,如 streamnative/pulsar-io-hbase:2.7.1。访问 StreamNative Hub 官网即可查看 Function Mesh 支持的 connector。

如果使用自定义 connector,则可以将其打包并上传到 Pulsar Package Service 上,或构建成 Docker 镜像再通过 CRD 提交。更多详细信息,参阅使用 Function Mesh 运行 Pulsar connector

在以下 source CRD yaml 示例文件中,connector 接收来自 DebeziumMongoDB 的数据。

DebeziumMongoDB connector yaml 文件:

apiVersion: compute.functionmesh.io/v1alpha1
kind: Source
metadata:
  name: source-sample
spec:
  image: streamnative/pulsar-io-debezium-mongodb:2.7.1
  className: org.apache.pulsar.io.debezium.mongodb.DebeziumMongoDbSource
  replicas: 1
  output:
    topic: persistent://public/default/destination
    typeClassName: org.apache.pulsar.common.schema.KeyValue
  sourceConfig:
    mongodb.hosts: rs0/mongo-dbz-0.mongo.default.svc.cluster.local:27017,rs0/mongo-dbz-1.mongo.default.svc.cluster.local:27017,rs0/mongo-dbz-2.mongo.default.svc.cluster.local:27017
    mongodb.name: dbserver1
    mongodb.user: debezium
    mongodb.password: dbz
    mongodb.task.id: "1"
    database.whitelist: inventory
    pulsar.service.url: pulsar://test-pulsar-broker.default.svc.cluster.local:6650
  pulsar:
    pulsarConfig: "test-source"
  java:
    jar: connectors/pulsar-io-debezium-mongodb-2.7.1.nar
    jarLocation: "" # use pulsar provided connectors

在以下 sink CRD yaml 示例文件中,connector 将数据发送到 ElasticSearch

ElasticSearch connector yaml 文件:

apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
  name: sink-sample
spec:
  image: streamnative/pulsar-io-elastic-search:2.7.1
  className: org.apache.pulsar.io.elasticsearch.ElasticSearchSink
  replicas: 1
  input:
    topics:
    - persistent://public/default/input
    typeClassName: "[B"
  sinkConfig:
    elasticSearchUrl: "http://quickstart-es-http.default.svc.cluster.local:9200"
    indexName: "my_index"
    typeName: "doc"
    username: "elastic"
    password: "X2Mq33FMWMnqlhvw598Z8562"
  pulsar:
    pulsarConfig: "test-sink"
  java:
    jar: connectors/pulsar-io-elastic-search-2.7.1.nar
    jarLocation: "" # use pulsar provided connectors

如何在 Kubernetes 上运行 Function Mesh

FunctionMesh CRD 允许用户定义一系列包含 function、source 和 sink 的资源,它们通过 topics 字段连接起来,形成一个完整的数据流处理任务。当用户提交自定义的 FunctionMesh CRD 到 Kubernetes 集群后,FunctionMesh 控制器会协调 FunctionMesh CRD 中定义的多个 function、source 和 sink 资源,并将这些资源分配给相应的控制器,function、source 和 sink 控制器则协调各个任务并启动相应的 StatefulSet。FunctionMesh 控制器从系统收集每个 StatefulSet 的状态,并整合为 FunctionMesh 的状态。

以下示例中的 FunctionMesh 任务启动了两个 function,并通过这两个 function 流式传输输入,追加感叹号。

apiVersion: compute.functionmesh.io/v1alpha1
kind: FunctionMesh
metadata:
  name: mesh-sample
spec:
  functions:
    - name: ex1
      className: org.apache.pulsar.functions.api.examples.ExclamationFunction
      replicas: 1
      maxReplicas: 5
      input:
        topics:
          - persistent://public/default/source-topic
        typeClassName: java.lang.String
      output:
        topic: persistent://public/default/mid-topic
        typeClassName: java.lang.String
      pulsar:
        pulsarConfig: "mesh-test-pulsar"
      java:
        jar: pulsar-functions-api-examples.jar
        jarLocation: public/default/test
   - name: ex2
      className: org.apache.pulsar.functions.api.examples.ExclamationFunction
      replicas: 1
      maxReplicas: 3
      input:
        topics:
          - persistent://public/default/mid-topic
        typeClassName: java.lang.String
      output:
        topic: persistent://public/default/sink-topic
        typeClassName: java.lang.String
      pulsar:
        pulsarConfig: "mesh-test-pulsar"
      java:
        jar: pulsar-functions-api-examples.jar
        jarLocation: public/default/test

第一个 function 的 output topic 和第二个 function 的 input topic 是同一个 topic。因此,当第一个 function 将结果发布到其 output topic 中时,第二个 function 可以从该 topic 读取数据。

使用 pulsar-admin CLI 工具

如果使用 Function Mesh operator 但不想改变创建、提交 function 的方式,则可以选择 Function Mesh worker 服务。此服务类似于 Pulsar Functions worker 服务,使用 Function Mesh 调度并运行 function,并且支持用户使用 pulsar-admin CLI 工具管理 Function Mesh 中的 Pulsar Functions 和 connector。下图展示了 Function Mesh worker 服务如何与 Pulsar proxy 协同合作,以及如何转换并转发请求到 Kubernetes 集群。

详情参阅使用 pulsar-admin CLI 工具

迁移 Pulsar Functions 到 Function Mesh

如果用户使用现有 Kubernetes 运行时运行 Pulsar Functions,并且想将这些 function 迁移到 Function Mesh,则可以通过迁移工具为现有 function 生成 CRD 列表。然后,Function Mesh 通过这些 CRD 接管 Kubernetes 中 Pulsar Functions 的管理和运行。更多详细信息,参阅迁移 Pulsar Functions 指南

支持的特性

目前,Function Mesh 具有以下特性:

  • 确保 Kubernetes 原生支持 Pulsar Functions 和 connector。
  • 连接 Pulsar Functions 和 connector,组成流任务。
  • 与原 Pulsar Admin API 兼容,支持使用 pulsar-admin CLI 工具提交 function 和 connector。
  • 使用 Pod 水平自动扩缩(Horizontal Pod Autoscaler,HPA),按需自动扩缩 function 和 connector。
  • 身份验证与授权。
  • Schema 和 SerDe。
  • 支持 Java、Python、Golang 等语言的运行时。

未来规划

新版本将会支持以下功能,如果您有任何建议或想为 Function Mesh 做出贡献,欢迎随时与我们联系。

  • 提高 Function Mesh operator 的功能级别。
  • 与 Pulsar Functions 功能匹敌。例如,实现有状态的 function。
  • 支持基于已有 function 运行时的其他运行时,如 web-assembly。
  • 改进管理和检测 Function Mesh 的工具/前端。
  • 为 function 单元分组,降低延迟,减少成本。
  • 支持基于 Pulsar metrics 的高级自动扩缩容。
  • 将 function registry 与 Apache Pulsar Packages 集成。

开始试用 Function Mesh

Function Mesh 已经开源,现在就在 Kubernetes 集群上试用吧!

更多关于 Function Mesh 的信息,参阅 Function Mesh 文档或点击观看操作演示

关于 Function Mesh 的任何反馈或建议,可以通过电子邮件或在 GitHub 仓库中创建 issue 联系我们。

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/TK1BciEasI0zaAo9MdPo
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券