Skip to content

[feat][client]Add Fetch Consumer Support and Expose Continuous Sequence ID (BrokerEntryMetadata Index) In MessageId #24176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

liangyepianzhou
Copy link
Contributor

@liangyepianzhou liangyepianzhou commented Apr 11, 2025

Motivation

  1. Current Limitations:
    • Pulsar lacks native support for Fetch Consumer-style consumption, which limits flexibility in scenarios requiring explicit message retrieval.
    • message ID are discontinuous, complicating use cases that rely on strict sequential tracking (e.g., auditing, replay, or stateful processing).
  2. Opportunity:
    • The existing broker entry metadata contains an index field that can serve as a continuous sequence identifier. However, this metadata is not propagated to clients, leaving it underutilized.

Modifications

  1. Modify MessageIdData:

    • Modified the broker-to-client message pipeline to embed the broker entry metadata index field into MessageIdData.
  2. Client API Exposure:

    • Expose the broker entry metadata's index as a continuous sequence ID in messageIdAdv.

    • Clients can now access this via:

      long sequenceId = messageIdAdv.getIndex();  
  3. Fetch Consumer Support:

    • Implemented logic for fetch conumer which can explicit aquire messages.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

Expose Continuous Sequence ID  (BrokerEntryMetadata Index) to client
@liangyepianzhou liangyepianzhou self-assigned this Apr 11, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Apr 11, 2025
@lhotari
Copy link
Member

lhotari commented Apr 11, 2025

@liangyepianzhou before putting too much effort in implementation or a PIP, I'd suggest starting a discussion first whether this would be accepted by the Pulsar community. My first thought is that it's not a great idea to expose such features to clients and add new abstractions.

@liangyepianzhou
Copy link
Contributor Author

@liangyepianzhou before putting too much effort in implementation or a PIP, I'd suggest starting a discussion first whether this would be accepted by the Pulsar community. My first thought is that it's not a great idea to expose such features to clients and add new abstractions.

@lhotari Thank you for your suggestion. I plan to split this Draft PR into two PIPs and proceed with subsequent code development only after community discussion and approval:

  1. Store the offset in the client's MessageIdAdv. Since users typically receive MessageId, this won't introduce new client abstractions.

  2. For the Fetch Consumer feature, I've prepared two approaches:

    • If the Pulsar community is willing to accept this feature (similar to how RocketMQ and Kafka maintain both Pull/Fetch Consumer and Push Consumer), then we'll merge it into the community.
    • If the Pulsar community prefers not to support Pull/Fetch Consumer, then we'll add this feature to the Contrib Repository instead.

@lhotari
Copy link
Member

lhotari commented Apr 14, 2025

I plan to split this Draft PR into two PIPs and proceed with subsequent code development only after community discussion and approval:

@liangyepianzhou Please be prepared for a different type of handling for this type of proposal.

The first step would be to expand the description which is vague at the moment.

  • Pulsar lacks native support for Fetch Consumer-style consumption, which limits flexibility in scenarios requiring explicit message retrieval.

What flexibility is limited since Pulsar supports explicit message retrieval? Please show a concrete example.

  • message ID are discontinuous, complicating use cases that rely on strict sequential tracking (e.g., auditing, replay, or stateful processing).

Offsets are discontinuous even in Kafka. Gaps in the Kafka offset can be a result of compaction or aborted transactions, for example. I don't see how Pulsar's message id is much different from Kafka's offset in that perspective.

A completely different matter is to efficiently retrieve messages for a given sequence. There's some on going work with PIP-404 related to that. It's targeted to the Kafka-on-Pulsar protocol extension, but could be an inspiration for other secondary search indexes that might be useful for locating entries efficiently.

  1. For the Fetch Consumer feature, I've prepared two approaches:
    • If the Pulsar community is willing to accept this feature (similar to how RocketMQ and Kafka maintain both Pull/Fetch Consumer and Push Consumer), then we'll merge it into the community.

Comparing to RocketMQ or Kafka is not a proper way to justify adding this to Pulsar. Pulsar has different concepts and the current changes in this draft PR would break consistency of the existing concepts. Focusing on your use case and problem to solve is a better way to start addressing what you want to truly achieve.

  • If the Pulsar community prefers not to support Pull/Fetch Consumer, then we'll add this feature to the Contrib Repository instead.

This statement assumes that you already have the correct solution. I'd suggest to think beyond what you have come up with in this draft PR and focus providing more details about the use case and problem to solve so that we'd be able to address that in a consistent way that makes sense for Pulsar.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants