工作消息队列
1. 什么是工作消息队列
Section titled “1. 什么是工作消息队列”工作消息队列(Work Queue),也称为任务队列(Task Queue),是在基本消息队列的基础上引入多个消费者的模型。它将耗时任务分发给多个 Worker 并行处理,以提高系统吞吐量和并发能力。
与基本消息队列的区别:
| 基本消息队列 | 工作消息队列 | |
|---|---|---|
| 消费者数量 | 1 个 | 多个(竞争消费) |
| 适用场景 | 简单点对点传递 | 高并发任务分发 |
| 任务分配 | — | 轮询 / 公平分发 |
核心特点:
- 生产者(Producer) 将多个任务(消息)发送到队列中;
- 多个消费者(Worker) 监听同一队列,竞争性地消费任务;
- RabbitMQ 自动分配任务给 Worker,默认采用轮询(Round-robin) 调度;
- 启用消息确认后,只有 Worker 处理完并发送 ACK,消息才会从队列删除。
┌──▶ Consumer 1Producer ──▶ Queue └──▶ Consumer 22. 任务分配策略
Section titled “2. 任务分配策略”2.1. 轮询分发(Round-robin Dispatching)
Section titled “2.1. 轮询分发(Round-robin Dispatching)”RabbitMQ 默认采用轮询机制,按顺序将消息依次分配给每个 Worker:
消息 1 → Worker1消息 2 → Worker2消息 3 → Worker1消息 4 → Worker2...2.2. 公平分发(Fair Dispatch)
Section titled “2.2. 公平分发(Fair Dispatch)”通过设置 prefetchCount = 1,让 RabbitMQ 等待 Worker 处理完当前消息并 ACK 后,才分配下一条消息,处理能力强的 Worker 自然会消费更多消息。
Worker1(慢): 消息1 ──处理中──▶ ACK ──▶ 消息3Worker2(快): 消息2 ──▶ ACK ──▶ 消息4 ──▶ ACK ──▶ 消息5 ...配置方式(二选一):
方式一:YAML 配置(推荐)
spring: rabbitmq: listener: simple: prefetch: 1 # 每次预取 1 条消息方式二:代码配置
@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPrefetchCount(1); return factory;}3. 使用 Spring AMQP 实现
Section titled “3. 使用 Spring AMQP 实现”3.1. 添加依赖
Section titled “3.1. 添加依赖”<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>${spring-boot-starter-amqp}</version></dependency>3.2. RabbitMQ 配置
Section titled “3.2. RabbitMQ 配置”spring: rabbitmq: host: 127.0.0.1 port: 5672 virtual-host: / username: admin password: 123456 listener: simple: prefetch: 1 # 开启公平分发3.3. 发送消息(生产者)
Section titled “3.3. 发送消息(生产者)”@RunWith(SpringRunner.class)@SpringBootTestpublic class PublisherTest {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void sendMessage() { String queueName = "work.queue"; for (int i = 1; i <= 20; i++) { rabbitTemplate.convertAndSend(queueName, "Message #" + i); } }}3.4. 接收消息(消费者)
Section titled “3.4. 接收消息(消费者)”@Componentpublic class WorkQueueListener {
// 消费者 1:模拟处理较慢 @RabbitListener(queues = "work.queue") public void consumer1(String message) throws InterruptedException { System.out.println(LocalDateTime.now() + " [Consumer1] Received: " + message); Thread.sleep(20); // 模拟慢处理 }
// 消费者 2:模拟处理较快 @RabbitListener(queues = "work.queue") public void consumer2(String message) throws InterruptedException { System.err.println(LocalDateTime.now() + " [Consumer2] Received: " + message); Thread.sleep(200); // 模拟快处理 }}