简介
AMQP 连接器用于连接 RabbitMQ 集群和腾讯云数据连接器集群,可消费 RAbbitMQ 集群的消息并作为集成流的 trigger,也可将集成流的消息生产发布到 RabbitMQ 集群中。
AMQP 连接器遵从 AMQP 0.9.1 协议。AMQP 协议基本概念如下:

 

Producer:生产者,负责向 RabbitMQ 投递消息(实际是向交换机投递而非消息队列)。
RoutingKey:路由键,生产者投递消息时所指定的参数,用于交换机根据一定的路由规则转发消息到相关消息队列中。
Consumer:消费者,负责从 RabbitMQ 的消息队列中接收消息。
Exchange:交换机。生产者在向 RabbitMQ 投递消息时,并不能直接将消息投递到消息队列中。实际上,生产者是将消息投递到生产者所指定交换机上,不同类型的交换机具有其各自的路由规则,交换机根据一定的路由规则(使用消息所携带的 RoutingKey 和绑定在该交换机上的消息队列的 BindingKey),再将消息转发给相应的消息队列上。
Queue : 消息队列,用于接收、储存交换机转发的消息,消费者通过其接收、消费消息。
BindingKey : 绑定键,在消息队列绑定到交换机的过程中,可以指定 BindingKey 参数,用于交换机根据一定的路由规则转发消息到相关消息队列中。
连接器配置
参数  | 数据类型  | 描述  | 是否必填  | 默认值  | 
集群地址  | string  | rabbitMQ 集群地址  | 是  | -  | 
集群端口  | int  | rabbitMQ 集群端口号  | 是  | -  | 
虚拟集群地址  | string  | 虚拟集群地址  | 否  | /  | 
用户名  | string  | rabbitMQ 集群用户名  | 否  | -  | 
密码  | string  | rabbitMQ 集群密码  | 否  | -  | 
使能 TLS 安全传输协议  | bool  | 是否使用 TLS 加密和 rabbitMQ 集群间的连接  | 否  | false  | 
TLS 客户端证书  | file  | 可选,使用提供的证书对连接进行加密,仅当使 TLS 安全传输协议设置为 True 才可配置  | 否  | -  | 
TLS 客户端 Key  | file  | 可选,使用提供的证书对连接进行加密,需和客户端证书同时提供,仅当使 TLS 安全传输协议设置为 True 才可配置  | 否  | -  | 
TLS 服务端证书  | file  | 可选,使用提供的证书对连接进行加密,仅当使 TLS 安全传输协议设置为 True 才可配置  | 否  | - ![]()  | 
参数  | 数据类型  | 描述  | 是否必填  | 默认值  | 
消息类型  | enum  | 消息类型,默认为 text/plain  | 否  | text/plain  | 
消息编码格式  | enum  | 消息编码格式  | 否  | UTF-8  | 

 

参数  | 数据类型  | 描述  | 是否必填  | 默认值  | 
应答模式  | enum  | 消息消费后的应答模式分为两种:    消费后直到触发的流成功结束后才确认 offset    消费后立即确认 offset  | 否  | 流运行成功后确认  | 
消费模式  | enum  | 消费模式,可选:   拉模式  推模式  | 否  | 拉模式  | 
NoLocal  | bool  | 设置为 true,表示不能将同一个 Connection 中生产者发送的消息传递给该 Connection 中的消费者  | 否  | false  | 
Exclusive  | bool  | 设置为是否排他  | 否  | false  | 
消费者数量  | int  | 设置消费者数量  | 否  | 1  | 

 

参数  | 数据类型  | 描述  | 是否必填  | 默认值  | 
投递模式  | int  | 生产消息时的投递模式(持久化/不持久化)  | 否  | 持久化  | 

 

参数  | 数据类型  | 描述  | 是否必填  | 默认值  | 
预取大小(字节)  | int  | 预取大小  | 否  | 0  | 
预取数量  | int  | 预取数量  | 否  | 0  | 

 

操作配置
AMQP 连接器包含 Consumer 和 Publish 两种操作。
Consumer 操作
输入参数
参数  | 数据类型  | 说明  | 是否必填  | 默认值  | 
队列名称  | string  | 消费的队列名称  | 是  | -  | 
消费者标识  | string  | 消费者标识  | 否  | -  | 
NoWait  | bool  | 为 true 时,client 端将不会等待 server 的 response,server 也不会 response;若 server 无法完成消费请求,则会抛出 channel 或 connection异常。  | 否  | true  | 

 

