前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一个完整的Mysql到Hbase数据同步项目思想与实战

一个完整的Mysql到Hbase数据同步项目思想与实战

作者头像
公众号guangcity
发布2019-09-20 17:25:48
3.4K1
发布2019-09-20 17:25:48
举报
文章被收录于专栏:光城(guangcity)光城(guangcity)

一个完整的Mysql到Hbase数据同步项目思想与实战

0.导语

对于上次文章预告,这次则以项目实战从后往前进行,先给大家一个直观的应用,从应用中学习,实践中学习。

欢迎大家留言,转发,多多支持!

本次可以学习如下知识:

  • MySQL binlog启用配置与使用
  • binlog查看提取方案maxwell
  • Kafka基本使用
  • Hbase基本使用
  • Python操纵Hbase
  • binlog->maxwell->kafka->hbase方案

这次的实践名字为:数据增量同步!

采用方案为:binlog->maxwell->Kafka->Hbase!

1.Mysql binlog

binlog是sever层维护的一种二进制日志,与innodb引擎中的redo/undo log是完全不同的日志。

可以简单的理解该log记录了sql标中的更新删除插入等操作记录。通常应用在数据恢复、备份等场景。

1.1 如何开启?

开启binlog

对于我的mysql的配置文件在下面这个文件夹,当然直接编辑my.cnf也是可以的。

代码语言:javascript
复制
vi /etc/mysql/mysql.conf.d/mysqld.cnf 

对配置文件设置如下:

1.2 查看是否启用

进入mysql客户端输入:

代码语言:javascript
复制
show variables like '%log_bin%';

1.3 binlog介绍

我的log存放在var下面的log的mysql下面:

在mysql-bin.index中包含了所有的log文件,比如上述图就是包含了1与2文件,文件长度超过相应大小就会新开一个log文件,索引递增,如上面的000001,000002。

1.4 binlog实战

首先创建一个表:

代码语言:javascript
复制
create table house(id int not null primary key,house int,price int);

向表中插入数据:

代码语言:javascript
复制
insert into loaddb.house(id,house,price) values(1,2,3);

上面提到插入数据后,binlog会更新,那么我们去查看上面log文件,应该会看到插入操作。

Mysql binlog日志有ROWStatementMiXED三种格式;

代码语言:javascript
复制
set global binlog_format='ROW/STATEMENT/MIXED'

命令行:

代码语言:javascript
复制
show variables like 'binlog_format'

对于mysql5.7的,binlog格式默认为ROW,所以不用修改。

那么为何要了解binlog格式呢,原因很简单,我要查看我的binlog日志,而该日志为二进制文件,打开后是乱码的。对于不同的格式,查看方式不一样!

对于ROW模式生成的sql编码需要解码,不能用常规的办法去生成,需要加上相应的参数,如下代码:

代码语言:javascript
复制
sudo /usr/bin/mysqlbinlog mysql-bin.000002 --base64-output=decode-rows -v

使用mysqlbinlog工具查看日志文件:

2.Kafka

Kafka是使用Java开发的应用程序,Kafka需要运行Zookeeper,两者都需要Java,所以在需要安装ZookeeperKafka之前,先安装Java环境。

Kafka 是一种分布式的,基于发布 / 订阅的消息系统。在这里可以把Kafka理解为生产消费者模式。

2.1 Zookeeper安装及配置

Zookeeper下载: https://www.apache.org/dyn/closer.cgi/zookeeper/

下载相应的tar.gz文件,然后解压后移动到/usr/local下面即可。

配置:

代码语言:javascript
复制
cp zoo_sample.cfg zoo.cfg

重要配置:

代码语言:javascript
复制
# 数据目录
dataDir=/usr/local/zookeeper/data
# 日志
dataLogDir=/usr/local/zookeeper/logs
# 客户端访问Zookeeper的端口号
clientPort=2181

如果日志文件夹logs不存在,记得mkdir创建一下即可。data目录也是这样。

最后配置到用户PATH里面:

代码语言:javascript
复制
vi ~/.bashrc

系统环境变量

代码语言:javascript
复制
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH

环境变量生效:

代码语言:javascript
复制
source ~/.bashrc

2.2 启动Zookeeper

直接输入zkServer.sh start即可!

2.3 Kafka安装及配置

Kafka下载地址: http://kafka.apache.org/downloads

同上述安装,这里下载.tgz文件,也是解压后移动到/usr/local即可!

关于配置文件可以直接采用默认的即可。

2.4 启动Kafka

代码语言:javascript
复制
./bin/kafka-server-start.sh ./config/server.properties 

2.5 封装上述两个启动

将Zookeeper与Kafka启动封装成一个脚本:

启动脚本:

