文档中心>数据湖计算 DLC>实践教程>使用 Apache DolphinScheduler 调度 DLC 引擎提交任务

使用 Apache DolphinScheduler 调度 DLC 引擎提交任务

最近更新时间:2025-03-21 11:17:03

我的收藏
本文介绍 DLC 对 Apache DolphinScheduler 调度工具的支持,并提供示例来演示如何使用 Apache DolphinScheduler 运行 DLC 引擎提交任务。

背景信息

Apache DolphinScheduler 是一个开源的分布式工作流调度系统,旨在为大数据场景提供高效的任务调度和管理。它支持可视化工作流设计、任务依赖管理、定时调度等功能,适用于数据处理、ETL 和机器学习等多种应用场景。如需更多信息,请参见 Apache DolphinScheduler 官方网站

前提条件

1. Apache DolphinScheduler 环境准备。
1.1 已安装并启动Apache DolphinScheduler,更多安装及启动Apache DolphinScheduler操作,请参见 Apache DolphinScheduler 快速上手
2. 数据湖计算 DLC 环境准备。
2.1 已开通数据湖计算 DLC 引擎服务。
2.2 如使用SuperSQL引擎,准备好 DLC JDBC 驱动,点击下载 JDBC 驱动

标准 Spark 引擎对接 Apache DolphinScheduler 关键步骤

选择添加 Kyuubi 或 Spark 数据源

注意:
Apache DolphinScheduler 低于3.2.1版本未支持 kyuubi 数据源,只能选择 spark 数据源。



参数如图设置:



参数说明:
参数
必填
说明
源名称
自定义即可。
描述
自定义即可。
IP 主机名
填写能访问到引擎的 ip。
如果是内网访问,可以在 DLC 控制台查看,如下图所示:



如果是公网访问,需要先开通引擎的公网访问,请参见 配置公网访问标擎
端口
10009
用户名
填写引擎名称和资源组名称,两个名称之间用“&”符号连接。
密码
填写 SecretId 和 SecretKey,id 和key 之间用“&”符号连接。

创建项目和工作流

1. 如图在项目管理下创建一个项目:

2. 进入项目创建一个工作流:



3. 在工作流中创建一个 SQL 节点,如图选择刚刚创建的数据源实例,并且输入 SQL。

4. 保存该节点和工作流,先将工作流上线再运行:

5. 在工作流实例中,能够看到历史运行任务列表,可以点击实例名,查看运行结果,日志等信息:


标准 Presto 引擎对接 Apache DolphinScheduler 关键步骤

当前不支持直接使用 kyuubi 或者 presto 数据源直接连接 presto 引擎,用户可以通过 kyuubi 的 Python 节点访问 DLC。

配置 PYTHON_HOME

找到 Apache DolphinScheduler 安装路径下的配置文件 dolphinscheduler_env.sh,修改其中的 PYTHON_HOME 参数,配置为当前 Python 的路径。或者链接Python 到指定位置:
ln -s /usr/bin/python /opt/soft/python

下载腾讯云 Python SDK

pip install --upgrade tencentcloud-sdk-python

创建项目和工作流

1. 如图在项目管理下创建一个项目:

2. 配置节点:

示例代码:
说明:
该脚本会读取本地的 sql_file_path/dlc.sql 文件,将文件内的 sql 全部提交到指定的 DLC presto 引擎。
注意替换脚本中的 secretId、secretKey、region、engineName,文件路径等参数。
import json
import types
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.dlc.v20210125 import dlc_client, models
import base64
import time
import sys


def create_task(sql):
global err
try:
# 实例化一个认证对象,入参需要传入腾讯云账户 SecretId 和 SecretKey,此处还需注意密钥对的保密
# 代码泄露可能会导致 SecretId 和 SecretKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考,建议采用更安全的方式来使用密钥,请参见:https://cloud.tencent.com/document/product/1278/85305
# 密钥可前往官网控制台 https://console.cloud.tencent.com/cam/capi 进行获取
cred = credential.Credential("secretId", "secretKey")
# 实例化一个http选项,可选的,没有特殊需求可以跳过
httpProfile = HttpProfile()
httpProfile.endpoint = "dlc.tencentcloudapi.com"

# 实例化一个client选项,可选的,没有特殊需求可以跳过
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
# 实例化要请求产品的client对象,clientProfile是可选的
client = dlc_client.DlcClient(cred, "region", clientProfile)

# 实例化一个请求对象,每个接口都会对应一个request对象
req = models.CreateTasksRequest()
base64sql = base64.b64encode(sql.encode('utf-8')).decode('utf-8')
# print(base64sql)
# print(base64sql)
params = {
"DatabaseName": "db",
"Tasks": {
"TaskType": "SQLTask",
"FailureTolerance": "Terminate",
"SQL": base64sql,
},
"DatasourceConnectionName": "DataLakeCatalog",
"DataEngineName": "engineName"
}
req.from_json_string(json.dumps(params))

# 返回的resp是一个CreateTasksResponse的实例,与请求对象对应
resp = client.CreateTasks(req)
# 输出json格式的字符串回包
print(resp.to_json_string())

return resp
except TencentCloudSDKException as err:
print(err)

if __name__ == "__main__":
try:
sql_file = "sql_file_path/dlc.sql"
print(sql_file)
with open(sql_file, 'r') as file:
sqls = file.read()
print(sqls)
create_rsp = create_task(sqls)

except Exception as main_err:
print(main_err)

SuperSQL 引擎对接 Apache DolphinScheduler 关键步骤

指定环境变量





创建项目和工作流

1. 如图在项目管理下创建一个项目:

2. 进入项目创建一个工作流:

3. 新建 Python 脚本,拖动 Python 组件到编辑区域。

4. 设置节点。

5. 设置脚本。

示例代码:


import jaydebeapi jdbc_url = "jdbc:dlc:dlc.tencentcloudapi.com?task_type=SQLTask&datasource_connection_name=DataLakeCatalog®ion=ap-guangzhou&data_engine_name=public-engine" user = "xx" pwd = "xx" driver = "com.tencent.cloud.dlc.jdbc.DlcDriver" jar_file = '/opt/dolphinscheduler/libs/dlc-jdbc-2.2.3-jar-with-dependencies.jar' sql = "select 1" conn = jaydebeapi.connect(driver, jdbc_url, [user, pwd], jar_file) curs = conn.cursor() curs.execute(sql) array_size = curs.arraysize.real rowcount = curs.rowcount.real print(array_size) print(rowcount) rows = curs.rowcount.real if rows != 0: result = curs.fetchall() print(result) curs.close() conn.close()
参数说明:
参数
说明
jdbc_url
jdbc 的连接地址以及配置参数。详情请参见 DLC JDBC 访问
user
SecretId
pwd
SecretKey
dirver
加载 JDBC 驱动。详情请参见 DLC JDBC 访问
jar_file
驱动 jar 包的存放路径。详情请参见 DLC JDBC 访问
6. 单击工作流保存确定。

7. 上线工作流。

8. 执行工作流。

9. 查看执行结果。