open-messaging使用实例

本文主要展示一下open-messaging使用实例

consumer

PullConsumer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java

public class PullConsumerApp {
    public static void main(String[] args) throws OMSResourceNotExistException {
        //Load and start the vendor implementation from a specific OMS driver URL.
        final MessagingAccessPoint messagingAccessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
        messagingAccessPoint.startup();

        //Fetch a ResourceManager to create Queue resource.
        ResourceManager resourceManager = messagingAccessPoint.resourceManager();
        resourceManager.createQueue( "NS://HELLO_QUEUE", OMS.newKeyValue());

        //Start a PullConsumer to receive messages from the specific queue.
        final PullConsumer pullConsumer = messagingAccessPoint.createPullConsumer();
        pullConsumer.attachQueue("NS://HELLO_QUEUE");
        pullConsumer.startup();

        //Register a shutdown hook to close the opened endpoints.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                pullConsumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        //Receive one message from queue.
        Message message = pullConsumer.receive();

        //Acknowledge the consumed message
        pullConsumer.ack(message.sysHeaders().getString(Message.BuiltinKeys.RECEIPT_HANDLE));
    }
}
  • 首先创建messagingAccessPoint,然后启动是调用start,在shutdownHook里头调用shutdown
  • 然后通过resourceManager创建queue,和pullConsumer,并将其绑定
  • 之后调用pullConsumer的startup方法启动,然后关闭时shutdown方法
  • pullConsumer调用receive方法来拉取消息,这里改名为pull方法可能更合适些
  • pullConsumer可以对消息进行ack

PushConsumer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java

public class PushConsumerApp {
    public static void main(String[] args) throws OMSResourceNotExistException {
        //Load and start the vendor implementation from a specific OMS driver URL.
        final MessagingAccessPoint messagingAccessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://localhost:10911/us-east");
        messagingAccessPoint.startup();

        //Fetch a ResourceManager to create Queue resource.
        ResourceManager resourceManager = messagingAccessPoint.resourceManager();
        final PushConsumer consumer = messagingAccessPoint.createPushConsumer();
        consumer.startup();

        //Register a shutdown hook to close the opened endpoints.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        //Consume messages from a simple queue.
        String simpleQueue = "NS://HELLO_QUEUE";
        resourceManager.createQueue( simpleQueue, OMS.newKeyValue());

        //This queue doesn't has a source queue, so only the message delivered to the queue directly can
        //be consumed by this consumer.
        consumer.attachQueue(simpleQueue, new MessageListener() {
            @Override
            public void onReceived(Message message, Context context) {
                System.out.println("Received one message: " + message);
                context.ack();
            }

        });
    }
}
  • 也是先创建messagingAccessPoint,然后创建PushConsumer
  • 也是通过resourceManager创建queue,然后跟PushConsumer绑定
  • PushConsumer通过注册MessageListener来处理回调逻辑

StreamingConsumer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/StreamingConsumerApp.java

public class StreamingConsumerApp {
    public static void main(String[] args) throws OMSResourceNotExistException {
        //Load and start the vendor implementation from a specific OMS driver URL.
        final MessagingAccessPoint messagingAccessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
        messagingAccessPoint.startup();

        //Fetch a ResourceManager to create Queue resource.
        String targetQueue = "NS://HELLO_QUEUE";
        ResourceManager resourceManager = messagingAccessPoint.resourceManager();
        resourceManager.createQueue(targetQueue, OMS.newKeyValue());

        //Fetch the streams of the target queue.
        List<String> streams = resourceManager.listStreams(targetQueue);

        //Start a StreamingConsumer to iterate messages from the specific stream.
        final StreamingConsumer streamingConsumer = messagingAccessPoint.createStreamingConsumer();
        streamingConsumer.startup();

        //Register a shutdown hook to close the opened endpoints.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                streamingConsumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        assert streams.size() != 0;
        StreamingIterator streamingIterator = streamingConsumer.seekToBeginning(streams.get(0));

        while (streamingIterator.hasNext()) {
            Message message = streamingIterator.next();
            System.out.println("Received one message: " + message);
        }

        //All the messages in the stream has been consumed.
        //Now consume the messages in reverse order
        while (streamingIterator.hasPrevious()) {
            Message message = streamingIterator.previous();
            System.out.println("Received one message again: " + message);
        }
    }
}
  • stream的这种方式跟kafka的使用方式有点类似
  • 通过StreamingConsumer获取StreamingIterator,然后遍历获取消息

