前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dapr 入门教程之消息队列

Dapr 入门教程之消息队列

作者头像
我是阳明
发布2022-09-29 20:49:16
7080
发布2022-09-29 20:49:16
举报
文章被收录于专栏:k8s技术圈k8s技术圈

前面我们了解了 Dapr 对发布订阅的支持,本节我们将来介绍了 Dapr 中对消息队列的支持。消息队列,分为两种绑定,一种是输出绑定,一种是输入绑定。出和入是看数据的流向,输出绑定就是作为生产者的服务把消息通过 Dapr 传给消息队列,输入绑定就是作为消费者的服务通过 Dapr 从消息队列里得到消息。

这里的消息队列和发布订阅里的消息总线有什么区别呢?一个消息进入消息总线的话,所有订阅者都能得到这个消息,而一个消息进入消息队列的话,由消费者来取,一次只有一个人能得到。此外,消息总线是不要求处理顺序的,两个消息进入消息总线,谁先被拿到顺序是不一定的,而消息队列可以保证是先入先出的。

本节我们将创建两个微服务,一个具有输入绑定,另一个具有输出绑定,前面我们都使用的 Redis 这种中间件,这里我们将绑定到 Kafka。

  • Node.js 微服务使用输入绑定
  • Python 微服务利用输出绑定

绑定连接到 Kafka,允许我们将消息推送到 Kafka 实例(从 Python 微服务)中,并从该实例(从 Node.js 微服务)接收消息,而不必知道实例的位置。相反,同样只需要直接使用 Dapr API 通过 sidecars 连接即可。

本地运行

首先我们在本地来运行示例应用,对应的架构图如下所示:

Bindings 本地模式

同样使用 quickstarts 这个代码仓库:

代码语言:javascript
复制
git clone [-b <dapr_version_tag>] https://github.com/dapr/quickstarts.git

由于我们这里是使用 Kafka 来做消息队列的中间件,所以我们首先需要在本地环境运行 Kafka,我们可以直接使用 https://github.com/wurstmeister/kafka-docker 这个项目以 Docker 方式运行。

定位到 quickstartstutorials/bindings 目录,下面有一个 docker-compose-single-kafka.yml 文件:

代码语言:javascript
复制
$ cd tutorials/bindings
$ cat docker-compose-single-kafka.yml
version: '2'
services:
  zookeeper:
    image: ghcr.io/dapr/3rdparty/zookeeper:latest
    ports:
      - "2181:2181"
  kafka:
    image: ghcr.io/dapr/3rdparty/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_CREATE_TOPICS: "sample:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

我们可以直接而使用 docker-compose 来启动一个单实例的 Kafka:

代码语言:javascript
复制
$ docker-compose -f ./docker-compose-single-kafka.yml up -d

隔一段时间镜像拉取完成后以容器方式启动 Kafka:

代码语言:javascript
复制
$ docker-compose -f ./docker-compose-single-kafka.yml ps
NAME                   COMMAND                  SERVICE             STATUS              PORTS
bindings-kafka-1       "start-kafka.sh"         kafka               running             0.0.0.0:9092->9092/tcp
bindings-zookeeper-1   "/bin/sh -c '/usr/sb…"   zookeeper           running             0.0.0.0:2181->2181/tcp

在本地运行了 Kafka 后,接着我们可以运行输入绑定的 Node.js 微服务:

代码语言:javascript
复制
$ cd nodeapp

同样先安装服务依赖:

代码语言:javascript
复制
$ npm install  # 或者执行 yarn 命令

然后我们就可以使用 dapr run 命令来启动该微服务了,启动方式我们应该比较熟悉了,如下所示:

代码语言:javascript
复制
$ dapr run --app-id bindings-nodeapp --app-port 3000 node app.js --components-path ../components

上面的命令和前面有点不一样的地方是多了一个 --components-path 用来指定组件路径,这是因为现在我们要使用 Kafka 这种中间件来作为我们的消息队列组件,那么我们就需要告诉 Dapr,在 ./components 目录下面就包含一个对应的 kafka_bindings.yaml 文件,内容如下所示:

代码语言:javascript
复制
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: sample-topic
spec:
  type: bindings.kafka
  version: v1
  metadata:
    # Kafka broker connection setting
    - name: brokers
      value: localhost:9092
    # consumer configuration: topic and consumer group
    - name: topics
      value: sample
    - name: consumerGroup
      value: group1
    # publisher configuration: topic
    - name: publishTopic
      value: sample
    - name: authRequired
      value: "false"

前面在本地模式下面我们没有主动声明组件,是因为我们使用的就是默认的 Redis,而 Kafka 并不是内置就有的,所以需要我们主动声明,注意上面组件的类型为 type: bindings.kafkametadata 下面是访问 Kafka 相关的元数据。正常情况下上面的启动命令会输出如下所示的日志信息:

