diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index b6eaad2e3e38f..91b440cf84001 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -400,6 +400,14 @@ public CompletableFuture updateTxnStatus(TxnID txnID, TxnStatus newStatus, appendLogCount.increment(); try { synchronized (txnMetaListPair.getLeft()) { + if (txnMetaListPair.getLeft().status() == newStatus) { + transactionLog.deletePosition(Collections.singletonList(position)); + log.info("TxnID : {} has update txn status to {} repeatedly.", + txnMetaListPair.getLeft().id().toString(), + txnMetaListPair.getLeft().status().name()); + promise.complete(null); + return; + } txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus); txnMetaListPair.getRight().add(position); } diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java index 3b831ad38ba1c..87975e5445268 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java @@ -143,6 +143,71 @@ public void testTransactionOperation(TxnLogBufferedWriterConfig txnLogBufferedWr } } + @Test + public void testUpdateTxnStatusConcurrently() throws Exception { + TxnLogBufferedWriterConfig disabled = new TxnLogBufferedWriterConfig(); + disabled.setBatchEnabled(false); + + ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig(); + factoryConf.setMaxCacheSize(0); + + @Cleanup("shutdown") + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf); + TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1); + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); + managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); + MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory, + managedLedgerConfig, disabled, transactionTimer, DISABLED_BUFFERED_WRITER_METRICS); + mlTransactionLog.initialize().get(2, TimeUnit.SECONDS); + MLTransactionMetadataStore transactionMetadataStore = + new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, + new TransactionTimeoutTrackerImpl(), + mlTransactionSequenceIdGenerator, 0L); + transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get(); + + if (transactionMetadataStore.checkIfReady()) { + TxnID txnID = transactionMetadataStore.newTransaction(5000, null).get(); + assertEquals(transactionMetadataStore.getTxnStatus(txnID).get(), TxnStatus.OPEN); + + List partitions = new ArrayList<>(); + partitions.add("pt-1"); + partitions.add("pt-2"); + transactionMetadataStore.addProducedPartitionToTxn(txnID, partitions).get(); + assertEquals(transactionMetadataStore.getTxnMeta(txnID).get().producedPartitions(), partitions); + + // update txn status OPEN->COMMITTING concurrently + List> list = new ArrayList<>(); + list.add(transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN, false)); + list.add(transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN, false)); + + try { + FutureUtil.waitForAll(list).get(); + } catch (ExecutionException e) { + fail(); + } + Assert.assertEquals(transactionMetadataStore.getTxnStatus(txnID).get(), TxnStatus.COMMITTING); + + // update txn status COMMITTING->COMMITTED concurrently + List> list2 = new ArrayList<>(); + list2.add(transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false)); + list2.add(transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false)); + + try { + FutureUtil.waitForAll(list2).get(); + } catch (ExecutionException e) { + fail(); + } + + try { + transactionMetadataStore.getTxnMeta(txnID).get(); + fail(); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException); + } + } + } + @DataProvider(name = "isUseManagedLedgerProperties") public Object[][] versions() { return new Object[][] { { true }, { false }};