From c341e06b14dd6dd3e54f945eac025ad37e7e2e89 Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Sat, 15 Jun 2024 10:30:18 -0700 Subject: [PATCH] Add mode to allow adding dummy events for non-matching steps (#13382) --- .../window/FunnelBaseAggregationFunction.java | 50 ++++++++++--- .../tests/custom/WindowFunnelTest.java | 72 +++++++++++++++++++ 2 files changed, 111 insertions(+), 11 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java index 4df7a83a883c..502a49cdbecb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java @@ -105,12 +105,18 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde aggregationResultHolder.setValue(stepEvents); } for (int i = 0; i < length; i++) { + boolean stepFound = false; for (int j = 0; j < _numSteps; j++) { if (stepBlocks.get(j)[i] == 1) { stepEvents.add(new FunnelStepEvent(timestampBlock[i], j)); + stepFound = true; break; } } + // If the mode is KEEP_ALL and no step is found, add a dummy step event with step -1 + if (_modes.hasKeepAll() && !stepFound) { + stepEvents.add(new FunnelStepEvent(timestampBlock[i], -1)); + } } } @@ -124,17 +130,20 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol } for (int i = 0; i < length; i++) { int groupKey = groupKeyArray[i]; + boolean stepFound = false; for (int j = 0; j < _numSteps; j++) { if (stepBlocks.get(j)[i] == 1) { - PriorityQueue stepEvents = groupByResultHolder.getResult(groupKey); - if (stepEvents == null) { - stepEvents = new PriorityQueue<>(); - groupByResultHolder.setValueForKey(groupKey, stepEvents); - } + PriorityQueue stepEvents = getFunnelStepEvents(groupByResultHolder, groupKey); stepEvents.add(new FunnelStepEvent(timestampBlock[i], j)); + stepFound = true; break; } } + // If the mode is KEEP_ALL and no step is found, add a dummy step event with step -1 + if (_modes.hasKeepAll() && !stepFound) { + PriorityQueue stepEvents = getFunnelStepEvents(groupByResultHolder, groupKey); + stepEvents.add(new FunnelStepEvent(timestampBlock[i], -1)); + } } } @@ -148,20 +157,35 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult } for (int i = 0; i < length; i++) { int[] groupKeys = groupKeysArray[i]; + boolean stepFound = false; for (int j = 0; j < _numSteps; j++) { if (stepBlocks.get(j)[i] == 1) { for (int groupKey : groupKeys) { - PriorityQueue stepEvents = groupByResultHolder.getResult(groupKey); - if (stepEvents == null) { - stepEvents = new PriorityQueue<>(); - groupByResultHolder.setValueForKey(groupKey, stepEvents); - } + PriorityQueue stepEvents = getFunnelStepEvents(groupByResultHolder, groupKey); stepEvents.add(new FunnelStepEvent(timestampBlock[i], j)); } + stepFound = true; break; } } + // If the mode is KEEP_ALL and no step is found, add a dummy step event with step -1 + if (_modes.hasKeepAll() && !stepFound) { + for (int groupKey : groupKeys) { + PriorityQueue stepEvents = getFunnelStepEvents(groupByResultHolder, groupKey); + stepEvents.add(new FunnelStepEvent(timestampBlock[i], -1)); + } + } + } + } + + private static PriorityQueue getFunnelStepEvents(GroupByResultHolder groupByResultHolder, + int groupKey) { + PriorityQueue stepEvents = groupByResultHolder.getResult(groupKey); + if (stepEvents == null) { + stepEvents = new PriorityQueue<>(); + groupByResultHolder.setValueForKey(groupKey, stepEvents); } + return stepEvents; } @Override @@ -233,7 +257,7 @@ public String toExplainString() { } protected enum Mode { - STRICT_DEDUPLICATION(1), STRICT_ORDER(2), STRICT_INCREASE(4); + STRICT_DEDUPLICATION(1), STRICT_ORDER(2), STRICT_INCREASE(4), KEEP_ALL(8); private final int _value; @@ -272,5 +296,9 @@ public boolean hasStrictOrder() { public boolean hasStrictIncrease() { return contains(Mode.STRICT_INCREASE); } + + public boolean hasKeepAll() { + return contains(Mode.KEEP_ALL); + } } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java index 0ce6b80b610f..e41f23511660 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java @@ -212,6 +212,78 @@ public void testFunnelMaxStepGroupByQueriesWithMode(boolean useMultiStageQueryEn } } + + @Test(dataProvider = "useBothQueryEngines") + public void testFunnelMaxStepGroupByQueriesWithModeKeepAll(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String query = + String.format("SELECT " + + "userId, funnelMaxStep(timestampCol, '1000', 3, " + + "url = '/product/search', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'strict_order', 'keep_all' ) " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); + JsonNode jsonNode = postQuery(query); + JsonNode rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + switch (i / 10) { + case 0: + assertEquals(row.get(1).intValue(), 1); + break; + case 1: + assertEquals(row.get(1).intValue(), 1); + break; + case 2: + assertEquals(row.get(1).intValue(), 1); + break; + case 3: + assertEquals(row.get(1).intValue(), 1); + break; + default: + throw new IllegalStateException(); + } + } + + query = + String.format("SELECT " + + "userId, funnelMaxStep(timestampCol, '1000', 3, " + + "url = '/product/search', " + + "url = '/checkout/start', " + + "url = '/checkout/confirmation', " + + "'strict_order' ) " + + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult()); + jsonNode = postQuery(query); + rows = jsonNode.get("resultTable").get("rows"); + assertEquals(rows.size(), 40); + for (int i = 0; i < 40; i++) { + JsonNode row = rows.get(i); + assertEquals(row.size(), 2); + assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10)); + switch (i / 10) { + case 0: + assertEquals(row.get(1).intValue(), 3); + break; + case 1: + assertEquals(row.get(1).intValue(), 2); + break; + case 2: + assertEquals(row.get(1).intValue(), 2); + break; + case 3: + assertEquals(row.get(1).intValue(), 1); + break; + default: + throw new IllegalStateException(); + } + } + } + @Test(dataProvider = "useBothQueryEngines") public void testFunnelMatchStepGroupByQueriesWithMode(boolean useMultiStageQueryEngine) throws Exception {