代码语言:javascript
复制
ℹ️  Starting Dapr with id bindings-nodeapp. HTTP Port: 54215. gRPC Port: 54216
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93  app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
INFO[0000] dapr initialized. Status: Running. Init Elapsed 347.136ms  app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
ℹ️  Updating metadata for app command: node app.js
✅  You're up and running! Both Dapr and your app logs will appear here.

INFO[0001] placement tables updated, version: 0          app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime.actor.internal.placement type=log ver=1.8.4

接下来,需要运行输出绑定的 Python 微服务,定位到 pythonapp 目录,安装 requests 依赖:

代码语言:javascript
复制
$ cd pythonapp
$ pip3 install requests

然后同样用 dapr run 命令来启动该微服务,也要注意带上后面的 --components-path 参数:

代码语言:javascript
复制
$ dapr run --app-id bindings-pythonapp python3 app.py --components-path ../components
ℹ️  Starting Dapr with id bindings-pythonapp. HTTP Port: 54554. gRPC Port: 54555
ℹ️  Checking if Dapr sidecar is listening on HTTP port 54554
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93  app_id=bindings-pythonapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
ℹ️  Checking if Dapr sidecar is listening on GRPC port 54555
ℹ️  Dapr sidecar is up and running.
ℹ️  Updating metadata for app command: python3 app.py
✅  You're up and running! Both Dapr and your app logs will appear here.

启动完成后,观察 Python 服务的日志,可以看到不断输出如下所示成功输出绑定到 Kafka 的日志:

代码语言:javascript
复制
== APP == {'data': {'orderId': 1}, 'operation': 'create'}
== APP == <Response [204]>
== APP == {'data': {'orderId': 2}, 'operation': 'create'}
== APP == <Response [204]>
== APP == {'data': {'orderId': 3}, 'operation': 'create'}
== APP == <Response [204]>
# ......

同样这个时候 Node.js 微服务中也不断有新的日志数据产生:

代码语言:javascript
复制
== APP == <Response [204]>
== APP == {'data': {'orderId': 1}, 'operation': 'create'}
== APP == <Response [204]>
== APP == {'data': {'orderId': 2}, 'operation': 'create'}
== APP == <Response [204]>
== APP == {'data': {'orderId': 3}, 'operation': 'create'}
== APP == <Response [204]>
# ......

这是因为 Python 微服务每隔 1s 就会向我们绑定的消息队列发送一条消息,而 Node.js 微服务作为消费者当然会接收到对应的消息数据。

在 Kubernetes 中运行

上面在本地环境下可以正常运行 Dapr bindings 服务,接下来我们再次将该示例部署到 Kubernetes 集群中来进行观察。

同样首先需要提供一个可用的 Kafka 实例,这里我们仍然使用 Helm Chart 方式来进行安装:

代码语言:javascript
复制
$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm repo update

然后使用如下所示的命令来安装 Kafka:

代码语言:javascript
复制
$ helm upgrade --install dapr-kafka bitnami/kafka --wait --namespace kafka -f ./kafka-non-persistence.yaml --create-namespace

这里我们指定了一个无需持久化数据(仅供测试)的 values 文件 kafka-non-persistence.yaml,内容如下所示:

代码语言:javascript
复制
replicas: 1

# Disable persistent storage
persistence:
  enabled: false
zookeeper:
  persistence:
    enabled: false
  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
          - matchExpressions:
              - key: kubernetes.io/os
                operator: In
                values:
                  - linux
              - key: kubernetes.io/arch
                operator: In
                values:
                  - amd64

autoCreateTopicsEnable: true

affinity:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kubernetes.io/os
              operator: In
              values:
                - linux
            - key: kubernetes.io/arch
              operator: In
              values:
                - amd64

安装完成后可以查看 Pod 的状态来保证 Kafka 启动成功:

代码语言:javascript
复制
$ kubectl -n kafka get pods -w

NAME                     READY   STATUS    RESTARTS   AGE
dapr-kafka-0             1/1     Running   0          2m7s
dapr-kafka-zookeeper-0   1/1     Running   0          2m57s

接下来我们首先需要在 Kubernetes 集群中配置使用 Kafka 作为 Binding 消息中间件的 Component 组件:

代码语言:javascript
复制
# kafka_bindings.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: sample-topic
spec:
  type: bindings.kafka
  version: v1
  metadata:
    # Kafka broker connection setting
    - name: brokers
      value: dapr-kafka.kafka:9092
    # consumer configuration: topic and consumer group
    - name: topics
      value: sample
    - name: consumerGroup
      value: group1
    # publisher configuration: topic
    - name: publishTopic
      value: sample
    - name: authRequired
      value: "false"

注意该对象上面指定的组件类型为 bindings.kafkametadata 下面的元信息包括 Kafka brokers 地址、生产者和消费者的配置等等,直接应用上面的资源清单即可:

代码语言:javascript
复制
$ kubectl apply -f kafka_bindings.yaml
$ kubectl get components sample-topic
NAME           AGE
sample-topic   13s

创建完成后在 Dapr Dashboard 中也可以看到对应的组件信息:

dapr dashboard components

接着部署两个 Node.js 和 Python 微服务即可:

