AMQP 连接器

最近更新时间:2023-06-08 14:48:02

我的收藏

简介

AMQP 连接器用于连接 RabbitMQ 集群和腾讯云数据连接器集群,可消费 RAbbitMQ 集群的消息并作为集成流的 trigger,也可将集成流的消息生产发布到 RabbitMQ 集群中。
AMQP 连接器遵从 AMQP 0.9.1 协议。AMQP 协议基本概念如下:

img


Producer:生产者,负责向 RabbitMQ 投递消息(实际是向交换机投递而非消息队列)。
RoutingKey:路由键,生产者投递消息时所指定的参数,用于交换机根据一定的路由规则转发消息到相关消息队列中。
Consumer:消费者,负责从 RabbitMQ 的消息队列中接收消息。
Exchange:交换机。生产者在向 RabbitMQ 投递消息时,并不能直接将消息投递到消息队列中。实际上,生产者是将消息投递到生产者所指定交换机上,不同类型的交换机具有其各自的路由规则,交换机根据一定的路由规则(使用消息所携带的 RoutingKey 和绑定在该交换机上的消息队列的 BindingKey),再将消息转发给相应的消息队列上。
Queue : 消息队列,用于接收、储存交换机转发的消息,消费者通过其接收、消费消息。
BindingKey : 绑定键,在消息队列绑定到交换机的过程中,可以指定 BindingKey 参数,用于交换机根据一定的路由规则转发消息到相关消息队列中。

连接器配置

连接配置参数
通用配置
消费者配置
生产者配置
服务质量配置
参数
数据类型
描述
是否必填
默认值
集群地址
string
rabbitMQ 集群地址
-
集群端口
int
rabbitMQ 集群端口号
-
虚拟集群地址
string
虚拟集群地址
/
用户名
string
rabbitMQ 集群用户名
-
密码
string
rabbitMQ 集群密码
-
使能 TLS 安全传输协议
bool
是否使用 TLS 加密和 rabbitMQ 集群间的连接
false
TLS 客户端证书
file
可选,使用提供的证书对连接进行加密,仅当使 TLS 安全传输协议设置为 True 才可配置
-
TLS 客户端 Key
file
可选,使用提供的证书对连接进行加密,需和客户端证书同时提供,仅当使 TLS 安全传输协议设置为 True 才可配置
-
TLS 服务端证书
file
可选,使用提供的证书对连接进行加密,仅当使 TLS 安全传输协议设置为 True 才可配置
-




参数
数据类型
描述
是否必填
默认值
消息类型
enum
消息类型,默认为 text/plain
text/plain
消息编码格式
enum
消息编码格式
UTF-8



参数
数据类型
描述
是否必填
默认值
应答模式
enum
消息消费后的应答模式分为两种:

消费后直到触发的流成功结束后才确认 offset
消费后立即确认 offset
流运行成功后确认
消费模式
enum
消费模式,可选:

拉模式
推模式
拉模式
NoLocal
bool
设置为 true,表示不能将同一个 Connection 中生产者发送的消息传递给该 Connection 中的消费者
false
Exclusive
bool
设置为是否排他
false
消费者数量
int
设置消费者数量
1



参数
数据类型
描述
是否必填
默认值
投递模式
int
生产消息时的投递模式(持久化/不持久化)
持久化



参数
数据类型
描述
是否必填
默认值
预取大小(字节)
int
预取大小
0
预取数量
int
预取数量
0




操作配置

AMQP 连接器包含 Consumer 和 Publish 两种操作。

Consumer 操作

输入参数

参数
数据类型
说明
是否必填
默认值
队列名称
string
消费的队列名称
-
消费者标识
string
消费者标识
-
NoWait
bool
为 true 时,client 端将不会等待 server 的 response,server 也不会 response;若 server 无法完成消费请求,则会抛出 channel 或 connection异常。
true




输出

