数据订阅库表到Redis

最近更新时间:2018-07-30 15:57:44

本文将以一个简单案例来说明数据订阅中拉取对应表到 Redis 的功能,并且提供简易 RedisDemo下载 。以下操作将在 Centos 操作系统中完成。

配置环境

  1. Java环境配置

    yum install java-1.8.0-openjdk-devel
    
  2. 数据订阅 SDK 下载
    单击下载

  3. jedis-2.9.0.jar 下载
    单击下载

获取密钥

登录 腾讯云控制台,单击导航条中的【云产品】>【管理工具】>【云 API 密钥】,或直接单击进入 云数据库控制台

选择数据订阅

  1. 登录 数据传输DTS控制台,选择左侧的【数据订阅】,进入数据订阅页面。
  2. 选择需同步的 TencentDB 实例名,然后单击启动,再返回数据订阅,单击您所创建的数据订阅。 详细介绍请参考 如何获取数据订阅

  3. 查看对应的 DTS 通道、 IP 和 Port,然后结合之前的密钥填写到对应 RedisDemo.java 里面。

 context.setSecretId("AKIDfdsfdsfsdt1331431sdfds"); 请填写您从云API获取的secretID。
        context.setSecretKey("test111usdfsdfsddsfRkeT"); 请填写您从云API获取的secretKey.
    // 在数据迁移服务里面通过数据P阅获取到对应的ip,port,填写到此处
        context.setServiceIp("10.66.112.181"); 请填写您从数据订阅配置获取到的IP
        context.setServicePort(7507); 请填写您从数据订阅配置获取到的PORT

        // 创建消费者
        //SubscribeClient client=new DefaultSubscribeClient(context,true);
        final DefaultSubscribeClient client = new DefaultSubscribeClient(context);

        final Jedis jedis = new Jedis("127.0.0.1", 6379); 请填写您对应的redis主机和端口

        final String targetDatabase = "test"; 填写您所要订阅的库名
        final String targetTable = "alantest"; 填写您所要订阅的表名,表有2个字段分别是id,name。(id是做了主键)。

        // 创建订阅监听者listener
        ClusterListener listener = new ClusterListener() {
            @Override
            public void notify(List<ClusterMessage> messages) throws Exception {
        //                System.out.println("--------------------:" + messages.size());
                for(ClusterMessage m:messages){
                    DataMessage.Record record = m.getRecord();
                    //过滤不感兴趣的订阅信息
                if(!record.getDbName().equalsIgnoreCase(targetDatabase) || !record.getTablename().equalsIgnoreCase(targetTable)){
                        //注意:对于不感兴趣的信息也必须Ack
                        m.ackAsConsumed();
                        continue;
                    }

                    if(record.getOpt() != DataMessage.Record.Type.BEGIN && record.getOpt() != DataMessage.Record.Type.COMMIT){
                        List<DataMessage.Record.Field> fields = record.getFieldList();

                        //INSERT RECORD
                        //String pk = record.getPrimaryKeys();

                        if(record.getOpt() == DataMessage.Record.Type.INSERT){

                String keyid="";
                String value="";
                            for (DataMessage.Record.Field field : fields) {

                                //先获取id值,需要有primary key,然后找到名为name的列,赋值给redis 插入key和name对应的value.
                if(field.getFieldname().equalsIgnoreCase("id")){
                                    keyid=field.getValue();
               continue;
                                }
                if(field.getFieldname().equalsIgnoreCase("name")){
                    value=field.getValue();

                                }
                jedis.set(keyid, value);
                            }

                        }

编译操作与检验

  1. [root@VM_71_10_centos ~]# javac -classpath binlogsdk-2.6.0-release.jar:jedis-2.9.0.jar -encoding UTF-8 RedisDemo.java
    
  2. 执行启动,如果没有异常报错就是正常在服务了,然后查看对应之前设置的落地文件。

    java -XX:-UseGCOverheadLimit -Xms2g -Xmx2g -classpath .:binlogsdk-2.6.0-release.jar:jedis-2.9.0.jar RedisDemo
    
  3. 查看进行数据库插入和update操作,并从redis观察发现确实插入并修改成功了,最后进行delete操作,redis对应的数据也被删除掉了。

MySQL [test]> insert into alantest values(1001,'alan1');
Query OK, 1 row affected (0.00 sec)

MySQL [test]> update alantest set name='alan2' where id=1001;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

------------------------
127.0.0.1:6379> get 1001
"alan2"


MySQL [test]> update alantest set name='alan3' where id=1001;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

------------------------
127.0.0.1:6379> get 1001
"alan3"

MySQL [test]> delete from alantest where id=1001;
Query OK, 1 row affected (0.00 sec)

-----------------------

127.0.0.1:6379> get 1001
(nil)