代码语言:javascript
复制
$ kubectl apply -f deploy/node.yaml
service/bindings-nodeapp created
deployment.apps/bindings-nodeapp created
$ kubectl apply -f deploy/python.yaml
deployment.apps/bindings-pythonapp created
$ kubectl get pods
NAME                                  READY   STATUS    RESTARTS         AGE
bindings-nodeapp-8bcdd744d-pj2j7      2/2     Running   0                3m44s
bindings-pythonapp-7b7fcc579b-kqx6p   2/2     Running   0                3m39s

部署完成后可以同样分别观察 Node.js 和 Python 微服务的日志:

代码语言:javascript
复制
$ kubectl logs --selector app=bindingspythonapp -c python --tail=-1
{'data': {'orderId': 1}, 'operation': 'create'}
HTTPConnectionPool(host='localhost', port=3500): Max retries exceeded with url: /v1.0/bindings/sample-topic (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9e75181390>: Failed to establish a new connection: [Errno 111] Connection refused'))
{'data': {'orderId': 2}, 'operation': 'create'}
<Response [204]>
{'data': {'orderId': 3}, 'operation': 'create'}
<Response [204]>
# ......
$ kubectl logs --selector app=bindingsnodeapp -c node --tail=-1
Node App listening on port 3000!
Hello from Kafka!
{ orderId: 2 }
Hello from Kafka!
{ orderId: 3 }
# ......

可以看到两个微服务的日志也服务我们的预期的。

如何工作

前面我们在本地或 Kubernetes 中都运行了示例应用,而且没有更改任何代码,应用结果都符合预期,接下来我们看看这是如何工作的。

在查看应用程序代码之前,我们先看看 Kafka 绑定组件的资源清单文件,它们为 Kafka 连接指定 brokers,为消费者指定 topicsconsumerGroup,为生产者指定了 publishTopic

我们创建了名为 sample-topic 的组件,然后我们通过该组件配置的 Kafka 中的 sample 主题来设置输入和输出绑定。

代码语言:javascript
复制
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: sample-topic
spec:
  type: bindings.kafka
  version: v1
  metadata:
    # Kafka broker connection setting
    - name: brokers
      value: [kafka broker address]
    # consumer configuration: topic and consumer group
    - name: topics
      value: sample
    - name: consumerGroup
      value: group1
    # publisher configuration: topic
    - name: publishTopic
      value: sample
    - name: authRequired
      value: "false"

现在我们先导航到 nodeapp 目录下面打开 app.js 文件,这是 Node.js 输入绑定示例应用的代码。这里使用 Express 暴露了一个 API 端点,需要注意的是 API 名称必须与在 Kafka 绑定组件中声明的组件名称相同,然后 Dapr 运行时将使用来自 sample 主题的事件,然后将 POST 请求与事件负载一起发送给 Node 应用程序。

代码语言:javascript
复制
const express = require("express");
const bodyParser = require("body-parser");
const port = process.env.APP_PORT ?? "3000";

require("isomorphic-fetch");

const app = express();
app.use(bodyParser.json());

// 这里的 api 端点需要与声明的组件名称相同
app.post("/sample-topic", (req, res) => {
  console.log("Hello from Kafka!");
  console.log(req.body);
  res.status(200).send();
});

app.listen(port, () => console.log(`Node App listening on port ${port}!`));

所以当 Kafka 中收到消息后就会打印类似如下所示的日志:

代码语言:javascript
复制
Hello from Kafka!
{ orderId: 3 }

然后我们导航到 pythonapp 目录下面打开 app.py 文件,这是输出绑定示例(生产者)应用程序的代码,该服务会每秒发送一次 POST 请求到 Dapr 的 http 端点的 http://localhost:3500/v1.0/bindings/<output_bindings_name>,并带有事件的 payload 数据。这个应用程序使用 bindings 组件名 sample-topic 作为 <output_bindings_name>,然后 Dapr 运行时将事件发送到上面的 Kafka 绑定组件中指定的 sample 主题上去。

代码语言:javascript
复制
import time
import requests
import os

dapr_port = os.getenv("DAPR_HTTP_PORT", 3500)

dapr_url = "http://localhost:{}/v1.0/bindings/sample-topic".format(dapr_port)
n = 0
while True:
    n += 1
    payload = { "data": {"orderId": n}, "operation": "create" }
    print(payload, flush=True)
    try:
        response = requests.post(dapr_url, json=payload)
        print(response, flush=True)

    except Exception as e:
        print(e, flush=True)

    time.sleep(1)

上面代码中最重要的依然是 Dapr API 地址 dapr_url 的拼接 "http://localhost:{}/v1.0/bindings/sample-topic".format(dapr_port),注意我们依然是面向 localhost 编程,而 v1.0/bindings/<output_bindings_name> 端点则是 Dapr API 为我们封装的输出消息绑定的统一接口,非常简单方便。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-09-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 k8s技术圈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 本地运行
  • 在 Kubernetes 中运行
  • 如何工作
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档