有奖捉虫:行业应用 & 管理与支持文档专题 HOT
本节将通过几个典型的场景案例说明 Dataway 在腾讯云数据连接器平台中应用。

案例一:使用 Dataway 获取 token

腾讯云数据连接器中封装企业微信连接器,可以完成成员管理、消息推送等多种操作,每次操作本质上都是调用 企业微信 API 完成的。在调用 API 前,需要获取 access_token。相当于创建一个登录凭证,其他的业务接口都依赖 access_token 鉴权调用者身份。
本示例将模拟该调用过程,并说明 Dataway 脚本在其中起到的作用。
gettoken-flow流程图


上图为整个流的配置图,其中 Transform 组件使用 Dataway 表达式设置 corpId 和 corpSecret 两个 variable 参数。
gettoken-transform1


gettoken-transform2


HTTP Request 组件向企业微信 API 地址发起 get-token 请求调用,请求参数为 msg.vars 中的 corpId 和 corpSecret。在 HTTP Request 组件的请求参数中,使用 Dataway 表达式进行动态设置。




Set-Variable 组件将获取到 HTTP response 解析,获取 access_token 后存到 msg.vars 中,使用 Dataway 动态赋值。

gettoken-setvariable


Cache 组件通过键值对保存 access_token,以方便下次无需请求直接从缓存中获取。Cache 的数据使用 Dataway 表达式获取。
gettoken-cache1


实际上企业微信连接器使用 XML 语言完成 get-token,本示例为了方便查看在流中复现该操作。Dataway 表达式在其中主要的作用是通过操作流转消息 Message,实现动态赋值,并交给上层调用组件使用。

案例二:将 JSON 类型输入转换成 XML 类型输入

使用 Dataway 表达式可以完成数据类型的转换,例如:JSON 转换成 XML、XML 转换成 CSV 等。本示例将通过一个将 JSON 类型的输入转换成 XML 的示例,说明 Dataway 的数据转换作用。

输入

我们在流中预先使用 Set Payload 组件对 Message 的 payload 属性设置成封装了 JSON 格式的 Entity 数据。
数据类型装换-输入



Dataway 表达式

在 Set Payload 组件后新增 Transform 组件,使用 Dataway 表达式重新设置 Message 消息的 payload。
def dw_process(msg):
input_param = msg.payload['^value']
return Entity.from_value({
'root': {
'name': input_param['name'],
'age': input_param['age'],
'male': input_param['male'],
'brothers': input_param['brothers'],
"@id": "haha"
}
}, mime_type="application/xml")

输出

Transform 组件将 Message 的 payload 设置为 Entity 类型,使用 Entity['^blob'] 获取二进制数据并解码,可以查看输出的 XML 结构字符串,如下所示:
<?xml version="1.0" encoding="utf-8"?>
<root id="haha">
<name>zhangsan</name>
<age>12</age>
<male>true</male>
<brothers>lisi</brothers>
<brothers>wangwu</brothers>
</root>
Dataway 表达式通过 Entity 类型提供强大的数据转换功能,通过设置不同的 mime_type 参数,可以实现不同类型的数据转换。使用 Entity 对象 更加详细的说明使用 Entity 对象。

案例三:Dataway 处理流式数据

流式数据指数据从流的开始到结束,过程是流式的,数据不需要全读入内存。这样的好处是能支持大文件且可以做到实时处理,因此在腾讯云数据连接器中常用于实时、格式化得处理大数据。
背景说明


本示例将说明如何读取流式的输入和产生流式的输出。

产生流式输出

使用 Dataway 表达式可以产生流式的输出。在 Set Payload 组件中使用如下 Dataway 表达式:

Dataway 表达式

def dw_process(msg):
for x in range(10):
yield x


输出

Dataway 表达式将产生一个流式对象,并设置到 payload 中,供下游使用。

读取流式输入

使用 Dataway 表达式可以读取上一步产生的流式输入。msg.payload 为一个流式对象,可以使用 for 循环读取该流式对象,并进行数据处理。

Dataway 表达式

def dw_process(msg):
sum = 0
for x in msg.payload:
sum += x
return sum

输出

Dataway 表达式使用流式处理输入数据,并最终返回 45