Skip to content

Commit

Permalink
[BugFix] Fix concurrent issue in olap table listener (StarRocks#54051)
Browse files Browse the repository at this point in the history
Signed-off-by: meegoo <[email protected]>
  • Loading branch information
meegoo authored Jan 24, 2025
1 parent da819bf commit 7c98728
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public ResultWrapper handle(BaseRequest request, BaseResponse response) throws S
result = handlePrepareTransaction(
db, label,
Optional.ofNullable(requestBody.getCommittedTablets()).orElse(new ArrayList<>(0)),
Optional.ofNullable(requestBody.getFailedTablets()).orElse(new ArrayList<>(0))
Optional.ofNullable(requestBody.getFailedTablets()).orElse(new ArrayList<>(0)),
timeoutMillis
);
break;
case TXN_COMMIT:
Expand Down Expand Up @@ -124,7 +125,8 @@ private TransactionResult handleBeginTransaction(Database db,
private TransactionResult handlePrepareTransaction(Database db,
String label,
List<TabletCommitInfo> committedTablets,
List<TabletFailInfo> failedTablets) throws StarRocksException {
List<TabletFailInfo> failedTablets,
long timeoutMillis) throws StarRocksException {
long dbId = db.getId();
TransactionState txnState = getTxnState(dbId, label);
long txnId = txnState.getTransactionId();
Expand All @@ -133,7 +135,7 @@ private TransactionResult handlePrepareTransaction(Database db,
switch (txnStatus) {
case PREPARE:
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().prepareTransaction(
dbId, txnId, committedTablets, failedTablets, new MiniLoadTxnCommitAttachment());
dbId, txnId, committedTablets, failedTablets, new MiniLoadTxnCommitAttachment(), timeoutMillis);
result.addResultEntry(TransactionResult.TXN_ID_KEY, txnId);
result.addResultEntry(TransactionResult.LABEL_KEY, label);
break;
Expand Down Expand Up @@ -162,7 +164,7 @@ private TransactionResult handleCommitTransaction(Database db,
switch (txnStatus) {
case PREPARED:
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr()
.commitPreparedTransaction(db, txnId, timeoutMillis);
.commitPreparedTransaction(db.getId(), txnId, timeoutMillis);
result.addResultEntry(TransactionResult.TXN_ID_KEY, txnId);
result.addResultEntry(TransactionResult.LABEL_KEY, label);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private TransactionResult handleCommitTransaction(Database db,
switch (txnStatus) {
case PREPARED:
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr()
.commitPreparedTransaction(db, txnId, timeoutMillis);
.commitPreparedTransaction(db.getId(), txnId, timeoutMillis);
result.addResultEntry(TransactionResult.TXN_ID_KEY, txnId);
result.addResultEntry(TransactionResult.LABEL_KEY, label);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.starrocks.common.util.ProfileManager;
import com.starrocks.common.util.RuntimeProfile;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.common.util.concurrent.lock.LockTimeoutException;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.http.rest.TransactionResult;
Expand Down Expand Up @@ -940,16 +941,16 @@ public void unprotectedBeginTxn(boolean replay, boolean isBackendTxn, TUniqueId
timeoutMs / 1000, warehouseId);
}

public void unprotectedPrepareTxn() throws StarRocksException {
public void unprotectedPrepareTxn() throws StarRocksException, LockTimeoutException {
List<TabletCommitInfo> commitInfos = TabletCommitInfo.fromThrift(coord.getCommitInfos());
List<TabletFailInfo> failInfos = TabletFailInfo.fromThrift(coord.getFailInfos());
finishPreparingTimeMs = System.currentTimeMillis();
StreamLoadTxnCommitAttachment txnCommitAttachment = new StreamLoadTxnCommitAttachment(
beforeLoadTimeMs, startLoadingTimeMs, startPreparingTimeMs, finishPreparingTimeMs,
endTimeMs, numRowsNormal, numRowsAbnormal, numRowsUnselected, numLoadBytesTotal,
trackingUrl);
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().prepareTransaction(dbId,
txnId, commitInfos, failInfos, txnCommitAttachment);
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().prepareTransaction(
dbId, txnId, commitInfos, failInfos, txnCommitAttachment, timeoutMs);
}

public boolean checkNeedRemove(long currentMs, boolean isForce) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,32 @@ public void prepareTransaction(long dbId, long transactionId, List<TabletCommitI
List<TabletFailInfo> tabletFailInfos,
TxnCommitAttachment txnCommitAttachment)
throws StarRocksException {
if (Config.disable_load_job) {
throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented");
}
// timeout is 0, means no timeout
prepareTransaction(dbId, transactionId, tabletCommitInfos, tabletFailInfos, txnCommitAttachment, 0);
}

public void prepareTransaction(
@NotNull long dbId, long transactionId, @NotNull List<TabletCommitInfo> tabletCommitInfos,
@NotNull List<TabletFailInfo> tabletFailInfos,
@Nullable TxnCommitAttachment attachment, long timeoutMs) throws StarRocksException {
TransactionState transactionState = getTransactionState(dbId, transactionId);
List<Long> tableId = transactionState.getTableIdList();
LOG.debug("try to pre commit transaction: {}", transactionId);
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
dbTransactionMgr.prepareTransaction(transactionId, tabletCommitInfos, tabletFailInfos, txnCommitAttachment, true);
Locker locker = new Locker();
if (!locker.tryLockTablesWithIntensiveDbLock(dbId, tableId, LockType.WRITE, timeoutMs, TimeUnit.MILLISECONDS)) {
throw new StarRocksException("get database write lock timeout, database=" + dbId + ", timeout=" + timeoutMs + "ms");
}
try {
if (Config.disable_load_job) {
throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented");
}

DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId);
dbTransactionMgr.prepareTransaction(transactionId, tabletCommitInfos, tabletFailInfos, attachment, true);
LOG.debug("prepare transaction: {} success", transactionId);
} finally {
locker.unLockTablesWithIntensiveDbLock(dbId, tableId, LockType.WRITE);
}
}

public void commitPreparedTransaction(long dbId, long transactionId, long timeoutMillis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.DiskInfo;
import com.starrocks.common.DdlException;
import com.starrocks.common.StarRocksException;
Expand Down Expand Up @@ -658,7 +657,7 @@ public void prepareTransactionForBypassWriteTest() throws Exception {
anyLong, anyLong,
(List<TabletCommitInfo>) any,
(List<TabletFailInfo>) any,
(TxnCommitAttachment) any);
(TxnCommitAttachment) any, anyLong);
times = 1;
result = new StarRocksException("prepare transaction error");

Expand Down Expand Up @@ -692,7 +691,7 @@ public void prepareTransactionForBypassWriteTest() throws Exception {
anyLong, anyLong,
(List<TabletCommitInfo>) any,
(List<TabletFailInfo>) any,
(TxnCommitAttachment) any);
(TxnCommitAttachment) any, anyLong);
times = 1;
}
};
Expand Down Expand Up @@ -899,7 +898,7 @@ public void commitTransactionWithoutChannelInfoTest() throws Exception {
times = 1;
result = newTxnState(txnId, label, LoadJobSourceType.FRONTEND_STREAMING, TransactionStatus.PREPARED);

globalTransactionMgr.commitPreparedTransaction((Database) any, anyLong, anyLong);
globalTransactionMgr.commitPreparedTransaction(anyLong, anyLong, anyLong);
times = 1;
result = new StarRocksException("commit prepared transaction error");
}
Expand Down Expand Up @@ -934,7 +933,7 @@ public void commitTransactionWithoutChannelInfoTest() throws Exception {
times = 1;
result = newTxnState(txnId, label, LoadJobSourceType.FRONTEND_STREAMING, TransactionStatus.PREPARED);

globalTransactionMgr.commitPreparedTransaction((Database) any, anyLong, anyLong);
globalTransactionMgr.commitPreparedTransaction(anyLong, anyLong, anyLong);
times = 1;
}
};
Expand Down Expand Up @@ -1049,7 +1048,7 @@ public void commitTransactionForBypassWriteTest() throws Exception {
newTxnState(txnId, label, LoadJobSourceType.BYPASS_WRITE, TransactionStatus.PREPARED)
);

globalTransactionMgr.commitPreparedTransaction((Database) any, anyLong, anyLong);
globalTransactionMgr.commitPreparedTransaction(anyLong, anyLong, anyLong);
times = 1;
result = new StarRocksException("commit prepared transaction error");
}
Expand Down Expand Up @@ -1079,7 +1078,7 @@ public void commitTransactionForBypassWriteTest() throws Exception {
newTxnState(txnId, label, LoadJobSourceType.BYPASS_WRITE, TransactionStatus.PREPARED)
);

globalTransactionMgr.commitPreparedTransaction((Database) any, anyLong, anyLong);
globalTransactionMgr.commitPreparedTransaction(anyLong, anyLong, anyLong);
times = 1;
}
};
Expand Down Expand Up @@ -1126,10 +1125,10 @@ public void commitTransactionForBypassWriteWithLifeCycleTest() throws Exception
anyLong, anyLong,
(List<TabletCommitInfo>) any,
(List<TabletFailInfo>) any,
(TxnCommitAttachment) any);
(TxnCommitAttachment) any, anyLong);
times = 1;

globalTransactionMgr.commitPreparedTransaction((Database) any, anyLong, anyLong);
globalTransactionMgr.commitPreparedTransaction(anyLong, anyLong, anyLong);
times = 1;
}
};
Expand Down

0 comments on commit 7c98728

Please sign in to comment.