专栏首页算法之名RabbitMQ使用多路由,多队列来破除流控

RabbitMQ使用多路由,多队列来破除流控

流控机制是我们在使用RabbitMQ最头疼的问题,一旦并发激增时,消费者消费队列消息就像滴水一样慢。

现在我们下单后,需要给通知中心发送消息,让通知中心通知服务商收取订单,并确认提供服务。

我们先给Order接口添加一个发送消息的方法。

public interface Order {
    public void makeOrder(Order order);
    public OrderSuccessResult getResult(Order order);
    public void postOrder(Order order);
}

实现类实现该方法

@Data
@AllArgsConstructor
@NoArgsConstructor
@ServiceOrderVersion(value = 1)
@RequiredArgsConstructor
public class ServiceOrder extends AbstractOrder {
    private Long id;
    @NonNull
    private String code;
    @NonNull
    private Store store;
    @NonNull
    private ProviderService service;
    @NonNull
    private Car car;
    @NonNull
    private Date serviceDate;
    @NonNull
    private String contact;
    @NonNull
    private String contactTel;
    private AppUser user;
    @NonNull
    private String content;
    private int status;
    private Date createDate;


    @Override
    public void makeOrder(Order order) {
        ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean(ServiceOrderDao.class);
        IdService idService = SpringBootUtil.getBean(IdService.class);
        ((ServiceOrder)order).setId(idService.genId());
        ((ServiceOrder)order).setCode(getCodeInfo(idService));
        AppUser loginAppUser = AppUserUtil.getLoginAppUser();
        AppUser user = new AppUser();
        user.setId(loginAppUser.getId());
        user.setUsername(loginAppUser.getUsername());
        ((ServiceOrder)order).setUser(user);
        ((ServiceOrder)order).setStatus(1);
        ((ServiceOrder)order).setCreateDate(new Date());
        serviceOrderDao.save((ServiceOrder) order);
    }

    @Override
    public OrderSuccessResult getResult(Order order) {
        ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean(ServiceOrderSuccessResultFactory.class);
        this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult();
        return this.orderSuccessResult.getResult(order);
    }

    @Override
    public void postOrder(Order order) {
        MessageSender sender = SpringBootUtil.getBean(MessageSender.class);
        CompletableFuture.runAsync(() ->
                sender.send(OwnerCarCenterMq.MQ_EXCHANGE_ORDER,
                        OwnerCarCenterMq.ROUTING_KEY_ORDER,
                        order)
        );
    }

    private String getCodeInfo(IdService idService) {
        String flow = String.valueOf(idService.genId());
        flow = flow.substring(14,flow.length());
        String pre = DateUtils.format(new Date(), DateUtils.pattern9);
        return pre + flow;
    }
}

其中我们定义了这么一组队列名,交换机,和路由

public interface OwnerCarCenterMq {
    /**
     * 队列名
     */
    String ORDER_QUEUE = "order";
    /**
     * 服务系统exchange名
     */
    String MQ_EXCHANGE_ORDER = "order.topic.exchange";

    /**
     * 服务添加routing key
     */
    String ROUTING_KEY_ORDER = "post.order";
}

为了避免流控,我们定义了10个队列,并全部绑定到一个交换机上。

@Configuration
public class RabbitmqConfig {

   @Bean
   public List<Queue> orderQueues() {
      List<Queue> queues = new ArrayList<>();
      for (int i = 1;i < 11;i++) {
         Queue queue = new Queue(OwnerCarCenterMq.ORDER_QUEUE + "_" + i);
         queues.add(queue);
      }
      return queues;
   }

   @Bean
   public TopicExchange orderExchange() {
      return new TopicExchange(OwnerCarCenterMq.MQ_EXCHANGE_ORDER);
   }


   @Bean
   public List<Binding> bindingOrders() {
      List<Binding> bindings = new ArrayList<>();
      for (int i = 1;i < 11;i++) {
         Binding binding = BindingBuilder.bind(orderQueues().get(i - 1)).to(orderExchange())
               .with(OwnerCarCenterMq.ROUTING_KEY_ORDER + "_" + i);
         bindings.add(binding);
      }
      return bindings;
   }
}

重新封装消息提供者,每次发送都随机选取一个路由来进行发送。

@Slf4j
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String exchange,String routingKey,Object content) {
        log.info("send content=" + content);
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        ThreadLocalRandom random = ThreadLocalRandom.current();
        this.rabbitTemplate.convertAndSend(exchange,routingKey + "_" + random.nextInt(1,11),serialize(content));
    }

    /**
     * 确认后回调:
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.info("send ack fail, cause = " + cause);
        } else {
            log.info("send ack success");
        }
    }

    /**
     * 失败后return回调:
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
    }

    /**
     * 对消息对象进行二进制序列化
     * @param o
     * @return
     */
    private byte[] serialize(Object o) {
        Kryo kryo = new Kryo();
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Output output = new Output(stream);
        kryo.writeObject(output, o);
        output.close();
        return stream.toByteArray();
    }
}

我们可以看到在ServiceOrder里,我们是通过异步来进行发送到。

Controller如下

