zookeeper学习系列:二、api实践

上一章我们知道zookeeper的简介,启动,设置节点以及结构性能。本小节我们来玩玩api,获取下数据。

php版本: http://anykoro.sinaapp.com/2013/04/05/%E4%BD%BF%E7%94%A8apache-zookeeper%E5%88%86%E5%B8%83%E5%BC%8F%E9%83%A8%E7%BD%B2php%E5%BA%94%E7%94%A8%E7%A8%8B%E5%BA%8F/

go版本:http://mmcgrana.github.io/2014/05/getting-started-with-zookeeper-and-go.html

读一下:http://zookeeper.apache.org/doc/trunk/javaExample.html  

然后我说 what the fuck it is?

我就想读个数据,需要这么复杂么。。。

动手改一下

版本1:  只获取数据,不管别的:

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class ZkReader {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        String hostPort = "192.168.1.2,192.168.1.3,192.168.1.4";
        String znode = "/test";
        ZooKeeper zk = new ZooKeeper(hostPort, 3000, null);
        System.out.println(new String(zk.getData(znode,false,null)));
    }
}

在zkcli上创建 /test 并改变它的值:123,运行,输出:

123

能得到结果,但是报错了:

14/10/17 11:51:58 ERROR zookeeper.ClientCnxn: Error while calling watcher 
java.lang.NullPointerException
    at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:521)
    at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:497)

看下源码,需要注册个watcher,意思是不这样zookeeper就只是个纯配置了?ok

版本2:zk get data+watcher

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;

public class ZkReader {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        String hostPort = "10.16.73.22,10.16.73.12,10.16.73.13";
        String znode = "/test";
        ZooKeeper zk = new ZooKeeper(hostPort, 3000, new MyWatcher());
        System.out.println(new String(zk.getData(znode,false,null)));
    }
}

class MyWatcher  implements Watcher {

    @Override
    public void process(WatchedEvent event) {
        System.out.println("hello zookeeper");
        System.out.println(String.format("hello event! type=%s, stat=%s, path=%s",event.getType(),event.getState(),event.getPath()));
    }
}

输出却是:

hello zookeeper 123 hello event! type=None, stat=SyncConnected, path=null

data总是在中间?百撕不得姐,在邮件组里咨询下,几天后有了回复(不够活跃的邮件组了):

Zookeeper works asynchronously in several threads. Therefore the sequence of execution in different threads is not generally predictable. It could therefore happen that when the connection status change is detected, the Watcher is executed, but only the first "hello zookeeper" gets echoed, then the main thread gets some cycles again and prints "123", after which the second print statement "hello event!..." is executed. If you don't want this to happen, use a CountDownLatch to make the main thread wait until the Zookeeper connection is established and propertly recognized in your program. The main thread creates the CountDownLatch(1), opens the Zk connection and waits latch.await(). The Watcher does its job and then counts the latch down by one, causing the main thread to leave the await and continue doing its job.

被认为是多线程问题,建立zk连接时会启动多个线程:sendThread  eventThread

eventThread执行到一半时,主线程获得了cpu,打印出结果,然后eventThread继续执行watcher.process。

这两个版本只是做到了获取数据,如果数据有变动,需要自动更新呢?ok,参照zk给的例子,简化出第三个版本:

 DataMonitor.java :

/**
 * A simple class that monitors the data and existence of a ZooKeeper
 * node. It uses asynchronous ZooKeeper APIs.
 */
import java.util.Arrays;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.data.Stat;

public class DataMonitor implements Watcher, StatCallback {

    ZooKeeper zk;

    String znode;

    boolean dead;

    DataMonitorListener listener;

    byte prevData[];

    public DataMonitor(ZooKeeper zk, String znode,  DataMonitorListener listener) {
        this.zk = zk;
        this.znode = znode;
        this.listener = listener;
        // Get things started by checking if the node exists. We are going
        // to be completely event driven
        zk.exists(znode, true, this, null);
    }

