From 1b5858b5b4b5dffe45d5af73f3604d85bfa36132 Mon Sep 17 00:00:00 2001
From: Ankit Sultana
Date: Tue, 10 Dec 2024 11:39:27 -0600
Subject: [PATCH] [timeseries] Part-3: Add Time Series Exchange Operator, Plan
Node and Serde (#14611)
* Part-3: Add Time Series Exchange Operator, Plan Node and Serde
* add tests + fragmenter
* add license
* address feedback
* address more feedback
---
.../TimeSeriesExchangeReceiveOperator.java | 182 ++++++++++++++++
.../TimeSeriesExchangeReceivePlanNode.java | 74 +++++++
.../serde/TimeSeriesBlockSerde.java | 201 ++++++++++++++++++
...TimeSeriesExchangeReceiveOperatorTest.java | 156 ++++++++++++++
.../serde/TimeSeriesBlockSerdeTest.java | 135 ++++++++++++
.../tsdb/planner/TimeSeriesExchangeNode.java | 75 +++++++
.../planner/TimeSeriesPlanFragmenter.java | 118 ++++++++++
.../planner/TimeSeriesPlanFragmenterTest.java | 167 +++++++++++++++
8 files changed, 1108 insertions(+)
create mode 100644 pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
create mode 100644 pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceivePlanNode.java
create mode 100644 pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java
create mode 100644 pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
create mode 100644 pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
create mode 100644 pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesExchangeNode.java
create mode 100644 pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
create mode 100644 pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
new file mode 100644
index 000000000000..79d49e0b462a
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperator.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
+
+
+/**
+ *
Overview
+ * Receives and optionally aggregates the response from all servers for the corresponding plan node.
+ *
+ *
Aggregate Receive
+ * When a non-null {@link AggInfo} is passed, this operator will aggregate the received data using the corresponding
+ * series builder created via {@link TimeSeriesBuilderFactory}.
+ *
+ *
Non-Aggregate Receive
+ * When a null AggInfo is passed, then we don't perform any aggregation. If we receive series with the same ID from
+ * different servers, we will simply append them to the list, creating a union.
+ */
+public class TimeSeriesExchangeReceiveOperator extends BaseTimeSeriesOperator {
+ /**
+ * Receiver will receive either TimeSeriesBlock or Throwable. And will have at most _numServersQueried objects that
+ * can be polled.
+ */
+ private final BlockingQueue
+ *
+ * TODO(timeseries): One source of inefficiency is boxing/unboxing of Double arrays.
+ * TODO(timeseries): The other is tag values being Object[]. We should make tag values String[].
+ *
+ */
+public class TimeSeriesBlockSerde {
+ /**
+ * Since DataBlock can only handle primitive double[] arrays, we use Double.MIN_VALUE to represent nulls.
+ * Using Double.MIN_VALUE is better than using Double.NaN since Double.NaN can help detect divide by 0.
+ * TODO(timeseries): Check if we can get rid of boxed Doubles altogether.
+ */
+ private static final double NULL_PLACEHOLDER = Double.MIN_VALUE;
+
+ private TimeSeriesBlockSerde() {
+ }
+
+ public static TimeSeriesBlock deserializeTimeSeriesBlock(ByteBuffer readOnlyByteBuffer)
+ throws IOException {
+ DataBlock dataBlock = DataBlockUtils.readFrom(readOnlyByteBuffer);
+ TransferableBlock transferableBlock = TransferableBlockUtils.wrap(dataBlock);
+ List tagNames = generateTagNames(Objects.requireNonNull(transferableBlock.getDataSchema(),
+ "Missing data schema in TransferableBlock"));
+ List container = transferableBlock.getContainer();
+ TimeBuckets timeBuckets = timeBucketsFromRow(container.get(0));
+ Map> seriesMap = new HashMap<>();
+ for (int index = 1; index < container.size(); index++) {
+ Object[] row = container.get(index);
+ TimeSeries timeSeries = timeSeriesFromRow(tagNames, row, timeBuckets);
+ long seriesId = Long.parseLong(timeSeries.getId());
+ seriesMap.computeIfAbsent(seriesId, x -> new ArrayList<>()).add(timeSeries);
+ }
+ return new TimeSeriesBlock(timeBuckets, seriesMap);
+ }
+
+ public static ByteString serializeTimeSeriesBlock(TimeSeriesBlock timeSeriesBlock)
+ throws IOException {
+ TimeBuckets timeBuckets = Objects.requireNonNull(timeSeriesBlock.getTimeBuckets());
+ List container = new ArrayList<>();
+ DataSchema dataSchema = generateDataSchema(timeSeriesBlock);
+ container.add(timeBucketsToRow(timeBuckets, dataSchema));
+ for (var entry : timeSeriesBlock.getSeriesMap().entrySet()) {
+ for (TimeSeries timeSeries : entry.getValue()) {
+ container.add(timeSeriesToRow(timeSeries, dataSchema));
+ }
+ }
+ TransferableBlock transferableBlock = new TransferableBlock(container, dataSchema, DataBlock.Type.ROW);
+ return DataBlockUtils.toByteString(transferableBlock.getDataBlock());
+ }
+
+ private static DataSchema generateDataSchema(TimeSeriesBlock timeSeriesBlock) {
+ TimeSeries sampledTimeSeries = sampleTimeSeries(timeSeriesBlock).orElse(null);
+ int numTags = sampledTimeSeries == null ? 0 : sampledTimeSeries.getTagNames().size();
+ ColumnDataType[] dataTypes = new ColumnDataType[numTags + 1];
+ String[] columnNames = new String[numTags + 1];
+ for (int tagIndex = 0; tagIndex < numTags; tagIndex++) {
+ columnNames[tagIndex] = sampledTimeSeries.getTagNames().get(tagIndex);
+ dataTypes[tagIndex] = ColumnDataType.STRING;
+ }
+ columnNames[numTags] = "__ts_values";
+ dataTypes[numTags] = ColumnDataType.DOUBLE_ARRAY;
+ return new DataSchema(columnNames, dataTypes);
+ }
+
+ private static List generateTagNames(DataSchema dataSchema) {
+ String[] columnNames = dataSchema.getColumnNames();
+ List tagNames = new ArrayList<>(columnNames.length - 1);
+ for (int index = 0; index < columnNames.length - 1; index++) {
+ tagNames.add(columnNames[index]);
+ }
+ return tagNames;
+ }
+
+ private static Optional sampleTimeSeries(TimeSeriesBlock timeSeriesBlock) {
+ if (timeSeriesBlock.getSeriesMap().isEmpty()) {
+ return Optional.empty();
+ }
+ List timeSeriesList = timeSeriesBlock.getSeriesMap().values().iterator().next();
+ Preconditions.checkState(!timeSeriesList.isEmpty(), "Found empty time-series list");
+ return Optional.of(timeSeriesList.get(0));
+ }
+
+ private static Object[] timeBucketsToRow(TimeBuckets timeBuckets, DataSchema dataSchema) {
+ int numColumns = dataSchema.getColumnNames().length;
+ Object[] result = new Object[numColumns];
+ for (int index = 0; index < numColumns - 1; index++) {
+ result[index] = "null";
+ }
+ double firstBucketValue = timeBuckets.getTimeBuckets()[0];
+ double bucketSizeSeconds = timeBuckets.getBucketSize().getSeconds();
+ double numBuckets = timeBuckets.getNumBuckets();
+ result[numColumns - 1] = new double[]{firstBucketValue, bucketSizeSeconds, numBuckets};
+ return result;
+ }
+
+ private static TimeBuckets timeBucketsFromRow(Object[] row) {
+ double[] values = (double[]) row[row.length - 1];
+ long fbv = (long) values[0];
+ Duration window = Duration.ofSeconds((long) values[1]);
+ int numBuckets = (int) values[2];
+ return TimeBuckets.ofSeconds(fbv, window, numBuckets);
+ }
+
+ private static Object[] timeSeriesToRow(TimeSeries timeSeries, DataSchema dataSchema) {
+ int numColumns = dataSchema.getColumnNames().length;
+ Object[] result = new Object[numColumns];
+ for (int index = 0; index < numColumns - 1; index++) {
+ Object tagValue = timeSeries.getTagValues()[index];
+ result[index] = tagValue == null ? "null" : tagValue.toString();
+ }
+ result[numColumns - 1] = unboxDoubleArray(timeSeries.getValues());
+ return result;
+ }
+
+ private static TimeSeries timeSeriesFromRow(List tagNames, Object[] row, TimeBuckets timeBuckets) {
+ Double[] values = boxDoubleArray((double[]) row[row.length - 1]);
+ Object[] tagValues = new Object[row.length - 1];
+ System.arraycopy(row, 0, tagValues, 0, row.length - 1);
+ return new TimeSeries(Long.toString(TimeSeries.hash(tagValues)), null, timeBuckets, values, tagNames, tagValues);
+ }
+
+ private static double[] unboxDoubleArray(Double[] values) {
+ double[] result = new double[values.length];
+ for (int index = 0; index < result.length; index++) {
+ result[index] = values[index] == null ? NULL_PLACEHOLDER : values[index];
+ }
+ return result;
+ }
+
+ private static Double[] boxDoubleArray(double[] values) {
+ Double[] result = new Double[values.length];
+ for (int index = 0; index < result.length; index++) {
+ result[index] = values[index] == NULL_PLACEHOLDER ? null : values[index];
+ }
+ return result;
+ }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
new file mode 100644
index 000000000000..c9fd9293335e
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries;
+
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesExchangeReceiveOperatorTest {
+ private static final int NUM_SERVERS_QUERIED = 3;
+ private static final AggInfo SUM_AGG_INFO = new AggInfo("SUM", null);
+ private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(200), 4);
+ private static final List TAG_NAMES = ImmutableList.of("city", "zip");
+ private static final Object[] CHICAGO_SERIES_VALUES = new Object[]{"Chicago", "60605"};
+ private static final Object[] SF_SERIES_VALUES = new Object[]{"San Francisco", "94107"};
+ private static final Long CHICAGO_SERIES_HASH = TimeSeries.hash(CHICAGO_SERIES_VALUES);
+ private static final Long SF_SERIES_HASH = TimeSeries.hash(SF_SERIES_VALUES);
+ private static final SimpleTimeSeriesBuilderFactory SERIES_BUILDER_FACTORY = new SimpleTimeSeriesBuilderFactory();
+
+ @Test
+ public void testGetNextBlockWithAggregation() {
+ // Setup test
+ long deadlineMs = Long.MAX_VALUE;
+ ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(NUM_SERVERS_QUERIED);
+ blockingQueue.addAll(generateTimeSeriesBlocks());
+ TimeSeriesExchangeReceiveOperator operator = new TimeSeriesExchangeReceiveOperator(blockingQueue, deadlineMs,
+ NUM_SERVERS_QUERIED, SUM_AGG_INFO, SERIES_BUILDER_FACTORY);
+ // Run test
+ TimeSeriesBlock block = operator.nextBlock();
+ // Validate results
+ assertEquals(block.getSeriesMap().size(), 2);
+ assertTrue(block.getSeriesMap().containsKey(CHICAGO_SERIES_HASH), "Chicago series not present in received block");
+ assertTrue(block.getSeriesMap().containsKey(SF_SERIES_HASH), "SF series not present in received block");
+ assertEquals(block.getSeriesMap().get(CHICAGO_SERIES_HASH).size(), 1, "Expected 1 series for Chicago");
+ assertEquals(block.getSeriesMap().get(SF_SERIES_HASH).size(), 1, "Expected 1 series for SF");
+ // Ensure Chicago had series addition performed
+ Double[] chicagoSeriesValues = block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getValues();
+ assertEquals(chicagoSeriesValues, new Double[]{20.0, 20.0, 20.0, 20.0});
+ // Ensure SF had input series unmodified
+ Double[] sanFranciscoSeriesValues = block.getSeriesMap().get(SF_SERIES_HASH).get(0).getValues();
+ assertEquals(sanFranciscoSeriesValues, new Double[]{10.0, 10.0, 10.0, 10.0});
+ }
+
+ @Test
+ public void testGetNextBlockNoAggregation() {
+ // Setup test
+ long deadlineMs = Long.MAX_VALUE;
+ ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(NUM_SERVERS_QUERIED);
+ blockingQueue.addAll(generateTimeSeriesBlocks());
+ TimeSeriesExchangeReceiveOperator operator = new TimeSeriesExchangeReceiveOperator(blockingQueue, deadlineMs,
+ NUM_SERVERS_QUERIED, null, SERIES_BUILDER_FACTORY);
+ // Run test
+ TimeSeriesBlock block = operator.nextBlock();
+ // Validate results
+ assertEquals(block.getSeriesMap().size(), 2);
+ assertTrue(block.getSeriesMap().containsKey(CHICAGO_SERIES_HASH), "Chicago series not present in received block");
+ assertTrue(block.getSeriesMap().containsKey(SF_SERIES_HASH), "SF series not present in received block");
+ assertEquals(block.getSeriesMap().get(CHICAGO_SERIES_HASH).size(), 2, "Expected 2 series for Chicago");
+ assertEquals(block.getSeriesMap().get(SF_SERIES_HASH).size(), 1, "Expected 1 series for SF");
+ // Ensure Chicago has unmodified series values
+ Double[] firstChicagoSeriesValues = block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getValues();
+ Double[] secondChicagoSeriesValues = block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(1).getValues();
+ assertEquals(firstChicagoSeriesValues, new Double[]{10.0, 10.0, 10.0, 10.0});
+ assertEquals(secondChicagoSeriesValues, new Double[]{10.0, 10.0, 10.0, 10.0});
+ // Ensure SF has input unmodified series values
+ Double[] sanFranciscoSeriesValues = block.getSeriesMap().get(SF_SERIES_HASH).get(0).getValues();
+ assertEquals(sanFranciscoSeriesValues, new Double[]{10.0, 10.0, 10.0, 10.0});
+ }
+
+ @Test
+ public void testGetNextBlockFailure() {
+ // Setup test
+ long deadlineMs = Long.MAX_VALUE;
+ ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(NUM_SERVERS_QUERIED);
+ blockingQueue.add(new TimeoutException("Test error"));
+ // Run test with aggregation
+ try {
+ TimeSeriesExchangeReceiveOperator operator = new TimeSeriesExchangeReceiveOperator(blockingQueue, deadlineMs,
+ NUM_SERVERS_QUERIED, SUM_AGG_INFO, SERIES_BUILDER_FACTORY);
+ TimeSeriesBlock block = operator.nextBlock();
+ fail();
+ } catch (Throwable t) {
+ assertEquals(t.getMessage(), "Test error");
+ }
+ blockingQueue.add(new TimeoutException("Test error"));
+ try {
+ TimeSeriesExchangeReceiveOperator operator = new TimeSeriesExchangeReceiveOperator(blockingQueue, deadlineMs,
+ NUM_SERVERS_QUERIED, null, SERIES_BUILDER_FACTORY);
+ TimeSeriesBlock block = operator.nextBlock();
+ fail();
+ } catch (Throwable t) {
+ assertEquals(t.getMessage(), "Test error");
+ }
+ }
+
+ private List generateTimeSeriesBlocks() {
+ List seriesBlocks = new ArrayList<>();
+ {
+ Map> seriesMap = new HashMap<>();
+ seriesMap.put(CHICAGO_SERIES_HASH, ImmutableList.of(createChicagoSeries(new Double[]{10.0, 10.0, 10.0, 10.0})));
+ seriesMap.put(SF_SERIES_HASH, ImmutableList.of(createSanFranciscoSeries(new Double[]{10.0, 10.0, 10.0, 10.0})));
+ seriesBlocks.add(new TimeSeriesBlock(TIME_BUCKETS, seriesMap));
+ }
+ {
+ Map> seriesMap = new HashMap<>();
+ seriesMap.put(CHICAGO_SERIES_HASH, ImmutableList.of(createChicagoSeries(new Double[]{10.0, 10.0, 10.0, 10.0})));
+ seriesBlocks.add(new TimeSeriesBlock(TIME_BUCKETS, seriesMap));
+ }
+ {
+ Map> seriesMap = new HashMap<>();
+ seriesBlocks.add(new TimeSeriesBlock(TIME_BUCKETS, seriesMap));
+ }
+ // Shuffle the output to test multiple scenarios over time
+ Collections.shuffle(seriesBlocks);
+ return seriesBlocks;
+ }
+
+ private TimeSeries createChicagoSeries(Double[] values) {
+ return new TimeSeries(CHICAGO_SERIES_HASH.toString(), null, TIME_BUCKETS, values, TAG_NAMES, CHICAGO_SERIES_VALUES);
+ }
+
+ private TimeSeries createSanFranciscoSeries(Double[] values) {
+ return new TimeSeries(SF_SERIES_HASH.toString(), null, TIME_BUCKETS, values, TAG_NAMES, SF_SERIES_VALUES);
+ }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
new file mode 100644
index 000000000000..f08d39ca0a91
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.timeseries.serde;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.TimeSeries;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesBlockSerdeTest {
+ private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(200), 5);
+
+ @Test
+ public void testSerde()
+ throws IOException {
+ // To test serde of TimeSeriesBlock, we do the following:
+ // 1. Serialize the time-series block (say Block-1) to get ByteString-1
+ // 2. Deserialize ByteString-1 to get Block-2.
+ // 3. Serialize Block-2 to get ByteString-2.
+ // 4. Compare ByteString-1 and ByteString-2.
+ // 5. Compare values of Block-1 and Block-2.
+ List blocks = List.of(buildBlockWithNoTags(), buildBlockWithSingleTag(),
+ buildBlockWithMultipleTags());
+ for (TimeSeriesBlock block1 : blocks) {
+ // Serialize, deserialize and serialize again
+ ByteString byteString1 = TimeSeriesBlockSerde.serializeTimeSeriesBlock(block1);
+ String serializedBlockString1 = byteString1.toStringUtf8();
+ TimeSeriesBlock block2 = TimeSeriesBlockSerde.deserializeTimeSeriesBlock(byteString1.asReadOnlyByteBuffer());
+ String serializedBlockString2 = TimeSeriesBlockSerde.serializeTimeSeriesBlock(block2).toStringUtf8();
+ // Serialized blocks in both cases should be the same since serialization is deterministic.
+ assertEquals(serializedBlockString1, serializedBlockString2);
+ // Compare block1 and block2
+ compareBlocks(block1, block2);
+ }
+ }
+
+ /**
+ * Compares time series blocks in a way which makes it easy to debug test failures when/if they happen in CI.
+ */
+ private static void compareBlocks(TimeSeriesBlock block1, TimeSeriesBlock block2) {
+ assertEquals(block1.getTimeBuckets(), block2.getTimeBuckets(), "Time buckets are different across blocks");
+ assertEquals(block1.getSeriesMap().size(), block2.getSeriesMap().size(), String.format(
+ "Different number of series in blocks: %s and %s", block1.getSeriesMap().size(), block2.getSeriesMap().size()));
+ assertEquals(block1.getSeriesMap().keySet(), block2.getSeriesMap().keySet(),
+ String.format("Series blocks have different keys: %s vs %s",
+ block1.getSeriesMap().keySet(), block2.getSeriesMap().keySet()));
+ for (long seriesHash : block1.getSeriesMap().keySet()) {
+ List seriesList1 = block1.getSeriesMap().get(seriesHash);
+ List seriesList2 = block2.getSeriesMap().get(seriesHash);
+ compareTimeSeries(seriesList1, seriesList2);
+ }
+ }
+
+ private static void compareTimeSeries(List series1, List series2) {
+ assertEquals(series1.size(), series2.size(),
+ String.format("Different count of series with the same id: %s vs %s", series1.size(), series2.size()));
+ for (int index = 0; index < series1.size(); index++) {
+ TimeSeries seriesOne = series1.get(index);
+ TimeSeries seriesTwo = series2.get(index);
+ assertEquals(seriesOne.getTagNames(), seriesTwo.getTagNames());
+ assertEquals(seriesOne.getValues(), seriesTwo .getValues());
+ }
+ }
+
+ private static TimeSeriesBlock buildBlockWithNoTags() {
+ TimeBuckets timeBuckets = TIME_BUCKETS;
+ // Single series: []
+ List tagNames = Collections.emptyList();
+ Object[] seriesValues = new Object[0];
+ long seriesHash = TimeSeries.hash(seriesValues);
+ Map> seriesMap = new HashMap<>();
+ seriesMap.put(seriesHash, ImmutableList.of(new TimeSeries(Long.toString(seriesHash), null, timeBuckets,
+ new Double[]{null, 123.0, 0.0, 1.0}, tagNames, seriesValues)));
+ return new TimeSeriesBlock(timeBuckets, seriesMap);
+ }
+
+ private static TimeSeriesBlock buildBlockWithSingleTag() {
+ TimeBuckets timeBuckets = TIME_BUCKETS;
+ // Series are: [cityId=Chicago] and [cityId=San Francisco]
+ List tagNames = ImmutableList.of("cityId");
+ Object[] seriesOneValues = new Object[]{"Chicago"};
+ Object[] seriesTwoValues = new Object[]{"San Francisco"};
+ long seriesOneHash = TimeSeries.hash(seriesOneValues);
+ long seriesTwoHash = TimeSeries.hash(seriesTwoValues);
+ Map> seriesMap = new HashMap<>();
+ seriesMap.put(seriesOneHash, ImmutableList.of(new TimeSeries(Long.toString(seriesOneHash), null, timeBuckets,
+ new Double[]{null, 123.0, 0.0, 1.0}, tagNames, seriesOneValues)));
+ seriesMap.put(seriesTwoHash, ImmutableList.of(new TimeSeries(Long.toString(seriesTwoHash), null, timeBuckets,
+ new Double[]{null, null, null, null}, tagNames, seriesTwoValues)));
+ return new TimeSeriesBlock(timeBuckets, seriesMap);
+ }
+
+ private static TimeSeriesBlock buildBlockWithMultipleTags() {
+ TimeBuckets timeBuckets = TIME_BUCKETS;
+ // Series are: [cityId=Chicago, zip=60605] and [cityId=San Francisco, zip=94107]
+ List tagNames = ImmutableList.of("cityId", "zip");
+ Object[] seriesOneValues = new Object[]{"Chicago", "60605"};
+ Object[] seriesTwoValues = new Object[]{"San Francisco", "94107"};
+ long seriesOneHash = TimeSeries.hash(seriesOneValues);
+ long seriesTwoHash = TimeSeries.hash(seriesTwoValues);
+ Map> seriesMap = new HashMap<>();
+ seriesMap.put(seriesOneHash, ImmutableList.of(new TimeSeries(Long.toString(seriesOneHash), null, timeBuckets,
+ new Double[]{null, 123.0, Double.NaN, 1.0}, tagNames, seriesOneValues)));
+ seriesMap.put(seriesTwoHash, ImmutableList.of(new TimeSeries(Long.toString(seriesTwoHash), null, timeBuckets,
+ new Double[]{Double.NaN, -1.0, -1231231.0, 3.14}, tagNames, seriesTwoValues)));
+ return new TimeSeriesBlock(timeBuckets, seriesMap);
+ }
+}
diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesExchangeNode.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesExchangeNode.java
new file mode 100644
index 000000000000..399a2bd28d72
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesExchangeNode.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.tsdb.spi.AggInfo;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
+
+
+/**
+ * This node exists in the logical plan, but not in the physical/dispatchable plans. Similar to the
+ * {@link LeafTimeSeriesPlanNode}, a physical plan visitor will convert to its equivalent physical plan node, which will
+ * be capable of returning an executable operator with the {@link #run()} method.
+ *
+ * Note: This node doesn't exist in the pinot-timeseries-spi because we don't want to let language developers
+ * control how and when exchange will be run (as of now).
+ */
+public class TimeSeriesExchangeNode extends BaseTimeSeriesPlanNode {
+ @Nullable
+ private final AggInfo _aggInfo;
+
+ @JsonCreator
+ public TimeSeriesExchangeNode(@JsonProperty("id") String id,
+ @JsonProperty("inputs") List inputs,
+ @Nullable @JsonProperty("aggInfo") AggInfo aggInfo) {
+ super(id, inputs);
+ _aggInfo = aggInfo;
+ }
+
+ @Nullable
+ public AggInfo getAggInfo() {
+ return _aggInfo;
+ }
+
+ @Override
+ public BaseTimeSeriesPlanNode withInputs(List newInputs) {
+ return new TimeSeriesExchangeNode(_id, newInputs, _aggInfo);
+ }
+
+ @Override
+ public String getKlass() {
+ return TimeSeriesExchangeNode.class.getName();
+ }
+
+ @Override
+ public String getExplainName() {
+ return "TIME_SERIES_BROKER_RECEIVE";
+ }
+
+ @Override
+ public BaseTimeSeriesOperator run() {
+ throw new IllegalStateException("Time Series Exchange should have been replaced with a physical plan node");
+ }
+}
diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
new file mode 100644
index 000000000000..46a3f68c31dd
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
+
+
+/**
+ * Fragments the plan into executable units. Since we only support Broker-Reduce for Time Series Queries at present,
+ * we will have 1 fragment for the broker, and 1 fragment for each {@link LeafTimeSeriesPlanNode}.
+ *
+ * As an example, say we have the following plan:
+ *
+ * This is fragment-2:
+ * +------------+
+ * | Leaf-1 |
+ * +------------+
+ *
+ *
+ * This is fragment-3:
+ * +------------+
+ * | Leaf-2 |
+ * +------------+
+ *
+ *
+ */
+public class TimeSeriesPlanFragmenter {
+ private TimeSeriesPlanFragmenter() {
+ }
+
+ /**
+ * Fragments the plan as described in {@link TimeSeriesPlanFragmenter}. The first element of the list is the broker
+ * fragment, and the other elements are the server fragments. For single-node queries, this pushes down the entire
+ * plan to the servers.
+ *
+ * Note: This method may return cloned plan nodes, so you should use them as the plan subsequently.
+ *
+ */
+ public static List getFragments(BaseTimeSeriesPlanNode rootNode,
+ boolean isSingleServerQuery) {
+ List result = new ArrayList<>();
+ Context context = new Context();
+ if (isSingleServerQuery) {
+ final String id = rootNode.getId();
+ return ImmutableList.of(new TimeSeriesExchangeNode(id, Collections.emptyList(), null), rootNode);
+ }
+ result.add(fragmentRecursively(rootNode, context));
+ result.addAll(context._fragments);
+ return result;
+ }
+
+ private static BaseTimeSeriesPlanNode fragmentRecursively(BaseTimeSeriesPlanNode planNode, Context context) {
+ if (planNode instanceof LeafTimeSeriesPlanNode) {
+ LeafTimeSeriesPlanNode leafNode = (LeafTimeSeriesPlanNode) planNode;
+ context._fragments.add(leafNode.withInputs(Collections.emptyList()));
+ return new TimeSeriesExchangeNode(planNode.getId(), Collections.emptyList(), leafNode.getAggInfo());
+ }
+ List newInputs = new ArrayList<>();
+ for (BaseTimeSeriesPlanNode input : planNode.getInputs()) {
+ newInputs.add(fragmentRecursively(input, context));
+ }
+ return planNode.withInputs(newInputs);
+ }
+
+ private static class Context {
+ private final List _fragments = new ArrayList<>();
+ }
+}
diff --git a/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java b/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
new file mode 100644
index 000000000000..8727f64ddcc7
--- /dev/null
+++ b/pinot-timeseries/pinot-timeseries-planner/src/test/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenterTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tsdb.planner;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
+import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
+import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeSeriesPlanFragmenterTest {
+ @Test
+ public void testGetFragmentsWithMultipleLeafNodes() {
+ /*
+ * Create Input:
+ * Node-1
+ * / \
+ * Node-2 Leaf-2
+ * /
+ * Leaf-1
+ * Expected Outputs:
+ * Fragment-1:
+ * Node-1
+ * / \
+ * Node-2 Exchange (named Leaf-2)
+ * /
+ * Exchange (named Leaf-1)
+ * Fragment-2:
+ * Leaf-1
+ * Fragment-3:
+ * Leaf-2
+ */
+ LeafTimeSeriesPlanNode leafOne = createMockLeafNode("Leaf-1");
+ LeafTimeSeriesPlanNode leafTwo = createMockLeafNode("Leaf-2");
+ BaseTimeSeriesPlanNode nodeTwo = new MockTimeSeriesPlanNode("Node-2", Collections.singletonList(leafOne));
+ BaseTimeSeriesPlanNode nodeOne = new MockTimeSeriesPlanNode("Node-1", ImmutableList.of(nodeTwo, leafTwo));
+ List fragments = TimeSeriesPlanFragmenter.getFragments(nodeOne, false);
+ // Test whether correct number of fragments generated
+ assertEquals(fragments.size(), 3);
+ // Test whether fragment roots are correct
+ assertEquals(fragments.get(0).getId(), "Node-1");
+ assertEquals(fragments.get(1).getId(), "Leaf-1");
+ assertEquals(fragments.get(2).getId(), "Leaf-2");
+ // Test whether broker fragment has the right inputs
+ {
+ BaseTimeSeriesPlanNode brokerFragment = fragments.get(0);
+ assertEquals(brokerFragment.getInputs().size(), 2);
+ // Left and right inputs should have IDs Node-2 and Leaf-2.
+ BaseTimeSeriesPlanNode leftInput = brokerFragment.getInputs().get(0);
+ BaseTimeSeriesPlanNode rightInput = brokerFragment.getInputs().get(1);
+ assertEquals(leftInput.getId(), "Node-2");
+ assertEquals(rightInput.getId(), "Leaf-2");
+ // Right input should be exchange
+ assertTrue(rightInput instanceof TimeSeriesExchangeNode, "Node should have been replaced by Exchange");
+ // Input for Left input should be exchange
+ assertEquals(leftInput.getInputs().size(), 1);
+ assertEquals(leftInput.getInputs().get(0).getId(), "Leaf-1");
+ assertTrue(leftInput.getInputs().get(0) instanceof TimeSeriesExchangeNode);
+ }
+ // Test the other two fragments
+ assertTrue(fragments.get(1) instanceof LeafTimeSeriesPlanNode, "Expected leaf node in fragment");
+ assertTrue(fragments.get(2) instanceof LeafTimeSeriesPlanNode, "Expected leaf node in fragment");
+ }
+
+ @Test
+ public void testGetFragmentsForSingleServerQuery() {
+ /*
+ * Create Input:
+ * Node-1
+ * / \
+ * Node-2 Leaf-2
+ * /
+ * Leaf-1
+ * Expected Outputs:
+ * Fragment-1:
+ * Node-1
+ * / \
+ * Node-2 Exchange (named Leaf-2)
+ * /
+ * Exchange (named Leaf-1)
+ * Fragment-2:
+ * Leaf-1
+ * Fragment-3:
+ * Leaf-2
+ */
+ LeafTimeSeriesPlanNode leafOne = createMockLeafNode("Leaf-1");
+ LeafTimeSeriesPlanNode leafTwo = createMockLeafNode("Leaf-2");
+ BaseTimeSeriesPlanNode nodeTwo = new MockTimeSeriesPlanNode("Node-2", Collections.singletonList(leafOne));
+ BaseTimeSeriesPlanNode nodeOne = new MockTimeSeriesPlanNode("Node-1", ImmutableList.of(nodeTwo, leafTwo));
+ List fragments = TimeSeriesPlanFragmenter.getFragments(nodeOne, true);
+ assertEquals(fragments.size(), 2, "Expect only 2 fragments for single-server query");
+ assertEquals(fragments.get(0).getId(), "Node-1");
+ assertEquals(fragments.get(1), nodeOne);
+ }
+
+ @Test
+ public void testGetFragmentsWithSinglePlanNode() {
+ /*
+ * Create Input:
+ * Leaf-1
+ * Expected Outputs:
+ * Fragment-1:
+ * Exchange (named Leaf-1)
+ * Fragment-2:
+ * Leaf-1
+ */
+ LeafTimeSeriesPlanNode leafOne = createMockLeafNode("Leaf-1");
+ List fragments = TimeSeriesPlanFragmenter.getFragments(leafOne, false);
+ assertEquals(fragments.size(), 2);
+ assertTrue(fragments.get(0) instanceof TimeSeriesExchangeNode);
+ assertTrue(fragments.get(1) instanceof LeafTimeSeriesPlanNode);
+ assertEquals(fragments.get(0).getId(), fragments.get(1).getId());
+ }
+
+ private LeafTimeSeriesPlanNode createMockLeafNode(String id) {
+ return new LeafTimeSeriesPlanNode(id, Collections.emptyList(), "someTableName", "someTimeColumn",
+ TimeUnit.SECONDS, 0L, "", "", null, Collections.emptyList());
+ }
+
+ static class MockTimeSeriesPlanNode extends BaseTimeSeriesPlanNode {
+ public MockTimeSeriesPlanNode(String id, List inputs) {
+ super(id, inputs);
+ }
+
+ @Override
+ public BaseTimeSeriesPlanNode withInputs(List newInputs) {
+ return new MockTimeSeriesPlanNode(_id, newInputs);
+ }
+
+ @Override
+ public String getKlass() {
+ return "";
+ }
+
+ @Override
+ public String getExplainName() {
+ return "";
+ }
+
+ @Override
+ public BaseTimeSeriesOperator run() {
+ return null;
+ }
+ }
+}