Skip to content

Commit

Permalink
Part-1: Pinot Timeseries Engine SPI (#13885)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitsultana authored Sep 9, 2024
1 parent f157858 commit b828280
Show file tree
Hide file tree
Showing 23 changed files with 1,384 additions and 1 deletion.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ yarn-error.log*
quickstart*

#build symlink directory
build*
build
build/*

#helm related files
kubernetes/helm/**/charts/
Expand Down
49 changes: 49 additions & 0 deletions pinot-timeseries/pinot-timeseries-spi/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Parent is the org.apache.pinot:pinot pom, located two directories above this module. -->
<parent>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot</artifactId>
<version>1.3.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<!-- This module only sets its artifactId; groupId and version are not declared here. -->
<artifactId>pinot-timeseries-spi</artifactId>

<properties>
<!-- Two directories above this module's basedir (presumably the Pinot repository root - confirm). -->
<pinot.root>${basedir}/../..</pinot.root>
</properties>
<!--
No <version> elements are declared for the dependencies below; presumably they are resolved through
dependency management in the parent pom (confirm).
-->
<dependencies>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-spi</artifactId>
</dependency>
<!-- Test-scoped only. -->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.tsdb.spi;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;


/**
 * Describes the aggregation function to apply to a time-series. The function is kept as a plain string because
 * each time-series language is allowed to implement its own set of aggregation functions.
 * TODO: We will likely be adding more parameters to this. One candidate is partial/full aggregation information or
 *   aggregation result type to allow for intermediate result types.
 */
public class AggInfo {
  /** Name of the aggregation function; never null once constructed. */
  private final String _aggFunction;

  /**
   * Creates an AggInfo. Also used by Jackson, which binds the JSON property {@code aggFunction} to the argument.
   *
   * @param aggFunction name of the aggregation function, must not be null
   * @throws NullPointerException if {@code aggFunction} is null
   */
  @JsonCreator
  public AggInfo(@JsonProperty("aggFunction") String aggFunction) {
    // checkNotNull hands back the validated reference, so validation and assignment happen in one step.
    _aggFunction = Preconditions.checkNotNull(aggFunction, "Received null aggFunction in AggInfo");
  }

  /**
   * @return the aggregation function name supplied at construction time
   */
  public String getAggFunction() {
    return _aggFunction;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.tsdb.spi;

import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;


/**
 * Static helpers that build the configuration keys used by the time-series engine. All keys start with
 * {@link #CONFIG_PREFIX}. This class only holds static members and cannot be instantiated.
 */
public class PinotTimeSeriesConfiguration {
  public static final String CONFIG_PREFIX = "pinot.timeseries";
  private static final String ENABLE_LANGUAGES_SUFFIX = ".languages";
  private static final String SERIES_BUILDER_FACTORY_SUFFIX = ".series.builder.factory";
  private static final String LOGICAL_PLANNER_CLASS_SUFFIX = ".logical.planner.class";

  private PinotTimeSeriesConfiguration() {
  }

  /**
   * Config key that controls which time-series languages are enabled in a given Pinot cluster.
   *
   * @return the key {@code pinot.timeseries.languages}
   */
  public static String getEnabledLanguagesConfigKey() {
    return CONFIG_PREFIX + ENABLE_LANGUAGES_SUFFIX;
  }

  /**
   * Config key holding the class name of the {@link TimeSeriesBuilderFactory} for the given language. Every
   * language may configure its own {@link TimeSeriesBuilderFactory}, which is what allows a language to support
   * custom time-series functions.
   *
   * @param language identifier of the time-series language, embedded verbatim in the key
   * @return {@code pinot.timeseries.<language>.series.builder.factory}
   */
  public static String getSeriesBuilderFactoryConfigKey(String language) {
    return languageScopedKey(language, SERIES_BUILDER_FACTORY_SUFFIX);
  }

  /**
   * Config key holding the class name of the {@link TimeSeriesLogicalPlanner} for the given language. Pinot broker
   * will load this logical planner on start-up dynamically. This is called for each language configured via
   * {@link #getEnabledLanguagesConfigKey()}.
   *
   * @param language identifier of the time-series language, embedded verbatim in the key
   * @return {@code pinot.timeseries.<language>.logical.planner.class}
   */
  public static String getLogicalPlannerConfigKey(String language) {
    return languageScopedKey(language, LOGICAL_PLANNER_CLASS_SUFFIX);
  }

  /** Builds {@code <prefix>.<language><suffix>}; the suffix is expected to carry its own leading dot. */
  private static String languageScopedKey(String language, String suffix) {
    return CONFIG_PREFIX + "." + language + suffix;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.tsdb.spi;

import com.google.common.base.Preconditions;
import java.time.Duration;


/**
 * A time-series request received by the Pinot Broker. This is passed to the {@link TimeSeriesLogicalPlanner} so
 * each query language can parse and plan the query based on their spec.
 * <br/>
 * <br/>
 * <b>Notes:</b>
 * <ul>
 *   <li>[start, end] are both inclusive.</li>
 *   <li>
 *     The result can contain time values outside [start, end], though we generally recommend to keep your results
 *     within the requested range. This decision is left to the time-series query language implementations. In some
 *     cases, returning data outside the requested time-range can help (e.g. for debugging purposes when you are
 *     computing moving 1d sum but are only looking at data for the last 12 hours).
 *   </li>
 *   <li>stepSeconds is used to define the default resolution for the query</li>
 *   <li>
 *     Some query languages allow users to change the resolution via a function, and in those cases the returned
 *     time-series may have a resolution different than stepSeconds
 *   </li>
 *   <li>
 *     The query execution may scan and process data outside of the time-range [start, end]. The actual data scanned
 *     and processed is defined by the {@link TimeBuckets} used by the operator.
 *   </li>
 * </ul>
 */
public class RangeTimeSeriesRequest {
  /** Engine allows a Pinot cluster to support multiple Time-Series Query Languages. */
  private final String _engine;
  /** Query is the raw query sent by the caller. */
  private final String _query;
  /** Start time of the time-window being queried. */
  private final long _startSeconds;
  /** End time of the time-window being queried. */
  private final long _endSeconds;
  /**
   * <b>Optional</b> field which the caller can use to suggest the default resolution for the query. Language
   * implementations can choose to skip this suggestion and choose their own resolution based on their semantics.
   */
  private final long _stepSeconds;
  /** E2E timeout for the query. */
  private final Duration _timeout;

  /**
   * Creates a request after validating that the requested range is not inverted. No other argument is validated
   * here.
   *
   * @param engine identifier of the time-series query language/engine to use
   * @param query raw query sent by the caller
   * @param startSeconds start of the queried time-window (inclusive)
   * @param endSeconds end of the queried time-window (inclusive); must be {@code >= startSeconds}
   * @param stepSeconds suggested default resolution for the query
   * @param timeout end-to-end timeout for the query
   * @throws IllegalStateException if {@code endSeconds < startSeconds}
   */
  public RangeTimeSeriesRequest(String engine, String query, long startSeconds, long endSeconds, long stepSeconds,
      Duration timeout) {
    // The message now states the constraint that is actually enforced (it previously had start/end swapped).
    Preconditions.checkState(endSeconds >= startSeconds, "Invalid range. endSeconds "
        + "should be greater than or equal to startSeconds. Found startSeconds=%s and endSeconds=%s",
        startSeconds, endSeconds);
    _engine = engine;
    _query = query;
    _startSeconds = startSeconds;
    _endSeconds = endSeconds;
    _stepSeconds = stepSeconds;
    _timeout = timeout;
  }

  /** @return identifier of the time-series query language/engine for this request */
  public String getEngine() {
    return _engine;
  }

  /** @return the raw query sent by the caller */
  public String getQuery() {
    return _query;
  }

  /** @return start of the queried time-window (inclusive) */
  public long getStartSeconds() {
    return _startSeconds;
  }

  /** @return end of the queried time-window (inclusive) */
  public long getEndSeconds() {
    return _endSeconds;
  }

  /** @return the caller-suggested default resolution for the query */
  public long getStepSeconds() {
    return _stepSeconds;
  }

  /** @return end-to-end timeout for the query */
  public Duration getTimeout() {
    return _timeout;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.tsdb.spi;

import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;


/**
* Time buckets used for query execution. Each element (say x) in the {@link #getTimeBuckets()} array represents a
* time-range which is half open on the right side: [x, x + bucketSize.getSeconds()). Some query languages allow some
* operators to mutate the time-buckets on the fly, so it is not guaranteed that the time resolution and/or range
* will be the same across all operators. For instance, Uber's M3QL supports a "summarize 1h sum" operator which will
* change the bucket resolution to 1 hour for all subsequent operators.
*/
public class TimeBuckets {
private final Long[] _timeBuckets;
private final Duration _bucketSize;

private TimeBuckets(Long[] timeBuckets, Duration bucketSize) {
_timeBuckets = timeBuckets;
_bucketSize = bucketSize;
}

public Long[] getTimeBuckets() {
return _timeBuckets;
}

public Duration getBucketSize() {
return _bucketSize;
}

public long getStartTime() {
return _timeBuckets[0];
}

public long getEndTime() {
return _timeBuckets[_timeBuckets.length - 1];
}

public long getRangeSeconds() {
return _timeBuckets[_timeBuckets.length - 1] - _timeBuckets[0];
}

public int getNumBuckets() {
return _timeBuckets.length;
}

public int resolveIndex(long timeValue) {
if (_timeBuckets.length == 0) {
return -1;
}
if (timeValue < _timeBuckets[0]) {
return -1;
}
if (timeValue >= _timeBuckets[_timeBuckets.length - 1] + _bucketSize.getSeconds()) {
return -1;
}
return (int) ((timeValue - _timeBuckets[0]) / _bucketSize.getSeconds());
}

@Override
public boolean equals(Object o) {
if (!(o instanceof TimeBuckets)) {
return false;
}
TimeBuckets other = (TimeBuckets) o;
return this.getStartTime() == other.getStartTime() && this.getEndTime() == other.getEndTime()
&& this.getBucketSize().equals(other.getBucketSize());
}

@Override
public int hashCode() {
int result = Objects.hash(_bucketSize);
result = 31 * result + Arrays.hashCode(_timeBuckets);
return result;
}

public static TimeBuckets ofSeconds(long startTimeSeconds, Duration bucketSize, int numElements) {
long stepSize = bucketSize.getSeconds();
Long[] timeBuckets = new Long[numElements];
for (int i = 0; i < numElements; i++) {
timeBuckets[i] = startTimeSeconds + i * stepSize;
}
return new TimeBuckets(timeBuckets, bucketSize);
}
}
Loading

0 comments on commit b828280

Please sign in to comment.