Skip to content

Commit

Permalink
[fix][txn]: fix transaction pending ack store managed ledger WriteFai…
Browse files Browse the repository at this point in the history
…l state (apache#14738)

like apache#10711

```
java.util.concurrent.CompletionException: org.apache.pulsar.broker.service.BrokerServiceException$PersistenceException: org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerAlreadyClosedException: Waiting to recover from failure
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:704) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
	at org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore$2.addFailed(MLPendingAckStore.java:286) ~[io.streamnative-pulsar-broker-2.9.2.5.jar:2.9.2.5]
	at org.apache.bookkeeper.mledger.impl.OpAddEntry.failed(OpAddEntry.java:138) ~[io.streamnative-managed-ledger-2.9.2.5.jar:2.9.2.5]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalAsyncAddEntry(ManagedLedgerImpl.java:743) ~[io.streamnative-managed-ledger-2.9.2.5.jar:2.9.2.5]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncAddEntry$3(ManagedLedgerImpl.java:708) ~[io.streamnative-managed-ledger-2.9.2.5.jar:2.9.2.5]
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [io.streamnative-managed-ledger-2.9.2.5.jar:2.9.2.5]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.14.4.jar:4.14.4]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.pulsar.broker.service.BrokerServiceException$PersistenceException: org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerAlreadyClosedException: Waiting to recover from failure
	... 10 more
```

## Motivation
when transaction pending ack managed ledger state become WriteFailed state, should `readyToCreateNewLedger`.

## implement

append fail check the managedLedger state and the exception do `readyToCreateNewLedger`
### Verifying this change
Add the tests for it

(cherry picked from commit 6c3711c)
  • Loading branch information
congbobo184 authored and codelipenghui committed Mar 19, 2022
1 parent a1be5e0 commit 6d3fbbe
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] MLPendingAckStore message append fail exception : {}, operation : {}",
managedLedger.getName(), ctx, exception, pendingAckMetadataEntry.getPendingAckOp());

if (exception instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
managedLedger.readyToCreateNewLedger();
}
buf.release();
completableFuture.completeExceptionally(new PersistenceException(exception));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.pendingack;

import lombok.Cleanup;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.fail;

public class PendingAckMetadataTest extends MockedBookKeeperTestCase {

public PendingAckMetadataTest() {
super(3);
}

@Test
public void testPendingAckManageLedgerWriteFailState() throws Exception {
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);

String pendingAckTopicName = MLPendingAckStore
.getTransactionPendingAckStoreSuffix("test", "test");
@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);

CompletableFuture<ManagedLedger> completableFuture = new CompletableFuture<>();
factory.asyncOpen(pendingAckTopicName, new AsyncCallbacks.OpenLedgerCallback() {
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
completableFuture.complete(ledger);
}

@Override
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {

}
}, null);

ManagedCursor cursor = completableFuture.get().openCursor("test");
ManagedCursor subCursor = completableFuture.get().openCursor("test");
MLPendingAckStore pendingAckStore =
new MLPendingAckStore(completableFuture.get(), cursor, subCursor);

Field field = MLPendingAckStore.class.getDeclaredField("managedLedger");
field.setAccessible(true);
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) field.get(pendingAckStore);
field = ManagedLedgerImpl.class.getDeclaredField("STATE_UPDATER");
field.setAccessible(true);
AtomicReferenceFieldUpdater<ManagedLedgerImpl, ManagedLedgerImpl.State> state =
(AtomicReferenceFieldUpdater<ManagedLedgerImpl, ManagedLedgerImpl.State>) field.get(managedLedger);
state.set(managedLedger, WriteFailed);
try {
pendingAckStore.appendAbortMark(new TxnID(1, 1), CommandAck.AckType.Cumulative).get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause().getCause() instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException);
}
pendingAckStore.appendAbortMark(new TxnID(1, 1), CommandAck.AckType.Cumulative).get();

completableFuture.get().close();
cursor.close();
subCursor.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
Expand Down Expand Up @@ -163,9 +161,7 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.error("Transaction log write transaction operation error", exception);
if (exception instanceof ManagedLedgerAlreadyClosedException
&& managedLedger instanceof ManagedLedgerImpl
&& State.WriteFailed == ((ManagedLedgerImpl) managedLedger).getState()) {
if (exception instanceof ManagedLedgerAlreadyClosedException) {
managedLedger.readyToCreateNewLedger();
}
buf.release();
Expand Down

0 comments on commit 6d3fbbe

Please sign in to comment.