diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 042e03998696c..4e5e12365480c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -660,6 +660,31 @@ void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, FindEntryCallback callback, Object ctx, boolean isFindFromLedger); + + /** + * Find the newest entry that matches the given predicate. + * + * @param constraint + * search only active entries or all entries + * @param condition + * predicate that reads an entry an applies a condition + * @param callback + * callback object returning the resultant position + * @param startPosition + * start position to search from. + * @param endPosition + * end position to search to. + * @param ctx + * opaque context + * @param isFindFromLedger + * find the newest entry from ledger + */ + default void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + Position startPosition, Position endPosition, FindEntryCallback callback, + Object ctx, boolean isFindFromLedger) { + asyncFindNewestMatching(constraint, condition, callback, ctx, isFindFromLedger); + } + /** * reset the cursor to specified position to enable replay of messages. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 934bfba4b0d81..50f5f36b2d53d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1272,27 +1272,55 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate @Override public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, FindEntryCallback callback, Object ctx, boolean isFindFromLedger) { - OpFindNewest op; - Position startPosition = null; - long max = 0; + asyncFindNewestMatching(constraint, condition, null, null, callback, ctx, + isFindFromLedger); + } + + + @Override + public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + Position start, Position end, FindEntryCallback callback, + Object ctx, boolean isFindFromLedger) { + Position startPosition; switch (constraint) { - case SearchAllAvailableEntries: - startPosition = getFirstPosition(); - max = ledger.getNumberOfEntries() - 1; - break; - case SearchActiveEntries: - startPosition = ledger.getNextValidPosition(markDeletePosition); - max = getNumberOfEntriesInStorage(); - break; - default: - callback.findEntryFailed(new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx); - return; + case SearchAllAvailableEntries -> + startPosition = start == null ? getFirstPosition() : start; + case SearchActiveEntries -> { + if (start == null) { + startPosition = ledger.getNextValidPosition(markDeletePosition); + } else { + startPosition = start; + startPosition = startPosition.compareTo(markDeletePosition) <= 0 + ? ledger.getNextValidPosition(startPosition) : startPosition; + } + } + default -> { + callback.findEntryFailed( + new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx); + return; + } } + // startPosition can't be null, should never go here. if (startPosition == null) { callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"), Optional.empty(), ctx); return; } + // Calculate the end position + Position endPosition = end == null ? ledger.lastConfirmedEntry : end; + endPosition = endPosition.compareTo(ledger.lastConfirmedEntry) > 0 ? ledger.lastConfirmedEntry : endPosition; + // Calculate the number of entries between the startPosition and endPosition + long max = 0; + if (startPosition.compareTo(endPosition) <= 0) { + max = ledger.getNumberOfEntries(Range.closed(startPosition, endPosition)); + } + + if (max <= 0) { + callback.findEntryComplete(null, ctx); + return; + } + + OpFindNewest op; if (isFindFromLedger) { op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx); } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 69b74fcf8f5c1..d3ea98131ad8f 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -4873,6 +4873,297 @@ public void operationFailed(ManagedLedgerException exception) { assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext()); } + @Test + public void testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd() throws Exception { + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setMaxEntriesPerLedger(2); + managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS); + @Cleanup + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd", managedLedgerConfig); + @Cleanup + ManagedCursor managedCursor = ledger.openCursor("test"); + + Position position = ledger.addEntry("test".getBytes(Encoding)); + Position position1 = ledger.addEntry("test1".getBytes(Encoding)); + Position position2 = ledger.addEntry("test2".getBytes(Encoding)); + Position position3 = ledger.addEntry("test3".getBytes(Encoding)); + + Predicate condition = entry -> { + try { + Position p = entry.getPosition(); + return p.compareTo(position1) <= 0; + } finally { + entry.release(); + } + }; + + // find the newest entry with start and end position + AtomicBoolean failed = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference positionRef = new AtomicReference<>(); + managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, position, position2, new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + positionRef.set(position); + latch.countDown(); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { + failed.set(true); + latch.countDown(); + } + }, null, true); + + latch.await(); + assertFalse(failed.get()); + assertNotNull(positionRef.get()); + assertEquals(positionRef.get(), position1); + + // find the newest entry with start + AtomicBoolean failed1 = new AtomicBoolean(false); + CountDownLatch latch1 = new CountDownLatch(1); + AtomicReference positionRef1 = new AtomicReference<>(); + managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, position, null, new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + positionRef1.set(position); + latch1.countDown(); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { + failed1.set(true); + latch1.countDown(); + } + }, null, true); + latch1.await(); + assertFalse(failed1.get()); + assertNotNull(positionRef1.get()); + assertEquals(positionRef1.get(), position1); + + // find the newest entry with end + AtomicBoolean failed2 = new AtomicBoolean(false); + CountDownLatch latch2 = new CountDownLatch(1); + AtomicReference positionRef2 = new AtomicReference<>(); + managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, null, position2, new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + positionRef2.set(position); + latch2.countDown(); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { + failed2.set(true); + latch2.countDown(); + } + }, null, true); + latch2.await(); + assertFalse(failed2.get()); + assertNotNull(positionRef2.get()); + assertEquals(positionRef2.get(), position1); + + // find the newest entry without start and end position + AtomicBoolean failed3 = new AtomicBoolean(false); + CountDownLatch latch3 = new CountDownLatch(1); + AtomicReference positionRef3 = new AtomicReference<>(); + managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, null, null, new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + positionRef3.set(position); + latch3.countDown(); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { + failed3.set(true); + latch3.countDown(); + } + }, null, true); + latch3.await(); + assertFalse(failed3.get()); + assertNotNull(positionRef3.get()); + assertEquals(positionRef3.get(), position1); + + // find position3 + AtomicBoolean failed4 = new AtomicBoolean(false); + CountDownLatch latch4 = new CountDownLatch(1); + AtomicReference positionRef4 = new AtomicReference<>(); + managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> { + try { + Position p = entry.getPosition(); + return p.compareTo(position3) <= 0; + } finally { + entry.release(); + } + }, position3, position3, new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + positionRef4.set(position); + latch4.countDown(); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { + failed4.set(true); + latch4.countDown(); + } + }, null, true); + latch4.await(); + assertFalse(failed4.get()); + assertNotNull(positionRef4.get()); + assertEquals(positionRef4.get(), position3); + } + + + @Test + public void testFindNewestMatching_SearchActiveEntries_ByStartAndEnd() throws Exception { + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setMaxEntriesPerLedger(2); + managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS); + @Cleanup + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testFindNewestMatching_SearchActiveEntries_ByStartAndEnd", managedLedgerConfig); + @Cleanup + ManagedCursorImpl managedCursor = (ManagedCursorImpl) ledger.openCursor("test"); + + Position position = ledger.addEntry("test".getBytes(Encoding)); + Position position1 = ledger.addEntry("test1".getBytes(Encoding)); + Position position2 = ledger.addEntry("test2".getBytes(Encoding)); + Position position3 = ledger.addEntry("test3".getBytes(Encoding)); + Position position4 = ledger.addEntry("test4".getBytes(Encoding)); + managedCursor.markDelete(position1); + assertEquals(managedCursor.getNumberOfEntries(), 3); + + Predicate condition = entry -> { + try { + Position p = entry.getPosition(); + return p.compareTo(position3) <= 0; + } finally { + entry.release(); + } + }; + + // find the newest entry with start and end position + AtomicBoolean failed = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference positionRef = new AtomicReference<>(); + managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, position2, position4, new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + positionRef.set(position); + latch.countDown(); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { + failed.set(true); + latch.countDown(); + } + }, null, true); + latch.await(); + assertFalse(failed.get()); + assertNotNull(positionRef.get()); + assertEquals(positionRef.get(), position3); + + // find the newest entry with start + AtomicBoolean failed1 = new AtomicBoolean(false); + CountDownLatch latch1 = new CountDownLatch(1); + AtomicReference positionRef1 = new AtomicReference<>(); + managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, position2, null, new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + positionRef1.set(position); + latch1.countDown(); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { + failed1.set(true); + latch1.countDown(); + } + }, null, true); + + latch1.await(); + assertFalse(failed1.get()); + assertNotNull(positionRef1.get()); + assertEquals(positionRef1.get(), position3); + + // find the newest entry with end + AtomicBoolean failed2 = new AtomicBoolean(false); + CountDownLatch latch2 = new CountDownLatch(1); + AtomicReference positionRef2 = new AtomicReference<>(); + managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, null, position4, new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + positionRef2.set(position); + latch2.countDown(); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { + failed2.set(true); + latch2.countDown(); + } + }, null, true); + + latch2.await(); + assertFalse(failed2.get()); + assertNotNull(positionRef2.get()); + assertEquals(positionRef2.get(), position3); + + // find the newest entry without start and end position + AtomicBoolean failed3 = new AtomicBoolean(false); + CountDownLatch latch3 = new CountDownLatch(1); + AtomicReference positionRef3 = new AtomicReference<>(); + managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, null, null, new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + positionRef3.set(position); + latch3.countDown(); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { + failed3.set(true); + latch3.countDown(); + } + }, null, true); + latch3.await(); + assertFalse(failed3.get()); + assertNotNull(positionRef3.get()); + assertEquals(positionRef3.get(), position3); + + // find position4 + AtomicBoolean failed4 = new AtomicBoolean(false); + CountDownLatch latch4 = new CountDownLatch(1); + AtomicReference positionRef4 = new AtomicReference<>(); + managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> { + try { + Position p = entry.getPosition(); + return p.compareTo(position4) <= 0; + } finally { + entry.release(); + } + }, position4, position4, new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + positionRef4.set(position); + latch4.countDown(); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { + failed4.set(true); + latch4.countDown(); + } + }, null, true); + latch4.await(); + assertFalse(failed4.get()); + assertNotNull(positionRef4.get()); + assertEquals(positionRef4.get(), position4); + } + @Test void testForceCursorRecovery() throws Exception { TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor); diff --git a/pip/pip-401.md b/pip/pip-401.md new file mode 100644 index 0000000000000..8f1bc7851f7da --- /dev/null +++ b/pip/pip-401.md @@ -0,0 +1,141 @@ +# PIP-401: Support set batching configurations for Pulsar Functions&Sources + +# Background knowledge + +Pulsar Functions and Sources enable the batching feature hard-coded, and also set the `batchingMaxPublishDelay` to 10ms, it only +supports set the `batch-builder` for now, this is not suitable for all the use cases, and also not feasible for users. + +# Motivation + +Support setting batching configurations for Pulsar Functions&Sources, to make it more flexible and suitable for users. + +# Goals + +## In Scope + +- Support setting batching configurations for Pulsar Functions&Sources. + +# High Level Design + +Make users able to enable&disable batching and set batching configurations for Pulsar Functions&Sources. + +# Detailed Design + +## Design & Implementation Details + +- Add a new message `BatchingSpec` with below fields in `Function.proto`, and add it as a new filed `batchingSpec` to the `ProducerSpec` message + - `bool enabled` + - `int32 batchingMaxPublishDelayMs` + - `int32 roundRobinRouterBatchingPartitionSwitchFrequency` + - `int32 batchingMaxMessages` + - `int32 batchingMaxBytes` + - `string batchBuilder` +- Add a new class `BatchingConfig` with below fields and add it as a new field `batchingConfig` to the `ProducerConfig`: + - `bool enabled` + - `int batchingMaxPublishDelayMs` + - `int roundRobinRouterBatchingPartitionSwitchFrequency` + - `int batchingMaxMessages` + - `int batchingMaxBytes` + - `String batchBuilder` + +And related logic also will be added: +- convert the `batchingSpec` field of the `ProducerSpec` from `FunctionDetails` to the `batchingConfig` field of the `ProducerConfig` and vice versa + +To keep the compatibility, when the `batchingSpec` of the `ProducerSpec` is null when creating the `ProducerConfig` from the `ProducerSpec`, +the `batchingConfig` field will be fallback to: `BatchingConfig(enabled=true, batchingMaxPublishDelayMs=10)`. + +After the changes, users can pass the batching configurations when creating the functions and sources, like below using CLI arguments: + +```shell +./bin/pulsar-admin functions create \ + --tenant public \ + --namespace default \ + --name test-java \ + --className org.apache.pulsar.functions.api.examples.ExclamationFunction \ + --inputs persistent://public/default/test-java-input \ + --producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \ + --jar /pulsar/examples/api-examples.jar +``` + +```shell +./bin/pulsar-admin sources create \ + --name data-generator-source \ + --source-type data-generator \ + --destination-topic-name persistent://public/default/data-source-topic \ + --producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \ + --source-config '{"sleepBetweenMessages": "1000"}' +``` + +Users can also use the function config file to set the batching configs for functions: + +```yaml +tenant: "public" +namespace: "default" +name: "test-java" +jar: "/pulsar/examples/api-examples.jar" +className: "org.apache.pulsar.functions.api.examples.ExclamationFunction" +inputs: ["persistent://public/default/test-java-input"] +output: "persistent://public/default/test-java-output" +autoAck: true +parallelism: 1 +producerConfig: + batchingConfig: + enabled: true + batchingMaxPublishDelayMs: 100 + roundRobinRouterBatchingPartitionSwitchFrequency: 10 + batchingMaxMessages: 1000 +``` + +And use source config file to set the batching configs for sources: + +```yaml +tenant: "public" +namespace: "default" +name: "data-generator-source" +topicName: "persistent://public/default/data-source-topic" +archive: "builtin://data-generator" +parallelism: 1 +configs: + sleepBetweenMessages: "5000" +producerConfig: + batchingConfig: + enabled: true + batchingMaxPublishDelayMs: 100 + roundRobinRouterBatchingPartitionSwitchFrequency: 10 + batchingMaxMessages: 1000 +``` + +## Public-facing Changes + +### CLI + + +# Monitoring + + +# Security Considerations + + +# Backward & Forward Compatibility + +## Revert + +No changes are needed to revert to the previous version. + +## Upgrade + +No other changes are needed to upgrade to the new version. + +# Alternatives + +None + +# General Notes + +# Links + + +* Mailing List discussion thread: https://lists.apache.org/thread/olx4xm8cdy43omp5c0jm44sj1gp0grcr +* Mailing List voting thread: https://lists.apache.org/thread/vhq6ox4nh2rx59yoxowftqzv8f9lnm4q diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 0b6f0e9418cf9..d27661d0ee65e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2238,6 +2238,24 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece doc = "Max time before triggering a rollover on a cursor ledger" ) private int managedLedgerCursorRolloverTimeInSeconds = 14400; + + @FieldContext( + category = CATEGORY_STORAGE_ML, + dynamic = true, + doc = "When resetting a subscription by timestamp, the broker will use the" + + " ledger closing timestamp metadata to determine the range of ledgers" + + " to search for the message where the subscription position is reset to. " + + " Since by default, the search condition is based on the message publish time provided by the " + + " client at the publish time, there will be some clock skew between the ledger closing timestamp " + + " metadata and the publish time." + + " This configuration is used to set the max clock skew between the ledger closing" + + " timestamp and the message publish time for finding the range of ledgers to open for searching." + + " The default value is 60000 milliseconds (60 seconds). When set to -1, the broker will not" + + " use the ledger closing timestamp metadata to determine the range of ledgers to search for the" + + " message." + ) + private int managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis = 60000; + @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Max number of `acknowledgment holes` that are going to be persistently stored.\n\n" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java index 08273155e4cfa..5a4631cf205f1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java @@ -25,6 +25,9 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.Codec; @@ -37,6 +40,7 @@ public class PersistentMessageFinder implements AsyncCallbacks.FindEntryCallback { private final ManagedCursor cursor; private final String subName; + private final int ledgerCloseTimestampMaxClockSkewMillis; private final String topicName; private long timestamp = 0; @@ -48,19 +52,23 @@ public class PersistentMessageFinder implements AsyncCallbacks.FindEntryCallback AtomicIntegerFieldUpdater .newUpdater(PersistentMessageFinder.class, "messageFindInProgress"); - public PersistentMessageFinder(String topicName, ManagedCursor cursor) { + public PersistentMessageFinder(String topicName, ManagedCursor cursor, int ledgerCloseTimestampMaxClockSkewMillis) { this.topicName = topicName; this.cursor = cursor; this.subName = Codec.decode(cursor.getName()); + this.ledgerCloseTimestampMaxClockSkewMillis = ledgerCloseTimestampMaxClockSkewMillis; } public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback callback) { - this.timestamp = timestamp; if (messageFindInProgressUpdater.compareAndSet(this, FALSE, TRUE)) { + this.timestamp = timestamp; if (log.isDebugEnabled()) { log.debug("[{}] Starting message position find at timestamp {}", subName, timestamp); } - + Pair range = + getFindPositionRange(cursor.getManagedLedger().getLedgersInfo().values(), + cursor.getManagedLedger().getLastConfirmedEntry(), timestamp, + ledgerCloseTimestampMaxClockSkewMillis); cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> { try { long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); @@ -71,7 +79,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback entry.release(); } return false; - }, this, callback, true); + }, range.getLeft(), range.getRight(), this, callback, true); } else { if (log.isDebugEnabled()) { log.debug("[{}][{}] Ignore message position find scheduled task, last find is still running", topicName, @@ -83,6 +91,59 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback } } + public static Pair getFindPositionRange(Iterable ledgerInfos, + Position lastConfirmedEntry, long targetTimestamp, + int ledgerCloseTimestampMaxClockSkewMillis) { + if (ledgerCloseTimestampMaxClockSkewMillis < 0) { + // this feature is disabled when the value is negative + return Pair.of(null, null); + } + + long targetTimestampMin = targetTimestamp - ledgerCloseTimestampMaxClockSkewMillis; + long targetTimestampMax = targetTimestamp + ledgerCloseTimestampMaxClockSkewMillis; + + Position start = null; + Position end = null; + + LedgerInfo secondToLastLedgerInfo = null; + LedgerInfo lastLedgerInfo = null; + for (LedgerInfo info : ledgerInfos) { + if (!info.hasTimestamp()) { + // unexpected case, don't set start and end + return Pair.of(null, null); + } + secondToLastLedgerInfo = lastLedgerInfo; + lastLedgerInfo = info; + long closeTimestamp = info.getTimestamp(); + // For an open ledger, closeTimestamp is 0 + if (closeTimestamp == 0) { + end = null; + break; + } + if (closeTimestamp <= targetTimestampMin) { + start = PositionFactory.create(info.getLedgerId(), 0); + } else if (closeTimestamp > targetTimestampMax) { + // If the close timestamp is greater than the timestamp + end = PositionFactory.create(info.getLedgerId(), info.getEntries() - 1); + break; + } + } + // If the second-to-last ledger's close timestamp is less than the target timestamp, then start from the + // first entry of the last ledger when there are confirmed entries in the ledger + if (lastLedgerInfo != null && secondToLastLedgerInfo != null + && secondToLastLedgerInfo.getTimestamp() > 0 + && secondToLastLedgerInfo.getTimestamp() < targetTimestampMin) { + Position firstPositionInLedger = PositionFactory.create(lastLedgerInfo.getLedgerId(), 0); + if (lastConfirmedEntry != null + && lastConfirmedEntry.compareTo(firstPositionInLedger) >= 0) { + start = firstPositionInLedger; + } else { + start = lastConfirmedEntry; + } + } + return Pair.of(start, end); + } + private static final Logger log = LoggerFactory.getLogger(PersistentMessageFinder.class); @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index b5a1a9db5deb1..a96a7e75506eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -134,6 +134,7 @@ public class PersistentSubscription extends AbstractSubscription { private volatile CompletableFuture fenceFuture; private volatile CompletableFuture inProgressResetCursorFuture; private volatile Boolean replicatedControlled; + private final ServiceConfiguration config; static Map getBaseCursorProperties(Boolean isReplicated) { return isReplicated != null && isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : @@ -156,6 +157,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, Boolean replicated, Map subscriptionProperties) { this.topic = topic; + this.config = topic.getBrokerService().getPulsar().getConfig(); this.cursor = cursor; this.topicName = topic.getName(); this.subName = subscriptionName; @@ -166,7 +168,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma } this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); - if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() + if (config.isTransactionCoordinatorEnabled() && !isEventSystemTopic(TopicName.get(topicName)) && !ExtensibleLoadManagerImpl.isInternalTopic(topicName)) { this.pendingAckHandle = new PendingAckHandleImpl(this); @@ -203,7 +205,6 @@ public boolean isReplicated() { public boolean setReplicated(boolean replicated) { replicatedControlled = replicated; - ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); if (!replicated || !config.isEnableReplicatedSubscriptions()) { this.replicatedSubscriptionSnapshotCache = null; @@ -261,7 +262,6 @@ private CompletableFuture addConsumerInternal(Consumer consumer) { case Shared: if (dispatcher == null || dispatcher.getType() != SubType.Shared) { previousDispatcher = dispatcher; - ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); if (config.isSubscriptionSharedUseClassicPersistentImplementation()) { dispatcher = new PersistentDispatcherMultipleConsumersClassic(topic, cursor, this); } else { @@ -290,7 +290,6 @@ private CompletableFuture addConsumerInternal(Consumer consumer) { || !((StickyKeyDispatcher) dispatcher) .hasSameKeySharedPolicy(ksm)) { previousDispatcher = dispatcher; - ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); if (config.isSubscriptionKeySharedUseClassicPersistentImplementation()) { dispatcher = new PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor, @@ -426,7 +425,7 @@ public void acknowledgeMessage(List positions, AckType ackType, Map { if ((cursor.isMessageDeleted(position))) { pendingAckHandle.clearIndividualPosition(position); @@ -602,10 +601,9 @@ public CompletableFuture analyzeBacklog(Optional final EntryFilterSupport entryFilterSupport = dispatcher != null ? (EntryFilterSupport) dispatcher : new EntryFilterSupport(this); // we put some hard limits on the scan, in order to prevent denial of services - ServiceConfiguration configuration = topic.getBrokerService().getPulsar().getConfiguration(); - long maxEntries = configuration.getSubscriptionBacklogScanMaxEntries(); - long timeOutMs = configuration.getSubscriptionBacklogScanMaxTimeMs(); - int batchSize = configuration.getDispatcherMaxReadBatchSize(); + long maxEntries = config.getSubscriptionBacklogScanMaxEntries(); + long timeOutMs = config.getSubscriptionBacklogScanMaxTimeMs(); + int batchSize = config.getDispatcherMaxReadBatchSize(); AtomicReference firstPosition = new AtomicReference<>(); AtomicReference lastPosition = new AtomicReference<>(); final Predicate condition = entry -> { @@ -780,7 +778,8 @@ public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) { @Override public CompletableFuture resetCursor(long timestamp) { CompletableFuture future = new CompletableFuture<>(); - PersistentMessageFinder persistentMessageFinder = new PersistentMessageFinder(topicName, cursor); + PersistentMessageFinder persistentMessageFinder = new PersistentMessageFinder(topicName, cursor, + config.getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis()); if (log.isDebugEnabled()) { log.debug("[{}][{}] Resetting subscription to timestamp {}", topicName, subName, timestamp); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java index 3866130c64be5..b57b5ce94be42 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java @@ -470,7 +470,6 @@ public void release(ByteBuf processedPayload) { var addEntryCallback = new AsyncCallbacks.AddEntryCallback() { @Override public void addComplete(Position position, ByteBuf entryData, Object ctx) { - entryData.release(); countDownLatch.countDown(); successCount.incrementAndGet(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 176a799292ac3..6f2f1f3a1a2c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -138,7 +139,7 @@ void reset() { } CompletableFuture findMessage(final Result result, final ManagedCursor c1, final long timestamp) { - PersistentMessageFinder messageFinder = new PersistentMessageFinder("topicname", c1); + PersistentMessageFinder messageFinder = new PersistentMessageFinder("topicname", c1, 0); final CompletableFuture future = new CompletableFuture<>(); messageFinder.findMessages(timestamp, new AsyncCallbacks.FindEntryCallback() { @@ -217,7 +218,7 @@ void testPersistentMessageFinder() throws Exception { assertNotEquals(result.position, null); assertEquals(result.position, lastPosition); - PersistentMessageFinder messageFinder = new PersistentMessageFinder("topicname", c1); + PersistentMessageFinder messageFinder = new PersistentMessageFinder("topicname", c1, 0); final AtomicBoolean ex = new AtomicBoolean(false); messageFinder.findEntryFailed(new ManagedLedgerException("failed"), Optional.empty(), new AsyncCallbacks.FindEntryCallback() { @@ -589,4 +590,241 @@ public void test() { resetCursorData.setExcluded(true); System.out.println(Entity.entity(resetCursorData, MediaType.APPLICATION_JSON)); } + + @Test + public void testGetFindPositionRange_EmptyLedgerInfos() { + List ledgerInfos = new ArrayList<>(); + Position lastConfirmedEntry = null; + long targetTimestamp = 2000; + Pair range = + PersistentMessageFinder.getFindPositionRange(ledgerInfos, lastConfirmedEntry, targetTimestamp, 0); + + assertNotNull(range); + assertNull(range.getLeft()); + assertNull(range.getRight()); + } + + @Test + public void testGetFindPositionRange_AllTimestampsLessThanTarget() { + List ledgerInfos = new ArrayList<>(); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build()); + Position lastConfirmedEntry = PositionFactory.create(2, 9); + + long targetTimestamp = 2000; + Pair range = PersistentMessageFinder.getFindPositionRange(ledgerInfos, + lastConfirmedEntry, targetTimestamp, 0); + + assertNotNull(range); + assertNotNull(range.getLeft()); + assertNull(range.getRight()); + assertEquals(range.getLeft(), PositionFactory.create(2, 0)); + } + + @Test + public void testGetFindPositionRange_LastTimestampIsZero() { + List ledgerInfos = new ArrayList<>(); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(0).build()); + Position lastConfirmedEntry = PositionFactory.create(3, 5); + + long targetTimestamp = 2000; + Pair range = PersistentMessageFinder.getFindPositionRange(ledgerInfos, + lastConfirmedEntry, targetTimestamp, 0); + + assertNotNull(range); + assertNotNull(range.getLeft()); + assertNull(range.getRight()); + assertEquals(range.getLeft(), PositionFactory.create(3, 0)); + } + + @Test + public void testGetFindPositionRange_LastTimestampIsZeroWithNoEntries() { + List ledgerInfos = new ArrayList<>(); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(1500).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(0).build()); + Position lastConfirmedEntry = PositionFactory.create(2, 9); + + long targetTimestamp = 2000; + Pair range = PersistentMessageFinder.getFindPositionRange(ledgerInfos, + lastConfirmedEntry, targetTimestamp, 0); + + assertNotNull(range); + assertNotNull(range.getLeft()); + assertNull(range.getRight()); + assertEquals(range.getLeft(), PositionFactory.create(2, 9)); + } + + @Test + public void testGetFindPositionRange_AllTimestampsGreaterThanTarget() { + List ledgerInfos = new ArrayList<>(); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(3000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(4000).build()); + Position lastConfirmedEntry = PositionFactory.create(2, 9); + + long targetTimestamp = 2000; + Pair range = PersistentMessageFinder.getFindPositionRange(ledgerInfos, + lastConfirmedEntry, targetTimestamp, 0); + + assertNotNull(range); + assertNull(range.getLeft()); + assertNotNull(range.getRight()); + assertEquals(range.getRight(), PositionFactory.create(1, 9)); + } + + @Test + public void testGetFindPositionRange_MixedTimestamps() { + List ledgerInfos = new ArrayList<>(); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build()); + Position lastConfirmedEntry = PositionFactory.create(3, 9); + + long targetTimestamp = 2500; + Pair range = PersistentMessageFinder.getFindPositionRange(ledgerInfos, + lastConfirmedEntry, targetTimestamp, 0); + + assertNotNull(range); + assertNotNull(range.getLeft()); + assertNotNull(range.getRight()); + assertEquals(range.getLeft(), PositionFactory.create(3, 0)); + assertEquals(range.getRight(), PositionFactory.create(3, 9)); + } + + @Test + public void testGetFindPositionRange_TimestampAtBoundary() { + List ledgerInfos = new ArrayList<>(); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build()); + Position lastConfirmedEntry = PositionFactory.create(4, 9); + + long targetTimestamp = 3000; + Pair range = PersistentMessageFinder.getFindPositionRange(ledgerInfos, + lastConfirmedEntry, targetTimestamp, 0); + + assertNotNull(range); + assertNotNull(range.getLeft()); + assertNotNull(range.getRight()); + assertEquals(range.getLeft(), PositionFactory.create(3, 0)); + // there might be entries in the next ledger with the same timestamp as the target timestamp, even though + // the close timestamp of ledger 3 is equals to the target timestamp + assertEquals(range.getRight(), PositionFactory.create(4, 9)); + } + + @Test + public void testGetFindPositionRange_ClockSkew() { + List ledgerInfos = new ArrayList<>(); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(2010).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build()); + Position lastConfirmedEntry = PositionFactory.create(5, 5); + + long targetTimestamp = 2009; + Pair range = PersistentMessageFinder.getFindPositionRange(ledgerInfos, + lastConfirmedEntry, targetTimestamp, 10); + + assertNotNull(range); + assertNotNull(range.getLeft()); + assertNotNull(range.getRight()); + assertEquals(range.getLeft(), PositionFactory.create(1, 0)); + assertEquals(range.getRight(), PositionFactory.create(4, 9)); + } + + @Test + public void testGetFindPositionRange_ClockSkewCase2() { + List ledgerInfos = new ArrayList<>(); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build()); + Position lastConfirmedEntry = PositionFactory.create(5, 5); + + long targetTimestamp = 2995; + Pair range = PersistentMessageFinder.getFindPositionRange(ledgerInfos, + lastConfirmedEntry, targetTimestamp, 10); + + assertNotNull(range); + assertNotNull(range.getLeft()); + assertNotNull(range.getRight()); + assertEquals(range.getLeft(), PositionFactory.create(2, 0)); + assertEquals(range.getRight(), PositionFactory.create(4, 9)); + } + + @Test + public void testGetFindPositionRange_ClockSkewCase3() { + List ledgerInfos = new ArrayList<>(); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(3000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build()); + Position lastConfirmedEntry = PositionFactory.create(5, 5); + + long targetTimestamp = 3005; + Pair range = PersistentMessageFinder.getFindPositionRange(ledgerInfos, + lastConfirmedEntry, targetTimestamp, 10); + + assertNotNull(range); + assertNotNull(range.getLeft()); + assertNotNull(range.getRight()); + assertEquals(range.getLeft(), PositionFactory.create(2, 0)); + assertEquals(range.getRight(), PositionFactory.create(4, 9)); + } + + @Test + public void testGetFindPositionRange_FeatureDisabledWithNegativeClockSkew() { + List ledgerInfos = new ArrayList<>(); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(2).setEntries(10).setTimestamp(2000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(3).setEntries(10).setTimestamp(2010).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(4).setEntries(10).setTimestamp(4000).build()); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(5).setTimestamp(0).build()); + Position lastConfirmedEntry = PositionFactory.create(5, 5); + + long targetTimestamp = 2009; + Pair range = PersistentMessageFinder.getFindPositionRange(ledgerInfos, + lastConfirmedEntry, targetTimestamp, -1); + + assertNotNull(range); + assertNull(range.getLeft()); + assertNull(range.getRight()); + } + + @Test + public void testGetFindPositionRange_SingleLedger() { + List ledgerInfos = new ArrayList<>(); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setTimestamp(0).build()); + Position lastConfirmedEntry = PositionFactory.create(1, 5); + + long targetTimestamp = 2500; + Pair range = PersistentMessageFinder.getFindPositionRange(ledgerInfos, + lastConfirmedEntry, targetTimestamp, 0); + + assertNotNull(range); + assertNull(range.getLeft()); + assertNull(range.getRight()); + } + + @Test + public void testGetFindPositionRange_SingleClosedLedger() { + List ledgerInfos = new ArrayList<>(); + ledgerInfos.add(LedgerInfo.newBuilder().setLedgerId(1).setEntries(10).setTimestamp(1000).build()); + Position lastConfirmedEntry = PositionFactory.create(1, 9); + + long targetTimestamp = 2500; + Pair range = PersistentMessageFinder.getFindPositionRange(ledgerInfos, + lastConfirmedEntry, targetTimestamp, 0); + + assertNotNull(range); + assertNotNull(range.getLeft()); + assertNull(range.getRight()); + assertEquals(range.getLeft(), PositionFactory.create(1, 0)); + } }