前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >谷粒商城-高级篇(消息队列)

谷粒商城-高级篇(消息队列)

作者头像
OY
发布2022-04-11 17:11:17
1.7K0
发布2022-04-11 17:11:17
举报
文章被收录于专栏:OY_学习记录OY_学习记录

一、RabbitMQ

见我自己总结的两篇博客

二、安装 RabbitMQ

代码语言:javascript
复制
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

#修改为自动重启:
docker update rabbitmq --restart=always

  • 4369,25672(Erlang 发现&集群端口)
  • 5672,5671(AMQP 端口)
  • 15672(web 管理后台端口)
  • 61613,61614(STOMP 协议端口)
  • 1883,8883(MQTT 协议端口)
  • RabbitMQ 管理 https://www.rabbitmq.com/networking.html

启动

  • http://192.168.56.10:15672/

配置文件:

三、整合 SpringBoot

1、在订单服务中引入依赖

代码语言:javascript
复制
<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置文件

代码语言:javascript
复制
spring:
  rabbitmq:
    host: 192.168.56.10
    port: 5672
#    虚拟主机
    virtual-host: /
#    开启发送端抵达队列确认【发送端确认机制+本地事务表】
    publisher-returns: true
#    开启发送确认【发送端确认机制+本地事务表】
    publisher-confirm-type: correlated
#    只要抵达队列,优先回调return confirm
    template:
      mandatory: true
#    使用手动确认模式,关闭自动确认【消息丢失】
    listener:
      simple:
        acknowledge-mode: manual

3、@EnableRabbit 加在启动类上【发送消息可以不需要这个注解,监听消息必须使用这个注解】

4、RabbitAutoConfiguration 生效,给容器自动配置了很多类
	RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate

5、接收消息注解:
@RabbitListener(queues={"hello-java-queue"})
@RabbitHandler

3、基本使用

代码语言:javascript
复制
1、如何创建Exchanger、Queue、Binding
	1)使用AmqpAdmin
    @Test
    void createExchange() {
        // 创建交换机
        amqpAdmin.declareExchange(new DirectExchange("hello-java-exchange", true, false));
        log.info("Exchange创建[{}]成功", "hello-java-exchange");
    }

    @Test
    void createQueue() {
        amqpAdmin.declareQueue(new Queue("hello-java-queue", true, false, false));
        log.info("Queue创建[{}]成功", "hello-java-queue");
    }

    @Test
    void createBinding() {
        // 创建绑定
        // String destination【目的地,队列name或 交换机name(如果作为路由的话)】
        // Binding.DestinationType destinationType【目的地类型 queue还是exchange(路由)】
        // String exchange【交换机】
        // String routingKey【路由键】
        // @Nullable Map<String, Object> arguments【自定义参数】
        amqpAdmin.declareBinding(new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello-java-exchange",
                "hello.java", null));
        log.info("Binding创建[{}]成功", "hello-java-binding");
    }

2、如何发送消息【1、交换机;2、路由键;3、消息】
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    void sendMsg() {
        rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", "hello world");
    }

3、使用json格式的序列化器
    否则使用jdk的序列化器
    @Configuration
public class MyRabbitConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

4、接收消息【1、精确交换机:一条消息只会被消费一次;2、绑定队列就可以了】
    1)必须使用@EnableRabbit
    2)监听方法必须放在@Component中
    3)@RabbitListener(queues={"hello-java-queue"})放在类上
       @RabbitHandler:标在方法上【作用:重载处理不同类型的数据】

接受消息代码:

RabbitMQ 消息确认机制

RabbitMQ 消息确认机制-可靠抵达【手动确认+拒绝(拒绝的进入死信路由)】

代码语言:javascript
复制
事务消息:发送返回【信息才到达】

保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入 确认机制
	发送端确认机制: 两个:
	publisher confirmCallback 确认模式【如果投递到Broker了,回调confirmCallback方法】
	publisher returnCallback未投递到queue 退回模式【如果没有投递到queue,调用returnCallback】

    消费端确认机制:【消费者收到消息,给服务器发送确认,服务器删除该消息】
	consumer ack机制(消息确认机制)【让Broker知道哪些消息被消费者正确消费了(如果没有则重新投递)】
    	1、默认是自动确认的,只要消息接收到,客户端会自动确认,服务端会移除这个消息
    	BUG:消息丢失,消费者监听队列【所有消息会一次性发送到通道,所以自动确认宕机会导致消息丢失】
    	2、手动确认:处理一个确认一个【否则是未签收状态,服务器宕机则会重新进入ready状态不会丢失】
    	参数1:消息下标,参数2:是否批量签收
    	签收:channel.basicAck(message.getMessageProperties().getDeliverTag(), false);

最终解决方案:确认机制+本地事务表
1、发送消息的时候生成消息ID,然后在回调方法里面修改数据库里消息的状态
2、定时扫描数据库消息的状态,没有成功的重新投递一次
3、消费消息时使用手动签收机制【不要使用自动签收】

配置:
spring:
  rabbitmq:
    host: 192.168.56.10
    port: 5672
#    虚拟主机
    virtual-host: /
#    开启发送端抵达队列确认【发送端确认机制+本地事务表】
    publisher-returns: true
#    开启发送确认【发送端确认机制+本地事务表】
    publisher-confirm-type: correlated
#    只要抵达队列,优先回调return confirm
    template:
      mandatory: true
#    使用手动确认模式,关闭自动确认【消息丢失】
    listener:
      simple:
        acknowledge-mode: manual


@Configuration
public class MyRabbitConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *
     */
    @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() {

        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });


        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }
}
代码语言:javascript
复制
使用方法:
监听队列的方法参数上加上通道Channel
然后channel 签收或拒绝 ack/reject(成为死信)

消费者消费:

签收+拒收【并返回服务器入队】multiple:批量签收,requeue:是否重新入队

四、项目搭建

代码语言:javascript
复制
1、等待付款(详情页)/mydata/nginx/html/static/order/detail

2、订单页(订单列表页)/mydata/nginx/html/static/order/list

3、结算页(订单确认页)/mydata/nginx/html/static/order/confirm

4、收银页(支付页)/mydata/nginx/html/static/order/pay

# gulimall
192.168.56.10 gulimall.com
192.168.56.10 search.gulimall.com
192.168.56.10 item.gulimall.com
192.168.56.10 auth.gulimall.com
192.168.56.10 cart.gulimall.com
192.168.56.10 order.gulimall.com

127.0.0.1 ssoserver.com
127.0.0.1 client1.com
127.0.0.1 client2.com

#网关
      - id: gulimall_order_route
        uri: lb://gulimall-order
        predicates:
        - Host=order.gulimall.com

等待付款,详情页:

订单页:

结算页,订单确认页:

收银页:

五、订单服务

1、订单流程

订单生成 -> 支付订单 -> 卖家发货 -> 确认收货 -> 交易成功

2、登录拦截

因为订单系统必然涉及到用户信息,因此进入订单系统的请求必须是已经登录的,所以我们必须通过拦截器对为登录订单请求进行拦截

代码语言:javascript
复制
@Component
public class LoginInterceptor implements HandlerInterceptor {

    public static ThreadLocal<MemberResponseVo> loginUser = new ThreadLocal<>();

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        HttpSession session = request.getSession();
        MemberResponseVo memberResponseVo = (MemberResponseVo) session.getAttribute(AuthServerConstant.LOGIN_USER);
        if(memberResponseVo != null){
            loginUser.set(memberResponseVo);
            return true;
        }else{
            session.setAttribute("msg","请先登录");
            response.sendRedirect("http://auth.gulimall.com/login.html");
            return false;
        }
    }
}
代码语言:javascript
复制
@Configuration
public class GulimallWebConfig implements WebMvcConfigurer {

