跳转到内容

Direct Exchange 模型


Direct Exchange(直连交换机) 根据消息的 Routing Key 与队列的 Binding Key 进行精确匹配,只有两者完全相等时,消息才会被路由到对应队列。

@startuml
actor Publisher as P
actor 错误日志消费者 as C1
actor 普通日志消费者 as C2
rectangle "Direct 交换机" {
P -> "Direct 交换机" : 发送消息\nroutingKey: error / info
}
queue "error队列" as Q1
queue "info队列" as Q2
"Direct 交换机" -> Q1 : routingKey = error
"Direct 交换机" -> Q2 : routingKey = info
Q1 -> C1
Q2 -> C2
@enduml

与 Fanout 的核心区别:

Fanout ExchangeDirect Exchange
路由方式广播,忽略 Key精确匹配 Routing Key
消息投递所有绑定队列都收到只有 Key 匹配的队列收到
适用场景全局通知按类型分类路由

两个特殊情况:

  • 一个队列绑定多个 Key:该队列会接收任意一个 Key 匹配的消息
  • 多个队列绑定相同 Key:效果等同于 Fanout,多个队列同时收到消息
@Configuration
public class DirectConfig {
public static final String EXCHANGE_DIRECT = "camellia.direct";
public static final String QUEUE_DIRECT_A = "camellia.direct.queueA";
public static final String QUEUE_DIRECT_B = "camellia.direct.queueB";
// 声明 Direct 交换机(注意返回类型是 DirectExchange,不是 Exchange)
@Bean
public DirectExchange directExchange() {
return new DirectExchange(EXCHANGE_DIRECT);
}
// 声明队列 A
@Bean
public Queue directQueueA() {
return new Queue(QUEUE_DIRECT_A);
}
// 声明队列 B
@Bean
public Queue directQueueB() {
return new Queue(QUEUE_DIRECT_B);
}
// 绑定队列 A,Binding Key = "error"
@Bean
public Binding bindDirectA(@Qualifier("directQueueA") Queue queue,
DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("error");
}
// 绑定队列 B,Binding Key = "info"
@Bean
public Binding bindDirectB(@Qualifier("directQueueB") Queue queue,
DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("info");
}
}
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendDirect(String routingKey, String message) {
// 第二个参数是 Routing Key,Direct 模式下决定消息去哪个队列。
rabbitTemplate.convertAndSend(DirectConfig.EXCHANGE_DIRECT, routingKey, message);
}
// 调用示例
sendDirect("error", "数据库连接失败"); // → 路由到 queueA
sendDirect("info", "用户登录成功"); // → 路由到 queueB
sendDirect("debug", "调试信息"); // → 无匹配队列,消息丢弃
@Slf4j
@Component
public class DirectListener {
@RabbitListener(queues = DirectConfig.QUEUE_DIRECT_A)
public void consumerA(String message) {
log.error("[ERROR队列] Received: {}", message);
}
@RabbitListener(queues = DirectConfig.QUEUE_DIRECT_B)
public void consumerB(String message) {
log.info("[INFO队列] Received: {}", message);
}
}

无需单独编写配置类,直接在 @RabbitListener 中通过注解声明交换机、队列和绑定关系,Spring 容器启动时自动完成创建。

@Slf4j
@Component
public class DirectAnnotationListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "camellia.direct.queueA"),
exchange = @Exchange(name = "camellia.direct2", type = ExchangeTypes.DIRECT),
key = {"error", "critical"} // 该队列接收 error 和 critical 两种路由键
))
public void consumerA(String message) {
log.error("[ConsumerA] Received: {}", message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "camellia.direct.queueB"),
exchange = @Exchange(name = "camellia.direct2", type = ExchangeTypes.DIRECT),
key = {"info", "warn"}
))
public void consumerB(String message) {
log.info("[ConsumerB] Received: {}", message);
}
}

注解说明:

注解作用
@QueueBinding整合声明队列、交换机和绑定关系
@Queue声明队列,name 指定队列名
@Exchange声明交换机,type 指定类型
key绑定键(Binding Key),支持数组,即一个队列绑定多个 Key

生产者无需任何改动,正常调用 convertAndSend 即可。

配置类方式注解方式
代码位置集中在 @Configuration分散在各消费者类
可读性清晰,拓扑结构一目了然简洁,但队列关系较分散
适合场景队列较多、结构复杂队列少、快速开发
推荐程度✅ 生产环境推荐✅ 原型/小项目推荐