前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >实时数仓:基于 Flink CDC 实现 Oracle 数据实时更新到 Kudu

实时数仓:基于 Flink CDC 实现 Oracle 数据实时更新到 Kudu

作者头像
腾讯云大数据
发布2022-02-17 20:10:07
2.8K0
发布2022-02-17 20:10:07
举报
文章被收录于专栏:腾讯云大数据腾讯云大数据

作者:于乐,腾讯 CSIG 工程师

解决方案描述

概述

Flink CDC 于 2021 年 11 月 15 日发布了最新版本 2.1,该版本通过引入内置 Debezium 组件,增加了对 Oracle 的支持。本方案主要对 flink-connector-oracle-cdc进行试用。首先在本地对 Oracle CDC 进行调试通过,然后结合腾讯云产品流计算 Oceanus、EMR(Kudu)实现了 Oracle-Oceanus-Kudu 一体化解决方案,其中并无复杂的业务逻辑实现(这里进行最简单的数据转移,用户可根据实际业务情况编写相应代码),并对其中发现的一些问题进行归纳整理与读者分享。

方案架构

这里的 Oracle 数据库环境是通过 Docker 建立在 EMR 集群下的某台 CVM 上,通过手动向 Oracle 数据库写入、更新数据,Oceanus 实时捕获变更的数据后存储在 EMR 的 Kudu 组件上。根据以上方案,设计了如下架构图:

前置准备

创建私有网络 VPC

私有网络(VPC)是一块在腾讯云上自定义的逻辑隔离网络空间,在构建 Oceanus 集群、Redis 组件等服务时选择的网络建议选择同一个 VPC,网络才能互通。否则需要使用对等连接、NAT 网关、VPN 等方式打通网络。私有网络创建步骤请参考 帮助文档 [1]。

创建流计算 Oceanus 集群

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

在 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。VPC 及子网使用刚刚创建好的网络。创建完后 Oceanus 的集群如下:

创建 EMR 集群

EMR 是云端托管的弹性开源泛 Hadoop 服务,支持 Kudu、HDFS、Presto、Flink、Druid 等大数据框架,本次示例主要需要使用 Kudu、Zookeeper、HDFS、Yarn、Impala、Knox 组件。

进入 EMR 控制台 [2],单击左上角【创建集群】进行集群的创建,创建过程中注意选择【产品版本】,不同的版本包含的组件不同,笔者这里选择EMR-V3.2.1版本,另外【集群网络】需选择之前创建好的 VPC 及对应的子网。具体过程可参考 创建 EMR 集群 [3]。

配置 Oracle 环境

1. 安装 Oracle 镜像

下载 Docker: 不同 CVM 环境可能不相同,笔者这里采用离线安装模式,安装包官网地址 [4]。下载配置完成之后按如下命令启动并运行 Docker 服务。

代码语言:javascript
复制
# 启动systemctl start docker
# 设置开机启动systemctl enable docker.service
# 查看 docker 服务状态systemctl status docker

下载 Oracle 镜像:

代码语言:javascript
复制
# 查找 Oracle 镜像版本docker search oracle
# 下载相对应镜像,这里我们下载 truevoly/oracle-12c 版本docker pull truevoly/oracle-12c
# 运行 Docker 容器docker run -d -p 1521:1521 --name oracle12c truevoly/oracle-12c
# 进入容器docker exec -it oracle12c /bin/bash

2. 配置 Oracle 数据库

启用日志归档:

代码语言:javascript
复制
-- 如有必要重新 source 一下 .profile 文件source /home/oracle/.profile
-- 1. 切换到 Oracle 用户su oracle
-- 2. 以 DBA 身份连接数据库$ORACLE_HOME/bin/sqlplus /nologconn /as sysdbashow user
-- 3. 启用日志归档alter system set db_recovery_file_dest_size = 10G;alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope = spfile;shutdown immediate;startup mount;alter database archivelog;alter database open;
-- 4. 检查日志是否归档archive log list;