    @Resource
    private LoginInterceptor loginInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(loginInterceptor).addPathPatterns("/**");
    }
}

3、订单确认页

3.1 模型抽取

跳转到确认页时需要携带的数据模型

代码语言:javascript
复制
public class OrderConfirmVo {

    @Getter
    @Setter
    /** 会员收获地址列表 **/
    private List<MemberAddressVo> memberAddressVos;

    @Getter @Setter
    /** 所有选中的购物项 **/
    private List<OrderItemVo> items;

    /** 发票记录 **/
    @Getter @Setter
    /** 优惠券(会员积分) **/
    private Integer integration;

    /** 防止重复提交的令牌 **/
    @Getter @Setter
    private String orderToken;

    @Getter @Setter
    Map<Long,Boolean> stocks;

    public Integer getCount() {
        Integer count = 0;
        if (items != null && items.size() > 0) {
            for (OrderItemVo item : items) {
                count += item.getCount();
            }
        }
        return count;
    }


    /** 订单总额 **/
    //BigDecimal total;
    //计算订单总额
    public BigDecimal getTotal() {
        BigDecimal totalNum = BigDecimal.ZERO;
        if (items != null && items.size() > 0) {
            for (OrderItemVo item : items) {
                //计算当前商品的总价格
                BigDecimal itemPrice = item.getPrice().multiply(new BigDecimal(item.getCount().toString()));
                //再计算全部商品的总价格
                totalNum = totalNum.add(itemPrice);
            }
        }
        return totalNum;
    }


    /** 应付价格 **/
    //BigDecimal payPrice;
    public BigDecimal getPayPrice() {
        return getTotal();
    }
}
3.2 数据获取
  • 查询购物项、库存和收货地址都要远程调用远程服务,串行会浪费大量时间,因此我们使用 CompletableFuture 进行异步编排
  • 可能由于延迟,订单提交按钮可能被点击多次,为了防止重复提交的问题,我们在返回订单确认页时,在 redis 中生成一个随机的令牌,过期时间为 30min,提交的订单会携带这个令牌,我们将会在订单提交的处理页面核验令牌。
代码语言:javascript
复制
@RequestMapping("/toTrade")
public String toTrade(Model model) {
    OrderConfirmVo confirmVo = orderService.confirmOrder();
    model.addAttribute("confirmOrder", confirmVo);
    return "confirm";
}

@Override
public OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException {
    MemberResponseVo memberResponseVo = LoginInterceptor.loginUser.get();
    OrderConfirmVo orderConfirmVo = new OrderConfirmVo();

    //TODO :获取当前线程请求头信息(解决Feign异步调用丢失请求头问题)
    RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();

    // 开启第一个异步任务
    CompletableFuture<Void> addressFuture = CompletableFuture.runAsync(() -> {

        // 每个线程都来共享之前的请求数据
        RequestContextHolder.setRequestAttributes(requestAttributes);

        // 1、远程查询所有的收获地址列表
        List<MemberAddressVo> address = memberFeignService.getAddress(memberResponseVo.getId());
        orderConfirmVo.setMemberAddressVos(address);
    }, executor);


    CompletableFuture<Void> cartInfoFuture = CompletableFuture.runAsync(() -> {

        // 每个线程都来功享之前的请求数据
        RequestContextHolder.setRequestAttributes(requestAttributes);

        // 2. 查出所有选中购物项
        List<OrderItemVo> checkedItems = cartFeignService.getCurrentCartItems();
        orderConfirmVo.setItems(checkedItems);
        // fegin在远程调用之前要构造请求,调用很多的拦截器
    }, executor);

    // 3、查询用户积分
    Integer integration = memberResponseVo.getIntegration();
    orderConfirmVo.setIntegeration(integration);

    // 4、价格数据自动计算

    // TODO 5、防重令牌(防止表单重复提交)
    // 为用户设置一个token,三十分钟过期时间(存在redis)
    String token = UUID.randomUUID().toString().replace("-", "");
    redisTemplate.opsForValue().set(OrderConstant.USER_ORDER_TOKEN_PREFIX+memberResponseVo.getId(),token,30, TimeUnit.MINUTES);
    orderConfirmVo.setOrderToken(token);

    CompletableFuture.allOf(addressFuture,cartInfoFuture).get();

    return orderConfirmVo;
}
3.3 Feign 远程调用丢失请求头问题

feign远程调用的请求头中没有含有JSESSIONIDcookie,所以也就不能得到服务端的session数据,cart 认为没登录,获取不了用户信息

代码语言:javascript
复制
Request targetRequest(RequestTemplate template) {
  for (RequestInterceptor interceptor : requestInterceptors) {
    interceptor.apply(template);
  }
  return target.apply(template);
}

但是在feign的调用过程中,会使用容器中的RequestInterceptorRequestTemplate进行处理,因此我们可以通过向容器中导入定制的RequestInterceptor为请求加上cookie

代码语言:javascript
复制
public class GuliFeignConfig {
    @Bean
    public RequestInterceptor requestInterceptor() {
        return new RequestInterceptor() {
            @Override
            public void apply(RequestTemplate template) {
                //1. 使用RequestContextHolder拿到老请求的请求数据
                ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
                if (requestAttributes != null) {
                    HttpServletRequest request = requestAttributes.getRequest();
                    if (request != null) {
                        //2. 将老请求得到cookie信息放到feign请求上
                        String cookie = request.getHeader("Cookie");
                        template.header("Cookie", cookie);
                    }
                }
            }
        };
    }
}
  • RequestContextHolder为 SpingMVC 中共享request数据的上下文,底层由ThreadLocal实现

经过RequestInterceptor处理后的请求如下,已经加上了请求头的Cookie信息

3.4 Feign 异步情况丢失上下文问题

  • 由于RequestContextHolder使用ThreadLocal共享数据,所以在开启异步时获取不到老请求的信息,自然也就无法共享cookie

在这种情况下,我们需要在开启异步的时候将老请求的RequestContextHolder的数据设置进去

3.5 运费收件信息获取

数据封装

代码语言:javascript
复制
@Data
public class FareVo {
    private MemberAddressVo address;
    private BigDecimal fare;
}

在页面将选中地址的 id 传给请求

代码语言:javascript
复制
@RequestMapping("/fare/{addrId}")
public FareVo getFare(@PathVariable("addrId") Long addrId) {
    return wareInfoService.getFare(addrId);
}

@Override
public FareVo getFare(Long addrId) {
    FareVo fareVo = new FareVo();
    R info = memberFeignService.info(addrId);
    if (info.getCode() == 0) {
        MemberAddressVo address = info.getData("memberReceiveAddress", new TypeReference<MemberAddressVo>() {
        });
        fareVo.setAddress(address);
        String phone = address.getPhone();
        //取电话号的最后两位作为邮费
        String fare = phone.substring(phone.length() - 2, phone.length());
        fareVo.setFare(new BigDecimal(fare));
    }
    return fareVo;
}

4、订单提交

4.1 模型抽取

页面提交数据

代码语言:javascript
复制
@Data
public class OrderSubmitVo {

    /** 收获地址的id **/
    private Long addrId;

    /** 支付方式 **/
    private Integer payType;
    //无需提交要购买的商品,去购物车再获取一遍
    //优惠、发票

    /** 防重令牌 **/
    private String orderToken;

    /** 应付价格 **/
    private BigDecimal payPrice;

    /** 订单备注 **/
    private String remarks;