代码语言:javascript
复制
#!/bin/bash
./zookeeper/bin/zkServer.sh start
./kafka/bin/kafka-server-start.sh ./kafka/config/server.properties

关闭脚本:

代码语言:javascript
复制
#!/bin/bash
./kafka/bin/kafka-server-stop.sh # first stop kafka
./zookeeper/bin/zkServer.sh stop # then stop zookeeper

验证启动结果:

验证关闭结果:

2.6 Topic创建

当使用下面一节maxwell提取出来的binlog信息的时候,默认使用kafka进行消费。

代码语言:javascript
复制
./kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2.7 发布与订阅

向Topic上发布消息,按Ctrl+D结束:

代码语言:javascript
复制
./kafka-console-producer.sh --broker-list localhost:9092 --topic test

从Topic上接收消息,按Ctrl+C结束:

代码语言:javascript
复制
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

3.binlog提取工具Maxwell

3.1 Maxwell安装及配置

Maxwell是将mysql binlog中的insert、update等操作提取出来,并以json数据返回的一个工具。

当然自己也可以用编程实现!

下载地址:http://maxwells-daemon.io/

安装方式同上。

3.2 mysql配置Maxwell

Maxwell配置文件中默认用户名密码均为maxwell,所以需要在mysql中做相应的授权。

代码语言:javascript
复制
mysql> GRANT ALL on maxwell.* to'maxwell'@'%' identified by 'maxwell';
mysql> GRANT SELECT, REPLICATION CLIENT,REPLICATION SLAVE on *.* to 'maxwell'@'%';
mysql> flush privileges;

3.3 配置Maxwell

代码语言:javascript
复制
cp config.properties.example config.properties
vi config.properties

maxwell配置:

代码语言:javascript
复制
log_level=info
# 默认生产者
producer=kafka
kafka.bootstrap.servers=localhost:9092

# mysql login info
host=localhost
user=maxwell
password=maxwell

# kafka配置
kafka_topic=test
kafka.compression.type=snappy
kafka.acks=all
kinesis_stream=test

3.4 启动maxwell

代码语言:javascript
复制
./maxwell/bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=test

当然也可以把上述封装成一个启动脚本:

代码语言:javascript
复制
#!/bin/bash
./maxwell/bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=test

直接启动:

代码语言:javascript
复制
./start_maxwell.sh

4.Hbase

4.1 安装及配置

下载Hbase: https://mirrors.cnnic.cn/apache/hbase/

代码语言:javascript
复制
wget http://mirrors.cnnic.cn/apache/hbase/xxx/hbase-xxx-bin.tar.gz
tar zxvf hbase-xxx-bin.tar.gz
sudo mv zxvf hbase-xxx-bin /usr/local/hbase

然后把Hadoop中的conf/hbase-site.xml配置如下:

代码语言:javascript
复制
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>file:///home/hadoop/hbase</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/home/hadoop/zookeeper</value>
  </property>
</configuration>

并将该配置文件拷贝到hbase/conf下面即可。

启动Hbase:

代码语言:javascript
复制
bin/hbase shell

环境变量设置:

修改bashrc文件,添加如下:

代码语言:javascript
复制
export HBASE_HOME=/usr/local/hbase
export PATH=$PATH:$HBASE_HOME/bin

后面启动只需要:

代码语言:javascript
复制
hbase shell

4.2 基本使用

HBase 是一种列式的分布式数据库,不支持多表连接查询,可以按照ROW查询,当中列字段在簇里面可以设置。

查询所有表

代码语言:javascript
复制
list

创建表

info就是簇

代码语言:javascript
复制
create 'test','info'

添加数据

a,b,c是info簇下的三列,value1,value2,value3就是值。

代码语言:javascript
复制
put 'test', 'row1', 'info:a', 'value1'
put 'test', 'row2', 'info:b', 'value2'
put 'test', 'row3', 'info:c', 'value3'

查询所有数据

代码语言:javascript
复制
scan 'test'

关于更多数据库操作及介绍可以去官网学习,掌握上述知识对于本节的实战就够了!

4.3 Python操作Hbase

代码语言:javascript
复制
pip install thrift
pip install happybase

python连接Hbase需要启用thrift接口,启用方式:

代码语言:javascript
复制
./hbase/bin/hbase-daemon.sh start thrift

4.4 Python代码实现

代码语言:javascript
复制
import happybase

class hbase():

    def __init__(self):
        self.conn = happybase.Connection("127.0.0.1", 9090)
        print("===========HBASE数据库表=============\n")
        print(self.conn.tables())
        self.conn.open()

    def createTable(self,table_name,families):
        self.conn.create_table(table_name,families)

    def insertData(self,table_name,row,data):
        table = self.conn.table(table_name)
        table.put(row=row,data=data)


    def deletTable(self,table_name,flag):
        self.conn.delete_table(table_name,flag)

    def getRow(self,table_name):
        table = self.conn.table(table_name)
        print(table.scan())
        i=0
        for key, value in table.scan():
            print(key)
            print(value)
            i+=1
        print(i)

    def closeTable(self):
        self.conn.close()

