Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] should acquire db lock out of createLoadTask to avoid dead lock #55219

Merged
merged 3 commits into from
Jan 20, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public void beginLoadTaskFromFrontend(String dbName, String tableName, String la
} finally {
readUnlock();
}
Table table = checkMeta(db, tableName);

boolean createTask = true;

Expand All @@ -144,7 +145,7 @@ public void beginLoadTaskFromFrontend(String dbName, String tableName, String la
task.beginTxnFromFrontend(channelId, channelNum, resp);
return;
}
task = createLoadTask(db, tableName, label, user, clientIp, timeoutMillis, channelNum, channelId, warehouseId);
task = createLoadTask(db, table, label, user, clientIp, timeoutMillis, channelNum, channelId, warehouseId);
LOG.info(new LogBuilder(LogKey.STREAM_LOAD_TASK, task.getId())
.add("msg", "create load task").build());
addLoadTask(task);
Expand All @@ -166,10 +167,11 @@ public void beginLoadTaskFromBackend(String dbName, String tableName, String lab
StreamLoadTask task = null;
Database db = checkDbName(dbName);
long dbId = db.getId();
Table table = checkMeta(db, tableName);

writeLock();
try {
task = createLoadTask(db, tableName, label, user, clientIp, timeoutMillis, isRoutineLoad, warehouseId);
task = createLoadTask(db, table, label, user, clientIp, timeoutMillis, isRoutineLoad, warehouseId);
kevincai marked this conversation as resolved.
Show resolved Hide resolved
LOG.info(new LogBuilder(LogKey.STREAM_LOAD_TASK, task.getId())
.add("msg", "create load task").build());

Expand All @@ -181,19 +183,8 @@ public void beginLoadTaskFromBackend(String dbName, String tableName, String lab
}

// for sync stream load
public StreamLoadTask createLoadTask(Database db, String tableName, String label, String user, String clientIp,
long timeoutMillis, boolean isRoutineLoad, long warehouseId)
throws StarRocksException {
Table table;
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
unprotectedCheckMeta(db, tableName);
table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName);
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
}

public StreamLoadTask createLoadTask(Database db, Table table, String label, String user, String clientIp,
kevincai marked this conversation as resolved.
Show resolved Hide resolved
long timeoutMillis, boolean isRoutineLoad, long warehouseId) {
// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
Expand All @@ -212,27 +203,27 @@ public StreamLoadTask createLoadTaskWithoutLock(Database db, String tableName, S
return streamLoadTask;
}

public StreamLoadTask createLoadTask(Database db, String tableName, String label, String user, String clientIp,
public StreamLoadTask createLoadTask(Database db, Table table, String label, String user, String clientIp,
kevincai marked this conversation as resolved.
Show resolved Hide resolved
long timeoutMillis, int channelNum,
int channelId, long warehouseId) throws StarRocksException {
Table table;
int channelId, long warehouseId) {
// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
label, user, clientIp, timeoutMillis, channelNum, channelId, System.currentTimeMillis(), warehouseId);
return streamLoadTask;
}

private Table checkMeta(Database db, String tableName) throws StarRocksException {
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
unprotectedCheckMeta(db, tableName);
table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName);
return unprotectedCheckMeta(db, tableName);
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
}

// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
label, user, clientIp, timeoutMillis, channelNum, channelId, System.currentTimeMillis(), warehouseId);
return streamLoadTask;
}

public void unprotectedCheckMeta(Database db, String tblName)
public Table unprotectedCheckMeta(Database db, String tblName)
kevincai marked this conversation as resolved.
Show resolved Hide resolved
throws StarRocksException {
if (tblName == null) {
throw new AnalysisException("Table name must be specified when calling /begin/transaction/ first time");
Expand All @@ -253,6 +244,7 @@ public void unprotectedCheckMeta(Database db, String tblName)
if (!table.isOlapOrCloudNativeTable()) {
throw new AnalysisException("Only olap/lake table support stream load");
}
return table;
}

public void replayCreateLoadTask(StreamLoadTask loadJob) {
Expand Down
Loading