    //用户相关的信息,直接去session中取出即可
}

成功后转发至支付页面携带数据

代码语言:javascript
复制
@Data
public class SubmitOrderResponseVo {

    private OrderEntity order;

    /** 错误状态码 **/
    private Integer code;
}
4.2 提交订单
  • 提交订单成功,则携带返回数据转发至支付页面
  • 提交订单失败,则携带错误信息重定向至确认页
代码语言:javascript
复制
@RequestMapping("/submitOrder")
public String submitOrder(OrderSubmitVo submitVo, Model model, RedirectAttributes attributes) {
    try{
        SubmitOrderResponseVo responseVo=orderService.submitOrder(submitVo);
        Integer code = responseVo.getCode();
        if (code==0){
            model.addAttribute("order", responseVo.getOrder());
            return "pay";
        }else {
            String msg = "下单失败;";
            switch (code) {
                case 1:
                    msg += "防重令牌校验失败";
                    break;
                case 2:
                    msg += "商品价格发生变化";
                    break;
            }
            attributes.addFlashAttribute("msg", msg);
            return "redirect:http://order.gulimall.com/toTrade";
        }
    }catch (Exception e){
        if (e instanceof NoStockException){
            String msg = "下单失败,商品无库存";
            attributes.addFlashAttribute("msg", msg);
        }
        return "redirect:http://order.gulimall.com/toTrade";
    }
}
代码语言:javascript
复制
    @Transactional
    @Override
    public SubmitOrderResponseVo submitOrder(OrderSubmitVo submitVo) {
        SubmitOrderResponseVo responseVo = new SubmitOrderResponseVo();
        responseVo.setCode(0);
        //1. 验证防重令牌
        MemberResponseVo memberResponseVo = LoginInterceptor.loginUser.get();
        String script= "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Long execute = redisTemplate.execute(new DefaultRedisScript<>(script,Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseVo.getId()), submitVo.getOrderToken());
        if (execute == 0L) {
            //1.1 防重令牌验证失败
            responseVo.setCode(1);
            return responseVo;
        }else {
            //2. 创建订单、订单项
            OrderCreateTo order =createOrderTo(memberResponseVo,submitVo);

            //3. 验价
            BigDecimal payAmount = order.getOrder().getPayAmount();
            BigDecimal payPrice = submitVo.getPayPrice();
            if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) {
                //4. 保存订单
                saveOrder(order);
                //5. 锁定库存
                List<OrderItemVo> orderItemVos = order.getOrderItems().stream().map((item) -> {
                    OrderItemVo orderItemVo = new OrderItemVo();
                    orderItemVo.setSkuId(item.getSkuId());
                    orderItemVo.setCount(item.getSkuQuantity());
                    return orderItemVo;
                }).collect(Collectors.toList());
                R r = wareFeignService.orderLockStock(orderItemVos);
                //5.1 锁定库存成功
                if (r.getCode()==0){
//                    int i = 10 / 0;
                    responseVo.setOrder(order.getOrder());
                    responseVo.setCode(0);
                    return responseVo;
                }else {
                    //5.1 锁定库存失败
                    String msg = (String) r.get("msg");
                    throw new NoStockException(msg);
                }

            }else {
                //验价失败
                responseVo.setCode(2);
                return responseVo;
            }
        }
    }
1.验证防重令牌

为防止在获取令牌、对比值和删除令牌之间发生错误导入令牌校验出错,我们必须使用脚本保证原子性操作

代码语言:javascript
复制
MemberResponseVo memberResponseVo = LoginInterceptor.loginUser.get();
String script= "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Long execute = redisTemplate.execute(new DefaultRedisScript<>(script,Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseVo.getId()), submitVo.getOrderToken());
if (execute == 0L) {
    //1.1 防重令牌验证失败
    responseVo.setCode(1);
    return responseVo;
2.创建订单、订单项

抽取模型

代码语言:javascript
复制
@Data
public class OrderCreateTo {

    private OrderEntity order;

    private List<OrderItemEntity> orderItems;

    /** 订单计算的应付价格 **/
    private BigDecimal payPrice;

    /** 运费 **/
    private BigDecimal fare;

}

创建订单、订单项

代码语言:javascript
复制
//2. 创建订单、订单项
OrderCreateTo order =createOrderTo(memberResponseVo,submitVo);

private OrderCreateTo createOrderTo(MemberResponseVo memberResponseVo, OrderSubmitVo submitVo) {
    //2.1 用IdWorker生成订单号
    String orderSn = IdWorker.getTimeId();
    //2.2 构建订单
    OrderEntity entity = buildOrder(memberResponseVo, submitVo,orderSn);
    //2.3 构建订单项
    List<OrderItemEntity> orderItemEntities = buildOrderItems(orderSn);
    //2.4 计算价格
    compute(entity, orderItemEntities);
    OrderCreateTo createTo = new OrderCreateTo();
    createTo.setOrder(entity);
    createTo.setOrderItems(orderItemEntities);
    return createTo;
}

构建订单

代码语言:javascript
复制
private OrderEntity buildOrder(MemberResponseVo memberResponseVo, OrderSubmitVo submitVo, String orderSn) {

        OrderEntity orderEntity =new OrderEntity();

        orderEntity.setOrderSn(orderSn);

        //1) 设置用户信息
        orderEntity.setMemberId(memberResponseVo.getId());
        orderEntity.setMemberUsername(memberResponseVo.getUsername());

        //2) 获取邮费和收件人信息并设置
        FareVo fareVo = wareFeignService.getFare(submitVo.getAddrId());
        BigDecimal fare = fareVo.getFare();
        orderEntity.setFreightAmount(fare);
        MemberAddressVo address = fareVo.getAddress();
        orderEntity.setReceiverName(address.getName());
        orderEntity.setReceiverPhone(address.getPhone());
        orderEntity.setReceiverPostCode(address.getPostCode());
        orderEntity.setReceiverProvince(address.getProvince());
        orderEntity.setReceiverCity(address.getCity());
        orderEntity.setReceiverRegion(address.getRegion());
        orderEntity.setReceiverDetailAddress(address.getDetailAddress());

        //3) 设置订单相关的状态信息
        orderEntity.setStatus(OrderStatusEnum.CREATE_NEW.getCode());
        orderEntity.setConfirmStatus(0);
        orderEntity.setAutoConfirmDay(7);

        return orderEntity;
    }

构建订单项

代码语言:javascript
复制
private OrderItemEntity buildOrderItem(OrderItemVo item) {
        OrderItemEntity orderItemEntity = new OrderItemEntity();
        Long skuId = item.getSkuId();
        //1) 设置sku相关属性
        orderItemEntity.setSkuId(skuId);
        orderItemEntity.setSkuName(item.getTitle());
        orderItemEntity.setSkuAttrsVals(StringUtils.collectionToDelimitedString(item.getSkuAttrValues(), ";"));
        orderItemEntity.setSkuPic(item.getImage());
        orderItemEntity.setSkuPrice(item.getPrice());
        orderItemEntity.setSkuQuantity(item.getCount());
        //2) 通过skuId查询spu相关属性并设置
        R r = productFeignService.getSpuBySkuId(skuId);
        if (r.getCode() == 0) {
            SpuInfoTo spuInfo = r.getData(new TypeReference<SpuInfoTo>() {
            });
            orderItemEntity.setSpuId(spuInfo.getId());
            orderItemEntity.setSpuName(spuInfo.getSpuName());
            orderItemEntity.setSpuBrand(spuInfo.getBrandName());
            orderItemEntity.setCategoryId(spuInfo.getCatalogId());
        }
        //3) 商品的优惠信息(不做)

        //4) 商品的积分成长,为价格x数量
        orderItemEntity.setGiftGrowth(item.getPrice().multiply(new BigDecimal(item.getCount())).intValue());
        orderItemEntity.setGiftIntegration(item.getPrice().multiply(new BigDecimal(item.getCount())).intValue());

        //5) 订单项订单价格信息
        orderItemEntity.setPromotionAmount(BigDecimal.ZERO);
        orderItemEntity.setCouponAmount(BigDecimal.ZERO);
        orderItemEntity.setIntegrationAmount(BigDecimal.ZERO);

        //6) 实际价格
        BigDecimal origin = orderItemEntity.getSkuPrice().multiply(new BigDecimal(orderItemEntity.getSkuQuantity()));
        BigDecimal realPrice = origin.subtract(orderItemEntity.getPromotionAmount())
                .subtract(orderItemEntity.getCouponAmount())
                .subtract(orderItemEntity.getIntegrationAmount());
        orderItemEntity.setRealAmount(realPrice);

        return orderItemEntity;
    }

计算订单价格

代码语言:javascript
复制
private void compute(OrderEntity entity, List<OrderItemEntity> orderItemEntities) {
        //总价
        BigDecimal total = BigDecimal.ZERO;
        //优惠价格
        BigDecimal promotion=new BigDecimal("0.0");
        BigDecimal integration=new BigDecimal("0.0");
        BigDecimal coupon=new BigDecimal("0.0");
        //积分
        Integer integrationTotal = 0;
        Integer growthTotal = 0;

        for (OrderItemEntity orderItemEntity : orderItemEntities) {
            total=total.add(orderItemEntity.getRealAmount());
            promotion=promotion.add(orderItemEntity.getPromotionAmount());
            integration=integration.add(orderItemEntity.getIntegrationAmount());
            coupon=coupon.add(orderItemEntity.getCouponAmount());
            integrationTotal += orderItemEntity.getGiftIntegration();
            growthTotal += orderItemEntity.getGiftGrowth();
        }

        entity.setTotalAmount(total);
        entity.setPromotionAmount(promotion);
        entity.setIntegrationAmount(integration);
        entity.setCouponAmount(coupon);
        entity.setIntegration(integrationTotal);
        entity.setGrowth(growthTotal);

        //付款价格=商品价格+运费
        entity.setPayAmount(entity.getFreightAmount().add(total));

        //设置删除状态(0-未删除,1-已删除)
        entity.setDeleteStatus(0);
    }
3.验价

将页面提交的价格和后台计算的价格进行对比,若不同则提示用户商品价格发生变化

代码语言:javascript
复制
BigDecimal payAmount = order.getOrder().getPayAmount();
BigDecimal payPrice = submitVo.getPayPrice();
if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) {
			/****************/
}else {
    //验价失败
    responseVo.setCode(2);
    return responseVo;
}
4.保存订单
代码语言:javascript
复制
private void saveOrder(OrderCreateTo orderCreateTo) {
    OrderEntity order = orderCreateTo.getOrder();
    order.setCreateTime(new Date());
    order.setModifyTime(new Date());
    this.save(order);
    orderItemService.saveBatch(orderCreateTo.getOrderItems());
}
5.锁定库存
代码语言:javascript
复制
List<OrderItemVo> orderItemVos = order.getOrderItems().stream().map((item) -> {
                   OrderItemVo orderItemVo = new OrderItemVo();
                   orderItemVo.setSkuId(item.getSkuId());
                   orderItemVo.setCount(item.getSkuQuantity());
                   return orderItemVo;
               }).collect(Collectors.toList());
               R r = wareFeignService.orderLockStock(orderItemVos);
               //5.1 锁定库存成功
               if (r.getCode()==0){
                   responseVo.setOrder(order.getOrder());
                   responseVo.setCode(0);
                   return responseVo;
               }else {
                   //5.2 锁定库存失败
                   String msg = (String) r.get("msg");
                   throw new NoStockException(msg);
               }
  • 找出所有库存大于商品数的仓库
  • 遍历所有满足条件的仓库,逐个尝试锁库存,若锁库存成功则退出遍历
