Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] should acquire db lock out of createLoadTask to avoid dead lock (backport #55219) #55263

Merged
merged 2 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, long txnId, String label)
planParams.query_options.setLoad_job_type(TLoadJobType.ROUTINE_LOAD);
StreamLoadMgr streamLoadManager = GlobalStateMgr.getCurrentState().getStreamLoadMgr();

StreamLoadTask streamLoadTask = streamLoadManager.createLoadTaskWithoutLock(db, table.getName(), label, "", "",
StreamLoadTask streamLoadTask = streamLoadManager.createLoadTaskWithoutLock(db, table, label, "", "",
taskTimeoutSecond * 1000, true, warehouseId);
streamLoadTask.setTxnId(txnId);
streamLoadTask.setLabel(label);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public void beginLoadTask(String dbName, String tableName, String label, String
} finally {
readUnlock();
}
Table table = checkMeta(db, tableName);

boolean createTask = true;

Expand All @@ -141,7 +142,7 @@ public void beginLoadTask(String dbName, String tableName, String label, String
task.beginTxn(channelId, channelNum, resp);
return;
}
task = createLoadTask(db, tableName, label, user, clientIp, timeoutMillis, channelNum, channelId, warehouseId);
task = createLoadTaskWithoutLock(db, table, label, user, clientIp, timeoutMillis, channelNum, channelId, warehouseId);
LOG.info(new LogBuilder(LogKey.STREAM_LOAD_TASK, task.getId())
.add("msg", "create load task").build());
addLoadTask(task);
Expand All @@ -161,10 +162,12 @@ public void beginLoadTask(String dbName, String tableName, String label, String
StreamLoadTask task = null;
Database db = checkDbName(dbName);
long dbId = db.getId();
Table table = checkMeta(db, tableName);

writeLock();
try {
task = createLoadTask(db, tableName, label, user, clientIp, timeoutMillis, isRoutineLoad, warehouseId);
task = createLoadTaskWithoutLock(db, table, label, user, clientIp, timeoutMillis, isRoutineLoad,
warehouseId);
LOG.info(new LogBuilder(LogKey.STREAM_LOAD_TASK, task.getId())
.add("msg", "create load task").build());

Expand All @@ -176,58 +179,37 @@ public void beginLoadTask(String dbName, String tableName, String label, String
}

// for sync stream load
public StreamLoadTask createLoadTask(Database db, String tableName, String label, String user, String clientIp,
long timeoutMillis, boolean isRoutineLoad, long warehouseId)
throws UserException {
Table table;
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
unprotectedCheckMeta(db, tableName);
table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName);
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
}

public StreamLoadTask createLoadTaskWithoutLock(Database db, Table table, String label, String user, String clientIp,
long timeoutMillis, boolean isRoutineLoad, long warehouseId) {
// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
label, user, clientIp, timeoutMillis, System.currentTimeMillis(), isRoutineLoad, warehouseId);
return streamLoadTask;
}

public StreamLoadTask createLoadTaskWithoutLock(Database db, String tableName, String label, String user, String clientIp,
long timeoutMillis, boolean isRoutineLoad, long warehouseId)
throws UserException {

private StreamLoadTask createLoadTaskWithoutLock(Database db, Table table, String label, String user,
String clientIp, long timeoutMillis, int channelNum,
int channelId, long warehouseId) {
// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db,
(OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName),
label, user, clientIp, timeoutMillis, System.currentTimeMillis(), isRoutineLoad, warehouseId);
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
label, user, clientIp, timeoutMillis, channelNum, channelId, System.currentTimeMillis(), warehouseId);
return streamLoadTask;
}

public StreamLoadTask createLoadTask(Database db, String tableName, String label, String user, String clientIp,
long timeoutMillis, int channelNum,
int channelId, long warehouseId) throws UserException {
Table table;
private Table checkMeta(Database db, String tableName) throws UserException {
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
unprotectedCheckMeta(db, tableName);
table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName);
return unprotectedCheckMeta(db, tableName);
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
}

// init stream load task
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table,
label, user, clientIp, timeoutMillis, channelNum, channelId, System.currentTimeMillis(), warehouseId);
return streamLoadTask;
}

public void unprotectedCheckMeta(Database db, String tblName)
private Table unprotectedCheckMeta(Database db, String tblName)
throws UserException {
if (tblName == null) {
throw new AnalysisException("Table name must be specified when calling /begin/transaction/ first time");
Expand All @@ -248,6 +230,7 @@ public void unprotectedCheckMeta(Database db, String tblName)
if (!table.isOlapOrCloudNativeTable()) {
throw new AnalysisException("Only olap/lake table support stream load");
}
return table;
}

public void replayCreateLoadTask(StreamLoadTask loadJob) {
Expand All @@ -257,7 +240,7 @@ public void replayCreateLoadTask(StreamLoadTask loadJob) {
.build());
}

public Database checkDbName(String dbName) throws UserException {
private Database checkDbName(String dbName) throws UserException {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbName);
if (db == null) {
LOG.warn("Database {} does not exist", dbName);
Expand Down Expand Up @@ -729,4 +712,4 @@ public Map<Long, Long> getRunningTaskCount() {
readUnlock();
}
}
}
}
Loading