有奖捉虫:办公协同&微信生态&物联网文档专题 HOT
例行导入(Routine Load)功能,支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中,例如自动持续从Kafka导入数据。 本文档主要介绍该功能的实现原理、使用方式以及最佳实践。

基本原理

+---------+
| Client |
+----+----+
|
+-----------------------------+
| FE | |
| +-----------v------------+ |
| | | |
| | Routine Load Job | |
| | | |
| +---+--------+--------+--+ |
| | | | |
| +---v--+ +---v--+ +---v--+ |
| | task | | task | | task | |
| +--+---+ +---+--+ +---+--+ |
| | | | |
+-----------------------------+
| | |
v v v
+---+--+ +--+---+ ++-----+
| BE | | BE | | BE |
+------+ +------+ +------+
如上图,Client 向 FE 提交一个 Routine Load 作业。
1. FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。
2. 在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后向 FE 汇报。
3. FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。
4. 整个 Routine Load 作业通过不断的产生新的 Task,来完成数据不间断的导入。

Kafka 例行导入

当前我们仅支持从 Kafka 进行例行导入。该部分会详细介绍 Kafka 例行导入使用方式和最佳实践。

使用限制

1. 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
2. 支持的消息格式为 csv, json 文本格式。csv 每一个 message 为一行,且行尾不包含换行符。
3. 默认支持 Kafka 0.10.0.0(含) 以上版本。如果要使用 Kafka 0.10.0.0 以下版本 (0.9.0, 0.8.2, 0.8.1, 0.8.0),需要修改 BE 的配置,将 kafka_broker_version_fallback 的值设置为要兼容的旧版本,或者在创建routine load的时候直接设置 property.broker.version.fallback的值为要兼容的旧版本,使用旧版本的代价是routine load 的部分新特性可能无法使用,如根据时间设置 kafka 分区的 offset。

创建任务

创建例行导入任务的详细语法可以连接到 Doris 后,查看 CREATE ROUTINE LOAD 命令手册,或者执行 HELP ROUTINE LOAD; 查看语法帮助。 下面我们以几个例子说明如何创建 Routine Load 任务:
1. 为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务。指定列分隔符和 group.id 和 client.id,并且自动默认消费所有分区,且从有数据的位置(OFFSET_BEGINNING)开始订阅。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.group.id" = "xxx",
"property.client.id" = "xxx",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
2. 严格模式 为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务:
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);

