diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java index 07a77c39ce8a33..4b7afb1f6a855b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java @@ -33,7 +33,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.Command; -import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync; +import org.apache.doris.nereids.trees.plans.commands.NoForward; import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; @@ -58,7 +58,7 @@ /** * insert into values with in txn model. */ -public class BatchInsertIntoTableCommand extends Command implements ForwardWithSync, Explainable { +public class BatchInsertIntoTableCommand extends Command implements NoForward, Explainable { public static final Logger LOG = LogManager.getLogger(BatchInsertIntoTableCommand.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index 35a5c11e90cdee..32121b9833b480 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -24,7 +24,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.UserException; +import org.apache.doris.common.Config; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.nereids.analyzer.UnboundAlias; import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; @@ -55,11 +55,14 @@ import org.apache.doris.proto.InternalService; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.InsertStreamTxnExecutor; +import org.apache.doris.qe.MasterTxnExecutor; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TLoadTxnBeginRequest; +import org.apache.doris.thrift.TLoadTxnBeginResult; import org.apache.doris.thrift.TMergeType; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TTxnParams; @@ -186,15 +189,25 @@ private static void beginBatchInsertTransaction(ConnectContext ctx, txnEntry.setDb(dbObj); String label = txnEntry.getLabel(); try { - long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), - label, new TransactionState.TxnCoordinator( - TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), - sourceType, timeoutSecond); - txnConf.setTxnId(txnId); + long txnId; String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); + if (Config.isCloudMode() || Env.getCurrentEnv().isMaster()) { + txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( + txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), + label, new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + sourceType, timeoutSecond); + } else { + MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(ctx); + TLoadTxnBeginRequest request = new TLoadTxnBeginRequest(); + request.setDb(txnConf.getDb()).setTbl(txnConf.getTbl()).setToken(token) + .setLabel(label).setUser("").setUserIp("").setPasswd(""); + TLoadTxnBeginResult result = masterTxnExecutor.beginTxn(request); + txnId = result.getTxnId(); + } + txnConf.setTxnId(txnId); txnConf.setToken(token); - } catch (UserException e) { + } catch (Exception e) { throw new AnalysisException(e.getMessage(), e); }