前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMq的代码使用案例

RabbitMq的代码使用案例

作者头像
小勇DW3
发布2019-06-15 15:10:02
6980
发布2019-06-15 15:10:02
举报
文章被收录于专栏:小勇DW3

消费者:

---------------------- 构造初始化:

代码语言:javascript
复制
public RabbitMqReceiver(String host, int port, String username, String password) 
{
    connFactory = new ConnectionFactory();
    connFactory.setHost(host);
    connFactory.setPort(port);
    connFactory.setUsername(username);
    connFactory.setPassword(password);
}
********************************************************************************


---------------------- 构造初始化:
代码语言:javascript
复制
    public Channel createChannel() throws IOException {
        getConnection();
        Channel channel = connection.createChannel();
        if (channel != null) {

           int prefetchCount = 1;

           channel.basicQos(prefetchCount);//最多为当前接收方发送一条消息。如果接收方还未处理完毕消息,还没有回发确认,就不要再给他分配消息了,应该把当前消息分配给其它空闲接收方。

代码语言:javascript
复制
        boolean durable = true; //Server端的Queue持久化
        channel.queueDeclare("task_queue", durable, false, false, null); 
            logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver成功创建Channel");
        } else {
            logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver创建Channel失败");
        }

        return channel;
    }
代码语言:javascript
复制
********************************************************************************
代码语言:javascript
复制
---------------------- 取得connection实例:
代码语言:javascript
复制
private void getConnection() throws IOException
 {
        synchronized (this) {
            if (connection == null || !connection.isOpen()) {
                connection = connFactory.newConnection();
                if (connection != null) {
                    logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver成功获取连接");
                } else {
                    logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver获取连接失败");
                }
            } else {
                logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver连接已存在,复用此连接");
            }
        }
  }
代码语言:javascript
复制
********************************************************************************
代码语言:javascript
复制
----------------------获取Consumer实例:
代码语言:javascript
复制
public QueueingConsumer createConsumer(Channel channel, String queueName) throws IOException {
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);   //自动消息确认打开,默认开启了消息确认(接收方接收到消息后,立即向服务器发回确认)。消息接收方处理完消息后,向服务器发送消息确认,服务器再删除该消息。

    return consumer;
}
代码语言:javascript
复制
********************************************************************************
代码语言:javascript
复制
----------------------从从rabbitMQ提取消息并转换为对象:
代码语言:javascript
复制
private String getMessageFromMQ() {
    String message = StringUtils.EMPTY;
    String source = StringUtils.EMPTY;
    try {
        message = receiver.nextMessage(checkNotNull(consumer), 1000);
        source = message;
    } catch (ShutdownSignalException e) {
        logger.error("", e);
    } catch (ConsumerCancelledException e) {
        logger.error("consumer exception", e);
    } catch (InterruptedException e) {
        logger.error("timeout exception", e);
    }
    try {
        if (StringUtils.isNotBlank(message)) {
            message = checkNotNull(StringUtils.substringAfter(message, "yyy:"), "xxx");
            message = checkNotNull(StringEscapeUtils.unescapeJava(message), "unescape error");
            int size = message.length();
            if (size > 1) {
                message = checkNotNull(message.substring(0, message.length() - 1), "get json-data error");// 去掉末尾的”
            } else {
                logger.warn(String.format("数据异常,message=%s", source));
            }
        }
    } catch (Throwable e) {
        logger.error(String.format("数据异常,message=%s", source), e);
    }
    return message;
}
代码语言:javascript
复制
********************************************************************************
代码语言:javascript
复制
----------------------每次读取一条消息:

public String nextMessage(QueueingConsumer consumer, long timeOut) throws ShutdownSignalException, ConsumerCancelledException, InterruptedException {
代码语言:javascript
复制
    QueueingConsumer.Delivery delivery;
    if (timeOut > 0) {
        delivery = consumer.nextDelivery(timeOut);
    } else {
        delivery = consumer.nextDelivery();
    }
    if (delivery == null) {
        return StringUtils.EMPTY;
    }

    String message = new String(delivery.getBody());
    return message;
}
代码语言:javascript
复制
********************************************************************************
代码语言:javascript
复制
代码语言:javascript
复制
---------------------- 在storm中创建mq实例:
代码语言:javascript
复制
SpoutOutputCollector collector;
RabbitMqReceiver receiver;
Channel channel;
QueueingConsumer consumer;
代码语言:javascript
复制
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)  //初始化调用一次
{
    this.collector = collector;
    receiver = checkNotNull(new RabbitMqReceiver(conf.get("crash.mq.host").toString(),
                    Integer.valueOf(conf.get("crash.mq.port").toString()), conf.get("crash.mq.user").toString(),
                    conf.get("crash.mq.pwd").toString()), "receiver is null");
    try {
        channel = checkNotNull(receiver.createChannel(), "channel is null");
        consumer = checkNotNull(receiver.createConsumer(channel, conf.get("crash.mq.channel").toString()),
                                "comsumer is null");
    } catch (Exception e) {
        logger.error("init mq-client error:", e);
    }
}
代码语言:javascript
复制
---------------------- 在storm中循环执行获得消息实例:
代码语言:javascript
复制
@Override
public void nextTuple() 
{
    String message = getMessageFromMQ();
}

生产者:
--------------------------------------------------:


private final static String QUEUE_NAME = "hello2";// 队列名不能重复 之前已有就会失败 
代码语言:javascript
复制
public class Producer {  

    private final static String QUEUE_NAME = "hello2";// 队列名不能重复 之前已有就会失败  

    public static void main(String[] argv) throws java.io.IOException {  

        /* 使用工厂类建立Connection和Channel,并且设置参数 */  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("192.168.10.111");// MQ的IP  
        factory.setPort(5672);// MQ端口  
        factory.setUsername("asdf");// MQ用户名  
        factory.setPassword("123456");// MQ密码  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  

        /* 创建消息队列,并且发送消息 */  
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        String message = "消息2";  
        channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());  //Message持久化
        System.out.println("生产了个'" + message + "'");  

        /* 关闭连接 */  
        channel.close();  
        connection.close();  
    }  

}
代码语言:javascript
复制
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-06-13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 消费者:
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档