Skip to content

Commit

Permalink
[BugFix] should acquire db lock out of createLoadTask to avoid dead lock
Browse files Browse the repository at this point in the history
Signed-off-by: kaijian.ding <[email protected]>
  • Loading branch information
kaijianding committed Jan 19, 2025
1 parent cbb7eb1 commit 5894e06
Showing 1 changed file with 19 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public void beginLoadTaskFromFrontend(String dbName, String tableName, String la
} finally {
readUnlock();
}
Table table = checkMeta(db, tableName);

boolean createTask = true;

Expand All @@ -144,7 +145,7 @@ public void beginLoadTaskFromFrontend(String dbName, String tableName, String la
task.beginTxnFromFrontend(channelId, channelNum, resp);
return;
}
task = createLoadTask(db, tableName, label, user, clientIp, timeoutMillis, channelNum, channelId, warehouseId);
task = createLoadTask(db, table, label, user, clientIp, timeoutMillis, channelNum, channelId, warehouseId);
LOG.info(new LogBuilder(LogKey.STREAM_LOAD_TASK, task.getId())
.add("msg", "create load task").build());
addLoadTask(task);
Expand All @@ -166,10 +167,11 @@ public void beginLoadTaskFromBackend(String dbName, String tableName, String lab
StreamLoadTask task = null;
Database db = checkDbName(dbName);
long dbId = db.getId();
Table table = checkMeta(db, tableName);

writeLock();
try {
task = createLoadTask(db, tableName, label, user, clientIp, timeoutMillis, isRoutineLoad, warehouseId);
task = createLoadTask(db, table, label, user, clientIp, timeoutMillis, isRoutineLoad, warehouseId);
LOG.info(new LogBuilder(LogKey.STREAM_LOAD_TASK, task.getId())
.add("msg", "create load task").build());

Expand All @@ -181,19 +183,8 @@ public void beginLoadTaskFromBackend(String dbName, String tableName, String lab
}

// for sync stream load
public StreamLoadTask createLoadTask(Database db, String tableName, String label, String user, String clientIp,
long timeoutMillis, boolean isRoutineLoad, long warehouseId)
throws StarRocksException {
Table table;
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
unprotectedCheckMeta(db, tableName);
table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName);
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
}

public StreamLoadTask createLoadTask(Database db, Table table, String label, String user, String clientIp,
long timeoutMillis, boolean isRoutineLoad, long warehouseId) {
// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
Expand All @@ -212,27 +203,27 @@ public StreamLoadTask createLoadTaskWithoutLock(Database db, String tableName, S
return streamLoadTask;
}

public StreamLoadTask createLoadTask(Database db, String tableName, String label, String user, String clientIp,
public StreamLoadTask createLoadTask(Database db, Table table, String label, String user, String clientIp,
long timeoutMillis, int channelNum,
int channelId, long warehouseId) throws StarRocksException {
Table table;
int channelId, long warehouseId) {
// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
label, user, clientIp, timeoutMillis, channelNum, channelId, System.currentTimeMillis(), warehouseId);
return streamLoadTask;
}

private Table checkMeta(Database db, String tableName) throws StarRocksException {
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
unprotectedCheckMeta(db, tableName);
table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName);
return unprotectedCheckMeta(db, tableName);
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
}

// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
label, user, clientIp, timeoutMillis, channelNum, channelId, System.currentTimeMillis(), warehouseId);
return streamLoadTask;
}

public void unprotectedCheckMeta(Database db, String tblName)
public Table unprotectedCheckMeta(Database db, String tblName)
throws StarRocksException {
if (tblName == null) {
throw new AnalysisException("Table name must be specified when calling /begin/transaction/ first time");
Expand All @@ -253,6 +244,7 @@ public void unprotectedCheckMeta(Database db, String tblName)
if (!table.isOlapOrCloudNativeTable()) {
throw new AnalysisException("Only olap/lake table support stream load");
}
return table;
}

public void replayCreateLoadTask(StreamLoadTask loadJob) {
Expand Down

0 comments on commit 5894e06

Please sign in to comment.