跳转到内容

Spring Kafka 实战


<!-- Spring Kafka: KafkaTemplate, @KafkaListener, auto-configuration -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Jackson is required for sending/receiving JSON messages (JsonSerializer/JsonDeserializer) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
# Restored nesting: the pasted version had every key flush-left, which is not
# valid Spring Boot YAML — these keys must nest under spring.kafka.
spring:
  kafka:
    bootstrap-servers: localhost:9092          # Kafka broker address
    # Producer settings
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all            # durability level: 0 / 1 / all (see the reliability chapter)
      retries: 3           # retry count on send failure
      batch-size: 16384    # batch size in bytes, default 16KB
      linger-ms: 5         # wait time (ms) to accumulate records into one batch
    # Consumer settings
    consumer:
      group-id: my-consumer-group              # consumer group id
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest              # where a brand-new group starts reading
      enable-auto-commit: true                 # auto-commit offsets (Kafka client default is true)
      auto-commit-interval: 1000               # auto-commit interval (ms)
      properties:
        spring.json.trusted.packages: "*"      # packages allowed for JSON deserialization (narrow this in production)
    # Listener container settings
    listener:
      ack-mode: batch      # offset commit mode (use `manual` for manual acks)

Spring Kafka 提供 NewTopic Bean,应用启动时自动在 Kafka 中创建 Topic(已存在则跳过):

/**
 * Declares topics as beans; Spring Kafka's KafkaAdmin creates them on startup
 * if they do not already exist (existing topics are left untouched).
 */
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic orderTopic() {
        return devTopic("order.events", 3);
    }

    @Bean
    public NewTopic userTopic() {
        return devTopic("user.events", 2);
    }

    // Shared builder; a single replica suits a single-broker dev environment.
    private static NewTopic devTopic(String name, int partitions) {
        return TopicBuilder.name(name)
                .partitions(partitions)
                .replicas(1)
                .build();
    }
}

Spring Kafka 提供 KafkaTemplate 作为发送消息的核心模板类,其定位类似于 Spring AMQP 中的 RabbitTemplate。

/**
 * REST endpoints demonstrating message production via {@link KafkaTemplate}.
 */
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {

    // Topic name used by the demo endpoints below.
    private static final String ORDER_TOPIC = "order.events";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /** Simplest form: topic + payload only; the default partitioner picks the partition. */
    @GetMapping("/send")
    public void send() {
        kafkaTemplate.send(ORDER_TOPIC, "Hello Kafka!");
    }
}
// Message payload for order events; serialized to JSON by JsonSerializer.
// Lombok generates getters/setters/equals/hashCode plus the two constructors.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {
private String orderId;   // business order id; also used as the Kafka message key in the send examples
private String status;    // lifecycle state, e.g. CREATED / PAID / SHIPPED / COMPLETED
private BigDecimal amount; // monetary amount — BigDecimal avoids binary floating-point rounding
}
// Send with topic + key + payload.
@GetMapping("/sendOrder")
public void sendOrder() {
OrderEvent event = new OrderEvent("ORDER-001", "CREATED", new BigDecimal("99.99"));
// Records sharing the same key are routed to the same partition,
// which preserves per-key ordering.
kafkaTemplate.send("order.events", event.getOrderId(), event);
}
// Send pinned to an explicit partition, bypassing the partitioner.
@GetMapping("/sendToPartition")
public void sendToPartition() {
OrderEvent event = new OrderEvent("ORDER-002", "PAID", new BigDecimal("199.99"));
// Argument order: topic, partition, key, value — here partition 0.
kafkaTemplate.send("order.events", 0, event.getOrderId(), event);
}

KafkaTemplate.send() 返回 CompletableFuture,可以注册回调处理成功/失败:

// Asynchronous send: register a callback on the CompletableFuture returned by send().
@GetMapping("/sendAsync")
public void sendAsync() {
    OrderEvent event = new OrderEvent("ORDER-003", "SHIPPED", new BigDecimal("59.99"));
    kafkaTemplate.send("order.events", event.getOrderId(), event)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    // Success: the metadata reports where the broker stored the record.
                    RecordMetadata metadata = result.getRecordMetadata();
                    log.info("发送成功 → Topic: {}, Partition: {}, Offset: {}",
                            metadata.topic(),
                            metadata.partition(),
                            metadata.offset());
                } else {
                    // Failure: pass the throwable itself so SLF4J logs the full
                    // stack trace — logging only ex.getMessage() would discard it.
                    log.error("发送失败", ex);
                }
            });
}
// Synchronous send: block until the broker acknowledges the record.
// NOTE(review): `throws Exception` is overly broad; `ExecutionException |
// InterruptedException` would be more precise — confirm against project style.
@GetMapping("/sendSync")
public void sendSync() throws Exception {
OrderEvent event = new OrderEvent("ORDER-004", "COMPLETED", new BigDecimal("299.99"));
// .get() blocks on the CompletableFuture, guaranteeing broker confirmation.
SendResult<String, Object> result =
kafkaTemplate.send("order.events", event.getOrderId(), event).get();
log.info("同步发送成功,Offset: {}", result.getRecordMetadata().offset());
}
// Simplest consumer: the configured JsonDeserializer converts the record
// value straight into an OrderEvent before the method is invoked.
@Slf4j
@Component
public class OrderEventListener {
@KafkaListener(topics = "order.events", groupId = "order-group")
public void onMessage(OrderEvent event) {
log.info("收到订单事件: {}", event);
}
}

