跳转到内容

Topic Exchange 模型


Topic Exchange(主题交换机) 是 Direct Exchange 的增强版,同样基于 Routing Key 路由,但支持通配符模式匹配,使得一个绑定规则可以匹配多种路由键,路由更加灵活。

Routing Key 和 Binding Key 都使用点号(.)分隔多个单词,例如 order.created.shanghai

通配符规则:

通配符含义示例
*匹配恰好一个单词*.critical 匹配 error.critical,但不匹配 error.critical.high
#匹配零个或多个单词error.# 匹配 errorerror.criticalerror.critical.high

与其他 Exchange 类型的对比:

Direct ExchangeTopic Exchange
匹配方式精确匹配通配符模式匹配
灵活性低(一个 Key 对应一个规则)高(一个规则覆盖多种 Key)
退化情形不含通配符时等价于 Direct;Binding Key 为 # 时等价于 Fanout

以日志系统为例,Routing Key 格式为 {服务}.{级别}

发送消息:
order.error → 订单服务的错误日志
payment.info → 支付服务的普通日志
order.info → 订单服务的普通日志
Binding Key 匹配情况:
order.* → 匹配 order.error ✓ order.info ✓ payment.info ✗
*.error → 匹配 order.error ✓ payment.info ✗
# → 匹配所有 ✓
order.error → 仅匹配 order.error ✓(退化为 Direct)
@Configuration
public class TopicConfig {
public static final String EXCHANGE_TOPIC = "camellia.topic";
public static final String QUEUE_TOPIC_A = "camellia.topic.queueA";
public static final String QUEUE_TOPIC_B = "camellia.topic.queueB";
// 声明 Topic 交换机(注意返回类型是 TopicExchange)
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(EXCHANGE_TOPIC);
}
// 声明队列 A:接收所有 topic. 开头的消息
@Bean
public Queue topicQueueA() {
return new Queue(QUEUE_TOPIC_A);
}
// 声明队列 B:接收所有以 .topic 结尾的消息
@Bean
public Queue topicQueueB() {
return new Queue(QUEUE_TOPIC_B);
}
// 绑定队列 A,Binding Key = "topic.#"
@Bean
public Binding bindTopicA(@Qualifier("topicQueueA") Queue queue,
TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("topic.#");
}
// 绑定队列 B,Binding Key = "*.topic"
@Bean
public Binding bindTopicB(@Qualifier("topicQueueB") Queue queue,
TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("*.topic");
}
}
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendTopic() {
String exchange = TopicConfig.EXCHANGE_TOPIC;
// "topic.A1" → 匹配 "topic.#",queueA 收到
rabbitTemplate.convertAndSend(exchange, "topic.A1", "消息1");
// "B1.topic" → 匹配 "*.topic",queueB 收到
rabbitTemplate.convertAndSend(exchange, "B1.topic", "消息2");
// "topic.A1.extra" → 仍匹配 "topic.#",queueA 收到;不匹配 "*.topic"
rabbitTemplate.convertAndSend(exchange, "topic.A1.extra", "消息3");
// "other.msg" → 无匹配,消息丢弃
rabbitTemplate.convertAndSend(exchange, "other.msg", "消息4");
}
@Slf4j
@Component
public class TopicListener {
@RabbitListener(queues = TopicConfig.QUEUE_TOPIC_A)
public void consumerA(String message) {
log.info("[QueueA - topic.#] Received: {}", message);
}
@RabbitListener(queues = TopicConfig.QUEUE_TOPIC_B)
public void consumerB(String message) {
log.info("[QueueB - *.topic] Received: {}", message);
}
}
@Slf4j
@Component
public class TopicAnnotationListener {
public static final String EXCHANGE_TOPIC = "camellia.topic_annotation";
// 队列 A:匹配中间含 topic 的三段路由键,或以 .topic. 为中间段
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "camellia.topic_annotation.queueA"),
exchange = @Exchange(name = EXCHANGE_TOPIC, type = ExchangeTypes.TOPIC),
key = {"*.topic.#"} // 如 a.topic.b、a.topic.b.c
))
public void consumerA(String message) {
log.info("[ConsumerA] Received: {}", message);
}
// 队列 B:匹配 topic. 开头或以 .topic 结尾
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "camellia.topic_annotation.queueB"),
exchange = @Exchange(name = EXCHANGE_TOPIC, type = ExchangeTypes.TOPIC),
key = {"topic.#", "*.topic"}
))
public void consumerB(String message) {
log.info("[ConsumerB] Received: {}", message);
}
}