diff --git a/pip/pip-298.md b/pip/pip-298.md new file mode 100644 index 0000000000000..a0953ad03a3cd --- /dev/null +++ b/pip/pip-298.md @@ -0,0 +1,197 @@ +# Background + +In the implementation of the Pulsar Transaction, each topic is configured with a `Transaction Buffer` to prevent +consumers from reading uncommitted messages, which are invisible until the transaction is committed. Transaction Buffer +works with Position (maxReadPosition) and `TxnID` Set (aborts). The broker only dispatches messages, before the +maxReadPosition, to the consumers. When the broker dispatches the messages before maxReadPosition to the consumer, the +messages sent by aborted transactions will get filtered by the Transaction Buffer. + +# Motivation + +Currently, Pulsar transactions do not have configurable isolation levels. By introducing isolation level configuration +for consumers, we can enhance the flexibility of Pulsar transactions. + +Let's consider an example: + +**System**: Financial Transaction System + +**Operations**: Large volume of deposit and withdrawal operations, a +small number of transfer operations. + +**Roles**: + +- **Client A1** +- **Client A2** +- **User Account B1** +- **User Account B2** +- **Request Topic C** +- **Real-time Monitoring System D** +- **Business Processing System E** + +**Client Operations**: + +- **Withdrawal**: Client A1 decreases the deposit amount from User + Account B1 or B2. +- **Deposit**: Client A1 increases the deposit amount in User Account B1 or B2. +- **Transfer**: Client A2 decreases the deposit amount from User + Account B1 and increases it in User Account B2. Or vice versa. + +**Real-time Monitoring System D**: Obtains the latest data from +Request Topic C as quickly as possible to monitor transaction data and +changes in bank reserves in real-time. This is necessary for the +timely detection of anomalies and real-time decision-making. + +**Business Processing System E**: Reads data from Request Topic C, +then actually operates User Accounts B1, B2. + +**User Scenario**: Client A1 sends a large number of deposit and +withdrawal requests to Request Topic C. Client A2 writes a small +number of transfer requests to Request Topic C. + +In this case, Business Processing System E needs a read-committed +isolation level to ensure operation consistency and Exactly Once +semantics. The real-time monitoring system does not care if a small +number of transfer requests are incomplete (dirty data). What it +cannot tolerate is a situation where a large number of deposit and +withdrawal requests cannot be presented in real time due to a small +number of transfer requests (the current situation is that uncommitted +transaction messages can block the reading of committed transaction +messages). + +In this case, it is necessary to set different isolation levels for +different consumers/subscriptions. +The uncommitted transactions do not impact actual users' bank accounts. +Business Processing System E only reads committed transactional +messages and operates users' accounts. It needs Exactly-once semantic. +Real-time Monitoring System D reads uncommitted transactional +messages. It does not need Exactly-once semantic. + +They use different subscriptions and choose different isolation +levels. One needs transaction, one does not. +In general, multiple subscriptions of the same topic do not all +require transaction guarantees. +Some want low latency without the exact-once semantic guarantee, and +some must require the exactly-once guarantee. +We just provide a new option for different subscriptions. + +# Goal + +## In Scope + +Implement Read Committed and Read Uncommitted isolation levels for Pulsar transactions. Allow consumers to configure +isolation levels during the building process. + +## Out of Scope + +None. + +# High Level Design + +Add a configuration 'subscriptionIsolationLevel' in the consumer builder to allow users to choose different transaction +isolation levels. + +# Detailed Design + +## Public-facing Changes + +Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read +Uncommitted. + +### Before the Change + +The PulsarConsumer builder process currently does not include isolation level configurations. The consumer creation +process might look like this: + +``` +PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); + +Consumer consumer = client.newConsumer(Schema.STRING) + .topic("persistent://my-tenant/my-namespace/my-topic") + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscribe(); +``` + +### After the Change + +Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read +Uncommitted. Introduce a new method subscriptionIsolationLevel() in the consumer builder, which accepts an enumeration +value representing the isolation level: + +``` +public enum SubscriptionIsolationLevel { + // Consumer can only consume all transactional messages which have been committed. + READ_COMMITTED, + + // Consumer can consume all messages, even transactional messages which have been aborted. + READ_UNCOMMITTED; +} +``` + +Then, modify the consumer creation process to include the new isolation level configuration: + +``` +PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); + +Consumer consumer = client.newConsumer(Schema.STRING) + .topic("persistent://my-tenant/my-namespace/my-topic") + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscriptionIsolationLevel(SubscriptionIsolationLevel.READ_COMMITTED) // Adding the isolation level configuration + .subscribe(); +``` + +With this change, users can now choose between Read Committed and Read Uncommitted isolation levels when creating a new +consumer. If the isolationLevel() method is not called during the builder process, the default isolation level will be +Read Committed. +Note that this is a subscription dimension configuration, and all consumers under the same subscription need to be +configured with the same IsolationLevel. + +## Design & Implementation Details + +### Client Changes + +Update the PulsarConsumer builder to accept isolation level configurations for Read Committed and Read Uncommitted levels. + +In order to achieve the above goals, the following modifications need to be made: + +- Added `IsolationLevel` related fields and methods in `ConsumerConfigurationData` and `ConsumerBuilderImpl` and `ConsumerImpl` + +- Modify PulsarApi.CommandSubscribe, add field -- IsolationLevel + +``` +message CommandSubscribe { + + enum IsolationLevel { + READ_COMMITTED = 0; + READ_UNCOMMITTED = 1; + } + optional IsolationLevel isolation_level = 20 [default = READ_COMMITTED]; +} +``` + +### Broker changes + +Modify the transaction buffer and dispatching mechanisms to handle messages based on the chosen isolation level. + +In order to achieve the above goals, the following modifications need to be made: + +- Determine in the `readMoreEntries` method of Dispatchers such as `PersistentDispatcherSingleActiveConsumer` + and `PersistentDispatcherMultipleConsumers`: + + - If Subscription.isolationLevel == ReadCommitted, then MaxReadPosition = topic.getMaxReadPosition(), that is, + transactionBuffer.getMaxReadPosition() + + - If Subscription.isolationLevel == ReadUnCommitted, then MaxReadPosition = PositionImpl.LATEST + +- Add a new metrics `subscriptionIsolationLevel` in `SubscriptionStatsImpl`. + +# Monitoring + +After this PIP, Users can query the subscription stats of a topic through the admin tool, and observe the `subscriptionIsolationLevel` in the subscription stats to determine the isolation level of the subscription. + +# Links + +* Mailing List discussion thread: https://lists.apache.org/thread/8ny0qtp7m9qcdbvnfjdvpnkc4c5ssyld +* Mailing List voting thread: https://lists.apache.org/thread/4q1hrv466h8w9ccpf4moxt6jv1jxp1mr +* Document link: https://github.com/apache/pulsar-site/pull/712