前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >TBase如何接入Kafka组件进行数据消费

TBase如何接入Kafka组件进行数据消费

原创
作者头像
用户7689089
修改2020-11-26 16:25:01
8090
修改2020-11-26 16:25:01
举报

TBase如何接入Kafka组件进行数据消费

TBase是腾讯云数据库团队维护的HTAP分布式数据库集群。

TBase
TBase
代码语言:txt
复制
    分布式HTAP数据库 TBase(TencentDB for TBase,TBase)是基于postgresql-xc的BSD开源协议 ,进行自主研发的分布式数据库系统。TBase 集高扩展性、SQL 高兼容度、完整的分布式事务支持、多级容灾及多维度资源隔离等功能于一身,目TBaseV2.15完全兼容pgV10。采用无共享的集群架构,提供容灾、备份、恢复、监控、安全、审计等全套解决方案,适用于TB- PB级的数据应用场景。
代码语言:txt
复制
    同时TBase支持异构数据的同步和迁移,在对应的运维管理平台OSS系统中留有对应的KAFKA接口,我们可以使用kafka来做其他数据库到TBase或反向TBase到其他数据的数据迁移或者同步工作。接下来我们就来简单看下,TBase是如何接入和使用kafka组件来进行数据处理的。

kafka简介:Kafka是一个开源流处理平台,Kafka是通过解析数据库端日志来进行发布订阅消息的系统,它可以处理消费者在网站中的所有动作流数据。

KAFKA
KAFKA

本次我将kafka接入TBase平台,进行TBase数据的数据消费,即我们将其作为如下图中producer的角色来生产数据,然后接入kafka平台经过加工,将数据转换为json格式读取出来再进行处理,这个过程叫消费consumer。这次我们先简单的用kafka 的单机来进行本次实验。

KAFKA工作流程
KAFKA工作流程

本次实验一共分为以下几个部分:

第一部分:KAFKA的主机配置

第二部分:KAFKA接入TBase 的OSS管理平台

第三部分:连接TBase进行实验数据的创建

第四部分:消费TBase生产的数据查看效果

具体实验操作如下:

实验环境:

操作系统centos 7.6

1、已安装TBase分布式数据库,2个dn节点

cn001:172.21.16.17 :11345

dn001:172.21.16.17 :11000

dn002:172.21.16.21 :11000

2、kafka独立服务器:172.21.16.12

使用端口:2181 、9092、8083

第一部分:KAFKA的主机配置

kafka主机配置如下:

1、获取软件包和依赖的jdk包地址如下: 在kafka主机上下载

wget https://tasev2-1300276124.cos.ap-beijing.myqcloud.com/kafka_2.11-2.4.0.tar.gz

wget https://tasev2-1300276124.cos.ap-beijing.myqcloud.com/jdk-8u40-linux-x64.gz

2、解压并配置相关环境变量

tar -zxvf jdk-8u40-linux-x64.gz -C /usr/local/ ; tar - xvf kafka_2.11-2.4.0.tar.gz -C /usr/local/

使用root账号,配置环境变量如下:

cat /etc/profile

export JAVAHOME=/usr/local/jdk1.8.0_40

export CLASSPATH=/usr/local/jdk1.8.0_40/lib/dt.jar:/usr/local/jdk1.8.0_40/lib/tools.ja

export KAFKAHOME=/usr/local/kafka_2.11-2.4.0

export PATH=$JAVAHOME/bin:$KAFKAHOME/bin:$PATH

使变量生效:source /etc/profile

  1. kafka参数文件修改和目录创建,修改3个文件,创建2个目录
KAFKA修改zookeeper配置文件
KAFKA修改zookeeper配置文件

cd /usr/local/kafka_2.11-2.4.0/config

egrep dataDir * 查看对应的zookeeper组件的数据目录,并创建

mkdir -p /data/tbase/kafka/zookeeper

查看zookeeper目录
查看zookeeper目录

2.2配置kafka服务

cd /data/tbase/kafka/kafka_2.11-2.4.0/config

vim server.properties

log.dirs=/data/tbase/kafka/kafka-logs

listeners=PLAINTEXT://172.21.16.12:9092 <----- 修改IP为kafka主机IP

mkdir -p /data/tbase/kafka/kafka-logs

kafka-log目录
kafka-log目录

2.3 配置connector服务

vim connect-distributed.properties

bootstrap.servers=172.21.16.12:9092

connector 配置
connector 配置

检查一下配置内容,然后启动对应的三个配置的服务

1)启动zookeeper-server 服务

zookeeper-server-start.sh -daemon /usr/local/kafka_2.11-2.4.0/config/zookeeper.properties

#确认2181端口是否被监听

lsof -i:2181

2)启动kafka服务

kafka-server-start.sh -daemon /usr/local/kafka_2.11-2.4.0/config/server.properties

确认9092端口是否被监听

lsof -i:9092

3)启动connector服务

connect-distributed.sh -daemon /usr/local/kafka_2.11-2.4.0/config/connect-distributed.properties

#确认8083端口是否被监听

lsof -i:8083

如果有服务无法启动,可以查看kafka 的日志目录/usr/local/kafka_2.11-2.4.0/logs中分析日志进行排查。

第二部分:KAFKA接入TBase 的OSS管理平台

1、接下来登录TBase分布式数据的管控平台,进行kafka的接入配置。

TBase 管理控制台OSS
TBase 管理控制台OSS

2、将配置好的kafka服务器接入到TBase 的数据同步模块中

