RabbitMQ实现订单30分钟超时自动关闭

RabbitMQ实现订单30分钟超时自动关闭
彼岸的風- 实现原理
生成订单时,往MQ传递一个消息设置有效期30分钟,再该信息失效的时候(没有被消费的情况下),执行客户端的一个方法来告知我们信息已经失效(消息过期投递到死信队列),这时候我们在对订单做处理。
- 声明死信队列相关的属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class OrderDlxConfig {
/**
* 订单交换机
*/
private String orderExchange;
/**
* 订单队列
*/
private String orderQueue;
/**
* 订单路由key
*/
private String orderRoutingKey;
/**
* 死信交换机
*/
private String dlxExchange;
/**
* 死信队列
*/
private String dlxQueue;
/**
* 死信路由
*/
private String dlxRoutingKey;
public Queue orderQueue(){
Map<String, Object> args = new HashMap<>(2);
// 绑定我们的死信交换机
args.put("x-dead-letter-exchange", dlxExchange);
// 绑定我们的路由key
args.put("x-dead-letter-routing-key", dlxRoutingKey);
return new Queue(orderQueue, true, false, false, args);
}
public Queue orderDeadQueue(){
return new Queue(dlxQueue);
}
//绑定交换机
public DirectExchange orderExchange(){
return new DirectExchange(orderExchange);
}
public DirectExchange orderDeadExchange(){
return new DirectExchange(dlxExchange);
}
//绑定路由键
public Binding orderBindingExchange(Queue orderQueue, DirectExchange orderExchange) {
return BindingBuilder.bind(orderQueue).to(orderExchange).with(orderRoutingKey);
}
//绑定死信队列到死信交换机
public Binding deadBindingExchange(Queue orderDeadQueue, DirectExchange orderDeadExchange) {
return BindingBuilder.bind(orderDeadQueue).to(orderDeadExchange).with(dlxRoutingKey);
}
} - 添加application.yml的相关配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: fu99999
password: fu99999
# MQ
wind:
# 死信队列
dlx:
exchange: wind_order_dlx_exchange
queue: wind_order_dlx_queue
routingKey: dlx
# 订单队列
order:
exchange: wind_order_exchange
queue: wind_order_queue
routingKey: order - 创建生产者(设置10秒过期,验证消息是否加入死信队列)
- 在创建订单后使用 rabbitTemplate.convertAndSend() 发送消息到RabbitMQ,将订单号作为消息发送到指定的交换机(orderExchange)和路由键(orderRoutingKey)
- messagePostProcessor(): 定义了一个消息后处理器,用于设置消息的属性,通过message.getMessageProperties().setExpiration(“10000”);设置过期时间10秒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class FuOrderController {
private FuOrderService orderService;
private RabbitTemplate rabbitTemplate;
private RedisTemplate redisTemplate;
private String orderExchange; //订单交换机
private String orderRoutingKey; //订单路由key
public R createOrderByMQ( { FuOrder fuOrder, HttpServletRequest request)
String token = request.getHeader("token");
Random random = new Random();
String orderNo = String.valueOf(random.nextLong()).replace("-", "");
fuOrder.setOrderNo(Long.valueOf(orderNo));
fuOrder.setCreateTime(LocalDateTime.now());
fuOrder.setUserId(Math.toIntExact(JwtUtils.getUserId(token)));
boolean order = orderService.save(fuOrder);
if (order) {
// rabbit投递消息
rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, fuOrder.getId(), messagePostProcessor());
return R.ok().message("创建订单成功");
} else {
return R.error().message("创建订单失败");
}
}
/**
* 处理待发送消息MQ
*/
private MessagePostProcessor messagePostProcessor() {
return new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
//设置有效期30分钟
//message.getMessageProperties().setExpiration("1800000");
message.getMessageProperties().setExpiration("10000");//10秒
return message;
}
};
}
}
- 创建死信消费者和消费者过期信息
- 死信队列消费方法:
@RabbitListener(queues = “mayikt_order_dlx_queue”): 使用 @RabbitListener 注解标记了一个监听名为 “mayikt_order_dlx_queue” 的死信队列的方法。
public void orderConsumer(String orderId): 该方法接收从死信队列中获取的消息,消息内容为订单号 orderId。 - 然后处理未支付情况下的相关业务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class OrderDlxConsumer {
private FuOrderService orderService;
/**
* 监听我们的死信队列
*/
public void orderConsumer(Long orderId) {
log.info("死信队列获取消息订单ID:" + orderId);
if (StringUtils.isEmpty(orderId.toString())) {
return;
}
FuOrder orderEntity = orderService.getById(orderId);
if (null == orderEntity) {
return;
}
Integer orderStatus = orderEntity.getStatus();
// 判断未支付,取消订单
if (1 == orderStatus) {
FuOrder orderInfo = new FuOrder();
BeanUtils.copyProperties(orderEntity, orderInfo);
orderInfo.setStatus(0);
orderInfo.setClosetTime(LocalDateTime.now());
orderInfo.setUpdateTime(LocalDateTime.now());
orderService.updateById(orderInfo);
}
log.info("MQ死信队列过期未支付订单处理结束");
}
}
评论
匿名评论隐私政策
✅ 你无需删除空行,直接评论以获取最佳展示效果