RabbitMQ实现订单30分钟超时自动关闭

  • 实现原理
    生成订单时,往MQ传递一个消息设置有效期30分钟,再该信息失效的时候(没有被消费的情况下),执行客户端的一个方法来告知我们信息已经失效(消息过期投递到死信队列),这时候我们在对订单做处理。
  1. 声明死信队列相关的属性
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    @Configuration
    public class OrderDlxConfig {
    /**
    * 订单交换机
    */
    @Value("${wind.order.exchange}")
    private String orderExchange;
    /**
    * 订单队列
    */
    @Value("${wind.order.queue}")
    private String orderQueue;
    /**
    * 订单路由key
    */
    @Value("${wind.order.routingKey}")
    private String orderRoutingKey;
    /**
    * 死信交换机
    */
    @Value("${wind.dlx.exchange}")
    private String dlxExchange;
    /**
    * 死信队列
    */
    @Value("${wind.dlx.queue}")
    private String dlxQueue;
    /**
    * 死信路由
    */
    @Value("${wind.dlx.routingKey}")
    private String dlxRoutingKey;

    @Bean
    public Queue orderQueue(){
    Map<String, Object> args = new HashMap<>(2);
    // 绑定我们的死信交换机
    args.put("x-dead-letter-exchange", dlxExchange);
    // 绑定我们的路由key
    args.put("x-dead-letter-routing-key", dlxRoutingKey);
    return new Queue(orderQueue, true, false, false, args);
    }
    @Bean
    public Queue orderDeadQueue(){
    return new Queue(dlxQueue);
    }
    //绑定交换机
    @Bean
    public DirectExchange orderExchange(){
    return new DirectExchange(orderExchange);
    }
    @Bean
    public DirectExchange orderDeadExchange(){
    return new DirectExchange(dlxExchange);
    }

    //绑定路由键
    @Bean
    public Binding orderBindingExchange(Queue orderQueue, DirectExchange orderExchange) {
    return BindingBuilder.bind(orderQueue).to(orderExchange).with(orderRoutingKey);
    }
    //绑定死信队列到死信交换机
    @Bean
    public Binding deadBindingExchange(Queue orderDeadQueue, DirectExchange orderDeadExchange) {
    return BindingBuilder.bind(orderDeadQueue).to(orderDeadExchange).with(dlxRoutingKey);
    }
    }
  2. 添加application.yml的相关配置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    spring:

    rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: fu99999
    password: fu99999
    # MQ
    wind:
    # 死信队列
    dlx:
    exchange: wind_order_dlx_exchange
    queue: wind_order_dlx_queue
    routingKey: dlx
    # 订单队列
    order:
    exchange: wind_order_exchange
    queue: wind_order_queue
    routingKey: order
  3. 创建生产者(设置10秒过期,验证消息是否加入死信队列)
  • 在创建订单后使用 rabbitTemplate.convertAndSend() 发送消息到RabbitMQ,将订单号作为消息发送到指定的交换机(orderExchange)和路由键(orderRoutingKey)
  • messagePostProcessor(): 定义了一个消息后处理器,用于设置消息的属性,通过message.getMessageProperties().setExpiration(“10000”);设置过期时间10秒
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    @Slf4j
    @RestController
    @RequestMapping("/order")
    public class FuOrderController {
    @Resource
    private FuOrderService orderService;
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private RedisTemplate redisTemplate;

    @Value("${wind.order.exchange}")
    private String orderExchange; //订单交换机

    @Value("${wind.order.routingKey}")
    private String orderRoutingKey; //订单路由key

    @PostMapping("createOrderByMQ")
    public R createOrderByMQ(@RequestBody FuOrder fuOrder, HttpServletRequest request) {
    String token = request.getHeader("token");
    Random random = new Random();
    String orderNo = String.valueOf(random.nextLong()).replace("-", "");
    fuOrder.setOrderNo(Long.valueOf(orderNo));
    fuOrder.setCreateTime(LocalDateTime.now());
    fuOrder.setUserId(Math.toIntExact(JwtUtils.getUserId(token)));
    boolean order = orderService.save(fuOrder);
    if (order) {
    // rabbit投递消息
    rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, fuOrder.getId(), messagePostProcessor());
    return R.ok().message("创建订单成功");
    } else {
    return R.error().message("创建订单失败");
    }
    }

    /**
    * 处理待发送消息MQ
    */
    private MessagePostProcessor messagePostProcessor() {
    return new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
    //设置有效期30分钟
    //message.getMessageProperties().setExpiration("1800000");
    message.getMessageProperties().setExpiration("10000");//10秒
    return message;
    }
    };
    }
    }
  1. 创建死信消费者和消费者过期信息
  • 死信队列消费方法:
    @RabbitListener(queues = “mayikt_order_dlx_queue”): 使用 @RabbitListener 注解标记了一个监听名为 “mayikt_order_dlx_queue” 的死信队列的方法。
    public void orderConsumer(String orderId): 该方法接收从死信队列中获取的消息,消息内容为订单号 orderId。
  • 然后处理未支付情况下的相关业务
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    @Slf4j
    @Component
    @RabbitListener(
    bindings = @QueueBinding(value = @Queue("wind_order_dlx_queue"),
    exchange = @Exchange(value = "wind_order_dlx_exchange", type = ExchangeTypes.DIRECT),
    key = "dlx"))
    public class OrderDlxConsumer {
    @Resource
    private FuOrderService orderService;

    /**
    * 监听我们的死信队列
    */
    @RabbitHandler
    public void orderConsumer(Long orderId) {
    log.info("死信队列获取消息订单ID:" + orderId);
    if (StringUtils.isEmpty(orderId.toString())) {
    return;
    }
    FuOrder orderEntity = orderService.getById(orderId);
    if (null == orderEntity) {
    return;
    }
    Integer orderStatus = orderEntity.getStatus();
    // 判断未支付,取消订单
    if (1 == orderStatus) {
    FuOrder orderInfo = new FuOrder();
    BeanUtils.copyProperties(orderEntity, orderInfo);
    orderInfo.setStatus(0);
    orderInfo.setClosetTime(LocalDateTime.now());
    orderInfo.setUpdateTime(LocalDateTime.now());
    orderService.updateById(orderInfo);
    }
    log.info("MQ死信队列过期未支付订单处理结束");
    }
    }