跟着实例学习ZooKeeper的用法: 队列

使用Curator也可以简化Ephemeral Node (临时节点)的操作。Curator也提供ZK Recipe的分布式队列实现。 利用ZK的 PERSISTENTSEQUENTIAL节点, 可以保证放入到队列中的项目是按照顺序排队的。 如果单一的消费者从队列中取数据, 那么它是先入先出的,这也是队列的特点。 如果你严格要求顺序,你就的使用单一的消费者,可以使用leader选举只让leader作为唯一的消费者。

但是, 根据Netflix的Curator作者所说, ZooKeeper真心不适合做Queue,或者说ZK没有实现一个好的Queue,详细内容可以看 Tech Note 4, 原因有五:

  1. ZK有1MB 的传输限制。 实践中ZNode必须相对较小,而队列包含成千上万的消息,非常的大。
  2. 如果有很多节点,ZK启动时相当的慢。 而使用queue会导致好多ZNode. 你需要显著增大 initLimit 和 syncLimit.
  3. ZNode很大的时候很难清理。Netflix不得不创建了一个专门的程序做这事。
  4. 当很大量的包含成千上万的子节点的ZNode时, ZK的性能变得不好
  5. ZK的数据库完全放在内存中。 大量的Queue意味着会占用很多的内存空间。

尽管如此, Curator还是创建了各种Queue的实现。 如果Queue的数据量不太多,数据量不太大的情况下,酌情考虑,还是可以使用的。

DistributedQueue

DistributedQueue是最普通的一种队列。 它设计以下四个类:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

创建队列使用QueueBuilder,它也是其它队列的创建类。

public static <T> QueueBuilder<T> builder(CuratorFramework client,
                                          QueueConsumer<T> consumer,
                                          QueueSerializer<T> serializer,
                                          java.lang.String queuePath)
QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedQueue<MessageType queue = builder.build();    `

创建好queue就可以往里面放入数据了:

queue.put(aMessage);

QueueSerializer提供了对队列中的对象的序列化和反序列化。

QueueConsumer是消费者, 它可以接收队列的数据。 处理队列中的数据的代码逻辑可以放在QueueConsumer.consumeMessage()中。

正常情况下先将消息从队列中移除,再交给消费者消费。 但这是两个步骤,不是原子的。 可以调用Builder的lockPath()消费者加锁, 当消费者消费数据时持有锁,这样其它消费者不能消费此消息。 如果消费失败或者进程死掉,消息可以交给其它进程。这会带来一点性能的损失。 最好还是单消费者模式使用队列。

例子:

package com.colobu.zkrecipe.queue;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.api.CuratorEvent;import org.apache.curator.framework.api.CuratorListener;import org.apache.curator.framework.recipes.queue.DistributedQueue;import org.apache.curator.framework.recipes.queue.QueueBuilder;import org.apache.curator.framework.recipes.queue.QueueConsumer;import org.apache.curator.framework.recipes.queue.QueueSerializer;import org.apache.curator.framework.state.ConnectionState;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.test.TestingServer;import org.apache.curator.utils.CloseableUtils;public class DistributedQueueExample {    private static final String PATH = "/example/queue";    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedQueue<String> queue = null;        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildQueue();
            queue.start();            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i);
                Thread.sleep((long)(3 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }    private static QueueSerializer<String> createQueueSerializer() {        return new QueueSerializer<String>(){            @Override
            public byte[] serialize(String item) {                return item.getBytes();
            }            @Override
            public String deserialize(byte[] bytes) {                return new String(bytes);
            }

        };
    }    private static QueueConsumer<String> createQueueConsumer() {        return new QueueConsumer<String>(){            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);                
            }

        };
    }
}

DistributedIdQueue

DistributedIdQueue和上面的队列类似, 但是可以为队列中的每一个元素设置一个ID。 可以通过ID把队列中任意的元素移除。 它涉及几个类:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

通过下面方法创建:

builder.buildIdQueue()

放入元素时:

queue.put(aMessage, messageId);

移除元素时:

int numberRemoved = queue.remove(messageId);

在这个例子中, 有些元素还没有被消费者消费时就移除了,这样消费者不会收到删除的消息。

package com.colobu.zkrecipe.queue;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.api.CuratorEvent;import org.apache.curator.framework.api.CuratorListener;import org.apache.curator.framework.recipes.queue.DistributedIdQueue;import org.apache.curator.framework.recipes.queue.QueueBuilder;import org.apache.curator.framework.recipes.queue.QueueConsumer;import org.apache.curator.framework.recipes.queue.QueueSerializer;import org.apache.curator.framework.state.ConnectionState;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.test.TestingServer;import org.apache.curator.utils.CloseableUtils;public class DistributedIdQueueExample {    private static final String PATH = "/example/queue";    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long)(50 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }    private static QueueSerializer<String> createQueueSerializer() {        return new QueueSerializer<String>(){            @Override
            public byte[] serialize(String item) {                return item.getBytes();
            }            @Override
            public String deserialize(byte[] bytes) {                return new String(bytes);
            }

        };
    }    private static QueueConsumer<String> createQueueConsumer() {        return new QueueConsumer<String>(){            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);                
            }

        };
    }
}

DistributedPriorityQueue

优先级队列对队列中的元素按照优先级进行排序。 Priority越小, 元素月靠前, 越先被消费掉。 它涉及下面几个类:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

通过builder.buildPriorityQueue(minItemsBeforeRefresh)方法创建。 当优先级队列得到元素增删消息时,它会暂停处理当前的元素队列,然后刷新队列。minItemsBeforeRefresh指定刷新前当前活动的队列的最小数量。 主要设置你的程序可以容忍的不排序的最小值。

放入队列时需要指定优先级:

queue.put(aMessage, priority);

例子:

package com.colobu.zkrecipe.queue;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.api.CuratorEvent;import org.apache.curator.framework.api.CuratorListener;import org.apache.curator.framework.recipes.queue.DistributedPriorityQueue;import org.apache.curator.framework.recipes.queue.QueueBuilder;import org.apache.curator.framework.recipes.queue.QueueConsumer;import org.apache.curator.framework.recipes.queue.QueueSerializer;import org.apache.curator.framework.state.ConnectionState;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.test.TestingServer;import org.apache.curator.utils.CloseableUtils;public class DistributedPriorityQueueExample {    private static final String PATH = "/example/queue";    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();            for (int i = 0; i < 10; i++) {                int priority = (int)(Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long)(50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }    private static QueueSerializer<String> createQueueSerializer() {        return new QueueSerializer<String>(){            @Override
            public byte[] serialize(String item) {                return item.getBytes();
            }            @Override
            public String deserialize(byte[] bytes) {                return new String(bytes);
            }

        };
    }    private static QueueConsumer<String> createQueueConsumer() {        return new QueueConsumer<String>(){            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);                
            }

        };
    }
}

DistributedDelayQueue

JDK中也有DelayQueue,不知道你是否熟悉。 DistributedDelayQueue也提供了类似的功能, 元素有个delay值, 消费者隔一段时间才能收到元素。 涉及到下面四个类。

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

通过下面的语句创建:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

放入元素时可以指定delayUntilEpoch

queue.put(aMessage, delayUntilEpoch);

注意delayUntilEpoch不是离现在的一个时间间隔, 比如20毫秒,而是未来的一个时间戳,如 System.currentTimeMillis() + 10秒。 如果delayUntilEpoch的时间已经过去,消息会立刻被消费者接收。

例子:

package com.colobu.zkrecipe.queue;import java.util.Date;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.api.CuratorEvent;import org.apache.curator.framework.api.CuratorListener;import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;import org.apache.curator.framework.recipes.queue.QueueBuilder;import org.apache.curator.framework.recipes.queue.QueueConsumer;import org.apache.curator.framework.recipes.queue.QueueSerializer;import org.apache.curator.framework.state.ConnectionState;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.test.TestingServer;import org.apache.curator.utils.CloseableUtils;public class DistributedDelayQueueExample {    private static final String PATH = "/example/queue";    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener(new CuratorListener() {                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("CuratorEvent: " + event.getType().name());
                }
            });

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }    private static QueueSerializer<String> createQueueSerializer() {        return new QueueSerializer<String>() {            @Override
            public byte[] serialize(String item) {                return item.getBytes();
            }            @Override
            public String deserialize(byte[] bytes) {                return new String(bytes);
            }

        };
    }    private static QueueConsumer<String> createQueueConsumer() {        return new QueueConsumer<String>() {            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

SimpleDistributedQueue

前面虽然实现了各种队列,但是你注意到没有,这些队列并没有实现类似JDK一样的接口。 SimpleDistributedQueue提供了和JDK一致性的接口(但是没有实现Queue接口)。 创建很简单:

public SimpleDistributedQueue(CuratorFramework client,
                              String path)

增加元素:

public boolean offer(byte[] data) throws Exception

删除元素:

public byte[] take() throws Exception

另外还提供了其它方法:

public byte[] peek() throws Exceptionpublic byte[] poll(long timeout, TimeUnit unit) throws Exceptionpublic byte[] poll() throws Exceptionpublic byte[] remove() throws Exceptionpublic byte[] element() throws Exception

没有add方法, 多了take方法。

take方法在成功返回之前会被阻塞。 而poll在队列为空时直接返回null。w String(client.getData().forPath(PATH2)));

    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        CloseableUtils.closeQuietly(node);
        CloseableUtils.closeQuietly(client);
        CloseableUtils.closeQuietly(server);
    }

}

} “`

