Skip to content

Commit

Permalink
Better examples, reverse order keep limit, reverse order tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
denis.plotnikov committed Nov 7, 2024
1 parent 53353f5 commit 4224319
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,22 +182,37 @@ public CompletableFuture<Iterator<StoredGroupedMessageBatch>> nextIterator() {
return CompletableFuture.completedFuture(null);
}

// There is no way to make queries with limit work 100% of the cases through cassandra query filtering with only first_message_time and first_message_date being clustering columns.
// It is required to have last_message_time, last_message_date to make DB filtering possible
// We have to rely on programmatic filtering for now

// Example:
// batch1: t1 - t1
// batch2: t1 - t2
//
// start time filter >= t1 + 5
// limit: 1
//
// If we put first_message_time >= t1 + 5 clause in cassandra query we will not receive both batches
// If we put first_message_time >= t1 and LIMIT 1 clauses in cassandra query we will receive batch1 and we will programmatically filter it.
// Only if we put first_message_time >= t1 without LIMIT clause we will receive batch1 and batch2 and we will be able to filter batch1 programmatically and return batch2 to user.

CassandraGroupedMessageFilter cassandraFilter = createFilter(nextPage, 0);
CassandraGroupedMessageFilter cassandraFilter;
if(this.order == Order.DIRECT) {
// There is no way to make queries with limit work 100% of the cases through cassandra query filtering with only first_message_time and first_message_date being clustering columns.
// It is required to have last_message_time, last_message_date to make DB filtering possible
// We have to rely on programmatic filtering for now

// Example 1:
// startTime from user: 18:00 LIMIT: 1 ORDER: DIRECT
//
// Batches on the page:
// batch1: 17:59 - 17:59
// batch2: 18:01 - 18:01
//
// first query from getNearestBatchTime: give me the last batch out of all pages where start time <= 18:00 -> batch1
// second query with start 17:59 limit 1 and direction DIRECT: cassandra returns batch1, this batch is later programmatically filtered as last_message_time < 18:00. User haven't received batch2 as it should.
//

// Example 2:
// startTime from user: 18:00 LIMIT: 1 ORDER: DIRECT
//
// Batches on the page:
// batch1: 17:59 - 17:59
// batch2: 17:59 - 18:01
//
// first query from getNearestBatchTime: give me the last batch out of all pages where start time <= 18:00 -> batch2
// second query with start 17:59 limit 1 and direction DIRECT: cassandra returns batch1, this batch is later programmatically filtered as last_message_time < 18:00. User haven't received batch2 as it should.

cassandraFilter = createFilter(nextPage, 0);
} else {
cassandraFilter = createFilter(nextPage, max(limit - returned.get(), 0));
}


logger.debug("Getting next iterator for '{}' by filter {}", getRequestInfo(), cassandraFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,12 @@ protected void generateData() {
GroupedMessageBatchToStore b8 = new GroupedMessageBatchToStore(GROUP3_NAME, 1024, storeActionRejectionThreshold);
b8.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 56, 18L));
b8.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 58, 19L));
b8.addMessage(generateMessage(SESSION_ALIAS6, Direction.SECOND, 60, 20L));
b8.addMessage(generateMessage(SESSION_ALIAS6, Direction.SECOND, 59, 20L));

List<GroupedMessageBatchToStore> data = List.of(b1, b2, b3, b4, b5, b6, b7, b8);
GroupedMessageBatchToStore b9 = new GroupedMessageBatchToStore(GROUP3_NAME, 1024, storeActionRejectionThreshold);
b9.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 60, 21L));

List<GroupedMessageBatchToStore> data = List.of(b1, b2, b3, b4, b5, b6, b7, b8, b9);
for (GroupedMessageBatchToStore el : data) {
storage.storeGroupedMessageBatch(el);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.exactpro.cradle.cassandra.integration.messages;

import com.exactpro.cradle.Order;
import com.exactpro.cradle.messages.GroupedMessageFilter;
import com.exactpro.cradle.utils.CradleStorageException;
import org.assertj.core.api.Assertions;
Expand Down Expand Up @@ -70,13 +71,14 @@ public void getGroupedMessagesWithWideIntervalTest2() throws CradleStorageExcept
.build();
var actual = storage.getGroupedMessageBatches(filter);
var resultAsList = Lists.newArrayList(actual.asIterable());
Assertions.assertThat(resultAsList.size()).isEqualTo(6);
Assertions.assertThat(resultAsList.size()).isEqualTo(7);
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(4);
Assertions.assertThat(resultAsList.get(1).getMessages().size()).isEqualTo(1);
Assertions.assertThat(resultAsList.get(2).getMessages().size()).isEqualTo(1);
Assertions.assertThat(resultAsList.get(3).getMessages().size()).isEqualTo(2);
Assertions.assertThat(resultAsList.get(4).getMessages().size()).isEqualTo(2);
Assertions.assertThat(resultAsList.get(5).getMessages().size()).isEqualTo(3);
Assertions.assertThat(resultAsList.get(6).getMessages().size()).isEqualTo(1);
}


