|
6 | 6 | * | __/\ V /| | || (_| | |_| | |_) |
|
7 | 7 | * \___| \_/ |_|\__\__,_|____/|____/
|
8 | 8 | *
|
9 |
| - * Copyright (c) 2023-2024 |
| 9 | + * Copyright (c) 2023-2025 |
10 | 10 | *
|
11 | 11 | * Licensed under the Business Source License, Version 1.1 (the "License");
|
12 | 12 | * you may not use this file except in compliance with the License.
|
|
34 | 34 | import io.evitadb.index.bitmap.Bitmap;
|
35 | 35 | import io.evitadb.index.bitmap.RoaringBitmapBackedBitmap;
|
36 | 36 | import lombok.Getter;
|
| 37 | +import lombok.RequiredArgsConstructor; |
37 | 38 | import org.roaringbitmap.BatchIterator;
|
38 | 39 | import org.roaringbitmap.RoaringBatchIterator;
|
39 | 40 | import org.roaringbitmap.RoaringBitmap;
|
@@ -91,12 +92,14 @@ private static PartialSortResult fetchSlice(
|
91 | 92 |
|
92 | 93 | // skip previous pages quickly
|
93 | 94 | if (toSkip > 0) {
|
94 |
| - toSkip -= bufferPeak; |
95 |
| - } |
96 |
| - if (skippedRecordsConsumer != null && skip > 0) { |
97 |
| - for (int i = 0; i < bufferPeak + toSkip; i++) { |
98 |
| - skippedRecordsConsumer.accept(preSortedRecordIds[buffer[i]]); |
| 95 | + if (skippedRecordsConsumer != null) { |
| 96 | + // skip records in buffer, cap really read records in the buffer |
| 97 | + final int skippedInBuffer = Math.max(0, Math.min(toSkip, bufferPeak)); |
| 98 | + for (int i = 0; i < skippedInBuffer; i++) { |
| 99 | + skippedRecordsConsumer.accept(preSortedRecordIds[buffer[i]]); |
| 100 | + } |
99 | 101 | }
|
| 102 | + toSkip -= bufferPeak; |
100 | 103 | }
|
101 | 104 |
|
102 | 105 | // now we are on the page
|
@@ -235,13 +238,15 @@ public int sortAndSlice(
|
235 | 238 | } else {
|
236 | 239 | final int[] buffer = queryContext.borrowBuffer();
|
237 | 240 | try {
|
| 241 | + final SkippingRecordConsumer delegateConsumer = new SkippingRecordConsumer(skippedRecordsConsumer); |
238 | 242 | final SortResult sortResult = collectPartialResults(
|
239 |
| - queryContext, selectedRecordIds, startIndex, endIndex, result, peak, buffer, skippedRecordsConsumer |
| 243 | + queryContext, selectedRecordIds, startIndex, endIndex, result, peak, buffer, delegateConsumer |
240 | 244 | );
|
241 | 245 | return returnResultAppendingUnknown(
|
242 | 246 | queryContext, sortResult.notSortedRecords(),
|
243 | 247 | unknownRecordIdsSorter,
|
244 |
| - startIndex, endIndex, |
| 248 | + Math.max(0, startIndex - delegateConsumer.getCounter()), |
| 249 | + Math.max(0, endIndex - delegateConsumer.getCounter()), |
245 | 250 | result, sortResult.peak(), buffer
|
246 | 251 | );
|
247 | 252 | } finally {
|
@@ -336,4 +341,22 @@ private record SortResult(
|
336 | 341 | ) {
|
337 | 342 | }
|
338 | 343 |
|
| 344 | + /** |
| 345 | + * The SkippingRecordConsumer is an implementation of the {@link IntConsumer} interface that wraps an optional delegate |
| 346 | + * {@code IntConsumer} and tracks the count of records processed. This class is intended to be used for scenarios |
| 347 | + * where certain records need to be tracked, possibly skipped, or processed with optional side effects. |
| 348 | + */ |
| 349 | + @RequiredArgsConstructor |
| 350 | + private static class SkippingRecordConsumer implements IntConsumer { |
| 351 | + @Nullable private final IntConsumer delegate; |
| 352 | + @Getter private int counter = 0; |
| 353 | + |
| 354 | + @Override |
| 355 | + public void accept(int value) { |
| 356 | + if (this.delegate != null) { |
| 357 | + this.delegate.accept(value); |
| 358 | + } |
| 359 | + this.counter++; |
| 360 | + } |
| 361 | + } |
339 | 362 | }
|
0 commit comments