原文发布于微信公众号 - IT技术精选文摘(ITHK01)

原文发表时间:2017-11-20

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Fundebug

JavaScript异步编程史:回调函数到Promise到Async/Await

摘要: 异步编程时JavaScript以及Node.js的一大亮点,其中有什么心酸的黑历史呢?

1685
来自专栏Jerry的SAP技术分享

Java异常处理:如何写出“正确”但被编译器认为有语法错误的程序

文章的标题看似自相矛盾,然而我在“正确”二字上打了引号。我们来看一个例子,关于Java异常处理(Exception Handling)的一些知识点。

1273
来自专栏QQ会员技术团队的专栏

有没有人告诉你—写时拷贝的真相

作者简介:梁少华,QQ动漫后台开发,腾讯高级工程师。从事后台开发4年多,参与过QQ秀、手Q红点系统、手Q游戏公会、QQ动漫等项目,有丰富的后台架构经验,擅长海...

27610
来自专栏微信公众号:Java团长

Java基础知识详细总结

ClassLoader使用的是双亲委托模型来搜索类的,每个ClassLoader实例都有一个父类加载器的引用(不是继承的关系,是一个包含的关系),虚拟机内置的类...

1293
来自专栏Kirito的技术分享

警惕不规范的变量命名

就在最近,项目组开始强调开发规范了,今天分享一个变量名命名不规范的小案例,强调一下规范的重要性。 Boolean变量名命名规范 16年底,阿里公开了《Java...

3349
来自专栏程序员互动联盟

【编程基础】C语言常见宏定义

我们在使用C语言编写程序的时候,常常会使用到宏定义以及宏编译指令,有的可能比较常用,有的可能并不是很常用,是不是所有的C语言宏定义以及宏指令你都清楚呢? 指令 ...

3738
来自专栏小樱的经验随笔

C/C++中int128的那点事

最近群友对int128这个东西讨论的热火朝天的。讲道理的话,编译器的gcc是不支持__int128这种数据类型的,比如在codeblocks 16.01/Dev...

3001
来自专栏HappenLee的技术杂谈

编码与模式------《Designing Data-Intensive Applications》读书笔记5

1、在内存中,数据是保存在对象、结构、列表、数组、哈希表、树、等等。这些数据结构在内存之中被优化为CPU可以高效访问和操作的结构(通常这是操作系统的任务,并不需...

1024
来自专栏逍遥剑客的游戏开发

把C++/CLI委托传入本地代码

2026
来自专栏java达人

jsp中的JSTL与EL表达式用法及区别(一)

对于JSTL和EL之间的关系,这个问题对于初学JSP的朋友来说,估计是个问题,下面来详细介绍一下JSTL和EL表达式他们之间的关系,以及JSTL和EL一些相关概...

2295

扫码关注云+社区

领取腾讯云代金券