3. 导入 Json 格式数据使用示例: Routine Load 导入的 json 格式仅支持以下两种: 第一种只有一条记录,且为 json 对象:
{"category":"a9jadhx","author":"test","price":895}
第二种为 json 数组,数组中可含多条记录:
[
{"category":"11",
"title":"SayingsoftheCentury",
"price":895,
"timestamp":1589191587
},
{
"category":"22",
"author":"2avc",
"price":895,
"timestamp":1589191487
},
{
"category":"33",
"author":"3avc",
"title":"SayingsoftheCentury",
"timestamp":1589191387
}
]
创建待导入的 Doris 数据表:
CREATE TABLE `example_tbl` (
`category` varchar(24) NULL COMMENT "",
`author` varchar(24) NULL COMMENT "",
`timestamp` bigint(20) NULL COMMENT "",
`dt` int(11) NULL COMMENT "",
`price` double REPLACE
) ENGINE=OLAP
AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(
PARTITION p0 VALUES [("-2147483648"), ("20200509")),
PARTITION p20200509 VALUES [("20200509"), ("20200510")),
PARTITION p20200510 VALUES [("20200510"), ("20200511")),
PARTITION p20200511 VALUES [("20200511"), ("20200512"))
)
DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
PROPERTIES (
"replication_num" = "1"
);
以简单模式导入 json 数据:
CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
COLUMNS(category,price,author)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
精准导入 json 格式数据:
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\\"$.category\\",\\"$.author\\",\\"$.price\\",\\"$.timestamp\\"]",
"strip_outer_array" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
strict mode 与 source data 的导入关系 这里以列类型为 TinyInt 来举例。 当表中的列允许导入空值时:
source data
source data example
string to int
strict_mode
result
空值
\\N
N/A
true or false
NULL
not null
aaa or 2000
NULL
true
invalid data(filtered)
not null
aaa
NULL
false
NULL
not null
1
1
true or false
correct data
这里以列类型为 Decimal(1,0) 举例。 当表中的列允许导入空值时:
source data
source data example
string to int
strict_mode
result
空值
\\N
N/A
true or false
NULL
not null
aaa
NULL
true
invalid data(filtered)
not null
aaa
NULL
false
NULL
not null
1 or 10
1
true or false
correct data
注意
10 虽然是一个超过范围的值,但是因为其类型符合 decimal 的要求,所以 strict mode 对其不产生影响。10 最后会在其他 ETL 处理流程中被过滤。但不会被 strict mode 过滤。
访问 SSL 认证的 Kafka 集群 访问 SSL 认证的 Kafka 集群需要用户提供用于认证 Kafka Broker 公钥的证书文件(ca.pem)。如果 Kafka 集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key),以及密钥密码。这里所需的文件需要先通过 CREAE FILE 命令上传到 Doris 中,**并且 catalog 名称为 kafka**。CREATE FILE 命令的具体帮助可以参见 HELP CREATE FILE;。这里给出示例:
1. 上传文件
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
2. 创建例行导入作业
CREATE ROUTINE LOAD db1.job1 on tbl1
PROPERTIES
(
"desired_concurrent_number"="1"
)
FROM KAFKA
(
"kafka_broker_list"= "broker1:9091,broker2:9091",
"kafka_topic" = "my_topic",
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg"
);
说明
Doris 通过 Kafka 的 C++ API librdkafka 来访问 Kafka 集群。librdkafka 所支持的参数可以参阅 Configuration properties

查看作业状态

查看作业状态的具体命令和示例可以通过 HELP SHOW ROUTINE LOAD; 命令查看。
查看任务运行状态的具体命令和示例可以通过 HELP SHOW ROUTINE LOAD TASK; 命令查看。
只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。

修改作业属性

用户可以修改已经创建的作业。具体说明可以通过 HELP ALTER ROUTINE LOAD; 命令查看或参阅 ALTER ROUTINE LOAD

作业控制

用户可以通过 STOP/PAUSE/RESUME 三个命令来控制作业的停止,暂停和重启。可以通过 HELP STOP ROUTINE LOAD;HELP PAUSE ROUTINE LOAD; 以及 HELP RESUME ROUTINE LOAD; 三个命令查看帮助和示例。

其他说明