Expand Down Expand Up @@ -248,7 +250,7 @@ public void getGroupedMessageWithBothTimeLimitsResultInTheMiddleOfBatch() throws
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(3);

Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(56, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(60, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(59, ChronoUnit.MINUTES));
}

@Test(description = "Get grouped messages with limit 1 with time borders corresponding batch8 but not equal to first and last timestamp")
Expand All @@ -269,7 +271,7 @@ public void getGroupedMessageWithBothTimeLimitsCoveringWholeBatch() throws Cradl
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(3);

Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(56, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(60, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(59, ChronoUnit.MINUTES));
}

@Test(description = "Get grouped messages with limit 1 with start time limit corresponding to batch5 but less than its start time.")
Expand Down Expand Up @@ -309,6 +311,71 @@ public void getGroupedMessageWithLeftLimitBeforeLastBatchStart() throws CradleSt
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(3);

Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(56, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(59, ChronoUnit.MINUTES));
}

@Test(description = "Get grouped messages with limit 1 with start time limit is the same as start time of the batch")
public void getGroupedMessagesWithLimitInReverseStartTimesEqual() throws CradleStorageException, IOException {
GroupedMessageFilter filter = GroupedMessageFilter.builder()
.groupName(GROUP3_NAME)
.bookId(bookId)
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(60, ChronoUnit.MINUTES))
.order(Order.REVERSE)
.limit(1)
.build();


// Expected batch9 from BaseMessageApiTest
var actual = storage.getGroupedMessageBatches(filter);
var resultAsList = Lists.newArrayList(actual.asIterable());
Assertions.assertThat(resultAsList.size()).isEqualTo(1);
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(1);

Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(60, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(60, ChronoUnit.MINUTES));
}

@Test(description = "Get grouped messages with limit 1.")
public void getGroupedMessagesWithLimitInReverseStartTimeRequestLessThanStartTimeBatch() throws CradleStorageException, IOException {
GroupedMessageFilter filter = GroupedMessageFilter.builder()
.groupName(GROUP3_NAME)
.bookId(bookId)
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(59, ChronoUnit.MINUTES))
.order(Order.REVERSE)
.limit(1)
.build();


// Expected batch9 from BaseMessageApiTest
var actual = storage.getGroupedMessageBatches(filter);
var resultAsList = Lists.newArrayList(actual.asIterable());
Assertions.assertThat(resultAsList.size()).isEqualTo(1);
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(1);

Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(60, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(60, ChronoUnit.MINUTES));
}

@Test(description = "Get grouped messages with limit 1.")
public void getGroupedMessagesWithLimitInReverseStartTimeRequestInTheMiddleOfTheBatch() throws CradleStorageException, IOException {
GroupedMessageFilter filter = GroupedMessageFilter.builder()
.groupName(GROUP3_NAME)
.bookId(bookId)
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(58, ChronoUnit.MINUTES))
.timestampTo().isLessThanOrEqualTo(dataStart.plus(59, ChronoUnit.MINUTES))
.order(Order.REVERSE)
.limit(1)
.build();


// Expected batch8 from BaseMessageApiTest
var actual = storage.getGroupedMessageBatches(filter);
var resultAsList = Lists.newArrayList(actual.asIterable());
Assertions.assertThat(resultAsList.size()).isEqualTo(1);
System.out.println(resultAsList.get(0).getFirstTimestamp() + " " + resultAsList.get(0).getLastTimestamp());
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(3);

Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(56, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(59, ChronoUnit.MINUTES));
}
}
Empty file modified gradlew
100755 → 100644
Empty file.

0 comments on commit 4224319

Please sign in to comment.