通过 ConsumerRecord 可以获取消息的完整元数据(Topic、Partition、Offset、Key 等):

// Accepting ConsumerRecord instead of the bare payload exposes the record's
// full metadata: topic, partition, offset and key.
@KafkaListener(topics = "order.events", groupId = "order-group")
public void onMessageWithMeta(ConsumerRecord<String, OrderEvent> record) {
log.info("Topic: {}, Partition: {}, Offset: {}, Key: {}",
record.topic(),
record.partition(),
record.offset(),
record.key());
log.info("消息内容: {}", record.value());
}

List<ConsumerRecord> 作为参数,一次处理多条消息,提升吞吐量:

// Batch consumption for higher throughput. Requires batch listening enabled
// in application.yml:
// listener:
//   type: batch
@KafkaListener(topics = "order.events", groupId = "order-group-batch")
public void onBatchMessage(List<ConsumerRecord<String, OrderEvent>> records) {
log.info("批量收到 {} 条消息", records.size());
records.forEach(record -> log.info("处理消息: {}", record.value()));
}
// One listener subscribed to several topics; value type is Object because
// the topics carry different payload classes.
@KafkaListener(topics = {"order.events", "user.events"}, groupId = "multi-group")
public void onMultiTopicMessage(ConsumerRecord<String, Object> record) {
log.info("来自 Topic [{}] 的消息: {}", record.topic(), record.value());
}
// Pin this listener to explicit partitions and starting offsets instead of
// relying on group-managed partition assignment.
@KafkaListener(
groupId = "partition-group",
topicPartitions = @TopicPartition(
topic = "order.events",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"), // start from offset 0 of partition 0
@PartitionOffset(partition = "1", initialOffset = "5") // start from offset 5 of partition 1
}
)
)
public void onPartitionMessage(ConsumerRecord<String, OrderEvent> record) {
log.info("Partition {}, Offset {}: {}", record.partition(), record.offset(), record.value());
}
# Restored nesting (the pasted version lost all indentation).
# Plain-string serialization on both sides:
spring:
  kafka:
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafkaTemplate.send("topic", "普通字符串消息");

生产者端,JsonSerializer 自动将 Java 对象转为 JSON 字符串:

# Restored nesting (the pasted version lost all indentation).
# JSON serialization: objects ⇄ JSON handled transparently by Spring Kafka.
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.dto"               # packages trusted for deserialization
        spring.json.value.default.type: "com.example.dto.OrderEvent"  # default target type when no type header is present
// Producer side of the end-to-end demo: fire 10 order events keyed by order id,
// logging any asynchronous send failure.
@GetMapping("/demo")
public void demo() {
for (int i = 1; i <= 10; i++) {
OrderEvent event = new OrderEvent("ORDER-" + i, "CREATED", BigDecimal.valueOf(i * 10));
kafkaTemplate.send("order.events", event.getOrderId(), event)
.whenComplete((result, ex) -> {
if (ex != null) log.error("发送失败", ex);
});
}
}
// Consumer side of the demo: concurrency = "3" starts 3 listener threads,
// one per partition of the 3-partition topic.
@KafkaListener(topics = "order.events", groupId = "order-group", concurrency = "3")
public void consume(ConsumerRecord<String, OrderEvent> record) {
log.info("[Partition {}] 消费订单: {}", record.partition(), record.value().getOrderId());
}

concurrency = "3" 表示启动 3 个并发消费线程,每个线程负责一个 Partition,等效于在同一消费者组内启动 3 个消费者实例。注意:并发数不应超过 Topic 的分区数,多出的线程会因分不到 Partition 而空闲。

| 方法 | 说明 |
| --- | --- |
| send(topic, value) | 发送消息到指定 Topic |
| send(topic, key, value) | 带 Key 发送,相同 Key 路由到同一分区 |
| send(topic, partition, key, value) | 指定分区发送 |
| send(ProducerRecord) | 完整控制,支持自定义 Header 等 |
| .get() | 阻塞等待发送结果(同步) |
| .whenComplete() | 注册异步回调 |