producer

Producer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java

public class ProducerApp {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");

        final Producer producer = messagingAccessPoint.createProducer();
        messagingAccessPoint.startup();
        producer.startup();

        //Register a shutdown hook to close the opened endpoints.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                producer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        //Sends a message to the specified destination synchronously.
        {
            SendResult sendResult = producer.send(producer.createBytesMessage(
                "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));

            System.out.println("Send sync message OK, message id is: " + sendResult.messageId());
        }

        //Sends a message to the specified destination asynchronously.
        //And get the result through Future
        {
            final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage(
                "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));

            final SendResult sendResult = result.get(3000L);
            System.out.println("Send async message OK, message id is: " + sendResult.messageId());
        }

        //Sends a message to the specified destination asynchronously.
        //And retrieve the result through FutureListener
        {
            final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage(
                "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));

            result.addListener(new FutureListener<SendResult>() {

                @Override
                public void operationComplete(Future<SendResult> future) {
                    if (future.isDone() && null == future.getThrowable()) {
                        System.out.println("Send async message OK, message id is: " + future.get().messageId());
                    } else {
                        System.out.println("Send async message Failed, cause is: " + future.getThrowable().getMessage());
                    }
                }
            });
        }

        //Sends a message to the specific queue in OneWay manner.
        {
            //There is no {@code Future} related or {@code RuntimeException} thrown. The calling thread doesn't
            //care about the send result and also have no context to get the result.
            producer.sendOneway(producer.createBytesMessage(
                "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
        }
    }
}
  • 通过messagingAccessPoint创建producer
  • producer可以send、sendAsync以及sendOneway
  • send是同步,sendAsync是异步,可以通过listener回调处理,sendOneway就是不关系发送结果

TransactionProducer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java

public class TransactionProducerApp {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");

        final Producer producer = messagingAccessPoint.createProducer();
        messagingAccessPoint.startup();
        producer.startup();

        //Register a shutdown hook to close the opened endpoints.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                producer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        Message message = producer.createBytesMessage(
            "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));

        //Sends a transaction message to the specified destination synchronously.
        SendResult sendResult = producer.send(message, new LocalTransactionExecutor() {
            @Override
            public void execute(final Message message, final ExecutionContext context) {
                //Do some local transaction
                //Then commit this transaction and the message will be delivered.
                context.commit();
            }

            @Override
            public void check(final Message message, final CheckContext context) {
                //The server may lookup the transaction status forwardly associated the specified message
                context.commit();
            }
        }, OMS.newKeyValue());

        System.out.println("Send transaction message OK, message id is: " + sendResult.messageId());
    }
}
  • 使用的还是Producer,只是send方法使用的是有LocalTransactionExecutor参数的方法,来发送事务消息
  • LocalTransactionExecutor定义了execute和check方法
  • execute方法用来做本地事务相关的操作;check方法用于检查本地事务的状态

routing

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java

public class RoutingApp {
    public static void main(String[] args) throws OMSResourceNotExistException {
        //Load and start the vendor implementation from a specific OMS driver URL.
        final MessagingAccessPoint messagingAccessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
        messagingAccessPoint.startup();

        String destinationQueue = "NS://DESTINATION_QUEUE";
        String sourceQueue = "NS://SOURCE_QUEUE";
        //Fetch a ResourceManager to create source Queue, destination Queue, and the Routing instance.
        ResourceManager resourceManager = messagingAccessPoint.resourceManager();

        //Create the destination queue.
        resourceManager.createQueue(destinationQueue, OMS.newKeyValue());
        //Create the source queue.
        resourceManager.createQueue(sourceQueue, OMS.newKeyValue());

        KeyValue routingAttr = OMS.newKeyValue();
        routingAttr.put(OMSBuiltinKeys.ROUTING_SOURCE, sourceQueue)
            .put(OMSBuiltinKeys.ROUTING_DESTINATION, destinationQueue)
            .put(OMSBuiltinKeys.ROUTING_EXPRESSION, "color = 'red'");

        resourceManager.createRouting("NS://HELLO_ROUTING", routingAttr);

        //Send messages to the source queue ahead of the routing
        final Producer producer = messagingAccessPoint.createProducer();
        producer.startup();

        producer.send(producer.createBytesMessage(sourceQueue, "RED_COLOR".getBytes())
            .putUserHeaders("color", "red"));

        producer.send(producer.createBytesMessage(sourceQueue, "GREEN_COLOR".getBytes())
            .putUserHeaders("color", "green"));

        //Consume messages from the queue behind the routing.
        final PushConsumer pushConsumer = messagingAccessPoint.createPushConsumer();
        pushConsumer.startup();

        pushConsumer.attachQueue(destinationQueue, new MessageListener() {
            @Override
            public void onReceived(Message message, Context context) {
                //The message sent to the sourceQueue will be delivered to anotherConsumer by the routing rule
                //In this case, the push consumer will only receive the message with red color.
                System.out.println("Received a red message: " + message);
                context.ack();
            }

        });

        //Register a shutdown hook to close the opened endpoints.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                producer.shutdown();
                pushConsumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));
    }
}
  • routing用来做路由,可以通过表达式来从源队列过滤消息到目标队列,起到消息过滤的作用

小结

  • open messaging没有定义kafka的topic相关的概念,也没有consumer group的概念
  • amqp通过Exchange屏蔽了queue和topic的细节,不像JMS那样,需要producer去选择是要发到topic,还是发到queue
  • 这里open messaging虽然没有定义exchange,但是由于没有topic概念,发送都是发送到queue
  • open messaging的routing概念,跟amqp的outingKey有点类似,不过这个routing仅仅是作用于消息过滤,对消费者起作用

doc

  • AMQP基本概念
  • openmessaging.cloud
  • OpenMessaging Runtime Interface for Java

原文发布于微信公众号 - 码匠的流水账(geek_luandun)

原文发表时间:2018-07-26

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏积累沉淀

Java设计模式(十九)----备忘录模式

备忘录模式 一、 概念 二、 结构 三、 分类 1.”白箱”备忘录模式的实现 2.“黑箱”备忘录模式的实现 3.“多重”检查点 4....

20090
来自专栏Android 开发学习

JsBridge 源码分析

19230
来自专栏蜉蝣禅修之道

自己写的小型静态服务器

18840
来自专栏c#开发者

LightSwitch 2011 数据字段唯一性验证方案

LightSwitch 2011 数据字段唯一性验证方案 ? 验证单表数据的某个字段不能输入重复值 设置实体字段唯一索引 ? 如果不写代码,那么验证只会在...

35250
来自专栏编码小白

tomcat请求处理分析(一) 启动container实例

1.1.1  启动container实例 其主要是进行了生命周期中一系列的操作之后调用StandardEngine中的 startInternal方法,不难看出...

38760
来自专栏菩提树下的杨过

silverlight动态读取txt文件/解析json数据/调用wcf示例

终于开始正式学习silverlight,虽然有点晚,但总算开始了,今天看了一下sdk,主要是想看下silverlight中如何动态调用数据,对于数据库的访问,s...

246100
来自专栏Java学习网

常见的 Java 错误及避免方法之第五集(每集10个错误后续持续发布)

当输入期间意外终止文件或流时,将抛出“EOFException”。 以下是抛出EOFException异常的一个示例,来自JavaBeat应用程序:

16630
来自专栏刘望舒

LeakCanary看这一篇文章就够了

LeakCanary是Square公司基于MAT开源的一个内存泄漏检测工具,在发生内存泄漏的时候LeakCanary会自动显示泄漏信息。

2.8K50
来自专栏飞扬的花生

在ASP.MVC中使用Ajax

      Asp.net MVC 抛弃了Asp.net WebForm那种高度封装的控件,让我们跟底层的HTML有了更多的亲近。可以更自由、更灵活的去控制HT...

22690
来自专栏c#开发者

MSMQ突破4M限制的方法

    在默认情况下msmq 3.0(windows xp ,windows 2003)最大单个消息(Message size)大小4M;(包括正文和全部指定属...

36440

扫码关注云+社区

领取腾讯云代金券