Pulsar(3) —— 消息丢失Bug分析#
问题背景#
环境与配置#
- Broker 版本: 4.1.1
- Client 版本: 4.1.1 (Java)
- JDK: OpenJDK 24.0.2
- OS: Linux 5.4.241
Consumer 关键配置#
.enableBatchIndexAcknowledgement(true) // 开启批量消息索引确认
.acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS) // 确认聚合时间
.acknowledgementGroupSize(1000) // 最大确认聚合数量
.ackMode = Individual // 单独确认模式
Producer 关键配置#
.batchingMaxMessages(1000) // 最大批处理消息数
Bug 现象#
在对一个分区主题进行生产和消费测试时,发现偶发性的单个消息丢失问题。
预期结果#
--- Total Acked Messages per Subscription ---
Subscription [sub-1]: 1000000 acks
Subscription [sub-2]: 1000000 acks
实际结果#
--- Total Acked Messages per Subscription ---
Subscription [sub-2]: 999999 acks ← 少了一条消息
Subscription [sub-1]: 1000000 acks
ERROR: [0:123:0:45] not received from [sub-2]!
关键特征#
- 随机发生 - 并非每次必现
- 仅丢失一条消息 - 丢失数量固定为一个
- 随机 batch index - 丢失消息在 batch 中的索引随机
- Broker 有积压 - 但
consumer.receive()和redeliverUnacknowledgedMessages()都无法获取该消息 - 关闭某些特性可规避:
- 设置
enableBatchIndexAcknowledgment = false时问题消失 - 设置
acknowledgmentGroupTime = 0时问题消失 - Go Client(不支持 Batch Index Ack)无此问题
- 设置
DEBUG 日志线索#
Flushing pending acks to broker:
last-cumulative-ack: []
-- individual-acks: []
-- individual-batch-index-acks: [(0, 123, {})]
原因分析#
经过深入分析,发现问题根源在于 Netty Recycler 的错误使用 导致的竞态条件。
核心问题定位#
在 PersistentAcknowledgmentsGroupingTracker 类中:
// 问题场景:isDuplicate() 和 flushAsync() 之间发生竞态
@Override
public boolean isDuplicate(@NonNull MessageIdData messageId) {
// ...
if (type == AckType.Individual) {
// 问题:这里使用的对象是 Netty Recycler 回收的
BitSetRecyclable bitSet = BitSetRecyclable.create();
bitSet.set(batchIndex);
// 添加到 pendingIndividualBatchIndexAcks
pendingIndividualBatchIndexAcks.put(msgId, bitSet);
}
return false;
}
private void flushAsync() {
// ...
for (Map.Entry<MessageIdImpl, BitSetRecyclable> entry :
pendingIndividualBatchIndexAcks.entrySet()) {
// 问题:可能在 isDuplicate() 还在使用时就被 Recycler 回收了
entry.getValue().recycle(); // ← 这里回收了对象
}
pendingIndividualBatchIndexAcks.clear();
}
Netty Recycler 工作原理#
Netty 的 Recycler 是一种对象池机制:
- 创建: 对象不再使用时调用
recycle()返回池中 - 复用: 需要时从池中取出,避免频繁内存分配
- 风险: 如果对象仍在被使用就被回收,会导致数据错乱
竞态条件时序#
线程 A (isDuplicate) 线程 B (flushAsync)
-------------------------------- --------------------------------
1. 创建 BitSetRecyclable 对象
2. set(batchIndex)
3. put to map 1. iterate map
2. get Value (同一个对象引用)
3. recycle() ← 回收了对象!
4. 返回
4. clear map
5. [问题] 此时 BitSet 已被回收,
可能被其他线程复用,数据被覆盖!
修复方案#
PR 标题#
[fix][client] Fix race condition between isDuplicate() and flushAsync() method in PersistentAcknowledgmentsGroupingTracker due to incorrect use Netty Recycler
核心修复思路#
避免在 pendingIndividualBatchIndexAcks 中存储从 Recycler 获取的对象,改为在即将发送时才创建回收对象。
代码变更#
// 修改前:存储 BitSetRecyclable 到 Map
pendingIndividualBatchIndexAcks.put(msgId, bitSet);
// 修改后:存储普通的 BitSet,仅在发送时转换为可回收对象
pendingIndividualBatchIndexAcks.put(msgId, new BitSet());
// 在 flushAsync 中发送前再包装为 BitSetRecyclable
BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
bitSetRecyclable.or(bitSet); // 复制数据
// 发送后回收
bitSetRecyclable.recycle();
关键改动点#
- Map 类型变更:
Map<MessageIdImpl, BitSetRecyclable>→Map<MessageIdImpl, BitSet> - 延迟创建: 仅在需要发送确认时才创建
BitSetRecyclable - 立即回收: 发送完成后立即回收,确保生命周期可控
验证与测试#
修复前测试结果#
- 运行 10 次,约 3-4 次出现消息丢失
- 丢失数量均为 1 条
修复后测试结果#
- 运行 50 次,无消息丢失
- 性能无明显退化
参考信息#
- Issue: #25145 [Bug] Java consumer occasionally missing one message of a batched entry
- PR: #25208 [fix][client] Fix race condition…
- 合并状态: 已合并到 master (2026-02-04)
- 修复版本: 将在下一版本发布
