Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] should acquire db lock out of createLoadTask to avoid dead lock #55219

Merged
merged 3 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, long txnId, String label)
planParams.query_options.setLoad_job_type(TLoadJobType.ROUTINE_LOAD);
StreamLoadMgr streamLoadManager = GlobalStateMgr.getCurrentState().getStreamLoadMgr();

StreamLoadTask streamLoadTask = streamLoadManager.createLoadTaskWithoutLock(db, table.getName(), label, "", "",
StreamLoadTask streamLoadTask = streamLoadManager.createLoadTaskWithoutLock(db, table, label, "", "",
taskTimeoutSecond * 1000, true, warehouseId);
streamLoadTask.setTxnId(txnId);
streamLoadTask.setLabel(label);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public void beginLoadTaskFromFrontend(String dbName, String tableName, String la
} finally {
readUnlock();
}
Table table = checkMeta(db, tableName);

boolean createTask = true;

Expand All @@ -144,7 +145,7 @@ public void beginLoadTaskFromFrontend(String dbName, String tableName, String la
task.beginTxnFromFrontend(channelId, channelNum, resp);
return;
}
task = createLoadTask(db, tableName, label, user, clientIp, timeoutMillis, channelNum, channelId, warehouseId);
task = createLoadTaskWithoutLock(db, table, label, user, clientIp, timeoutMillis, channelNum, channelId, warehouseId);
LOG.info(new LogBuilder(LogKey.STREAM_LOAD_TASK, task.getId())
.add("msg", "create load task").build());
addLoadTask(task);
Expand All @@ -166,10 +167,12 @@ public void beginLoadTaskFromBackend(String dbName, String tableName, String lab
StreamLoadTask task = null;
Database db = checkDbName(dbName);
long dbId = db.getId();
Table table = checkMeta(db, tableName);

writeLock();
try {
task = createLoadTask(db, tableName, label, user, clientIp, timeoutMillis, isRoutineLoad, warehouseId);
task = createLoadTaskWithoutLock(db, table, label, user, clientIp, timeoutMillis, isRoutineLoad,
warehouseId);
LOG.info(new LogBuilder(LogKey.STREAM_LOAD_TASK, task.getId())
.add("msg", "create load task").build());

Expand All @@ -180,59 +183,36 @@ public void beginLoadTaskFromBackend(String dbName, String tableName, String lab
}
}

// for sync stream load
public StreamLoadTask createLoadTask(Database db, String tableName, String label, String user, String clientIp,
long timeoutMillis, boolean isRoutineLoad, long warehouseId)
throws StarRocksException {
Table table;
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
unprotectedCheckMeta(db, tableName);
table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName);
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
}

public StreamLoadTask createLoadTaskWithoutLock(Database db, Table table, String label, String user, String clientIp,
long timeoutMillis, boolean isRoutineLoad, long warehouseId) {
// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
label, user, clientIp, timeoutMillis, System.currentTimeMillis(), isRoutineLoad, warehouseId);
return streamLoadTask;
}

public StreamLoadTask createLoadTaskWithoutLock(Database db, String tableName, String label, String user, String clientIp,
long timeoutMillis, boolean isRoutineLoad, long warehouseId)
throws StarRocksException {
private StreamLoadTask createLoadTaskWithoutLock(Database db, Table table, String label, String user,
String clientIp, long timeoutMillis, int channelNum,
int channelId, long warehouseId) {
// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db,
(OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName),
label, user, clientIp, timeoutMillis, System.currentTimeMillis(), isRoutineLoad, warehouseId);
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
label, user, clientIp, timeoutMillis, channelNum, channelId, System.currentTimeMillis(), warehouseId);
return streamLoadTask;
}

public StreamLoadTask createLoadTask(Database db, String tableName, String label, String user, String clientIp,
long timeoutMillis, int channelNum,
int channelId, long warehouseId) throws StarRocksException {
Table table;
private Table checkMeta(Database db, String tableName) throws StarRocksException {
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
unprotectedCheckMeta(db, tableName);
table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName);
return unprotectedCheckMeta(db, tableName);
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
}

// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
label, user, clientIp, timeoutMillis, channelNum, channelId, System.currentTimeMillis(), warehouseId);
return streamLoadTask;
}

public void unprotectedCheckMeta(Database db, String tblName)
private Table unprotectedCheckMeta(Database db, String tblName)
throws StarRocksException {
if (tblName == null) {
throw new AnalysisException("Table name must be specified when calling /begin/transaction/ first time");
Expand All @@ -253,6 +233,7 @@ public void unprotectedCheckMeta(Database db, String tblName)
if (!table.isOlapOrCloudNativeTable()) {
throw new AnalysisException("Only olap/lake table support stream load");
}
return table;
}

public void replayCreateLoadTask(StreamLoadTask loadJob) {
Expand All @@ -262,7 +243,7 @@ public void replayCreateLoadTask(StreamLoadTask loadJob) {
.build());
}

public Database checkDbName(String dbName) throws StarRocksException {
private Database checkDbName(String dbName) throws StarRocksException {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbName);
if (db == null) {
LOG.warn("Database {} does not exist", dbName);
Expand Down
Loading