Merged. Changes from all commits:
LogicalPlanBuilder.java

@@ -410,7 +410,6 @@
 import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVReplaceInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.AlterViewInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
-import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
 import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
 import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo;
@@ -1174,22 +1173,9 @@ public List<Pair<LogicalPlan, StatementContext>> visitMultiStatements(MultiState
      */
     @Override
     public LogicalPlan visitLoad(DorisParser.LoadContext ctx) {
-
-        BulkStorageDesc bulkDesc = null;
+        BrokerDesc brokerDesc = null;
         if (ctx.withRemoteStorageSystem() != null) {
-            Map<String, String> bulkProperties =
-                    new HashMap<>(visitPropertyItemList(ctx.withRemoteStorageSystem().brokerProperties));
-            if (ctx.withRemoteStorageSystem().S3() != null) {
-                bulkDesc = new BulkStorageDesc("S3", BulkStorageDesc.StorageType.S3, bulkProperties);
-            } else if (ctx.withRemoteStorageSystem().HDFS() != null) {
-                bulkDesc = new BulkStorageDesc("HDFS", BulkStorageDesc.StorageType.HDFS, bulkProperties);
-            } else if (ctx.withRemoteStorageSystem().LOCAL() != null) {
-                bulkDesc = new BulkStorageDesc("LOCAL_HDFS", BulkStorageDesc.StorageType.LOCAL, bulkProperties);
-            } else if (ctx.withRemoteStorageSystem().BROKER() != null
-                    && ctx.withRemoteStorageSystem().identifierOrText().getText() != null) {
-                bulkDesc = new BulkStorageDesc(ctx.withRemoteStorageSystem().identifierOrText().getText(),
-                        bulkProperties);
-            }
+            brokerDesc = visitWithRemoteStorageSystem(ctx.withRemoteStorageSystem());
         }
         ImmutableList.Builder<BulkLoadDataDesc> dataDescriptions = new ImmutableList.Builder<>();
         List<String> labelParts = visitMultipartIdentifier(ctx.lableName);
@@ -1270,7 +1256,7 @@ public LogicalPlan visitLoad(DorisParser.LoadContext ctx) {
         String commentSpec = ctx.commentSpec() == null ? "''" : ctx.commentSpec().STRING_LITERAL().getText();
         String comment =
                 LogicalPlanBuilderAssistant.escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1));
-        return new LoadCommand(labelName, dataDescriptions.build(), bulkDesc, properties, comment);
+        return new LoadCommand(labelName, dataDescriptions.build(), brokerDesc, properties, comment);
     }

     /* ********************************************************************************************
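The parser-side simplification above is the heart of the change: the inline BulkStorageDesc branching collapses into one call to visitWithRemoteStorageSystem, which handles the S3 / HDFS / LOCAL / BROKER variants of the WITH clause and returns the analysis-layer BrokerDesc directly. That visitor's body is outside this diff; below is a minimal sketch of the logic it would carry, assuming BrokerDesc exposes a (name, StorageBackend.StorageType, properties) constructor and a named-broker (name, properties) constructor, mirroring the deleted chain one-to-one:

    // Sketch only; the real visitWithRemoteStorageSystem lives outside this diff.
    // It performs the branching that visitLoad used to do inline, but builds the
    // shared analysis-layer BrokerDesc instead of the Nereids-only BulkStorageDesc.
    public BrokerDesc visitWithRemoteStorageSystem(DorisParser.WithRemoteStorageSystemContext ctx) {
        Map<String, String> brokerProperties =
                new HashMap<>(visitPropertyItemList(ctx.brokerProperties));
        if (ctx.S3() != null) {
            return new BrokerDesc("S3", StorageBackend.StorageType.S3, brokerProperties);
        } else if (ctx.HDFS() != null) {
            return new BrokerDesc("HDFS", StorageBackend.StorageType.HDFS, brokerProperties);
        } else if (ctx.LOCAL() != null) {
            return new BrokerDesc("LOCAL_HDFS", StorageBackend.StorageType.LOCAL, brokerProperties);
        } else if (ctx.BROKER() != null) {
            // A named broker: the storage type is derived from the broker name.
            return new BrokerDesc(ctx.identifierOrText().getText(), brokerProperties);
        }
        return null;
    }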
LoadCommand.java

@@ -17,7 +17,9 @@

 package org.apache.doris.nereids.trees.plans.commands;

+import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.StmtType;
+import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
@@ -49,7 +51,6 @@
 import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
-import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
 import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
@@ -89,7 +90,7 @@ public class LoadCommand extends Command implements ForwardWithSync {
     public static final Logger LOG = LogManager.getLogger(LoadCommand.class);

     private final String labelName;
-    private final BulkStorageDesc bulkStorageDesc;
+    private final BrokerDesc brokerDesc;
     private final Set<String> sinkTableNames = new HashSet<>();
     private final List<BulkLoadDataDesc> sourceInfos;
     private final Map<String, String> properties;
@@ -100,13 +101,13 @@ public class LoadCommand extends Command implements ForwardWithSync {
     /**
      * constructor of ExportCommand
      */
-    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, BulkStorageDesc bulkStorageDesc,
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, BrokerDesc brokerDesc,
             Map<String, String> properties, String comment) {
         super(PlanType.LOAD_COMMAND);
         this.labelName = Objects.requireNonNull(labelName.trim(), "labelName should not null");
         this.sourceInfos = Objects.requireNonNull(ImmutableList.copyOf(sourceInfos), "sourceInfos should not null");
         this.properties = Objects.requireNonNull(ImmutableMap.copyOf(properties), "properties should not null");
-        this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, "bulkStorageDesc should not null");
+        this.brokerDesc = Objects.requireNonNull(brokerDesc, "brokerDesc should not null");
         this.comment = Objects.requireNonNull(comment, "comment should not null");
     }

@@ -151,7 +152,7 @@ private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc dataD
             LOG.debug("nereids load stmt before conversion: {}", dataDesc::toSql);
         }
         // 1. build source projects plan (select col1,col2... from tvf where prefilter)
-        Map<String, String> tvfProperties = getTvfProperties(dataDesc, bulkStorageDesc);
+        Map<String, String> tvfProperties = getTvfProperties(dataDesc, brokerDesc);
         LogicalPlan tvfLogicalPlan = new LogicalCheckPolicy<>(getUnboundTVFRelation(tvfProperties));
         tvfLogicalPlan = buildTvfQueryPlan(dataDesc, tvfProperties, tvfLogicalPlan);

@@ -431,15 +432,15 @@ private static void checkAndAddSequenceCol(OlapTable olapTable, BulkLoadDataDesc

     private UnboundTVFRelation getUnboundTVFRelation(Map<String, String> properties) {
         UnboundTVFRelation relation;
-        if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.S3) {
+        if (brokerDesc.getStorageType() == StorageBackend.StorageType.S3) {
             relation = new UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(),
                     S3TableValuedFunction.NAME, new Properties(properties));
-        } else if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.HDFS) {
+        } else if (brokerDesc.getStorageType() == StorageBackend.StorageType.HDFS) {
             relation = new UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(),
                     HdfsTableValuedFunction.NAME, new Properties(properties));
         } else {
             throw new UnsupportedOperationException("Unsupported load storage type: "
-                    + bulkStorageDesc.getStorageType());
+                    + brokerDesc.getStorageType());
         }
         return relation;
     }
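Typing the field as BrokerDesc also removes a duplicated enum: the TVF dispatch above now compares against the shared StorageBackend.StorageType instead of the parallel BulkStorageDesc.StorageType copy. An illustrative snippet (the endpoint value is made up):

    // S3 descriptors route to the s3() TVF and HDFS to hdfs(); any other storage
    // type (LOCAL, a named broker, ...) still falls through to
    // UnsupportedOperationException, matching the old BulkStorageDesc behaviour.
    BrokerDesc s3Desc = new BrokerDesc("S3", StorageBackend.StorageType.S3,
            ImmutableMap.of("s3.endpoint", "http://127.0.0.1:9312"));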
@@ -454,8 +455,8 @@ private static OlapTable getOlapTable(ConnectContext ctx, BulkLoadDataDesc dataD
         return targetTable;
     }

-    private static Map<String, String> getTvfProperties(BulkLoadDataDesc dataDesc, BulkStorageDesc bulkStorageDesc) {
-        Map<String, String> tvfProperties = new HashMap<>(bulkStorageDesc.getProperties());
+    private static Map<String, String> getTvfProperties(BulkLoadDataDesc dataDesc, BrokerDesc brokerDesc) {
+        Map<String, String> tvfProperties = new HashMap<>(brokerDesc.getProperties());
         String fileFormat = dataDesc.getFormatDesc().getFileFormat().orElse("csv");
         if ("csv".equalsIgnoreCase(fileFormat)) {
             dataDesc.getFormatDesc().getColumnSeparator().ifPresent(sep ->
@@ -469,7 +470,7 @@ private static Map<String, String> getTvfProperties(BulkLoadDataDesc dataDesc, B
         List<String> filePaths = dataDesc.getFilePaths();
         // TODO: support multi location by union
         String listFilePath = filePaths.get(0);
-        if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.S3) {
+        if (brokerDesc.getStorageType() == StorageBackend.StorageType.S3) {
             // TODO: check file path by s3 fs list status
             tvfProperties.put("uri", listFilePath);
         }
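End to end, a single storage descriptor now flows from parse time through TVF planning. A walk-through for an S3 CSV load, under the assumption that the WITH S3 (...) property map is carried through BrokerDesc.getProperties() unchanged (the statement and keys are illustrative):

    // LOAD LABEL db1.label1
    //     (DATA INFILE("s3://bucket/data/file.csv") INTO TABLE t1)
    //     WITH S3 ("s3.endpoint" = "...", "s3.access_key" = "...", "s3.secret_key" = "...")
    //
    // getTvfProperties(dataDesc, brokerDesc) seeds the map from
    // brokerDesc.getProperties(), adds the CSV options (column separator, line
    // delimiter) and the resolved format, and, because the storage type is S3,
    // publishes the first file path as "uri". getUnboundTVFRelation(...) then wraps
    // that map in an s3(...) relation, so the whole LOAD is planned roughly as:
    //     INSERT INTO t1 SELECT ... FROM s3("uri" = "s3://bucket/data/file.csv",
    //                                       "format" = "csv", "s3.endpoint" = "...", ...)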