Topic Exchange 模型
1. 什么是 Topic Exchange
Section titled “1. 什么是 Topic Exchange”Topic Exchange(主题交换机) 是 Direct Exchange 的增强版,同样基于 Routing Key 路由,但支持通配符模式匹配,使得一个绑定规则可以匹配多种路由键,路由更加灵活。
Routing Key 和 Binding Key 都使用点号(.)分隔多个单词,例如 order.created.shanghai。
通配符规则:
| 通配符 | 含义 | 示例 |
|---|---|---|
* | 匹配恰好一个单词 | *.critical 匹配 error.critical,但不匹配 error.critical.high |
# | 匹配零个或多个单词 | error.# 匹配 error、error.critical、error.critical.high |
与其他 Exchange 类型的对比:
| Direct Exchange | Topic Exchange | |
|---|---|---|
| 匹配方式 | 精确匹配 | 通配符模式匹配 |
| 灵活性 | 低(一个 Key 对应一个规则) | 高(一个规则覆盖多种 Key) |
| 退化情形 | — | 不含通配符时等价于 Direct;Binding Key 为 # 时等价于 Fanout |
2. 通配符匹配详解
Section titled “2. 通配符匹配详解”以日志系统为例,Routing Key 格式为 {服务}.{级别}:
发送消息: order.error → 订单服务的错误日志 payment.info → 支付服务的普通日志 order.info → 订单服务的普通日志
Binding Key 匹配情况: order.* → 匹配 order.error ✓ order.info ✓ payment.info ✗ *.error → 匹配 order.error ✓ payment.info ✗ # → 匹配所有 ✓ order.error → 仅匹配 order.error ✓(退化为 Direct)3. 使用 Spring AMQP 实现
Section titled “3. 使用 Spring AMQP 实现”3.1. 配置类声明式实现
Section titled “3.1. 配置类声明式实现”3.1.1. 完整配置类
Section titled “3.1.1. 完整配置类”@Configurationpublic class TopicConfig {
public static final String EXCHANGE_TOPIC = "camellia.topic"; public static final String QUEUE_TOPIC_A = "camellia.topic.queueA"; public static final String QUEUE_TOPIC_B = "camellia.topic.queueB";
// 声明 Topic 交换机(注意返回类型是 TopicExchange) @Bean public TopicExchange topicExchange() { return new TopicExchange(EXCHANGE_TOPIC); }
// 声明队列 A:接收所有 topic. 开头的消息 @Bean public Queue topicQueueA() { return new Queue(QUEUE_TOPIC_A); }
// 声明队列 B:接收所有以 .topic 结尾的消息 @Bean public Queue topicQueueB() { return new Queue(QUEUE_TOPIC_B); }
// 绑定队列 A,Binding Key = "topic.#" @Bean public Binding bindTopicA(@Qualifier("topicQueueA") Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("topic.#"); }
// 绑定队列 B,Binding Key = "*.topic" @Bean public Binding bindTopicB(@Qualifier("topicQueueB") Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("*.topic"); }}3.1.2. 生产者
Section titled “3.1.2. 生产者”@Autowiredprivate RabbitTemplate rabbitTemplate;
public void sendTopic() { String exchange = TopicConfig.EXCHANGE_TOPIC;
// "topic.A1" → 匹配 "topic.#",queueA 收到 rabbitTemplate.convertAndSend(exchange, "topic.A1", "消息1");
// "B1.topic" → 匹配 "*.topic",queueB 收到 rabbitTemplate.convertAndSend(exchange, "B1.topic", "消息2");
// "topic.A1.extra" → 仍匹配 "topic.#",queueA 收到;不匹配 "*.topic" rabbitTemplate.convertAndSend(exchange, "topic.A1.extra", "消息3");
// "other.msg" → 无匹配,消息丢弃 rabbitTemplate.convertAndSend(exchange, "other.msg", "消息4");}3.1.3. 消费者
Section titled “3.1.3. 消费者”@Slf4j@Componentpublic class TopicListener {
@RabbitListener(queues = TopicConfig.QUEUE_TOPIC_A) public void consumerA(String message) { log.info("[QueueA - topic.#] Received: {}", message); }
@RabbitListener(queues = TopicConfig.QUEUE_TOPIC_B) public void consumerB(String message) { log.info("[QueueB - *.topic] Received: {}", message); }}3.2. 注解声明式实现
Section titled “3.2. 注解声明式实现”@Slf4j@Componentpublic class TopicAnnotationListener {
public static final String EXCHANGE_TOPIC = "camellia.topic_annotation";
// 队列 A:匹配中间含 topic 的三段路由键,或以 .topic. 为中间段 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "camellia.topic_annotation.queueA"), exchange = @Exchange(name = EXCHANGE_TOPIC, type = ExchangeTypes.TOPIC), key = {"*.topic.#"} // 如 a.topic.b、a.topic.b.c )) public void consumerA(String message) { log.info("[ConsumerA] Received: {}", message); }
// 队列 B:匹配 topic. 开头或以 .topic 结尾 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "camellia.topic_annotation.queueB"), exchange = @Exchange(name = EXCHANGE_TOPIC, type = ExchangeTypes.TOPIC), key = {"topic.#", "*.topic"} )) public void consumerB(String message) { log.info("[ConsumerB] Received: {}", message); }}