@Slf4j
@RestController
public class OrderController {
    private ThreadLocal<OrderFactory> orderFactory = new ThreadLocal<>();
    private ThreadLocal<Order> orderService = new ThreadLocal<>();
    @Autowired
    private OrderBean orderBean;

    @Transactional
    @SuppressWarnings("unchecked")
    @PostMapping("/makeeorder")
    public Result<OrderSuccessResult> makeOrder(@RequestBody String orderStr, @RequestParam("type") String type) {
        log.info(orderStr);
        Order order = setOrderFactory(orderStr,type);
        orderService.get().makeOrder(order);
        orderService.get().postOrder(order);
        return Result.success(orderService.get().getResult(order));
    }

    /**
     * 判断是哪一种类型的订单来获取哪一种类型的具体订单工厂
     * @param orderStr
     * @return
     */
    private Order setOrderFactory(String orderStr,String type) {
        Class<?> classType = orderBean.getOrderMap().get(type);
        Object order = JSONObject.parseObject(orderStr, classType);
//        if (orderStr.contains("service")) {
//            order = JSON.parseObject(orderStr, ServiceOrder.class);
//        }else if (orderStr.contains("product")) {
//            order = JSON.parseObject(orderStr, ProductOrder.class);
//        }
        Class<?> classFactoryType = orderBean.getOrderFactoryMap().get(type + "Factory");
        this.orderFactory.set((OrderFactory) SpringBootUtil.getBean(classFactoryType));
//        if (order instanceof ServiceOrder) {
//            this.orderFactory.set(SpringBootUtil.getBean(ServiceOrderFactory.class));
//        }else if (order instanceof ProductOrder) {
//            this.orderFactory.set(SpringBootUtil.getBean(ProductOrderFactory.class));
//        }
        orderService.set(orderFactory.get().getOrder());
        return (Order) order;
    }
}

最后是在我们的通知中心模块接收消息,同时对这10个队列实行监控

@Slf4j
@Component
@RabbitListener(queues = {OwnerCarCenterMq.ORDER_QUEUE + "_" + 1,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 2,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 3,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 4,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 5,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 6,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 7,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 8,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 9,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 10})
public class ServiceOrderConsummer {
    @Getter
    private Queue<ServiceOrder> serviceOrders = new ConcurrentLinkedDeque<>();
    @RabbitHandler
    public void receiceOrder(byte[] data, Channel channel, Message message) throws IOException {
        try {
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            ServiceOrder order = unSerialize(data);
            this.serviceOrders.add(order);
            log.info(String.valueOf(order));
        } catch (IOException e) {
            e.printStackTrace();
            //丢弃这条消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            log.info("receiver fail");
        }
    }

    /**
     * 反序列化
     * @param data
     * @return
     */
    private ServiceOrder unSerialize(byte[] data) {
        Input input = null;
        try {
            Kryo kryo = new Kryo();
            input = new Input(new ByteArrayInputStream(data));
            return kryo.readObject(input,ServiceOrder.class);
        }
        finally {
            input.close();
        }
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 使用桥接模式来显示下单结果 顶

    在用工厂方法模式来下不同订单 中我们看到,我们只简单显示来一个“下单成功”,但实际上我们需要给用户返回到结果可能多种多样。

    算法之名
  • 用工厂方法模式来下不同订单 顶

    Controller如下,用传递的内容来判断是哪种类型的订单,并给抽象订单工厂来获取具体的订单工厂,通过具体的订单工厂来生成订单服务,完成下单功能。考虑到线程安...

    算法之名
  • 浅谈mybatis的日志适配模式 顶

    Java开发中经常用到的日志框架有很多,Log4j、Log4j2、slf4j等等,Mybatis定义了一套统一的日志接口供上层使用,并为上述常用的日志框架提供了...

    算法之名
  • Feign 与 Hystrix

    Feign是一个声明式的web服务客户端,它使得web服务调用非常的简单,当我们使用Feign时,Spring Cloud

    小忽悠
  • Feign 与 Hystrix

    Feign是一个声明式的web服务客户端,它使得web服务调用非常的简单,当我们使用Feign时,Spring Cloud 整合了Ribbon和Eureka,从...

    小忽悠
  • Spring中的SpEL表达式概述

    <bean id="numberGuess" class="org.spring.samples.NumberGuess"> <property name="...

    cwl_java
  • Redis使用场景一(活动秒杀)

    redis的互斥锁可以解决这个问题,redis的setnx命令在指定的 key 不存在时,为 key 设置指定的值。当存在时,则无法插入值

    Java程序员也要new个对象
  • 重拾Java(0)-基础知识点

    叶应是叶
  • (29) 剖析String / 计算机程序的思维逻辑

    上节介绍了单个字符的封装类Character,本节介绍字符串类。字符串操作大概是计算机程序中最常见的操作了,Java中表示字符串的类是String,本节就来详细...

    swiftma
  • 03 设计模式 抽象工厂

    建立一个最高层级的工厂,用来生产不同产品的工厂,然后再建立一个产品规范,用来规定生产的产品需要有什么样的功能。

    shimeath

扫码关注云+社区

领取腾讯云代金券