Skip to content

Commit

Permalink
[fix][broker] Fix shadow topics cannot be consumed when the entry is …
Browse files Browse the repository at this point in the history
…not cached (apache#23147)

### Motivation

For shadow topics, a `ReadOnlyLedgerHandle` is created to read messages from the source topic when the entry is not cached. However, it leverages the `readAsync` API that validates the `lastAddConfirmed` field (LAC). In `ReadOnlyLedgerHandle`, this field could never be updated, so `readAsync` could fail immediately. See `LedgerHandle#readAsync`:

```java
if (lastEntry > lastAddConfirmed) {
    LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{} lastEntry:{} lastAddConfirmed:{}",
            ledgerId, firstEntry, lastEntry, lastAddConfirmed);
    return FutureUtils.exception(new BKReadException());
}
```

This bug is not exposed because:
1. `PulsarMockReadHandle` does not maintain a LAC field.
2. The case for cache miss is never tested.

### Modifications

Replace `readAsync` with `readUnconfirmedAsync` and compare the entry range with the `ManagedLedger#getLastConfirmedEntry`. The managed ledger already maintains a `lastConfirmedEntry` to limit the last entry. See `ManagedLedgerImpl#internalReadFromLedger`:

```java
Position lastPosition = lastConfirmedEntry;

if (ledger.getId() == lastPosition.getLedgerId()) {
    lastEntryInLedger = lastPosition.getEntryId();
```

Add `ShadowTopicRealBkTest` to cover two code changes `RangeEntryCacheImpl#readFromStorage` and `EntryCache#asyncReadEntry`.

Exceptionally, compare the entry range with the LAC of a ledger handle when it does not exist in the managed ledger. It's because `ReadOnlyManagedLedgerImpl` could read a ledger in another managed ledger.

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

### Matching PR in forked repository

PR in forked repository: BewareMyPower#33

<!--
After opening this PR, the build in apache/pulsar will fail and instructions will
be provided for opening a PR in the PR author's forked repository.

apache/pulsar pull requests should be first tested in your own fork since the 
apache/pulsar CI based on GitHub Actions has constrained resources and quota.
GitHub Actions provides separate quota for pull requests that are executed in 
a forked repository.

The tests will be run in the forked repository until all PR review comments have
been handled, the tests pass and the PR is approved by a reviewer.
-->
  • Loading branch information
BewareMyPower authored Aug 15, 2024
1 parent ce38ee2 commit 15b88d2
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4050,6 +4050,8 @@ public static ManagedLedgerException createManagedLedgerException(int bkErrorCod
public static ManagedLedgerException createManagedLedgerException(Throwable t) {
if (t instanceof org.apache.bookkeeper.client.api.BKException) {
return createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException) t).getCode());
} else if (t instanceof ManagedLedgerException) {
return (ManagedLedgerException) t;
} else if (t instanceof CompletionException
&& !(t.getCause() instanceof CompletionException) /* check to avoid stackoverlflow */) {
return createManagedLedgerException(t.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void invalidateEntriesBeforeTimestamp(long timestamp) {
@Override
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenAcceptAsync(
ledgerEntries -> {
List<Entry> entries = new ArrayList<>();
long totalSize = 0;
Expand Down Expand Up @@ -107,7 +107,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
@Override
public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.ReadEntryCallback callback,
Object ctx) {
lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync(
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).whenCompleteAsync(
(ledgerEntries, exception) -> {
if (exception != null) {
ml.invalidateLedgerHandle(lh);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa
manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength());
callback.readEntryComplete(cachedEntry, ctx);
} else {
lh.readAsync(position.getEntryId(), position.getEntryId()).thenAcceptAsync(
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).thenAcceptAsync(
ledgerEntries -> {
try {
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
Expand Down Expand Up @@ -429,7 +429,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
long firstEntry, long lastEntry, boolean shouldCacheEntry) {
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
CompletableFuture<List<EntryImpl>> readResult = lh.readAsync(firstEntry, lastEntry)
CompletableFuture<List<EntryImpl>> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
.thenApply(
ledgerEntries -> {
requireNonNull(ml.getName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl.cache;

import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;

class ReadEntryUtils {

static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry,
long lastEntry) {
if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) {
// The read handle comes from another managed ledger, in this case, we can only compare the entry range with
// the LAC of that read handle. Specifically, it happens when this method is called by a
// ReadOnlyManagedLedgerImpl object.
return handle.readAsync(firstEntry, lastEntry);
}
// Compare the entry range with the lastConfirmedEntry maintained by the managed ledger because the entry cache
// of `ShadowManagedLedgerImpl` reads entries via `ReadOnlyLedgerHandle`, which never updates `lastAddConfirmed`
final var lastConfirmedEntry = ml.getLastConfirmedEntry();
if (lastConfirmedEntry == null) {
return CompletableFuture.failedFuture(new ManagedLedgerException(
"LastConfirmedEntry is null when reading ledger " + handle.getId()));
}
if (handle.getId() > lastConfirmedEntry.getLedgerId()) {
return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is "
+ lastConfirmedEntry + " when reading ledger " + handle.getId()));
}
if (handle.getId() == lastConfirmedEntry.getLedgerId() && lastEntry > lastConfirmedEntry.getEntryId()) {
return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is "
+ lastConfirmedEntry + " when reading entry " + lastEntry));
}
return handle.readUnconfirmedAsync(firstEntry, lastEntry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -45,6 +46,7 @@
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheDisabled;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -392,6 +394,9 @@ void entryCacheDisabledAsyncReadEntry() throws Exception {
EntryCache entryCache = cacheManager.getEntryCache(ml1);

final CountDownLatch counter = new CountDownLatch(1);
when(ml1.getLastConfirmedEntry()).thenReturn(PositionFactory.create(1L, 1L));
when(ml1.getOptionalLedgerInfo(lh.getId())).thenReturn(Optional.of(mock(
MLDataFormats.ManagedLedgerInfo.LedgerInfo.class)));
entryCache.asyncReadEntry(lh, PositionFactory.create(1L,1L), new AsyncCallbacks.ReadEntryCallback() {
public void readEntryComplete(Entry entry, Object ctx) {
Assert.assertNotEquals(entry, null);
Expand All @@ -406,7 +411,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
}, null);
counter.await();

verify(lh).readAsync(anyLong(), anyLong());
verify(lh).readUnconfirmedAsync(anyLong(), anyLong());
}

}
Loading

0 comments on commit 15b88d2

Please sign in to comment.