简介
AMQP 连接器用于连接 RabbitMQ 集群和腾讯云数据连接器集群,可消费 RAbbitMQ 集群的消息并作为集成流的 trigger,也可将集成流的消息生产发布到 RabbitMQ 集群中。
AMQP 连接器遵从 AMQP 0.9.1 协议。AMQP 协议基本概念如下:
Producer:生产者,负责向 RabbitMQ 投递消息(实际是向交换机投递而非消息队列)。
RoutingKey:路由键,生产者投递消息时所指定的参数,用于交换机根据一定的路由规则转发消息到相关消息队列中。
Consumer:消费者,负责从 RabbitMQ 的消息队列中接收消息。
Exchange:交换机。生产者在向 RabbitMQ 投递消息时,并不能直接将消息投递到消息队列中。实际上,生产者是将消息投递到生产者所指定交换机上,不同类型的交换机具有其各自的路由规则,交换机根据一定的路由规则(使用消息所携带的 RoutingKey 和绑定在该交换机上的消息队列的 BindingKey),再将消息转发给相应的消息队列上。
Queue : 消息队列,用于接收、储存交换机转发的消息,消费者通过其接收、消费消息。
BindingKey : 绑定键,在消息队列绑定到交换机的过程中,可以指定 BindingKey 参数,用于交换机根据一定的路由规则转发消息到相关消息队列中。
连接器配置
参数 | 数据类型 | 描述 | 是否必填 | 默认值 |
集群地址 | string | rabbitMQ 集群地址 | 是 | - |
集群端口 | int | rabbitMQ 集群端口号 | 是 | - |
虚拟集群地址 | string | 虚拟集群地址 | 否 | / |
用户名 | string | rabbitMQ 集群用户名 | 否 | - |
密码 | string | rabbitMQ 集群密码 | 否 | - |
使能 TLS 安全传输协议 | bool | 是否使用 TLS 加密和 rabbitMQ 集群间的连接 | 否 | false |
TLS 客户端证书 | file | 可选,使用提供的证书对连接进行加密,仅当使 TLS 安全传输协议设置为 True 才可配置 | 否 | - |
TLS 客户端 Key | file | 可选,使用提供的证书对连接进行加密,需和客户端证书同时提供,仅当使 TLS 安全传输协议设置为 True 才可配置 | 否 | - |
TLS 服务端证书 | file | 可选,使用提供的证书对连接进行加密,仅当使 TLS 安全传输协议设置为 True 才可配置 | 否 | - |
参数 | 数据类型 | 描述 | 是否必填 | 默认值 |
消息类型 | enum | 消息类型,默认为 text/plain | 否 | text/plain |
消息编码格式 | enum | 消息编码格式 | 否 | UTF-8 |
参数 | 数据类型 | 描述 | 是否必填 | 默认值 |
应答模式 | enum | 消息消费后的应答模式分为两种: 消费后直到触发的流成功结束后才确认 offset 消费后立即确认 offset | 否 | 流运行成功后确认 |
消费模式 | enum | 消费模式,可选: 拉模式 推模式 | 否 | 拉模式 |
NoLocal | bool | 设置为 true,表示不能将同一个 Connection 中生产者发送的消息传递给该 Connection 中的消费者 | 否 | false |
Exclusive | bool | 设置为是否排他 | 否 | false |
消费者数量 | int | 设置消费者数量 | 否 | 1 |
参数 | 数据类型 | 描述 | 是否必填 | 默认值 |
投递模式 | int | 生产消息时的投递模式(持久化/不持久化) | 否 | 持久化 |
参数 | 数据类型 | 描述 | 是否必填 | 默认值 |
预取大小(字节) | int | 预取大小 | 否 | 0 |
预取数量 | int | 预取数量 | 否 | 0 |
操作配置
AMQP 连接器包含 Consumer 和 Publish 两种操作。
Consumer 操作
输入参数
参数 | 数据类型 | 说明 | 是否必填 | 默认值 |
队列名称 | string | 消费的队列名称 | 是 | - |
消费者标识 | string | 消费者标识 | 否 | - |
NoWait | bool | 为 true 时,client 端将不会等待 server 的 response,server 也不会 response;若 server 无法完成消费请求,则会抛出 channel 或 connection异常。 | 否 | true |
输出
Consumer 操作执行成功后,输出结果会保存在 message 消息体的 payload;执行失败后,错误信息会保存在 Message 消息体的 error。
组件输出的 message 信息如下:
message 属性 | 值 |
payload | 执行成功后,payload 为 byte 数组 |
error | 执行成功后,error 为空;执行失败后,error 为 dict 类型,包含“Code”和“Description”字段:“Code”字段表示错误类型,“Description”字段表示错误具体信息 |
attribute | 执行成功后,attribute 为 dict,属性列表见下表 |
variable | 继承上个组件的 variable 信息 |
atrrbites 属性列表:
atrrbites 属性 | 数据类型 |
headers | map[string]interface{} |
deliveryMode | uint8,non-persistent (1) or persistent (2) |
priority | uint8,0 to 9 |
correlationId | string |
replyTo | string |
expiration | string |
messageId | string |
timestamp | time.Time |
type | string |
userId | string |
appId | string |
例如:执行成功后,message payload 值为 byte 数组,message attribute 值如下:
{"headers": {},"deliveryMode": 1,"priority": 4,"correlationId": "someId","replyTo": "some_replyto","expiration": "messageId","timestamp": "2021-04-25T15:30:30Z08:00","type": "","userId": "","appId": ""}
执行失败后,message error 值如下:
{"Code": "CORE:RUNTIME","Description": "some error message."}
案例
1. 按照 rabbitMQ 集群信息,填写 AMQP 连接器配置的连接配置,其他配置保持默认。
2. 新建流,将 AMQP Consumer 作为 trigger 节点,配置待消费的队列名称。
3. 添加其他业务逻辑,处理消费到的数据。例如:发布到 kafka。
4. 若参数配置正确,流发布后,即可将 rabbitMQ 的消息消费并发布到 kafka 队列中。
Publish 操作
输入参数
参数 | 数据类型 | 说明 | 是否必填 |
Exchange 名称 | string | AMQP 模型的 Exchange 名称 | 是 |
Exchange 类型 | enum | AMQP 模型的 Exchange 类型 | 是 |
路由键 | string | 生产者投递消息时所指定的参数,用于交换机根据一定的路由规则转发消息到相关消息队列中 | 否 |
上下文 ID | string | 上下文 ID | 否 |
参数 | 数据类型 | 说明 | 是否必填 | 默认值 |
不等待服务端确认投递结果(NoWait) | bool | 设置为 True 时,客户端不等待服务端确认投递结果 | 否 | false |
是否持久化 | bool | 设置是否持久化 | 否 | true |
是否自动删除 | bool | 设置是否自动删除 | 否 | false |
参数 | 数据类型 | 说明 | 是否必填 | 默认值 |
优先级 | int | 设置消息优先级 | 否 | 0 |
消息标识 ID | string | 设置消息标识 ID | 否 | - |
应答队列名称 | string | 设置应答队列名称 | 否 | - |
消息存活时间(毫秒) | int | 设置消息存活时间 | 否 | 1800000 |
生产者用户 ID | string | 生产者用户 ID | 否 | - |
生产者应用 ID | string | 生产者应用 ID | 否 | - |
消息类型 | string | 消息的 MIME 类型 | 否 | - |
消息头 | map | 消息头,字典类型,可添加用户自定义 header | 否 | - |
消息体 | string | 消息体,投递的具体消息内容 | 否 | - |
输出
Publish 操作执行失败后,错误信息会保存在 message 消息体的 error。
组件输出的 message 信息如下:
message 属性 | 值 |
payload | 继承上个组件的 payload 信息 |
error | 执行成功后,error 为空;执行失败后,error 为 dict 类型,包含“Code”和“Description”字段:“Code”字段表示错误类型,“Description”字段表示错误具体信息 |
attribute | 继承上个组件的 attribute 信息 |
variable | 继承上个组件的 variable 信息 |
执行失败后,message error 值如下:
{"Code": "CORE:RUNTIME","Description": "some error message."}
案例
1. 按照 rabbitMQ 集群信息,填写 AMQP 连接器配置的连接配置,其他配置保持默认。
2. 新建流,添加 trigger,例如:http_listener。
3. 在流中增加 amqp publish 操作,填写消息属性及消息内容。
4. 发布流,触发流后,若连接参数及消息参数配置正确,消息将成功投递到所配置的 rabbitMQ 集群的对应队列中。