Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement AggCountWhere to support generic counting using Filter #6497

Merged
merged 21 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
35e3025
WIP, added LongConsumer version of ChunkFilter
lbooker42 Nov 25, 2024
c1f7698
Hacky way to accelerate chunk-based filtering.
lbooker42 Nov 27, 2024
8bc5349
Added OR support for chunk filters.
lbooker42 Dec 3, 2024
7f20937
Added generic matches() to ChunkFilter impls
lbooker42 Dec 4, 2024
6785725
Merge branch 'refs/heads/lab-filter-simplify' into lab-count-filter
lbooker42 Dec 6, 2024
596973b
Updated chunk filters.
lbooker42 Dec 11, 2024
ee44274
Added ConditionFilter chunk-level support for CountWhere
lbooker42 Dec 12, 2024
c268767
Spotless applied
lbooker42 Dec 12, 2024
39925d7
WIP, working and tested on simple filters.
lbooker42 Dec 13, 2024
fa253fc
Added multi-column support and RecordingInternalOperator generic reco…
lbooker42 Dec 16, 2024
b8e235c
Little more cleanup, testing DynamicWhereFilters.
lbooker42 Dec 17, 2024
a13bbab
Few additional tests, improving coverage.
lbooker42 Dec 17, 2024
931dd19
Added server-side python support.
lbooker42 Dec 17, 2024
26a8911
Added client-side GRPC and python support.
lbooker42 Dec 17, 2024
45ffa6f
Addressed PR comments.
lbooker42 Dec 18, 2024
aa9aa85
Added boolean and Instant tests, plus additional python server and cl…
lbooker42 Dec 18, 2024
ad22cd4
Corrected python server test.
lbooker42 Dec 18, 2024
22a518d
Merge branch 'main' into lab-count-filter
lbooker42 Dec 18, 2024
fd4fa25
Corrected python server test.
lbooker42 Dec 18, 2024
030069d
Corrected CI test failure.
lbooker42 Dec 18, 2024
a3e25d2
Replicated ChunkFilter, removed filterOr(), added tests for full cove…
lbooker42 Dec 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import io.deephaven.engine.table.impl.by.ssmminmax.SsmChunkedMinMaxOperator;
import io.deephaven.engine.table.impl.by.ssmpercentile.SsmChunkedPercentileOperator;
import io.deephaven.engine.table.impl.select.SelectColumn;
import io.deephaven.engine.table.impl.select.WhereFilter;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.ssms.SegmentedSortedMultiSet;
import io.deephaven.engine.table.impl.util.freezeby.FreezeByCountOperator;
Expand All @@ -107,13 +108,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -693,6 +688,56 @@ public void visit(@NotNull final Count count) {
addNoInputOperator(new CountAggregationOperator(count.column().name()));
}

@Override
public void visit(@NotNull final CountWhere countWhere) {
final WhereFilter[] whereFilters = WhereFilter.fromInternal(countWhere.filter());

final Map<String, RecordingInternalOperator> inputColumnRecorderMap = new HashMap<>();
final List<RecordingInternalOperator> recorderList = new ArrayList<>();
final List<RecordingInternalOperator[]> filterRecorderList = new ArrayList<>();

// Verify all the columns in the where filters are present in the table and valid for use.
for (final WhereFilter whereFilter : whereFilters) {
whereFilter.init(table.getDefinition());
if (whereFilter.isRefreshing()) {
throw new UnsupportedOperationException("AggCountWhere does not support refreshing filters");
}

// Compute which recording operators this filter will use.
final List<String> inputColumnNames = whereFilter.getColumns();
final int inputColumnCount = whereFilter.getColumns().size();
final RecordingInternalOperator[] recorders = new RecordingInternalOperator[inputColumnCount];
for (int ii = 0; ii < inputColumnCount; ++ii) {
final String inputColumnName = inputColumnNames.get(ii);
final RecordingInternalOperator recorder =
inputColumnRecorderMap.computeIfAbsent(inputColumnName, k -> {
// Create a recording operator for the column and add it to the list of operators.
final ColumnSource<?> inputSource = table.getColumnSource(inputColumnName);
final RecordingInternalOperator newRecorder =
new RecordingInternalOperator(inputColumnName, inputSource);
recorderList.add(newRecorder);
return newRecorder;
});
recorders[ii] = recorder;
}
filterRecorderList.add(recorders);
}

final RecordingInternalOperator[] recorders = recorderList.toArray(RecordingInternalOperator[]::new);
final RecordingInternalOperator[][] filterRecorders =
filterRecorderList.toArray(RecordingInternalOperator[][]::new);
final String[] inputColumnNames =
inputColumnRecorderMap.keySet().toArray(ArrayTypeUtils.EMPTY_STRING_ARRAY);

// Add the recording operators, making them dependent on all input columns so they all are populated if any
// are modified
for (final RecordingInternalOperator recorder : recorders) {
addOperator(recorder, recorder.getInputColumnSource(), inputColumnNames);
}
addOperator(new CountWhereOperator(countWhere.column().name(), whereFilters, recorders, filterRecorders),
null, inputColumnNames);
}

@Override
public void visit(@NotNull final FirstRowKey firstRowKey) {
addFirstOrLastOperators(true, firstRowKey.column().name());
Expand Down Expand Up @@ -1004,6 +1049,11 @@ public void visit(@NotNull final Count count) {
addNoInputOperator(new CountAggregationOperator(count.column().name()));
}

@Override
public void visit(@NotNull final CountWhere countWhere) {
addNoInputOperator(new CountAggregationOperator(countWhere.column().name()));
}

@Override
public void visit(@NotNull final NullColumns nullColumns) {
transformers.add(new NullColumnAggregationTransformer(nullColumns.resultColumns()));
Expand Down Expand Up @@ -1149,6 +1199,13 @@ public void visit(@NotNull final Count count) {
addOperator(makeSumOperator(resultSource.getType(), resultName, false), resultSource, resultName);
}

@Override
public void visit(@NotNull final CountWhere countWhere) {
final String resultName = countWhere.column().name();
final ColumnSource<?> resultSource = table.getColumnSource(resultName);
addOperator(makeSumOperator(resultSource.getType(), resultName, false), resultSource, resultName);
}

@Override
public void visit(@NotNull final NullColumns nullColumns) {
transformers.add(new NullColumnAggregationTransformer(nullColumns.resultColumns()));
Expand Down
Loading
Loading