代码语言:javascript
复制
@RequestMapping("/lock/order")
public R orderLockStock(@RequestBody List<OrderItemVo> itemVos) {
    try {
        Boolean lock = wareSkuService.orderLockStock(itemVos);
        return R.ok();
    } catch (NoStockException e) {
        return R.error(BizCodeEnum.NO_STOCK_EXCEPTION.getCode(), BizCodeEnum.NO_STOCK_EXCEPTION.getMsg());
    }
}

@Transactional
@Override
public Boolean orderLockStock(List<OrderItemVo> itemVos) {
    List<SkuLockVo> lockVos = itemVos.stream().map((item) -> {
        SkuLockVo skuLockVo = new SkuLockVo();
        skuLockVo.setSkuId(item.getSkuId());
        skuLockVo.setNum(item.getCount());
        //找出所有库存大于商品数的仓库
        List<Long> wareIds = baseMapper.listWareIdsHasStock(item.getSkuId(), item.getCount());
        skuLockVo.setWareIds(wareIds);
        return skuLockVo;
    }).collect(Collectors.toList());

    for (SkuLockVo lockVo : lockVos) {
        boolean lock = true;
        Long skuId = lockVo.getSkuId();
        List<Long> wareIds = lockVo.getWareIds();
        //如果没有满足条件的仓库,抛出异常
        if (wareIds == null || wareIds.size() == 0) {
            throw new NoStockException(skuId);
        }else {
            for (Long wareId : wareIds) {
                Long count=baseMapper.lockWareSku(skuId, lockVo.getNum(), wareId);
                if (count==0){
                    lock=false;
                }else {
                    lock = true;
                    break;
                }
            }
        }
        if (!lock) throw new NoStockException(skuId);
    }
    return true;
}

这里通过异常机制控制事务回滚,如果在锁定库存失败则抛出NoStockExceptions,订单服务和库存服务都会回滚。

六、分布式事务

1、分布式事务

2、整合 Spring cloud alibaba Seata

在 common 添加依赖

seata-all 使用 0.9【所以启动 事务协调者 0.9 版本的】

代码语言:javascript
复制
<!--seata 分布式事务-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>0.9.0</version>
</dependency>

2、使用一个 @GlobalTransactional 注解在业务方法上【TM 事务管理器上】 资源管理器上只需要标注@Transactional 就可以了【各远程方法】

代码语言:javascript
复制
@GlobalTransactional
@Transactional
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo submitVo) {
}

3、各数据库上创建表

代码语言:javascript
复制
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

安装 事务协调器:seata-server-1.3.0.zip 从 https://github.com/seata/seata/releases,下载服务器软件包,将其解压缩。 5、修改事务协调器的配置 registry.conf

