前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >OGG实现Oracle数据同步到Kafka

OGG实现Oracle数据同步到Kafka

作者头像
星哥玩云
发布2022-08-16 14:33:40
2K0
发布2022-08-16 14:33:40
举报
文章被收录于专栏:开源部署

环境: 源端:Oracle12.2 ogg for Oracle 12.3 目标端:Kafka ogg for bigdata 12.3 将Oracle中的数据通过OGG同步到Kafka

源端配置: 1、为要同步的表添加附加日志 dblogin USERID ogg@orclpdb, PASSWORD ogg add trandata scott.tab1 add trandata scott.tab2

2、 添加抽取进程 GGSCI>add extract EXT_KAF1,integrated tranlog, begin now GGSCI>add EXTTRAIL ./dirdat/k1, extract EXT_KAF1,MEGABYTES 200

编辑抽取进程参数: GGSCI> edit params EXT_KAF1

extract EXT_KAF1 userid c##ggadmin,PASSWORD ggadmin LOGALLSUPCOLS UPDATERECORDFORMAT COMPACT exttrail ./dirdat/k1,FORMAT RELEASE 12.3 SOURCECATALOG orclpdb --(指定pdb) table scott.tab1; table scott.tab2;

注册进程 GGSCI> DBLOGIN USERID c##ggadmin,PASSWORD ggadmin GGSCI> register extract EXT_KAF1 database container (orclpdb)

3、添加投递进程: GGSCI>add extract PMP_KAF1, exttrailsource ./dirdat/k1 GGSCI>add rmttrail ./dirdat/f1,EXTRACT PMP_KAF1,MEGABYTES 200

编辑投递进程参数: GGSCI>edit param PMP_KAF1

EXTRACT PMP_KAF1 USERID c##ggadmin,PASSWORD ggadmin PASSTHRU RMTHOST 10.1.1.247, MGRPORT 9178 RMTTRAIL ./dirdat/f1,format release 12.3 SOURCECATALOG orclpdb TABLE scott.tab1; table scott.tab2;

4、添加数据初始化进程(Oracle initial load) 可以多个表分开初始化也可以一起初始化,此处选择分开初始化 GGSCI> add extract ek_01, sourceistable

编辑参数: GGSCI> EDIT PARAMS ek_01

EXTRACT ek_01 USERID c##ggadmin,PASSWORD ggadmin RMTHOST 10.1.1.247, MGRPORT 9178 RMTFILE ./dirdat/ka,maxfiles 999, megabytes 500,format release 12.3 SOURCECATALOG orclpdb table scott.tab1;

GGSCI> add extract ek_02, sourceistable

EDIT PARAMS ek_02 EXTRACT ek_02 USERID c##ggadmin,PASSWORD ggadmin RMTHOST 10.1.1.247, MGRPORT 9178 RMTFILE ./dirdat/kb,maxfiles 999, megabytes 500,format release 12.3 SOURCECATALOG orclpdb table scott.tab2;

5、生成def文件: GGSCI> edit param defgen1

USERID c##ggadmin,PASSWORD ggadmin defsfile /home/oracle/ogg/ggs12/dirdef/defgen1.def,format release 12.3 SOURCECATALOG orclpdb table scott.tab1; table scott.tab2;

OGG_HOME下执行如下命令生成def文件 defgen paramfile dirprm/defgen1.prm

将生成的def文件传到目标端$OGG_HOME/dirdef下

目标端配置: 1、将$OGG_HOME/AdapterExamples/big-data/kafka下的所有文件copy到$OGG_HOME/dirprm下 cd $OGG_HOME/AdapterExamples/big-data/kafka cp * $OGG_HOME/dirprm

2、将$ORACLE_HOME/AdapterExamples/trail下的文件tr000000000 copy到$OGG_HOME/dirdat下 cd $ORACLE_HOME/AdapterExamples/trail cp tr000000000 $OGG_HOME/dirdat

3、添加初始化进程:(可以多表一起初始化也可以分开初始化,此处选择单独初始化) GGSCI> ADD replicat rp_01, specialrun

