跳转到内容

kafka高级特性


Kafka 只保证单个 Partition 内消息有序,跨 Partition 不保证顺序。因此顺序性保证的核心是:让需要保证顺序的消息进入同一个 Partition

✅ 正确:同一订单的所有消息使用相同 Key → 路由到同一 Partition
Partition 0: [ORDER-1 创建] → [ORDER-1 支付] → [ORDER-1 发货]
❌ 错误:不指定 Key,轮询分配到不同 Partition
Partition 0: [ORDER-1 创建]
Partition 1: [ORDER-1 支付] ← 可能被不同消费者并行处理,顺序无法保证
Partition 2: [ORDER-1 发货]
// 使用业务 ID 作为 Key,相同 Key 的消息路由到同一 Partition
kafkaTemplate.send("order.events", order.getOrderId(), order);

开启重试(retries > 0)时,若第一批消息发送失败、第二批已发送成功,重试后第一批反而在第二批之后到达,造成乱序。

解决方案:设置 max.in.flight.requests.per.connection = 1,保证同一时刻只有一个请求在途:

spring:
kafka:
producer:
retries: 3
properties:
max.in.flight.requests.per.connection: 1 # 严格顺序,但吞吐量下降

单个 Partition 只会被同一消费者组内的一个消费者线程消费,天然保证消费顺序。但需注意:

// ❌ concurrency > 1 时,多线程可能并发消费同一 Partition 的不同批次,破坏顺序
@KafkaListener(topics = "order.events", groupId = "order-group", concurrency = "3")
// ✅ 严格顺序消费:concurrency 不超过分区数,且每个分区由固定线程处理
@KafkaListener(topics = "order.events", groupId = "order-group", concurrency = "1")

Kafka 本身没有延迟队列机制,社区常见的实现方案有三种:

方案原理优点缺点
时间轮 + 中间存储消息先存 Redis/DB,到期后投递 Kafka精度高,灵活引入额外组件
多级延迟 Topic按延迟时间建不同 Topic,消费者轮询检查是否到期无额外依赖只支持固定延迟档位
kafka-delay-queue 插件第三方扩展使用简单社区维护,生产风险较高

2.2. 方案一:Redis + Kafka 实现延迟消息(推荐)

Section titled “2.2. 方案一:Redis + Kafka 实现延迟消息(推荐)”

流程:

Producer ──▶ Redis ZSet(score = 执行时间戳)
定时任务轮询(每秒)
到期消息 ──▶ Kafka Topic ──▶ Consumer 正常消费

实现:

@Service
public class DelayMessageService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
private static final String DELAY_QUEUE_KEY = "kafka:delay:queue";
// 发送延迟消息:存入 Redis ZSet,score 为执行时间戳
public void sendDelayMessage(String topic, Object message, long delaySeconds) {
String payload = JSON.toJSONString(Map.of("topic", topic, "message", message));
double score = System.currentTimeMillis() + delaySeconds * 1000;
redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY, payload, score);
}
// 定时任务:每秒扫描到期消息并投递 Kafka
@Scheduled(fixedDelay = 1000)
public void processDelayQueue() {
long now = System.currentTimeMillis();
// 取出 score <= now 的所有到期消息
Set<String> messages = redisTemplate.opsForZSet()
.rangeByScore(DELAY_QUEUE_KEY, 0, now);
if (messages == null || messages.isEmpty()) return;
for (String payload : messages) {
// 删除成功才投递,防止多实例重复处理
Long removed = redisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY, payload);
if (removed != null && removed > 0) {
Map<String, Object> data = JSON.parseObject(payload, Map.class);
kafkaTemplate.send((String) data.get("topic"), data.get("message"));
}
}
}
}

使用示例:

