跳转到内容

可靠性保证


与 RabbitMQ 类似,Kafka 的消息可靠性同样需要从三个阶段分别保障:

生产者 ──①──▶ Broker(Leader) ──②──▶ Follower 副本
消费者 ◀──③── Broker(Leader)
阶段风险解决方案
① 生产者 → Broker网络故障、Broker 宕机导致消息未写入生产者 ACK 机制 + 重试
② Leader → FollowerLeader 宕机时 Follower 数据不同步ISR + acks=all
③ Broker → 消费者消费者处理失败但 Offset 已提交手动提交 Offset + 重试

acks 参数控制生产者在认为消息”发送成功”之前,需要等待多少个副本确认:

acks=0:不等待确认

Producer ──发送──▶ Broker
✗ 不等待任何响应,立即认为成功
  • 吞吐量最高,但消息可能在网络传输途中或 Broker 写入前丢失
  • 适用场景:日志采集等允许少量丢失的场景

acks=1(默认):等待 Leader 确认

Producer ──发送──▶ Leader 写入成功 ──ACK──▶ Producer
Follower 异步同步(不等待)
  • Leader 写入成功即返回,若 Leader 此时宕机且 Follower 尚未同步,消息丢失
  • 适用场景:对性能和可靠性有一定平衡要求的场景

acks=allacks=-1):等待 ISR 全部确认

Producer ──发送──▶ Leader 写入
├──同步──▶ Follower1 写入
└──同步──▶ Follower2 写入
全部确认后 ──ACK──▶ Producer
  • ISR 中所有副本都写入成功才返回,可靠性最高
  • 需配合 min.insync.replicas 使用(见下文)
  • 适用场景:金融、订单等核心业务

acks=all 时,若 ISR 中只剩 Leader 一个副本,退化为 acks=1,仍然不安全。通过 min.insync.replicas 设置最少同步副本数,低于该值时 Broker 拒绝写入:

# Broker 配置(或 Topic 级别配置)
min.insync.replicas: 2 # ISR 中至少有 2 个副本才允许写入

推荐组合(3个副本):

replication-factor = 3
min.insync.replicas = 2
acks = all
含义:3 个副本中至少 2 个写入成功,即使 1 个副本宕机也能正常写入
spring:
kafka:
producer:
acks: all
retries: 3 # 发送失败重试次数
properties:
retry.backoff.ms: 300 # 重试间隔(ms)
max.in.flight.requests.per.connection: 1 # 开启幂等性时必须 ≤ 5
enable.idempotence: true # 开启生产者幂等性(见第 3 节)
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerListener<String, Object> producerListener() {
return new ProducerListener<>() {
@Override
public void onError(ProducerRecord<String, Object> record,
RecordMetadata metadata,
Exception exception) {
// 所有发送失败的消息都会触发此回调
log.error("消息发送失败 → Topic: {}, Key: {}, 原因: {}",
record.topic(), record.key(), exception.getMessage());
// 可在此处做告警、落库补偿等处理
}
};
}
}

acks=all + retries > 0 时,若 Broker 已写入但 ACK 因网络故障丢失,生产者会重试,导致消息重复写入

Producer ──发送──▶ Broker 写入成功
──ACK──✗(网络故障,ACK 丢失)
Producer 超时重试 ──发送──▶ Broker 再次写入(重复!)
spring:
kafka:
producer:
properties:
enable.idempotence: true # 开启幂等性

开启后 Kafka 为每个生产者分配唯一 Producer ID(PID),并为每条消息附加序列号(Sequence Number)。Broker 检测到相同 PID + 序列号的消息时直接去重,保证单会话内精确一次写入。

事务消息保证一批消息要么全部写入成功,要么全部不写入,适用于需要原子性操作多个 Topic 的场景。

spring:
kafka:
producer:
transaction-id-prefix: tx- # 指定前缀后自动开启事务,幂等性也随之开启
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void sendInTransaction(OrderEvent order, UserEvent user) {
kafkaTemplate.executeInTransaction(operations -> {
// 两条消息在同一事务中,要么都成功,要么都回滚
operations.send("order.events", order.getOrderId(), order);
operations.send("user.events", user.getUserId(), user);
return true;
});
}
spring:
kafka:
consumer:
enable-auto-commit: true
auto-commit-interval: 1000 # 每 1 秒自动提交一次

风险:消息拉取后、处理完成前,自动提交触发了,此时消费者宕机,消息丢失(已提交但未处理)。

拉取消息 ──▶ [处理中...] ──▶ 自动提交 Offset ✓
宕机!消息未处理完,但 Offset 已提交 → 消息丢失

关闭自动提交,由代码在消息处理完成后显式提交 Offset,确保”处理成功才提交”:

spring:
kafka:
consumer:
enable-auto-commit: false
listener:
ack-mode: manual # 手动提交模式
@KafkaListener(topics = "order.events", groupId = "order-group")
public void onMessage(ConsumerRecord<String, OrderEvent> record,
Acknowledgment ack) {
try {
// 处理消息
processOrder(record.value());
// 处理成功后手动提交 Offset
ack.acknowledge();
} catch (Exception e) {
log.error("消息处理失败: {}", e.getMessage());
// 不调用 ack.acknowledge(),该消息下次会重新投递
}
}
ack-mode说明适用场景
RECORD每条消息处理后立即提交消息量小,可靠性要求高
BATCH(默认)每批次消息处理完后提交批量消费,性能与可靠性平衡
MANUAL调用 ack.acknowledge() 时提交自定义提交时机
MANUAL_IMMEDIATE调用后立即同步提交需要立即确认的场景
spring:
kafka:
listener:
ack-mode: manual
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
// 死信 Topic 配置:处理失败的消息发往 xxx.DLT(Dead Letter Topic)
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(kafkaTemplate);
// 重试策略:最多重试 3 次,每次间隔 1 秒
FixedBackOff backOff = new FixedBackOff(1000L, 3L);
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, backOff);
// 某些异常不重试,直接发死信(如反序列化异常,重试也没意义)
handler.addNotRetryableExceptions(DeserializationException.class);
return handler;
}

Kafka 死信 Topic 命名规则为原 Topic 名 + .DLT 后缀:

// 消费 order.events 的死信队列
@KafkaListener(topics = "order.events.DLT", groupId = "order-dlq-group")
public void onDeadLetter(ConsumerRecord<String, OrderEvent> record) {
log.error("死信消息 → Partition: {}, Offset: {}, 消息: {}",
record.partition(), record.offset(), record.value());
// 告警、人工处理、落库等
}
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
acks: all
retries: 3
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
enable.idempotence: true
max.in.flight.requests.per.connection: 5
retry.backoff.ms: 300
consumer:
group-id: order-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "com.example.dto"
listener:
ack-mode: manual
阶段风险方案代价
生产者 → Broker消息丢失acks=all + retries吞吐量下降
重试导致重复消息重复写入enable.idempotence=true几乎无影响
原子写多 Topic部分写入事务消息吞吐量显著下降
Broker → 消费者消息丢失/重复消费手动提交 Offset代码复杂度提升
消费失败消息丢失重试 + 死信 Topic需额外监控死信队列