1. 例行导入作业和 ALTER TABLE 操作的关系。
例行导入不会阻塞 SCHEMA CHANGE 和 ROLLUP 操作。但是注意如果 SCHEMA CHANGE 完成后,列映射关系无法匹配,则会导致作业的错误数据激增,最终导致作业暂停。建议通过在例行导入作业中显示指定列映射关系,以及通过增加 Nullable 列或带 Default 值的列来减少这类问题。
删除表的 Partition 可能会导致导入数据无法找到对应的 Partition,作业进入暂停。
2. 例行导入作业和其他导入作业的关系(LOAD,DELETE,INSERT)。
例行导入和其他 LOAD 作业以及 INSERT 操作没有冲突。
当执行 DELETE 操作时,对应表分区不能有任何正在执行的导入任务。所以在执行 DELETE 操作前,可能需要先暂停例行导入作业,并等待已下发的 task 全部完成后,才可以执行 DELETE。
3. 例行导入作业和 DROP DATABASE/TABLE 操作的关系。 当例行导入对应的 database 或 table 被删除后,作业会自动 CANCEL。
4. kafka 类型的例行导入作业和 kafka topic 的关系。
当用户在创建例行导入声明的 kafka_topic 在kafka集群中不存在时。
如果用户 kafka 集群的 broker 设置了 auto.create.topics.enable = true,则 kafka_topic 会先被自动创建,自动创建的 partition 个数是由用户方的kafka集群中的 broker 配置 num.partitions 决定的。例行作业会正常的不断读取该 topic 的数据。
如果用户 kafka 集群的 broker 设置了 auto.create.topics.enable = false, 则 topic 不会被自动创建,例行作业会在没有读取任何数据之前就被暂停,状态为 PAUSED。 所以,如果用户希望当 kafka topic 不存在的时候,被例行作业自动创建的话,只需要将用户方的 kafka 集群中的 broker 设置 auto.create.topics.enable = true 即可。
5. 在网络隔离的环境中可能出现的问题 在有些环境中存在网段和域名解析的隔离措施,所以需要注意:
1. 创建 Routine load 任务中指定的 Broker list 必须能够被 Doris 服务访问。
2. Kafka 中如果配置了advertised.listeners, advertised.listeners 中的地址必须能够被 Doris 服务访问。
3. 关于指定消费的 Partition 和 Offset。 Doris 支持指定 Partition 和 Offset 开始消费。新版中还支持了指定时间点进行消费的功能。这里说明下对应参数的配置关系。 有三个相关参数:
kafka_partitions:指定待消费的 partition 列表,如:"0, 1, 2, 3"。
kafka_offsets:指定每个分区的起始offset,必须和 kafka_partitions 列表个数对应。如:"1000, 1000, 2000, 2000"
property.kafka_default_offset:指定分区默认的起始offset。
在创建导入作业时,这三个参数可以有以下组合:
组合
kafka_partitions
kafka_offsets
property.kafka_default_offset
行为
1
No
No
No
系统会自动查找topic对应的所有分区并从 OFFSET_END 开始消费
2
No
No
Yes
系统会自动查找topic对应的所有分区并从 default offset 指定的位置开始消费
3
Yes
No
No
系统会从指定分区的 OFFSET_END 开始消费
4
Yes
Yes
No
系统会从指定分区的指定offset 处开始消费
5
Yes
No
Yes
系统会从指定分区,default offset 指定的位置开始消费
4. STOP 和 PAUSE 的区别。 FE 会自动定期清理 STOP 状态的 ROUTINE LOAD,而 PAUSE 状态的则可以再次被恢复启用。

相关参数

一些系统配置参数会影响例行导入的使用。
1. max_routine_load_task_concurrent_num FE 配置项,默认为 5,可以运行时修改。该参数限制了一个例行导入作业最大的子任务并发数。建议维持默认值。设置过大,可能导致同时并发的任务数过多,占用集群资源。
2. max_routine_load_task_num_per_be FE 配置项,默认为5,可以运行时修改。该参数限制了每个 BE 节点最多并发执行的子任务个数。建议维持默认值。如果设置过大,可能导致并发任务数过多,占用集群资源。
3. max_routine_load_job_num FE 配置项,默认为100,可以运行时修改。该参数限制的例行导入作业的总数,包括 NEED_SCHEDULED,RUNNING,PAUSE 这些状态。超过后,不能在提交新的作业。
4. max_consumer_num_per_group BE 配置项,默认为 3。该参数表示一个子任务中最多生成几个 consumer 进行数据消费。对于 Kafka 数据源,一个 consumer 可能消费一个或多个 kafka partition。假设一个任务需要消费 6 个 kafka partition,则会生成 3 个 consumer,每个 consumer 消费 2 个 partition。如果只有 2 个 partition,则只会生成 2 个 consumer,每个 consumer 消费 1 个 partition。
5. push_write_mbytes_per_sec BE 配置项。默认为 10,即 10MB/s。该参数为导入通用参数,不限于例行导入作业。该参数限制了导入数据写入磁盘的速度。对于 SSD 等高性能存储设备,可以适当增加这个限速。
6. max_tolerable_backend_down_num FE 配置项,默认值是0。在满足某些条件下,Doris 可 PAUSED 的任务重新调度,即变成 RUNNING。该参数为0代表只有所有 BE 节点是 alive 状态才允许重新调度。
7. period_of_auto_resume_min FE 配置项,默认是5分钟。Doris 重新调度,只会在5分钟这个周期内,最多尝试3次。如果3次都失败则锁定当前任务,后续不在进行调度。但可通过人为干预,进行手动恢复。