代码语言:javascript
复制
1)把自己注册到nacos,并且设置nacos的url
registry {
	# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
 		type = "nacos"


2)配置放在file【也可以在nacos上】
config {
 		# file、nacos 、apollo、zk、consul、etcd3
 		type = "file"

七、使用消息队列实现最终一致性

1、延迟队列的定义与实现

  • 定义: 延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息” 是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个进行消费。
  • 实现: RabbitMQ 可以通过设置队列的 TTL 和 死信路由实现延迟队列
代码语言:txt
复制
- TTL:

RabbitMQ 可以正确 Queue 设置 x-expires 或者 针对 Message 设置 x-message-ttl, 来控制消息的生存时间。如果超时(两者同时设置以最先到期的时间为准),则消息变为 dead letter(死信)

代码语言:txt
复制
- 死信路由 DLX

RabbitMQ 的 Queue 可以配置 x-dead-letter-exchange 和 x-dead-letter-router-key(可选)两个参数,如果队列内出现了 dead letter, 则按照这两个参数重新路由转发到指定的队列。

代码语言:txt
复制
- x-dead-letter-exchange:出现 dead letter 之后将 dead letter 重新发送到指定 exchange
- x-dead-letter-routing-key: 出现 dead letter 之后将 dead letter 重新按照指定的 routing-key 发送

针对订单创建以上消息队列,创建订单时消息会被发送至队列 order.delay.queue , 经过 TTL 的时间后消息会变成死信以 order.release.order 的路由键经交换机转发至队列 order.release.order.queue ,在通过监听该队列的消息来实现过期订单的处理。

2、延迟队列使用场景

为什么不能用定时任务完成?

如果恰好在一次扫描后完成业务逻辑,那么就会等待两个扫描周期才能扫到过期的订单,不能保证时效性

3、 定时关单与库存解锁主体逻辑

  • 订单超时未支付触发订单过期状态修改与库存解锁

创建订单时消息会被发送至队列 order.delay.queue , 经过 TTL 的时间后消息会变成死信以 order.release.order 的路由键交换机转发至队列 order.release.order.queue, 在通过监听该队列的消息来实现过期订单的处理

  • 如果该订单已支付,则无需处理
  • 否则说明该订单已过期,修改该订单的状态并通过并通过路由键 order.release.other 发送消息至队列 stock.release.stock.queue 进行库存解锁
  • 库存锁定后延迟检查是否需要解锁库存

在库存锁定后通过 路由键 stock.locked 发送至 延迟队列stock.delay.queue , 延迟时间到,死信通过 路由键stock.release 转发至 stock.release.stock.queue, 通过监听该队列进行判断当前订单状态,来确认库存是否需要解锁

  • 由于 关闭订单库存解锁 都有可能被执行多次,因此要保证业务逻辑的幂等性,在执行业务是重新查询当前的状态进行判断
  • 订单关闭和库存解锁都会进行库存解锁的操作,来确保业务异常或者订单过期时库存会被可靠解锁

4、创建业务交换机和队列

订单模块

代码语言:javascript
复制
@Configuration
public class MyRabbitmqConfig {
    @Bean
    public Exchange orderEventExchange() {
        /**
         *   String name,
         *   boolean durable,
         *   boolean autoDelete,
         *   Map<String, Object> arguments
         */
        return new TopicExchange("order-event-exchange", true, false);
    }

    /**
     * 延迟队列
     * @return
     */
    @Bean
    public Queue orderDelayQueue() {
       /**
            Queue(String name,  队列名字
            boolean durable,  是否持久化
            boolean exclusive,  是否排他
            boolean autoDelete, 是否自动删除
            Map<String, Object> arguments) 属性
         */
        HashMap<String, Object> arguments = new HashMap<>();
        //死信交换机
        arguments.put("x-dead-letter-exchange", "order-event-exchange");
        //死信路由键
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
        return new Queue("order.delay.queue",true,false,false,arguments);
    }

    /**
     * 普通队列
     *
     * @return
     */
    @Bean
    public Queue orderReleaseQueue() {

        Queue queue = new Queue("order.release.order.queue", true, false, false);

        return queue;
    }

    /**
     * 创建订单的binding
     * @return
     */
    @Bean
    public Binding orderCreateBinding() {
        /**
         * String destination, 目的地(队列名或者交换机名字)
         * DestinationType destinationType, 目的地类型(Queue、Exhcange)
         * String exchange,
         * String routingKey,
         * Map<String, Object> arguments
         * */
        return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
    }

    @Bean
    public Binding orderReleaseBinding() {
        return new Binding("order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",
                null);
    }

    @Bean
    public Binding orderReleaseOrderBinding() {
        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.other.#",
                null);
    }
}

库存模块

代码语言:javascript
复制
@Configuration
public class MyRabbitmqConfig {

    @Bean
    public Exchange stockEventExchange() {
        return new TopicExchange("stock-event-exchange", true, false);
    }

    /**
     * 延迟队列
     * @return
     */
    @Bean
    public Queue stockDelayQueue() {
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "stock-event-exchange");
        arguments.put("x-dead-letter-routing-key", "stock.release");
        // 消息过期时间 2分钟
        arguments.put("x-message-ttl", 120000);
        return new Queue("stock.delay.queue", true, false, false, arguments);
    }

    /**
     * 普通队列,用于解锁库存
     * @return
     */
    @Bean
    public Queue stockReleaseStockQueue() {
        return new Queue("stock.release.stock.queue", true, false, false, null);
    }


    /**
     * 交换机和延迟队列绑定
     * @return
     */
    @Bean
    public Binding stockLockedBinding() {
        return new Binding("stock.delay.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.locked",
                null);
    }

    /**
     * 交换机和普通队列绑定
     * @return
     */
    @Bean
    public Binding stockReleaseBinding() {
        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.release.#",
                null);
    }
}

5、库存自动解锁

5.1 库存锁定

在库存锁定是添加一下逻辑

  • 由于可能订单回滚的情况,所以为了能够得到库存锁定的信息,在锁定需要记录库存工作单,其中包括订单信息和锁定库存时的信息(仓库 id, 商品 id, 锁了几件….)
  • 在锁定成功后,向延迟对队列发消息,带上库存锁定的相关信息
代码语言:javascript
复制
@Transactional
@Override
public Boolean orderLockStock(WareSkuLockVo wareSkuLockVo) {
    //因为可能出现订单回滚后,库存锁定不回滚的情况,但订单已经回滚,得不到库存锁定信息,因此要有库存工作单
    WareOrderTaskEntity taskEntity = new WareOrderTaskEntity();
    taskEntity.setOrderSn(wareSkuLockVo.getOrderSn());
    taskEntity.setCreateTime(new Date());
    wareOrderTaskService.save(taskEntity);

    List<OrderItemVo> itemVos = wareSkuLockVo.getLocks();
    List<SkuLockVo> lockVos = itemVos.stream().map((item) -> {
        SkuLockVo skuLockVo = new SkuLockVo();
        skuLockVo.setSkuId(item.getSkuId());
        skuLockVo.setNum(item.getCount());
        List<Long> wareIds = baseMapper.listWareIdsHasStock(item.getSkuId(), item.getCount());
        skuLockVo.setWareIds(wareIds);
        return skuLockVo;
    }).collect(Collectors.toList());

    for (SkuLockVo lockVo : lockVos) {
        boolean lock = true;
        Long skuId = lockVo.getSkuId();
        List<Long> wareIds = lockVo.getWareIds();
        if (wareIds == null || wareIds.size() == 0) {
            throw new NoStockException(skuId);
        }else {
            for (Long wareId : wareIds) {
                Long count=baseMapper.lockWareSku(skuId, lockVo.getNum(), wareId);
                if (count==0){
                    lock=false;
                }else {
                    //锁定成功,保存工作单详情
                    WareOrderTaskDetailEntity detailEntity = WareOrderTaskDetailEntity.builder()
                            .skuId(skuId)
                            .skuName("")
                            .skuNum(lockVo.getNum())
                            .taskId(taskEntity.getId())
                            .wareId(wareId)
                            .lockStatus(1).build();
                    wareOrderTaskDetailService.save(detailEntity);
                    //发送库存锁定消息至延迟队列
                    StockLockedTo lockedTo = new StockLockedTo();
                    lockedTo.setId(taskEntity.getId());
                    StockDetailTo detailTo = new StockDetailTo();
                    BeanUtils.copyProperties(detailEntity,detailTo);
                    lockedTo.setDetailTo(detailTo);
                    rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",lockedTo);

                    lock = true;
                    break;
                }
            }
        }
        if (!lock) throw new NoStockException(skuId);
    }
    return true;
}
5.2 监听队列
  • 延迟队列会将过期的消息路由至"stock.release.stock.queue",通过监听该队列实现库存的解锁
  • 为保证消息的可靠到达,我们使用手动确认消息的模式,在解锁成功后确认消息,若出现异常则重新归队
