Skip to content

Commit

Permalink
fix(#3325): Add better logging if count of events in measurement fails
Browse files Browse the repository at this point in the history
  • Loading branch information
tenthe committed Nov 11, 2024
1 parent 9997b31 commit 5b9449b
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,45 @@
import org.apache.streampipes.model.datalake.AggregationFunction;
import org.apache.streampipes.model.datalake.DataLakeMeasure;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class DataLakeMeasurementCounterInflux extends DataLakeMeasurementCounter {

private static final Logger LOG = LoggerFactory.getLogger(DataLakeMeasurementCounterInflux.class);

private static final String COUNT_FIELD = "count";

public DataLakeMeasurementCounterInflux(List<DataLakeMeasure> allMeasurements,
List<String> measurementNames) {
public DataLakeMeasurementCounterInflux(
List<DataLakeMeasure> allMeasurements,
List<String> measurementNames
) {
super(allMeasurements, measurementNames);
}

@Override
protected CompletableFuture<Integer> createQueryAsAsyncFuture(DataLakeMeasure measure) {
return CompletableFuture.supplyAsync(() -> {
var firstColumn = getFirstMeasurementProperty(measure);
if (firstColumn == null) {
LOG.error(
"Could not count events in measurement: {}, because no measurement property was found in event schema",
measure.getMeasureName()
);
return 0;
}

var builder = DataLakeInfluxQueryBuilder
.create(measure.getMeasureName()).withEndTime(System.currentTimeMillis())
.create(measure.getMeasureName())
.withEndTime(System.currentTimeMillis())
.withAggregatedColumn(firstColumn, AggregationFunction.COUNT);
var queryResult = new DataExplorerInfluxQueryExecutor().executeQuery(builder.build(), Optional.empty(), true);
if (queryResult.getTotal() > 0) {
return extractResult(queryResult, COUNT_FIELD);
} else {
return 0;
}

return queryResult.getTotal() > 0 ? extractResult(queryResult, COUNT_FIELD) : 0;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ public abstract class DataLakeMeasurementCounter implements IDataLakeMeasurement
protected final List<DataLakeMeasure> allMeasurements;
protected final List<String> measurementNames;

public DataLakeMeasurementCounter(List<DataLakeMeasure> allMeasurements,
List<String> measurementNames) {
public DataLakeMeasurementCounter(
List<DataLakeMeasure> allMeasurements,
List<String> measurementNames
) {
this.allMeasurements = allMeasurements;
this.measurementNames = measurementNames;
}
Expand All @@ -52,12 +54,14 @@ public DataLakeMeasurementCounter(List<DataLakeMeasure> allMeasurements,
public Map<String, Integer> countMeasurementSizes() {

// create async futures so that count queries can be executed parallel
Map<String, CompletableFuture<Integer>> countQueriesFutures = measurementNames.stream()
Map<String, CompletableFuture<Integer>> countQueriesFutures = measurementNames
.stream()
.map(this::getMeasure)
.filter(Objects::nonNull)
.collect(Collectors.toMap(
DataLakeMeasure::getMeasureName,
this::createQueryAsAsyncFuture)
DataLakeMeasure::getMeasureName,
this::createQueryAsAsyncFuture
)
);

return getQueryResults(countQueriesFutures);
Expand All @@ -72,7 +76,8 @@ public Map<String, Integer> countMeasurementSizes() {
private DataLakeMeasure getMeasure(String measureName) {
return allMeasurements
.stream()
.filter(m -> m.getMeasureName().equals(measureName))
.filter(m -> m.getMeasureName()
.equals(measureName))
.findFirst()
.orElse(null);
}
Expand All @@ -83,7 +88,7 @@ private DataLakeMeasure getMeasure(String measureName) {
* @param queryFutures A Map containing the futures of
* asynchronous count queries mapped by their respective keys.
* @return A Map representing the results of the queries, where each key corresponds to
* a measure name and the value is the count result.
* a measure name and the value is the count result.
*/
private Map<String, Integer> getQueryResults(Map<String, CompletableFuture<Integer>> queryFutures) {
Map<String, Integer> resultPerMeasure = new HashMap<>();
Expand All @@ -106,18 +111,34 @@ private Map<String, Integer> getQueryResults(Map<String, CompletableFuture<Integ
* @return The runtime name of the first measurement property, or null if no such property is found.
*/
protected String getFirstMeasurementProperty(DataLakeMeasure measure) {
return measure.getEventSchema().getEventProperties()
var propertyRuntimeName = measure
.getEventSchema()
.getEventProperties()
.stream()
.filter(ep -> ep.getPropertyScope() != null
&& ep.getPropertyScope().equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
&& ep.getPropertyScope()
.equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
.map(EventProperty::getRuntimeName)
.findFirst()
.orElse(null);

if (propertyRuntimeName == null) {
LOG.error("No measurement property was found in the event schema found for measure {}", measure.getMeasureName());
}

return propertyRuntimeName;
}

protected Integer extractResult(SpQueryResult queryResult, String fieldName) {
return ((Double) (
queryResult.getAllDataSeries().get(0).getRows().get(0).get(queryResult.getHeaders().indexOf(fieldName)))
return (
(Double) (
queryResult.getAllDataSeries()
.get(0)
.getRows()
.get(0)
.get(queryResult.getHeaders()
.indexOf(fieldName))
)
).intValue();
}

Expand Down

0 comments on commit 5b9449b

Please sign in to comment.