htb = hbase()
table_name = 'test1'
families = {'info':{}}
htb.createTable(table_name,families)
htb.insertData(table_name,'row',{'info:content':'光城','info:price':'299'})
htb.deletTable(table_name,True)
htb.getRow(table_name)

5.实战

上述介绍了所有的安装与使用,下面来实战两个例子。

5.1 Kafka消费

流程如下:

往Mysql中实时更新,插入数据等操作,会记录到binlog中,然后使用maxwell解析binlog,用Kafka进行消费。

依次启动maxwell,Kafka以及消费Kafka。

代码语言:javascript
复制
./start_maxwell.sh
./start_kafka.sh
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

5.2 Hbase消费

Hbase消费则是在Kafka消费基础上做的一个调用,通过pykafka进行消费生产者的数据到Hbase中。流程为:

binlog->maxwell->python操作Kafka->python操作hbase->Hbase。

完整实现如下:

代码语言:javascript
复制
from pykafka import KafkaClient
import happybase
import json


class mysqlToHbase():

    def __init__(self):
        self.client = KafkaClient(hosts="localhost:9092")
        self.topic = self.client.topics['test']
        self.consumer = self.topic.get_balanced_consumer(consumer_group='sqbase', auto_commit_enable=True,
                                               zookeeper_connect='localhost:2181')
        self.conn = happybase.Connection("127.0.0.1", 9090)
        print("===========HBASE数据库表=============\n")
        print(self.conn.tables())
        self.conn.open()

    def putTohbase(self,table_name):
        for m in self.consumer:
            database = json.loads(m.value.decode('utf-8'))["database"]
            name = json.loads(m.value.decode('utf-8'))["table"]
            row_data = json.loads(m.value.decode('utf-8'))["data"]
            if database == 'mydb' and name == 'house':
                print(json.loads(m.value.decode('utf-8')))
                row_id = row_data["id"]
                row = str(row_id)
                del row_data["id"]
                data = {}
                for each in row_data:
                    neweach='info:'+each
                    data[neweach] = row_data[each]
                data['info:price'] = str(data['info:price'])

                self.insertData(table_name,row,data)


    def createTable(self,table_name,families):
        self.conn.create_table(table_name,families)

    # htb.insertData(table_name, 'row', {'info:content': 'asdas', 'info:price': '299'})
    def insertData(self,table_name,row,data):
        table = self.conn.table(table_name)
        table.put(row=row,data=data)


    def deletTable(self,table_name,flag):
        self.conn.delete_table(table_name,flag)

    def getRow(self,table_name):
        table = self.conn.table(table_name)
        print(table.scan())
        i=0
        for key, value in table.scan():
            print(key)
            print(value)
            i+=1
        print(i)


    def closeTable(self):
        self.conn.close()

htb = mysqlToHbase()
table_name = 'sql_hbase'
families = {'info':{}}
# htb.createTable(table_name,families)
htb.putTohbase(table_name)
htb.closeTable()

htb.deletTable(table_name,True)
# htb.getRow(table_name)

图中为当mysql中进行相应操作,hbase便会同步!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-05-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 光城 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一个完整的Mysql到Hbase数据同步项目思想与实战
    • 0.导语
      • 1.Mysql binlog
        • 1.1 如何开启?
        • 1.2 查看是否启用
        • 1.3 binlog介绍
        • 1.4 binlog实战
      • 2.Kafka
        • 2.1 Zookeeper安装及配置
        • 2.2 启动Zookeeper
        • 2.3 Kafka安装及配置
        • 2.4 启动Kafka
        • 2.5 封装上述两个启动
        • 2.6 Topic创建
        • 2.7 发布与订阅
      • 3.binlog提取工具Maxwell
        • 3.1 Maxwell安装及配置
        • 3.2 mysql配置Maxwell
        • 3.3 配置Maxwell
        • 3.4 启动maxwell
      • 4.Hbase
        • 4.1 安装及配置
        • 4.2 基本使用
        • 4.3 Python操作Hbase
        • 4.4 Python代码实现
      • 5.实战
        • 5.1 Kafka消费
        • 5.2 Hbase消费
    相关产品与服务
    TDSQL MySQL 版
    TDSQL MySQL 版(TDSQL for MySQL)是腾讯打造的一款分布式数据库产品,具备强一致高可用、全球部署架构、分布式水平扩展、高性能、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供完整的分布式数据库解决方案。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档