Consumer 操作执行成功后,输出结果会保存在 message 消息体的 payload;执行失败后,错误信息会保存在 Message 消息体的 error。
组件输出的 message 信息如下:
message 属性
payload
执行成功后,payload 为 byte 数组
error
执行成功后,error 为空;执行失败后,error 为 dict 类型,包含“Code”和“Description”字段:“Code”字段表示错误类型,“Description”字段表示错误具体信息
attribute
执行成功后,attribute 为 dict,属性列表见下表
variable
继承上个组件的 variable 信息
atrrbites 属性列表:
atrrbites 属性
数据类型
headers
map[string]interface{}
deliveryMode
uint8,non-persistent (1) or persistent (2)
priority
uint8,0 to 9
correlationId
string
replyTo
string
expiration
string
messageId
string
timestamp
time.Time
type
string
userId
string
appId
string
例如:执行成功后,message payload 值为 byte 数组,message attribute 值如下:
{
"headers": {},
"deliveryMode": 1,
"priority": 4,
"correlationId": "someId",
"replyTo": "some_replyto",
"expiration": "messageId",
"timestamp": "2021-04-25T15:30:30Z08:00",
"type": "",
"userId": "",
"appId": ""
}
执行失败后,message error 值如下:
{
"Code": "CORE:RUNTIME",
"Description": "some error message."
}

案例

1. 按照 rabbitMQ 集群信息,填写 AMQP 连接器配置的连接配置,其他配置保持默认。
image-20210426154407837


2. 新建流,将 AMQP Consumer 作为 trigger 节点,配置待消费的队列名称。
image-20210426154524045


3. 添加其他业务逻辑,处理消费到的数据。例如:发布到 kafka。


4. 若参数配置正确,流发布后,即可将 rabbitMQ 的消息消费并发布到 kafka 队列中。

Publish 操作

输入参数

通用配置
Exchange 配置
消息配置
参数
数据类型
说明
是否必填
Exchange 名称
string
AMQP 模型的 Exchange 名称
Exchange 类型
enum
AMQP 模型的 Exchange 类型
路由键
string
生产者投递消息时所指定的参数,用于交换机根据一定的路由规则转发消息到相关消息队列中
上下文 ID
string
上下文 ID



参数
数据类型
说明
是否必填
默认值
不等待服务端确认投递结果(NoWait)
bool
设置为 True 时,客户端不等待服务端确认投递结果
false
是否持久化
bool
设置是否持久化
true
是否自动删除
bool
设置是否自动删除
false

image-20210426163635595


参数
数据类型
说明
是否必填
默认值
优先级
int
设置消息优先级
0
消息标识 ID
string
设置消息标识 ID
-
应答队列名称
string
设置应答队列名称
-
消息存活时间(毫秒)
int
设置消息存活时间
1800000
生产者用户 ID
string
生产者用户 ID
-
生产者应用 ID
string
生产者应用 ID
-
消息类型
string
消息的 MIME 类型
-
消息头
map
消息头,字典类型,可添加用户自定义 header
-
消息体
string
消息体,投递的具体消息内容
-




输出

Publish 操作执行失败后,错误信息会保存在 message 消息体的 error。
组件输出的 message 信息如下:
message 属性
payload
继承上个组件的 payload 信息
error
执行成功后,error 为空;执行失败后,error 为 dict 类型,包含“Code”和“Description”字段:“Code”字段表示错误类型,“Description”字段表示错误具体信息
attribute
继承上个组件的 attribute 信息
variable
继承上个组件的 variable 信息
执行失败后,message error 值如下:
{
"Code": "CORE:RUNTIME",
"Description": "some error message."
}

案例

1. 按照 rabbitMQ 集群信息,填写 AMQP 连接器配置的连接配置,其他配置保持默认。
image-20210426154407837


2. 新建流,添加 trigger,例如:http_listener。
image-20210426160900282


3. 在流中增加 amqp publish 操作,填写消息属性及消息内容。
image-20210426160829456


image-20210426160947755


4. 发布流,触发流后,若连接参数及消息参数配置正确,消息将成功投递到所配置的 rabbitMQ 集群的对应队列中。