diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java index f4169b61c5e02..663ae209ab026 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java @@ -894,7 +894,7 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, long txnId, String label) planParams.query_options.setLoad_job_type(TLoadJobType.ROUTINE_LOAD); StreamLoadMgr streamLoadManager = GlobalStateMgr.getCurrentState().getStreamLoadMgr(); - StreamLoadTask streamLoadTask = streamLoadManager.createLoadTaskWithoutLock(db, table.getName(), label, "", "", + StreamLoadTask streamLoadTask = streamLoadManager.createLoadTaskWithoutLock(db, table, label, "", "", taskTimeoutSecond * 1000, true, warehouseId); streamLoadTask.setTxnId(txnId); streamLoadTask.setLabel(label); diff --git a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadMgr.java b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadMgr.java index 7a881a9cb547c..af552ee94c001 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadMgr.java @@ -130,6 +130,7 @@ public void beginLoadTask(String dbName, String tableName, String label, String } finally { readUnlock(); } + Table table = checkMeta(db, tableName); boolean createTask = true; @@ -141,7 +142,7 @@ public void beginLoadTask(String dbName, String tableName, String label, String task.beginTxn(channelId, channelNum, resp); return; } - task = createLoadTask(db, tableName, label, user, clientIp, timeoutMillis, channelNum, channelId, warehouseId); + task = createLoadTaskWithoutLock(db, table, label, user, clientIp, timeoutMillis, channelNum, channelId, warehouseId); LOG.info(new LogBuilder(LogKey.STREAM_LOAD_TASK, task.getId()) .add("msg", "create load task").build()); addLoadTask(task); @@ -161,10 +162,12 @@ public void beginLoadTask(String dbName, String tableName, String label, String StreamLoadTask task = null; Database db = checkDbName(dbName); long dbId = db.getId(); + Table table = checkMeta(db, tableName); writeLock(); try { - task = createLoadTask(db, tableName, label, user, clientIp, timeoutMillis, isRoutineLoad, warehouseId); + task = createLoadTaskWithoutLock(db, table, label, user, clientIp, timeoutMillis, isRoutineLoad, + warehouseId); LOG.info(new LogBuilder(LogKey.STREAM_LOAD_TASK, task.getId()) .add("msg", "create load task").build()); @@ -176,19 +179,8 @@ public void beginLoadTask(String dbName, String tableName, String label, String } // for sync stream load - public StreamLoadTask createLoadTask(Database db, String tableName, String label, String user, String clientIp, - long timeoutMillis, boolean isRoutineLoad, long warehouseId) - throws UserException { - Table table; - Locker locker = new Locker(); - locker.lockDatabase(db.getId(), LockType.READ); - try { - unprotectedCheckMeta(db, tableName); - table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName); - } finally { - locker.unLockDatabase(db.getId(), LockType.READ); - } - + public StreamLoadTask createLoadTaskWithoutLock(Database db, Table table, String label, String user, String clientIp, + long timeoutMillis, boolean isRoutineLoad, long warehouseId) { // init stream load task long id = GlobalStateMgr.getCurrentState().getNextId(); StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table, @@ -196,38 +188,28 @@ public StreamLoadTask createLoadTask(Database db, String tableName, String label return streamLoadTask; } - public StreamLoadTask createLoadTaskWithoutLock(Database db, String tableName, String label, String user, String clientIp, - long timeoutMillis, boolean isRoutineLoad, long warehouseId) - throws UserException { + + private StreamLoadTask createLoadTaskWithoutLock(Database db, Table table, String label, String user, + String clientIp, long timeoutMillis, int channelNum, + int channelId, long warehouseId) { // init stream load task long id = GlobalStateMgr.getCurrentState().getNextId(); - StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, - (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName), - label, user, clientIp, timeoutMillis, System.currentTimeMillis(), isRoutineLoad, warehouseId); + StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table, + label, user, clientIp, timeoutMillis, channelNum, channelId, System.currentTimeMillis(), warehouseId); return streamLoadTask; } - public StreamLoadTask createLoadTask(Database db, String tableName, String label, String user, String clientIp, - long timeoutMillis, int channelNum, - int channelId, long warehouseId) throws UserException { - Table table; + private Table checkMeta(Database db, String tableName) throws UserException { Locker locker = new Locker(); locker.lockDatabase(db.getId(), LockType.READ); try { - unprotectedCheckMeta(db, tableName); - table = GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), tableName); + return unprotectedCheckMeta(db, tableName); } finally { locker.unLockDatabase(db.getId(), LockType.READ); } - - // init stream load task - long id = GlobalStateMgr.getCurrentState().getNextId(); - StreamLoadTask streamLoadTask = new StreamLoadTask(id, db, (OlapTable) table, - label, user, clientIp, timeoutMillis, channelNum, channelId, System.currentTimeMillis(), warehouseId); - return streamLoadTask; } - public void unprotectedCheckMeta(Database db, String tblName) + private Table unprotectedCheckMeta(Database db, String tblName) throws UserException { if (tblName == null) { throw new AnalysisException("Table name must be specified when calling /begin/transaction/ first time"); @@ -248,6 +230,7 @@ public void unprotectedCheckMeta(Database db, String tblName) if (!table.isOlapOrCloudNativeTable()) { throw new AnalysisException("Only olap/lake table support stream load"); } + return table; } public void replayCreateLoadTask(StreamLoadTask loadJob) { @@ -257,7 +240,7 @@ public void replayCreateLoadTask(StreamLoadTask loadJob) { .build()); } - public Database checkDbName(String dbName) throws UserException { + private Database checkDbName(String dbName) throws UserException { Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbName); if (db == null) { LOG.warn("Database {} does not exist", dbName); @@ -729,4 +712,4 @@ public Map getRunningTaskCount() { readUnlock(); } } -} \ No newline at end of file +}