From 7c98728a864afe6f02d9f5f6969568688f995464 Mon Sep 17 00:00:00 2001 From: meegoo Date: Fri, 24 Jan 2025 15:36:47 +0800 Subject: [PATCH] [BugFix] Fix concurrent issue in olap table listener (#54051) Signed-off-by: meegoo --- .../BypassWriteTransactionHandler.java | 10 ++++--- .../TransactionWithoutChannelHandler.java | 2 +- .../load/streamload/StreamLoadTask.java | 7 +++-- .../transaction/GlobalTransactionMgr.java | 29 +++++++++++++++---- .../http/TransactionLoadActionTest.java | 17 +++++------ 5 files changed, 43 insertions(+), 22 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/http/rest/transaction/BypassWriteTransactionHandler.java b/fe/fe-core/src/main/java/com/starrocks/http/rest/transaction/BypassWriteTransactionHandler.java index b6059545ed4a3..69b10f93aea8b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/http/rest/transaction/BypassWriteTransactionHandler.java +++ b/fe/fe-core/src/main/java/com/starrocks/http/rest/transaction/BypassWriteTransactionHandler.java @@ -84,7 +84,8 @@ public ResultWrapper handle(BaseRequest request, BaseResponse response) throws S result = handlePrepareTransaction( db, label, Optional.ofNullable(requestBody.getCommittedTablets()).orElse(new ArrayList<>(0)), - Optional.ofNullable(requestBody.getFailedTablets()).orElse(new ArrayList<>(0)) + Optional.ofNullable(requestBody.getFailedTablets()).orElse(new ArrayList<>(0)), + timeoutMillis ); break; case TXN_COMMIT: @@ -124,7 +125,8 @@ private TransactionResult handleBeginTransaction(Database db, private TransactionResult handlePrepareTransaction(Database db, String label, List committedTablets, - List failedTablets) throws StarRocksException { + List failedTablets, + long timeoutMillis) throws StarRocksException { long dbId = db.getId(); TransactionState txnState = getTxnState(dbId, label); long txnId = txnState.getTransactionId(); @@ -133,7 +135,7 @@ private TransactionResult handlePrepareTransaction(Database db, switch (txnStatus) { case PREPARE: GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().prepareTransaction( - dbId, txnId, committedTablets, failedTablets, new MiniLoadTxnCommitAttachment()); + dbId, txnId, committedTablets, failedTablets, new MiniLoadTxnCommitAttachment(), timeoutMillis); result.addResultEntry(TransactionResult.TXN_ID_KEY, txnId); result.addResultEntry(TransactionResult.LABEL_KEY, label); break; @@ -162,7 +164,7 @@ private TransactionResult handleCommitTransaction(Database db, switch (txnStatus) { case PREPARED: GlobalStateMgr.getCurrentState().getGlobalTransactionMgr() - .commitPreparedTransaction(db, txnId, timeoutMillis); + .commitPreparedTransaction(db.getId(), txnId, timeoutMillis); result.addResultEntry(TransactionResult.TXN_ID_KEY, txnId); result.addResultEntry(TransactionResult.LABEL_KEY, label); break; diff --git a/fe/fe-core/src/main/java/com/starrocks/http/rest/transaction/TransactionWithoutChannelHandler.java b/fe/fe-core/src/main/java/com/starrocks/http/rest/transaction/TransactionWithoutChannelHandler.java index 630e8e8766ab2..acc8ae8ec0652 100644 --- a/fe/fe-core/src/main/java/com/starrocks/http/rest/transaction/TransactionWithoutChannelHandler.java +++ b/fe/fe-core/src/main/java/com/starrocks/http/rest/transaction/TransactionWithoutChannelHandler.java @@ -97,7 +97,7 @@ private TransactionResult handleCommitTransaction(Database db, switch (txnStatus) { case PREPARED: GlobalStateMgr.getCurrentState().getGlobalTransactionMgr() - .commitPreparedTransaction(db, txnId, timeoutMillis); + .commitPreparedTransaction(db.getId(), txnId, timeoutMillis); result.addResultEntry(TransactionResult.TXN_ID_KEY, txnId); result.addResultEntry(TransactionResult.LABEL_KEY, label); break; diff --git a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java index 6b49f25495d8d..40441d828d6fa 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java @@ -36,6 +36,7 @@ import com.starrocks.common.util.ProfileManager; import com.starrocks.common.util.RuntimeProfile; import com.starrocks.common.util.TimeUtils; +import com.starrocks.common.util.concurrent.lock.LockTimeoutException; import com.starrocks.common.util.concurrent.lock.LockType; import com.starrocks.common.util.concurrent.lock.Locker; import com.starrocks.http.rest.TransactionResult; @@ -940,7 +941,7 @@ public void unprotectedBeginTxn(boolean replay, boolean isBackendTxn, TUniqueId timeoutMs / 1000, warehouseId); } - public void unprotectedPrepareTxn() throws StarRocksException { + public void unprotectedPrepareTxn() throws StarRocksException, LockTimeoutException { List commitInfos = TabletCommitInfo.fromThrift(coord.getCommitInfos()); List failInfos = TabletFailInfo.fromThrift(coord.getFailInfos()); finishPreparingTimeMs = System.currentTimeMillis(); @@ -948,8 +949,8 @@ public void unprotectedPrepareTxn() throws StarRocksException { beforeLoadTimeMs, startLoadingTimeMs, startPreparingTimeMs, finishPreparingTimeMs, endTimeMs, numRowsNormal, numRowsAbnormal, numRowsUnselected, numLoadBytesTotal, trackingUrl); - GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().prepareTransaction(dbId, - txnId, commitInfos, failInfos, txnCommitAttachment); + GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().prepareTransaction( + dbId, txnId, commitInfos, failInfos, txnCommitAttachment, timeoutMs); } public boolean checkNeedRemove(long currentMs, boolean isForce) { diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java index 9f2a216115d73..9fd6a0bdaebee 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/GlobalTransactionMgr.java @@ -275,13 +275,32 @@ public void prepareTransaction(long dbId, long transactionId, List tabletFailInfos, TxnCommitAttachment txnCommitAttachment) throws StarRocksException { - if (Config.disable_load_job) { - throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented"); - } + // timeout is 0, means no timeout + prepareTransaction(dbId, transactionId, tabletCommitInfos, tabletFailInfos, txnCommitAttachment, 0); + } + public void prepareTransaction( + @NotNull long dbId, long transactionId, @NotNull List tabletCommitInfos, + @NotNull List tabletFailInfos, + @Nullable TxnCommitAttachment attachment, long timeoutMs) throws StarRocksException { + TransactionState transactionState = getTransactionState(dbId, transactionId); + List tableId = transactionState.getTableIdList(); LOG.debug("try to pre commit transaction: {}", transactionId); - DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); - dbTransactionMgr.prepareTransaction(transactionId, tabletCommitInfos, tabletFailInfos, txnCommitAttachment, true); + Locker locker = new Locker(); + if (!locker.tryLockTablesWithIntensiveDbLock(dbId, tableId, LockType.WRITE, timeoutMs, TimeUnit.MILLISECONDS)) { + throw new StarRocksException("get database write lock timeout, database=" + dbId + ", timeout=" + timeoutMs + "ms"); + } + try { + if (Config.disable_load_job) { + throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented"); + } + + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); + dbTransactionMgr.prepareTransaction(transactionId, tabletCommitInfos, tabletFailInfos, attachment, true); + LOG.debug("prepare transaction: {} success", transactionId); + } finally { + locker.unLockTablesWithIntensiveDbLock(dbId, tableId, LockType.WRITE); + } } public void commitPreparedTransaction(long dbId, long transactionId, long timeoutMillis) diff --git a/fe/fe-core/src/test/java/com/starrocks/http/TransactionLoadActionTest.java b/fe/fe-core/src/test/java/com/starrocks/http/TransactionLoadActionTest.java index 6602665e5017d..2e1c4ca5d89fa 100644 --- a/fe/fe-core/src/test/java/com/starrocks/http/TransactionLoadActionTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/http/TransactionLoadActionTest.java @@ -16,7 +16,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; -import com.starrocks.catalog.Database; import com.starrocks.catalog.DiskInfo; import com.starrocks.common.DdlException; import com.starrocks.common.StarRocksException; @@ -658,7 +657,7 @@ public void prepareTransactionForBypassWriteTest() throws Exception { anyLong, anyLong, (List) any, (List) any, - (TxnCommitAttachment) any); + (TxnCommitAttachment) any, anyLong); times = 1; result = new StarRocksException("prepare transaction error"); @@ -692,7 +691,7 @@ public void prepareTransactionForBypassWriteTest() throws Exception { anyLong, anyLong, (List) any, (List) any, - (TxnCommitAttachment) any); + (TxnCommitAttachment) any, anyLong); times = 1; } }; @@ -899,7 +898,7 @@ public void commitTransactionWithoutChannelInfoTest() throws Exception { times = 1; result = newTxnState(txnId, label, LoadJobSourceType.FRONTEND_STREAMING, TransactionStatus.PREPARED); - globalTransactionMgr.commitPreparedTransaction((Database) any, anyLong, anyLong); + globalTransactionMgr.commitPreparedTransaction(anyLong, anyLong, anyLong); times = 1; result = new StarRocksException("commit prepared transaction error"); } @@ -934,7 +933,7 @@ public void commitTransactionWithoutChannelInfoTest() throws Exception { times = 1; result = newTxnState(txnId, label, LoadJobSourceType.FRONTEND_STREAMING, TransactionStatus.PREPARED); - globalTransactionMgr.commitPreparedTransaction((Database) any, anyLong, anyLong); + globalTransactionMgr.commitPreparedTransaction(anyLong, anyLong, anyLong); times = 1; } }; @@ -1049,7 +1048,7 @@ public void commitTransactionForBypassWriteTest() throws Exception { newTxnState(txnId, label, LoadJobSourceType.BYPASS_WRITE, TransactionStatus.PREPARED) ); - globalTransactionMgr.commitPreparedTransaction((Database) any, anyLong, anyLong); + globalTransactionMgr.commitPreparedTransaction(anyLong, anyLong, anyLong); times = 1; result = new StarRocksException("commit prepared transaction error"); } @@ -1079,7 +1078,7 @@ public void commitTransactionForBypassWriteTest() throws Exception { newTxnState(txnId, label, LoadJobSourceType.BYPASS_WRITE, TransactionStatus.PREPARED) ); - globalTransactionMgr.commitPreparedTransaction((Database) any, anyLong, anyLong); + globalTransactionMgr.commitPreparedTransaction(anyLong, anyLong, anyLong); times = 1; } }; @@ -1126,10 +1125,10 @@ public void commitTransactionForBypassWriteWithLifeCycleTest() throws Exception anyLong, anyLong, (List) any, (List) any, - (TxnCommitAttachment) any); + (TxnCommitAttachment) any, anyLong); times = 1; - globalTransactionMgr.commitPreparedTransaction((Database) any, anyLong, anyLong); + globalTransactionMgr.commitPreparedTransaction(anyLong, anyLong, anyLong); times = 1; } };