数据库 HBase

最近更新时间:2023-11-23 16:14:21

我的收藏

介绍

HBase Connector 提供了对 HBase 集群的读写支持。Oceanus 已经提供了内置的flink-connector-hbase Connector 组件,具体使用可参考 使用 MySQL 关联 HBase 维表数据到 ClickHouse 进一步了解。

版本说明

Flink 版本
说明
1.11
支持 hbase 版本为:1.4.x
1.13
支持 hbase 版本为:1.4.x、2.2.x、2.3.x
1.14
支持 hbase 版本为:1.4.x、2.2.x
1.16
支持 hbase 版本为: 1.4.x、2.2.x

适用范围

可以作为源表,维表,以及Tuple、Upsert 数据流的目的表。

DDL 定义

CREATE TABLE hbase_table (
rowkey INT,
cf ROW < school_name STRING >,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4', -- Flink 1.13 支持 hbase-2.2
'table-name' = 'hbase_sink_table', -- Hbase 表名
'zookeeper.quorum' = 'ip:port,ip:port,ip:port' -- Hbase 的 zookeeper 地址
);

WITH 参数

参数
说明
是否必填
备注
connector
表类型
hbase-1.4 或者 hbase-2.2
如果您用了 hbase 2.3.x 版本,那么,connector 参数值需要替换为 hbase-2.2
table-name
HBase 表名
-
zookeeper.quorum
HBase 的 zookeeper 地址
查看 hbase-site.xml 确定参数值
zookeeper.znode.parent
HBase 在 zookeeper 中的根目录
查看 hbase-site.xml 确定参数值
null-string-literal
HBase 字段类型为字符串时,如果 Flink 字段数据为 null,则将该字段赋值为 null-string-literal,并写入 HBase
默认为 null
sink.buffer-flush.max-size
写入 HBase 前,内存中缓存的数据量(字节)大小。调大该值有利于提高 HBase 写入性能,但会增加写入延迟和内存使用。仅作为 Sink 时使用
默认值为2MB,支持字节单位 B、KB、MB 和 GB,不区分大小写。设置为0表示不进行缓存
sink.buffer-flush.max-rows
写入 HBase 前,内存中缓存的数据条数。调大该值有利于提高 HBase 写入性能,但会增加写入延迟和内存使用。仅作为 Sink 时使用
默认值为1000,设置为0表示不进行缓存
sink.buffer-flush.interval
将缓存数据周期性写入到 HBase 的间隔,可以控制写入 HBase 的延迟。仅作为 Sink 时使用
默认值为1秒,支持时间单位 ms、s、min、h 和 d。设置为0表示关闭定期写入

类型映射

HBase 将所有的数据存为字节数组。读写操作时需要将数据进行序列化和反序列化。Flink 与 HBase 的数据转换关系如下:
Flink 字段类型
HBase 转换
CHAR / VARCHAR / STRING
byte[] toBytes(String s) String toString(byte[] b)
BOOLEAN
byte[] toBytes(boolean b)boolean toBoolean(byte[] b)
BINARY / VARBINARY
byte[]
DECIMAL
byte[] toBytes(BigDecimal v)BigDecimal toBigDecimal(byte[] b)
TINYINT
new byte[] { val } bytes[0]
SMALLINT
byte[] toBytes(short val)short toShort(byte[] bytes)
INT
byte[] toBytes(int val)int toInt(byte[] bytes)
BIGINT
byte[] toBytes(long val)long toLong(byte[] bytes)
FLOAT
byte[] toBytes(float val)float toFloat(byte[] bytes)
DOUBLE
byte[] toBytes(double val)double toDouble(byte[] bytes)
DATE
将日期转换成自1970.01.01以来的天数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组
TIME
将时间转换成自00:00:00以来的毫秒数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组
TIMESTAMP
将时间戳转换成自1970-01-01 00:00:00以来的毫秒数,用 long 表示,并通过 byte[] toBytes(long val) 转换成字节数组
ARRAY
不支持
MAP / MULTISET
不支持
ROW
不支持

代码示例

包含 HBase 维表的实时计算作业代码,示例如下:
CREATE TABLE datagen_source_table (
id INT,
name STRING,
`proc_time` AS PROCTIME()
) with (
'connector'='datagen',
'rows-per-second'='1'
);

CREATE TABLE hbase_table (
rowkey INT,
cf ROW < school_name STRING >,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4', -- Flink 1.13 支持 hbase-2.2
'table-name' = 'hbase_sink_table', -- Hbase 表名
'zookeeper.quorum' = 'ip:port,ip:port,ip:port' -- Hbase 的 zookeeper 地址
);

CREATE TABLE blackhole_sink(
id INT,
name STRING
) with (
'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT id, cf.school_name as name FROM datagen_source_table src
JOIN hbase_table FOR SYSTEM_TIME AS OF src.`proc_time` as h ON src.id = h.rowkey;

注意事项

HBase Connector 一般会使用 DDL 语句中定义的主键,以 upsert 模式工作,与外部系统交换变更日志信息。因此,必须在 HBase 的 rowkey 字段上定义主键(必须声明 rowkey 字段)。如果未声明 PRIMARY KEY 子句,则 HBase 连接器默认将 rowkey 作为主键。

Kerberos 认证授权

1. 登录集群 Master 节点,获取 krb5.conf、emr.keytab、core-site.xml、hdfs-site.xml、hbase-site.xml 文件,路径如下。
/etc/krb5.conf
/var/krb5kdc/emr.keytab
/usr/local/service/hadoop/etc/hadoop/core-site.xml
/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
/usr/local/service/hbase/conf/hbase-site.xml
2. 对获取的配置文件构建 jar 包。
jar cvf hbase-xxx.jar krb5.conf emr.keytab core-site.xml hdfs-site.xml hbase-site.xml
3. 校验 jar 的结构(可以通过 vim 命令查看),jar 里面包含如下信息,请确保文件不缺失且结构正确。
META-INF/
META-INF/MANIFEST.MF
krb5.conf
emr.keytab
core-site.xml
hdfs-site.xml
hbase-site.xml
4. 程序包管理 页面上传 jar 包,并在作业参数配置里引用该程序包。
5. 获取 kerberos principal,用于 作业高级参数 配置。
klist -kt /var/krb5kdc/emr.keytab

# 输出如下所示,选取第一个即可:hadoop/172.28.28.51@EMR-OQPO48B9
KVNO Timestamp Principal
---- ------------------- ------------------------------------------------------
2 08/09/2021 15:34:40 hadoop/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/172.28.28.51@EMR-OQPO48B9
2 08/09/2021 15:34:40 hadoop/VM-28-51-centos@EMR-OQPO48B9
2 08/09/2021 15:34:40 HTTP/VM-28-51-centos@EMR-OQPO48B9
6. 作业高级参数 配置。
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
security.kerberos.login.principal: hadoop/172.28.28.51@EMR-OQPO48B9
security.kerberos.login.keytab: emr.keytab
security.kerberos.login.conf: krb5.conf
注意:
历史 Oceanus 集群可能不支持该功能,您可以联系我们升级集群管控服务,以支持 Kerberos 访问。