接入kafka数据同步
接入kafka数据同步

3、开启同步开关

打开数据同步开关
打开数据同步开关

4、配置TBase允许访问的主机IP,添加KAFKA主机的ip到dn001、dn002 两个节点的名单中,并下发配置。

dn001添加鉴权配置
dn001添加鉴权配置
dn002数据节点鉴权配置
dn002数据节点鉴权配置

5、查看配置过之后的数据同步配置详情信息。

查看数据同步配置详情
查看数据同步配置详情

第三部分:连接TBase进行实验数据的创建

1、连接到TBase 的命令行界面创建测试表t1

创建TBase分布式表t1:

create table t1 (id int primary key,name varchar(20)) distribute by shard (id);

插入测试数据:

insert into t1 values(100,'张三' ),(200,'李四' ),(300,'王五' );

创建测试数据
创建测试数据
插入测试数据
插入测试数据

第四部分:消费TBase生产的数据查看效果

切换到kafka 主机上进行数据的消费测试:

1、 查询生成的topic(相当于数据库中的表)

kafka-topics.sh --list --zookeeper 172.21.16.12:2181

列出topic信息
列出topic信息

消费刚刚创建的t1表的全部数据

2、kafka-console-consumer.sh --bootstrap-server 172.21.16.12:9092 --topic tbase_zhao_1.postgres.public.t1 --from-beginning

注:tbase_zhao_1.postgres.public.t1 格式:实例名字:数据库名:模式名:表名

消费TBase 表t1的数据
消费TBase 表t1的数据

3、消费出来或叫做读取出来的结果如下:

第一行数据
第一行数据
第二行数据
第二行数据
第三行数据
第三行数据

同时我们在进行TBase端的数据插入时,数据会被实时的消费出来。

postgres=# insert into t1 values(400,'张飞');

INSERT 0 1

postgres=# insert into t1 values(500,'刘备');

INSERT 0 1

postgres=# insert into t1 values(600,'关羽');

INSERT 0 1

============================消费记录如下============================================

"payload":{"before":null,"after":{"id":400,"name":"张飞"},"source":{"version":"1.1.1.RELEASE-TBase","connector":"postgresql","name":"tbase_zhao_1.postgres","ts_ms":1606377445676,"snapshot":"false","db":"postgres","schema":"public","table":"t1","txId":682,"lsn":234887184,"xmin":null},"op":"c","ts_ms":1606377445679,"transaction":null}}

"payload":{"before":null,"after":{"id":500,"name":"刘备"},"source":{"version":"1.1.1.RELEASE-TBase","connector":"postgresql","name":"tbase_zhao_1.postgres","ts_ms":1606377457368,"snapshot":"false","db":"postgres","schema":"public","table":"t1","txId":682,"lsn":234887896,"xmin":null},"op":"c","ts_ms":1606377457372,"transaction":null}}

"payload":{"before":null,"after":{"id":600,"name":"关羽"},"source":{"version":"1.1.1.RELEASE-TBase","connector":"postgresql","name":"tbase_zhao_1.postgres","ts_ms":1606377470187,"snapshot":"false","db":"postgres","schema":"public","table":"t1","txId":683,"lsn":234888832,"xmin":null},"op":"c","ts_ms":1606377470191,"transaction":null}}

因为目前多种数据库在数据类型中支json数据类型,我们可以将kafka消费的数据接入到对应的数据库中加载使用。或者借助应用程序将其处理为纯文本的数据,进而可以进行跨平台或版本的异构数据迁移的同步或迁移操作。

可以使用kafka 将异构平台数据迁到TBase中或反向迁移等,同时也可将TBase数据消费使用,如果异构平台如Oracle,mysql,postgresql,等数据如果有需求迁到TBase中的话,也可以借助腾讯云的DTS中的DB bridge工具进行异构平台数据迁移评估,兼容性语句语法改造,全量/增量同步等功能的一个迁移方案。

Kafka是分布式流平台。

有3个主要特征:

  • 发布和订阅消息流,这一点与传统的消息队列相似。
  • 以容灾持久化方式的消息流存储。
  • 在消息流发生时处理消息流。

Kafka通常使用在两大类应用中:

  • 在系统或应用之间,构建实时、可靠的消息流管道。
  • 构建实时流应用程序,用于转换或响应数据流

Kafka的几个基本概念:

  • Kafka可以作为一个集群运行在跨越多个数据中心的多个服务上。
  • Kafka集群按照分类存储的消息流叫做topic
  • 每一个消息由一个主键、一个值、和一个时间戳组成。

具体更多的kafka相关原理和使用介绍,请访问腾讯云社区Kafka精进

1、https://cloud.tencent.com/developer/article/1476637

2、https://cloud.tencent.com/developer/article/1645644

3、https://cloud.tencent.com/developer/article/1607445

谢谢!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • TBase如何接入Kafka组件进行数据消费
  • 第一部分:KAFKA的主机配置
  • 第二部分:KAFKA接入TBase 的OSS管理平台
  • 第三部分:连接TBase进行实验数据的创建
  • 第四部分:消费TBase生产的数据查看效果
相关产品与服务
分布式数据库 TDSQL
分布式数据库(Tencent Distributed SQL,以下简称 TDSQL)是腾讯打造的一款企业级数据库产品,具备强一致高可用、全球部署架构、高 SQL 兼容度、分布式水平扩展、高性能、完整的分布式事务支持、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供完整的分布式数据库解决方案。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档