前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >用Java获取PostgreSQL变更数据

用Java获取PostgreSQL变更数据

作者头像
Grainger
发布2022-04-24 10:03:46
1.4K0
发布2022-04-24 10:03:46
举报
文章被收录于专栏:数据与未来数据与未来

上期示例了一下 Oracle CDC的配置 过程,本期我们再来看一下 用 Java 程序实现 PostgreSQL 如何实现变更数据的获取。

数据同步方式

PostgreSQL数据库提供了两种复制方式:物理复制和逻辑复制。

  • 物理复制

物理复制是指将主库 WAL 日志的日志页直接发到备机,备机完全应用的一种复制方式。

  • 逻辑复制

PostgreSQL 逻辑复制是事务级别的复制,使用订阅复制槽技术,通过在订阅端回放 WAL 日志中的逻辑条目。

物理复制和逻辑复制有各自的适用场景以及优缺点,这部分不是本篇讨论范围。

本篇我们主要介绍如何通过 Java 程序,实现 PostgreSQL 的逻辑复制。

PostgreSQL 配置

要使用 PostgreSQL 的逻辑复制功能,首先需要对数据库进行相应的配置以支持逻辑复制功能。

  • 在 postgres.conf 中加入以下配置项
代码语言:javascript
复制
wal_level = logical
max_wal_senders = 10
max_worker_processes = 10
max_replication_slots = 10
  • 创建复制账号
代码语言:javascript
复制
CREATE USER repuser REPLICATION LOGIN
CONNECTION LIMIT 8 ENCRYPTED PASSWORD 'repuser';
  • 创建逻辑复制槽
代码语言:javascript
复制
SELECT * FROM pg_create_logical_replication_slot(
'regression_slot', 'test_decoding');

Java 代码实现

在进行逻辑复制时,我们需要使用到 LSN 号进行复制。具体步骤如下:

1、获取 LSN

在 PostgreSQL 9.x 版本中,执行以下查询即可:

代码语言:javascript
复制
SELECT pg_current_xlog_location();

在 PostgreSQL 10.x 及以上版本中,执行以下查询即可:

代码语言:javascript
复制
SELECT pg_current_wal_lsn();

具体 Java 实现如下:

代码中的 queryLSN(String sql,String column) 方法就是简单的数据库查询,在这里不再列出具体代码。

下面的代码中用了个偷懒的办法,没有先判断数据库版本,而是先执行 pg_current_xlog_location() 如果报错代码为 42883 ,也就是未定义的函数,说明数据库版本不是9.x,则再执行 pg_current_wal_lsn() 。

代码语言:javascript
复制
/**
 * 得到开始的LSN
 * @return
 * @throws SQLException
 */
public LogSequenceNumber getStartLSN() throws SQLException {
    LogSequenceNumber currentLSN = null;
    try {
        final String lsn = this.queryLSN(
        "SELECT pg_current_xlog_location();",
         "pg_current_xlog_location");
        currentLSN = LogSequenceNumber.valueOf(lsn);
    }catch (SQLException exFunction) {
        if (!exFunction.getSQLState().equals("42883")) {
            throw exFunction;
        }
        final String lsn2 = this.queryforLSN(
        "SELECT pg_current_wal_lsn();",
        "pg_current_wal_lsn");
        currentLSN = LogSequenceNumber.valueOf(lsn2);
    }
    return currentLSN;
}
2、开启复制槽

以下代码中的 dbConnection 就是一个 PostgreSQL 的数据库连接。具体数据库连接代码不再列出。 下面代码中的 regression_slot 即最开始配置数据库时创建的逻辑复制槽名称。

代码语言:javascript
复制
/**
 * 开启复制槽
 * @throws Exception
 */
@Override
void startSlot() throws Exception {
    final LogSequenceNumber startLSN = this.getStartLSN();
    this.replConnection = this.dbConnection.unwrap(PGConnection.class);
    this.stream = ((this.replConnection.getReplicationAPI()
    .replicationStream()
    .logical()
    .withSlotName('regression_slot'))
    .withSlotOption("include-xids", true)
    .withSlotOption("include-timestamp",true)
    .withStartPosition(startLSN)).start();

}
3、获取数据库变更数据

开启复制槽之后,我们就可以获取实时变更数据。具体获取变更数据代码如下:

代码语言:javascript
复制
/**
 * 读取变更数据
 * @return
 * @throws Exception
 */
@Override
public String processRecords() throws Exception {
    String event = null;
    final ByteBuffer record = this.stream.read();
    if (record != null) {
        final int offset = record.arrayOffset();
        final byte[] source = record.array();
        final int length = source.length - offset;
        event = new String(source, offset, length);
    }
    return event;
}

我们可以使用一个线程,循环获取需要数据库的实时变更,并将得到的数据加入到一个队列中。

到这里,我们就完成了用 Java 程序获取 PostgreSQL 数据变更的关键代码。

现在就可以到数据库里插入数据。

读取到的数据格式如下:

代码语言:javascript
复制
BEGIN 36652
table test.source: INSERT: id[integer]:1 name[character varying]:'grainger' address[character varying]:'China'
COMMIT 36652 (at 2020-09-19 12:00:41.005607+08)

是不是很简单呢?

关键代码都给你了,剩下的就看你的了。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-09-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 山东Oracle用户组 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 数据同步方式
  • PostgreSQL 配置
  • Java 代码实现
    • 1、获取 LSN
      • 2、开启复制槽
        • 3、获取数据库变更数据
        相关产品与服务
        数据库
        云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档