RabbitMQ 延时队列取消订单

  • 准备工作
  1. 首先需要下载RabbitMQ延迟队列插件,官网下载地址,我本地的RabbitMQ版本是3.12.10所以下了个同版本的延迟队列插件3.12.10插件下载地址
  2. 将下载插件包放到RabbitMQ安装目录下的plugins文件夹中。
  3. 在RabbitMQ目录下的sbin文件夹中打开CMD输入命令:rabbitmq-plugins enable rabbitmq_delayed_message_exchange出现如图提示表示安装成功。
  4. 执行重启RabbitMQ命令 RabbitMQ Service - start , RabbitMQ Service - stop 浏览器进入RabbitMQ管理页面http://localhost:15672exchanges 选项中找到 Add a new exchange - type 下拉选中有 x-delayed-message 表示延迟队列插件已经成功安装好了。
  • RabbitMQ 延时队列取消订单
  • 实现思路和之前差不多,创建订单成功后,发一个带有订单信息的延时消息,然后当到达指定时间后,判断一下订单是否未支付。如果已支付 就不做任何处理,如果未支付,就取消订单。
  1. 创建消息队列
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    @Configuration
    public class OrderDelayedMessageConfig {
    public static final String DIRECT_QUEUE = "order.queue.direct";//队列
    public static final String DELAYED_EXCHANGE = "order.exchange.delayed";//延迟交换机
    public static final String ROUTING_KEY = "order.routingKey.bind";//绑定的routing-key

    /**
    * 定义队列
    */
    @Bean
    public Queue directQueue() {
    return new Queue(DIRECT_QUEUE, true);
    }

    /**
    * 定义延迟交换机
    * args:根据该参数进行灵活路由,设置为“direct”,意味着该插件具有与直连交换机具有相同的路由行为
    * 交换机类型为 x-delayed-message
    */
    @Bean
    public CustomExchange delayedExchange() {
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
    }

    /**
    * 队列和延迟交换机绑定
    */
    @Bean
    public Binding orderBinding() {
    return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
    }
    }
  2. 创建生产者(下单之后发延迟信息)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    @Slf4j
    @RestController
    @RequestMapping("/order")
    public class FuOrderController {
    @Resource
    private FuOrderService orderService;
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private RedisTemplate redisTemplate;

    @Value("${wind.order.exchange}")
    private String orderExchange; //订单交换机

    @Value("${wind.order.routingKey}")
    private String orderRoutingKey; //订单路由key

    @PostMapping("createOrderByDelayMQ")
    public R createOrderByDelayMQ(@RequestBody FuOrder fuOrder, HttpServletRequest request) {
    // 保存订单至数据库
    String token = request.getHeader("token");
    Random random = new Random();
    String orderNo = String.valueOf(random.nextLong()).replace("-", "");
    fuOrder.setOrderNo(Long.valueOf(orderNo));
    fuOrder.setCreateTime(LocalDateTime.now());
    fuOrder.setUserId(Math.toIntExact(JwtUtils.getUserId(token)));
    boolean order = orderService.save(fuOrder);
    if (order) {
    // 将订单ID推送至延迟队列
    rabbitTemplate.convertAndSend(
    OrderDelayedMessageConfig.DELAYED_EXCHANGE,
    OrderDelayedMessageConfig.ROUTING_KEY,
    fuOrder.getId(),
    message -> {
    message.getMessageProperties().setDelay(10000); // 设置延迟时间
    return message;
    });
    return R.ok().message("创建订单成功");
    } else {
    return R.error().message("创建订单失败");
    }
    }
    }
  3. 创建消费者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @Component
    @Slf4j
    @RabbitListener(queues = OrderDelayedMessageConfig.DIRECT_QUEUE)//监听队列名称
    public class OrderDelayConsumer {
    @Resource
    private FuOrderService orderService;

    @RabbitHandler
    public void process(Long orderId) {
    // 取消订单
    FuOrder fuOrder = orderService.getById(orderId);
    FuOrder overdueOrder = new FuOrder();
    BeanUtils.copyProperties(fuOrder,overdueOrder);
    overdueOrder.setStatus(0);
    overdueOrder.setClosetTime(LocalDateTime.now());
    overdueOrder.setUpdateTime(LocalDateTime.now());
    orderService.updateById(overdueOrder);
    log.info("MQ延迟队列过期未支付订单处理结束");
    }
    }