前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >springboot中rabbitmq的配置和使用【进阶一】

springboot中阿里云MQ(RocketMQ/ONS)的配置和使用【进阶一】

作者头像
用户5640963
发布2019-07-25 14:50:25
4.3K0
发布2019-07-25 14:50:25
举报
文章被收录于专栏:卯金刀GG卯金刀GG

1、yml配置

代码语言:javascript
复制
 # Settings for the Aliyun MQ client; the Java classes below read them through
 # @Value("${alimq.<key>}").
 alimq:
    # Values suffixed with "(mq中定义)" ("as defined in MQ") are placeholders for
    # the real identifiers.
    ProducerId: PRODUCER(mq中定义)
    ConsumerId: CONSUMER(mq中定义)
    # Credentials are intentionally left blank in this sample.
    AccessKey:  
    SecretKey: 
    ONSAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
    # NOTE(review): no code shown in this article reads SendMsgTimeoutMillis.
    SendMsgTimeoutMillis: 3000
    topic: TOPIC
    # MQ switch: 0 - do not start consuming, 1 - start consuming.
    # ListenerConfig only tests for "0"; any other value starts the consumer.
    mqflag: 1
    # Used as the subscription expression when the consumer subscribes to the topic.
    tag: ZC_xxx(mq中定义)

2、ali生产者和消费者配置

代码语言:javascript
复制
package common.config;

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * Holds the {@code alimq.*} settings and turns them into the {@link Properties}
 * objects handed to the ONS factory when creating a consumer or a producer.
 *
 * @author liuyue
 */
@Configuration
@Data
public class AliMQConfig {
    /** Topic name, bound from {@code alimq.topic}. */
    @Value("${alimq.topic}")
    private String topic;
    /** Producer identifier, bound from {@code alimq.ProducerId}. */
    @Value("${alimq.ProducerId}")
    private String producerId;
    /** Consumer identifier, bound from {@code alimq.ConsumerId}. */
    @Value("${alimq.ConsumerId}")
    private String consumerId;
    /** Access key, bound from {@code alimq.AccessKey}. */
    @Value("${alimq.AccessKey}")
    private String accesskey;
    /** Secret key, bound from {@code alimq.SecretKey}. */
    @Value("${alimq.SecretKey}")
    private String secretkey;
    /** Name-server address, bound from {@code alimq.ONSAddr}. */
    @Value("${alimq.ONSAddr}")
    private String onsaddr;
    /** Subscription expression, bound from {@code alimq.tag}. */
    @Value("${alimq.tag}")
    private String subExpression;

    /**
     * Builds the settings for creating a consumer.
     *
     * @return a new {@link Properties} with the consumer id, credentials and address
     */
    public Properties getConsumerProperties() {
        Properties settings = connectionProperties();
        settings.setProperty(PropertyKeyConst.ConsumerId, consumerId);
        return settings;
    }

    /**
     * Builds the settings for creating a producer.
     *
     * @return a new {@link Properties} with the producer id, credentials and address
     */
    public Properties getProducerProperties() {
        Properties settings = connectionProperties();
        settings.setProperty(PropertyKeyConst.ProducerId, producerId);
        return settings;
    }

    /** Creates a fresh {@link Properties} carrying the entries both client kinds share. */
    private Properties connectionProperties() {
        Properties shared = new Properties();
        shared.setProperty(PropertyKeyConst.AccessKey, accesskey);
        shared.setProperty(PropertyKeyConst.SecretKey, secretkey);
        shared.setProperty(PropertyKeyConst.ONSAddr, onsaddr);
        return shared;
    }
}

3、消费者监听器

代码语言:javascript
复制
package common.config;

import config.alimq.MQMsgConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Startup hook that switches MQ consumption on or off according to
 * {@code alimq.mqflag}.
 *
 * <p>Only the value {@code "0"} disables consumption; any other value starts
 * the consumer.
 *
 * @author liuyue
 */
@Component
@Slf4j
public class ListenerConfig implements CommandLineRunner {
    @Resource
    MQMsgConsumer mqConsumer;
    @Value("${alimq.mqflag}")
    private String mqflag;

    /**
     * Registers the subscription and starts the consumer unless consumption is disabled.
     *
     * @param strings command-line arguments passed by Spring Boot (unused)
     * @throws Exception if subscribing or starting the consumer fails
     */
    @Override
    public void run(String... strings) throws Exception {
        if ("0".equals(mqflag)) {
            log.info("alimq没有开启消费");
            return;
        }
        log.info("=======alimq开始消费=========");
        // Subscribe first, then start: the topic and listener must be registered
        // before the client begins pulling, otherwise the consumer is briefly
        // running with nothing to dispatch to.
        mqConsumer.onMessage();
        mqConsumer.start();
    }
}

4、消费者类

代码语言:javascript
复制
package config.alimq;

import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.*;
import config.SpringContextHolder;
import config.AliMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

/**
 * Spring bean wrapping the ONS {@link Consumer}: it is created once the bean's
 * properties are set, subscribed and started on demand, and shut down when the
 * bean is destroyed.
 */
@Slf4j
@Component
public class MQMsgConsumer implements InitializingBean, DisposableBean {

