diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index e7cdd8770df6ef..a01b73ec9afb8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -410,7 +410,6 @@ import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVReplaceInfo; import org.apache.doris.nereids.trees.plans.commands.info.AlterViewInfo; import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc; -import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc; import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo; import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo; @@ -1174,22 +1173,9 @@ public List> visitMultiStatements(MultiState */ @Override public LogicalPlan visitLoad(DorisParser.LoadContext ctx) { - - BulkStorageDesc bulkDesc = null; + BrokerDesc brokerDesc = null; if (ctx.withRemoteStorageSystem() != null) { - Map bulkProperties = - new HashMap<>(visitPropertyItemList(ctx.withRemoteStorageSystem().brokerProperties)); - if (ctx.withRemoteStorageSystem().S3() != null) { - bulkDesc = new BulkStorageDesc("S3", BulkStorageDesc.StorageType.S3, bulkProperties); - } else if (ctx.withRemoteStorageSystem().HDFS() != null) { - bulkDesc = new BulkStorageDesc("HDFS", BulkStorageDesc.StorageType.HDFS, bulkProperties); - } else if (ctx.withRemoteStorageSystem().LOCAL() != null) { - bulkDesc = new BulkStorageDesc("LOCAL_HDFS", BulkStorageDesc.StorageType.LOCAL, bulkProperties); - } else if (ctx.withRemoteStorageSystem().BROKER() != null - && ctx.withRemoteStorageSystem().identifierOrText().getText() != null) { - bulkDesc = new BulkStorageDesc(ctx.withRemoteStorageSystem().identifierOrText().getText(), - bulkProperties); - } + brokerDesc = visitWithRemoteStorageSystem(ctx.withRemoteStorageSystem()); } ImmutableList.Builder dataDescriptions = new ImmutableList.Builder<>(); List labelParts = visitMultipartIdentifier(ctx.lableName); @@ -1270,7 +1256,7 @@ public LogicalPlan visitLoad(DorisParser.LoadContext ctx) { String commentSpec = ctx.commentSpec() == null ? "''" : ctx.commentSpec().STRING_LITERAL().getText(); String comment = LogicalPlanBuilderAssistant.escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1)); - return new LoadCommand(labelName, dataDescriptions.build(), bulkDesc, properties, comment); + return new LoadCommand(labelName, dataDescriptions.build(), brokerDesc, properties, comment); } /* ******************************************************************************************** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java index 4ccb8916cd8dc0..7d592af0632b02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java @@ -17,7 +17,9 @@ package org.apache.doris.nereids.trees.plans.commands; +import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.StmtType; +import org.apache.doris.analysis.StorageBackend; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; @@ -49,7 +51,6 @@ import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc; -import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc; import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy; @@ -89,7 +90,7 @@ public class LoadCommand extends Command implements ForwardWithSync { public static final Logger LOG = LogManager.getLogger(LoadCommand.class); private final String labelName; - private final BulkStorageDesc bulkStorageDesc; + private final BrokerDesc brokerDesc; private final Set sinkTableNames = new HashSet<>(); private final List sourceInfos; private final Map properties; @@ -100,13 +101,13 @@ public class LoadCommand extends Command implements ForwardWithSync { /** * constructor of ExportCommand */ - public LoadCommand(String labelName, List sourceInfos, BulkStorageDesc bulkStorageDesc, + public LoadCommand(String labelName, List sourceInfos, BrokerDesc brokerDesc, Map properties, String comment) { super(PlanType.LOAD_COMMAND); this.labelName = Objects.requireNonNull(labelName.trim(), "labelName should not null"); this.sourceInfos = Objects.requireNonNull(ImmutableList.copyOf(sourceInfos), "sourceInfos should not null"); this.properties = Objects.requireNonNull(ImmutableMap.copyOf(properties), "properties should not null"); - this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, "bulkStorageDesc should not null"); + this.brokerDesc = Objects.requireNonNull(brokerDesc, "brokerDesc should not null"); this.comment = Objects.requireNonNull(comment, "comment should not null"); } @@ -151,7 +152,7 @@ private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc dataD LOG.debug("nereids load stmt before conversion: {}", dataDesc::toSql); } // 1. build source projects plan (select col1,col2... from tvf where prefilter) - Map tvfProperties = getTvfProperties(dataDesc, bulkStorageDesc); + Map tvfProperties = getTvfProperties(dataDesc, brokerDesc); LogicalPlan tvfLogicalPlan = new LogicalCheckPolicy<>(getUnboundTVFRelation(tvfProperties)); tvfLogicalPlan = buildTvfQueryPlan(dataDesc, tvfProperties, tvfLogicalPlan); @@ -431,15 +432,15 @@ private static void checkAndAddSequenceCol(OlapTable olapTable, BulkLoadDataDesc private UnboundTVFRelation getUnboundTVFRelation(Map properties) { UnboundTVFRelation relation; - if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.S3) { + if (brokerDesc.getStorageType() == StorageBackend.StorageType.S3) { relation = new UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(), S3TableValuedFunction.NAME, new Properties(properties)); - } else if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.HDFS) { + } else if (brokerDesc.getStorageType() == StorageBackend.StorageType.HDFS) { relation = new UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(), HdfsTableValuedFunction.NAME, new Properties(properties)); } else { throw new UnsupportedOperationException("Unsupported load storage type: " - + bulkStorageDesc.getStorageType()); + + brokerDesc.getStorageType()); } return relation; } @@ -454,8 +455,8 @@ private static OlapTable getOlapTable(ConnectContext ctx, BulkLoadDataDesc dataD return targetTable; } - private static Map getTvfProperties(BulkLoadDataDesc dataDesc, BulkStorageDesc bulkStorageDesc) { - Map tvfProperties = new HashMap<>(bulkStorageDesc.getProperties()); + private static Map getTvfProperties(BulkLoadDataDesc dataDesc, BrokerDesc brokerDesc) { + Map tvfProperties = new HashMap<>(brokerDesc.getProperties()); String fileFormat = dataDesc.getFormatDesc().getFileFormat().orElse("csv"); if ("csv".equalsIgnoreCase(fileFormat)) { dataDesc.getFormatDesc().getColumnSeparator().ifPresent(sep -> @@ -469,7 +470,7 @@ private static Map getTvfProperties(BulkLoadDataDesc dataDesc, B List filePaths = dataDesc.getFilePaths(); // TODO: support multi location by union String listFilePath = filePaths.get(0); - if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.S3) { + if (brokerDesc.getStorageType() == StorageBackend.StorageType.S3) { // TODO: check file path by s3 fs list status tvfProperties.put("uri", listFilePath); }