GGSCI> EDIT PARAMS rp_01 SPECIALRUN end runtime setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK") targetdb libfile libggjava.so set property=./dirprm/kafka1.props SOURCEDEFS ./dirdef/defgen1.def EXTFILE ./dirdat/ka reportcount every 1 minutes, rate grouptransops 10000 MAP orclpdb.scott.tab1, TARGET scott.tab1;

GGSCI> ADD replicat rp_02, specialrun GGSCI> EDIT PARAMS rp_02

SPECIALRUN end runtime setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK") targetdb libfile libggjava.so set property=./dirprm/kafka2.props SOURCEDEFS ./dirdef/defgen1.def EXTFILE ./dirdat/kb reportcount every 1 minutes, rate grouptransops 10000 MAP orclpdb.scott.tab2, TARGET scott.tab2;

4、添加恢复进程: GGSCI>add replicat r_kaf1,exttrail ./dirdat/f1 GGSCI>edit params r_kaf1

REPLICAT r_kaf1 setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK") HANDLECOLLISIONS targetdb libfile libggjava.so set property=./dirprm/kafka1.props SOURCEDEFS ./dirdef/defgen1.def reportcount every 1 minutes, rate grouptransops 10000 MAP orclpdb.scott.tab1, TARGET scott.tab1;

GGSCI> add replicat r_kaf2,exttrail ./dirdat/f2 GGSCI> edit params r_kaf2

REPLICAT r_kaf2 setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK") HANDLECOLLISIONS targetdb libfile libggjava.so set property=./dirprm/kafka2.props SOURCEDEFS ./dirdef/defgen1.def reportcount every 1 minutes, rate grouptransops 10000 MAP orclpdb.scott.tab2, TARGET scott.tab2;

5、参数配置: custom_kafka_producer.properties文件内容如下:

bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200 --只需要改动这一行就行,指定kafka的地址和端口号 acks=1 reconnect.backoff.ms=1000 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=16384 linger.ms=10000

kafka1.props文件内容如下: gg.handlerlist = kafkahandler gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties #The following resolves the topic name using the short table name gg.handler.kafkahandler.topicMappingTemplate= topic1 #gg.handler.kafkahandler.format=avro_op gg.handler.kafkahandler.format =json --这里做了改动,指定格式为json格式 gg.handler.kafkahandler.format.insertOpKey=I gg.handler.kafkahandler.format.updateOpKey=U gg.handler.kafkahandler.format.deleteOpKey=D gg.handler.kafkahandler.format.truncateOpKey=T gg.handler.kafkahandler.format.prettyPrint=false gg.handler.kafkahandler.format.jsonDelimiter=CDATA[] gg.handler.kafkahandler.format.includePrimaryKeys=true --包含主键 gg.handler.kafkahandler.SchemaTopicName= topic1 --此处指定为要同步到的目标topic名字 gg.handler.kafkahandler.BlockingSend =false gg.handler.kafkahandler.includeTokens=false gg.handler.kafkahandler.mode=op goldengate.userexit.timestamp=utc goldengate.userexit.writers=javawriter javawriter.stats.display=TRUE javawriter.stats.full=TRUE gg.log=log4j gg.log.level=INFO gg.report.time=30sec #Sample gg.classpath for Apache Kafka gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/ --指定classpath,这里很重要,必须有kafka安装文件的类库。 #Sample gg.classpath for HDP #gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/ javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

启动进程进程恢复: 1、启动源端抓取进程 GGSCI> start EXT_KAF1 2、启动源端投递进程 GGSCI> start PMP_KAF1 3、启动源端初始化进程 GGSCI> start ek_01 4、启动目标端初始化进程 在$OGG_HOME下执行如下命令: ./replicat paramfile ./dirprm/rp_01.prm reportfile ./dirrpt/rp_01.rpt -p INITIALDATALOAD 5、启动目标端恢复进程 GGSCI> start R_KAF1

遇到的错误: 1、ERROR OGG-15050 Error loading Java VM runtime library(2 no such file or directory)

原因:找不到类库(配置好环境变量之后,OGG的mgr进程没有重启,导致的) 解决:重启MGR进程

2、ERROR OG-15051 Java or JNI exception

原因:没有使用ogg12.3.1.1.1自带的kafka.props,而是copy了ogg12.2的kafka.props,导致出现异常。 解决:使用ogg12.3.1.1.1自带的kafka.props,并指定相关的属性,解决。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档