可靠性保证
1. 消息可靠性的三个阶段
Section titled “1. 消息可靠性的三个阶段”与 RabbitMQ 类似,Kafka 的消息可靠性同样需要从三个阶段分别保障:
生产者 ──①──▶ Broker(Leader) ──②──▶ Follower 副本 ↓消费者 ◀──③── Broker(Leader)| 阶段 | 风险 | 解决方案 |
|---|---|---|
| ① 生产者 → Broker | 网络故障、Broker 宕机导致消息未写入 | 生产者 ACK 机制 + 重试 |
| ② Leader → Follower | Leader 宕机时 Follower 数据不同步 | ISR + acks=all |
| ③ Broker → 消费者 | 消费者处理失败但 Offset 已提交 | 手动提交 Offset + 重试 |
2. 生产者 ACK 机制
Section titled “2. 生产者 ACK 机制”2.1. 三种 ACK 级别
Section titled “2.1. 三种 ACK 级别”acks 参数控制生产者在认为消息”发送成功”之前,需要等待多少个副本确认:
acks=0:不等待确认
Producer ──发送──▶ Broker ✗ 不等待任何响应,立即认为成功- 吞吐量最高,但消息可能在网络传输途中或 Broker 写入前丢失
- 适用场景:日志采集等允许少量丢失的场景
acks=1(默认):等待 Leader 确认
Producer ──发送──▶ Leader 写入成功 ──ACK──▶ Producer Follower 异步同步(不等待)- Leader 写入成功即返回,若 Leader 此时宕机且 Follower 尚未同步,消息丢失
- 适用场景:对性能和可靠性有一定平衡要求的场景
acks=all(acks=-1):等待 ISR 全部确认
Producer ──发送──▶ Leader 写入 ├──同步──▶ Follower1 写入 └──同步──▶ Follower2 写入 全部确认后 ──ACK──▶ Producer- ISR 中所有副本都写入成功才返回,可靠性最高
- 需配合
min.insync.replicas使用(见下文) - 适用场景:金融、订单等核心业务
2.2. min.insync.replicas
Section titled “2.2. min.insync.replicas”acks=all 时,若 ISR 中只剩 Leader 一个副本,退化为 acks=1,仍然不安全。通过 min.insync.replicas 设置最少同步副本数,低于该值时 Broker 拒绝写入:
# Broker 配置(或 Topic 级别配置)min.insync.replicas: 2 # ISR 中至少有 2 个副本才允许写入推荐组合(3个副本):
replication-factor = 3min.insync.replicas = 2acks = all
含义:3 个副本中至少 2 个写入成功,即使 1 个副本宕机也能正常写入2.3. Spring 中配置 ACK
Section titled “2.3. Spring 中配置 ACK”spring: kafka: producer: acks: all retries: 3 # 发送失败重试次数 properties: retry.backoff.ms: 300 # 重试间隔(ms) max.in.flight.requests.per.connection: 1 # 开启幂等性时必须 ≤ 5 enable.idempotence: true # 开启生产者幂等性(见第 3 节)2.4. 全局失败回调
Section titled “2.4. 全局失败回调”@Configurationpublic class KafkaProducerConfig {
@Bean public ProducerListener<String, Object> producerListener() { return new ProducerListener<>() { @Override public void onError(ProducerRecord<String, Object> record, RecordMetadata metadata, Exception exception) { // 所有发送失败的消息都会触发此回调 log.error("消息发送失败 → Topic: {}, Key: {}, 原因: {}", record.topic(), record.key(), exception.getMessage()); // 可在此处做告警、落库补偿等处理 } }; }}3. 生产者幂等性
Section titled “3. 生产者幂等性”3.1. 为什么需要幂等性
Section titled “3.1. 为什么需要幂等性”acks=all + retries > 0 时,若 Broker 已写入但 ACK 因网络故障丢失,生产者会重试,导致消息重复写入。
Producer ──发送──▶ Broker 写入成功 ──ACK──✗(网络故障,ACK 丢失)Producer 超时重试 ──发送──▶ Broker 再次写入(重复!)3.2. 开启幂等性
Section titled “3.2. 开启幂等性”spring: kafka: producer: properties: enable.idempotence: true # 开启幂等性开启后 Kafka 为每个生产者分配唯一 Producer ID(PID),并为每条消息附加序列号(Sequence Number)。Broker 检测到相同 PID + 序列号的消息时直接去重,保证单会话内精确一次写入。
4. 事务消息
Section titled “4. 事务消息”事务消息保证一批消息要么全部写入成功,要么全部不写入,适用于需要原子性操作多个 Topic 的场景。
4.1. 配置事务
Section titled “4.1. 配置事务”spring: kafka: producer: transaction-id-prefix: tx- # 指定前缀后自动开启事务,幂等性也随之开启4.2. 使用事务发送
Section titled “4.2. 使用事务发送”@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;
public void sendInTransaction(OrderEvent order, UserEvent user) { kafkaTemplate.executeInTransaction(operations -> { // 两条消息在同一事务中,要么都成功,要么都回滚 operations.send("order.events", order.getOrderId(), order); operations.send("user.events", user.getUserId(), user); return true; });}5. 消费者 Offset 提交
Section titled “5. 消费者 Offset 提交”5.1. 自动提交(默认)
Section titled “5.1. 自动提交(默认)”spring: kafka: consumer: enable-auto-commit: true auto-commit-interval: 1000 # 每 1 秒自动提交一次风险:消息拉取后、处理完成前,自动提交触发了,此时消费者宕机,消息丢失(已提交但未处理)。
拉取消息 ──▶ [处理中...] ──▶ 自动提交 Offset ✓ ↑ 宕机!消息未处理完,但 Offset 已提交 → 消息丢失5.2. 手动提交
Section titled “5.2. 手动提交”关闭自动提交,由代码在消息处理完成后显式提交 Offset,确保”处理成功才提交”:
spring: kafka: consumer: enable-auto-commit: false listener: ack-mode: manual # 手动提交模式@KafkaListener(topics = "order.events", groupId = "order-group")public void onMessage(ConsumerRecord<String, OrderEvent> record, Acknowledgment ack) { try { // 处理消息 processOrder(record.value()); // 处理成功后手动提交 Offset ack.acknowledge(); } catch (Exception e) { log.error("消息处理失败: {}", e.getMessage()); // 不调用 ack.acknowledge(),该消息下次会重新投递 }}5.3. 手动提交的几种模式
Section titled “5.3. 手动提交的几种模式”| ack-mode | 说明 | 适用场景 |
|---|---|---|
RECORD | 每条消息处理后立即提交 | 消息量小,可靠性要求高 |
BATCH(默认) | 每批次消息处理完后提交 | 批量消费,性能与可靠性平衡 |
MANUAL | 调用 ack.acknowledge() 时提交 | 自定义提交时机 |
MANUAL_IMMEDIATE | 调用后立即同步提交 | 需要立即确认的场景 |
6. 消费者重试与死信处理
Section titled “6. 消费者重试与死信处理”6.1. 配置重试
Section titled “6.1. 配置重试”spring: kafka: listener: ack-mode: manual@Beanpublic DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) { // 死信 Topic 配置:处理失败的消息发往 xxx.DLT(Dead Letter Topic) DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
// 重试策略:最多重试 3 次,每次间隔 1 秒 FixedBackOff backOff = new FixedBackOff(1000L, 3L);
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, backOff);
// 某些异常不重试,直接发死信(如反序列化异常,重试也没意义) handler.addNotRetryableExceptions(DeserializationException.class);
return handler;}6.2. 死信 Topic 消费
Section titled “6.2. 死信 Topic 消费”Kafka 死信 Topic 命名规则为原 Topic 名 + .DLT 后缀:
// 消费 order.events 的死信队列@KafkaListener(topics = "order.events.DLT", groupId = "order-dlq-group")public void onDeadLetter(ConsumerRecord<String, OrderEvent> record) { log.error("死信消息 → Partition: {}, Offset: {}, 消息: {}", record.partition(), record.offset(), record.value()); // 告警、人工处理、落库等}7. 可靠性配置完整示例
Section titled “7. 可靠性配置完整示例”spring: kafka: bootstrap-servers: localhost:9092 producer: acks: all retries: 3 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer properties: enable.idempotence: true max.in.flight.requests.per.connection: 5 retry.backoff.ms: 300 consumer: group-id: order-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: "com.example.dto" listener: ack-mode: manual8. 可靠性保证总结
Section titled “8. 可靠性保证总结”| 阶段 | 风险 | 方案 | 代价 |
|---|---|---|---|
| 生产者 → Broker | 消息丢失 | acks=all + retries | 吞吐量下降 |
| 重试导致重复 | 消息重复写入 | enable.idempotence=true | 几乎无影响 |
| 原子写多 Topic | 部分写入 | 事务消息 | 吞吐量显著下降 |
| Broker → 消费者 | 消息丢失/重复消费 | 手动提交 Offset | 代码复杂度提升 |
| 消费失败 | 消息丢失 | 重试 + 死信 Topic | 需额外监控死信队列 |