输出
Consumer 操作执行成功后,输出结果会保存在 message 消息体的 payload;执行失败后,错误信息会保存在 Message 消息体的 error。
组件输出的 message 信息如下:
message 属性  | 值  | 
payload  | 执行成功后,payload 为 byte 数组  | 
error  | 执行成功后,error 为空;执行失败后,error 为 dict 类型,包含“Code”和“Description”字段:“Code”字段表示错误类型,“Description”字段表示错误具体信息  | 
attribute  | 执行成功后,attribute 为 dict,属性列表见下表  | 
variable  | 继承上个组件的 variable 信息  | 
atrrbites 属性列表:
atrrbites 属性  | 数据类型  | 
headers  | map[string]interface{}  | 
deliveryMode  | uint8,non-persistent (1) or persistent (2)  | 
priority  | uint8,0 to 9  | 
correlationId  | string  | 
replyTo  | string  | 
expiration  | string  | 
messageId  | string  | 
timestamp  | time.Time  | 
type  | string  | 
userId  | string  | 
appId  | string  | 
例如:执行成功后,message payload 值为 byte 数组,message attribute 值如下:
{"headers": {},"deliveryMode": 1,"priority": 4,"correlationId": "someId","replyTo": "some_replyto","expiration": "messageId","timestamp": "2021-04-25T15:30:30Z08:00","type": "","userId": "","appId": ""}
执行失败后,message error 值如下:
{"Code": "CORE:RUNTIME","Description": "some error message."}
案例
1. 按照 rabbitMQ 集群信息,填写 AMQP 连接器配置的连接配置,其他配置保持默认。

 

2. 新建流,将 AMQP Consumer 作为 trigger 节点,配置待消费的队列名称。

 

3. 添加其他业务逻辑,处理消费到的数据。例如:发布到 kafka。

 

4. 若参数配置正确,流发布后,即可将 rabbitMQ 的消息消费并发布到 kafka 队列中。
Publish 操作
输入参数
参数  | 数据类型  | 说明  | 是否必填  | 
Exchange 名称  | string  | AMQP 模型的 Exchange 名称  | 是  | 
Exchange 类型  | enum  | AMQP 模型的 Exchange 类型  | 是  | 
路由键  | string  | 生产者投递消息时所指定的参数,用于交换机根据一定的路由规则转发消息到相关消息队列中  | 否  | 
上下文 ID  | string  | 上下文 ID  | 否  | 

 

参数  | 数据类型  | 说明  | 是否必填  | 默认值  | 
不等待服务端确认投递结果(NoWait)  | bool  | 设置为 True 时,客户端不等待服务端确认投递结果  | 否  | false  | 
是否持久化  | bool  | 设置是否持久化  | 否  | true  | 
是否自动删除  | bool  | 设置是否自动删除  | 否  | false  | 

 

参数  | 数据类型  | 说明  | 是否必填  | 默认值  | 
优先级  | int  | 设置消息优先级  | 否  | 0  | 
消息标识 ID  | string  | 设置消息标识 ID  | 否  | -  | 
应答队列名称  | string  | 设置应答队列名称  | 否  | -  | 
消息存活时间(毫秒)  | int  | 设置消息存活时间  | 否  | 1800000  | 
生产者用户 ID  | string  | 生产者用户 ID  | 否  | -  | 
生产者应用 ID  | string  | 生产者应用 ID  | 否  | -  | 
消息类型  | string  | 消息的 MIME 类型  | 否  | -  | 
消息头  | map  | 消息头,字典类型,可添加用户自定义 header  | 否  | -  | 
消息体  | string  | 消息体,投递的具体消息内容  | 否  | -  | 

 

输出
Publish 操作执行失败后,错误信息会保存在 message 消息体的 error。
组件输出的 message 信息如下:
message 属性  | 值  | 
payload  | 继承上个组件的 payload 信息  | 
error  | 执行成功后,error 为空;执行失败后,error 为 dict 类型,包含“Code”和“Description”字段:“Code”字段表示错误类型,“Description”字段表示错误具体信息  | 
attribute  | 继承上个组件的 attribute 信息  | 
variable  | 继承上个组件的 variable 信息  | 
执行失败后,message error 值如下:
{"Code": "CORE:RUNTIME","Description": "some error message."}
案例
1. 按照 rabbitMQ 集群信息,填写 AMQP 连接器配置的连接配置,其他配置保持默认。

 

2. 新建流,添加 trigger,例如:http_listener。

 

3. 在流中增加 amqp publish 操作,填写消息属性及消息内容。

 
 


4. 发布流,触发流后,若连接参数及消息参数配置正确,消息将成功投递到所配置的 rabbitMQ 集群的对应队列中。
