From b857f1ed3ee697c60e306b7e35e5fb6a7a8ee4de Mon Sep 17 00:00:00 2001 From: Gonzalo Ortiz Date: Wed, 20 Nov 2024 14:14:00 +0100 Subject: [PATCH] Support spool in runtime. Still not complete. Explain implementation is working, but not actual queries. --- .../MultiStageBrokerRequestHandler.java | 5 +- pinot-common/src/main/proto/plan.proto | 4 +- .../apache/pinot/query/QueryEnvironment.java | 48 +++-- .../pinot/query/context/PlannerContext.java | 9 +- .../explain/PhysicalExplainPlanVisitor.java | 25 ++- .../logical/EquivalentStagesFinder.java | 2 +- .../logical/EquivalentStagesReplacer.java | 20 +- .../logical/PinotLogicalQueryPlanner.java | 12 +- .../query/planner/logical/PlanFragmenter.java | 13 +- .../physical/DispatchablePlanVisitor.java | 18 +- .../physical/MailboxAssignmentVisitor.java | 171 +++++++++--------- .../physical/PinotDispatchPlanner.java | 2 +- .../GreedyShuffleRewriteVisitor.java | 35 ++-- .../planner/plannode/MailboxSendNode.java | 8 + .../planner/serde/PlanNodeDeserializer.java | 12 +- .../planner/serde/PlanNodeSerializer.java | 11 +- .../pinot/query/runtime/QueryRunner.java | 16 +- .../runtime/operator/MailboxSendOperator.java | 19 +- .../pinot/spi/utils/CommonConstants.java | 8 + 19 files changed, 275 insertions(+), 163 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index ae12c0e725f6..8f1003092d76 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -136,14 +136,15 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders); boolean inferPartitionHint = 
_config.getProperty(CommonConstants.Broker.CONFIG_OF_INFER_PARTITION_HINT, CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT); - //@formatter:off + boolean defaultUseSpool = _config.getProperty(CommonConstants.Broker.CONFIG_OF_SPOOLS, + CommonConstants.Broker.DEFAULT_OF_SPOOLS); QueryEnvironment queryEnvironment = new QueryEnvironment(QueryEnvironment.configBuilder() .database(database) .tableCache(_tableCache) .workerManager(_workerManager) .defaultInferPartitionHint(inferPartitionHint) + .defaultUseSpools(defaultUseSpool) .build()); - //@formatter:on switch (sqlNodeAndOptions.getSqlNode().getKind()) { case EXPLAIN: boolean askServers = QueryOptionsUtils.isExplainAskingServers(queryOptions) diff --git a/pinot-common/src/main/proto/plan.proto b/pinot-common/src/main/proto/plan.proto index 06b2f0910cfd..8d52f55e9534 100644 --- a/pinot-common/src/main/proto/plan.proto +++ b/pinot-common/src/main/proto/plan.proto @@ -143,13 +143,15 @@ message MailboxReceiveNode { } message MailboxSendNode { - int32 receiverStageId = 1; + // kept for backward compatibility. 
Brokers populate it, but servers should prioritize receiverStageIds + int32 receiverStageId = 1 [deprecated = true]; ExchangeType exchangeType = 2; DistributionType distributionType = 3; repeated int32 keys = 4; bool prePartitioned = 5; repeated Collation collations = 6; bool sort = 7; + repeated int32 receiverStageIds = 8; } message ProjectNode { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 629c7ae2c56f..63422f37e521 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -138,7 +138,8 @@ public QueryEnvironment(String database, TableCache tableCache, @Nullable Worker private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) { WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions); HepProgram traitProgram = getTraitProgram(workerManager); - return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram); + return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram, + sqlNodeAndOptions.getOptions()); } @Nullable @@ -163,14 +164,6 @@ private WorkerManager getWorkerManager(SqlNodeAndOptions sqlNodeAndOptions) { } } - /** - * Returns the planner context that should be used only for parsing queries. - */ - private PlannerContext getParsingPlannerContext() { - HepProgram traitProgram = getTraitProgram(null); - return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram); - } - /** * Plan a SQL query. 
* @@ -185,7 +178,6 @@ private PlannerContext getParsingPlannerContext() { */ public QueryPlannerResult planQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions, long requestId) { try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) { - plannerContext.setOptions(sqlNodeAndOptions.getOptions()); RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), plannerContext); // TODO: current code only assume one SubPlan per query, but we should support multiple SubPlans per query. // Each SubPlan should be able to run independently from Broker then set the results into the dependent @@ -209,8 +201,7 @@ public DispatchableSubPlan planQuery(String sqlQuery) { * * Similar to {@link QueryEnvironment#planQuery(String, SqlNodeAndOptions, long)}, this API runs the query * compilation. But it doesn't run the distributed {@link DispatchableSubPlan} generation, instead it only - * returns the - * explained logical plan. + * returns the explained logical plan. * * @param sqlQuery SQL query string. * @param sqlNodeAndOptions parsed SQL query. @@ -221,7 +212,6 @@ public QueryPlannerResult explainQuery(String sqlQuery, SqlNodeAndOptions sqlNod @Nullable AskingServerStageExplainer.OnServerExplainer onServerExplainer) { try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) { SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode(); - plannerContext.setOptions(sqlNodeAndOptions.getOptions()); RelRoot relRoot = compileQuery(explain.getExplicandum(), plannerContext); if (explain instanceof SqlPhysicalExplain) { // get the physical plan for query. 
@@ -271,8 +261,9 @@ public String explainQuery(String sqlQuery, long requestId) { } public List getTableNamesForQuery(String sqlQuery) { - try (PlannerContext plannerContext = getParsingPlannerContext()) { - SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery).getSqlNode(); + SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery); + try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) { + SqlNode sqlNode = sqlNodeAndOptions.getSqlNode(); if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) { sqlNode = ((SqlExplain) sqlNode).getExplicandum(); } @@ -288,8 +279,9 @@ public List getTableNamesForQuery(String sqlQuery) { * Returns whether the query can be successfully compiled in this query environment */ public boolean canCompileQuery(String query) { - try (PlannerContext plannerContext = getParsingPlannerContext()) { - SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(query).getSqlNode(); + SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); + try (PlannerContext plannerContext = getPlannerContext(sqlNodeAndOptions)) { + SqlNode sqlNode = sqlNodeAndOptions.getSqlNode(); if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) { sqlNode = ((SqlExplain) sqlNode).getExplicandum(); } @@ -400,7 +392,7 @@ private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContex private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContext plannerContext, long requestId, @Nullable TransformationTracker.Builder tracker) { - SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker); + SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot, tracker, useSpools(plannerContext.getOptions())); PinotDispatchPlanner pinotDispatchPlanner = new PinotDispatchPlanner(plannerContext, _envConfig.getWorkerManager(), requestId, _envConfig.getTableCache()); return pinotDispatchPlanner.createDispatchableSubPlan(plan); @@ -465,6 +457,14 @@ public 
static ImmutableQueryEnvironment.Config.Builder configBuilder() { return ImmutableQueryEnvironment.Config.builder(); } + public boolean useSpools(Map options) { + String optionValue = options.get(CommonConstants.Broker.Request.QueryOptionKey.USE_SPOOLS); + if (optionValue == null) { + return _envConfig.defaultUseSpools(); + } + return Boolean.parseBoolean(optionValue); + } + @Value.Immutable public interface Config { String getDatabase(); @@ -484,6 +484,18 @@ default boolean defaultInferPartitionHint() { return CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT; } + /** + * Whether to use spools or not. + * + * This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration. + * This default value can always be overridden at query level by the query option + * {@link CommonConstants.Broker.Request.QueryOptionKey#USE_SPOOLS}. + */ + @Value.Default + default boolean defaultUseSpools() { + return CommonConstants.Broker.DEFAULT_OF_SPOOLS; + } + /** * Returns the worker manager. 
* diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java index 3164921c785e..4505e16da3d8 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java @@ -47,15 +47,16 @@ public class PlannerContext implements AutoCloseable { private final RelOptPlanner _relOptPlanner; private final LogicalPlanner _relTraitPlanner; - private Map _options; + private final Map _options; public PlannerContext(FrameworkConfig config, Prepare.CatalogReader catalogReader, RelDataTypeFactory typeFactory, - HepProgram optProgram, HepProgram traitProgram) { + HepProgram optProgram, HepProgram traitProgram, Map options) { _planner = new PlannerImpl(config); _validator = new Validator(config.getOperatorTable(), catalogReader, typeFactory); _relOptPlanner = new LogicalPlanner(optProgram, Contexts.EMPTY_CONTEXT, config.getTraitDefs()); _relTraitPlanner = new LogicalPlanner(traitProgram, Contexts.EMPTY_CONTEXT, Collections.singletonList(RelDistributionTraitDef.INSTANCE)); + _options = options; } public PlannerImpl getPlanner() { @@ -74,10 +75,6 @@ public LogicalPlanner getRelTraitPlanner() { return _relTraitPlanner; } - public void setOptions(Map options) { - _options = options; - } - public Map getOptions() { return _options; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java index e7d1c04f50dc..b91783a18637 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/explain/PhysicalExplainPlanVisitor.java @@ -18,11 +18,14 @@ */ package 
org.apache.pinot.query.planner.explain; +import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.pinot.query.planner.physical.DispatchablePlanFragment; import org.apache.pinot.query.planner.physical.DispatchableSubPlan; import org.apache.pinot.query.planner.plannode.AggregateNode; @@ -212,14 +215,22 @@ public StringBuilder visitMailboxSend(MailboxSendNode node, Context context) { private StringBuilder appendMailboxSend(MailboxSendNode node, Context context) { appendInfo(node, context); - int receiverStageId = node.getReceiverStageId(); - List receiverMailboxInfos = - _dispatchableSubPlan.getQueryStageList().get(node.getStageId()).getWorkerMetadataList().get(context._workerId) - .getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); + List> perStageDescriptions = new ArrayList<>(); + // This iterator is guaranteed to be sorted by stageId + for (Integer receiverStageId : node.getReceiverStageIds()) { + List receiverMailboxInfos = + _dispatchableSubPlan.getQueryStageList().get(node.getStageId()).getWorkerMetadataList().get(context._workerId) + .getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); + // Sort to ensure print order + Stream stageDescriptions = receiverMailboxInfos.stream() + .sorted(Comparator.comparingInt(MailboxInfo::getPort)) + .map(v -> "[" + receiverStageId + "]@" + v); + perStageDescriptions.add(stageDescriptions); + } context._builder.append("->"); - // Sort to ensure print order - String receivers = receiverMailboxInfos.stream().sorted(Comparator.comparingInt(MailboxInfo::getPort)) - .map(v -> "[" + receiverStageId + "]@" + v).collect(Collectors.joining(",", "{", "}")); + String receivers = perStageDescriptions.stream() + .flatMap(Function.identity()) + .collect(Collectors.joining(",", "{", "}")); return context._builder.append(receivers); 
} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java index 28bca306cd5c..7e87e987a1d2 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesFinder.java @@ -52,7 +52,7 @@ public class EquivalentStagesFinder { private EquivalentStagesFinder() { } - public static GroupedStages findEquivalentStages(MailboxSendNode root) { + public static GroupedStages findEquivalentStages(PlanNode root) { Visitor visitor = new Visitor(); root.visit(visitor, null); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesReplacer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesReplacer.java index 06a4cf16dac3..0ad7d9b4d86f 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesReplacer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/EquivalentStagesReplacer.java @@ -38,20 +38,31 @@ public class EquivalentStagesReplacer { private EquivalentStagesReplacer() { } + public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages) { + replaceEquivalentStages(root, equivalentStages, OnSubstitution.NO_OP); + } + /** * Replaces the equivalent stages in the query plan. 
* * @param root Root plan node * @param equivalentStages Equivalent stages */ - public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages) { - root.visit(Replacer.INSTANCE, equivalentStages); + public static void replaceEquivalentStages(PlanNode root, GroupedStages equivalentStages, OnSubstitution listener) { + root.visit(new Replacer(listener), equivalentStages); + } + + public interface OnSubstitution { + OnSubstitution NO_OP = (receiver, oldSender, newSender) -> { + }; + void onSubstitution(int receiver, int oldSender, int newSender); } private static class Replacer extends PlanNodeVisitor.DepthFirstVisitor { - private static final Replacer INSTANCE = new Replacer(); + private final OnSubstitution _listener; - private Replacer() { + public Replacer(OnSubstitution listener) { + _listener = listener; } @Override @@ -62,6 +73,7 @@ public Void visitMailboxReceive(MailboxReceiveNode node, GroupedStages equivalen // we don't want to visit the children of the node given it is going to be pruned node.setSender(leader); leader.addReceiver(node); + _listener.onSubstitution(node.getStageId(), sender.getStageId(), leader.getStageId()); } else { visitMailboxSend(leader, equivalenceGroups); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java index 8282ea787b31..e08ebd29bd92 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java @@ -55,10 +55,10 @@ private PinotLogicalQueryPlanner() { * Converts a Calcite {@link RelRoot} into a Pinot {@link SubPlan}. 
*/ public static SubPlan makePlan(RelRoot relRoot, - @Nullable TransformationTracker.Builder tracker) { + @Nullable TransformationTracker.Builder tracker, boolean useSpools) { PlanNode rootNode = new RelToPlanNodeConverter(tracker).toPlanNode(relRoot.rel); - PlanFragment rootFragment = planNodeToPlanFragment(rootNode, tracker); + PlanFragment rootFragment = planNodeToPlanFragment(rootNode, tracker, useSpools); return new SubPlan(rootFragment, new SubPlanMetadata(RelToPlanNodeConverter.getTableNamesFromRelRoot(relRoot.rel), relRoot.fields), List.of()); @@ -89,10 +89,16 @@ public static SubPlan makePlan(RelRoot relRoot, } private static PlanFragment planNodeToPlanFragment( - PlanNode node, @Nullable TransformationTracker.Builder tracker) { + PlanNode node, @Nullable TransformationTracker.Builder tracker, boolean useSpools) { PlanFragmenter fragmenter = new PlanFragmenter(); PlanFragmenter.Context fragmenterContext = fragmenter.createContext(); node = node.visit(fragmenter, fragmenterContext); + + if (useSpools) { + GroupedStages equivalentStages = EquivalentStagesFinder.findEquivalentStages(node); + EquivalentStagesReplacer.replaceEquivalentStages(node, equivalentStages, fragmenter); + } + Int2ObjectOpenHashMap planFragmentMap = fragmenter.getPlanFragmentMap(); Int2ObjectOpenHashMap childPlanFragmentIdsMap = fragmenter.getChildPlanFragmentIdsMap(); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java index 420b9d16150b..bbd9a50924a0 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanFragmenter.java @@ -56,7 +56,8 @@ * 3. Assign current PlanFragment ID to {@link MailboxReceiveNode}; * 4. Increment current PlanFragment ID by one and assign it to the {@link MailboxSendNode}. 
*/ -public class PlanFragmenter implements PlanNodeVisitor { +public class PlanFragmenter implements PlanNodeVisitor, + EquivalentStagesReplacer.OnSubstitution { private final Int2ObjectOpenHashMap _planFragmentMap = new Int2ObjectOpenHashMap<>(); private final Int2ObjectOpenHashMap _childPlanFragmentIdsMap = new Int2ObjectOpenHashMap<>(); @@ -86,6 +87,16 @@ private PlanNode process(PlanNode node, Context context) { return node; } + @Override + public void onSubstitution(int receiver, int oldSender, int newSender) { + IntList senders = _childPlanFragmentIdsMap.get(receiver); + senders.rem(oldSender); + if (!senders.contains(newSender)) { + senders.add(newSender); + } + _planFragmentMap.remove(oldSender); + } + @Override public PlanNode visitAggregate(AggregateNode node, Context context) { return process(node, context); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java index a6a7040c4e0d..338161da9e7b 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.query.planner.physical; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Set; import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.query.planner.plannode.AggregateNode; import org.apache.pinot.query.planner.plannode.ExchangeNode; @@ -37,10 +40,7 @@ public class DispatchablePlanVisitor implements PlanNodeVisitor { - public static final DispatchablePlanVisitor INSTANCE = new DispatchablePlanVisitor(); - - private DispatchablePlanVisitor() { - } + private final Set _visited = Collections.newSetFromMap(new IdentityHashMap<>()); private static DispatchablePlanMetadata 
getOrCreateDispatchablePlanMetadata(PlanNode node, DispatchablePlanContext context) { @@ -104,10 +104,12 @@ public Void visitMailboxReceive(MailboxReceiveNode node, DispatchablePlanContext @Override public Void visitMailboxSend(MailboxSendNode node, DispatchablePlanContext context) { - node.getInputs().get(0).visit(this, context); - DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context); - dispatchablePlanMetadata.setPrePartitioned(node.isPrePartitioned()); - context.getDispatchablePlanStageRootMap().put(node.getStageId(), node); + if (_visited.add(node)) { + node.getInputs().get(0).visit(this, context); + DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context); + dispatchablePlanMetadata.setPrePartitioned(node.isPrePartitioned()); + context.getDispatchablePlanStageRootMap().put(node.getStageId(), node); + } return null; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java index 75765d341f07..5a6734f23f6a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java @@ -43,99 +43,102 @@ public Void process(PlanNode node, DispatchablePlanContext context) { if (node instanceof MailboxSendNode) { MailboxSendNode sendNode = (MailboxSendNode) node; int senderStageId = sendNode.getStageId(); - int receiverStageId = sendNode.getReceiverStageId(); - Map metadataMap = context.getDispatchablePlanMetadataMap(); - DispatchablePlanMetadata senderMetadata = metadataMap.get(senderStageId); - DispatchablePlanMetadata receiverMetadata = metadataMap.get(receiverStageId); - Map senderServerMap = senderMetadata.getWorkerIdToServerInstanceMap(); - Map 
receiverServerMap = receiverMetadata.getWorkerIdToServerInstanceMap(); - Map> senderMailboxesMap = senderMetadata.getWorkerIdToMailboxesMap(); - Map> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailboxesMap(); + for (Integer receiverStageId : sendNode.getReceiverStageIds()) { + Map metadataMap = context.getDispatchablePlanMetadataMap(); + DispatchablePlanMetadata senderMetadata = metadataMap.get(senderStageId); + DispatchablePlanMetadata receiverMetadata = metadataMap.get(receiverStageId); + Map senderServerMap = senderMetadata.getWorkerIdToServerInstanceMap(); + Map receiverServerMap = receiverMetadata.getWorkerIdToServerInstanceMap(); + Map> senderMailboxesMap = senderMetadata.getWorkerIdToMailboxesMap(); + Map> receiverMailboxesMap = receiverMetadata.getWorkerIdToMailboxesMap(); - int numSenders = senderServerMap.size(); - int numReceivers = receiverServerMap.size(); - if (sendNode.getDistributionType() == RelDistribution.Type.SINGLETON) { - // For SINGLETON exchange type, send the data to the same instance (same worker id) - Preconditions.checkState(numSenders == numReceivers, - "Got different number of workers for SINGLETON distribution type, sender: %s, receiver: %s", numSenders, - numReceivers); - for (int workerId = 0; workerId < numSenders; workerId++) { - QueryServerInstance senderServer = senderServerMap.get(workerId); - QueryServerInstance receiverServer = receiverServerMap.get(workerId); - Preconditions.checkState(senderServer.equals(receiverServer), - "Got different server for SINGLETON distribution type for worker id: %s, sender: %s, receiver: %s", - workerId, senderServer, receiverServer); - MailboxInfos mailboxInfos = new SharedMailboxInfos( - new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), - ImmutableList.of(workerId))); - senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverStageId, mailboxInfos); - receiverMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>()).put(senderStageId, mailboxInfos); - } - } else if (senderMetadata.isPrePartitioned() && isDirectExchangeCompatible(senderMetadata, receiverMetadata)) { - // - direct exchange possible: - // 1. send the data to the worker with the same worker id (not necessary the same instance), 1-to-1 mapping - // 2. When partition parallelism is configured, fanout based on partition parallelism from each sender - // workerID to sequentially increment receiver workerIDs - int partitionParallelism = numReceivers / numSenders; - if (partitionParallelism == 1) { - // 1-to-1 mapping + int numSenders = senderServerMap.size(); + int numReceivers = receiverServerMap.size(); + if (sendNode.getDistributionType() == RelDistribution.Type.SINGLETON) { + // For SINGLETON exchange type, send the data to the same instance (same worker id) + Preconditions.checkState(numSenders == numReceivers, + "Got different number of workers for SINGLETON distribution type, sender: %s, receiver: %s", numSenders, + numReceivers); for (int workerId = 0; workerId < numSenders; workerId++) { QueryServerInstance senderServer = senderServerMap.get(workerId); QueryServerInstance receiverServer = receiverServerMap.get(workerId); - List workerIds = ImmutableList.of(workerId); - MailboxInfos senderMailboxInfos; - MailboxInfos receiverMailboxInfos; - if (senderServer.equals(receiverServer)) { - senderMailboxInfos = new SharedMailboxInfos( - new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds)); - receiverMailboxInfos = senderMailboxInfos; - } else { - senderMailboxInfos = new MailboxInfos( - new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds)); - receiverMailboxInfos = new MailboxInfos( - new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(), workerIds)); + Preconditions.checkState(senderServer.equals(receiverServer), + "Got different server for SINGLETON distribution type for worker id: %s, sender: %s, 
receiver: %s", + workerId, senderServer, receiverServer); + MailboxInfos mailboxInfos = new SharedMailboxInfos( + new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), + ImmutableList.of(workerId))); + senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(receiverStageId, mailboxInfos); + receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderStageId, mailboxInfos); + } + } else if (senderMetadata.isPrePartitioned() && isDirectExchangeCompatible(senderMetadata, receiverMetadata)) { + // - direct exchange possible: + // 1. send the data to the worker with the same worker id (not necessary the same instance), 1-to-1 mapping + // 2. When partition parallelism is configured, fanout based on partition parallelism from each sender + // workerID to sequentially increment receiver workerIDs + int partitionParallelism = numReceivers / numSenders; + if (partitionParallelism == 1) { + // 1-to-1 mapping + for (int workerId = 0; workerId < numSenders; workerId++) { + QueryServerInstance senderServer = senderServerMap.get(workerId); + QueryServerInstance receiverServer = receiverServerMap.get(workerId); + List workerIds = ImmutableList.of(workerId); + MailboxInfos senderMailboxInfos; + MailboxInfos receiverMailboxInfos; + if (senderServer.equals(receiverServer)) { + senderMailboxInfos = new SharedMailboxInfos( + new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds)); + receiverMailboxInfos = senderMailboxInfos; + } else { + senderMailboxInfos = new MailboxInfos( + new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), workerIds)); + receiverMailboxInfos = new MailboxInfos( + new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(), workerIds)); + } + senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()) + .put(receiverStageId, receiverMailboxInfos); + receiverMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>()) + .put(senderStageId, senderMailboxInfos); + } + } else { + // 1-to-<partition_parallelism> mapping + int receiverWorkerId = 0; + for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) { + QueryServerInstance senderServer = senderServerMap.get(senderWorkerId); + QueryServerInstance receiverServer = receiverServerMap.get(receiverWorkerId); + List receiverWorkerIds = new ArrayList<>(partitionParallelism); + senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()).put(receiverStageId, + new MailboxInfos(new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(), + receiverWorkerIds))); + MailboxInfos senderMailboxInfos = new SharedMailboxInfos( + new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), + ImmutableList.of(senderWorkerId))); + for (int i = 0; i < partitionParallelism; i++) { + receiverWorkerIds.add(receiverWorkerId); + receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>()) + .put(senderStageId, senderMailboxInfos); + receiverWorkerId++; + } } - senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()) - .put(receiverStageId, receiverMailboxInfos); - receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>()).put(senderStageId, senderMailboxInfos); } } else { - // 1-to-<partition_parallelism> mapping - int receiverWorkerId = 0; + // For other exchange types, send the data to all the instances in the receiver fragment + // TODO: Add support for more exchange types + List receiverMailboxInfoList = getMailboxInfos(receiverServerMap); + MailboxInfos receiverMailboxInfos = numSenders > 1 ? 
new SharedMailboxInfos(receiverMailboxInfoList) + : new MailboxInfos(receiverMailboxInfoList); for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) { - QueryServerInstance senderServer = senderServerMap.get(senderWorkerId); - QueryServerInstance receiverServer = receiverServerMap.get(receiverWorkerId); - List receiverWorkerIds = new ArrayList<>(partitionParallelism); - senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()).put(receiverStageId, - new MailboxInfos(new MailboxInfo(receiverServer.getHostname(), receiverServer.getQueryMailboxPort(), - receiverWorkerIds))); - MailboxInfos senderMailboxInfos = new SharedMailboxInfos( - new MailboxInfo(senderServer.getHostname(), senderServer.getQueryMailboxPort(), - ImmutableList.of(senderWorkerId))); - for (int i = 0; i < partitionParallelism; i++) { - receiverWorkerIds.add(receiverWorkerId); - receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>()) - .put(senderStageId, senderMailboxInfos); - receiverWorkerId++; - } + senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()) + .put(receiverStageId, receiverMailboxInfos); + } + List senderMailboxInfoList = getMailboxInfos(senderServerMap); + MailboxInfos senderMailboxInfos = + numReceivers > 1 ? new SharedMailboxInfos(senderMailboxInfoList) + : new MailboxInfos(senderMailboxInfoList); + for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) { + receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>()) + .put(senderStageId, senderMailboxInfos); } - } - } else { - // For other exchange types, send the data to all the instances in the receiver fragment - // TODO: Add support for more exchange types - List receiverMailboxInfoList = getMailboxInfos(receiverServerMap); - MailboxInfos receiverMailboxInfos = numSenders > 1 ? 
new SharedMailboxInfos(receiverMailboxInfoList) - : new MailboxInfos(receiverMailboxInfoList); - for (int senderWorkerId = 0; senderWorkerId < numSenders; senderWorkerId++) { - senderMailboxesMap.computeIfAbsent(senderWorkerId, k -> new HashMap<>()) - .put(receiverStageId, receiverMailboxInfos); - } - List senderMailboxInfoList = getMailboxInfos(senderServerMap); - MailboxInfos senderMailboxInfos = - numReceivers > 1 ? new SharedMailboxInfos(senderMailboxInfoList) : new MailboxInfos(senderMailboxInfoList); - for (int receiverWorkerId = 0; receiverWorkerId < numReceivers; receiverWorkerId++) { - receiverMailboxesMap.computeIfAbsent(receiverWorkerId, k -> new HashMap<>()) - .put(senderStageId, senderMailboxInfos); } } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java index 5c9dabb225be..0828aa49ffe5 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java @@ -59,7 +59,7 @@ public DispatchableSubPlan createDispatchableSubPlan(SubPlan subPlan) { PlanFragment rootFragment = subPlan.getSubPlanRoot(); PlanNode rootNode = rootFragment.getFragmentRoot(); // 1. start by visiting the sub plan fragment root. - rootNode.visit(DispatchablePlanVisitor.INSTANCE, context); + rootNode.visit(new DispatchablePlanVisitor(), context); // 2. add a special stage for the global mailbox receive, this runs on the dispatcher. context.getDispatchablePlanStageRootMap().put(0, rootNode); // 3. add worker assignment after the dispatchable plan context is fulfilled after the visit. 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java index 71546d1fe822..a0e94561e3c0 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java @@ -209,22 +209,33 @@ public Set visitMailboxSend(MailboxSendNode node, GreedyShuffleRe boolean canSkipShuffleBasic = colocationKeyCondition(oldColocationKeys, distributionKeys); // If receiver is not a join-stage, then we can determine distribution type now. - if (!context.isJoinStage(node.getReceiverStageId())) { - Set colocationKeys; - if (canSkipShuffleBasic && areServersSuperset(node.getReceiverStageId(), node.getStageId())) { - // Servers are not re-assigned on sender-side. If needed, they are re-assigned on the receiver side. + boolean sendsToJoin = false; + boolean allAreSuperSet = true; + for (Integer receiverStageId : node.getReceiverStageIds()) { + if (context.isJoinStage(receiverStageId)) { + sendsToJoin = true; + break; + } + if (!(canSkipShuffleBasic && areServersSuperset(receiverStageId, node.getStageId()))) { + allAreSuperSet = false; + break; + } + } + if (!sendsToJoin) { + if (allAreSuperSet) { node.setDistributionType(RelDistribution.Type.SINGLETON); - colocationKeys = oldColocationKeys; + return oldColocationKeys; } else { - colocationKeys = new HashSet<>(); + Set colocationKeys = new HashSet<>(); + context.setColocationKeys(node.getStageId(), colocationKeys); + return colocationKeys; } - context.setColocationKeys(node.getStageId(), colocationKeys); - return colocationKeys; + } else { + // If receiver is a join-stage, remember partition-keys of the child node of MailboxSendNode. 
+ Set mailboxSendColocationKeys = canSkipShuffleBasic ? oldColocationKeys : new HashSet<>(); + context.setColocationKeys(node.getStageId(), mailboxSendColocationKeys); + return mailboxSendColocationKeys; } - // If receiver is a join-stage, remember partition-keys of the child node of MailboxSendNode. - Set mailboxSendColocationKeys = canSkipShuffleBasic ? oldColocationKeys : new HashSet<>(); - context.setColocationKeys(node.getStageId(), mailboxSendColocationKeys); - return mailboxSendColocationKeys; } @Override diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java index 9cc2c2e65792..0d82e694435a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxSendNode.java @@ -54,6 +54,14 @@ private MailboxSendNode(int stageId, DataSchema dataSchema, List input _sort = sort; } + public MailboxSendNode(int stageId, DataSchema dataSchema, List inputs, + @Nullable List receiverStages, PinotRelExchangeType exchangeType, + RelDistribution.Type distributionType, @Nullable List keys, boolean prePartitioned, + @Nullable List collations, boolean sort) { + this(stageId, dataSchema, inputs, toBitSet(receiverStages), exchangeType, + distributionType, keys, prePartitioned, collations, sort); + } + public MailboxSendNode(int stageId, DataSchema dataSchema, List inputs, int receiverStage, PinotRelExchangeType exchangeType, RelDistribution.Type distributionType, @Nullable List keys, boolean prePartitioned, diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java index dca8cb18954a..3c3c497fca55 100644 --- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java @@ -117,8 +117,18 @@ private static MailboxReceiveNode deserializeMailboxReceiveNode(Plan.PlanNode pr private static MailboxSendNode deserializeMailboxSendNode(Plan.PlanNode protoNode) { Plan.MailboxSendNode protoMailboxSendNode = protoNode.getMailboxSendNode(); + + List receiverIds; + List protoReceiverIds = protoMailboxSendNode.getReceiverStageIdsList(); + if (protoReceiverIds == null || protoReceiverIds.isEmpty()) { + // This should only happen if a not updated broker sends the request + receiverIds = List.of(protoMailboxSendNode.getReceiverStageId()); + } else { + receiverIds = protoReceiverIds; + } + return new MailboxSendNode(protoNode.getStageId(), extractDataSchema(protoNode), extractInputs(protoNode), - protoMailboxSendNode.getReceiverStageId(), convertExchangeType(protoMailboxSendNode.getExchangeType()), + receiverIds, convertExchangeType(protoMailboxSendNode.getExchangeType()), convertDistributionType(protoMailboxSendNode.getDistributionType()), protoMailboxSendNode.getKeysList(), protoMailboxSendNode.getPrePartitioned(), convertCollations(protoMailboxSendNode.getCollationsList()), protoMailboxSendNode.getSort()); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java index 00a21c05e954..74d768e800b7 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java @@ -132,8 +132,17 @@ public Void visitMailboxReceive(MailboxReceiveNode node, Plan.PlanNode.Builder b @Override public Void visitMailboxSend(MailboxSendNode node, Plan.PlanNode.Builder builder) { + + List 
receiverStageIds = new ArrayList<>(); + for (Integer receiverStageId : node.getReceiverStageIds()) { + receiverStageIds.add(receiverStageId); + } + assert !receiverStageIds.isEmpty() : "Receiver stage IDs should not be empty"; + Plan.MailboxSendNode mailboxSendNode = - Plan.MailboxSendNode.newBuilder().setReceiverStageId(node.getReceiverStageId()) + Plan.MailboxSendNode.newBuilder() + .setReceiverStageId(receiverStageIds.get(0)) // to keep backward compatibility + .addAllReceiverStageIds(receiverStageIds) .setExchangeType(convertExchangeType(node.getExchangeType())) .setDistributionType(convertDistributionType(node.getDistributionType())).addAllKeys(node.getKeys()) .setPrePartitioned(node.isPrePartitioned()).addAllCollations(convertCollations(node.getCollations())) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index ba6984bcf975..744a8d3bc5ac 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -207,12 +207,16 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map int stageId = stageMetadata.getStageId(); LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, sending error block: {}", requestId, stageId, errorBlock.getExceptions()); - int receiverStageId = ((MailboxSendNode) stagePlan.getRootNode()).getReceiverStageId(); - List receiverMailboxInfos = - workerMetadata.getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); - List routingInfos = - MailboxIdUtils.toRoutingInfos(requestId, stageId, workerMetadata.getWorkerId(), receiverStageId, - receiverMailboxInfos); + MailboxSendNode rootNode = (MailboxSendNode) stagePlan.getRootNode(); + List routingInfos = new ArrayList<>(); + for (Integer receiverStageId : rootNode.getReceiverStageIds()) { + List 
receiverMailboxInfos = + workerMetadata.getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); + List stageRoutingInfos = + MailboxIdUtils.toRoutingInfos(requestId, stageId, workerMetadata.getWorkerId(), receiverStageId, + receiverMailboxInfos); + routingInfos.addAll(stageRoutingInfos); + } for (RoutingInfo routingInfo : routingInfos) { try { StatMap statMap = new StatMap<>(MailboxSendOperator.StatKey.class); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index 864f200fe6e5..db325fa17efe 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -65,7 +66,7 @@ public class MailboxSendOperator extends MultiStageOperator { // TODO: Support sort on sender public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator input, MailboxSendNode node) { this(context, input, - statMap -> getBlockExchange(context, node.getReceiverStageId(), node.getDistributionType(), node.getKeys(), + statMap -> getBlockExchange(context, node.getReceiverStageIds(), node.getDistributionType(), node.getKeys(), statMap)); _statMap.merge(StatKey.STAGE, context.getStageId()); _statMap.merge(StatKey.PARALLELISM, 1); @@ -79,7 +80,7 @@ public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator i _exchange = exchangeFactory.apply(_statMap); } - private static BlockExchange getBlockExchange(OpChainExecutionContext context, int receiverStageId, + private static BlockExchange getBlockExchange(OpChainExecutionContext 
context, Iterable receiverStageIds, RelDistribution.Type distributionType, List keys, StatMap statMap) { Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(distributionType), "Unsupported distribution type: %s", distributionType); @@ -87,11 +88,15 @@ private static BlockExchange getBlockExchange(OpChainExecutionContext context, i long requestId = context.getRequestId(); long deadlineMs = context.getDeadlineMs(); - List mailboxInfos = - context.getWorkerMetadata().getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); - List routingInfos = - MailboxIdUtils.toRoutingInfos(requestId, context.getStageId(), context.getWorkerId(), receiverStageId, - mailboxInfos); + List routingInfos = new ArrayList<>(); + for (Integer receiverStageId : receiverStageIds) { + List mailboxInfos = + context.getWorkerMetadata().getMailboxInfosMap().get(receiverStageId).getMailboxInfos(); + List stageRoutingInfos = + MailboxIdUtils.toRoutingInfos(requestId, context.getStageId(), context.getWorkerId(), receiverStageId, + mailboxInfos); + routingInfos.addAll(stageRoutingInfos); + } List sendingMailboxes = routingInfos.stream() .map(v -> mailboxService.getSendingMailbox(v.getHostname(), v.getPort(), v.getMailboxId(), deadlineMs, statMap)) .collect(Collectors.toList()); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 21800684b9b7..f45415b8ce0c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -359,6 +359,13 @@ public static class Broker { public static final String CONFIG_OF_INFER_PARTITION_HINT = "pinot.broker.multistage.infer.partition.hint"; public static final boolean DEFAULT_INFER_PARTITION_HINT = false; + /** + * Whether to use spools in multistage query engine by default. 
+ * This value can always be overridden by {@link Request.QueryOptionKey#USE_SPOOLS} query option + */ + public static final String CONFIG_OF_SPOOLS = "pinot.broker.multistage.spools"; + public static final boolean DEFAULT_OF_SPOOLS = false; + public static final String CONFIG_OF_USE_FIXED_REPLICA = "pinot.broker.use.fixed.replica"; public static final boolean DEFAULT_USE_FIXED_REPLICA = false; @@ -410,6 +417,7 @@ public static class QueryOptionKey { public static final String INFER_PARTITION_HINT = "inferPartitionHint"; public static final String ENABLE_NULL_HANDLING = "enableNullHandling"; public static final String APPLICATION_NAME = "applicationName"; + public static final String USE_SPOOLS = "useSpools"; /** * If set, changes the explain behavior in multi-stage engine. *