[improve] [pip] PIP-298 Consumer supports specifying consumption isolation level #21114

Merged
merged 4 commits into from
Oct 24, 2023
Changes from 3 commits
134 changes: 113 additions & 21 deletions pip/pip-298.md
@@ -1,28 +1,106 @@
## Background
# Background

In the implementation of the Pulsar Transaction, each topic is configured with a `Transaction Buffer` to prevent consumers from reading uncommitted messages, which are invisible until the transaction is committed. Transaction Buffer works with Position (maxReadPosition) and `TxnID` Set (aborts). The broker only dispatches messages, before the maxReadPosition, to the consumers. When the broker dispatches the messages before maxReadPosition to the consumer, the messages sent by aborted transactions will get filtered by the Transaction Buffer.
In the Pulsar transaction implementation, each topic is configured with a `Transaction Buffer` to prevent
consumers from reading uncommitted messages, which are invisible until the transaction is committed. The Transaction
Buffer maintains a position (`maxReadPosition`) and a set of `TxnID`s (aborts). The broker dispatches to consumers only
the messages before `maxReadPosition`. While dispatching those messages, the Transaction Buffer filters out the
messages sent by aborted transactions.
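
The dispatch rule described above can be illustrated with a minimal, self-contained sketch. It is not broker code: the class `TransactionBufferSketch`, the `Message` type, and the method `dispatchable` are hypothetical names introduced for illustration, positions are modeled as plain `long` values, and `maxReadPosition` is assumed to be an exclusive upper bound, following the "before the maxReadPosition" wording.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the Transaction Buffer rule; not the broker's real classes.
class TransactionBufferSketch {

    // A message at a log position; txnId is null for non-transactional messages.
    static final class Message {
        final long position;
        final String txnId;
        final String payload;

        Message(long position, String txnId, String payload) {
            this.position = position;
            this.txnId = txnId;
            this.payload = payload;
        }
    }

    // Returns the payloads a consumer would receive: only messages before
    // maxReadPosition (assumed exclusive), minus those sent by aborted transactions.
    static List<String> dispatchable(List<Message> log, long maxReadPosition, Set<String> aborts) {
        List<String> out = new ArrayList<>();
        for (Message m : log) {
            if (m.position >= maxReadPosition) {
                break; // this message and everything after it is not readable yet
            }
            if (m.txnId != null && aborts.contains(m.txnId)) {
                continue; // filtered out: sent by an aborted transaction
            }
            out.add(m.payload);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Message> log = List.of(
                new Message(0, null, "deposit-1"),
                new Message(1, "txn-1", "transfer-aborted"),
                new Message(2, "txn-2", "transfer-committed"),
                new Message(3, "txn-3", "transfer-open"),
                new Message(4, null, "deposit-2"));
        // txn-3 is still open, so reads stop before position 3.
        System.out.println(dispatchable(log, 3, Set.of("txn-1")));
        // prints [deposit-1, transfer-committed]
    }
}
```

In this sketch `deposit-2` is not delivered even though it is not part of any transaction, because it sits behind the first message of the still-open `txn-3`.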

## Motivation
# Motivation

Currently, Pulsar transactions do not have configurable isolation levels. By introducing isolation level configuration for consumers, we can enhance the flexibility of Pulsar transactions.
Currently, Pulsar transactions do not have configurable isolation levels. By introducing isolation level configuration
for consumers, we can enhance the flexibility of Pulsar transactions.

## Goal
Let's consider an example:

### In Scope
**System**: Financial Transaction System

Implement Read Committed and Read Uncommitted isolation levels for Pulsar transactions.Allow consumers to configure isolation levels during the building process.
**Operations**: Large volume of deposit and withdrawal operations, a
small number of transfer operations.

### Out of Scope
**Roles**:

- **Client A1**
- **Client A2**
- **User Account B1**
- **User Account B2**
- **Request Topic C**
- **Real-time Monitoring System D**
- **Business Processing System E**

**Client Operations**:

- **Withdrawal**: Client A1 decreases the deposit amount from User
Account B1 or B2.
- **Deposit**: Client A1 increases the deposit amount in User Account B1 or B2.
- **Transfer**: Client A2 decreases the deposit amount from User
Account B1 and increases it in User Account B2. Or vice versa.

**Real-time Monitoring System D**: Obtains the latest data from
Request Topic C as quickly as possible to monitor transaction data and
changes in bank reserves in real-time. This is necessary for the
timely detection of anomalies and real-time decision-making.

**Business Processing System E**: Reads data from Request Topic C,
then actually operates on User Accounts B1 and B2.

**User Scenario**: Client A1 sends a large number of deposit and
withdrawal requests to Request Topic C. Client A2 writes a small
number of transfer requests to Request Topic C.

In this case, Business Processing System E needs a read-committed
isolation level to ensure operation consistency and Exactly Once
semantics. The real-time monitoring system does not care if a small
number of transfer requests are incomplete (dirty data). What it
cannot tolerate is a situation where a large number of deposit and
withdrawal requests cannot be presented in real time due to a small
number of transfer requests (the current situation is that uncommitted
transaction messages can block the reading of committed transaction
messages).

In this case, it is necessary to set different isolation levels for
different consumers/subscriptions.
Uncommitted transactions do not affect users' actual bank accounts.
Business Processing System E reads only committed transactional
messages and operates on users' accounts. It needs exactly-once semantics.
Real-time Monitoring System D also reads uncommitted transactional
messages. It does not need exactly-once semantics.

They use different subscriptions and choose different isolation
levels. One needs transactional guarantees, the other does not.
In general, multiple subscriptions of the same topic do not all
require transaction guarantees.
Some want low latency without the exactly-once guarantee, and
others require the exactly-once guarantee.
This proposal simply provides a new option for different subscriptions.

# Goal

## In Scope

Implement Read Committed and Read Uncommitted isolation levels for Pulsar transactions. Allow consumers to configure
isolation levels during the building process.

## Out of Scope

None.

## API Changes
# High Level Design

Add a configuration 'subscriptionIsolationLevel' in the consumer builder to allow users to choose different transaction
isolation levels.

Review comment (Contributor): @codelipenghui Do we keep a table of client features each SDK needs to implement?

# Detailed Design

Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read Uncommitted.
## Public-facing Changes

Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read
Uncommitted.

### Before the Change

The PulsarConsumer builder process currently does not include isolation level configurations. The consumer creation process might look like this:
The PulsarConsumer builder process currently does not include isolation level configurations. The consumer creation
process might look like this:

```
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
@@ -36,12 +114,17 @@ Consumer<String> consumer = client.newConsumer(Schema.STRING)

### After the Change

Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read Uncommitted. Introduce a new method isolationLevel() in the consumer builder, which accepts an enumeration value representing the isolation level:
Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read
Uncommitted. Introduce a new method subscriptionIsolationLevel() in the consumer builder, which accepts an enumeration
value representing the isolation level:

```
public enum IsolationLevel {
public enum SubscriptionIsolationLevel {
// Of the transactional messages, the consumer can consume only those that have been committed.
READ_COMMITTED,
READ_UNCOMMITTED

// Consumer can consume all messages, even transactional messages which have been aborted.
READ_UNCOMMITTED;
}
```

@@ -54,7 +137,7 @@ Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("persistent://my-tenant/my-namespace/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscriptionIsolationLevel(IsolationLevel.READ_COMMITTED) // Adding the isolation level configuration
.subscriptionIsolationLevel(SubscriptionIsolationLevel.READ_COMMITTED) // Adding the isolation level configuration
.subscribe();
```

@@ -64,7 +147,7 @@ Read Committed.
Note that this is a subscription-level configuration, and all consumers under the same subscription need to be
configured with the same isolation level.

## Implementation
## Design & Implementation Details

### Client Changes

@@ -93,13 +176,22 @@ Modify the transaction buffer and dispatching mechanisms to handle messages base

In order to achieve the above goals, the following modifications need to be made:

- Determine in the `readMoreEntries` method of Dispatchers such as `PersistentDispatcherSingleActiveConsumer` and `PersistentDispatcherMultipleConsumers`:
- Determine in the `readMoreEntries` method of Dispatchers such as `PersistentDispatcherSingleActiveConsumer`
and `PersistentDispatcherMultipleConsumers`:

- If Subscription.isolationLevel == ReadCommitted, then MaxReadPosition = topic.getMaxReadPosition(), that is,
transactionBuffer.getMaxReadPosition()

- If Subscription.isolationLevel == ReadUnCommitted, then MaxReadPosition = PositionImpl.LATEST

- If Subscription.isolationLevel == ReadCommitted, then MaxReadPosition = topic.getMaxReadPosition(), that is, transactionBuffer.getMaxReadPosition()
- Add a new metric `subscriptionIsolationLevel` in `SubscriptionStatsImpl`.

- If Subscription.isolationLevel == ReadUnCommitted, then MaxReadPosition = PositionImpl.LATEST
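
The selection rule listed above can be sketched as a small, self-contained function. This is an illustrative model only, not the dispatcher code: `MaxReadPositionSketch`, `maxReadPositionFor`, and the `LATEST` constant are hypothetical names, the nested enum is a local stand-in that mirrors the proposed `SubscriptionIsolationLevel`, and positions are simplified to `long` values with `Long.MAX_VALUE` standing in for `PositionImpl.LATEST`.

```java
// Hypothetical, simplified model of the isolation-level rule; not broker code.
class MaxReadPositionSketch {

    // Local stand-in mirroring the enum proposed in this PIP.
    enum SubscriptionIsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

    // Stand-in for PositionImpl.LATEST: no upper bound on reads (assumption).
    static final long LATEST = Long.MAX_VALUE;

    // Picks the max read position a dispatcher would use for a subscription.
    static long maxReadPositionFor(SubscriptionIsolationLevel level, long txnBufferMaxReadPosition) {
        switch (level) {
            case READ_COMMITTED:
                // Bounded by the transaction buffer, so open transactions are not read.
                return txnBufferMaxReadPosition;
            case READ_UNCOMMITTED:
                // Unbounded: the subscription may read past open transactions.
                return LATEST;
            default:
                throw new IllegalArgumentException("unknown isolation level: " + level);
        }
    }

    public static void main(String[] args) {
        long txnBufferMax = 42L;
        System.out.println(maxReadPositionFor(SubscriptionIsolationLevel.READ_COMMITTED, txnBufferMax));
        // prints 42
        System.out.println(maxReadPositionFor(SubscriptionIsolationLevel.READ_UNCOMMITTED, txnBufferMax) == LATEST);
        // prints true
    }
}
```

Keeping the choice in one place means each dispatcher only needs to ask for the bound before reading more entries, rather than branching on the isolation level throughout its read path.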
# Monitoring

Review comment (Contributor): Looks good

After this PIP, users can query the subscription stats of a topic through the admin tool and check the `subscriptionIsolationLevel` field in those stats to determine the isolation level of the subscription.

## Reject Alternatives
# Links

None
* Mailing List discussion thread: https://lists.apache.org/thread/8ny0qtp7m9qcdbvnfjdvpnkc4c5ssyld
* Mailing List voting thread: https://lists.apache.org/thread/4q1hrv466h8w9ccpf4moxt6jv1jxp1mr
* Document link: https://github.com/apache/pulsar-site/pull/712