目录
8、添加trail文件到replicate进程
服务器 | 主机名 | IP | OGG |
---|---|---|---|
物流运输业务服务器 | node1 | 192.168.88.10 | 源端 |
大数据服务器 | node2 | 192.168.88.20 | 目标端 |
需要切换到oracle用户操作: |
---|
su - oracle |
因为配置数据库需要在sqlplus中执行,所以使用sysdba用户登录: |
sqlplus / as sysdba |
执行归档查询命令: |
---|
archive log list |
Automatic archival是Disabled状态,因为Oracle默认是不开启自动归档的
以DBA的身份连接数据库,执行命令:
conn /as sysdba
关闭数据库,执行命令:
shutdown immediate
启动并装载数据库,但没有打开数据文件,该命令常用来修改数据库运行模式或恢复数据库。执行命令:
startup mount
执行开启归档命令:
alter database archivelog;
执行打开数据库命令:
alter database open;
执行自动归档命令:
alter system archive log start;
执行归档查询命令:archive log list
Automatic archival变成了Enabled状态,表示已经开启自动归档成功
执行SQL语句验证:select force_logging,supplemental_log_data_min from v$database;
当显示NO的时候表示没有开启,需要调整
开启强制日志后数据库会记录除临时表空间或临时回滚段外所有的操作,命令:
alter database force logging;
开启辅助日志命令:alter database add supplemental log data;
开启主键附加日志命令:alter database add supplemental log data (primary key) columns;
开启全列附加日志命令:alter database add supplemental log data (all) columns;
执行SQL语句验证:select force_logging,supplemental_log_data_min from v$database;
当显示为YES的时候表示开启成功。
操作步骤说明
1 创建OGG源端的目录
使用root用户创建:mkdir /u01/app/ogg/src
2 添加OGG源端的目录到oracle用户的环境变量中(需要切换到oracle用户操作)
su - oracle
vim ~/.bash_profile
export OGG_SRC_HOME=/u01/app/ogg/src
export LD_LIBRARY_PATH=$ORACLE_HOME/lib:/usr/lib
source ~/.bash_profile
退出oracle用户shell命令:exit
3 解压OGG源端软件
OGG源端的软件包是V34339-01.zip,存放在/export/softwares/oracle/ogg目录下。需要使用root用户解压
cd /export/softwares/oracle/ogg
创建src文件夹是用来存放解压后的OGG源端软件
mkdir /export/softwares/oracle/ogg/src/
解压OGG源端软件到src文件夹下
unzip /export/softwares/oracle/ogg/V34339-01.zip -d /export/softwares/oracle/ogg/src/
cd /export/softwares/oracle/ogg/src/
fbo_ggs_Linux_x64_ora11g_64bit.tar文件才是OGG源端的软件包,解压该文件到/u01/app/ogg/src目录下,执行命令:
tar -xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /u01/app/ogg/src
4 配置/u01/app/ogg/src目录及其所有文件的权限
使用root用户执行授权命令:
chown -R oracle:oinstall /u01/app/ogg/src
可以看到/u01/app/ogg/目录下的src属于oracle用户和oinstall组
可以看到/u01/app/ogg/src目录下的所有文件都属于oracle用户和oinstall组
操作步骤说明
1 创建表空间在磁盘中的物理路径(需要到root用户操作)
mkdir -p /u01/app/oracle/oggdata/orcl/
chown -R oracle:oinstall /u01/app/oracle/oggdata/orcl
2 进入sqlplus
切换到oracle用户:su - oracle
登录sqlplus:sqlplus "/as sysdba"
3 创建oggtbs表空间
create tablespace oggtbs datafile '/u01/app/oracle/oggdata/orcl/oggtbs.dbf' size 500M autoextend on;
4 创建ogg用户(用户名和密码都是ogg)
create user ogg identified by ogg default tablespace oggtbs;
5 赋予ogg用户dba权限
grant dba to ogg;
操作步骤说明
1 使用oracle用户登录源端OGG的命令行中
su – oracle
cd $OGG_SRC_HOME
./ggsci
2 初始化源端OGG目录
注意:如果不在OGG_SRC_HOME下,初始化OGG目录时会报错
create subdirs
退出OGG命令行客户端:exit
3 检查源端OGG初始化后的目录
初始化完成后,可以查询在$OGG_SRC_HOME下是否存在dirchk、dirdat、dirdef、dirjar、dirout、dirpcs、dirprm、dirrpt、dirsql、dirtmp共11个目录。
切换到oracle用户: |
---|
su – oracle |
登录sqlplus: |
sqlplus "/as sysdba" |
在oracle中创建test_ogg用户: |
create user test_ogg identified by test_ogg default tablespace users; |
为test_ogg用户授权: |
grant dba to test_ogg; |
使用test_ogg用户登录: |
conn test_ogg/test_ogg; |
创建test_ogg表: |
create table test_ogg(id int ,name varchar(20),primary key(id)); |
切换到oracle用户下:su – oracle |
---|
打印源端OGG_SRC_HOME:echo $OGG_SRC_HOME |
进入OGG_SRC_HOME:cd $OGG_SRC_HOME |
./ggsci |
---|
dblogin userid ogg password ogg |
---|
edit param ./globals
oggschema ogg |
---|
然后跟使用vi一样,在新窗口中添加oggschema ogg后保存退出编辑 |
./ggsci
创建mgr进程:edit param mg
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
./ggsci |
---|
add trandata test_ogg.test_ogg |
info trandata test_ogg.test_ogg |
配置Extract进程:edit param extkafka
新增内容:
extract extkafka
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ogg,password ogg
exttrail /u01/app/ogg/src/dirdat/to
table test_ogg.test_ogg;
配置Pump进程:edit param pukafka
新增内容:
extract pukafka
passthru
dynamicresolution
userid ogg,password ogg
rmthost localhost mgrport 7809
rmttrail /u01/app/ogg/tgr/dirdat/to
table test_ogg.test_ogg;
extract进程名称;passthru即禁止OGG与Oracle交互,我们这里使用pump逻辑传输,故禁止即可;dynamicresolution动态解析;userid ogg,password ogg即OGG连接Oracle数据库的帐号密码rmthost和mgrhost即目标端(kafka)OGG的mgr服务的地址以及监听端口;rmttrail即目标端trail文件存储位置以及名称。
注意:该文件用来在异构数据源之间传输时,需明确知道表之间的映射关系,比如:
Oracle与MySQL,Hadoop集群(HDFS,Hive,kafka等)等之间数据传输可以定义为异构数据类型的传输,故需要定义表之间的关系映射,在OGG命令行执行:
配置define文件:edit param test_ogg
defsfile /u01/app/ogg/src/dirdef/test_ogg.test_ogg
userid ogg,password ogg
table test_ogg.test_ogg;
生成表schema文件:(在OGG_SRC_HOME目录下执行(oracle用户))
./defgen paramfile dirprm/test_ogg.prm
将生成的/u01/app/ogg/src/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里:
scp -r /u01/app/ogg/src/dirdef/test_ogg.test_ogg /u01/app/ogg/tgr/dirdef/
因为目标端目录还没有创建,因此发送文件可能会失败,所以执行完目标端配置后发送即可
使用root用户创建:mkdir /u01/app/ogg/tgr |
---|
从root用户切换到oracle用户:su oracle
注意【非常重要】:在这里,需要调整oracle用户的.bash_profile,主要是更新LD_LIBRARY_PATH的值,来确保源端和目标端的ggsci命令都可以正常使用。如果不更新的话,目标端ggsci命令可以使用,但源端的ggsci命令无法使用,会报错./ggsci: error while loading shared libraries: libclntsh.so.11.1: cannot open shared object file: No such file or directory
vim ~/.bash_profile
export OGG_TGR_HOME=/u01/app/ogg/tg
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$ORACLE_HOME/lib:/usr/lib
source ~/.bash_profile
退出oracle用户shell命令:exit
OGG源端的软件包是V971332-01.zip,存放在/export/softwares/oracle/ogg目录下。需要使用root用户解压
cd /export/softwares/oracle/ogg
创建tgr文件夹是用来存放解压后的OGG目标端软件
mkdir -p /export/softwares/oracle/ogg/tgr/
解压OGG目标端软件到tgr文件夹下
unzip /export/softwares/oracle/ogg/V971332-01.zip -d /export/softwares/oracle/ogg/tgr/
cd /export/softwares/oracle/ogg/tgr/
ggs_Adapters_Linux_x64.tar文件是真正的OGG目标端软件包,解压该文件到/u01/app/ogg/tgr目录下,执行命令:
tar -xf ggs_Adapters_Linux_x64.tar -C /u01/app/ogg/tgr/
解压后的默认用户和组
使用root用户执行授权命令:
chown -R oracle:oinstall /u01/app/ogg/tg
可以看到/u01/app/ogg/目录下的tgr属于oracle用户和oinstall组。
ll /u01/app/ogg/tg
可以看到/u01/app/ogg/tgr目录下的所有文件都属于oracle用户和oinstall组。
可以看到/u01/app/ogg/目录下的tgr属于oracle用户和oinstall组。
su oracle
切换oracle用户时需要重新加载环境变量:source ~/.bash_profile
cd $OGG_TGR_HOME
./ggsci
注意:如果不在OGG_TGR_HOME下,初始化目标端OGG目录时会报错
create subdirs
退出OGG命令行客户端:exit
初始化完成后,可以查询在$OGG_TGR_HOME下是否存在dirchk、dircrd、dirdat、dirdef、dirdmp、diretc、dirout、dirpcs、dirprm、dirrpt、dirsql、dirtmp、dirwlt、dirwww共14个目录。
将生成的/u01/app/ogg/src/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里:
scp -r $OGG_SRC_HOME/dirdef/test_ogg.test_ogg $OGG_TGR_HOME/dirdef/
解压:tar -zxf /export/softwares/zookeeper-3.4.14.tar.gz -C /export/services/ |
---|
创建软连接:ln -s /export/services/zookeeper-3.4.14 /export/services/zookeeper |
创建zoo.cfg:cp /export/services/zookeeper/conf/zoo_sample.cfg /export/services/zookeeper/conf/zoo.cfg |
配置zoo.cfg:vim /export/services/zookeeper/conf/zoo.cfg |
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/export/datas/zookeeper/data dataLogDir=/export/datas/zookeeper/log clientPort=2181 |
创建ZooKeeper的数据路径: |
mkdir -p /export/datas/zookeeper/data mkdir -p /export/datas/zookeeper/log |
添加到环境变量: |
vim /etc/profile |
export ZOOKEEPER_HOME=/export/services/zookeeper export PATH=.:$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH |
source /etc/profile |
启动ZooKeeper: |
zkServer.sh start zkServer.sh status |
解压:tar -zxf /export/softwares/kafka_2.11-2.2.0.tgz -C /export/services/ |
---|
创建软连接:ln -s /export/services/kafka_2.11-2.2.0 /export/services/kafka |
配置server.prperties:vim /export/services/kafka/config/server.properties |
listeners=PLAINTEXT://server01:9092 broker.id=0 zookeeper.connect=server01:2181 |
添加环境变量:vim /etc/profile |
export KAFKA_HOME=/export/services/kafka export PATH=.:$KAFKA_HOME/bin:$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH |
source /etc/profile |
启动Kafka: |
kafka-server-start.sh -daemon /export/services/kafka/config/server.properties |
创建主题: |
kafka-topics.sh --create --zookeeper server01:2181 --replication-factor 1 --partitions 1 --topic test_ogg |
查看主题: |
kafka-topics.sh --list --zookeeper server01:2181 |
操作步骤 | 说明 |
---|---|
1 | 使用oracle用户进入OGG_SRC_HOME目录下 |
切换到oracle用户下:su – oracle | |
打印目标端OGG_TGR_HOME:echo $OGG_TGR_HOME | |
进入OGG_TGR_HOME:cd $OGG_TGR_HOME | |
启动ggsci:./ggsci | |
2 | 配置目标端MRG进程 |
配置MGR进程:edit param mgr | |
新增内容: | |
PORT 7810 DYNAMICPORTLIST 7810-7909 AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3 | |
edit param ./GLOBALS |
---|
新增内容: |
CHECKPOINTTABLE test_ogg.checkpoint |
操作步骤 | 说明 |
---|---|
1 | 配置目标端Replicate进程 |
配置replicate进程:edit param rekafka | |
REPLICAT rekafka sourcedefs /u01/app/ogg/tgr/dirdef/test_ogg.test_ogg TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg; | |
操作步骤 | 说明 |
---|---|
1 | 添加trail文件到Replicate进程 |
add replicat rekafka exttrail /u01/app/ogg/tgr/dirdat/to,checkpointtable test_ogg.checkpoint | |
cd $OGG_TGR_HOME |
---|
vim dirprm/kafka.props |
新增内容: |
gg.handlerlist=kafkahandler gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties gg.handler.kafkahandler.topicMappingTemplate=test_ogg gg.handler.kafkahandler.format=json gg.handler.kafkahandler.mode=op gg.classpath=dirprm/:/export/services/kafka/libs/*:/u01/app/ogg/tgr/:/u01/app/ogg/tgr/lib/* |
cd $OGG_TGR_HOME |
---|
vim dirprm/custom_kafka_producer.properties |
新增内容: |
bootstrap.servers=server01:9092 acks=1 compression.type=gzip reconnect.backoff.ms=1000 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=102400 linger.ms=10000 |
在目标端,主要做了4个操作,共包括2个进程,分别是MANAGER和REPLICAT。 |
---|
前提:切换到 oracle 账号且启动了 Oracle
注意:要严格按照启动顺序执行:
操作步骤说明
1 启动源端mgr进程
cd $OGG_SRC_HOME
./ggsci
查看所有进程状态:info all
启动MANAGER进程:start mg
检查所有进程状态:info all
2 启动目标端mgr进程
cd $OGG_TGR_HOME
./ggsci
启动MANAGER进程:start mg
查看所有进程状态:info all
3 启动源端extract进程
cd $OGG_SRC_HOME
./ggsci
启动EXTRACT进程:start extkafka
查看所有进程状态:info all
4 启动源端pump进程
启动pump进程:start pukafka
查看所有进程状态:info all
5 启动目标端replicat进程
cd $OGG_TGR_HOME
./ggsci
启动replicat进程:start rekafka
查看所有进程状态:info all
6 确认源端和目标端进程运行情况
源端:
目标端:
su – oracle |
---|
sqlplus "/as sysdba" |
conn test_ogg/test_ogg |
登录到test_ogg用户下:conn test_ogg/test_ogg
查看该用户拥有的表:select table_name from user_tables;
查看TEST_OGG表的字段信息:
select column_name,data_type from user_tab_columns where table_name = upper('TEST_OGG');
插入数据:insert into test_ogg values(1,'beijing');
执行Commit:commit;
查看kafka是否多出了一个叫做test_ogg的主题
kafka-topics.sh --list --zookeeper server01:2181
然后启动kafka的消费者来消费test_ogg主题的数据:
kafka-console-consumer.sh --bootstrap-server server01:9092 --topic test_ogg --from-beginning
再查看kafka的test_ogg主题下是否有了数据:
执行修改数据(修改id=2的name为china-shanghai):
update test_ogg set name='china-shanghai' where id=2;
查看kafka中是否有了id为2的china-shanghai的这条记录:
删除id为2的数据:delete test_ogg where id=2
检查kafka是否多个一条被标记为删除的数据:
:{"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2020-05-28 09:22:18.000129",
{"table":"TEST_OGG.TEST_OGG","op_type":"U","op_ts":"2020-05-28 09:25:17.000140","current_ts":"2020-05-28T09:25:22.085000","pos":"00000000000000001227","before":{},"after":{"ID":1,"NAME":"china-shanghai"}}
如果数据库中发生了事务(都被commit后)操作,会在源端和目标端的dirdat下生成trail数据文件。数据文件名称只能使用2个字母,多了会报错。
当源端发生事务后,检查源端的trail文件:
ll /u01/app/ogg/src/dirdat/
目标端会接收到源端pump进程传过来的数据文件:
ll /u01/app/ogg/tgr/dirdat/
源端错误日志路径 | /u01/app/ogg/src/ggserr.log |
---|---|
目标端错误日志路径 | /u01/app/ogg/tgr/ggserr.log |