Skip to content

Commit

Permalink
[test] Fix test in KeyValueFileStoreScanTest, make it more accurate (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Mar 28, 2024
1 parent 2407353 commit ac27b66
Showing 1 changed file with 25 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import org.apache.paimon.TestFileStore;
import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.ManifestEntry;
Expand Down Expand Up @@ -148,19 +146,19 @@ public void testWithKeyFilter() throws Exception {
}

@Test
public void testWithValueFilter() throws Exception {
public void testWithValueFilterBucket() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
// 0 <= hr <= 999
List<KeyValue> data = generateData(100, random.nextInt(1000));
// 0 <= item <= 999
List<KeyValue> data = generateData(100, 0, (long) random.nextInt(1000));
writeData(data, 0);
// 1000 <= hr <= 1999
data = generateData(100, random.nextInt(1000) + 1000);
// 1000 <= item <= 1999
data = generateData(100, 0, (long) random.nextInt(1000) + 1000);
writeData(data, 1);
// 2000 <= hr <= 2999
data = generateData(100, random.nextInt(1000) + 2000);
// 2000 <= item <= 2999
data = generateData(100, 0, (long) random.nextInt(1000) + 2000);
writeData(data, 2);
// 3000 <= hr <= 3999
data = generateData(100, random.nextInt(1000) + 3000);
// 3000 <= item <= 3999
data = generateData(100, 0, (long) random.nextInt(1000) + 3000);
Snapshot snapshot = writeData(data, 3);

KeyValueFileStoreScan scan = store.newScan();
Expand All @@ -171,7 +169,7 @@ public void testWithValueFilter() throws Exception {
scan.withSnapshot(snapshot.id());
scan.withValueFilter(
new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE)
.between(1, 1000, 1999));
.between(4, 1000L, 1999L));

List<ManifestEntry> filesFiltered = scan.plan().files();

Expand All @@ -180,16 +178,16 @@ public void testWithValueFilter() throws Exception {
}

@Test
public void testWithValuePartitionFilter() throws Exception {
public void testWithValueFilterPartition() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
List<KeyValue> data = generateData(100, Math.abs(random.nextInt(1000)));
writeData(data, "0", 0);
data = generateData(100, Math.abs(random.nextInt(1000)) + 1000);
writeData(data, "1", 0);
data = generateData(100, Math.abs(random.nextInt(1000)) + 2000);
writeData(data, "2", 0);
generateData(100, Math.abs(random.nextInt(1000)) + 3000);
Snapshot snapshot = writeData(data, "3", 0);
List<KeyValue> data = generateData(100, 0, (long) Math.abs(random.nextInt(1000)));
writeData(data, 0);
data = generateData(100, 1, (long) Math.abs(random.nextInt(1000)) + 1000);
writeData(data, 0);
data = generateData(100, 2, (long) Math.abs(random.nextInt(1000)) + 2000);
writeData(data, 0);
data = generateData(100, 3, (long) Math.abs(random.nextInt(1000)) + 3000);
Snapshot snapshot = writeData(data, 0);

KeyValueFileStoreScan scan = store.newScan();
scan.withSnapshot(snapshot.id());
Expand All @@ -199,7 +197,7 @@ public void testWithValuePartitionFilter() throws Exception {
scan.withSnapshot(snapshot.id());
scan.withValueFilter(
new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE)
.between(1, 1000, 2000));
.between(4, 1000L, 1999L));

List<ManifestEntry> filesFiltered = scan.plan().files();

Expand Down Expand Up @@ -312,9 +310,13 @@ private List<KeyValue> generateData(int numRecords) {
}

private List<KeyValue> generateData(int numRecords, int hr) {
return generateData(numRecords, hr, null);
}

private List<KeyValue> generateData(int numRecords, int hr, Long itemId) {
List<KeyValue> data = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
data.add(gen.nextInsert("", hr, null, null, null));
data.add(gen.nextInsert("", hr, itemId, null, null));
}
return data;
}
Expand All @@ -329,16 +331,6 @@ private Snapshot writeData(List<KeyValue> kvs, int bucket) throws Exception {
return snapshots.get(snapshots.size() - 1);
}

private Snapshot writeData(List<KeyValue> kvs, String partition, int bucket) throws Exception {
BinaryRow binaryRow = new BinaryRow(2);
BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
binaryRowWriter.writeString(0, BinaryString.fromString(partition));
binaryRowWriter.writeInt(1, 0);
binaryRowWriter.complete();
List<Snapshot> snapshots = store.commitData(kvs, p -> binaryRow, b -> bucket);
return snapshots.get(snapshots.size() - 1);
}

private int getBucket(KeyValue kv) {
return (kv.key().hashCode() % NUM_BUCKETS + NUM_BUCKETS) % NUM_BUCKETS;
}
Expand Down

0 comments on commit ac27b66

Please sign in to comment.