跳转到内容

Fanout Exchange 模型


Fanout Exchange(扇出交换机) 是 RabbitMQ 中最简单的 Exchange 类型,核心行为是广播:将收到的消息无条件复制并发送到所有绑定的队列,完全忽略 Routing Key。

@startuml
left to right direction
actor Publisher
actor "消费者A"
actor "消费者B"
actor "消费者C"
rectangle "Fanout Exchange" {
}
queue "队列A"
queue "队列B"
queue "队列C"
Publisher --> "Fanout Exchange" : 发布消息\n(忽略 routingKey)
"Fanout Exchange" --> "队列A" : 广播
"Fanout Exchange" --> "队列B" : 广播
"Fanout Exchange" --> "队列C" : 广播
"队列A" --> "消费者A"
"队列B" --> "消费者B"
"队列C" --> "消费者C"
@enduml

核心特点:

  • 广播:每个绑定队列都会收到完全相同的消息副本,互相独立;
  • 忽略 Routing Key:Binding Key 填什么都不影响路由结果,通常留空;
  • 高性能:无需匹配路由键,路由逻辑最简单,性能最高;
  • 无绑定即丢弃:若没有任何队列绑定,消息直接丢弃。

2.1. 声明配置(交换机 + 队列 + 绑定)

Section titled “2.1. 声明配置(交换机 + 队列 + 绑定)”

Spring AMQP 推荐通过 @Configuration 配置类统一声明 Exchange、Queue 和 Binding,Spring 容器启动时会自动在 RabbitMQ 中创建对应资源。

@Configuration
public class FanoutConfig {
public static final String EXCHANGE_FANOUT = "camellia.fanout";
public static final String QUEUE_FANOUT_A = "camellia.fanout.queueA";
public static final String QUEUE_FANOUT_B = "camellia.fanout.queueB";
// 声明 Fanout 交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_FANOUT);
}
// 声明队列 A
@Bean
public Queue queueFanoutA() {
return new Queue(QUEUE_FANOUT_A);
}
// 声明队列 B
@Bean
public Queue queueFanoutB() {
return new Queue(QUEUE_FANOUT_B);
}
// 绑定队列 A 到交换机
@Bean
public Binding bindFanoutA(@Qualifier("queueFanoutA") Queue queue,
FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
// 绑定队列 B 到交换机
@Bean
public Binding bindFanoutB(@Qualifier("queueFanoutB") Queue queue,
FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
}
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendFanout() {
String exchangeName = FanoutConfig.EXCHANGE_FANOUT;
for (int i = 1; i <= 5; i++) {
String message = "Broadcast Message #" + i;
// Fanout 模式下 routingKey 填空字符串即可,不影响路由
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
}

convertAndSend 参数说明:

参数说明
exchange交换机名称
routingKey路由键,Fanout 模式填 "" 即可
message消息体,自动序列化
@Slf4j
@Component
public class FanoutListener {
@RabbitListener(queues = FanoutConfig.QUEUE_FANOUT_A)
public void consumerA(String message) {
log.info("[ConsumerA] Received: {}", message);
}
@RabbitListener(queues = FanoutConfig.QUEUE_FANOUT_B)
public void consumerB(String message) {
log.info("[ConsumerB] Received: {}", message);
}
}

发送一条消息后,ConsumerA 和 ConsumerB 都会收到,各自独立处理:

[ConsumerA] Received: Broadcast Message #1
[ConsumerB] Received: Broadcast Message #1
场景说明
缓存刷新发布商品更新后,同时通知多个服务节点清理本地缓存。
日志收集一条操作日志同时写入审计系统和监控系统。
消息推送系统公告同时推送给多个业务模块。
数据同步主库变更后广播给多个从库或下游服务。