最佳实践

这里以持续从kafka导入TPC-H数据集中的lineitem表为例子展示routine load的最佳实践

确定kafka msg格式和目标表结构

写入到 kafka msg 中的格式:
1992-01-02 3273383 3 2508983 508984 42 83658.12 0.06 0.07 R F 1992-03-19 1992-01-18 COLLECT COD TRUCK the carefully ironic accounts hin 1992-01-02 26696039 2 8782384 532409 50 73297.5 0.02 0.07 R F 1992-01-31 1992-01-13 DELIVER IN PERSON REG AIR . furiously regul 1992-01-02 47726080 1 5950048 950049 24 26346 0.02 0.02 A F 1992-02-23 1992-01-16 DELIVER IN PERSON RAIL l pearls. spec 1992-01-02 77918949 2 7276493 526501 23 33789.99 0.06 0 R F 1992-03-28 1992-01-26 NONE TRUCK y express requests. fin 1992-01-02 87026306 4 9061545 811573 11 16566.99 0.08 0.06 R F 1992-02-09 1992-01-21 TAKE BACK RETURN AIR ly regular instructions s 1992-01-02 135925030 4 14097984 598013 30 59438.4 0.02 0.08 R F 1992-02-24 1992-01-06 COLLECT COD REG AIR es boost regular platelets. reg 1992-01-02 189122402 2 13648194 398234 5 5707.55 0.1 0.08 R F 1992-03-24 1992-01-06 COLLECT COD FOB aggle caref 1992-01-02 235359552 4 8148971 898996 29 58567.53 0.1 0.07 A F 1992-02-10 1992-02-01 NONE AIR furiously ironic p 1992-01-02 298717351 3 4078182 78183 42 48719.16 0.09 0.07 R F 1992-03-08 1992-01-26 DELIVER IN PERSON MAIL hely regular accounts. blithely i 1992-01-02 305288709 3 14997743 747786 33 60720 0.04 0.06 A F 1992-03-25 1992-01-04 TAKE BACK RETURN TRUCK counts engage across the
doris 中routine 目标表lineitem 的表结构,实际在doris中的表名称为:lineitem_rtload。
CREATE TABLE lineitem_rtload ( l_shipdate date NOT NULL, l_orderkey bigint(20) NOT NULL, l_linenumber int(11) NOT NULL, l_partkey int(11) NOT NULL, l_suppkey int(11) NOT NULL, l_quantity decimalv3(15, 2) NOT NULL, l_extendedprice decimal(15, 2) NOT NULL, l_discount decimalv3(15, 2) NOT NULL, l_tax decimalv3(15, 2) NOT NULL, l_returnflag varchar(1) NOT NULL, l_linestatus varchar(1) NOT NULL, l_commitdate date NOT NULL, l_receiptdate date NOT NULL, l_shipinstruct varchar(25) NOT NULL, l_shipmode varchar(10) NOT NULL, l_comment varchar(44) NOT NULL ) ENGINE=OLAP DUPLICATE KEY(l_shipdate, l_orderkey) COMMENT 'OLAP' DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96 PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "in_memory" = "false", "storage_format" = "V2", "disable_auto_compaction" = "false" );
基于写入格式和目标表信息,创建 routine load 任务。
CREATE ROUTINE LOAD tpch_100_d.rtl_20230809 ON lineitem_rtload COLUMNS TERMINATED BY "\\t" PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "max_error_number" = "100", "strict_mode" = "true" ) FROM KAFKA ( "kafka_broker_list" = "10.0.1.138:9092", "kafka_topic" = "doris_routine_load_test", "property.group.id" = "routine_test", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );
COLUMNS TERMINATED BY "\\t" 指定kafka msg中的字段分隔符,默认为 "\\t"
max_batch_interval/max_batch_rows/max_batch_size
这三个参数分别表示:
每个子任务最大执行时间,单位是秒。范围为 5 到 60。默认为10。 每个子任务最多读取的行数。必须大于等于200000。默认是200000。 每个子任务最多读取的字节数。单位是字节,范围是 100MB 到 1GB。默认是 100MB。 这三个参数,用于控制一个子任务的执行时间和处理量。当任意一个达到阈值,则任务结束。
max_error_number
采样窗口内,允许的最大错误行数。必须大于等于0。默认是 0,即不允许有错误行。
采样窗口为 max_batch_rows * 10。即如果在采样窗口内,错误行数大于 max_error_number,则会导致例行作业被暂停,需要人工介入检查数据质量问题。
被 where 条件过滤掉的行不算错误行。
strict_mode
是否开启严格模式,默认为关闭。如果开启后,非空原始数据的列类型变换如果结果为 NULL,则会被过滤。指定方式为:
"strict_mode" = "true"
strict mode 模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:
对于列类型转换来说,如果 strict mode 为true,则错误的数据将被 filter。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。 对于导入的某列由函数变换生成时,strict mode 对其不产生影响。 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode 对其也不产生影响。例如:如果类型是 decimal(1,0),原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。

