首页
学习
活动
专区
圈层
工具
发布
34 篇文章
1
腾讯云流计算 Oceanus 最佳实践&解决方案汇总
2
腾讯云流计算 Oceanus Connector 使用示例汇总
3
Flink 实践教程:进阶11-SQL 关联:Regular Join
4
Flink 实践教程:进阶10-自定义聚合函数(UDAF)
5
Flink 实践教程:进阶9-自定义表值函数(UDTF)
6
Flink 实践教程:进阶8-自定义标量函数(UDF)
7
Flink 实践教程:进阶7-基础运维
8
Flink 实践教程:进阶6-CEP 复杂事件处理
9
Flink 实践教程:进阶5-乱序调整
10
Flink 实践教程:进阶4-窗口 TOP N
11
Flink 实践教程:进阶3-窗口操作
12
Flink 实践教程:进阶2-复杂格式数据抽取
13
Flink 实践教程:进阶1-维表关联
14
Flink 实践教程:入门10-Python作业的使用
15
Flink 实践教程:入门9-Jar 作业开发
16
Flink 实践教程:入门8-简单 ETL 作业
17
Flink 实践教程:入门7-消费 Kafka 数据写入 PG
18
Flink 实践教程:入门6-读取 PG 数据写入 ClickHouse
19
Flink 实践教程:入门5-写入 ClickHouse
20
Flink 实践教程:入门4-读取 MySQL 数据写入 ES
21
Flink 实践教程:入门3-读取 MySQL 数据
22
Flink 实践教程:入门2-写入 Elasticsearch
23
Flink 实践教程:入门 1-零基础用户实现简单 Flink 任务
24
Oceanus 实践-从0到1接入 CKafka SQL 作业
25
Oceanus 实践-从0到1开发ClickHouse SQL作业
26
Oceanus 实践-从0到1开发PG SQL作业
27
基于腾讯云Oceanus实现MySQL和Hbase维表到数据仓库ClickHouse的实时分析
28
基于流计算 Oceanus 和 Elasticsearch Service 构建百亿级实时监控系统
29
Flink社区 | Flink CDC 2.0 正式发布,核心改进详解
30
用Python进行实时计算——PyFlink快速入门
31
实时数据湖:Flink CDC流式写入Hudi
32
专家带你吃透 Flink 架构:一个 新版 Connector 的实现
33
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
34
Flink Metrics&REST API 介绍和原理解析

Flink 实践教程:进阶8-自定义标量函数(UDF)

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将您详细介绍如何使用自定义标量函数(UDF),对随机产生的数据进行处理后存入 MySQL 中。

前置准备

创建流计算 Oceanus 集群

在流计算 Oceanus 产品活动页面 1 元购买 Oceanus 集群。进入 Oceanus 控制台 ,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群

创建 MySQL 实例

进入 MySQL 控制台 ,点击【新建】。具体可参考官方文档 创建 MySQL 实例 。进入实例后,单击右上角【登陆】即可登陆 MySQL 数据库。

创建 MySQL 表

代码语言:javascript
复制
-- 建表语句,用于接收 Sink 端数据
CREATE TABLE `udf_output` (
  `id`        int(10) NOT NULL,
  `len_name`  int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

开发 UDF

这里定义一个获取字符串字段长度的函数。如果传入一个字段,则获取这个字段的长度后返回;如果传入两个字段,则获取这两个字段的长度和后返回。

1. 代码编写

在本地IDE中创建 maven 项目,编写自定义函数UDF的代码。

代码语言:javascript
复制
// 类名:StringLengthUdf
package demos.UDF;
​
import org.apache.flink.table.functions.ScalarFunction;
​
public class StringLengthUdf extends ScalarFunction {
​
    public long eval(String a) {
        return a == null ? 0 : a.length();
    }
​
    public long eval(String b, String c) {
        return eval(b) + eval(c);
    }
}

2. 项目打包

使用 IDEA 自带打包工具 Build Artifacts 或者命令行进行打包。 命令行打包命令:

代码语言:javascript
复制
mvn clean package

命令行打包后生成的 JAR 包可以在项目 target 目录下找到。

流计算 Oceanus 作业

上传依赖

在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 JAR 包。

创建 SQL 作业

在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 SQL 作业,点击【开发调试】进入作业编辑页面。 单击【作业参数】,在【引用程序包】处选择刚才上传的 JAR 包。

1. 创建 Function

代码语言:javascript
复制
CREATE TEMPORARY SYSTEM FUNCTION StringLengthUdf AS 'demos.UDF.StringLengthUdf';

StringLengthUdf代表创建的函数名,demos.UDF.StringLengthUdf代表类路径。

2. 创建 Source

代码语言:javascript
复制
CREATE TABLE random_source ( 
  id INT, 
  name1 VARCHAR, 
  name2 VARCHAR 
  ) WITH ( 
  'connector' = 'datagen', 
  'rows-per-second'='1',          -- 每秒产生的数据条数
  'fields.id.kind'='sequence',    -- 无界的随机数
  'fields.id.start'='1',          -- 随机数的最小值
  'fields.id.end'='5',            -- 随机数的最大值
  'fields.name1.length'='10',     -- 随机字符串的长度
  'fields.name2.length'='10'      -- 随机字符串的长度
);

3. 创建 Sink

代码语言:javascript
复制
CREATE TABLE `jdbc_upsert_sink_table` (
    `id` INT,
    `len_name` INT,
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    -- 指定数据库连接参数
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://xx.xx.xx.xx:xxxx/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',         -- 请替换为您的实际 MySQL 连接参数
    'table-name' = 'udf_output',           -- 需要写入的数据表
    'username' = 'root',                   -- 数据库访问的用户名(需要提供 INSERT 权限)
    'password' = 'xxxxxxxxx',              -- 数据库访问的密码
    'sink.buffer-flush.max-rows' = '200',  -- 批量输出的条数
    'sink.buffer-flush.interval' = '2s'    -- 批量输出的间隔
);

4. 编写业务 SQL

代码语言:javascript
复制
INSERT INTO jdbc_upsert_sink_table
SELECT
id,
CAST(StringLengthUdf(name1,name2) AS INT) AS `len_name`
FROM random_source;

运行作业

点击【发布草稿】->【运行版本】即可运行,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。

总结

本文首先在本地开发 UDF 函数,将其打成 JAR 包后上传到 Oceanus 平台引用。接下来使用 Datagen 连接器产生虚拟数据,调用 UDF 函数进行不同字段的字符串长度的加和操作后存入 MySQL 中。

  • 自定义标量函数(UDF)将0个、1个或多个标量值映射到一个新的标量值。
  • UDF 需要在 ScalarFunction 类中实现 eval 方法,且必须声明为 public 类型;自定义函数中 open 方法和 close 方法可选;可被重载,即在一个 UDF 中实现多个 eval 方法。

参考阅读

[1] 用户自定义函数 https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/functions/udfs

下一篇
举报
领券