代码语言:javascript
复制
@Component
@RabbitListener(queues = {"stock.release.stock.queue"})
public class StockReleaseListener {

    @Autowired
    private WareSkuService wareSkuService;

    @RabbitHandler
    public void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {
        log.info("************************收到库存解锁的消息********************************");
        try {
            wareSkuService.unlock(stockLockedTo);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }
}
5.3 库存解锁
  • 如果工作单详情不为空,说明该库存锁定成功
    • 查询最新的订单状态,如果订单不存在,说明订单提交出现异常回滚,或者订单处于已取消的状态,我们都对已锁定的库存进行解锁
  • 如果工作单详情为空,说明库存未锁定,自然无需解锁
  • 为保证幂等性,我们分别对订单的状态和工作单的状态都进行了判断,只有当订单过期且工作单显示当前库存处于锁定的状态时,才进行库存的解锁
代码语言:javascript
复制
@Override
   public void unlock(StockLockedTo stockLockedTo) {
       StockDetailTo detailTo = stockLockedTo.getDetailTo();
       WareOrderTaskDetailEntity detailEntity = wareOrderTaskDetailService.getById(detailTo.getId());
       //1.如果工作单详情不为空,说明该库存锁定成功
       if (detailEntity != null) {
           WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(stockLockedTo.getId());
           R r = orderFeignService.infoByOrderSn(taskEntity.getOrderSn());
           if (r.getCode() == 0) {
               OrderTo order = r.getData("order", new TypeReference<OrderTo>() {
               });
               //没有这个订单||订单状态已经取消 解锁库存
               if (order == null||order.getStatus()== OrderStatusEnum.CANCLED.getCode()) {
                   //为保证幂等性,只有当工作单详情处于被锁定的情况下才进行解锁
                   if (detailEntity.getLockStatus()== WareTaskStatusEnum.Locked.getCode()){
                       unlockStock(detailTo.getSkuId(), detailTo.getSkuNum(), detailTo.getWareId(), detailEntity.getId());
                   }
               }
           }else {
               throw new RuntimeException("远程调用订单服务失败");
           }
       }else {
           //无需解锁
       }
   }

6、定时关单

6.1 提交订单
代码语言:javascript
复制
@Transactional
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo submitVo) {

    //提交订单的业务处理。。。

    //发送消息到订单延迟队列,判断过期订单
    rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder());


}
6.2 监听队列

创建订单的消息会进入延迟队列,最终发送至队列order.release.order.queue,因此我们对该队列进行监听,进行订单的关闭

代码语言:javascript
复制
@Component
@RabbitListener(queues = {"order.release.order.queue"})
public class OrderCloseListener {

    @Autowired
    private OrderService orderService;

    @RabbitHandler
    public void listener(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
        System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn());
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            orderService.closeOrder(orderEntity);
            channel.basicAck(deliveryTag,false);
        } catch (Exception e){
            channel.basicReject(deliveryTag,true);
        }

    }
}
6.3 关闭订单
  • 由于要保证幂等性,因此要查询最新的订单状态判断是否需要关单
  • 关闭订单后也需要解锁库存,因此发送消息进行库存、会员服务对应的解锁
代码语言:javascript
复制
@Override
public void closeOrder(OrderEntity orderEntity) {
    //因为消息发送过来的订单已经是很久前的了,中间可能被改动,因此要查询最新的订单
    OrderEntity newOrderEntity = this.getById(orderEntity.getId());
    //如果订单还处于新创建的状态,说明超时未支付,进行关单
    if (newOrderEntity.getStatus() == OrderStatusEnum.CREATE_NEW.getCode()) {
        OrderEntity updateOrder = new OrderEntity();
        updateOrder.setId(newOrderEntity.getId());
        updateOrder.setStatus(OrderStatusEnum.CANCLED.getCode());
        this.updateById(updateOrder);

        //关单后发送消息通知其他服务进行关单相关的操作,如解锁库存
        OrderTo orderTo = new OrderTo();
        BeanUtils.copyProperties(newOrderEntity,orderTo);
        rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other",orderTo);
    }
}
6.4 解锁库存
代码语言:javascript
复制
@Slf4j
@Component
@RabbitListener(queues = {"stock.release.stock.queue"})
public class StockReleaseListener {

    @Autowired
    private WareSkuService wareSkuService;

