Skip to content

Commit

Permalink
Fix some bugs related to FunnelMaxStepAggregationFunction (apache#13228)
Browse files: browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored May 25, 2024
1 parent b6e8135 commit 518fd18
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1747,11 +1747,12 @@ public ObjectLinkedOpenHashSet<String> deserialize(ByteBuffer byteBuffer) {

@Override
public byte[] serialize(PriorityQueue<FunnelStepEvent> funnelStepEvents) {
long bufferSize = Integer.BYTES + funnelStepEvents.size() * FunnelStepEvent.SIZE_IN_BYTES;
int numEvents = funnelStepEvents.size();
long bufferSize = Integer.BYTES + (long) numEvents * FunnelStepEvent.SIZE_IN_BYTES;
Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size exceeds 2GB");
byte[] bytes = new byte[(int) bufferSize];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
byteBuffer.putInt(funnelStepEvents.size());
byteBuffer.putInt(numEvents);
for (FunnelStepEvent funnelStepEvent : funnelStepEvents) {
byteBuffer.put(funnelStepEvent.getBytes());
}
Expand All @@ -1766,9 +1767,12 @@ public PriorityQueue<FunnelStepEvent> deserialize(byte[] bytes) {
@Override
public PriorityQueue<FunnelStepEvent> deserialize(ByteBuffer byteBuffer) {
int size = byteBuffer.getInt();
if (size == 0) {
return new PriorityQueue<>();
}
PriorityQueue<FunnelStepEvent> funnelStepEvents = new PriorityQueue<>(size);
byte[] bytes = new byte[FunnelStepEvent.SIZE_IN_BYTES];
for (int i = 0; i < size; i++) {
byte[] bytes = new byte[FunnelStepEvent.SIZE_IN_BYTES];
byteBuffer.get(bytes);
funnelStepEvents.add(new FunnelStepEvent(bytes));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@
import org.apache.pinot.segment.spi.AggregationFunctionType;


public class FunnelMaxStepAggregationFunction
implements AggregationFunction<PriorityQueue<FunnelStepEvent>, Long> {
public class FunnelMaxStepAggregationFunction implements AggregationFunction<PriorityQueue<FunnelStepEvent>, Long> {
private final ExpressionContext _timestampExpression;
private final long _windowSize;
private final List<ExpressionContext> _stepExpressions;
Expand Down Expand Up @@ -77,8 +76,7 @@ public AggregationFunctionType getType() {
@Override
public String getResultColumnName() {
return getType().getName().toLowerCase() + "(" + _windowSize + ") (" + _timestampExpression.toString() + ", "
+ _stepExpressions.stream().map(ExpressionContext::toString)
.collect(Collectors.joining(",")) + ")";
+ _stepExpressions.stream().map(ExpressionContext::toString).collect(Collectors.joining(",")) + ")";
}

@Override
Expand All @@ -103,11 +101,15 @@ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int ma
public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
long[] timestampBlock = blockValSetMap.get(_timestampExpression).getLongValuesSV();
List<int[]> stepBlocks = new ArrayList<>();
List<int[]> stepBlocks = new ArrayList<>(_numSteps);
for (ExpressionContext stepExpression : _stepExpressions) {
stepBlocks.add(blockValSetMap.get(stepExpression).getIntValuesSV());
}
PriorityQueue<FunnelStepEvent> stepEvents = new PriorityQueue<>(length);
PriorityQueue<FunnelStepEvent> stepEvents = aggregationResultHolder.getResult();
if (stepEvents == null) {
stepEvents = new PriorityQueue<>();
aggregationResultHolder.setValue(stepEvents);
}
for (int i = 0; i < length; i++) {
for (int j = 0; j < _numSteps; j++) {
if (stepBlocks.get(j)[i] == 1) {
Expand All @@ -116,14 +118,13 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
}
}
}
aggregationResultHolder.setValue(stepEvents);
}

@Override
public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
long[] timestampBlock = blockValSetMap.get(_timestampExpression).getLongValuesSV();
List<int[]> stepBlocks = new ArrayList<>();
List<int[]> stepBlocks = new ArrayList<>(_numSteps);
for (ExpressionContext stepExpression : _stepExpressions) {
stepBlocks.add(blockValSetMap.get(stepExpression).getIntValuesSV());
}
Expand All @@ -134,9 +135,9 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol
PriorityQueue<FunnelStepEvent> stepEvents = groupByResultHolder.getResult(groupKey);
if (stepEvents == null) {
stepEvents = new PriorityQueue<>();
groupByResultHolder.setValueForKey(groupKey, stepEvents);
}
stepEvents.add(new FunnelStepEvent(timestampBlock[i], j));
groupByResultHolder.setValueForKey(groupKey, stepEvents);
break;
}
}
Expand All @@ -147,7 +148,7 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol
public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
long[] timestampBlock = blockValSetMap.get(_timestampExpression).getLongValuesSV();
List<int[]> stepBlocks = new ArrayList<>();
List<int[]> stepBlocks = new ArrayList<>(_numSteps);
for (ExpressionContext stepExpression : _stepExpressions) {
stepBlocks.add(blockValSetMap.get(stepExpression).getIntValuesSV());
}
Expand All @@ -159,9 +160,9 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult
PriorityQueue<FunnelStepEvent> stepEvents = groupByResultHolder.getResult(groupKey);
if (stepEvents == null) {
stepEvents = new PriorityQueue<>();
groupByResultHolder.setValueForKey(groupKey, stepEvents);
}
stepEvents.add(new FunnelStepEvent(timestampBlock[i], j));
groupByResultHolder.setValueForKey(groupKey, stepEvents);
}
break;
}
Expand Down Expand Up @@ -298,18 +299,17 @@ public Long mergeFinalResult(Long finalResult1, Long finalResult2) {

@Override
public String toExplainString() {
//@formatter:off
return "WindowFunnelAggregationFunction{"
+ "_timestampExpression=" + _timestampExpression
+ ", _windowSize=" + _windowSize
+ ", _stepExpressions=" + _stepExpressions
+ ", _numSteps=" + _numSteps
+ "timestampExpression=" + _timestampExpression
+ ", windowSize=" + _windowSize
+ ", stepExpressions=" + _stepExpressions
+ '}';
//@formatter:on
}

enum Mode {
STRICT_DEDUPLICATION(1),
STRICT_ORDER(2),
STRICT_INCREASE(4);
STRICT_DEDUPLICATION(1), STRICT_ORDER(2), STRICT_INCREASE(4);

private final int _value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,9 @@ public FunnelStepEvent(long timestamp, int step) {
}

public FunnelStepEvent(byte[] bytes) {
try {
DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
try (DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes))) {
_timestamp = dataInputStream.readLong();
_step = dataInputStream.readInt();
dataInputStream.close();
} catch (Exception e) {
throw new RuntimeException("Caught exception while converting byte[] to FunnelStepEvent", e);
}
Expand All @@ -56,10 +54,7 @@ public int getStep() {

@Override
public String toString() {
return "StepEvent{"
+ "timestamp=" + _timestamp
+ ", step=" + _step
+ '}';
return "StepEvent{" + "timestamp=" + _timestamp + ", step=" + _step + '}';
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;


public class ObjectSerDeUtilsTest {
Expand Down Expand Up @@ -601,7 +602,7 @@ public void testOrderedStringSet() {
public void testFunnelStepEventAccumulator() {
for (int i = 0; i < NUM_ITERATIONS; i++) {
int size = RANDOM.nextInt(1000);
PriorityQueue<FunnelStepEvent> expected = new PriorityQueue<FunnelStepEvent>();
PriorityQueue<FunnelStepEvent> expected = new PriorityQueue<>();
for (int j = 0; j < size; j++) {
expected.add(new FunnelStepEvent(RANDOM.nextLong(), RANDOM.nextInt()));
}
Expand All @@ -612,5 +613,10 @@ public void testFunnelStepEventAccumulator() {
assertEquals(actual.poll(), expected.poll(), ERROR_MESSAGE);
}
}
// Test empty queue
PriorityQueue<FunnelStepEvent> empty = new PriorityQueue<>();
PriorityQueue<FunnelStepEvent> deserialized = ObjectSerDeUtils.deserialize(ObjectSerDeUtils.serialize(empty),
ObjectSerDeUtils.ObjectType.FunnelStepEventAccumulator);
assertTrue(deserialized.isEmpty());
}
}

0 comments on commit 518fd18

Please sign in to comment.