// 30 秒后发送订单超时消息
delayMessageService.sendDelayMessage("order.timeout", orderEvent, 30);
// 按延迟档位建立不同 Topic
// delay-5s.topic / delay-30s.topic / delay-5m.topic
@KafkaListener(topics = "delay-30s.topic", groupId = "delay-group")
public void processDelay30s(ConsumerRecord<String, Object> record) {
long createTime = Long.parseLong(record.headers()
.lastHeader("createTime").value().toString());
if (System.currentTimeMillis() - createTime < 30_000) {
// 未到期,重新投递
kafkaTemplate.send("delay-30s.topic", record.key(), record.value());
return;
}
// 到期,投递到目标 Topic
kafkaTemplate.send("order.timeout", record.key(), record.value());
}

Kafka 默认的日志清理策略是按时间或大小删除旧数据(cleanup.policy=delete)。日志压缩(cleanup.policy=compact)是另一种策略:保留每个 Key 的最新一条消息,删除同 Key 的历史旧值

压缩前:
Key=A: [val1] → [val2] → [val3]
Key=B: [val1] → [val2]
Key=C: [val1]
压缩后:
Key=A: [val3] ← 只保留最新值
Key=B: [val2]
Key=C: [val1]

日志压缩适合存储状态快照而非事件流的场景:

场景说明
用户配置同步只关心用户最新配置,历史变更不需要
数据库变更同步(CDC)只需同步最新行数据
缓存预热消费者启动后加载所有 Key 的最新值到本地缓存
@Bean
public NewTopic userProfileTopic() {
return TopicBuilder.name("user.profile")
.partitions(3)
.replicas(1)
.config(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT) // 开启日志压缩
.config(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1")
.build();
}

日志压缩中,发送 value=null 的消息(墓碑消息/Tombstone)表示删除该 Key:

// 删除 Key 为 "user-123" 的记录
kafkaTemplate.send("user.profile", "user-123", null);

Spring Kafka 提供 RecordFilterStrategy,在消息到达 @KafkaListener 之前进行过滤,被过滤的消息不会触发监听方法(但 Offset 仍会提交):

@Configuration
public class KafkaFilterConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
filteredKafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 只处理状态为 CREATED 的订单消息
factory.setRecordFilterStrategy(record ->
!"CREATED".equals(record.value().getStatus())
);
// true = 过滤掉(不处理),false = 不过滤(正常处理)
return factory;
}
}
// 指定使用自定义 ContainerFactory
@KafkaListener(
topics = "order.events",
groupId = "order-group",
containerFactory = "filteredKafkaListenerContainerFactory"
)
public void onCreatedOrder(OrderEvent event) {
// 只会收到 status=CREATED 的消息
log.info("新订单: {}", event.getOrderId());
}

发送时在消息 Header 中附加标签,消费者按 Header 过滤:

// 生产者:添加 Header
public void sendWithHeader(OrderEvent event) {
ProducerRecord<String, Object> record = new ProducerRecord<>(
"order.events", null, event.getOrderId(), event);
record.headers().add("eventType",
event.getStatus().getBytes(StandardCharsets.UTF_8));
kafkaTemplate.send(record);
}
// 消费者:读取 Header
@KafkaListener(topics = "order.events", groupId = "order-group")
public void onMessage(ConsumerRecord<String, OrderEvent> record) {
Header header = record.headers().lastHeader("eventType");
if (header == null) return;
String eventType = new String(header.value(), StandardCharsets.UTF_8);
if ("CREATED".equals(eventType)) {
// 处理创建事件
}
}
特性关键配置 / API核心要点
消息顺序消息指定相同 Key同 Key → 同 Partition → 有序
严格顺序max.in.flight=1 或开启幂等性防止重试乱序
延迟消息Redis ZSet + 定时任务Kafka 原生不支持,需借助外部组件
日志压缩cleanup.policy=compact保留每个 Key 最新值,适合状态存储
墓碑消息send(topic, key, null)value=null 表示删除该 Key
消息过滤RecordFilterStrategy框架层过滤,Offset 仍提交