注意:

  1. /opt/oracle/oradata/recovery_area路径需使用root用户提前建立,并赋予读写权限:chmod 777 /opt/oracle/oradata/recovery_area
  2. 启用日志归档需重启数据库。
  3. 归档日志会占用大量磁盘空间,需定期清理过期日志。

创建表空间:

代码语言:javascript
复制
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

注意:/opt/oracle/oradata/SID路径需使用root用户提前创建,并赋予读写权限:chmod 777 /opt/oracle/oradata/SID

创建用户并授权:

代码语言:javascript
复制
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;GRANT CREATE SESSION TO flinkuser;GRANT SET CONTAINER TO flinkuser;GRANT SELECT ON V_$DATABASE to flinkuser;GRANT FLASHBACK ANY TABLE TO flinkuser;GRANT SELECT ANY TABLE TO flinkuser;GRANT SELECT_CATALOG_ROLE TO flinkuser;GRANT EXECUTE_CATALOG_ROLE TO flinkuser;GRANT SELECT ANY TRANSACTION TO flinkuser;GRANT LOGMINING TO flinkuser;GRANT CREATE TABLE TO flinkuser;GRANT LOCK ANY TABLE TO flinkuser;GRANT ALTER ANY TABLE TO flinkuser;GRANT CREATE SEQUENCE TO flinkuser;GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;GRANT SELECT ON V_$LOG TO flinkuser;GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;GRANT SELECT ON V_$LOGFILE TO flinkuser;GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

数据准备:

代码语言:javascript
复制
-- 创建 Oracle 表,用做 Source 端CREATE TABLE FLINKUSER.TEST1(  ID   NUMBER(10,0) NOT NULL ENABLE,    NAME VARCHAR2(50),   PRIMARY KEY(ID)  ) TABLESPACE LOGMINER_TBS;-- 手动插入几条数据INSERT INTO FLINKUSER.TEST1 (ID,NAME) VALUES (1,'1111');

启动补充日志记录:

代码语言:javascript
复制
-- 对数据库配置ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;-- 对表进行配置ALTER TABLE FLINKUSER.TEST1 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

方案实现

本方案对最近上线的 flink-connector-oracle-cdc 功能进行尝试。笔者这里首先在本地机器上使用 Docker 配置安装 Oracle 11g 和 Oracle 12c 两个版本进行本地 Oracle 表的读取测试,对读取到的数据进行 toRetractStream 转换后进行打印输出,对其中发现的一些问题归纳整理后和大家分享一下。随后在 EMR 集群上选择一台 CVM 配置 Oracle 12c 环境,将代码移植到 Oceanus 平台,并将最终的数据落到 Kudu 上,实现 Oracle To Kudu 的一整套解决方案。

本地代码开发

1. Maven 依赖
代码语言:javascript
复制
<dependency>   <groupId>com.ververica</groupId>   <artifactId>flink-connector-oracle-cdc</artifactId>   <version>2.2-SNAPSHOT</version>   <!-- 此处依赖需要设置为 scope,其他 flink 依赖需设置为 provied,Oceanus 平台已提供 -->   <scope>compile</scope></dependency>

2. 代码编写

