From 245d1ffedd164a6dda599e5e401fbbef20b57857 Mon Sep 17 00:00:00 2001 From: ankitsultana Date: Thu, 19 Dec 2024 23:54:23 +0000 Subject: [PATCH] cleanup code + fix single server handling --- .../pinot/query/runtime/QueryRunner.java | 4 ++ .../PhysicalTimeSeriesServerPlanVisitor.java | 2 +- .../service/dispatch/QueryDispatcher.java | 6 +-- .../planner/TimeSeriesQueryEnvironment.java | 26 ++++--------- .../planner/physical/TableScanVisitor.java | 18 ++++++--- .../physical/TimeSeriesDispatchablePlan.java | 39 +++++++++---------- 6 files changed, 47 insertions(+), 48 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index b835b7e8839..ac335a1674c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -279,6 +279,10 @@ public void processTimeSeriesQuery(List serializedPlanFragments, Map segments = context.getPlanIdToSegmentsMap().get(leafNode.getId()); + List segments = context.getPlanIdToSegmentsMap().getOrDefault(leafNode.getId(), Collections.emptyList()); ServerQueryRequest serverQueryRequest = compileLeafServerQueryRequest(leafNode, segments, context); return new TimeSeriesPhysicalTableScan(leafNode.getId(), serverQueryRequest, _queryExecutor, _executorService); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index eb692a10351..5a4ce98286c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -264,8 +264,8 @@ Map initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan d result.put(WorkerRequestMetadataKeys.WINDOW_SECONDS, Long.toString(timeBuckets.getBucketSize().getSeconds())); result.put(WorkerRequestMetadataKeys.NUM_ELEMENTS, Long.toString(timeBuckets.getTimeBuckets().length)); result.put(WorkerRequestMetadataKeys.DEADLINE_MS, Long.toString(deadlineMs)); - Map> planIdToSegments = dispatchablePlan.getPlanIdToSegmentsByServer().get(instanceId); - for (Map.Entry> entry : planIdToSegments.entrySet()) { + Map> leafIdToSegments = dispatchablePlan.getLeafIdToSegmentsByInstanceId().get(instanceId); + for (Map.Entry> entry : leafIdToSegments.entrySet()) { result.put(WorkerRequestMetadataKeys.encodeSegmentListKey(entry.getKey()), String.join(",", entry.getValue())); } result.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Long.toString(requestContext.getRequestId())); @@ -500,7 +500,7 @@ TimeSeriesBlock submitAndGet(long requestId, TimeSeriesDispatchablePlan plan, lo Preconditions.checkState(!deadline.isExpired(), "Deadline expired before query could be sent to servers"); // Send server fragment to every server Worker.TimeSeriesQueryRequest request = Worker.TimeSeriesQueryRequest.newBuilder() - .addAllDispatchPlan(plan.getSerializedPlanFragments(serverId)) + .addAllDispatchPlan(plan.getSerializedServerFragments()) .putAllMetadata(initializeTimeSeriesMetadataMap(plan, deadlineMs, requestContext, serverId)) .putMetadata(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Long.toString(requestId)) .build(); diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java index 41facf7a512..980c4f6bf3b 100644 --- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java @@ -21,14 +21,11 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import java.lang.reflect.Constructor; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.spi.env.PinotConfiguration; @@ -41,7 +38,6 @@ import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult; import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner; import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; -import org.apache.pinot.tsdb.spi.plan.serde.TimeSeriesPlanSerde; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,36 +89,30 @@ public TimeSeriesDispatchablePlan buildPhysicalPlan(RangeTimeSeriesRequest timeS TableScanVisitor.Context scanVisitorContext = TableScanVisitor.createContext(requestContext.getRequestId()); TableScanVisitor.INSTANCE.assignSegmentsToPlan(logicalPlan.getPlanNode(), logicalPlan.getTimeBuckets(), scanVisitorContext); - List serverInstances = scanVisitorContext.getPlanIdToSegmentsByServer().keySet() - .stream().map(TimeSeriesQueryServerInstance::new).collect(Collectors.toList()); - // Step-2: Create plan fragments and serialize them. + List serverInstances = scanVisitorContext.getQueryServers(); + // Step-2: Create plan fragments. List fragments = TimeSeriesPlanFragmenter.getFragments( logicalPlan.getPlanNode(), serverInstances.size() == 1); - List> serializedPlanFragments = new ArrayList<>(); - for (int index = 1; index < fragments.size(); index++) { - BaseTimeSeriesPlanNode serverFragment = fragments.get(index); - serializedPlanFragments.add(Pair.of(serverFragment.getId(), TimeSeriesPlanSerde.serialize(serverFragment))); - } // Step-3: Compute number of servers each exchange node will receive data from. Map numServersForExchangePlanNode = computeNumServersForExchangePlanNode(serverInstances, - fragments, scanVisitorContext.getPlanIdToSegmentsByInstanceId()); + fragments, scanVisitorContext.getLeafIdToSegmentsByInstanceId()); return new TimeSeriesDispatchablePlan(timeSeriesRequest.getLanguage(), serverInstances, fragments.get(0), - serializedPlanFragments, logicalPlan.getTimeBuckets(), scanVisitorContext.getPlanIdToSegmentsByInstanceId(), - numServersForExchangePlanNode); + fragments.subList(1, fragments.size()), logicalPlan.getTimeBuckets(), + scanVisitorContext.getLeafIdToSegmentsByInstanceId(), numServersForExchangePlanNode); } private Map computeNumServersForExchangePlanNode(List serverInstances, - List planNodes, Map>> planIdToSegmentsByInstanceId) { + List planNodes, Map>> leafIdToSegmentsByInstanceId) { // TODO(timeseries): Handle this gracefully and return an empty block. Preconditions.checkState(!serverInstances.isEmpty(), "No servers selected for the query"); if (serverInstances.size() == 1) { // For single-server case, the broker fragment consists only of the TimeSeriesExchangeNode. return ImmutableMap.of(planNodes.get(0).getId(), 1); } - // For the multi-server case, the planIdToSegmentsByInstanceId map already has the information we need, but we + // For the multi-server case, the leafIdToSegmentsByInstanceId map already has the information we need, but we // just need to restructure it so that we can get number of servers by planId. Map> planIdToServers = new HashMap<>(); - for (var entry : planIdToSegmentsByInstanceId.entrySet()) { + for (var entry : leafIdToSegmentsByInstanceId.entrySet()) { String instanceId = entry.getKey(); for (var innerEntry : entry.getValue().entrySet()) { String planId = innerEntry.getKey(); diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java index 2edc443afcd..3df75ce8ab9 100644 --- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.DataSource; import org.apache.pinot.common.request.Expression; @@ -58,7 +59,7 @@ public void assignSegmentsToPlan(BaseTimeSeriesPlanNode planNode, TimeBuckets ti for (var entry : routingTable.getServerInstanceToSegmentsMap().entrySet()) { ServerInstance serverInstance = entry.getKey(); List segments = entry.getValue().getLeft(); - context.getPlanIdToSegmentsByServer().computeIfAbsent(serverInstance, (x) -> new HashMap<>()) + context.getLeafIdToSegmentsByServer().computeIfAbsent(serverInstance, (x) -> new HashMap<>()) .put(sfpNode.getId(), segments); } } @@ -72,24 +73,29 @@ public static Context createContext(Long requestId) { } public static class Context { - private final Map>> _planIdToSegmentsByServer = new HashMap<>(); + private final Map>> _leafIdToSegmentsByServer = new HashMap<>(); private final Long _requestId; public Context(Long requestId) { _requestId = requestId; } - public Map>> getPlanIdToSegmentsByServer() { - return _planIdToSegmentsByServer; + public List getQueryServers() { + return _leafIdToSegmentsByServer.keySet().stream().map(TimeSeriesQueryServerInstance::new).collect( + Collectors.toList()); } - public Map>> getPlanIdToSegmentsByInstanceId() { + public Map>> getLeafIdToSegmentsByInstanceId() { Map>> result = new HashMap<>(); - for (var entry : _planIdToSegmentsByServer.entrySet()) { + for (var entry : _leafIdToSegmentsByServer.entrySet()) { result.put(entry.getKey().getInstanceId(), entry.getValue()); } return result; } + + Map>> getLeafIdToSegmentsByServer() { + return _leafIdToSegmentsByServer; + } } private BrokerRequest compileBrokerRequest(String tableName, Expression filterExpression) { diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesDispatchablePlan.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesDispatchablePlan.java index 261bb405d1a..8fa0152be75 100644 --- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesDispatchablePlan.java +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TimeSeriesDispatchablePlan.java @@ -18,35 +18,37 @@ */ package org.apache.pinot.tsdb.planner.physical; -import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; -import org.apache.commons.lang3.tuple.Pair; +import java.util.stream.Collectors; import org.apache.pinot.tsdb.spi.TimeBuckets; import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; +import org.apache.pinot.tsdb.spi.plan.serde.TimeSeriesPlanSerde; public class TimeSeriesDispatchablePlan { private final List _queryServerInstances; private final String _language; private final BaseTimeSeriesPlanNode _brokerFragment; - private final List> _serializedPlanFragments; + private final List _serverFragments; private final TimeBuckets _timeBuckets; - private final Map>> _planIdToSegmentsByServer; + private final Map>> _leafIdToSegmentsByInstanceId; private final Map _numInputServersForExchangePlanNode; + private final List _serializedServerFragments; public TimeSeriesDispatchablePlan(String language, List queryServerInstances, - BaseTimeSeriesPlanNode brokerFragment, List> serializedPlanFragmentsByPlanId, - TimeBuckets initialTimeBuckets, Map>> planIdToSegmentsByServer, + BaseTimeSeriesPlanNode brokerFragment, List serverFragments, + TimeBuckets initialTimeBuckets, Map>> leafIdToSegmentsByInstanceId, Map numInputServersForExchangePlanNode) { _language = language; _queryServerInstances = queryServerInstances; _brokerFragment = brokerFragment; - _serializedPlanFragments = serializedPlanFragmentsByPlanId; + _serverFragments = serverFragments; _timeBuckets = initialTimeBuckets; - _planIdToSegmentsByServer = planIdToSegmentsByServer; + _leafIdToSegmentsByInstanceId = leafIdToSegmentsByInstanceId; _numInputServersForExchangePlanNode = numInputServersForExchangePlanNode; + _serializedServerFragments = serverFragments.stream().map(TimeSeriesPlanSerde::serialize).collect( + Collectors.toList()); } public String getLanguage() { @@ -61,23 +63,20 @@ public BaseTimeSeriesPlanNode getBrokerFragment() { return _brokerFragment; } - public List getSerializedPlanFragments(String instanceId) { - Set planIdForInstance = _planIdToSegmentsByServer.get(instanceId).keySet(); - List result = new ArrayList<>(); - for (var planIdAndPlanFragment : _serializedPlanFragments) { - if (planIdForInstance.contains(planIdAndPlanFragment.getLeft())) { - result.add(planIdAndPlanFragment.getRight()); - } - } - return result; + public List getServerFragments() { + return _serverFragments; + } + + public List getSerializedServerFragments() { + return _serializedServerFragments; } public TimeBuckets getTimeBuckets() { return _timeBuckets; } - public Map>> getPlanIdToSegmentsByServer() { - return _planIdToSegmentsByServer; + public Map>> getLeafIdToSegmentsByInstanceId() { + return _leafIdToSegmentsByInstanceId; } public Map getNumInputServersForExchangePlanNode() {