Skip to content

Commit

Permalink
[fix](txn insert) Fix txn insert values error when connect to followe…
Browse files Browse the repository at this point in the history
…r fe (apache#34950)
  • Loading branch information
mymeiyi authored and dataroaring committed Jun 21, 2024
1 parent cfd34ce commit f550945
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.commands.NoForward;
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
Expand All @@ -58,7 +58,7 @@
/**
* insert into values with in txn model.
*/
public class BatchInsertIntoTableCommand extends Command implements ForwardWithSync, Explainable {
public class BatchInsertIntoTableCommand extends Command implements NoForward, Explainable {

public static final Logger LOG = LogManager.getLogger(BatchInsertIntoTableCommand.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
Expand Down Expand Up @@ -55,11 +55,14 @@
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.qe.MasterTxnExecutor;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TLoadTxnBeginRequest;
import org.apache.doris.thrift.TLoadTxnBeginResult;
import org.apache.doris.thrift.TMergeType;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TTxnParams;
Expand Down Expand Up @@ -186,15 +189,25 @@ private static void beginBatchInsertTransaction(ConnectContext ctx,
txnEntry.setDb(dbObj);
String label = txnEntry.getLabel();
try {
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label, new TransactionState.TxnCoordinator(
TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
sourceType, timeoutSecond);
txnConf.setTxnId(txnId);
long txnId;
String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
if (Config.isCloudMode() || Env.getCurrentEnv().isMaster()) {
txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label, new TransactionState.TxnCoordinator(
TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
sourceType, timeoutSecond);
} else {
MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(ctx);
TLoadTxnBeginRequest request = new TLoadTxnBeginRequest();
request.setDb(txnConf.getDb()).setTbl(txnConf.getTbl()).setToken(token)
.setLabel(label).setUser("").setUserIp("").setPasswd("");
TLoadTxnBeginResult result = masterTxnExecutor.beginTxn(request);
txnId = result.getTxnId();
}
txnConf.setTxnId(txnId);
txnConf.setToken(token);
} catch (UserException e) {
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}

Expand Down

0 comments on commit f550945

Please sign in to comment.