From bb28a7eb6492aa1ae1cd0e3c59ea9b2fe153204d Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Fri, 31 Oct 2025 16:25:06 +0800 Subject: [PATCH] [Chore](code-clear)Unify the use of BrokerDesc as the storage property bridge for Load removing redundant property declarations. BulkLoadDesc has no persistence or serialization requirements, so related implementations have been safely removed to simplify the code. --- .../nereids/parser/LogicalPlanBuilder.java | 20 +++------------- .../trees/plans/commands/LoadCommand.java | 23 ++++++++++--------- 2 files changed, 15 insertions(+), 28 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index e7cdd8770df6ef..a01b73ec9afb8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -410,7 +410,6 @@ import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVReplaceInfo; import org.apache.doris.nereids.trees.plans.commands.info.AlterViewInfo; import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc; -import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc; import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo; import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo; @@ -1174,22 +1173,9 @@ public List> visitMultiStatements(MultiState */ @Override public LogicalPlan visitLoad(DorisParser.LoadContext ctx) { - - BulkStorageDesc bulkDesc = null; + BrokerDesc brokerDesc = null; if (ctx.withRemoteStorageSystem() != null) { - Map bulkProperties = - new HashMap<>(visitPropertyItemList(ctx.withRemoteStorageSystem().brokerProperties)); - if (ctx.withRemoteStorageSystem().S3() != null) { - bulkDesc = new BulkStorageDesc("S3", BulkStorageDesc.StorageType.S3, bulkProperties); - } else if (ctx.withRemoteStorageSystem().HDFS() != null) { - bulkDesc = new BulkStorageDesc("HDFS", BulkStorageDesc.StorageType.HDFS, bulkProperties); - } else if (ctx.withRemoteStorageSystem().LOCAL() != null) { - bulkDesc = new BulkStorageDesc("LOCAL_HDFS", BulkStorageDesc.StorageType.LOCAL, bulkProperties); - } else if (ctx.withRemoteStorageSystem().BROKER() != null - && ctx.withRemoteStorageSystem().identifierOrText().getText() != null) { - bulkDesc = new BulkStorageDesc(ctx.withRemoteStorageSystem().identifierOrText().getText(), - bulkProperties); - } + brokerDesc = visitWithRemoteStorageSystem(ctx.withRemoteStorageSystem()); } ImmutableList.Builder dataDescriptions = new ImmutableList.Builder<>(); List labelParts = visitMultipartIdentifier(ctx.lableName); @@ -1270,7 +1256,7 @@ public LogicalPlan visitLoad(DorisParser.LoadContext ctx) { String commentSpec = ctx.commentSpec() == null ? "''" : ctx.commentSpec().STRING_LITERAL().getText(); String comment = LogicalPlanBuilderAssistant.escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1)); - return new LoadCommand(labelName, dataDescriptions.build(), bulkDesc, properties, comment); + return new LoadCommand(labelName, dataDescriptions.build(), brokerDesc, properties, comment); } /* ******************************************************************************************** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java index 4ccb8916cd8dc0..7d592af0632b02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java @@ -17,7 +17,9 @@ package org.apache.doris.nereids.trees.plans.commands; +import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.StmtType; +import org.apache.doris.analysis.StorageBackend; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; @@ -49,7 +51,6 @@ import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc; -import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc; import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy; @@ -89,7 +90,7 @@ public class LoadCommand extends Command implements ForwardWithSync { public static final Logger LOG = LogManager.getLogger(LoadCommand.class); private final String labelName; - private final BulkStorageDesc bulkStorageDesc; + private final BrokerDesc brokerDesc; private final Set sinkTableNames = new HashSet<>(); private final List sourceInfos; private final Map properties; @@ -100,13 +101,13 @@ public class LoadCommand extends Command implements ForwardWithSync { /** * constructor of ExportCommand */ - public LoadCommand(String labelName, List sourceInfos, BulkStorageDesc bulkStorageDesc, + public LoadCommand(String labelName, List sourceInfos, BrokerDesc brokerDesc, Map properties, String comment) { super(PlanType.LOAD_COMMAND); this.labelName = Objects.requireNonNull(labelName.trim(), "labelName should not null"); this.sourceInfos = Objects.requireNonNull(ImmutableList.copyOf(sourceInfos), "sourceInfos should not null"); this.properties = Objects.requireNonNull(ImmutableMap.copyOf(properties), "properties should not null"); - this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, "bulkStorageDesc should not null"); + this.brokerDesc = Objects.requireNonNull(brokerDesc, "brokerDesc should not null"); this.comment = Objects.requireNonNull(comment, "comment should not null"); } @@ -151,7 +152,7 @@ private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc dataD LOG.debug("nereids load stmt before conversion: {}", dataDesc::toSql); } // 1. build source projects plan (select col1,col2... from tvf where prefilter) - Map tvfProperties = getTvfProperties(dataDesc, bulkStorageDesc); + Map tvfProperties = getTvfProperties(dataDesc, brokerDesc); LogicalPlan tvfLogicalPlan = new LogicalCheckPolicy<>(getUnboundTVFRelation(tvfProperties)); tvfLogicalPlan = buildTvfQueryPlan(dataDesc, tvfProperties, tvfLogicalPlan); @@ -431,15 +432,15 @@ private static void checkAndAddSequenceCol(OlapTable olapTable, BulkLoadDataDesc private UnboundTVFRelation getUnboundTVFRelation(Map properties) { UnboundTVFRelation relation; - if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.S3) { + if (brokerDesc.getStorageType() == StorageBackend.StorageType.S3) { relation = new UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(), S3TableValuedFunction.NAME, new Properties(properties)); - } else if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.HDFS) { + } else if (brokerDesc.getStorageType() == StorageBackend.StorageType.HDFS) { relation = new UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(), HdfsTableValuedFunction.NAME, new Properties(properties)); } else { throw new UnsupportedOperationException("Unsupported load storage type: " - + bulkStorageDesc.getStorageType()); + + brokerDesc.getStorageType()); } return relation; } @@ -454,8 +455,8 @@ private static OlapTable getOlapTable(ConnectContext ctx, BulkLoadDataDesc dataD return targetTable; } - private static Map getTvfProperties(BulkLoadDataDesc dataDesc, BulkStorageDesc bulkStorageDesc) { - Map tvfProperties = new HashMap<>(bulkStorageDesc.getProperties()); + private static Map getTvfProperties(BulkLoadDataDesc dataDesc, BrokerDesc brokerDesc) { + Map tvfProperties = new HashMap<>(brokerDesc.getProperties()); String fileFormat = dataDesc.getFormatDesc().getFileFormat().orElse("csv"); if ("csv".equalsIgnoreCase(fileFormat)) { dataDesc.getFormatDesc().getColumnSeparator().ifPresent(sep -> @@ -469,7 +470,7 @@ private static Map getTvfProperties(BulkLoadDataDesc dataDesc, B List filePaths = dataDesc.getFilePaths(); // TODO: support multi location by union String listFilePath = filePaths.get(0); - if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.S3) { + if (brokerDesc.getStorageType() == StorageBackend.StorageType.S3) { // TODO: check file path by s3 fs list status tvfProperties.put("uri", listFilePath); }