
Pulsar (1): Receiving Messages

consumer FLOW

Message consumption in Pulsar actually works like this: the consumer in the SDK sends a CommandFlow to the broker to ask for messages. The SDK keeps an incomingMessages queue to buffer messages received from the broker, and records an initial permit count equal to the maximum size of incomingMessages. Each FLOW carries a concrete number, messagePermits, telling the broker how many more messages the consumer can accept, and the local count is reduced accordingly (permit -= messagePermits). In effect this is a cross-process semaphore. See the protobuf source on GitHub:

message CommandFlow {
    required uint64 consumer_id       = 1;


    // Max number of messages to prefetch, in addition
    // of any number previously specified
    required uint32 messagePermits     = 2;
}
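
To make the "cross-process semaphore" idea concrete, here is a minimal sketch of the permit bookkeeping described above. The class and method names are hypothetical and this is not the actual ConsumerImpl code: the counter starts at the queue capacity, each FLOW grants some amount and subtracts it, and draining the local queue earns permits back.

public class FlowPermits {
    private final int receiverQueueSize;   // max size of incomingMessages
    private int permits;                   // how many messages we may still ask the broker for

    public FlowPermits(int receiverQueueSize) {
        this.receiverQueueSize = receiverQueueSize;
        this.permits = receiverQueueSize;  // initial permit = queue capacity
    }

    // Draining one message from incomingMessages frees a slot, so we earn one permit back.
    public void onMessageConsumed() {
        permits++;
    }

    // Build the next CommandFlow: grant everything we currently have (permit -= messagePermits).
    public int nextFlowPermits() {
        int messagePermits = permits;
        permits -= messagePermits;   // now 0; the broker may push up to messagePermits messages
        return messagePermits;       // goes into CommandFlow.messagePermits
    }
}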

Here is the flow diagram you can find pretty much everywhere:

[Figure: consumer flow diagram]

broker send

After the broker handles this flow command, the dispatcher asks the cursor (which ultimately reads from the storage layer) for a certain number of entries, mainly through the readMoreEntries method. From the user's, i.e. the consumer's, point of view, receive() returns individual messages; an entry, on the other hand, is a storage-level concept.

Concretely, if batching is enabled, one entry may actually hold a whole batch of messages, while ACKs are per message. When a batched message needs to be redelivered, the broker redelivers the entire entry: the dispatcher does not spend CPU unpacking the entry, filtering it, and repacking it. Instead it trades network I/O and client CPU for that, leaving the unpacking and filtering to the SDK side.

Once the dispatcher has the entries, it does some tedious permit arithmetic to work out how many messages it can send, and then ships them to consumers via CommandMessage. Taking the Shared subscription mode as an example:

protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
    // Core logic (simplified)
    while (entriesToDispatch > 0 && isAtleastOneConsumerAvailable()) {
        Consumer c = getNextConsumer();
        if (c == null) {
            // No available consumer: release the remaining entries, rewind the cursor, and return false
            entries.subList(start, entries.size()).forEach(Entry::release);
            cursor.rewind();
            lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
            return false;
        }

        int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;

        int maxEntriesInThisBatch = getMaxEntriesInThisBatch(
                remainingMessages,
                c.getMaxUnackedMessages(),
                c.getUnackedMessages(),
                avgBatchSizePerMsg,
                availablePermits,
                serviceConfig.getDispatcherMaxRoundRobinBatchSize()
        );

        int end = Math.min(start + maxEntriesInThisBatch, entries.size());
        List<Entry> entriesForThisConsumer = entries.subList(start, end);

        if (readType == ReadType.Replay) {
            entriesForThisConsumer.forEach(entry ->
                    redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId()));
        }

        SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
        EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
        EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());

        totalEntries += filterEntriesForConsumer(
                metadataArray, start, entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks,
                cursor, readType == ReadType.Replay, c);
        totalEntriesProcessed += entriesForThisConsumer.size();

        c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks,
                sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(),
                sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);

        int msgSent = sendMessageInfo.getTotalMessages();
        remainingMessages -= msgSent;
        start += maxEntriesInThisBatch;
        entriesToDispatch -= maxEntriesInThisBatch;

        // Adjust the global available permits: messages sent minus batch indexes already acked
        TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
                -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));

        totalMessagesSent += sendMessageInfo.getTotalMessages();
        totalBytesSent += sendMessageInfo.getTotalBytes();
    }

    lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
    acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

    if (entriesToDispatch > 0) {
        // Put the unsent entries into the replay queue so they can be retried later
        entries.subList(start, entries.size()).forEach(this::addEntryToReplay);
    }

    return true;
}

Let's focus on this call:

c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks,
                sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(),
                sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);

Here a list of entries is sent per consumer. Note the batchIndexesAcks argument: its elements correspond to this field in the protobuf:

message CommandMessage {
    required uint64 consumer_id       = 1;
    required MessageIdData message_id = 2;
    optional uint32 redelivery_count  = 3 [default = 0];
    repeated int64 ack_set = 4;  // here
    optional uint64 consumer_epoch = 5;
}
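
The ack_set is a bitset serialized as repeated int64 words. As a rough illustration (a sketch, not the broker's EntryBatchIndexesAcks code; the bit convention assumed here, set = still pending, matches the client-side filtering shown later):

import java.util.Arrays;
import java.util.BitSet;

public class AckSetSketch {
    public static void main(String[] args) {
        // A batched entry with 5 messages; indexes 1 and 3 were already acked individually.
        BitSet pending = new BitSet(5);
        pending.set(0, 5);   // mark all 5 batch indexes as pending
        pending.clear(1);    // index 1 already acked
        pending.clear(3);    // index 3 already acked

        // toLongArray() yields the words that would go into the repeated int64 ack_set.
        long[] ackSet = pending.toLongArray();
        System.out.println(Arrays.toString(ackSet));  // [21] = 0b10101
    }
}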

After each entry is binary-encoded, the code eventually reaches the message-building logic below, where metadataAndPayload is the entry in its binary form:

public ByteBufPair newMessageAndIntercept(long consumerId, long ledgerId, long entryId, int partition,
        int redeliveryCount, ByteBuf metadataAndPayload, long[] ackSet, String topic, long epoch) {
    // command here is the CommandMessage
    BaseCommand command = Commands.newMessageCommand(consumerId, ledgerId, entryId, partition, redeliveryCount,
            ackSet, epoch);
    ByteBufPair res = Commands.serializeCommandMessageWithSize(command, metadataAndPayload);
    // interceptor logic omitted
    return res;
}

As you can see, one CommandMessage actually carries one entry.

consumer recv

The consumer's receive() hands back a single message, but commands are sent at entry granularity, so the SDK's messageReceived() has to unpack the entry, split out the individual messages, and push them into incomingMessages. During the split, the ackSet is used to filter messages out; of course, for a brand-new entry, ackSet == null and the whole batch is read out.

void receiveIndividualMessagesFromBatch(...) {
    for (int i = 0; i < batchSize; ++i) {
        // Read each message in the batch out individually
        final MessageImpl<T> message = newSingleMessage(i, batchSize, brokerEntryMetadata, msgMetadata,
                singleMessageMetadata, uncompressedPayload, batchMessage, schema, true,
                ackBitSet, ackSetInMessageId, redeliveryCount, consumerEpoch, isEncrypted);
        //...
        executeNotifyCallback(message); // enqueue into incomingMessages
    }
}

MessageImpl<V> newSingleMessage(...) {
    // Skip it if this index was already acked
    if (isSingleMessageAcked(ackBitSet, index)) {
        return null;
    }
    // ...
}
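
For reference, a plausible shape of that check (a sketch, not necessarily the actual client code, which uses its own recyclable BitSet type), assuming the convention that a set bit marks a batch index that is still unacked:

static boolean isSingleMessageAcked(java.util.BitSet ackBitSet, int batchIndex) {
    // A cleared bit means this batch index was already acked, so the message is skipped.
    return ackBitSet != null && !ackBitSet.get(batchIndex);
}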

There is also a ZeroQueueConsumer that can work without a buffer queue; I won't go into it here.

Once the consumer actually calls receive(), it can simply incomingMessages.poll() a single message out of the queue.
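
To connect this back to the user-facing API, here is a minimal consumer using the standard Pulsar Java client (the service URL, topic, and subscription name are placeholders). The receiverQueueSize below is the capacity of incomingMessages, i.e. the initial permit budget, and receive() is the poll described above.

import org.apache.pulsar.client.api.*;

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")       // placeholder URL
                .build();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("my-topic")                           // placeholder topic
                .subscriptionName("my-sub")                  // placeholder subscription
                .subscriptionType(SubscriptionType.Shared)
                .receiverQueueSize(1000)   // size of incomingMessages, i.e. the initial permits
                .subscribe();              // triggers the first CommandFlow

        Message<String> msg = consumer.receive();  // poll from incomingMessages
        consumer.acknowledge(msg);                 // per-message ack, even for batched entries

        consumer.close();
        client.close();
    }
}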