跳过正文
Background Image
  1. Posts/

Pulsar(2) ACK

·3196 字·7 分钟·
目录

Pulsar(2) —— ACK
#

正如收消息可以由sdk批,ack也可以由sdk批。

consumer ACK
#

consumer调用ack之后,会在sdk暂存起来, 以batch消息为例,计算ackSet (将对应的为置0) 并更新到pending队列中。

CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) {
    ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
            MessageIdAdvUtils.discardBatch(msgId), __ -> {
                final BitSet ackSet = msgId.getAckSet();
                final ConcurrentBitSetRecyclable value;
                if (ackSet != null) {
                    synchronized (ackSet) {
                        if (!ackSet.isEmpty()) {
                            value = ConcurrentBitSetRecyclable.create(ackSet);
                        } else {
                            value = ConcurrentBitSetRecyclable.create();
                            value.set(0, msgId.getBatchSize());
                        }
                    }
                } else {
                    value = ConcurrentBitSetRecyclable.create();
                    value.set(0, msgId.getBatchSize());
                }
                return value;
            });
    bitSet.clear(msgId.getBatchIndex());
    return CompletableFuture.completedFuture(null);
}

sdk需要根据ackSet滤掉已经被ack的message。因此对于batch消息,在ack时,设置bit位可以让broker知道这一个entry中的哪些batch index是被ack过的。

sdk进行消息的生产和消费(主要是ack)时会有以下表现:

  • 生产者
    • 不开启batch,每条消息生产之后都没有batch sizeentry id都是独立的
    • 开启batch消息并设置batch size,则取决于生产速率,如果达到了timeout则该批消息会提前发出,size小于设定的batch size
  • 消费者
    • 非batch消息,receive得到都是entry独立的单条消息,ack没有特殊处理
    • batch消息,receive得到的是一个entry里按batch index解码出的单条消息
      • ack之后如上面提到的,同一个entry重发之后会在sdk过滤掉已ack的,避免重复消费
      • command ack不会携带batch size !!!
      • ack set基于org.apache.pulsar.common.util.collections.ConcurrentBitSet,底层是java.util.BitSet,会自动truncate,即抹掉高位0以节省空间。
        • pb的ack set结构是Vec<i64>,或者说long[]
        • 例如batch size = 65, 已ack的index为64(最后一个,刚好是第二个long的最低位),原本的表示应该是\[0b1..1, 0b0..000 0\] (超过batch size的位置当成已ack,所以为0),这时候高位0自动抹除只剩下了\[0b1..1\]。
        • 当整个batch都ack之后,因为全0,导致ack set为空, 只看ack setbatch size的话跟非batch消息没什么区别 !!!
        • 因此无法通过ack set来反推batch size

通常情况下调用这个函数来构造ack命令,入参只带了ack set

    public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType,
                                 ValidationError validationError, Map<String, Long> properties, long requestId) {
        return newAck(consumerId, ledgerId, entryId, ackSet, ackType, validationError,
                properties, -1L, -1L, requestId, -1); // 最后一个参数是batch size = -1, 即不设置
    }

这种完全依赖broker记录batch size的行为,可以确保ack的时候不会因为sdk导致batch size混乱。但是在broker恢复后entry缓存还未重建,为了尚未完成的batch ack而要把entry读出才能得到batch size,牺牲了性能和灵活性。

broker cursor delete
#

broker收到ack后,会通过cursor模块移动相应的指针和更新记录。每个subscription下可能有多个consumer,但是只会有一个cursor,即topic/partition/subscription为单位进行记录。

和ack相关的字段大致如下:

public class ManagedCursorImpl implements ManagedCursor {
    Position markDeletePosition; // 记录最大连续位置(下一个位置是空洞位置)
    RangeSetWrapper<Position> individualDeletedMessages; // 单条ACK的消息集合,离散的位置
    ConcurrentSkipListMap<Position, BitSet> batchDeletedIndexes; // batch消息的子消息ack情况
}

class Position {
    int ledgerId;
    int entryId;
    long[] ackSet;
}

这三个字段从下往上是一个层级关系

  • batchDeletedIndexes记录了batch消息的ack情况,实际上存储的是ackSet,这里ackSet的位为0代表已经ack
  • individualDeletedMessages记录了离散的ack记录(区间表示),如果是batch消息,只有在batchDeletedIndexes中的ackSet全为0,才会从中转移到individualDeletedMessages上;如果是非batch消息,那么ack会直接记录。
  • markDeletePosition记录了可以被安全删除的位置,即position最小的空洞的前一个位置,或者说从头开始连续的最大的位置,当空洞合并成了新的区间,且区间的左端点-1是markDeletePosition,那么这个区间的右端点 变会成为新的markDeletePosition

后台会定期的对这些重要的元信息做持久化,便于错误恢复或者节点转移时可以得到尽可能新的ack进度。

consumer REDELIVER
#

sdk定时发送重推请求
#

在sdk内部维护了一个UnAckedMessageRedeliveryTracker,用于统计需要自动定时重推的消息,由名字可以看出其追踪尚未被用户ack的消息。定期扫描所有尚未ack的消息,如果超时则收集起来,并统一批量发送Redeliver UnackedMessages给broker处理。