    /**
     * Other classes use the DataMonitor by implementing this method
     */
    public interface DataMonitorListener {
        /**
         * The existence status of the node has changed.
         */
        void showData(byte data[]);
    }

    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() != Event.EventType.None) {
            System.out.println("watch event type: "+event.getType());
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                zk.exists(znode, true, this, null);
            }
        }
    }

    public void processResult(int rc, String path, Object ctx, Stat stat) {
        System.out.println("rc : "+rc);

        byte b[] = null;
            try {
                b = zk.getData(znode, false, null);
            } catch (KeeperException e) {
                // We don't need to worry about recovering now. The watch
                // callbacks will kick off any exception handling
                e.printStackTrace();
            } catch (InterruptedException e) {
                return;
            }
        if ((b == null && b != prevData)
                || (b != null && !Arrays.equals(prevData, b))) {
            listener.showData(b);
            prevData = b;
        }
    }
}

Executor.java:

import java.io.IOException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class Executor
        implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
    DataMonitor dm;
    ZooKeeper zk;

    public Executor(String hostPort, String znode) throws KeeperException, IOException {
        zk = new ZooKeeper(hostPort, 3000, this);
        dm = new DataMonitor(zk, znode, this);
    }

    public static void main(String[] args) {
        String hostPort = "192.168.1.22,192.168.1.12,192.168.1.13";
        String znode = "/test";
        try {
            new Executor(hostPort, znode).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /***************************************************************************
     * We do process any events ourselves, we just need to forward them on.
     *
     * @see org.apache.zookeeper.Watcher#
     */
    public void process(WatchedEvent event) {
        System.out.println("Executor process event: "+event.getType());
        dm.process(event);

    }

    public void run() {
        try {
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }

    public void showData(byte[] data) {
            System.out.println("data changes: "+new String(data));
    }
}

一个执行者一个监控,注册watcher到zk,当有事件发生时,推送本身的StatCallback到Zookeeper,当节点有变动时调用processResult展示结果。

Executor process event: NodeDataChanged watch event type: NodeDataChanged rc : 0 data changes: abcd

还是有点复杂,仔细看下DataMonitor似乎没有存在的必要,我只需要一个类,启动zk client,并监听数据变化就好了,于是有了第四个单对象版本:

Executor.java
import java.io.IOException;
import java.util.Arrays;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

public class Executor
        implements Watcher, Runnable, AsyncCallback.StatCallback
{
    ZooKeeper zk;
    String znode;
    byte prevData[];

    public Executor(String hostPort, String znode) throws KeeperException, IOException {
        zk = new ZooKeeper(hostPort, 3000, this);
        this.znode = znode;
        zk.exists(znode, true, this, null);
    }

    public static void main(String[] args) {
        String hostPort = "10.16.73.22,10.16.73.12,10.16.73.13";
        String znode = "/test";
        try {
            new Executor(hostPort, znode).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /***************************************************************************
     * We do process any events ourselves, we just need to forward them on.
     *
     * @see org.apache.zookeeper.Watcher#
     */
    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() != Event.EventType.None) {
            System.out.println("watch event type: "+event.getType());
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                zk.exists(znode, true, this, null);
            }
        }

    }

    public void run() {
        try {
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }

    public void processResult(int rc, String path, Object ctx, Stat stat) {
        System.out.println("rc : "+rc);

        byte b[] = null;
        try {
            b = zk.getData(znode, false, null);
        } catch (KeeperException e) {
            // We don't need to worry about recovering now. The watch
            // callbacks will kick off any exception handling
            e.printStackTrace();
        } catch (InterruptedException e) {
            return;
        }
        if ((b == null && b != prevData)
                || (b != null && !Arrays.equals(prevData, b))) {
            System.out.println("data changes: "+new String(b));
            prevData = b;
        }
    }
}

自己做watcher,并注册回调函数给zk,更简洁。

经测试,zk三台停掉一台,剩一主一从,仍能正常服务,剩一台时则报错,无法连接,重启动zk变成两台,客户端也无法恢复,重启了才恢复。

看了php api,理解了一下zk.exists 做的操作,exists和get方法都会注册回调过去,一个是注册watcher,一个是注册StatCallback,当触发事件时,监视器会被消费掉,所以我们需要在回调函数中再次设置监视器。于是有了第五个版本

import java.io.IOException;

import org.apache.zookeeper.*;

public class Executor
        implements Watcher, Runnable
{
    ZooKeeper zk;
    String znode;

    public Executor(String hostPort, String znode) throws KeeperException, IOException, InterruptedException {
        zk = new ZooKeeper(hostPort, 30000, this);
        this.znode = znode;
        zk.getData(znode, this, null);
    }

    public static void main(String[] args) {
        String hostPort = "10.16.73.22,10.16.73.12,10.16.73.13";
        String znode = "/test";
        try {
            new Executor(hostPort, znode).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() != Event.EventType.None) {
            System.out.println("watch event type: "+event.getType());
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                try {
                    System.out.println(new String(zk.getData(znode, this, null)));
                } catch (KeeperException e) {
                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                } catch (InterruptedException e) {
                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                }
            }
        }

    }

    public void run() {
        try {
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }
}

上边这两个版本已经可以检测到zk的数据节点变动,但没有处理异常情况,没有处理close事件,大家可以自己动手改造下难懂的http://zookeeper.apache.org/doc/trunk/javaExample.html  例子。 

 更多java api操作(创建节点、删除修改等):http://www.cnblogs.com/haippy/archive/2012/07/19/2600032.html

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏c#开发者

LightSwitch 2011 数据字段唯一性验证方案

LightSwitch 2011 数据字段唯一性验证方案 ? 验证单表数据的某个字段不能输入重复值 设置实体字段唯一索引 ? 如果不写代码,那么验证只会在...

3485
来自专栏DOTNET

asp.net web api 下载之断点续传

一、基本思想 利用 HTTP 请求的Range标头值,来向服务端传递请求数据的开始位置和结束位置。服务端获得这两个参数后,将指定范围内的数据传递给客户端。当客户...

47812
来自专栏码农阿宇

JustMock .NET单元测试利器(三)用JustMock测试你的应用程序

用JustMock测试你的应用程序 本主题将指导您通过几个简单的步骤来使用Telerik®JustMock轻松测试您的应用程序。您将理解一个简单的原理,称为Ar...

3787
来自专栏大内老A

ASP.NET Core中的依赖注入(5):ServicePrvider实现揭秘【补充漏掉的细节】

到目前为止,我们定义的ServiceProvider已经实现了基本的服务提供和回收功能,但是依然漏掉了一些必需的细节特性。这些特性包括如何针对IServiceP...

2007
来自专栏开发 & 算法杂谈

Intel Pin-JIT模式和Probe模式下库函数的替换

这篇文章主要介绍一下Intel Pin在JIT模式和Probe模式下对库换数的替换,以及实现中有哪写需要注意的地方。

2686
来自专栏Flutter入门

Weex是如何在Android客户端上跑起来的

Weex可以通过自己设计的DSL,书写.we文件或者.vue文件来开发界面,整个页面书写分成了3段,template、style、script,借鉴了成熟的MV...

4595
来自专栏刘望舒

LeakCanary看这一篇文章就够了

LeakCanary是Square公司基于MAT开源的一个内存泄漏检测工具,在发生内存泄漏的时候LeakCanary会自动显示泄漏信息。

2.3K5
来自专栏ppjun专栏

android面试题

比如设置android:layout_gravity="right"的button会显示在父view的最右边。所以layout_gravity是设置当前view...

1712
来自专栏大内老A

通过一个模拟程序让你明白ASP.NET MVC是如何运行的

ASP.NET MVC的路由系统通过对HTTP请求的解析得到表示Controller、Action和其他相关的数据,并以此为依据激活Controller对象,调...

2926
来自专栏chenssy

【死磕Sharding-jdbc】---强制路由

位于 sharding-jdbc-core模块下的包 com.dangdang.ddframe.rdb.sharding.hint中,核心类HintManage...

2311

扫码关注云+社区

领取腾讯云代金券