From 5b9449b39346eabdddf90e541018fff2439ec482 Mon Sep 17 00:00:00 2001 From: Philipp Zehnder Date: Mon, 11 Nov 2024 11:05:38 +0100 Subject: [PATCH] fix(#3325): Add better logging if count of events in measurement fails --- .../DataLakeMeasurementCounterInflux.java | 29 +++++++++---- .../query/DataLakeMeasurementCounter.java | 43 ++++++++++++++----- 2 files changed, 53 insertions(+), 19 deletions(-) diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java index db11222324..0a58b8b29a 100644 --- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java +++ b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java @@ -22,16 +22,23 @@ import org.apache.streampipes.model.datalake.AggregationFunction; import org.apache.streampipes.model.datalake.DataLakeMeasure; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; public class DataLakeMeasurementCounterInflux extends DataLakeMeasurementCounter { + private static final Logger LOG = LoggerFactory.getLogger(DataLakeMeasurementCounterInflux.class); + private static final String COUNT_FIELD = "count"; - public DataLakeMeasurementCounterInflux(List allMeasurements, - List measurementNames) { + public DataLakeMeasurementCounterInflux( + List allMeasurements, + List measurementNames + ) { super(allMeasurements, measurementNames); } @@ -39,15 +46,21 @@ public DataLakeMeasurementCounterInflux(List allMeasurements, protected CompletableFuture createQueryAsAsyncFuture(DataLakeMeasure measure) { return CompletableFuture.supplyAsync(() -> { var firstColumn = getFirstMeasurementProperty(measure); + if (firstColumn == null) { + LOG.error( + "Could not count events in measurement: {}, because no measurement property was found in event schema", + measure.getMeasureName() + ); + return 0; + } + var builder = DataLakeInfluxQueryBuilder - .create(measure.getMeasureName()).withEndTime(System.currentTimeMillis()) + .create(measure.getMeasureName()) + .withEndTime(System.currentTimeMillis()) .withAggregatedColumn(firstColumn, AggregationFunction.COUNT); var queryResult = new DataExplorerInfluxQueryExecutor().executeQuery(builder.build(), Optional.empty(), true); - if (queryResult.getTotal() > 0) { - return extractResult(queryResult, COUNT_FIELD); - } else { - return 0; - } + + return queryResult.getTotal() > 0 ? extractResult(queryResult, COUNT_FIELD) : 0; }); } } diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java index 844ce5b573..bc3a551ed7 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java @@ -42,8 +42,10 @@ public abstract class DataLakeMeasurementCounter implements IDataLakeMeasurement protected final List allMeasurements; protected final List measurementNames; - public DataLakeMeasurementCounter(List allMeasurements, - List measurementNames) { + public DataLakeMeasurementCounter( + List allMeasurements, + List measurementNames + ) { this.allMeasurements = allMeasurements; this.measurementNames = measurementNames; } @@ -52,12 +54,14 @@ public DataLakeMeasurementCounter(List allMeasurements, public Map countMeasurementSizes() { // create async futures so that count queries can be executed parallel - Map> countQueriesFutures = measurementNames.stream() + Map> countQueriesFutures = measurementNames + .stream() .map(this::getMeasure) .filter(Objects::nonNull) .collect(Collectors.toMap( - DataLakeMeasure::getMeasureName, - this::createQueryAsAsyncFuture) + DataLakeMeasure::getMeasureName, + this::createQueryAsAsyncFuture + ) ); return getQueryResults(countQueriesFutures); @@ -72,7 +76,8 @@ public Map countMeasurementSizes() { private DataLakeMeasure getMeasure(String measureName) { return allMeasurements .stream() - .filter(m -> m.getMeasureName().equals(measureName)) + .filter(m -> m.getMeasureName() + .equals(measureName)) .findFirst() .orElse(null); } @@ -83,7 +88,7 @@ private DataLakeMeasure getMeasure(String measureName) { * @param queryFutures A Map containing the futures of * asynchronous count queries mapped by their respective keys. * @return A Map representing the results of the queries, where each key corresponds to - * a measure name and the value is the count result. + * a measure name and the value is the count result. */ private Map getQueryResults(Map> queryFutures) { Map resultPerMeasure = new HashMap<>(); @@ -106,18 +111,34 @@ private Map getQueryResults(Map ep.getPropertyScope() != null - && ep.getPropertyScope().equals(PropertyScope.MEASUREMENT_PROPERTY.name())) + && ep.getPropertyScope() + .equals(PropertyScope.MEASUREMENT_PROPERTY.name())) .map(EventProperty::getRuntimeName) .findFirst() .orElse(null); + + if (propertyRuntimeName == null) { + LOG.error("No measurement property was found in the event schema found for measure {}", measure.getMeasureName()); + } + + return propertyRuntimeName; } protected Integer extractResult(SpQueryResult queryResult, String fieldName) { - return ((Double) ( - queryResult.getAllDataSeries().get(0).getRows().get(0).get(queryResult.getHeaders().indexOf(fieldName))) + return ( + (Double) ( + queryResult.getAllDataSeries() + .get(0) + .getRows() + .get(0) + .get(queryResult.getHeaders() + .indexOf(fieldName)) + ) ).intValue(); }