private void triggerRedelivery(ConsumerBase<?> consumerBase) {
    if (ackTimeoutMessages.isEmpty()) {
        return;
    }
    Set<MessageId> messageIds = TL_MESSAGE_IDS_SET.get();
    messageIds.clear();

    try {
        long now = System.currentTimeMillis();
        ackTimeoutMessages.forEach((messageId, timestamp) -> {
            if (timestamp <= now) {
                addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase);
                messageIds.add(messageId);
            }
        });
        if (!messageIds.isEmpty()) {
            log.info("[{}] {} messages will be re-delivered", consumerBase, messageIds.size());
            Iterator<MessageId> iterator = messageIds.iterator();
            while (iterator.hasNext()) {
                MessageId messageId = iterator.next();
                ackTimeoutMessages.remove(messageId);
            }
        }
    } finally {
        if (messageIds.size() > 0) {
            consumerBase.onAckTimeoutSend(messageIds);
            consumerBase.redeliverUnacknowledgedMessages(messageIds);
        }
    }
    }

broker接收到重推请求后,则会将这批message id推送到dispatcher中,并增加其重推计数,并写入pb对应的redelivery_count字段中。

public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
    positions.forEach(redeliveryTracker::incrementAndGetRedeliveryCount);
    redeliverUnacknowledgedMessages(consumer);
}

sdk收到之后进行检查,如果订阅了RLQ/DLQ规则且重试次数redelivery_count达到了上限,那么会稍后转投到RLQ/DLQ中。

// ConsumerImpl#messageReceived
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) {
    if (redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
        possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
                Collections.singletonList(message));
        if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) {
            // count超过!!之后,这里继续发重推请求,兜底策略,因为一般count就不会超过
            redeliverUnacknowledgedMessages(Collections.singleton(message.getMessageId()));
            // The message is skipped due to reaching the max redelivery count,
            // so we need to increase the available permits
            increaseAvailablePermits(cnx);
            return;
        }
    }
}

nack
#

nack由用户主动调用consumer.negativeAcknowledge(msg),同样有一个NegativeAcksTracker进行记录,并定时扫描需要重推的消息。也是通过RedeliverUnackedMessages这条命令。

需要注意的是,broker中的重推计数器不会持久化,因此broker挂掉或者一个subscription的所有consumer离线之后,重推计数会归零。

RLQ
#

RetryLetterQueue即重试队列,与SDK维护的重试逻辑不同,本质上是一个全新的队列,broker无感。具体而言RLQ表现为:

  • 配置dlq规则时使用enableRetry(true)开启
  • 使用consumer.reconsumerLater(msg)标记消息自动转发,当redelivery_count超过阈值则先转投RLQ
  • 会订阅一个全新的topic,如果未指定则根据规则自动订阅
  • 全新topic意味着与原来独立,broker对待RLQ和对待普通topic是一样的,不会特殊处理。
  • 可视作sdk侧有一个新的producer和新的consumer,重投就是生产新消息
  • 每次重投会ack原消息,并生产一条新消息(payload一致)投递,如果设置了timeout则是延迟消息
  • RLQ的消息通过properties进行计数,因此broker可以做到无感
    {
    REAL_TOPIC="persistent://my-property/my-ns/test, 
    ORIGIN_MESSAGE_ID=314:28:-1, 
    RETRY_TOPIC="persistent://my-property/my-ns/my-subscription-retry, 
    RECONSUMETIMES=16 # 每次重投+1
    }
    
  • RECONSUMETIMES超过设定之后会转投DLQ(如果有的话)

DLQ
#

DeadLetterQueue即死信队列,本质上与RLQ相似,都是订阅新topic。与RLQ的区别在于:

  • 只会sdk内部自动订阅producer用于转投,consumer需要用户手动订阅
  • 没有RECONSUMETIMES,因为不会重试

dlq

下面给出doReconsumeLater()的核心逻辑

// 如果重试次数超过阈值,转投DLQ
if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount()
    && StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
initDeadLetterProducerIfNeeded().thenAcceptAsync(dlqProducer -> {
    try {
        TypedMessageBuilder<byte[]> typedMessageBuilderNew =
                dlqProducer.newMessage(
                                Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
                        .value(retryMessage.getData())
                        .properties(propertiesMap);
        copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
        copyMessageEventTime(message, typedMessageBuilderNew);
        // 投往DLQ
        typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
            consumerDlqMessagesCounter.increment();

            // 成功后ack当前消息
            doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> {
                result.complete(null);
            }).exceptionally(ex -> {
                result.completeExceptionally(ex);
                return null;
            });
        }).exceptionally(ex -> {
            result.completeExceptionally(ex);
            return null;
        });
    } catch (Exception e) {
        result.completeExceptionally(e);
    }
}, internalPinnedExecutor).exceptionally(ex -> {
    result.completeExceptionally(ex);
    return null;
});
} else {
    // 重试次数还没耗尽,继续重投
    assert retryMessage != null;
    initRetryLetterProducerIfNeeded().thenAcceptAsync(rtlProducer -> {
        try {
            TypedMessageBuilder<byte[]> typedMessageBuilderNew = rtlProducer
                    .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
                    .value(retryMessage.getData())
                    .properties(propertiesMap);
            if (delayTime > 0) {
                typedMessageBuilderNew.deliverAfter(delayTime, unit);
            }
            copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
            copyMessageEventTime(message, typedMessageBuilderNew);
            // 重新发往RLQ
            typedMessageBuilderNew.sendAsync()
                    .thenCompose(
                            __ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) // ack旧消息
                    .thenAccept(v -> {
                        result.complete(null);
                    })
                    .exceptionally(ex -> {
                        result.completeExceptionally(ex);
                        return null;
                    });
        } catch (Exception e) {
            result.completeExceptionally(e);
        }
    }, internalPinnedExecutor).exceptionally(ex -> {
        result.completeExceptionally(ex);
        return null;
    });
}