Skip to content

Commit

Permalink
cleanup code + fix single server handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitsultana committed Dec 19, 2024
1 parent 1d5fa8a commit 245d1ff
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ public void processTimeSeriesQuery(List<String> serializedPlanFragments, Map<Str
LOGGER.warn("Unable to send error to broker. Original error: {}", t.getMessage(), t2);
}
};
if (serializedPlanFragments.isEmpty()) {
handleErrors.accept(Pair.of(new IllegalStateException("No plan fragments received in server"), ""));
return;
}
try {
final long deadlineMs = extractDeadlineMs(metadata);
Preconditions.checkState(System.currentTimeMillis() < deadlineMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public BaseTimeSeriesPlanNode initLeafPlanNode(BaseTimeSeriesPlanNode planNode,

/**
 * Converts a leaf node of the logical plan into a physical table-scan node bound to a compiled
 * server query request.
 * <p>
 * Uses {@code getOrDefault} so that a server which received the plan but has no segments assigned
 * for this particular leaf scans an empty segment list instead of hitting an NPE on a missing map
 * entry (this can happen in the single-server dispatch case — see commit message).
 *
 * @param leafNode leaf plan node to convert
 * @param context  execution context carrying the leaf-id to segments assignment for this server
 * @return physical table scan for the given leaf
 */
private TimeSeriesPhysicalTableScan convertLeafToPhysicalTableScan(LeafTimeSeriesPlanNode leafNode,
    TimeSeriesExecutionContext context) {
  // Absence of an entry means "no segments for this leaf on this server" — default to empty.
  List<String> segments = context.getPlanIdToSegmentsMap().getOrDefault(leafNode.getId(), Collections.emptyList());
  ServerQueryRequest serverQueryRequest = compileLeafServerQueryRequest(leafNode, segments, context);
  return new TimeSeriesPhysicalTableScan(leafNode.getId(), serverQueryRequest, _queryExecutor, _executorService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ Map<String, String> initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan d
result.put(WorkerRequestMetadataKeys.WINDOW_SECONDS, Long.toString(timeBuckets.getBucketSize().getSeconds()));
result.put(WorkerRequestMetadataKeys.NUM_ELEMENTS, Long.toString(timeBuckets.getTimeBuckets().length));
result.put(WorkerRequestMetadataKeys.DEADLINE_MS, Long.toString(deadlineMs));
Map<String, List<String>> planIdToSegments = dispatchablePlan.getPlanIdToSegmentsByServer().get(instanceId);
for (Map.Entry<String, List<String>> entry : planIdToSegments.entrySet()) {
Map<String, List<String>> leafIdToSegments = dispatchablePlan.getLeafIdToSegmentsByInstanceId().get(instanceId);
for (Map.Entry<String, List<String>> entry : leafIdToSegments.entrySet()) {
result.put(WorkerRequestMetadataKeys.encodeSegmentListKey(entry.getKey()), String.join(",", entry.getValue()));
}
result.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Long.toString(requestContext.getRequestId()));
Expand Down Expand Up @@ -500,7 +500,7 @@ TimeSeriesBlock submitAndGet(long requestId, TimeSeriesDispatchablePlan plan, lo
Preconditions.checkState(!deadline.isExpired(), "Deadline expired before query could be sent to servers");
// Send server fragment to every server
Worker.TimeSeriesQueryRequest request = Worker.TimeSeriesQueryRequest.newBuilder()
.addAllDispatchPlan(plan.getSerializedPlanFragments(serverId))
.addAllDispatchPlan(plan.getSerializedServerFragments())
.putAllMetadata(initializeTimeSeriesMetadataMap(plan, deadlineMs, requestContext, serverId))
.putMetadata(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Long.toString(requestId))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand All @@ -41,7 +38,6 @@
import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanResult;
import org.apache.pinot.tsdb.spi.TimeSeriesLogicalPlanner;
import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
import org.apache.pinot.tsdb.spi.plan.serde.TimeSeriesPlanSerde;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -93,36 +89,30 @@ public TimeSeriesDispatchablePlan buildPhysicalPlan(RangeTimeSeriesRequest timeS
TableScanVisitor.Context scanVisitorContext = TableScanVisitor.createContext(requestContext.getRequestId());
TableScanVisitor.INSTANCE.assignSegmentsToPlan(logicalPlan.getPlanNode(), logicalPlan.getTimeBuckets(),
scanVisitorContext);
List<TimeSeriesQueryServerInstance> serverInstances = scanVisitorContext.getPlanIdToSegmentsByServer().keySet()
.stream().map(TimeSeriesQueryServerInstance::new).collect(Collectors.toList());
// Step-2: Create plan fragments and serialize them.
List<TimeSeriesQueryServerInstance> serverInstances = scanVisitorContext.getQueryServers();
// Step-2: Create plan fragments.
List<BaseTimeSeriesPlanNode> fragments = TimeSeriesPlanFragmenter.getFragments(
logicalPlan.getPlanNode(), serverInstances.size() == 1);
List<Pair<String, String>> serializedPlanFragments = new ArrayList<>();
for (int index = 1; index < fragments.size(); index++) {
BaseTimeSeriesPlanNode serverFragment = fragments.get(index);
serializedPlanFragments.add(Pair.of(serverFragment.getId(), TimeSeriesPlanSerde.serialize(serverFragment)));
}
// Step-3: Compute number of servers each exchange node will receive data from.
Map<String, Integer> numServersForExchangePlanNode = computeNumServersForExchangePlanNode(serverInstances,
fragments, scanVisitorContext.getPlanIdToSegmentsByInstanceId());
fragments, scanVisitorContext.getLeafIdToSegmentsByInstanceId());
return new TimeSeriesDispatchablePlan(timeSeriesRequest.getLanguage(), serverInstances, fragments.get(0),
serializedPlanFragments, logicalPlan.getTimeBuckets(), scanVisitorContext.getPlanIdToSegmentsByInstanceId(),
numServersForExchangePlanNode);
fragments.subList(1, fragments.size()), logicalPlan.getTimeBuckets(),
scanVisitorContext.getLeafIdToSegmentsByInstanceId(), numServersForExchangePlanNode);
}

private Map<String, Integer> computeNumServersForExchangePlanNode(List<TimeSeriesQueryServerInstance> serverInstances,
List<BaseTimeSeriesPlanNode> planNodes, Map<String, Map<String, List<String>>> planIdToSegmentsByInstanceId) {
List<BaseTimeSeriesPlanNode> planNodes, Map<String, Map<String, List<String>>> leafIdToSegmentsByInstanceId) {
// TODO(timeseries): Handle this gracefully and return an empty block.
Preconditions.checkState(!serverInstances.isEmpty(), "No servers selected for the query");
if (serverInstances.size() == 1) {
// For single-server case, the broker fragment consists only of the TimeSeriesExchangeNode.
return ImmutableMap.of(planNodes.get(0).getId(), 1);
}
// For the multi-server case, the planIdToSegmentsByInstanceId map already has the information we need, but we
// For the multi-server case, the leafIdToSegmentsByInstanceId map already has the information we need, but we
// just need to restructure it so that we can get number of servers by planId.
Map<String, Set<String>> planIdToServers = new HashMap<>();
for (var entry : planIdToSegmentsByInstanceId.entrySet()) {
for (var entry : leafIdToSegmentsByInstanceId.entrySet()) {
String instanceId = entry.getKey();
for (var innerEntry : entry.getValue().entrySet()) {
String planId = innerEntry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.DataSource;
import org.apache.pinot.common.request.Expression;
Expand Down Expand Up @@ -58,7 +59,7 @@ public void assignSegmentsToPlan(BaseTimeSeriesPlanNode planNode, TimeBuckets ti
for (var entry : routingTable.getServerInstanceToSegmentsMap().entrySet()) {
ServerInstance serverInstance = entry.getKey();
List<String> segments = entry.getValue().getLeft();
context.getPlanIdToSegmentsByServer().computeIfAbsent(serverInstance, (x) -> new HashMap<>())
context.getLeafIdToSegmentsByServer().computeIfAbsent(serverInstance, (x) -> new HashMap<>())
.put(sfpNode.getId(), segments);
}
}
Expand All @@ -72,24 +73,29 @@ public static Context createContext(Long requestId) {
}

public static class Context {
private final Map<ServerInstance, Map<String, List<String>>> _planIdToSegmentsByServer = new HashMap<>();
private final Map<ServerInstance, Map<String, List<String>>> _leafIdToSegmentsByServer = new HashMap<>();
private final Long _requestId;

public Context(Long requestId) {
_requestId = requestId;
}

public Map<ServerInstance, Map<String, List<String>>> getPlanIdToSegmentsByServer() {
return _planIdToSegmentsByServer;
public List<TimeSeriesQueryServerInstance> getQueryServers() {
return _leafIdToSegmentsByServer.keySet().stream().map(TimeSeriesQueryServerInstance::new).collect(
Collectors.toList());
}

public Map<String, Map<String, List<String>>> getPlanIdToSegmentsByInstanceId() {
public Map<String, Map<String, List<String>>> getLeafIdToSegmentsByInstanceId() {
Map<String, Map<String, List<String>>> result = new HashMap<>();
for (var entry : _planIdToSegmentsByServer.entrySet()) {
for (var entry : _leafIdToSegmentsByServer.entrySet()) {
result.put(entry.getKey().getInstanceId(), entry.getValue());
}
return result;
}

Map<ServerInstance, Map<String, List<String>>> getLeafIdToSegmentsByServer() {
return _leafIdToSegmentsByServer;
}
}

private BrokerRequest compileBrokerRequest(String tableName, Expression filterExpression) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,37 @@
*/
package org.apache.pinot.tsdb.planner.physical;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import java.util.stream.Collectors;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
import org.apache.pinot.tsdb.spi.plan.serde.TimeSeriesPlanSerde;


/**
 * Broker-side representation of a fully-planned time-series query: the broker fragment, the
 * server fragments (plus their cached serialized form), the servers to dispatch to, and the
 * per-instance leaf-to-segments assignment.
 */
public class TimeSeriesDispatchablePlan {
  private final List<TimeSeriesQueryServerInstance> _queryServerInstances;
  private final String _language;
  private final BaseTimeSeriesPlanNode _brokerFragment;
  private final List<BaseTimeSeriesPlanNode> _serverFragments;
  private final TimeBuckets _timeBuckets;
  // instanceId -> (leafNodeId -> segments)
  private final Map<String, Map<String, List<String>>> _leafIdToSegmentsByInstanceId;
  private final Map<String, Integer> _numInputServersForExchangePlanNode;
  // Serialized form of _serverFragments, computed once in the constructor.
  private final List<String> _serializedServerFragments;

/**
 * Creates the dispatchable plan for a time-series query.
 * <p>
 * Server fragments are serialized eagerly here, once for the whole plan, rather than per server
 * at dispatch time — every server receives the full list of serialized server fragments.
 *
 * @param language                           query language of the request
 * @param queryServerInstances               servers that will execute the server fragments
 * @param brokerFragment                     plan fragment executed on the broker
 * @param serverFragments                    plan fragments dispatched to the servers
 * @param initialTimeBuckets                 time buckets computed for the query window
 * @param leafIdToSegmentsByInstanceId       instanceId -> (leafNodeId -> segments) assignment
 * @param numInputServersForExchangePlanNode number of servers each exchange node receives data from
 */
public TimeSeriesDispatchablePlan(String language, List<TimeSeriesQueryServerInstance> queryServerInstances,
    BaseTimeSeriesPlanNode brokerFragment, List<BaseTimeSeriesPlanNode> serverFragments,
    TimeBuckets initialTimeBuckets, Map<String, Map<String, List<String>>> leafIdToSegmentsByInstanceId,
    Map<String, Integer> numInputServersForExchangePlanNode) {
  _language = language;
  _queryServerInstances = queryServerInstances;
  _brokerFragment = brokerFragment;
  _serverFragments = serverFragments;
  _timeBuckets = initialTimeBuckets;
  _leafIdToSegmentsByInstanceId = leafIdToSegmentsByInstanceId;
  _numInputServersForExchangePlanNode = numInputServersForExchangePlanNode;
  // Cache the serialization so getSerializedServerFragments() is a cheap accessor.
  _serializedServerFragments = serverFragments.stream().map(TimeSeriesPlanSerde::serialize).collect(
      Collectors.toList());
}

public String getLanguage() {
Expand All @@ -61,23 +63,20 @@ public BaseTimeSeriesPlanNode getBrokerFragment() {
return _brokerFragment;
}

public List<String> getSerializedPlanFragments(String instanceId) {
Set<String> planIdForInstance = _planIdToSegmentsByServer.get(instanceId).keySet();
List<String> result = new ArrayList<>();
for (var planIdAndPlanFragment : _serializedPlanFragments) {
if (planIdForInstance.contains(planIdAndPlanFragment.getLeft())) {
result.add(planIdAndPlanFragment.getRight());
}
}
return result;
public List<BaseTimeSeriesPlanNode> getServerFragments() {
return _serverFragments;
}

public List<String> getSerializedServerFragments() {
return _serializedServerFragments;
}

public TimeBuckets getTimeBuckets() {
return _timeBuckets;
}

public Map<String, Map<String, List<String>>> getPlanIdToSegmentsByServer() {
return _planIdToSegmentsByServer;
public Map<String, Map<String, List<String>>> getLeafIdToSegmentsByInstanceId() {
return _leafIdToSegmentsByInstanceId;
}

public Map<String, Integer> getNumInputServersForExchangePlanNode() {
Expand Down

0 comments on commit 245d1ff

Please sign in to comment.