Syncing Oracle Data to Kafka

Requirement: replicate structured data from an Oracle database to Kafka and feed it into the big-data platform.

Goal: use Oracle GoldenGate to replicate data from the Oracle database to Kafka.

Steps:

Environment:

Kafka cluster address: kafka_host:9092

Kafka topic: goldengate

Oracle table to replicate: fund

I. Prepare the Oracle source database

II. Prepare GoldenGate on the source

III. Configure GoldenGate for Kafka

I. Prepare the Oracle source database:

export TMPDIR=/oracle/app/db/xttsdir

export OGG_HOME=/oradata/ogg

Create the GoldenGate user and grant privileges:

CREATE USER ogg IDENTIFIED BY ogg4test DEFAULT TABLESPACE USERS ;

GRANT CONNECT,RESOURCE,DBA TO ogg;

SQL> show parameter pfile

SQL> alter system set enable_goldengate_replication=TRUE scope=both;

Set the archive log destination to /oradata/archive:

SQL> alter system set log_archive_dest_1='location=/oradata/archive' scope=both;

SQL> shutdown immediate

SQL> startup mount

SQL> alter database archivelog;

SQL> archive log list;

SQL> alter database open;

SQL> select force_logging,supplemental_log_data_min from v$database;

SQL> alter database force logging;

SQL> alter database add supplemental log data;

SQL> alter system archive log current;

II. Prepare Oracle GoldenGate on the source:

1. Install the GoldenGate software (run ./runInstaller for the graphical installer), then start GGSCI:

./ggsci

GGSCI (test01) 1> create subdirs

Creating subdirectories under current directory /oradata/ogg

Parameter file /oradata/ogg/dirprm: already exists.

Report file /oradata/ogg/dirrpt: already exists.

Checkpoint file /oradata/ogg/dirchk: already exists.

Process status files /oradata/ogg/dirpcs: already exists.

SQL script files /oradata/ogg/dirsql: already exists.

Database definitions files /oradata/ogg/dirdef: already exists.

Extract data files /oradata/ogg/dirdat: already exists.

Temporary files /oradata/ogg/dirtmp: already exists.

Credential store files /oradata/ogg/dircrd: already exists.

Masterkey wallet files /oradata/ogg/dirwlt: already exists.

Dump files /oradata/ogg/dirdmp: already exists.

2. Configure the OGG Manager (mgr) process:

GGSCI> edit param mgr

PORT 7809

PURGEOLDEXTRACTS ./dirdat/et* , USECHECKPOINTS, MINKEEPHOURS 72

AUTORESTART ER *, RETRIES 3, WAITMINUTES 10, RESETMINUTES 60

3. Enable supplemental logging (TRANDATA) for the table funder.FUND to be replicated:

GGSCI (test01) 7> DBLOGIN USERID ogg, PASSWORD ogg4test

Successfully logged into database.

GGSCI (test01 as ogg@zjtest1) 8> ADD TRANDATA funder.FUND

2018-05-17 15:25:19 INFO OGG-15132 Logging of supplemental redo data enabled for table funder.FUND.

2018-05-17 15:25:19 INFO OGG-15133 TRANDATA for scheduling columns has been added on table funder.FUND.

2018-05-17 15:25:20 INFO OGG-15135 TRANDATA for instantiation CSN has been added on table funder.FUND.

GGSCI (test01) 9> add ext eapps, tranlog, begin now, threads 1

4. Configure the extract and data-pump processes:

Parameters for eapps:

GGSCI (test01 )> edit param eapps

extract eapps

USERID ogg, PASSWORD ogg4test

DISCARDFILE ./dirrpt/eapps.dsc, APPEND

DISCARDROLLOVER AT 01:00 ON SUNDAY

EXTTRAIL ./dirdat/et

TRANLOGOPTIONS DBLOGREADER

STATOPTIONS REPORTFETCH

REPORTCOUNT every 10 minutes, RATE

REPORTROLLOVER AT 01:00 ON SUNDAY

TABLE funder.FUND;

GGSCI (test01) > add ext papps, exttrailsource ./dirdat/et

EXTRACT added.

GGSCI (test01) > add rmttrail ./dirdat/rt, ext papps, megabytes 500

RMTTRAIL added.

Edit the papps (data pump) parameter file:

GGSCI (test01) >edit params papps

extract papps

RMTHOST 192.168.1.22, MGRPORT 7809

PASSTHRU

RMTTRAIL ./dirdat/rt

TABLE funder.FUND;

GGSCI (test01) > start papps

Sending START request to MANAGER ...

EXTRACT papps starting

GGSCI (test01) > info all

Program     Status      Group       Lag at Chkpt    Time Since Chkpt

MANAGER     RUNNING

EXTRACT     RUNNING     eapps       00:00:00        00:00:01

EXTRACT     RUNNING     papps       00:00:00        00:00:18

GGSCI (test01) >

GGSCI (test01) > start eapps

Sending START request to MANAGER ...

EXTRACT eapps starting

GGSCI (test01)> info all

Program     Status      Group       Lag at Chkpt    Time Since Chkpt

MANAGER     RUNNING

EXTRACT     RUNNING     eapps       00:00:00        00:02:53

III. Configure GoldenGate for Kafka

Install the OGG for Big Data Adapters:

Extract ggs_Adapters_Linux_x64 directly into the /oracle/app/ogg4bigdata directory (no installer is needed):

$ ls -lrt

drwxr-xr-x 10 oracle oinstall 4096 Aug 29 2017 ogg4bigdata

Configure the OGG processes:

GGSCI >create subdirs

Configure the Manager and start the mgr process:

GGSCI >edit params mgr

PORT 7809

PURGEOLDEXTRACTS ./dirdat/et* , USECHECKPOINTS, MINKEEPHOURS 72

AUTORESTART ER *, RETRIES 3, WAITMINUTES 10, RESETMINUTES 60

GGSCI > start mgr

3. Configure the Kafka replicat process:

GGSCI> add replicat rkafka, exttrail ./dirdat/rt, begin now

GGSCI> edit params rkafka

REPLICAT rkafka

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props

REPORTCOUNT EVERY 1 MINUTES, RATE

GROUPTRANSOPS 10000

MAP funder.*, TARGET funder.*;

4. Edit kafka.props (it must be placed in the dirprm directory, here /data/ogg/dirprm):

gg.handlerlist = kafkahandler

gg.handler.kafkahandler.KafkaProducerConfigFile=./dirprm/custom_kafka_producer.properties

#The topic name template; a fixed topic (goldengate) is used here

gg.handler.kafkahandler.topicMappingTemplate=goldengate

#The message key template; the literal value id is used here

gg.handler.kafkahandler.keyMappingTemplate=id

gg.log=log4j

#Sample gg.classpath for Apache Kafka

gg.classpath=dirprm/:/oracle/app/ogg4bigdata/ggjava/resources/lib/*:/oracle/app/ogg4bigdata/kafka/libs/*

#Sample gg.classpath for HDP

#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
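In this kafka.props, both mapping templates are fixed literals: every change record goes to the goldengate topic with the key id. The OGG Kafka handler also accepts keyword templates such as ${tableName} or ${fullyQualifiedTableName} that resolve per record; the sketch below illustrates how such resolution behaves (this is an illustration of the idea, not OGG code, and the record fields are made up for the example):

```python
# Illustrative sketch (not OGG source code): how a mapping template such as
# "${tableName}" or a fixed literal like "goldengate" resolves per record.
import re

def resolve_template(template: str, record: dict) -> str:
    """Replace ${keyword} placeholders with values from the record.
    A template with no placeholders (e.g. "goldengate") is returned as-is,
    so every record lands on the same topic."""
    def substitute(match):
        keyword = match.group(1)
        return str(record.get(keyword, ""))
    return re.sub(r"\$\{(\w+)\}", substitute, template)

# Hypothetical per-record metadata for the FUND table.
record = {"schemaName": "FUNDER", "tableName": "FUND", "primaryKeys": "ID"}

print(resolve_template("goldengate", record))                 # goldengate
print(resolve_template("${tableName}", record))               # FUND
print(resolve_template("${schemaName}.${tableName}", record)) # FUNDER.FUND
```

With the fixed literal, all tables share one topic, which matches the single goldengate topic used in this setup; a keyword template would instead fan records out to one topic per table.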

5. custom_kafka_producer.properties

bootstrap.servers=kafka_host:9092

acks=1
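custom_kafka_producer.properties is a plain Java-style key=value file that OGG hands straight to the Kafka producer client; bootstrap.servers points at the cluster and acks=1 means the leader alone acknowledges each write. A minimal sketch of how such a file parses (hypothetical helper, not part of OGG or Kafka):

```python
# Minimal sketch: parse a Java-style .properties file into a dict,
# skipping blank lines and # comments.
def parse_properties(text: str) -> dict:
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

sample = """\
bootstrap.servers=kafka_host:9092
acks=1
"""
props = parse_properties(sample)
print(props["bootstrap.servers"])  # kafka_host:9092
print(props["acks"])               # 1
```

Any additional Kafka producer settings (compression, batching, and so on) can be added to the same file as further key=value lines.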

Test procedure:

Insert rows into the Oracle table fund, and at the same time watch the Kafka topic to confirm that the JSON change records arrive.
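Each change reaches the goldengate topic as a JSON message that any consumer can decode with a standard JSON library. The record shape below follows what the default OGG for Big Data JSON formatter typically emits (table, op_type, op_ts, before/after); treat the exact field names and values as illustrative and verify against your OGG version:

```python
# Parse a representative OGG-style JSON change record as a Kafka
# consumer would after an INSERT into funder.FUND (illustrative shape).
import json

message = '''{
  "table": "FUNDER.FUND",
  "op_type": "I",
  "op_ts": "2018-05-17 15:30:00.000000",
  "after": {"ID": 1, "NAME": "test fund"}
}'''

record = json.loads(message)
# op_type distinguishes the operation: I = insert, U = update, D = delete.
print(record["table"])             # FUNDER.FUND
print(record["op_type"])           # I
print(record["after"]["NAME"])     # test fund
```

For updates, a before image may appear alongside after, which lets downstream consumers reconstruct the full change.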

Leave a comment if you have any questions.

  • Original link: https://kuaibao.qq.com/s/20180622G0PRXY00?refer=cp_1026
  • Reposted via Tencent Cloud+ Community, a Tencent open content platform channel, under the platform's service agreement.
