Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [pip] PIP-298 Consumer supports specifying consumption isolation level #21114

Merged
merged 4 commits into from
Oct 24, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions pip/pip-298.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
## Background

In the implementation of the Pulsar Transaction, each topic is configured with a `Transaction Buffer` to prevent consumers from reading uncommitted messages, which are invisible until the transaction is committed. Transaction Buffer works with Position (maxReadPosition) and `TxnID` Set (aborts). The broker only dispatches messages, before the maxReadPosition, to the consumers. When the broker dispatches the messages before maxReadPosition to the consumer, the messages sent by aborted transactions will get filtered by the Transaction Buffer.

## Motivation

Currently, Pulsar transactions do not have configurable isolation levels. By introducing isolation level configuration for consumers, we can enhance the flexibility of Pulsar transactions.

## Goal

### In Scope

Implement Read Committed and Read Uncommitted isolation levels for Pulsar transactions.Allow consumers to configure isolation levels during the building process.

### Out of Scope

None.

## API Changes

Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read Uncommitted.

### Before the Change

The PulsarConsumer builder process currently does not include isolation level configurations. The consumer creation process might look like this:

```
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("persistent://my-tenant/my-namespace/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
```

### After the Change

Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read Uncommitted. Introduce a new method isolationLevel() in the consumer builder, which accepts an enumeration value representing the isolation level:

```
public enum IsolationLevel {
READ_COMMITTED,
READ_UNCOMMITTED
}
```

Then, modify the consumer creation process to include the new isolation level configuration:

```
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("persistent://my-tenant/my-namespace/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscriptionIsolationLevel(IsolationLevel.READ_COMMITTED) // Adding the isolation level configuration
.subscribe();
```

With this change, users can now choose between Read Committed and Read Uncommitted isolation levels when creating a new
consumer. If the isolationLevel() method is not called during the builder process, the default isolation level will be
Read Committed.
Note that this is a subscription dimension configuration, and all consumers under the same subscription need to be
configured with the same IsolationLevel.

## Implementation

### Client Changes
hzh0425 marked this conversation as resolved.
Show resolved Hide resolved

Update the PulsarConsumer builder to accept isolation level configurations for Read Committed and Read Uncommitted levels.

In order to achieve the above goals, the following modifications need to be made:

- Added `IsolationLevel` related fields and methods in `ConsumerConfigurationData` and `ConsumerBuilderImpl` and `ConsumerImpl`

- Modify PulsarApi.CommandSubscribe, add field -- IsolationLevel

```
message CommandSubscribe {

enum IsolationLevel {
READ_COMMITTED = 0;
READ_UNCOMMITTED = 1;
}
optional IsolationLevel isolation_level = 20 [default = READ_COMMITTED];
}
```

### Broker changes

Modify the transaction buffer and dispatching mechanisms to handle messages based on the chosen isolation level.

In order to achieve the above goals, the following modifications need to be made:

- Determine in the `readMoreEntries` method of Dispatchers such as `PersistentDispatcherSingleActiveConsumer` and `PersistentDispatcherMultipleConsumers`:

- If Subscription.isolationLevel == ReadCommitted, then MaxReadPosition = topic.getMaxReadPosition(), that is, transactionBuffer.getMaxReadPosition()

- If Subscription.isolationLevel == ReadUnCommitted, then MaxReadPosition = PositionImpl.LATEST


## Reject Alternatives

None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the original template there is "# Monitoring".
One aspect of monitoring IMO is to know for each subscription, it's isolation level. Can you please describe how users will be able to know that? What is changing in "## Public-facing Changes״ ׳ which allows me to know that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @asafm , Thank you for your suggestion.
According to your suggestion, I have made the following two improvements:

  1. Added a new section ##Public-facing Changes to describe changes to the API.
  2. Added a new #Monitor section. Users can monitor the newly added subscriptionStats.subscriptionIsolationLevel through the admin tool.
    When you are free, please help and take a look again