Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement AggCountWhere to support generic counting using Filter #6497

Merged
merged 21 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
35e3025
WIP, added LongConsumer version of ChunkFilter
lbooker42 Nov 25, 2024
c1f7698
Hacky way to accelerate chunk-based filtering.
lbooker42 Nov 27, 2024
8bc5349
Added OR support for chunk filters.
lbooker42 Dec 3, 2024
7f20937
Added generic matches() to ChunkFilter impls
lbooker42 Dec 4, 2024
6785725
Merge branch 'refs/heads/lab-filter-simplify' into lab-count-filter
lbooker42 Dec 6, 2024
596973b
Updated chunk filters.
lbooker42 Dec 11, 2024
ee44274
Added ConditionFilter chunk-level support for CountWhere
lbooker42 Dec 12, 2024
c268767
Spotless applied
lbooker42 Dec 12, 2024
39925d7
WIP, working and tested on simple filters.
lbooker42 Dec 13, 2024
fa253fc
Added multi-column support and RecordingInternalOperator generic reco…
lbooker42 Dec 16, 2024
b8e235c
Little more cleanup, testing DynamicWhereFilters.
lbooker42 Dec 17, 2024
a13bbab
Few additional tests, improving coverage.
lbooker42 Dec 17, 2024
931dd19
Added server-side python support.
lbooker42 Dec 17, 2024
26a8911
Added client-side GRPC and python support.
lbooker42 Dec 17, 2024
45ffa6f
Addressed PR comments.
lbooker42 Dec 18, 2024
aa9aa85
Added boolean and Instant tests, plus additional python server and cl…
lbooker42 Dec 18, 2024
ad22cd4
Corrected python server test.
lbooker42 Dec 18, 2024
22a518d
Merge branch 'main' into lab-count-filter
lbooker42 Dec 18, 2024
fd4fa25
Corrected python server test.
lbooker42 Dec 18, 2024
030069d
Corrected CI test failure.
lbooker42 Dec 18, 2024
a3e25d2
Replicated ChunkFilter, removed filterOr(), added tests for full cove…
lbooker42 Dec 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

/**
* Implements a counting operator that counts the number of rows that pass a set of filters. Chunk data is accessed
* through {@link RecordingInternalOperator recorder> instances.
* through {@link RecordingInternalOperator recorder} instances.
*/

public class CountWhereOperator implements IterativeChunkedAggregationOperator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.lang.*;
import java.util.*;
import io.deephaven.base.string.cache.CompressedString;
import io.deephaven.chunk.BooleanChunk;
import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.CharChunk;
import io.deephaven.chunk.Chunk;
Expand All @@ -14,6 +15,7 @@
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.ShortChunk;
import io.deephaven.chunk.WritableBooleanChunk;
import io.deephaven.chunk.WritableByteChunk;
import io.deephaven.chunk.WritableCharChunk;
import io.deephaven.chunk.WritableChunk;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.lang.*;
import java.util.*;
import io.deephaven.base.string.cache.CompressedString;
import io.deephaven.chunk.BooleanChunk;
import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.CharChunk;
import io.deephaven.chunk.Chunk;
Expand All @@ -14,6 +15,7 @@
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.ShortChunk;
import io.deephaven.chunk.WritableBooleanChunk;
import io.deephaven.chunk.WritableByteChunk;
import io.deephaven.chunk.WritableCharChunk;
import io.deephaven.chunk.WritableChunk;
Expand Down
2,290 changes: 1,143 additions & 1,147 deletions go/internal/proto/table/table.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ message Aggregation {
// The output column name
string column_name = 1;

repeated Condition filters = 2;
repeated string filters = 2;
}

message AggregationRowKey {
Expand Down
182 changes: 91 additions & 91 deletions py/client/deephaven_core/proto/table_pb2.py

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions py/client/pydeephaven/agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
_GrpcAggregation = table_pb2.Aggregation
_GrpcAggregationColumns = _GrpcAggregation.AggregationColumns
_GrpcAggregationCount = _GrpcAggregation.AggregationCount
_GrpcAggregationCountWhere = _GrpcAggregation.AggregationCountWhere
_GrpcAggregationFormula = _GrpcAggregation.AggregationFormula
_GrpcAggregationPartition = _GrpcAggregation.AggregationPartition
_GrpcAggSpec = table_pb2.AggSpec
Expand Down Expand Up @@ -51,6 +52,17 @@ def make_grpc_message(self) -> _GrpcAggregation:
agg_count = _GrpcAggregationCount(column_name=self.col)
return _GrpcAggregation(count=agg_count)


@dataclass
class _AggregationCountWhere(Aggregation):
col: str
filters: Union[str, List[str]]

def make_grpc_message(self) -> _GrpcAggregation:
agg_count_where = _GrpcAggregationCountWhere(column_name=self.col, filters=to_list(self.filters))
return _GrpcAggregation(count_where=agg_count_where)


@dataclass
class _AggregationFormula(Aggregation):
selectable: _GrpcSelectable
Expand All @@ -59,6 +71,7 @@ def make_grpc_message(self) -> _GrpcAggregation:
agg_formula = _GrpcAggregationFormula(selectable=self.selectable)
return _GrpcAggregation(formula=agg_formula)


@dataclass
class _AggregationPartition(Aggregation):
col: str
Expand Down Expand Up @@ -137,6 +150,18 @@ def count_(col: str) -> Aggregation:
return _AggregationCount(col=col)


def count_where(col: str, filters: Union[str, List[str]]) -> Aggregation:
"""Creates a Count aggregation. This is not supported in 'Table.agg_all_by'.

Args:
col (str): the column to hold the counts of each distinct group

Returns:
an aggregation
"""
return _AggregationCountWhere(col=col, filters=to_list(filters))


def partition(col: str, include_by_columns: bool = True) -> Aggregation:
"""Creates a Partition aggregation. This is not supported in 'Table.agg_all_by'.

Expand Down
6 changes: 5 additions & 1 deletion py/client/tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from pydeephaven import DHError
from pydeephaven import SortDirection
from pydeephaven.agg import sum_, avg, pct, weighted_avg, count_, partition, median, unique, count_distinct, distinct, formula
from pydeephaven.agg import sum_, avg, pct, weighted_avg, count_, count_where, partition, median, unique, count_distinct, distinct, formula
from pydeephaven.table import Table
from tests.testbase import BaseTestCase

Expand Down Expand Up @@ -247,6 +247,10 @@ def test_agg_by(self):
pct(percentile=0.5, cols=["PctC = c"]),
weighted_avg(wcol="d", cols=["WavGD = d"]),
count_(col="ca"),
count_where(col="count_where1", filters="a > 5"),
count_where("agg_count_where_1", "a > 100"),
count_where("agg_count_where_2", ["a > 100", "b < 250"]),
count_where("agg_count_where_3", "a <= 100 || b >= 250"),
lbooker42 marked this conversation as resolved.
Show resolved Hide resolved
partition(col="aggPartition"),
formula(formula="min(x)", formula_param="x", cols=["min_a=a", "min_b=b"]),
formula(formula="avg(x)", formula_param="x", cols=["avg_c=c", "avg_d=d"]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public static Count adapt(AggregationCount count) {
}

public static CountWhere adapt(AggregationCountWhere count) {
return Aggregation.AggCountWhere(count.getColumnName());
return Aggregation.AggCountWhere(count.getColumnName(), count.getFiltersList().toArray(String[]::new));
}

public static Formula adapt(AggregationFormula formula) {
Expand Down Expand Up @@ -180,7 +180,7 @@ public void visit(Count count) {

public void visit(CountWhere countWhere) {
add(
TypeCase.COUNT,
TypeCase.COUNT_WHERE,
AggregationCountWhere.class,
CountWhere.class,
GrpcErrorHelper::checkHasNoUnknownFieldsRecursive,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ static void visitAll(Visitor visitor) {
visitor.visit((ColumnAggregation) null);
visitor.visit((ColumnAggregations) null);
visitor.visit((Count) null);
visitor.visit((CountWhere) null);
visitor.visit((FirstRowKey) null);
visitor.visit((LastRowKey) null);
visitor.visit((Partition) null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public void visit(Count c) {
++count;
}

@Override
public void visit(CountWhere countWhere) {
++count;
}

@Override
public void visit(FirstRowKey firstRowKey) {
++count;
Expand Down
Loading