Skip to content

Commit

Permalink
[BugFix] should acquire db lock out of createLoadTask to avoid dead l…
Browse files Browse the repository at this point in the history
…ock (backport #55219) (#55263)

Signed-off-by: Kevin Xiaohua Cai <[email protected]>
Co-authored-by: kaijianding <[email protected]>
Co-authored-by: Kevin Xiaohua Cai <[email protected]>
  • Loading branch information
3 people authored Jan 22, 2025
1 parent 9da0802 commit 27a9987
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, long txnId, String label)
planParams.query_options.setLoad_job_type(TLoadJobType.ROUTINE_LOAD);
StreamLoadMgr streamLoadManager = GlobalStateMgr.getCurrentState().getStreamLoadMgr();

StreamLoadTask streamLoadTask = streamLoadManager.createLoadTaskWithoutLock(db, table.getName(), label, "", "",
StreamLoadTask streamLoadTask = streamLoadManager.createLoadTaskWithoutLock(db, table, label, "", "",
taskTimeoutSecond * 1000, true, warehouseId);
streamLoadTask.setTxnId(txnId);
streamLoadTask.setLabel(label);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public void beginLoadTask(String dbName, String tableName, String label, String
} finally {
readUnlock();
}
Table table = checkMeta(db, tableName);

boolean createTask = true;

Expand All @@ -141,7 +142,7 @@ public void beginLoadTask(String dbName, String tableName, String label, String
task.beginTxn(channelId, channelNum, resp);
return;
}
task = createLoadTask(db, tableName, label, user, clientIp, timeoutMillis, channelNum, channelId, warehouseId);
task = createLoadTaskWithoutLock(db, table, label, user, clientIp, timeoutMillis, channelNum, channelId, warehouseId);
LOG.info(new LogBuilder(LogKey.STREAM_LOAD_TASK, task.getId())
.add("msg", "create load task").build());
addLoadTask(task);
Expand All @@ -161,10 +162,12 @@ public void beginLoadTask(String dbName, String tableName, String label, String
StreamLoadTask task = null;
Database db = checkDbName(dbName);
long dbId = db.getId();
Table table = checkMeta(db, tableName);

writeLock();
try {
task = createLoadTask(db, tableName, label, user, clientIp, timeoutMillis, isRoutineLoad, warehouseId);
task = createLoadTaskWithoutLock(db, table, label, user, clientIp, timeoutMillis, isRoutineLoad,
warehouseId);
LOG.info(new LogBuilder(LogKey.STREAM_LOAD_TASK, task.getId())
.add("msg", "create load task").build());

Expand All @@ -176,58 +179,37 @@ public void beginLoadTask(String dbName, String tableName, String label, String
}

// for sync stream load
public StreamLoadTask createLoadTask(Database db, String tableName, String label, String user, String clientIp,
long timeoutMillis, boolean isRoutineLoad, long warehouseId)
throws UserException {
Table table;
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
unprotectedCheckMeta(db, tableName);
table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName);
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
}

public StreamLoadTask createLoadTaskWithoutLock(Database db, Table table, String label, String user, String clientIp,
long timeoutMillis, boolean isRoutineLoad, long warehouseId) {
// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
label, user, clientIp, timeoutMillis, System.currentTimeMillis(), isRoutineLoad, warehouseId);
return streamLoadTask;
}

public StreamLoadTask createLoadTaskWithoutLock(Database db, String tableName, String label, String user, String clientIp,
long timeoutMillis, boolean isRoutineLoad, long warehouseId)
throws UserException {

private StreamLoadTask createLoadTaskWithoutLock(Database db, Table table, String label, String user,
String clientIp, long timeoutMillis, int channelNum,
int channelId, long warehouseId) {
// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db,
(OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName),
label, user, clientIp, timeoutMillis, System.currentTimeMillis(), isRoutineLoad, warehouseId);
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
label, user, clientIp, timeoutMillis, channelNum, channelId, System.currentTimeMillis(), warehouseId);
return streamLoadTask;
}

public StreamLoadTask createLoadTask(Database db, String tableName, String label, String user, String clientIp,
long timeoutMillis, int channelNum,
int channelId, long warehouseId) throws UserException {
Table table;
private Table checkMeta(Database db, String tableName) throws UserException {
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
unprotectedCheckMeta(db, tableName);
table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName);
return unprotectedCheckMeta(db, tableName);
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
}

// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
label, user, clientIp, timeoutMillis, channelNum, channelId, System.currentTimeMillis(), warehouseId);
return streamLoadTask;
}

public void unprotectedCheckMeta(Database db, String tblName)
private Table unprotectedCheckMeta(Database db, String tblName)
throws UserException {
if (tblName == null) {
throw new AnalysisException("Table name must be specified when calling /begin/transaction/ first time");
Expand All @@ -248,6 +230,7 @@ public void unprotectedCheckMeta(Database db, String tblName)
if (!table.isOlapOrCloudNativeTable()) {
throw new AnalysisException("Only olap/lake table support stream load");
}
return table;
}

public void replayCreateLoadTask(StreamLoadTask loadJob) {
Expand All @@ -257,7 +240,7 @@ public void replayCreateLoadTask(StreamLoadTask loadJob) {
.build());
}

public Database checkDbName(String dbName) throws UserException {
private Database checkDbName(String dbName) throws UserException {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbName);
if (db == null) {
LOG.warn("Database {} does not exist", dbName);
Expand Down Expand Up @@ -729,4 +712,4 @@ public Map<Long, Long> getRunningTaskCount() {
readUnlock();
}
}
}
}

0 comments on commit 27a9987

Please sign in to comment.