Skip to content

Commit

Permalink
Added boolean and Instant tests, plus additional python server and cl…
Browse files Browse the repository at this point in the history
…ient tests for verification of API and GRPC implementations.
  • Loading branch information
lbooker42 committed Dec 18, 2024
1 parent 45ffa6f commit aa9aa85
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,11 @@ public void init(

@Override
public Optional<ChunkFilter> chunkFilter() {
Assert.eqTrue(filter instanceof ExposesChunkFilter, "filter instanceof ExposesChunkFilter");
return ((ExposesChunkFilter) filter).chunkFilter();
// The underlying filter may be a ConditionFilter
if (filter instanceof ExposesChunkFilter) {
return ((ExposesChunkFilter) filter).chunkFilter();
}
return Optional.empty();
}

private static LongRangeFilter makeInstantRangeFilter(String columnName, Condition condition, long value) {
Expand Down
165 changes: 108 additions & 57 deletions engine/table/src/test/java/io/deephaven/engine/table/impl/TestAggBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,93 @@ public void testBy() {
Table doubleCounted = table.aggBy(
List.of(
AggCount("Count1"),
AggCount("Count2"),
AggCount("Count2")),
"A");
show(doubleCounted);
assertEquals(2, doubleCounted.size());

LongVector counts = ColumnVectors.ofLong(doubleCounted, "Count1");
assertEquals(6L, counts.get(0));
assertEquals(4L, counts.get(1));
counts = ColumnVectors.ofLong(doubleCounted, "Count2");
assertEquals(6L, counts.get(0));
assertEquals(4L, counts.get(1));

// Lets do some interesting incremental computations, as this is the use case that I'm really aiming at. For
// example, getting the count, and average on each update.
// It would be nice to do a min and a max as well,
// which can often be efficient (but sometimes could also require linear work). That isn't related to this test
// but more related to the underlying min and max.

// Interestingly, the factories appear to be single use. If you try to reuse a factory it fails with an NPE.
// minFactory = new AggregationFormulaSpec("min(each)", "each");
// maxFactory = new AggregationFormulaSpec("max(each)", "each");

Collection<? extends Aggregation> summaryStatistics = List.of(
AggCount("Count"),
AggMin("MinB=B", "MinC=C"),
AggMed("MedB=B", "MedC=C"),
AggMax("MaxB=B", "MaxC=C"),
AggAvg("AvgB=B", "AvgC=C"),
AggStd("StdB=B", "StdC=C"),
AggSum("SumB=B", "SumC=C"),
AggCountDistinct("DistinctA=A"),
AggCountDistinct("DistinctB=B"));

Collection<? extends Aggregation> percentiles = List.of(
AggPct(0.25, "Pct01B=B", "Pct01C=C"),
AggPct(0.25, "Pct25B=B", "Pct25C=C"),
AggPct(0.75, "Pct75B=B", "Pct75C=C"),
AggPct(0.75, true, "Pct75T_B=B", "Pct75T_C=C"),
AggPct(0.75, false, "Pct75F_B=B", "Pct75F_C=C"),
AggPct(0.99, "Pct99B=B", "Pct99C=C"),
AggPct(0.50, "Pct50B=B", "Pct50C=C"),
AggPct(0.50, true, "Pct50T_B=B", "Pct50T_C=C"),
AggPct(0.50, false, "Pct50F_B=B", "Pct50F_C=C"));

Double[] doubles = new Double[10];
final IntChunk<Values> bChunk = bHolder.getChunk().asIntChunk();
for (int ii = 0; ii < bChunk.size(); ++ii) {
doubles[ii] = 1.1 * bChunk.get(ii);
}
ColumnHolder<?> dHolder = col("D", doubles);
table = TableTools.newTable(aHolder, bHolder, cHolder, dHolder);
show(table);
Table summary = table.aggBy(summaryStatistics, "A");
show(summary);

System.out.println("\nPercentiles (overall):");
Table percentilesAll = table.aggBy(percentiles);
show(percentilesAll);
}

@Test
public void testAggCountWhere() {
ColumnHolder<?> aHolder = col("A", 0, 0, 1, 1, 0, 0, 1, 1, 0, 0);
ColumnHolder<?> bHolder = col("B", 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
ColumnHolder<?> cHolder = col("C", 1, 1, 1, 1, 1, 1, 1, 1, 1, 1);
ColumnHolder<?> dHolder = booleanCol("D", true, true, true, true, true, false, false, false, false, false);
final Instant startInstant = Instant.parse("1970-01-01T00:00:00.000Z");

final Instant[] instantData = new Instant[] {
startInstant, // 1970-01-01T00:00:00.000Z
startInstant.plusNanos(10 * 1_000_000), // 1970-01-01T00:00:00.010Z
startInstant.plusNanos(20 * 1_000_000), // 1970-01-01T00:00:00.020Z
startInstant.plusNanos(30 * 1_000_000), // 1970-01-01T00:00:00.030Z
startInstant.plusNanos(40 * 1_000_000), // 1970-01-01T00:00:00.040Z
startInstant.plusNanos(50 * 1_000_000), // 1970-01-01T00:00:00.050Z
startInstant.plusNanos(60 * 1_000_000), // 1970-01-01T00:00:00.060Z
startInstant.plusNanos(70 * 1_000_000), // 1970-01-01T00:00:00.070Z
startInstant.plusNanos(80 * 1_000_000), // 1970-01-01T00:00:00.080Z
startInstant.plusNanos(90 * 1_000_000) // 1970-01-01T00:00:00.090Z
};
ColumnHolder<?> eHolder = instantCol("E", instantData);
Table table = TableTools.newTable(aHolder, bHolder, cHolder, dHolder, eHolder);
show(table);
assertEquals(10, table.size());
assertEquals(2, table.groupBy("A").size());
Table doubleCounted = table.aggBy(
List.of(
AggCountWhere("filter1", "B >= 5"),
AggCountWhere("filter2", "B >= 5", "B != 8"),
AggCountWhere("filter3", Filter.or(Filter.from("B >= 5", "B == 3"))),
Expand All @@ -140,18 +226,16 @@ public void testBy() {
// Multiple input columns
AggCountWhere("filter10", "B >= 5", "C == 1"),
AggCountWhere("filter11", "B >= 5 && C == 1 && A == 0"),
AggCountWhere("filter12", "B >= 5", "C >= 1")),
AggCountWhere("filter12", "B >= 5", "C >= 1"),
// Boolean column test
AggCountWhere("filter13", "D == true"),
// Instant column test
AggCountWhere("filter14", "E > '1970-01-01T00:00:00.030Z'")),
"A");
show(doubleCounted);
assertEquals(2, doubleCounted.size());

LongVector counts = ColumnVectors.ofLong(doubleCounted, "Count1");
assertEquals(6L, counts.get(0));
assertEquals(4L, counts.get(1));
counts = ColumnVectors.ofLong(doubleCounted, "Count2");
assertEquals(6L, counts.get(0));
assertEquals(4L, counts.get(1));
counts = ColumnVectors.ofLong(doubleCounted, "filter1");
LongVector counts = ColumnVectors.ofLong(doubleCounted, "filter1");
assertEquals(4L, counts.get(0));
assertEquals(2L, counts.get(1));
counts = ColumnVectors.ofLong(doubleCounted, "filter2");
Expand Down Expand Up @@ -187,6 +271,12 @@ public void testBy() {
counts = ColumnVectors.ofLong(doubleCounted, "filter12");
assertEquals(4L, counts.get(0));
assertEquals(2L, counts.get(1));
counts = ColumnVectors.ofLong(doubleCounted, "filter13");
assertEquals(3L, counts.get(0));
assertEquals(2L, counts.get(1));
counts = ColumnVectors.ofLong(doubleCounted, "filter14");
assertEquals(4L, counts.get(0));
assertEquals(2L, counts.get(1));

doubleCounted = table.aggBy(
List.of(
Expand All @@ -204,7 +294,11 @@ public void testBy() {
// Multiple input columns
AggCountWhere("filter10", "B >= 5", "C == 1"),
AggCountWhere("filter11", "B >= 5 && C == 1 && A == 0"),
AggCountWhere("filter12", "B >= 5", "C >= 1")));
AggCountWhere("filter12", "B >= 5", "C >= 1"),
// Boolean column test
AggCountWhere("filter13", "D == true"),
// Instant column test
AggCountWhere("filter14", "E > '1970-01-01T00:00:00.030Z'")));
show(doubleCounted);
assertEquals(1, doubleCounted.size());

Expand Down Expand Up @@ -232,53 +326,10 @@ public void testBy() {
assertEquals(4L, counts.get(0));
counts = ColumnVectors.ofLong(doubleCounted, "filter12");
assertEquals(6L, counts.get(0));

// Lets do some interesting incremental computations, as this is the use case that I'm really aiming at. For
// example, getting the count, and average on each update.
// It would be nice to do a min and a max as well,
// which can often be efficient (but sometimes could also require linear work). That isn't related to this test
// but more related to the underlying min and max.

// Interestingly, the factories appear to be single use. If you try to reuse a factory it fails with an NPE.
// minFactory = new AggregationFormulaSpec("min(each)", "each");
// maxFactory = new AggregationFormulaSpec("max(each)", "each");

Collection<? extends Aggregation> summaryStatistics = List.of(
AggCount("Count"),
AggMin("MinB=B", "MinC=C"),
AggMed("MedB=B", "MedC=C"),
AggMax("MaxB=B", "MaxC=C"),
AggAvg("AvgB=B", "AvgC=C"),
AggStd("StdB=B", "StdC=C"),
AggSum("SumB=B", "SumC=C"),
AggCountDistinct("DistinctA=A"),
AggCountDistinct("DistinctB=B"));

Collection<? extends Aggregation> percentiles = List.of(
AggPct(0.25, "Pct01B=B", "Pct01C=C"),
AggPct(0.25, "Pct25B=B", "Pct25C=C"),
AggPct(0.75, "Pct75B=B", "Pct75C=C"),
AggPct(0.75, true, "Pct75T_B=B", "Pct75T_C=C"),
AggPct(0.75, false, "Pct75F_B=B", "Pct75F_C=C"),
AggPct(0.99, "Pct99B=B", "Pct99C=C"),
AggPct(0.50, "Pct50B=B", "Pct50C=C"),
AggPct(0.50, true, "Pct50T_B=B", "Pct50T_C=C"),
AggPct(0.50, false, "Pct50F_B=B", "Pct50F_C=C"));

Double[] doubles = new Double[10];
final IntChunk<Values> bChunk = bHolder.getChunk().asIntChunk();
for (int ii = 0; ii < bChunk.size(); ++ii) {
doubles[ii] = 1.1 * bChunk.get(ii);
}
ColumnHolder<?> dHolder = col("D", doubles);
table = TableTools.newTable(aHolder, bHolder, cHolder, dHolder);
show(table);
Table summary = table.aggBy(summaryStatistics, "A");
show(summary);

System.out.println("\nPercentiles (overall):");
Table percentilesAll = table.aggBy(percentiles);
show(percentilesAll);
counts = ColumnVectors.ofLong(doubleCounted, "filter13");
assertEquals(5L, counts.get(0));
counts = ColumnVectors.ofLong(doubleCounted, "filter14");
assertEquals(6L, counts.get(0));
}

@Test
Expand Down
22 changes: 22 additions & 0 deletions py/client/tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,28 @@ def test_agg_all_by(self):
with self.assertRaises(DHError) as cm:
test_table.agg_all_by(agg=count_(col="ca"), by=["a"])


def test_agg_count_where_output(self):
"""
Test and validation of the agg_count_where feature
"""
test_table = self.session.empty_table(100).update(["a=ii", "b=ii%2"])
count_aggs = [
count_where("count1", "a >= 25"),
count_where("count2", "a % 3 == 0")
]
result_table = test_table.agg_by(aggs=count_aggs, by="b")
self.assertEqual(result_table.size, 2)

# get the table as a local pandas dataframe
df = result_table.to_arrow().to_pandas()
# assert the values meet expectations
self.assertEqual(df.loc[0, "count1"], 37)
self.assertEqual(df.loc[1, "count1"], 38)
self.assertEqual(df.loc[0, "count2"], 17)
self.assertEqual(df.loc[1, "count2"], 17)


def test_where_in(self):
pa_table = csv.read_csv(self.csv_file)
test_table = self.session.import_table(pa_table)
Expand Down
20 changes: 10 additions & 10 deletions py/server/deephaven/agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ def avg(cols: Union[str, List[str]] = None) -> Aggregation:
def count_(col: str) -> Aggregation:
"""Creates a Count aggregation. This is not supported in 'Table.agg_all_by'.
Args:
col (str): the column to hold the counts of each distinct group
Args:
col (str): the column to hold the counts of each distinct group
Returns:
an aggregation
"""
Returns:
an aggregation
"""
if not isinstance(col, str):
raise DHError(message="count_ aggregation requires a string value for the 'col' argument.")
return Aggregation(j_aggregation=_JAggregation.AggCount(col))
Expand All @@ -118,12 +118,12 @@ def count_where(col: str, filters: Union[str, Filter, Sequence[str], Sequence[Fi
"""Creates a CountWhere aggregation with the supplied output column name, counting values that pass the supplied
filters.
Args:
col (str): the column to hold the counts of each distinct group
Args:
col (str): the column to hold the counts of each distinct group
Returns:
an aggregation
"""
Returns:
an aggregation
"""
if not isinstance(col, str):
raise DHError(message="count_where aggregation requires a string value for the 'col' argument.")
filters = to_sequence(filters)
Expand Down
20 changes: 20 additions & 0 deletions py/server/tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,26 @@ def test_agg_by_2(self):
result_table = test_table.agg_by(agg, "grp_id")
self.assertEqual(result_table.size, 2)

def test_agg_count_where_output(self):
"""
Test and validation of the agg_count_where feature
"""
test_table = empty_table(100).update(["a=ii", "b=ii%2"])
count_aggs = [
count_where("count1", "a >= 25"),
count_where("count2", "a % 3 == 0")
]
result_table = test_table.agg_by(aggs=count_aggs, by="b")
self.assertEqual(result_table.size, 2)

# get the table as a local pandas dataframe
df = result_table.to_pandas()
# assert the values meet expectations
self.assertEqual(df.loc[0, "count1"], 37)
self.assertEqual(df.loc[1, "count1"], 38)
self.assertEqual(df.loc[0, "count2"], 17)
self.assertEqual(df.loc[1, "count2"], 17)

def test_agg_by_initial_groups_preserve_empty(self):
test_table = empty_table(10)
test_table = test_table.update(
Expand Down

0 comments on commit aa9aa85

Please sign in to comment.