数据订阅库表到 Redis

最近更新时间:2021-03-01 15:03:59

本文将以一个简单案例来说明数据订阅中拉取对应表到 Redis 的功能,并且提供简易 RedisDemo 下载 。以下操作将在 CentOS 操作系统中完成。

配置环境

获取密钥

登录 访问管理控制台 获取密钥。

选择数据订阅

  1. 登录 DTS 控制台,选择左侧的【数据订阅】,进入数据订阅页面。
  2. 选择需同步的 TencentDB 实例名,然后单击启动,再返回数据订阅,单击您所创建的数据订阅。 详细介绍请参考 如何获取数据订阅
  3. 查看对应的 DTS 通道、IP 和 Port,然后结合之前的密钥填写到对应 RedisDemo.java 里面。
context.setSecretId("AKIDfdsfdsfsdt1331431sdfds"); 请填写您从云API获取的secretID。
      context.setSecretKey("test111usdfsdfsddsfRkeT"); 请填写您从云API获取的secretKey.
  // 在数据迁移服务里面通过数据P阅获取到对应的ip,port,填写到此处
      context.setServiceIp("10.66.112.181"); 请填写您从数据订阅配置获取到的IP
      context.setServicePort(7507); 请填写您从数据订阅配置获取到的PORT
       // 创建消费者
      //SubscribeClient client=new DefaultSubscribeClient(context,true);
      final DefaultSubscribeClient client = new DefaultSubscribeClient(context);
       final Jedis jedis = new Jedis("127.0.0.1", 6379); 请填写您对应的redis主机和端口
       final String targetDatabase = "test"; 填写您所要订阅的库名
      final String targetTable = "alantest"; 填写您所要订阅的表名,表有2个字段分别是id,name。(id是做了主键)。
       // 创建订阅监听者listener
      ClusterListener listener = new ClusterListener() {
          @Override
          public void notify(List<ClusterMessage> messages) throws Exception {
//                System.out.println("--------------------:" + messages.size());
              for(ClusterMessage m:messages){
                  DataMessage.Record record = m.getRecord();
                  //过滤不感兴趣的订阅信息
           if(!record.getDbName().equalsIgnoreCase(targetDatabase) || !record.getTablename().equalsIgnoreCase(targetTable)){
                      //注意:对于不感兴趣的信息也必须Ack
                      m.ackAsConsumed();
                      continue;
                  }
                   if(record.getOpt() != DataMessage.Record.Type.BEGIN && record.getOpt() != DataMessage.Record.Type.COMMIT){
                      List<DataMessage.Record.Field> fields = record.getFieldList();
                       //INSERT RECORD
                      //String pk = record.getPrimaryKeys();

                      if(record.getOpt() == DataMessage.Record.Type.INSERT){

        String keyid="";
        String value="";
                          for (DataMessage.Record.Field field : fields) {
                               //先获取id值,需要有primary key,然后找到名为name的列,赋值给redis 插入key和name对应的value.
        if(field.getFieldname().equalsIgnoreCase("id")){
                                  keyid=field.getValue();
             continue;
                              }
        if(field.getFieldname().equalsIgnoreCase("name")){
            value=field.getValue();

                              }
        jedis.set(keyid, value);
                          }
                       }

编译操作与检验

  1. [root@VM_71_10_centos ~]# javac -classpath binlogsdk-2.6.0-release.jar:jedis-2.9.0.jar -encoding UTF-8 RedisDemo.java 
  2. 执行启动,如果没有异常报错就是正常在服务了,然后查看对应之前设置的落地文件。
    java -XX:-UseGCOverheadLimit -Xms2g -Xmx2g -classpath .:binlogsdk-2.6.0-release.jar:jedis-2.9.0.jar RedisDemo
  3. 查看进行数据库插入和 update 操作,并从 redis 观察发现确实插入并修改成功了,最后进行 delete 操作,redis 对应的数据也被删除掉了。
MySQL [test]> insert into alantest values(1001,'alan1');
Query OK, 1 row affected (0.00 sec)
MySQL [test]> update alantest set name='alan2' where id=1001;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0
------------------------
127.0.0.1:6379> get 1001
"alan2"
MySQL [test]> update alantest set name='alan3' where id=1001;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0
------------------------
127.0.0.1:6379> get 1001
"alan3"
MySQL [test]> delete from alantest where id=1001;
Query OK, 1 row affected (0.00 sec)
-----------------------
127.0.0.1:6379> get 1001
(nil)
目录