kafka高级特性
1. 消息顺序性保证
Section titled “1. 消息顺序性保证”1.1. Kafka 顺序性的边界
Section titled “1.1. Kafka 顺序性的边界”Kafka 只保证单个 Partition 内消息有序,跨 Partition 不保证顺序。因此顺序性保证的核心是:让需要保证顺序的消息进入同一个 Partition。
✅ 正确:同一订单的所有消息使用相同 Key → 路由到同一 PartitionPartition 0: [ORDER-1 创建] → [ORDER-1 支付] → [ORDER-1 发货]
❌ 错误:不指定 Key,轮询分配到不同 PartitionPartition 0: [ORDER-1 创建]Partition 1: [ORDER-1 支付] ← 可能被不同消费者并行处理,顺序无法保证Partition 2: [ORDER-1 发货]1.2. 通过 Key 保证顺序
Section titled “1.2. 通过 Key 保证顺序”// 使用业务 ID 作为 Key,相同 Key 的消息路由到同一 PartitionkafkaTemplate.send("order.events", order.getOrderId(), order);1.3. 生产者端的顺序性陷阱
Section titled “1.3. 生产者端的顺序性陷阱”开启重试(retries > 0)时,若第一批消息发送失败、第二批已发送成功,重试后第一批反而在第二批之后到达,造成乱序。
解决方案:设置 max.in.flight.requests.per.connection = 1,保证同一时刻只有一个请求在途:
spring: kafka: producer: retries: 3 properties: max.in.flight.requests.per.connection: 1 # 严格顺序,但吞吐量下降1.4. 消费者端的顺序性保证
Section titled “1.4. 消费者端的顺序性保证”单个 Partition 只会被同一消费者组内的一个消费者线程消费,天然保证消费顺序。但需注意:
// ❌ concurrency > 1 时,多线程可能并发消费同一 Partition 的不同批次,破坏顺序@KafkaListener(topics = "order.events", groupId = "order-group", concurrency = "3")
// ✅ 严格顺序消费:concurrency 不超过分区数,且每个分区由固定线程处理@KafkaListener(topics = "order.events", groupId = "order-group", concurrency = "1")2. 延迟消息
Section titled “2. 延迟消息”2.1. Kafka 原生不支持延迟消息
Section titled “2.1. Kafka 原生不支持延迟消息”Kafka 本身没有延迟队列机制,社区常见的实现方案有三种:
| 方案 | 原理 | 优点 | 缺点 |
|---|---|---|---|
| 时间轮 + 中间存储 | 消息先存 Redis/DB,到期后投递 Kafka | 精度高,灵活 | 引入额外组件 |
| 多级延迟 Topic | 按延迟时间建不同 Topic,消费者轮询检查是否到期 | 无额外依赖 | 只支持固定延迟档位 |
| kafka-delay-queue 插件 | 第三方扩展 | 使用简单 | 社区维护,生产风险较高 |
2.2. 方案一:Redis + Kafka 实现延迟消息(推荐)
Section titled “2.2. 方案一:Redis + Kafka 实现延迟消息(推荐)”流程:
Producer ──▶ Redis ZSet(score = 执行时间戳) ↓ 定时任务轮询(每秒) 到期消息 ──▶ Kafka Topic ──▶ Consumer 正常消费实现:
@Servicepublic class DelayMessageService {
@Autowired private StringRedisTemplate redisTemplate;
@Autowired private KafkaTemplate<String, Object> kafkaTemplate;
private static final String DELAY_QUEUE_KEY = "kafka:delay:queue";
// 发送延迟消息:存入 Redis ZSet,score 为执行时间戳 public void sendDelayMessage(String topic, Object message, long delaySeconds) { String payload = JSON.toJSONString(Map.of("topic", topic, "message", message)); double score = System.currentTimeMillis() + delaySeconds * 1000; redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY, payload, score); }
// 定时任务:每秒扫描到期消息并投递 Kafka @Scheduled(fixedDelay = 1000) public void processDelayQueue() { long now = System.currentTimeMillis(); // 取出 score <= now 的所有到期消息 Set<String> messages = redisTemplate.opsForZSet() .rangeByScore(DELAY_QUEUE_KEY, 0, now);
if (messages == null || messages.isEmpty()) return;
for (String payload : messages) { // 删除成功才投递,防止多实例重复处理 Long removed = redisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY, payload); if (removed != null && removed > 0) { Map<String, Object> data = JSON.parseObject(payload, Map.class); kafkaTemplate.send((String) data.get("topic"), data.get("message")); } } }}使用示例:
// 30 秒后发送订单超时消息delayMessageService.sendDelayMessage("order.timeout", orderEvent, 30);2.3. 方案二:多级延迟 Topic
Section titled “2.3. 方案二:多级延迟 Topic”// 按延迟档位建立不同 Topic// delay-5s.topic / delay-30s.topic / delay-5m.topic
@KafkaListener(topics = "delay-30s.topic", groupId = "delay-group")public void processDelay30s(ConsumerRecord<String, Object> record) { long createTime = Long.parseLong(record.headers() .lastHeader("createTime").value().toString());
if (System.currentTimeMillis() - createTime < 30_000) { // 未到期,重新投递 kafkaTemplate.send("delay-30s.topic", record.key(), record.value()); return; } // 到期,投递到目标 Topic kafkaTemplate.send("order.timeout", record.key(), record.value());}3. 日志压缩(Log Compaction)
Section titled “3. 日志压缩(Log Compaction)”3.1. 什么是日志压缩
Section titled “3.1. 什么是日志压缩”Kafka 默认的日志清理策略是按时间或大小删除旧数据(cleanup.policy=delete)。日志压缩(cleanup.policy=compact)是另一种策略:保留每个 Key 的最新一条消息,删除同 Key 的历史旧值。
压缩前:Key=A: [val1] → [val2] → [val3]Key=B: [val1] → [val2]Key=C: [val1]
压缩后:Key=A: [val3] ← 只保留最新值Key=B: [val2]Key=C: [val1]3.2. 适用场景
Section titled “3.2. 适用场景”日志压缩适合存储状态快照而非事件流的场景:
| 场景 | 说明 |
|---|---|
| 用户配置同步 | 只关心用户最新配置,历史变更不需要 |
| 数据库变更同步(CDC) | 只需同步最新行数据 |
| 缓存预热 | 消费者启动后加载所有 Key 的最新值到本地缓存 |
3.3. 配置日志压缩
Section titled “3.3. 配置日志压缩”@Beanpublic NewTopic userProfileTopic() { return TopicBuilder.name("user.profile") .partitions(3) .replicas(1) .config(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) // 开启日志压缩 .config(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1") .build();}3.4. 删除 Key(墓碑消息)
Section titled “3.4. 删除 Key(墓碑消息)”日志压缩中,发送 value=null 的消息(墓碑消息/Tombstone)表示删除该 Key:
// 删除 Key 为 "user-123" 的记录kafkaTemplate.send("user.profile", "user-123", null);4. 消息过滤
Section titled “4. 消息过滤”4.1. 通过 RecordFilterStrategy 过滤
Section titled “4.1. 通过 RecordFilterStrategy 过滤”Spring Kafka 提供 RecordFilterStrategy,在消息到达 @KafkaListener 之前进行过滤,被过滤的消息不会触发监听方法(但 Offset 仍会提交):
@Configurationpublic class KafkaFilterConfig {
@Bean public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> filteredKafkaListenerContainerFactory( ConsumerFactory<String, OrderEvent> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory);
// 只处理状态为 CREATED 的订单消息 factory.setRecordFilterStrategy(record -> !"CREATED".equals(record.value().getStatus()) ); // true = 过滤掉(不处理),false = 不过滤(正常处理)
return factory; }}// 指定使用自定义 ContainerFactory@KafkaListener( topics = "order.events", groupId = "order-group", containerFactory = "filteredKafkaListenerContainerFactory")public void onCreatedOrder(OrderEvent event) { // 只会收到 status=CREATED 的消息 log.info("新订单: {}", event.getOrderId());}4.2. 通过 Header 过滤
Section titled “4.2. 通过 Header 过滤”发送时在消息 Header 中附加标签,消费者按 Header 过滤:
// 生产者:添加 Headerpublic void sendWithHeader(OrderEvent event) { ProducerRecord<String, Object> record = new ProducerRecord<>( "order.events", null, event.getOrderId(), event); record.headers().add("eventType", event.getStatus().getBytes(StandardCharsets.UTF_8)); kafkaTemplate.send(record);}
// 消费者:读取 Header@KafkaListener(topics = "order.events", groupId = "order-group")public void onMessage(ConsumerRecord<String, OrderEvent> record) { Header header = record.headers().lastHeader("eventType"); if (header == null) return;
String eventType = new String(header.value(), StandardCharsets.UTF_8); if ("CREATED".equals(eventType)) { // 处理创建事件 }}5. 高级特性速查
Section titled “5. 高级特性速查”| 特性 | 关键配置 / API | 核心要点 |
|---|---|---|
| 消息顺序 | 消息指定相同 Key | 同 Key → 同 Partition → 有序 |
| 严格顺序 | max.in.flight=1 或开启幂等性 | 防止重试乱序 |
| 延迟消息 | Redis ZSet + 定时任务 | Kafka 原生不支持,需借助外部组件 |
| 日志压缩 | cleanup.policy=compact | 保留每个 Key 最新值,适合状态存储 |
| 墓碑消息 | send(topic, key, null) | value=null 表示删除该 Key |
| 消息过滤 | RecordFilterStrategy | 框架层过滤,Offset 仍提交 |