Skip to content

Commit

Permalink
fix update txn status concurrently throw exception
Browse files Browse the repository at this point in the history
  • Loading branch information
fanjianye committed Apr 10, 2023
1 parent 2ddfbfe commit 55210af
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,14 @@ public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
appendLogCount.increment();
try {
synchronized (txnMetaListPair.getLeft()) {
if (txnMetaListPair.getLeft().status() == newStatus) {
transactionLog.deletePosition(Collections.singletonList(position));
log.info("TxnID : {} has update txn status to {} repeatedly.",
txnMetaListPair.getLeft().id().toString(),
txnMetaListPair.getLeft().status().name());
promise.complete(null);
return;
}
txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
txnMetaListPair.getRight().add(position);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,71 @@ public void testTransactionOperation(TxnLogBufferedWriterConfig txnLogBufferedWr
}
}

@Test
public void testUpdateTxnStatusConcurrently() throws Exception {
TxnLogBufferedWriterConfig disabled = new TxnLogBufferedWriterConfig();
disabled.setBatchEnabled(false);

ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);

@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig, disabled, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS);
mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(),
mlTransactionSequenceIdGenerator, 0L);
transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();

if (transactionMetadataStore.checkIfReady()) {
TxnID txnID = transactionMetadataStore.newTransaction(5000, null).get();
assertEquals(transactionMetadataStore.getTxnStatus(txnID).get(), TxnStatus.OPEN);

List<String> partitions = new ArrayList<>();
partitions.add("pt-1");
partitions.add("pt-2");
transactionMetadataStore.addProducedPartitionToTxn(txnID, partitions).get();
assertEquals(transactionMetadataStore.getTxnMeta(txnID).get().producedPartitions(), partitions);

// update txn status OPEN->COMMITTING concurrently
List<CompletableFuture<Void>> list = new ArrayList<>();
list.add(transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN, false));
list.add(transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN, false));

try {
FutureUtil.waitForAll(list).get();
} catch (ExecutionException e) {
fail();
}
Assert.assertEquals(transactionMetadataStore.getTxnStatus(txnID).get(), TxnStatus.COMMITTING);

// update txn status COMMITTING->COMMITTED concurrently
List<CompletableFuture<Void>> list2 = new ArrayList<>();
list2.add(transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false));
list2.add(transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false));

try {
FutureUtil.waitForAll(list2).get();
} catch (ExecutionException e) {
fail();
}

try {
transactionMetadataStore.getTxnMeta(txnID).get();
fail();
} catch (ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
}
}
}

@DataProvider(name = "isUseManagedLedgerProperties")
public Object[][] versions() {
return new Object[][] { { true }, { false }};
Expand Down

0 comments on commit 55210af

Please sign in to comment.