Skip to content

Commit

Permalink
Improve ES query compatibility for Grafana 10
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Mar 21, 2024
1 parent 72f740d commit a1c99b9
Show file tree
Hide file tree
Showing 17 changed files with 530 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ public static Trace.Span fromIngestDocument(
// these fields don't need to be tags as they have been explicitly set already
sourceAndMetadata.remove(IngestDocument.Metadata.ID.getFieldName());
sourceAndMetadata.remove(IngestDocument.Metadata.INDEX.getFieldName());
sourceAndMetadata.remove("timestamp");
sourceAndMetadata.remove("_timestamp");
sourceAndMetadata.remove("@timestamp");

sourceAndMetadata.forEach(
(key, value) -> spanBuilder.addTags(SpanFormatter.convertKVtoProto(key, value, schema)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,27 @@ public ElasticsearchApiService(KaldbQueryServiceBase searcher) {
this.searcher = searcher;
}

/** Returns metadata about the cluster */
@Get
@Path("/")
public HttpResponse clusterMetadata() {
// todo - expand this to automatically pull in build info
// example - https://opensearch.org/docs/2.3/quickstart/
// number must validate with npm semver validate for grafana compatibility due to
// https://github.com/grafana/grafana/blob/f74d5ff93ebe61e090994162be9b08bafcd5b7f0/public/app/plugins/datasource/elasticsearch/components/QueryEditor/MetricAggregationsEditor/MetricEditor.tsx#L54
return HttpResponse.of(
"""
{
"version":
{
"distribution": "astra",
"number": "0.0.1",
"lucene_version": "9.7.0"
}
}
""");
}

/**
* Multisearch API
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.slack.kaldb.logstore.search.SearchResultUtils;
import com.slack.kaldb.logstore.search.aggregations.AutoDateHistogramAggBuilder;
import com.slack.kaldb.logstore.search.aggregations.AvgAggBuilder;
import com.slack.kaldb.logstore.search.aggregations.CumulativeSumAggBuilder;
import com.slack.kaldb.logstore.search.aggregations.DateHistogramAggBuilder;
Expand Down Expand Up @@ -120,28 +121,74 @@ private static List<KaldbSearch.SearchRequest.SearchAggregation> getRecursive(Js
.fieldNames()
.forEachRemaining(
aggregationObject -> {
if (aggregationObject.equals(DateHistogramAggBuilder.TYPE)) {
JsonNode dateHistogram = aggs.get(aggregationName).get(aggregationObject);
if (aggregationObject.equals(AutoDateHistogramAggBuilder.TYPE)) {
JsonNode autoDateHistogram =
aggs.get(aggregationName).get(aggregationObject);
aggBuilder
.setType(DateHistogramAggBuilder.TYPE)
.setType(AutoDateHistogramAggBuilder.TYPE)
.setName(aggregationName)
.setValueSource(
KaldbSearch.SearchRequest.SearchAggregation.ValueSourceAggregation
.newBuilder()
.setField(getFieldName(dateHistogram))
.setDateHistogram(
.setField(getFieldName(autoDateHistogram))
.setAutoDateHistogram(
KaldbSearch.SearchRequest.SearchAggregation
.ValueSourceAggregation.DateHistogramAggregation
.ValueSourceAggregation.AutoDateHistogramAggregation
.newBuilder()
.setMinDocCount(
getDateHistogramMinDocCount(dateHistogram))
.setInterval(getDateHistogramInterval(dateHistogram))
.putAllExtendedBounds(
getDateHistogramExtendedBounds(dateHistogram))
.setFormat(getDateHistogramFormat(dateHistogram))
.setOffset(getDateHistogramOffset(dateHistogram))
.setMinInterval(
SearchResultUtils.toValueProto(
getAutoDateHistogramMinInterval(
autoDateHistogram)))
.setNumBuckets(
SearchResultUtils.toValueProto(
getAutoDateHistogramNumBuckets(
autoDateHistogram)))
.build())
.build());
} else if (aggregationObject.equals(DateHistogramAggBuilder.TYPE)) {
JsonNode dateHistogram = aggs.get(aggregationName).get(aggregationObject);
if (getDateHistogramInterval(dateHistogram).equals("auto")) {
// if using "auto" type, default to using AutoDateHistogram as "auto" is
// not a valid interval for DateHistogramAggBuilder
aggBuilder
.setType(AutoDateHistogramAggBuilder.TYPE)
.setName(aggregationName)
.setValueSource(
KaldbSearch.SearchRequest.SearchAggregation
.ValueSourceAggregation.newBuilder()
.setField(getFieldName(dateHistogram))
.build());
} else {

KaldbSearch.SearchRequest.SearchAggregation.ValueSourceAggregation
.DateHistogramAggregation.Builder
dateHistogramBuilder =
KaldbSearch.SearchRequest.SearchAggregation
.ValueSourceAggregation.DateHistogramAggregation
.newBuilder()
.setMinDocCount(getDateHistogramMinDocCount(dateHistogram))
.setInterval(getDateHistogramInterval(dateHistogram))
.putAllExtendedBounds(
getDateHistogramExtendedBounds(dateHistogram))
.setFormat(getDateHistogramFormat(dateHistogram))
.setOffset(getDateHistogramOffset(dateHistogram));

String zoneId = getDateHistogramZoneId(dateHistogram);
if (zoneId != null) {
dateHistogramBuilder.setZoneId(
SearchResultUtils.toValueProto(zoneId));
}

aggBuilder
.setType(DateHistogramAggBuilder.TYPE)
.setName(aggregationName)
.setValueSource(
KaldbSearch.SearchRequest.SearchAggregation
.ValueSourceAggregation.newBuilder()
.setField(getFieldName(dateHistogram))
.setDateHistogram(dateHistogramBuilder.build())
.build());
}
} else if (aggregationObject.equals(FiltersAggBuilder.TYPE)) {
JsonNode filters = aggs.get(aggregationName).get(aggregationObject);

Expand Down Expand Up @@ -440,11 +487,26 @@ private static List<KaldbSearch.SearchRequest.SearchAggregation> getRecursive(Js
}

private static String getDateHistogramInterval(JsonNode dateHistogram) {
return dateHistogram.get("interval").asText();
if (dateHistogram.has("fixed_interval")) {
return dateHistogram.get("fixed_interval").asText();
} else if (dateHistogram.has("interval")) {
return dateHistogram.get("interval").asText();
}
return "auto";
}

private static String getDateHistogramZoneId(JsonNode dateHistogram) {
if (dateHistogram.has("time_zone")) {
return dateHistogram.get("time_zone").asText();
}
return null;
}

private static String getHistogramInterval(JsonNode dateHistogram) {
return dateHistogram.get("interval").asText();
if (dateHistogram.has("interval")) {
return dateHistogram.get("interval").asText();
}
return "auto";
}

private static String getFieldName(JsonNode agg) {
Expand Down Expand Up @@ -539,6 +601,20 @@ private static String getDateHistogramOffset(JsonNode dateHistogram) {
return "";
}

private static Integer getAutoDateHistogramNumBuckets(JsonNode autoDateHistogram) {
if (autoDateHistogram.has("buckets")) {
return autoDateHistogram.get("buckets").asInt();
}
return null;
}

private static String getAutoDateHistogramMinInterval(JsonNode autoDateHistogram) {
if (autoDateHistogram.has("minimum_interval")) {
return autoDateHistogram.get("minimum_interval").asText();
}
return null;
}

private static String getFormat(JsonNode cumulateSum) {
if (cumulateSum.has("format")) {
return cumulateSum.get("format").asText();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.slack.kaldb.logstore.LogMessage;
import com.slack.kaldb.logstore.search.aggregations.AggBuilder;
import com.slack.kaldb.logstore.search.aggregations.AggBuilderBase;
import com.slack.kaldb.logstore.search.aggregations.AutoDateHistogramAggBuilder;
import com.slack.kaldb.logstore.search.aggregations.AvgAggBuilder;
import com.slack.kaldb.logstore.search.aggregations.CumulativeSumAggBuilder;
import com.slack.kaldb.logstore.search.aggregations.DateHistogramAggBuilder;
Expand All @@ -26,6 +27,7 @@
import com.slack.kaldb.metadata.schema.LuceneFieldDef;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -71,6 +73,7 @@
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.bucket.filter.FiltersAggregationBuilder;
import org.opensearch.search.aggregations.bucket.filter.FiltersAggregator;
import org.opensearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
Expand Down Expand Up @@ -316,6 +319,7 @@ public InternalAggregation reduce(Collection<Aggregator> collectors) throws IOEx
private static ValuesSourceRegistry buildValueSourceRegistry() {
ValuesSourceRegistry.Builder valuesSourceRegistryBuilder = new ValuesSourceRegistry.Builder();

AutoDateHistogramAggregationBuilder.registerAggregators(valuesSourceRegistryBuilder);
DateHistogramAggregationBuilder.registerAggregators(valuesSourceRegistryBuilder);
HistogramAggregationBuilder.registerAggregators(valuesSourceRegistryBuilder);
TermsAggregationBuilder.registerAggregators(valuesSourceRegistryBuilder);
Expand Down Expand Up @@ -484,6 +488,8 @@ public Aggregator buildAggregatorUsingContext(
public static AbstractAggregationBuilder getAggregationBuilder(AggBuilder aggBuilder) {
if (aggBuilder.getType().equals(DateHistogramAggBuilder.TYPE)) {
return getDateHistogramAggregationBuilder((DateHistogramAggBuilder) aggBuilder);
} else if (aggBuilder.getType().equals(AutoDateHistogramAggBuilder.TYPE)) {
return getAutoDateHistogramAggregationBuilder((AutoDateHistogramAggBuilder) aggBuilder);
} else if (aggBuilder.getType().equals(HistogramAggBuilder.TYPE)) {
return getHistogramAggregationBuilder((HistogramAggBuilder) aggBuilder);
} else if (aggBuilder.getType().equals(FiltersAggBuilder.TYPE)) {
Expand Down Expand Up @@ -909,6 +915,35 @@ protected static FiltersAggregationBuilder getFiltersAggregationBuilder(
return filtersAggregationBuilder;
}

/**
* Given an AutoDateHistogramAggBuilder returns a AutoDateHistogramAggBuilder to be used in
* building aggregation tree
*/
protected static AutoDateHistogramAggregationBuilder getAutoDateHistogramAggregationBuilder(
AutoDateHistogramAggBuilder builder) {
AutoDateHistogramAggregationBuilder autoDateHistogramAggregationBuilder =
new AutoDateHistogramAggregationBuilder(builder.getName()).field(builder.getField());

if (builder.getMinInterval() != null && !builder.getMinInterval().isEmpty()) {
autoDateHistogramAggregationBuilder.setMinimumIntervalExpression(builder.getMinInterval());
}

if (builder.getNumBuckets() != null && builder.getNumBuckets() > 0) {
autoDateHistogramAggregationBuilder.setNumBuckets(builder.getNumBuckets());
}

for (AggBuilder subAggregation : builder.getSubAggregations()) {
if (isPipelineAggregation(subAggregation)) {
autoDateHistogramAggregationBuilder.subAggregation(
getPipelineAggregationBuilder(subAggregation));
} else {
autoDateHistogramAggregationBuilder.subAggregation(getAggregationBuilder(subAggregation));
}
}

return autoDateHistogramAggregationBuilder;
}

/**
* Given an DateHistogramAggBuilder returns a DateHistogramAggregationBuilder to be used in
* building aggregation tree
Expand All @@ -931,6 +966,10 @@ protected static DateHistogramAggregationBuilder getDateHistogramAggregationBuil
// dateHistogramAggregationBuilder.format(builder.getFormat());
}

if (builder.getZoneId() != null && !builder.getZoneId().isEmpty()) {
dateHistogramAggregationBuilder.timeZone(ZoneId.of(builder.getZoneId()));
}

if (builder.getMinDocCount() == 0) {
if (builder.getExtendedBounds() != null
&& builder.getExtendedBounds().containsKey("min")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.bucket.filter.FiltersAggregationBuilder;
import org.opensearch.search.aggregations.bucket.filter.InternalFilters;
import org.opensearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.opensearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.opensearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram;
import org.opensearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.opensearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.opensearch.search.aggregations.bucket.terms.DoubleTerms;
Expand Down Expand Up @@ -78,6 +80,14 @@ public class OpenSearchInternalAggregation {
InternalAggregation.class,
DateHistogramAggregationBuilder.NAME,
InternalDateHistogram::new),
new NamedWriteableRegistry.Entry(
AggregationBuilder.class,
AutoDateHistogramAggregationBuilder.NAME,
AutoDateHistogramAggregationBuilder::new),
new NamedWriteableRegistry.Entry(
InternalAggregation.class,
AutoDateHistogramAggregationBuilder.NAME,
InternalAutoDateHistogram::new),
new NamedWriteableRegistry.Entry(
AggregationBuilder.class,
FiltersAggregationBuilder.NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,12 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException {
jsonMap.put(
LogMessage.ReservedField.KALDB_INVALID_TIMESTAMP.fieldName, message.getTimestamp());
}

addField(
doc, LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, timestamp.toEpochMilli(), "", 0);
// todo - this should be removed once we simplify the time handling
// this will be overridden below if a user provided value exists
jsonMap.put(LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, timestamp.toString());

Map<String, Trace.KeyValue> tags =
message.getTagsList().stream()
Expand Down Expand Up @@ -459,7 +463,6 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException {
tags.remove(LogMessage.ReservedField.NAME.fieldName);
tags.remove(LogMessage.ReservedField.DURATION_MS.fieldName);
tags.remove(LogMessage.SystemField.ID.fieldName);
tags.remove(LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName);

for (Trace.KeyValue keyValue : tags.values()) {
if (keyValue.getVType() == Trace.ValueType.STRING) {
Expand Down
Loading

0 comments on commit a1c99b9

Please sign in to comment.