代码语言:javascript
复制
package com.demo;
import com.ververica.cdc.connectors.oracle.OracleSource;import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class OracleToKudu {    public static void main(String[] args) throws Exception {        EnvironmentSettings settings = EnvironmentSettings                .newInstance()                .useBlinkPlanner()                .inStreamingMode()                .build();
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv,settings);
        // SQL 写法        tEnv.executeSql("CREATE TABLE `oracleSource` (\n" +                " ID      BIGINT,\n" +                " NAME    VARCHAR,\n" +                " PRIMARY KEY(ID) NOT ENFORCED )\n" +                " WITH (\n" +                "  'connector' = 'oracle-cdc',\n" +                // 请修改成 Oracle 所在的实际 IP 地址                "  'hostname' = 'xx.xx.xx.xx',\n" +                "  'port' = '1521',\n" +                "  'username' = 'flinkuser',\n" +                "  'password' = 'flinkpw',\n" +                "  'database-name' = 'xe',\n" +                "  'schema-name' = 'flinkuser',\n" +                "  'table-name' = 'test1'\n" +                ")");
        // Stream API 写法        // SourceFunction<String> sourceFunction = OracleSource.<String>builder()        //         .hostname("xx.xx.xx.xx")        //         .port(1521)        //         .database("xe")        //         .schemaList("flinkuser")        //         .tableList("flinkuser.test1")        //         .username("flinkuser")        //         .password("flinkpw")        //         .deserializer(new JsonDebeziumDeserializationSchema())        //         .build();        // sEnv.addSource(sourceFunction)

        tEnv.executeSql("CREATE TABLE `kudu_sink_table` (\n" +                " `id`    BIGINT,\n" +                " `name`  VARCHAR\n" +                ") WITH (\n" +                " 'connector.type' = 'kudu',\n" +                // 请修改成实际的 master IP 地址                " 'kudu.masters' = 'master-01:7051,master-02:7051,master-03:7051',\n" +                " 'kudu.table' = 'JoylyuTest1',\n" +                " 'kudu.hash-columns' = 'id',\n" +                " 'kudu.primary-key-columns' = 'id'\n" +                ")");
        // 笔者这里只是进行了最简化的数据转移功能,请根据实际业务情况进行开发        tEnv.executeSql("insert into kudu_sink_table select * from oracleSource");
    }}

流计算 Oceanus JAR 作业

1. 上传依赖

在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 Jar 包。

2. 创建作业

在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 Jar 作业,点击【开发调试】进入作业编辑页面。

【主程序包】选择刚刚上传的依赖,并选择最新版本,【主类】填入 com.demos.OracleToKudu

单击【作业参数】,在【内置 Connector】处选择 flink-connector-kudu,单击【保存】。

3. 运行作业

点击【发布草稿】即可运行,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。

数据查询

在 EMR 集群下面选择一台 CVM 进入,查询写入 kudu 的数据。

代码语言:javascript
复制
# 进入 kudu 目录下cd /usr/local/service/kudu/bin
# 查看集群所有表./kudu table list master-01,master-02,master-03
# 查询 JoylyuTest1 表的数据./kudu table scan master-01,master-02,master-03 JoylyuTest1

当然,Kudu 也可以与 Impala 集成,通过 Impala 查询数据,不过需在 Impala 上面建立与 Kudu 表对应的外部表才可以查询。具体可参考 Oceanus Kudu Sink 总结 [5]。

代码语言:javascript
复制
CREATE EXTERNAL TABLE ImpalaJoylyuTest1STORED AS KUDUTBLPROPERTIES (  'kudu.master_addresses' = 'master-01:7051,master-02:7051,master-03:7051',   'kudu.table_name' = 'JoylyuTest1');

问题整理

笔者这里在本地对两种不同的 Oracle 版本:Oracle 11g 和 Oracle 12c Debug 时发现了一些问题,这里进行归纳总结一下。 

其一:表名大小写问题

笔者这里首先对 Oracle 11g 进行测试,在配置完如上的步骤之后本地运行,数据打印出来之后立即报错如下:

代码语言:javascript
复制
Caused by: io.debezium.DebeziumException: Supplemental logging not configured for table HELOWIN.FLINKUSER.test1.  Use command: ALTER TABLE FLINKUSER.test1 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS  at io.debezium.connector.oracle.logminer.LogMinerHelper.checkSupplementalLogging(LogMinerHelper.java:407)  at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:132)  ... 7 more

通过日志信息可以发现,笔者的表 TEST1自动转换为了小写的表 test1。报错信息出现在 checkSupplementalLogging ,于是根据这个报错信息查看源码发现这里会对 Oracle 里面的 ALL_LOG_GROUPS 表进行一次查询,数据查询不出来导致报错(ALL_LOG_GROUPS表里存储的是大写的表名TEST1)。