查看任务运行情况

创建 routine load 任务之后,可以通过show routine load命令查看运行状态的例行任务,如果在show routine load中没有找到对应的例行任务,则可能因为例行任务失败或者错误数过多被停止或者暂停,使用show all routine load查看所有状态的例行任务。
MySQL [tpch_100_d]> show routine load\\G; *************************** 1. row *************************** Id: 21619 Name: rtl_20230809 CreateTime: 2023-08-09 19:17:16 PauseTime: NULL EndTime: NULL DbName: default_cluster:tpch_100_d TableName: lineitem_rtload State: RUNNING DataSourceType: KAFKA CurrentTaskNum: 3 JobProperties: {"timezone":"Asia/Shanghai","send_batch_parallelism":"1","columnSeparator":"'\\t'","load_to_single_tablet":"false","lineDelimiter":"\\n","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"true","jsonpaths":"","currentTaskConcurrentNum":"3","fuzzy_parse":"false","partitions":"","columnToColumnExpr":"","maxBatchIntervalS":"20","whereExpr":"","precedingFilter":"","mergeType":"APPEND","format":"csv","json_root":"","deleteCondition":"","desireTaskConcurrentNum":"3","maxErrorNum":"100","strip_outer_array":"false","execMemLimit":"2147483648","num_as_string":"false","maxBatchRows":"300000"} DataSourceProperties: {"topic":"doris_routine_load_test","currentKafkaPartitions":"0,1,2","brokerList":"10.0.1.138:9092"} CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"ryanzryu_routine_test"} Statistic: {"receivedBytes":568128,"runningTxns":[],"errorRows":0,"committedTaskNum":31,"loadedRows":4400,"loadRowsRate":7,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":4400,"unselectedRows":0,"receivedBytesRate":905,"taskExecuteTimeMs":627757} Progress: {"0":"1599","1":"1316","2":"1482"} Lag: {"0":0,"1":0,"2":0} ReasonOfStateChanged: ErrorLogUrls: OtherMsg:
其中 每个字段说明如下:
Id: 作业ID
Name: 作业名称
CreateTime: 作业创建时间
PauseTime: 最近一次作业暂停时间
EndTime: 作业结束时间
DbName: 对应数据库名称
TableName: 对应表名称
State: 作业运行状态
DataSourceType: 数据源类型:KAFKA
CurrentTaskNum: 当前子任务数量
JobProperties: 作业配置详情
DataSourceProperties: 数据源配置详情 CustomProperties: 自定义配置 Statistic: 作业运行状态统计信息 Progress: 作业运行进度 Lag: 作业延迟状态 ReasonOfStateChanged: 作业状态变更的原因 ErrorLogUrls: 被过滤的质量不合格的数据的查看地址 OtherMsg: 其他错误信息
State 有以下4种 State: NEED_SCHEDULE:作业等待被调度 RUNNING:作业运行中 PAUSED:作业被暂停 STOPPED:作业已结束 CANCELLED:作业已取消

