跳转到内容

工作消息队列


工作消息队列(Work Queue),也称为任务队列(Task Queue),是在基本消息队列的基础上引入多个消费者的模型。它将耗时任务分发给多个 Worker 并行处理,以提高系统吞吐量和并发能力。

与基本消息队列的区别:

基本消息队列工作消息队列
消费者数量1 个多个(竞争消费)
适用场景简单点对点传递高并发任务分发
任务分配轮询 / 公平分发

核心特点:

  • 生产者(Producer) 将多个任务(消息)发送到队列中;
  • 多个消费者(Worker) 监听同一队列,竞争性地消费任务;
  • RabbitMQ 自动分配任务给 Worker,默认采用轮询(Round-robin) 调度;
  • 启用消息确认后,只有 Worker 处理完并发送 ACK,消息才会从队列删除。
┌──▶ Consumer 1
Producer ──▶ Queue
└──▶ Consumer 2

2.1. 轮询分发(Round-robin Dispatching)

Section titled “2.1. 轮询分发(Round-robin Dispatching)”

RabbitMQ 默认采用轮询机制,按顺序将消息依次分配给每个 Worker:

消息 1 → Worker1
消息 2 → Worker2
消息 3 → Worker1
消息 4 → Worker2
...

通过设置 prefetchCount = 1,让 RabbitMQ 等待 Worker 处理完当前消息并 ACK 后,才分配下一条消息,处理能力强的 Worker 自然会消费更多消息。

Worker1(慢): 消息1 ──处理中──▶ ACK ──▶ 消息3
Worker2(快): 消息2 ──▶ ACK ──▶ 消息4 ──▶ ACK ──▶ 消息5 ...

配置方式(二选一):

方式一:YAML 配置(推荐)

spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次预取 1 条消息

方式二:代码配置

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPrefetchCount(1);
return factory;
}
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${spring-boot-starter-amqp}</version>
</dependency>
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
username: admin
password: 123456
listener:
simple:
prefetch: 1 # 开启公平分发
@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessage() {
String queueName = "work.queue";
for (int i = 1; i <= 20; i++) {
rabbitTemplate.convertAndSend(queueName, "Message #" + i);
}
}
}
@Component
public class WorkQueueListener {
// 消费者 1:模拟处理较慢
@RabbitListener(queues = "work.queue")
public void consumer1(String message) throws InterruptedException {
System.out.println(LocalDateTime.now() + " [Consumer1] Received: " + message);
Thread.sleep(20); // 模拟慢处理
}
// 消费者 2:模拟处理较快
@RabbitListener(queues = "work.queue")
public void consumer2(String message) throws InterruptedException {
System.err.println(LocalDateTime.now() + " [Consumer2] Received: " + message);
Thread.sleep(200); // 模拟快处理
}
}