    /** The only message tag this consumer knows how to process. */
    private static final String PLAN_STATUS_TAG = "ZC_xxx";

    @Autowired
    AliMQConfig busMqConfig;
    private Consumer busConsumer;

    @Autowired
    IPanoramaProService panoramaProServiceImpl;

    /**
     * Creates the consumer from the configured properties. It is deliberately
     * not started here; {@link #start()} does that.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("消费者初始化");
        busConsumer = ONSFactory.createConsumer(busMqConfig.getConsumerProperties());
        log.info("消费者初始化完成");
    }

    /** Starts the underlying consumer. */
    public void start() {
        busConsumer.start();
    }

    /**
     * Subscribes to the configured topic and subscription expression, routing
     * every delivered message to {@link #consumer(Message, ConsumeContext)}.
     */
    public void onMessage() {
        busConsumer.subscribe(busMqConfig.getTopic(), busMqConfig.getSubExpression(), new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
                // The body is logged later, after it has been null-checked and
                // decoded as UTF-8; decoding it here would throw on a null body.
                log.info("Receive: {}", message);
                return consumer(message, context);
            }
        });
    }

    /** Shuts the underlying consumer down. */
    @Override
    public void destroy() throws Exception {
        busConsumer.shutdown();
        log.info("消费停止");
    }

    /**
     * Processes one message and reports the outcome to the MQ client.
     *
     * @param message the delivered message
     * @param context the consume context supplied by the client
     * @return {@link Action#CommitMessage} when the update succeeded or the tag
     *         is not handled here; {@link Action#ReconsumeLater} when the update
     *         did not succeed, so the message is delivered again
     */
    public Action consumer(Message message, ConsumeContext context) {
        if (!PLAN_STATUS_TAG.equals(message.getTag())) {
            // Nothing in this class can process other tags, so a retry could
            // never succeed; acknowledge instead of redelivering.
            log.warn("Unhandled MQ tag [{}], message committed without processing", message.getTag());
            return Action.CommitMessage;
        }
        boolean updated = synchroProjectPlanStatus(message, context);
        // Acknowledge only on success; a failed update must be retried.
        return updated ? Action.CommitMessage : Action.ReconsumeLater;
    }

    /**
     * Decodes the message body as UTF-8 JSON into a {@code ProjectPlanParas}
     * and passes it to the service's {@code synchroProjectPlanStatus}.
     *
     * @param message the delivered message
     * @param context the consume context (unused)
     * @return the value returned by the service, or {@code false} when the body
     *         is missing, empty, or parses to {@code null}
     */
    private boolean synchroProjectPlanStatus(Message message, ConsumeContext context) {
        byte[] msgBody = message.getBody();
        if (msgBody == null || msgBody.length == 0) {
            log.warn(" THE MQ message body is empty, nothing to update");
            return false;
        }

        String msgBodyStr = new String(msgBody, StandardCharsets.UTF_8);
        log.info(" THE MQ message body value: " + msgBodyStr);

        ProjectPlanParas projectPlanParas = JSON.parseObject(msgBodyStr, ProjectPlanParas.class);
        if (projectPlanParas == null) {
            // parseObject yields null for a JSON "null"/blank payload.
            log.warn(" THE MQ message body could not be converted to ProjectPlanParas");
            return false;
        }
        log.info(" THE ProjectPlanParas   value: " + projectPlanParas.getZutuanCode());
        log.info(" THE ProjectPlanParas   value: " + projectPlanParas.getFinishDate());

        boolean bl = panoramaProServiceImpl.synchroProjectPlanStatus(projectPlanParas);
        log.info(" THE MQ synchroProjectPlanStatus status : " + bl);
        return bl;
    }

}

5、生产者类

代码语言:javascript
复制
package config.alimq;

import com.aliyun.openservices.ons.api.*;
import com.config.AliMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

@Component
@Slf4j
public class MQMsgProducer implements InitializingBean, DisposableBean {

    @Autowired
    AliMQConfig busMqConfig;
    private Producer producer;

    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("生产者初始化");
        producer = ONSFactory.createProducer(busMqConfig.getProducerProperties());
        producer.start();
    }

    public void sentMessage(Message message) {
        producer.sendAsync(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info(sendResult.getTopic() + "-----" + sendResult.getMessageId());
            }

            @Override
            public void onException(OnExceptionContext context) {
                log.error(context.getTopic() + "-----" + context.getMessageId() + ":error=" + context.getException());
            }
        });
    }

    @Override
    public void destroy() throws Exception {
        producer.shutdown();
    }
}

6、生产者调用类,推送消息,业务代码片段

代码语言:javascript
复制
// Business-code fragment: inject the producer bean and push one message.
// NOTE(review): aliMQConfig and json are declared outside this fragment.
@Resource
    MQMsgProducer mqProducer;
    // Changed to push the plan-time update through alimq (edit by liuy, 2018-11-02).
    // The tag "ZC_xxx" is the same literal MQMsgConsumer.consumer compares against.
     Message msg = new Message(aliMQConfig.getTopic(),
                "ZC_xxx", json.getBytes("UTF-8"));
     // Sent asynchronously; the result is only logged by the producer's callback.
     mqProducer.sentMessage(msg);

至此,全部过程结束。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档