    @RabbitHandler
    public void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {
        log.info("************************收到库存解锁的消息********************************");
        try {
            wareSkuService.unlock(stockLockedTo);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }

    @RabbitHandler
    public void handleStockLockedRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {
        log.info("************************从订单模块收到库存解锁的消息********************************");
        try {
            wareSkuService.unlock(orderTo);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }
}
代码语言:javascript
复制
@Override
public void unlock(OrderTo orderTo) {
    //为防止重复解锁,需要重新查询工作单
    String orderSn = orderTo.getOrderSn();
    WareOrderTaskEntity taskEntity = wareOrderTaskService.getBaseMapper().selectOne((new QueryWrapper<WareOrderTaskEntity>().eq("order_sn", orderSn)));
    //查询出当前订单相关的且处于锁定状态的工作单详情
    List<WareOrderTaskDetailEntity> lockDetails = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", taskEntity.getId()).eq("lock_status", WareTaskStatusEnum.Locked.getCode()));
    for (WareOrderTaskDetailEntity lockDetail : lockDetails) {
        unlockStock(lockDetail.getSkuId(),lockDetail.getSkuNum(),lockDetail.getWareId(),lockDetail.getId());
    }
}

八、支付

1、支付宝加密原理

  • 支付宝加密采用 RSA 非对称加密,分别在客户端和支付端有两个公钥和私钥
  • 在发送订单数据时,直接使用明文但会使用 商户私钥 加一个对应的签名,支付宝端会使用 商户公钥 对签名进行验签,只有数据明文和签名对应的时候才能说明传输正确。
  • 支付宝成功后,支付宝发送支付成功数据之外,还会使用 支付宝私钥 加一个对应的签名,商户端收到支付成功数据之后会使用 支付宝公钥 验签,成功后才能确认。

2、配置支付宝沙箱环境

3、环境搭建

导入支付宝 sdk

代码语言:javascript
复制
<dependency>
    <groupId>com.alipay.sdk</groupId>
    <artifactId>alipay-sdk-java</artifactId>
    <version>4.9.28.ALL</version>
</dependency>

抽取支付工具类并进行配置

成功调用该接口后,返回的数据就是支付页面的 html,因此后续会使用@ResponseBody

代码语言:javascript
复制
@ConfigurationProperties(prefix = "alipay")
@Component
@Data
public class AlipayTemplate {

    //在支付宝创建的应用的id
    private   String app_id = "2016102600763190";

    // 商户私钥,您的PKCS8格式RSA2私钥
    private String merchant_private_key = "MjXN6Hnj8k2GAriRFt0BS9gjihbl9Rt38VMNbBi3Vt3Cy6TOwANLLJ/DfnYjRqwCG81fkyKlDqdsamdfCiTysCa0gQKBgQDYQ45LSRxAOTyM5NliBmtev0lbpDa7FqXL0UFgBel5VgA1Ysp0+6ex2n73NBHbaVPEXgNMnTdzU3WF9uHF4Gj0mfUzbVMbj/YkkHDOZHBggAjEHCB87IKowq/uAH/++Qes2GipHHCTJlG6yejdxhOsMZXdCRnidNx5yv9+2JI37QKBgQCw0xn7ZeRBIOXxW7xFJw1WecUV7yaL9OWqKRHat3lFtf1Qo/87cLl+KeObvQjjXuUe07UkrS05h6ijWyCFlBo2V7Cdb3qjq4atUwScKfTJONnrF+fwTX0L5QgyQeDX5a4yYp4pLmt6HKh34sI5S/RSWxDm7kpj+/MjCZgp6Xc51g==";

    // 支付宝公钥,查看地址:https://openhome.alipay.com/platform/keyManage.htm 对应APPID下的支付宝公钥。
    private String alipay_public_key = "MIIBIjA74UKxt2F8VMIRKrRAAAuIMuawIsl4Ye+G12LK8P1ZLYy7ZJpgZ+Wv5nOs3DdoEazgCERj/ON8lM1KBHZOAV+TkrIcyi7cD1gfv4a1usikrUqm8/qhFvoiUfyHJFv1ymT7C4BI6aHzQ2zcUlSQPGoPl4C11tgnSkm3DlH2JZKgaIMcCOnNH+qctjNh9yIV9zat2qUiXbxmrCTtxAmiI3I+eVsUNwvwIDAQAB";

    // 服务器[异步通知]页面路径  需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问
    // 支付宝会悄悄的给我们发送一个请求,告诉我们支付成功的信息
    private  String notify_url="http://**.natappfree.cc/payed/notify";

    // 页面跳转同步通知页面路径 需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问
    //同步通知,支付成功,一般跳转到成功页
    private  String return_url="http://order.gulimall.com/memberOrder.html";

    // 签名方式
    private  String sign_type = "RSA2";

    // 字符编码格式
    private  String charset = "utf-8";

    // 支付宝网关; https://openapi.alipaydev.com/gateway.do
    private  String gatewayUrl = "https://openapi.alipaydev.com/gateway.do";

    public  String pay(PayVo vo) throws AlipayApiException {

        //AlipayClient alipayClient = new DefaultAlipayClient(AlipayTemplate.gatewayUrl, AlipayTemplate.app_id, AlipayTemplate.merchant_private_key, "json", AlipayTemplate.charset, AlipayTemplate.alipay_public_key, AlipayTemplate.sign_type);
        //1、根据支付宝的配置生成一个支付客户端
        AlipayClient alipayClient = new DefaultAlipayClient(gatewayUrl,
                app_id, merchant_private_key, "json",
                charset, alipay_public_key, sign_type);

        //2、创建一个支付请求 //设置请求参数
        AlipayTradePagePayRequest alipayRequest = new AlipayTradePagePayRequest();
        alipayRequest.setReturnUrl(return_url);
        alipayRequest.setNotifyUrl(notify_url);

        //商户订单号,商户网站订单系统中唯一订单号,必填
        String out_trade_no = vo.getOut_trade_no();
        //付款金额,必填
        String total_amount = vo.getTotal_amount();
        //订单名称,必填
        String subject = vo.getSubject();
        //商品描述,可空
        String body = vo.getBody();

        alipayRequest.setBizContent("{\"out_trade_no\":\""+ out_trade_no +"\","
                + "\"total_amount\":\""+ total_amount +"\","
                + "\"subject\":\""+ subject +"\","
                + "\"body\":\""+ body +"\","
                + "\"product_code\":\"FAST_INSTANT_TRADE_PAY\"}");

        String result = alipayClient.pageExecute(alipayRequest).getBody();

        //会收到支付宝的响应,响应的是一个页面,只要浏览器显示这个页面,就会自动来到支付宝的收银台页面
        System.out.println("支付宝的响应:"+result);

        return result;

}

4、订单支付与同步通知

点击支付跳转到支付接口

代码语言:javascript
复制
@ResponseBody
@GetMapping(value = "aliPayOrder", produces = "text/html")
public String aliPayOrder(@RequestParam("orderSn") String orderSn) throws AlipayApiException {
    System.out.println("接收到订单信息orderSn:" + orderSn);
    // 获取当前订单并设置支付订单相关信息
    PayVo payVo = orderService.getOrderPay(orderSn);
    String pay = alipayTemplate.pay(payVo);
    return pay;
}

@Override
public PayVo getOrderPay(String orderSn) {
    OrderEntity orderEntity = this.getOne(new QueryWrapper<OrderEntity>().eq("order_sn", orderSn));
    PayVo payVo = new PayVo();
    //交易号
    payVo.setOut_trade_no(orderSn);
    //支付金额设置为两位小数,否则会报错
    BigDecimal payAmount = orderEntity.getPayAmount().setScale(2, BigDecimal.ROUND_UP);
    payVo.setTotal_amount(payAmount.toString());

    List<OrderItemEntity> orderItemEntities = orderItemService.list(new QueryWrapper<OrderItemEntity>().eq("order_sn", orderSn));
    OrderItemEntity orderItemEntity = orderItemEntities.get(0);
    //订单名称
    payVo.setSubject(orderItemEntity.getSkuName());
    //商品描述
    payVo.setBody(orderItemEntity.getSkuAttrsVals());
    return payVo;
}

设置成功回调地址为订单详情页

代码语言:javascript
复制
	// 页面跳转同步通知页面路径 需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问
    //同步通知,支付成功,一般跳转到成功页
    private  String return_url="http://order.gulimall.com/memberOrder.html";

/**
     * 获取当前用户的所有订单
     * @return
     */
@GetMapping("/memberOrder.html")
public String memberOrderPage(@RequestParam(value = "pageNum",required = false,defaultValue = "0") Integer pageNum, Model model){
    Map<String, Object> params = new HashMap<>();
    params.put("page",pageNum.toString());
    // 分页查询当前用户的所有订单及对应的订单项
    R orderInfo = orderFeignService.listWithItem(params);
    model.addAttribute("orders",orderInfo);
    return "orderList";
}

注意:需要给 gulimall-member 项目基本配置

  1. spring-session 依赖
  2. spring-session 配置
  3. LoginInterceptor 拦截器

5、异步通知

  • 订单支付成功后支付宝会回调商户接口,这个时候需要修改订单状态
  • 由于同步跳转可能由于网络问题失败,所以使用异步通知
  • 支付宝使用的是最大努力通知方案,保障数据一致性,隔一段时间会通知商户支付成功,直到返回success
5.1 内网穿透设置异步通知地址
  • 将外网映射到本地的order.gulimall.com:80
  • 由于回调的请求头不是order.gulimall.com,因此 nginx 转发到网关后找不到对应的服务,所以需要对 nginx 进行设置

/payed/notify异步通知转发至订单服务

设置异步通知的地址

代码语言:javascript
复制
// 服务器[异步通知]页面路径  需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问
// 支付宝会悄悄的给我们发送一个请求,告诉我们支付成功的信息
private  String notify_url="http://****.natappfree.cc/payed/notify";
5.2 支付包支付异步通知

异步通知的参数

代码语言:javascript
复制
@PostMapping("/payed/notify")
public String handlerAlipay(HttpServletRequest request) {
    System.out.println("收到支付宝异步通知******************");
    Map<String, String[]> parameterMap = request.getParameterMap();
    for (String key : parameterMap.keySet()) {
        String value = request.getParameter(key);
        System.out.println("key:"+key+"===========>value:"+value);
    }
    return "success";
}
代码语言:javascript
复制
收到支付宝异步通知******************
key:gmt_create===========>value:2020-10-18 09:13:26
key:charset===========>value:utf-8
key:gmt_payment===========>value:2020-10-18 09:13:34
key:notify_time===========>value:2020-10-18 09:13:35
key:subject===========>value:华为
key:sign===========>value:aqhKWzgzTLE84Scy5d8i3f+t9f7t7IE5tK/s5iHf3SdFQXPnTt6MEVtbr15ZXmITEo015nCbSXaUFJvLiAhWpvkNEd6ysraa+2dMgotuHPIHnIUFwvdk+U4Ez+2A4DBTJgmwtc5Ay8mYLpHLNR9ASuEmkxxK2F3Ov6MO0d+1DOjw9c/CCRRBWR8NHSJePAy/UxMzULLtpMELQ1KUVHLgZC5yym5TYSuRmltYpLHOuoJhJw8vGkh2+4FngvjtS7SBhEhR1GvJCYm1iXRFTNgP9Fmflw+EjxrDafCIA+r69ZqoJJ2Sk1hb4cBsXgNrFXR2Uj4+rQ1Ec74bIjT98f1KpA==
key:buyer_id===========>value:2088622954825223
key:body===========>value:上市年份:2020;内存:64G
key:invoice_amount===========>value:6300.00
key:version===========>value:1.0
key:notify_id===========>value:2020101800222091334025220507700182
key:fund_bill_list===========>value:[{"amount":"6300.00","fundChannel":"ALIPAYACCOUNT"}]
key:notify_type===========>value:trade_status_sync
key:out_trade_no===========>value:12345523123
key:total_amount===========>value:6300.00
key:trade_status===========>value:TRADE_SUCCESS
key:trade_no===========>value:2020101822001425220501264292
key:auth_app_id===========>value:2016102600763190
key:receipt_amount===========>value:6300.00
key:point_amount===========>value:0.00
key:app_id===========>value:2016102600763190
key:buyer_pay_amount===========>value:6300.00
key:sign_type===========>value:RSA2
key:seller_id===========>value:2088102181115314

各参数详细意义见支付宝开放平台异步通知

验证签名

代码语言:javascript
复制
@PostMapping("/payed/notify")
public String handlerAlipay(HttpServletRequest request, PayAsyncVo payAsyncVo) throws AlipayApiException {
    System.out.println("收到支付宝异步通知******************");
    // 只要收到支付宝的异步通知,返回 success 支付宝便不再通知
    // 获取支付宝POST过来反馈信息
    //TODO 需要验签
    Map<String, String> params = new HashMap<>();
    Map<String, String[]> requestParams = request.getParameterMap();
    for (String name : requestParams.keySet()) {
        String[] values = requestParams.get(name);
        String valueStr = "";
        for (int i = 0; i < values.length; i++) {
            valueStr = (i == values.length - 1) ? valueStr + values[i]
                    : valueStr + values[i] + ",";
        }
        //乱码解决,这段代码在出现乱码时使用
        // valueStr = new String(valueStr.getBytes("ISO-8859-1"), "utf-8");
        params.put(name, valueStr);
    }

    boolean signVerified = AlipaySignature.rsaCheckV1(params, alipayTemplate.getAlipay_public_key(),
            alipayTemplate.getCharset(), alipayTemplate.getSign_type()); //调用SDK验证签名

    if (signVerified){
        System.out.println("支付宝异步通知验签成功");
        //修改订单状态
        orderService.handlerPayResult(payAsyncVo);
        return "success";
    }else {
        System.out.println("支付宝异步通知验签失败");
        return "error";
    }
}

修改订单状态与保存交易流水

代码语言:javascript
复制
@Transactional(rollbackFor = Exception.class)
@Override
public String handlePayResult(PayAsyncVo asyncVo) {

    //保存交易流水信息
    PaymentInfoEntity paymentInfo = new PaymentInfoEntity();
    paymentInfo.setOrderSn(asyncVo.getOut_trade_no());
    paymentInfo.setAlipayTradeNo(asyncVo.getTrade_no());
    paymentInfo.setTotalAmount(new BigDecimal(asyncVo.getBuyer_pay_amount()));
    paymentInfo.setSubject(asyncVo.getBody());
    paymentInfo.setPaymentStatus(asyncVo.getTrade_status());
    paymentInfo.setCreateTime(new Date());
    paymentInfo.setCallbackTime(asyncVo.getNotify_time());
    //添加到数据库中
    this.paymentInfoService.save(paymentInfo);

    //修改订单状态
    //获取当前状态
    String tradeStatus = asyncVo.getTrade_status();

    if (tradeStatus.equals("TRADE_SUCCESS") || tradeStatus.equals("TRADE_FINISHED")) {
        //支付成功状态
        String orderSn = asyncVo.getOut_trade_no(); //获取订单号
        this.updateOrderStatus(orderSn,OrderStatusEnum.PAYED.getCode(),PayConstant.ALIPAY);
    }

    return "success";
}

/**
     * 修改订单状态
     * @param orderSn
     * @param code
     */
private void updateOrderStatus(String orderSn, Integer code,Integer payType) {
    this.baseMapper.updateOrderStatus(orderSn,code,payType);
}

public class PayConstant {

    public static final Integer ALIPAY = 1;

    public static final Integer WXPAY = 2;

}

6、收单

由于可能出现订单已经过期后,库存已经解锁,但支付成功后再修改订单状态的情况,需要设置支付有效时间,只有在有效期内才能进行支付

代码语言:javascript
复制
alipayRequest.setBizContent("{\"out_trade_no\":\""+ out_trade_no +"\","
        + "\"total_amount\":\""+ total_amount +"\","
        + "\"subject\":\""+ subject +"\","
        + "\"body\":\""+ body +"\","
        //设置过期时间为1m
        +"\"timeout_express\":\"1m\","
        + "\"product_code\":\"FAST_INSTANT_TRADE_PAY\"}");

超时后订单显示

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-04-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、RabbitMQ
  • 二、安装 RabbitMQ
  • 三、整合 SpringBoot
    • RabbitMQ 消息确认机制
    • 四、项目搭建
    • 五、订单服务
      • 1、订单流程
        • 2、登录拦截
          • 3、订单确认页
            • 3.1 模型抽取
            • 3.2 数据获取
            • 3.3 Feign 远程调用丢失请求头问题
            • 3.4 Feign 异步情况丢失上下文问题
            • 3.5 运费收件信息获取
          • 4、订单提交
            • 4.1 模型抽取
            • 4.2 提交订单
        • 六、分布式事务
          • 1、分布式事务
            • 2、整合 Spring cloud alibaba Seata
            • 七、使用消息队列实现最终一致性
              • 1、延迟队列的定义与实现
                • 2、延迟队列使用场景
                  • 3、 定时关单与库存解锁主体逻辑
                    • 4、创建业务交换机和队列
                      • 5、库存自动解锁
                        • 5.1 库存锁定
                        • 5.2 监听队列
                        • 5.3 库存解锁
                      • 6、定时关单
                        • 6.1 提交订单
                        • 6.2 监听队列
                        • 6.3 关闭订单
                        • 6.4 解锁库存
                    • 八、支付
                      • 1、支付宝加密原理
                        • 2、配置支付宝沙箱环境
                          • 3、环境搭建
                            • 4、订单支付与同步通知
                              • 5、异步通知
                                • 5.1 内网穿透设置异步通知地址
                                • 5.2 支付包支付异步通知
                              • 6、收单
                              相关产品与服务
                              消息队列 CMQ 版
                              消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档