代码语言:javascript
复制
static String tableSupplementalLoggingCheckQuery(TableId tableId) {        return String.format("SELECT 'KEY', LOG_GROUP_TYPE FROM %s WHERE OWNER = '%s' AND TABLE_NAME = '%s'", ALL_LOG_GROUPS, tableId.schema(), tableId.table());    }
代码语言:javascript
复制
// 将表名转为小写的源代码如下private TableId toLowerCaseIfNeeded(TableId tableId) {    return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;}

目前我们可以通过三种方法绕过该问题:

  • 直接修改源码,将上述的 toLowercase 修改为 toUppercase
  • 在创建 Oracle Source Table 时,在 WITH 参数里面添加 'debezium.database.tablename.case.insensitive'='false' 配置,让其失去“大小写不敏感”特性,在table-name中需指定大写表名。
  • 切换其他的 Oracle 版本。笔者这里使用 Oracle 12c 版本后正常。

其二:数据更新延迟问题

笔者在手动向 Oracle 数据库写数据,通过在 IDEA 控制台打印输出数据时,发现当数据为追加写入(Append)时,数据会有大概 15s 的延时,当为更新写入(Upsert)时,出现的延时更大,有时需要 3-5分钟才能捕捉到数据的变化。对于该问题,Flink CDC FAQ 中给出了明确的解决方案,在创建 Oracle Source Table 时,在 WITH 参数里面添加如下两个配置项:

代码语言:javascript
复制
'debezium.log.mining.strategy'='online_catalog','debezium.log.mining.continuous.mine'='true'

其三:并行度设置问题

笔者这里在尝试开启用并行度为 2 来进行数据读取时,发现报错如下:

代码语言:javascript
复制
Exception in thread "main" java.lang.IllegalArgumentException: The parallelism of non parallel operator must be 1.  at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)  at org.apache.flink.api.common.operators.util.OperatorValidationUtils.validateParallelism(OperatorValidationUtils.java:35)  at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:114)  at com.demo.OracleToKudu.main(OracleToKudu.java:67)

经过堆栈信息发现,Oracle CDC 的并行度只能设置为 1,与 Oracle CDC 官方文档 [6]一致。

代码语言:javascript
复制
// 报错代码部分public static void validateParallelism(int parallelism, boolean canBeParallel) {    Preconditions.checkArgument(canBeParallel || parallelism == 1, "The parallelism of non parallel operator must be 1.");    Preconditions.checkArgument(parallelism > 0 || parallelism == -1, "The parallelism of an operator must be at least 1, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");}

参考链接

  1. VPC 帮助文档:https://cloud.tencent.com/document/product/215/36515
  2. EMR 控制台:https://console.cloud.tencent.com/emr/
  3. 创建 EMR 集群:https://cloud.tencent.com/document/product/589/10981
  4. Docker 安装包地址:https://download.docker.com/linux/static/stable/x86_64/
  5. Oceanus Sink Kudu 总结:https://cloud.tencent.com/developer/article/1845785
  6. Oracle CDC 官方文档:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓

点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~

腾讯云大数据

长按二维码 关注我们

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-02-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 腾讯云大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 作者:于乐,腾讯 CSIG 工程师
  • 解决方案描述
    • 概述
      • 方案架构
      • 前置准备
        • 创建私有网络 VPC
          • 创建流计算 Oceanus 集群
            • 创建 EMR 集群
              • 配置 Oracle 环境
                • 1. 安装 Oracle 镜像
              • 本地代码开发
                • 1. Maven 依赖
                • 1. 上传依赖
                • 2. 创建作业
                • 3. 运行作业
              • 数据查询
              相关产品与服务
              私有网络
              私有网络(Virtual Private Cloud,VPC)是基于腾讯云构建的专属云上网络空间,为您在腾讯云上的资源提供网络服务,不同私有网络间完全逻辑隔离。作为您在云上的专属网络空间,您可以通过软件定义网络的方式管理您的私有网络 VPC,实现 IP 地址、子网、路由表、网络 ACL 、流日志等功能的配置管理。私有网络还支持多种方式连接 Internet,如弹性 IP 、NAT 网关等。同时,您也可以通过 VPN 连接或专线接入连通腾讯云与您本地的数据中心,灵活构建混合云。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档