Skip to content

Commit

Permalink
validate timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
vthacker committed Mar 18, 2024
1 parent 791e20a commit 64163ad
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 87 deletions.
13 changes: 8 additions & 5 deletions kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.slack.kaldb.chunk.ChunkInfo.toSnapshotMetadata;
import static com.slack.kaldb.logstore.BlobFsUtils.copyToS3;
import static com.slack.kaldb.logstore.BlobFsUtils.createURI;
import static com.slack.kaldb.writer.SpanFormatter.isValidTimestamp;

import com.google.common.annotations.VisibleForTesting;
import com.slack.kaldb.blobfs.BlobFs;
Expand Down Expand Up @@ -153,11 +154,13 @@ public void addMessage(Trace.Span message, String kafkaPartitionId, long offset)
if (!readOnly) {
logStore.addMessage(message);

chunkInfo.updateDataTimeRange(
TimeUnit.MILLISECONDS.convert(message.getTimestamp(), TimeUnit.MICROSECONDS));
// if we do this i.e also validate the timestamp tests
// that use dates from 2020 start failing so not touching this logic for now
// chunkInfo.updateDataTimeRange(SpanFormatter.getTimestampFromSpan(message).toEpochMilli());
Instant timestamp =
Instant.ofEpochMilli(
TimeUnit.MILLISECONDS.convert(message.getTimestamp(), TimeUnit.MICROSECONDS));
if (!isValidTimestamp(timestamp)) {
timestamp = Instant.now();
}
chunkInfo.updateDataTimeRange(timestamp.toEpochMilli());

chunkInfo.updateMaxOffset(offset);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.slack.kaldb.logstore.LogMessage.computedIndexName;
import static com.slack.kaldb.writer.SpanFormatter.DEFAULT_INDEX_NAME;
import static com.slack.kaldb.writer.SpanFormatter.DEFAULT_LOG_MESSAGE_TYPE;
import static com.slack.kaldb.writer.SpanFormatter.isValidTimestamp;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -405,11 +406,20 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException {
throw new IllegalArgumentException("Span id is empty");
}

// TODO: this interferes in tests
// Instant timestamp = SpanFormatter.getTimestampFromSpan(message);
Instant timestamp =
Instant.ofEpochMilli(
TimeUnit.MILLISECONDS.convert(message.getTimestamp(), TimeUnit.MICROSECONDS));
if (!isValidTimestamp(timestamp)) {
timestamp = Instant.now();
addField(
doc,
LogMessage.ReservedField.KALDB_INVALID_TIMESTAMP.fieldName,
message.getTimestamp(),
"",
0);
jsonMap.put(
LogMessage.ReservedField.KALDB_INVALID_TIMESTAMP.fieldName, message.getTimestamp());
}
addField(
doc, LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, timestamp.toEpochMilli(), "", 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -204,8 +202,7 @@ public void testAddAndSearchChunk() {

@Test
public void testAddAndSearchChunkInTimeRange() {
final Instant startTime =
LocalDateTime.of(2020, 10, 1, 10, 10, 0).atZone(ZoneOffset.UTC).toInstant();
final Instant startTime = Instant.now();
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
final long messageStartTimeMs =
TimeUnit.MILLISECONDS.convert(messages.get(0).getTimestamp(), TimeUnit.MICROSECONDS);
Expand All @@ -223,7 +220,11 @@ public void testAddAndSearchChunkInTimeRange() {

final long expectedEndTimeEpochMs = messageStartTimeMs + (99 * 1000);
// Ensure chunk info is correct.
assertThat(chunk.info().getDataStartTimeEpochMs()).isEqualTo(messageStartTimeMs);
Instant oneMinBefore = Instant.now().minus(1, ChronoUnit.MINUTES);
Instant oneMinBeforeAfter = Instant.now().plus(1, ChronoUnit.MINUTES);
assertThat(chunk.info().getDataStartTimeEpochMs()).isGreaterThan(oneMinBefore.toEpochMilli());
assertThat(chunk.info().getDataStartTimeEpochMs())
.isLessThan(oneMinBeforeAfter.toEpochMilli());
assertThat(chunk.info().getDataEndTimeEpochMs()).isEqualTo(expectedEndTimeEpochMs);
assertThat(chunk.info().chunkId).contains(CHUNK_DATA_PREFIX);
assertThat(chunk.info().getChunkSnapshotTimeEpochMs()).isZero();
Expand All @@ -248,9 +249,12 @@ public void testAddAndSearchChunkInTimeRange() {

// Add more messages in other time range and search again with new time ranges.
List<Trace.Span> newMessages =
SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime.plus(2, ChronoUnit.DAYS));
SpanUtil.makeSpansWithTimeDifference(
1, 100, 1000, startTime.plus(10, ChronoUnit.MINUTES));
final long newMessageStartTimeEpochMs =
TimeUnit.MILLISECONDS.convert(newMessages.get(0).getTimestamp(), TimeUnit.MICROSECONDS);
final long newMessageEndTimeEpochMs =
TimeUnit.MILLISECONDS.convert(newMessages.get(99).getTimestamp(), TimeUnit.MICROSECONDS);
for (Trace.Span m : newMessages) {
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
offset++;
Expand All @@ -262,9 +266,10 @@ public void testAddAndSearchChunkInTimeRange() {
assertThat(getTimerCount(REFRESHES_TIMER, registry)).isEqualTo(2);
assertThat(getTimerCount(COMMITS_TIMER, registry)).isEqualTo(2);

assertThat(chunk.info().getDataStartTimeEpochMs()).isEqualTo(messageStartTimeMs);
assertThat(chunk.info().getDataEndTimeEpochMs())
.isEqualTo(newMessageStartTimeEpochMs + (99 * 1000));
assertThat(chunk.info().getDataStartTimeEpochMs()).isGreaterThan(oneMinBefore.toEpochMilli());
assertThat(chunk.info().getDataStartTimeEpochMs())
.isLessThan(oneMinBeforeAfter.toEpochMilli());
assertThat(chunk.info().getDataEndTimeEpochMs()).isEqualTo(newMessageEndTimeEpochMs);

// Search for message in expected time range.
searchChunk("Message1", messageStartTimeMs, expectedEndTimeEpochMs, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -192,8 +190,7 @@ public void testAddAndSearchChunk() {

@Test
public void testAddAndSearchChunkInTimeRange() {
final Instant startTime =
LocalDateTime.of(2020, 10, 1, 10, 10, 0).atZone(ZoneOffset.UTC).toInstant();
final Instant startTime = Instant.now();
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
final long messageStartTimeMs =
TimeUnit.MILLISECONDS.convert(messages.get(0).getTimestamp(), TimeUnit.MICROSECONDS);
Expand All @@ -212,7 +209,11 @@ public void testAddAndSearchChunkInTimeRange() {
final long expectedEndTimeEpochMs =
TimeUnit.MILLISECONDS.convert(messages.get(99).getTimestamp(), TimeUnit.MICROSECONDS);
// Ensure chunk info is correct.
assertThat(chunk.info().getDataStartTimeEpochMs()).isEqualTo(messageStartTimeMs);
Instant oneMinBefore = Instant.now().minus(1, ChronoUnit.MINUTES);
Instant oneMinBeforeAfter = Instant.now().plus(1, ChronoUnit.MINUTES);
assertThat(chunk.info().getDataStartTimeEpochMs()).isGreaterThan(oneMinBefore.toEpochMilli());
assertThat(chunk.info().getDataStartTimeEpochMs())
.isLessThan(oneMinBeforeAfter.toEpochMilli());
assertThat(chunk.info().getDataEndTimeEpochMs()).isEqualTo(expectedEndTimeEpochMs);
assertThat(chunk.info().chunkId).contains(CHUNK_DATA_PREFIX);
assertThat(chunk.info().getChunkSnapshotTimeEpochMs()).isZero();
Expand All @@ -238,9 +239,12 @@ public void testAddAndSearchChunkInTimeRange() {
// Add more messages in other time range and search again with new time ranges.

List<Trace.Span> newMessages =
SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime.plus(2, ChronoUnit.DAYS));
SpanUtil.makeSpansWithTimeDifference(
1, 100, 1000, startTime.plus(10, ChronoUnit.MINUTES));
final long newMessageStartTimeEpochMs =
TimeUnit.MILLISECONDS.convert(newMessages.get(0).getTimestamp(), TimeUnit.MICROSECONDS);
final long newMessageEndTimeEpochMs =
TimeUnit.MILLISECONDS.convert(newMessages.get(99).getTimestamp(), TimeUnit.MICROSECONDS);
for (Trace.Span m : newMessages) {
chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
offset++;
Expand All @@ -252,9 +256,10 @@ public void testAddAndSearchChunkInTimeRange() {
assertThat(getTimerCount(REFRESHES_TIMER, registry)).isEqualTo(2);
assertThat(getTimerCount(COMMITS_TIMER, registry)).isEqualTo(2);

assertThat(chunk.info().getDataStartTimeEpochMs()).isEqualTo(messageStartTimeMs);
assertThat(chunk.info().getDataEndTimeEpochMs())
.isEqualTo(newMessageStartTimeEpochMs + (99 * 1000));
assertThat(chunk.info().getDataStartTimeEpochMs()).isGreaterThan(oneMinBefore.toEpochMilli());
assertThat(chunk.info().getDataStartTimeEpochMs())
.isLessThan(oneMinBeforeAfter.toEpochMilli());
assertThat(chunk.info().getDataEndTimeEpochMs()).isEqualTo(newMessageEndTimeEpochMs);

// Search for message in expected time range.
searchChunk("Message1", messageStartTimeMs, expectedEndTimeEpochMs, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1071,16 +1071,15 @@ public void testCommitInvalidChunk() throws Exception {

@Test
public void testMultiChunkSearch() throws Exception {
final Instant startTime =
LocalDateTime.of(2020, 10, 1, 10, 10, 0).atZone(ZoneOffset.UTC).toInstant();
final Instant startTime = Instant.now();

final List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 10, 1000, startTime);
messages.addAll(
SpanUtil.makeSpansWithTimeDifference(11, 20, 1000, startTime.plus(2, ChronoUnit.HOURS)));
SpanUtil.makeSpansWithTimeDifference(11, 20, 1000, startTime.plus(2, ChronoUnit.MINUTES)));
messages.addAll(
SpanUtil.makeSpansWithTimeDifference(21, 30, 1000, startTime.plus(4, ChronoUnit.HOURS)));
SpanUtil.makeSpansWithTimeDifference(21, 30, 1000, startTime.plus(4, ChronoUnit.MINUTES)));
messages.addAll(
SpanUtil.makeSpansWithTimeDifference(31, 35, 1000, startTime.plus(6, ChronoUnit.HOURS)));
SpanUtil.makeSpansWithTimeDifference(31, 35, 1000, startTime.plus(6, ChronoUnit.MINUTES)));

final ChunkRollOverStrategy chunkRollOverStrategy =
new DiskOrMessageCountBasedRolloverStrategy(metricsRegistry, 10 * 1024 * 1024 * 1024L, 10L);
Expand Down Expand Up @@ -1148,7 +1147,7 @@ public void testMultiChunkSearch() throws Exception {
chunkManager, "Message11", messagesStartTimeMs, messagesStartTimeMs + 10000))
.isEqualTo(0);

final long chunk2StartTimeMs = chunk1StartTimeMs + Duration.ofHours(2).toMillis();
final long chunk2StartTimeMs = chunk1StartTimeMs + Duration.ofMinutes(2).toMillis();
final long chunk2EndTimeMs = chunk2StartTimeMs + 10000;

assertThat(searchAndGetHitCount(chunkManager, "Message11", chunk2StartTimeMs, chunk2EndTimeMs))
Expand All @@ -1161,7 +1160,7 @@ public void testMultiChunkSearch() throws Exception {
.isEqualTo(0);

// Chunk 3
final long chunk3StartTimeMs = chunk1StartTimeMs + Duration.ofHours(4).toMillis();
final long chunk3StartTimeMs = chunk1StartTimeMs + Duration.ofMinutes(4).toMillis();
final long chunk3EndTimeMs = chunk3StartTimeMs + 10000;

assertThat(searchAndGetHitCount(chunkManager, "Message21", chunk3StartTimeMs, chunk3EndTimeMs))
Expand All @@ -1174,7 +1173,7 @@ public void testMultiChunkSearch() throws Exception {
.isEqualTo(0);

// Chunk 4
final long chunk4StartTimeMs = chunk1StartTimeMs + Duration.ofHours(6).toMillis();
final long chunk4StartTimeMs = chunk1StartTimeMs + Duration.ofMinutes(6).toMillis();
final long chunk4EndTimeMs = chunk4StartTimeMs + 10000;

assertThat(searchAndGetHitCount(chunkManager, "Message31", chunk4StartTimeMs, chunk4EndTimeMs))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,7 @@ public void testDiskBasedRolloverWithMaxMessages() throws Exception {
initChunkManager(
chunkRollOverStrategy, S3_TEST_BUCKET, MoreExecutors.newDirectExecutorService());

final Instant startTime =
LocalDateTime.of(2020, 10, 1, 10, 10, 0).atZone(ZoneOffset.UTC).toInstant();
final Instant startTime = Instant.now();

int totalMessages = 10;
int offset = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -108,8 +106,7 @@ private static KaldbSearch.SearchRequest.SearchAggregation buildHistogramRequest
public void testKalDbSearch() throws IOException {
IndexingChunkManager<LogMessage> chunkManager = chunkManagerUtil.chunkManager;

final Instant startTime =
LocalDateTime.of(2020, 10, 1, 10, 10, 0).atZone(ZoneOffset.UTC).toInstant();
final Instant startTime = Instant.now();
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
int offset = 1;
for (Trace.Span m : messages) {
Expand Down Expand Up @@ -176,8 +173,7 @@ public void testKalDbSearch() throws IOException {
public void testKalDbSearchNoData() throws IOException {
IndexingChunkManager<LogMessage> chunkManager = chunkManagerUtil.chunkManager;

final Instant startTime =
LocalDateTime.of(2020, 10, 1, 10, 10, 0).atZone(ZoneOffset.UTC).toInstant();
final Instant startTime = Instant.now();
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
int offset = 1;
for (Trace.Span m : messages) {
Expand Down Expand Up @@ -222,8 +218,7 @@ public void testKalDbSearchNoData() throws IOException {
public void testKalDbSearchNoHits() throws IOException {
IndexingChunkManager<LogMessage> chunkManager = chunkManagerUtil.chunkManager;

final Instant startTime =
LocalDateTime.of(2020, 10, 1, 10, 10, 0).atZone(ZoneOffset.UTC).toInstant();
final Instant startTime = Instant.now();
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
int offset = 1;
for (Trace.Span m : messages) {
Expand Down Expand Up @@ -270,8 +265,7 @@ public void testKalDbSearchNoHits() throws IOException {
public void testKalDbSearchNoHistogram() throws IOException {
IndexingChunkManager<LogMessage> chunkManager = chunkManagerUtil.chunkManager;

final Instant startTime =
LocalDateTime.of(2020, 10, 1, 10, 10, 0).atZone(ZoneOffset.UTC).toInstant();
final Instant startTime = Instant.now();
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
int offset = 1;
for (Trace.Span m : messages) {
Expand Down Expand Up @@ -326,8 +320,7 @@ public void testKalDbSearchNoHistogram() throws IOException {
public void testKalDbBadArgSearch() throws Throwable {
IndexingChunkManager<LogMessage> chunkManager = chunkManagerUtil.chunkManager;

final Instant startTime =
LocalDateTime.of(2020, 10, 1, 10, 10, 0).atZone(ZoneOffset.UTC).toInstant();
final Instant startTime = Instant.now();
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
int offset = 1;
for (Trace.Span m : messages) {
Expand Down Expand Up @@ -361,8 +354,7 @@ public void testKalDbGrpcSearch() throws IOException {
// Load test data into chunk manager.
IndexingChunkManager<LogMessage> chunkManager = chunkManagerUtil.chunkManager;

final Instant startTime =
LocalDateTime.of(2020, 10, 1, 10, 10, 0).atZone(ZoneOffset.UTC).toInstant();
final Instant startTime = Instant.now();
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
int offset = 1;
for (Trace.Span m : messages) {
Expand Down Expand Up @@ -440,8 +432,7 @@ public void testKalDbGrpcSearchThrowsException() throws IOException {
// Load test data into chunk manager.
IndexingChunkManager<LogMessage> chunkManager = chunkManagerUtil.chunkManager;

final Instant startTime =
LocalDateTime.of(2020, 10, 1, 10, 10, 0).atZone(ZoneOffset.UTC).toInstant();
final Instant startTime = Instant.now();
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1000, startTime);
int offset = 1;
for (Trace.Span m : messages) {
Expand Down
Loading

0 comments on commit 64163ad

Please sign in to comment.