# Spring Kafka in Practice
## 1. Dependencies and Basic Configuration

### 1.1. Adding the Dependencies

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<!-- Jackson is required for sending/receiving JSON messages -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
```

### 1.2. Basic application.yml Configuration
```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka broker address

    # Producer configuration
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all          # Reliability level: 0 / 1 / all (see the reliability chapter)
      retries: 3         # Number of retries on send failure
      batch-size: 16384  # Batch size in bytes (default 16 KB)
      properties:
        linger.ms: 5     # Time (ms) to wait and accumulate messages before sending a batch

    # Consumer configuration
    consumer:
      group-id: my-consumer-group  # Consumer group ID
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest  # Where a brand-new consumer group starts reading
      enable-auto-commit: true     # Auto-commit offsets (default true)
      auto-commit-interval: 1000   # Auto-commit interval (ms)
      properties:
        spring.json.trusted.packages: "*"  # Packages allowed for deserialization (use explicit package names in production)

    # Listener configuration
    listener:
      ack-mode: batch  # Offset commit mode (use manual for manual acknowledgment)
```

Note: `linger.ms` has no dedicated Spring Boot key under `spring.kafka.producer`, so it goes under `properties`.

## 2. Topic Management
### 2.1. Creating Topics via a Configuration Class

Spring Kafka lets you declare `NewTopic` beans; on startup the application automatically creates them in Kafka (existing topics are skipped):
```java
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic orderTopic() {
        return TopicBuilder.name("order.events")
                .partitions(3)  // number of partitions
                .replicas(1)    // replication factor (use 1 for single-node dev environments)
                .build();
    }

    @Bean
    public NewTopic userTopic() {
        return TopicBuilder.name("user.events")
                .partitions(2)
                .replicas(1)
                .build();
    }
}
```

## 3. Producer (Sending Messages)
Spring Kafka provides `KafkaTemplate` as the core template class for sending messages, similar to `RabbitTemplate`.
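Before looking at concrete endpoints, it helps to see the two send styles used below in isolation. `KafkaTemplate.send()` returns a `CompletableFuture`, so the async/sync distinction can be sketched without a broker; `fakeSend` here is a hypothetical stand-in for `kafkaTemplate.send(...)`, not a real API:

```java
import java.util.concurrent.CompletableFuture;

// Broker-free sketch of the async vs. sync send patterns shown in this section.
// fakeSend is a hypothetical stand-in for kafkaTemplate.send(...).
public class SendStyleSketch {

    static CompletableFuture<Long> fakeSend(String value) {
        // Pretend the broker acknowledged the record at offset 42
        return CompletableFuture.supplyAsync(() -> 42L);
    }

    public static void main(String[] args) throws Exception {
        // Asynchronous: register a callback, do not block the caller
        fakeSend("hello").whenComplete((offset, ex) -> {
            if (ex == null) System.out.println("ack'd at offset " + offset);
        });

        // Synchronous: .get() blocks until the result (or failure) is available
        long offset = fakeSend("hello").get();
        System.out.println("sync offset = " + offset);
    }
}
```

The same shape appears in sections 3.4 and 3.5, just with the real template instead of `fakeSend`.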
### 3.1. Sending String Messages

```java
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Simplest form: specify only the topic
    @GetMapping("/send")
    public void send() {
        kafkaTemplate.send("order.events", "Hello Kafka!");
    }
}
```

### 3.2. Sending POJOs
```java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {
    private String orderId;
    private String status;
    private BigDecimal amount;
}
```

```java
@GetMapping("/sendOrder")
public void sendOrder() {
    OrderEvent event = new OrderEvent("ORDER-001", "CREATED", new BigDecimal("99.99"));
    // Specify topic + key + payload.
    // Messages with the same key are routed to the same partition, preserving their order.
    kafkaTemplate.send("order.events", event.getOrderId(), event);
}
```

### 3.3. Sending to a Specific Partition
```java
@GetMapping("/sendToPartition")
public void sendToPartition() {
    OrderEvent event = new OrderEvent("ORDER-002", "PAID", new BigDecimal("199.99"));
    // Arguments: topic, partition, key, value
    kafkaTemplate.send("order.events", 0, event.getOrderId(), event);
}
```

### 3.4. Asynchronous Send (with Callback)
`KafkaTemplate.send()` returns a `CompletableFuture`, so you can register a callback to handle success and failure:
```java
@GetMapping("/sendAsync")
public void sendAsync() {
    OrderEvent event = new OrderEvent("ORDER-003", "SHIPPED", new BigDecimal("59.99"));

    kafkaTemplate.send("order.events", event.getOrderId(), event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // Send succeeded
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("Send succeeded → Topic: {}, Partition: {}, Offset: {}",
                            metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    // Send failed
                    log.error("Send failed: {}", ex.getMessage());
                }
            });
}
```

### 3.5. Synchronous Send
```java
@GetMapping("/sendSync")
public void sendSync() throws Exception {
    OrderEvent event = new OrderEvent("ORDER-004", "COMPLETED", new BigDecimal("299.99"));
    // .get() blocks until the send result is available, ensuring the broker has acknowledged the message
    SendResult<String, Object> result =
            kafkaTemplate.send("order.events", event.getOrderId(), event).get();
    log.info("Synchronous send succeeded, Offset: {}", result.getRecordMetadata().offset());
}
```

## 4. Consumer (Receiving Messages)
### 4.1. Basic Consumption

```java
@Slf4j
@Component
public class OrderEventListener {

    @KafkaListener(topics = "order.events", groupId = "order-group")
    public void onMessage(OrderEvent event) {
        log.info("Received order event: {}", event);
    }
}
```

### 4.2. Accessing Message Metadata
A `ConsumerRecord` parameter gives you the message's full metadata (topic, partition, offset, key, etc.):
```java
@KafkaListener(topics = "order.events", groupId = "order-group")
public void onMessageWithMeta(ConsumerRecord<String, OrderEvent> record) {
    log.info("Topic: {}, Partition: {}, Offset: {}, Key: {}",
            record.topic(), record.partition(), record.offset(), record.key());
    log.info("Payload: {}", record.value());
}
```

### 4.3. Batch Consumption
Take a `List<ConsumerRecord>` as the parameter to process multiple messages per call and increase throughput:
```java
// Enable batch listening in application.yml:
// listener:
//   type: batch

@KafkaListener(topics = "order.events", groupId = "order-group-batch")
public void onBatchMessage(List<ConsumerRecord<String, OrderEvent>> records) {
    log.info("Received a batch of {} messages", records.size());
    records.forEach(record -> log.info("Processing message: {}", record.value()));
}
```

### 4.4. Listening to Multiple Topics
```java
@KafkaListener(topics = {"order.events", "user.events"}, groupId = "multi-group")
public void onMultiTopicMessage(ConsumerRecord<String, Object> record) {
    log.info("Message from topic [{}]: {}", record.topic(), record.value());
}
```

### 4.5. Consuming from a Specific Partition and Initial Offset
```java
@KafkaListener(
        groupId = "partition-group",
        topicPartitions = @TopicPartition(
                topic = "order.events",
                partitionOffsets = {
                        @PartitionOffset(partition = "0", initialOffset = "0"), // start at offset 0 of partition 0
                        @PartitionOffset(partition = "1", initialOffset = "5")  // start at offset 5 of partition 1
                }
        ))
public void onPartitionMessage(ConsumerRecord<String, OrderEvent> record) {
    log.info("Partition {}, Offset {}: {}", record.partition(), record.offset(), record.value());
}
```

## 5. Message Serialization
### 5.1. String Serialization (Simple Cases)

```yaml
spring:
  kafka:
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```

```java
kafkaTemplate.send("topic", "a plain string message");
```

### 5.2. JSON Serialization (Recommended)
On the producer side, `JsonSerializer` automatically converts Java objects to JSON strings; on the consumer side, `JsonDeserializer` converts them back:
```yaml
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.dto"
        spring.json.value.default.type: "com.example.dto.OrderEvent"  # default target type for deserialization
```

## 6. Complete Example
```java
// Producer
@GetMapping("/demo")
public void demo() {
    for (int i = 1; i <= 10; i++) {
        OrderEvent event = new OrderEvent("ORDER-" + i, "CREATED", BigDecimal.valueOf(i * 10));
        kafkaTemplate.send("order.events", event.getOrderId(), event)
                .whenComplete((result, ex) -> {
                    if (ex != null) log.error("Send failed", ex);
                });
    }
}
```
```java
// Consumer
@KafkaListener(topics = "order.events", groupId = "order-group", concurrency = "3")
public void consume(ConsumerRecord<String, OrderEvent> record) {
    log.info("[Partition {}] Consumed order: {}", record.partition(), record.value().getOrderId());
}
```

`concurrency = "3"` starts three concurrent consumer threads, each responsible for one partition, which is equivalent to running three consumer instances in the same consumer group.
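How `concurrency` relates to partitions can be illustrated with a simplified round-robin assignment. This is only a conceptual model: Kafka's real partition assignment is handled by the group coordinator and configurable assignors, not by this formula.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of how N concurrent consumers in one group split P partitions.
// Illustration only; not Kafka's actual assignor implementation.
public class PartitionAssignmentSketch {

    static Map<Integer, List<Integer>> assign(int partitions, int consumers) {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        for (int c = 0; c < consumers; c++) {
            assignment.put(c, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            // Round-robin: partition p goes to consumer p % consumers
            assignment.get(p % consumers).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 partitions, concurrency = 3: each consumer thread owns exactly one partition
        System.out.println(assign(3, 3)); // {0=[0], 1=[1], 2=[2]}
        // 3 partitions, concurrency = 4: the extra thread receives no partitions
        System.out.println(assign(3, 4));
    }
}
```

As the second call shows, threads beyond the partition count sit idle, so `concurrency` should not exceed the topic's number of partitions.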
## 7. KafkaTemplate Quick Reference

| Method | Description |
|---|---|
| `send(topic, value)` | Send a message to the given topic |
| `send(topic, key, value)` | Send with a key; the same key is routed to the same partition |
| `send(topic, partition, key, value)` | Send to a specific partition |
| `send(ProducerRecord)` | Full control, including custom headers |
| `.get()` | Block until the send result is available (synchronous) |
| `.whenComplete()` | Register an asynchronous callback |
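The key-based routing mentioned in the table can be pictured as hashing the key and taking it modulo the partition count. The sketch below uses `String.hashCode` purely for illustration; Kafka's default partitioner actually applies murmur2 to the serialized key bytes, so the numbers differ, but the guarantee is the same: identical keys always map to the same partition.

```java
// Simplified illustration of key-based partition routing:
// same key -> same hash -> same partition, so per-key ordering is preserved.
// Kafka's real default partitioner uses murmur2 on the serialized key bytes.
public class KeyRoutingSketch {

    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        // Every message keyed by ORDER-001 lands on the same partition
        System.out.println(partitionFor("ORDER-001", partitions));
        System.out.println(partitionFor("ORDER-001", partitions)); // same as above
        System.out.println(partitionFor("ORDER-002", partitions)); // may differ
    }
}
```

This is why section 3.2 sends with `event.getOrderId()` as the key: all events of one order stay in order on one partition.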