Kafka开启写入任务

这里通过脚本模式kafka写入,每隔5秒写入200条msg。 之后通过show routine load命令可以在process字段看到kafka的3个offset不断的在推进。
Progress: {"0":"2061","1":"2135","2":"2254"} Progress: {"0":"2279","1":"2293","2":"2321"}
修改routine load内容 可以通过以下sql 对routine load 进行修改
ALTER ROUTINE LOAD FOR [db.]job_name [job_properties] FROM data_source [data_source_properties]
注意:
能修改处于 PAUSED 状态的作业。
例如,需要修改 max_batch_interval 为 10s。
先停止对应的任务 使用以下命令暂停任务:
PAUSE [ALL] ROUTINE LOAD FOR job_name
MySQL [tpch_100_d]> PAUSE ROUTINE LOAD FOR rtl_20230809; Query OK, 0 rows affected (0.00 sec) MySQL [tpch_100_d]> show ROUTINE LOAD FOR rtl_20230809\\G; *************************** 1. row *************************** Id: 21619 Name: rtl_20230809 CreateTime: 2023-08-09 19:17:16 PauseTime: 2023-08-09 21:03:21 EndTime: NULL DbName: default_cluster:tpch_100_d TableName: lineitem_rtload State: PAUSED DataSourceType: KAFKA CurrentTaskNum: 0 JobProperties: {"timezone":"Asia/Shanghai","send_batch_parallelism":"1","columnSeparator":"'\\t'","load_to_single_tablet":"false","lineDelimiter":"\\n","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"true","jsonpaths":"","currentTaskConcurrentNum":"3","fuzzy_parse":"false","partitions":"","columnToColumnExpr":"","maxBatchIntervalS":"20","whereExpr":"","precedingFilter":"","mergeType":"APPEND","format":"csv","json_root":"","deleteCondition":"","desireTaskConcurrentNum":"3","maxErrorNum":"100","strip_outer_array":"false","execMemLimit":"2147483648","num_as_string":"false","maxBatchRows":"300000"} DataSourceProperties: {"topic":"doris_routine_load_test","currentKafkaPartitions":"0,1,2","brokerList":"10.0.1.138:9092"} CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"ryanzryu_routine_test"} Statistic: {"receivedBytes":1123678,"runningTxns":[],"errorRows":0,"committedTaskNum":114,"loadedRows":8703,"loadRowsRate":3,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":8703,"unselectedRows":0,"receivedBytesRate":486,"taskExecuteTimeMs":2310792} Progress: {"0":"2917","1":"2754","2":"3029"} Lag: {"0":0,"1":0,"2":0} ReasonOfStateChanged: ErrorReason{code=errCode = 100, msg='User root pauses routine load job'} ErrorLogUrls: OtherMsg: 1 row in set (0.00 sec)
可以看到状态已经为暂停:State: PAUSED

修改任务参数

ALTER ROUTINE LOAD FOR tpch_100_d.rtl_20230809 PROPERTIES ( "max_batch_interval" = "10" );

重启任务

RESUME ROUTINE LOAD FOR rtl_20230809;
通过 show ROUTINE LOAD FOR rtl_20230809\\G; 命令可以看到任务状态变为 running。并且 Progress 字段刷新的频率变为5s左右,证明变更生效。

更多帮助

关于 Routine Load 使用的更多详细语法,可以在 Mysql 客户端命令行下输入 HELP ROUTINE LOAD 获取更多帮助信息。