消息队列节点

最近更新时间:2026-03-26 17:04:22

我的收藏

节点功能

消息队列节点支持连接用户自己的消息队列,实现向消息队列中发送消息的功能,消息队列节点属于基础节点
说明:
消息队列节点仅实现消息的发送,订阅消息队列并消费消息仍需要用户自行开发服务实现。




操作说明

输入变量

输入变量仅在该节点内部生效,不能跨节点使用。点击添加,进行如下配置添加输入变量。
配置
说明
变量名称
该变量的名称,只能包含字母、数字或下划线,并且以字母或下划线开头,必填。
变量描述
该变量的说明信息,非必填。
数据来源
该变量的数据来源,支持“引用”“输入”两种选项。“引用”可选择前序所有节点的输出变量,“输入”可手动填入固定值,必填。
类型
该变量的数据类型,不可选择,默认为引用的变量类型或输入的 string 类型。

消息队列类型

消息队列节点支持对接 Kafka(详情请参见 Kafka介绍)、 RocketMQ(详情请参见 RocketMQ介绍)以及RabbitMQ(详情请参见RabbitMQ介绍)这三种常见的消息队列,每种消息队列的详细配置有区别。用户可以根据业务场景需求选择合适的消息队列。

连接设置

根据用户所选择的消息队列类型配置连接方式,本节点支持了 Kafka、RocketMQ、RabbitMQ 常见的连接配置信息。

Kafka 配置说明

配置
说明
Protocol 协议
客户端与 Kafka 集群之间网络通信的安全与认证基础协议。
PLAINTEXT:明文传输,无身份认证,适合在内网测试环境中使用。
SASL_PLAINTEXT:明文传输,但需要身份认证。
SASL_SSL:最高安全级别。既包含身份认证,又通过 SSL/TLS 对网络传输链路进行加密。适用于跨公网传输或对安全性要求极高的企业内网。
Mechanism 加密协议
当 Protocol 选择了带有 SASL 的选项时,此项用于指定具体的 SASL 身份验证机制。
PLAIN:最简单的文本密码验证机制。
SCRAM_SHA_256:一种挑战-响应机制,使用SHA-256 算法,安全性高于 PLAIN,可防止密码被网络抓包窃取。
SCRAM_SHA_512:与上述机制相同,使用更复杂的 SHA-512 算法,安全性最高。
消息队列 URL
IP 地址与端口或内部域名与端口,例如:192.168.1.10:9092。
主题
需要生产或消费的业务线 Topic 名称。
用户名
用于 SASL 身份验证的账号名称。
密码
用于 SASL 身份验证的账号密码。

RocketMQ 配置说明

配置
说明
版本
RocketMQ 服务端的版本号。
消息队列 URL
RocketMQ 集群的 NameServer 地址,例如:192.168.1.100:9876。
主题
需要生产或消费的业务线 Topic 名称。
用户名
用于身份验证的账号名称。
密码
用于身份验证的账号密码。
命名空间
用于在同一个 RocketMQ 物理集群内部实现逻辑上的资源隔离。在不同的命名空间下,可以存在同名的 Topic 和消费组(ConsumerGroup)而互不冲突。

RabbitMQ 配置说明

配置
说明
消息队列 URL
RabbitMQ 服务端的连接地址,需填写提供 RabbitMQ 服务的 IP 地址或内网域名,并附带端口号。
例如:192.168.1.100:5672。
虚拟主机
对应的虚拟主机名称。例如:/ad_system_dev。
交换机名称
服务端已经创建好的交换机名称。
交换机类型
交换机采用何种规则将消息路由到队列中。
Direct:精确匹配。消息的路由键必须与队列绑定交换机时指定的路由键完全一致。
Topic:模糊匹配。路由键支持通配符(* 代表一个词,# 代表零个或多个词),适合按规则分类的分发场景(如 order.created.*)。
Fanout:广播模式。忽略路由键,将消息发送给所有绑定到该交换机的队列。
Headers:根据消息头部(Headers)的键值对进行匹配。
路由键
消息的“标签”或“地址”。交换机会根据路由键和它自身的类型规则,决定把消息投递到哪个队列。
SSL加密
是否通过 SSL/TLS 协议对客户端与 RabbitMQ 服务端之间的网络通信进行加密。
用户名
访问 RabbitMQ 虚拟主机所需的鉴权账号。
密码
访问 RabbitMQ 虚拟主机所需的鉴权密码。

消息内容

消息内容即向消息队列中发送的信息,支持引用该节点的输入变量或手动输入消息内容。

输出变量

消息队列执行消息发送动作后的输出变量。包含消息队列的发送状态 SendStatus(表示消息是否发送成功,1 表示成功,0 表示失败)、错误信息 ErrorMsg、时间戳 Timestamp、消息大小 MessageSize,以及运行时报错信息 Error(数据类型为 object,正常运行时该字段为空)。




异常处理

可手动开启异常处理(异常处理默认关闭),支持超时触发处理、异常重试和异常处理方式的配置。配置内容见下表。
当前仅支持用户设置“超时触发处理”的时长,其他异常由平台自动识别。
配置
说明
超时触发处理
节点运行的最大时长,超过时触发异常处理。消息队列节点的超时时间默认值为60秒,超时时间设置范围为1-60秒。
最大重试次数
节点运行异常时重新运行的最大次数。重试超过设定次数,认为该节点调用失败,执行下方的异常处理方式,默认为3次。
重试时间间隔
每次重新运行的时间间隔,默认为1秒。
异常处理方式
支持“输出特定内容”、“执行异常流程”和“中断流程”三种。
异常情况的输出变量
选择异常处理方式为“输出特定内容”时,超过最大重试次数后节点返回的输出变量。



选择异常处理方式为“输出特定内容”时,发生异常后工作流不会中断,节点异常重试后直接返回用户在输出内容中设置的输出变量及变量值;
选择异常处理方式为“执行异常流程”时,发生异常后工作流不会中断,节点异常重试后执行用户自定义的异常处理流程;



选择异常处理方式为“中断流程”时,没有更多设置项,发生异常后工作流执行中断。

应用示例

批量分析社媒文章,针对文章打出相应标签。然后通过消息队列节点将社媒文章分析结果发送到消息队列,其他服务可以通过订阅该消息队列并消费其中的文章分析结果。示例工作流如下:




常见问题

1. 消息处理顺序重要吗?如何在工作流中控制?
对于审批、订单处理等有状态依赖的场景,消息顺序非常重要。在工作流中通过编排消息队列节点的前后顺序,以实现控制消息发送顺序。
2. 消息格式或内容是否有要求?
工作流消息队列节点提供了消息配置文本框,支持用户任意输入消息内容,对于消息格式没有强制要求。