实时即未来,最近在腾讯云Oceanus进行Flink实时计算服务,以下为使用自定义Connector的实践。分享给大家~
SQL示例
-- Datagen Connector 可以随机生成一些数据用于测试
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html
CREATE TABLE random_source (
f_sequence INT,
f_random INT,
f_random_str VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second'='1', -- 每秒产生的数据条数
'fields.f_sequence.kind'='sequence', -- 有界序列(结束后自动停止输出)
'fields.f_sequence.start'='1', -- 序列的起始值
'fields.f_sequence.end'='10000', -- 序列的终止值
'fields.f_random.kind'='random', -- 无界的随机数
'fields.f_random.min'='1', -- 随机数的最小值
'fields.f_random.max'='1000', -- 随机数的最大值
'fields.f_random_str.length'='10' -- 随机字符串的长度
);
CREATE TABLE Data_Output (
`id` BIGINT,
`name` STRING
) WITH (
'connector.type' = 'kudu'
,'kudu.masters' = 'master01:7051,master02:7051,master03:7051'
,'kudu.table' = 'Data_Output'
,'kudu.hash-columns' = 'id'
,'kudu.primary-key-columns' = 'id'
,'kudu.max-buffer-size' = '5000'
,'kudu.flush-interval' = '1000'
);
INSERT INTO `Data_Output`
SELECT f_sequence, f_random_str FROM random_source;
注:首次使用Kudu表时,kudu表用impala-shell查询时需要在Impala中创建对应的外表才能查到kudu的表数据。
自定义Conector可参考开源代码自己进行修改,打包。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。