跳转到内容

消息转换器


消息转换器(Message Converter) 是 Spring AMQP 中负责 Java 对象 ↔ AMQP 消息 相互转换的组件,在 RabbitTemplate 发送和 @RabbitListener 接收消息时自动介入。

发送:Java 对象 ──[MessageConverter]──▶ byte[] ──▶ RabbitMQ
接收:RabbitMQ ──▶ byte[] ──[MessageConverter]──▶ Java 对象

Spring AMQP 内置了多种转换器,默认使用 SimpleMessageConverter

SimpleMessageConverter 是 Spring AMQP 的默认转换器,支持以下类型:

消息类型转换方式
String直接作为消息 body,编码为字节数组
byte[]直接使用,不做额外处理
其他 Java 对象JDK 序列化,转为字节数组

JDK 序列化的问题:

  • 序列化后体积大,网络传输开销高;
  • 消息内容是二进制,在 RabbitMQ 管理台看到的是乱码/Base64 编码,无法直观调试;
  • 强依赖 Java 平台,跨语言消费时无法反序列化;
  • 发送的对象类必须实现 Serializable 接口;

3. Jackson2JsonMessageConverter(推荐)

Section titled “3. Jackson2JsonMessageConverter(推荐)”

使用 Jackson 将 Java 对象序列化为 JSON 字符串,可读性强,跨语言兼容,是生产环境中最常用的转换器。

RabbitMQ 管理台中消息内容对比:

转换器消息内容示例Content-Type
SimpleMessageConverterrO0ABXNy...(Base64)application/x-java-serialized-object
Jackson2JsonMessageConverter{"name":"Camellia","age":18}application/json

spring-boot-starter-amqp 本身不包含 Jackson,需要单独引入:

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

在任意 @Configuration 类中声明为 Bean,Spring AMQP 会自动替换默认的 SimpleMessageConverter

@Bean
public MessageConverter messageConverter() {
// 开启消息类型头信息,方便消费者自动反序列化
return new Jackson2JsonMessageConverter();
}

生产者和消费者服务都需要注册,否则一端序列化为 JSON,另一端仍用 JDK 方式反序列化会报错。

@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
String queueName = "camellia.work.queue";
// 直接发送 Map
Map<String, Object> message = Map.of("name", "Camellia", "age", 18);
rabbitTemplate.convertAndSend(queueName, message);
// 也可以发送自定义 POJO(无需实现 Serializable)
User user = new User("Camellia", 18);
rabbitTemplate.convertAndSend(queueName, user);
}
@Slf4j
@Component
public class WorkQueueListener {
public static final String QUEUE_WORK = "camellia.work.queue";
// 接收 Map 类型
@RabbitListener(queues = QUEUE_WORK)
public void consumer1(Map<String, Object> message) {
log.info("[Consumer1] Received: {}", message);
}
// 接收自定义 POJO(Jackson 自动反序列化)
@RabbitListener(queues = QUEUE_WORK)
public void consumer2(User user) {
log.info("[Consumer2] Received: {}", user);
}
}