diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java
index baf3686fbd..b67a9d8a84 100644
--- a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java
+++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java
@@ -28,8 +28,10 @@ public class BulkIngestApi {
   private final DatasetRateLimitingService datasetRateLimitingService;
   private final MeterRegistry meterRegistry;
   private final Counter incomingByteTotal;
+  private final Counter incomingDocsTotal;
   private final Timer bulkIngestTimer;
   private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "kaldb_preprocessor_incoming_byte";
+  private final String BULK_INGEST_INCOMING_BYTE_DOCS = "kaldb_preprocessor_incoming_docs";
   private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest";
 
   public BulkIngestApi(
@@ -41,6 +43,7 @@ public BulkIngestApi(
     this.datasetRateLimitingService = datasetRateLimitingService;
     this.meterRegistry = meterRegistry;
     this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
+    this.incomingDocsTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_DOCS);
     this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
   }
 
@@ -70,6 +73,7 @@ public HttpResponse addDocument(String bulkRequest) {
       }
 
       for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
+        incomingDocsTotal.increment(indexDocs.getValue().size());
         final String index = indexDocs.getKey();
         if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
           BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java
index baf58f146d..d32ba7d6ed 100644
--- a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java
+++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java
@@ -4,8 +4,11 @@
 import com.slack.kaldb.writer.SpanFormatter;
 import com.slack.service.murron.trace.Trace;
 import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -36,12 +39,43 @@ public static Map<String, List<Trace.Span>> parseRequest(byte[] postBody) throws
     return convertIndexRequestToTraceFormat(parseBulkRequest(postBody));
   }
 
-  protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
-    ZonedDateTime timestamp =
-        (ZonedDateTime)
+  /**
+   * We need users to be able to specify the timestamp field and unit. For now we will do the
+   * following: 1. Check to see if the "timestamp" field exists and if it does parse that as a long
+   * in millis 2. Check if a field called `@timestamp` exists and parse that as a date (since
+   * logstash sets that) 3. Use the current time from the ingestMetadata
+   */
+  public static long getTimestampFromIngestDocument(IngestDocument ingestDocument) {
+    // assumption that the provided timestamp is in millis
+    // at some point both the unit and field need to be configurable
+    // when we do that, remember to change the caller to appropriately remove the field
+    if (ingestDocument.hasField("timestamp")) {
+      return ingestDocument.getFieldValue("timestamp", Long.class);
+    }
+
+    if (ingestDocument.hasField("_timestamp")) {
+      return ingestDocument.getFieldValue("_timestamp", Long.class);
+    }
+
+    if (ingestDocument.hasField("@timestamp")) {
+      String dateString = ingestDocument.getFieldValue("@timestamp", String.class);
+      LocalDateTime localDateTime =
+          LocalDateTime.parse(dateString, DateTimeFormatter.ISO_DATE_TIME);
+      Instant instant = localDateTime.toInstant(ZoneOffset.UTC);
+      return instant.toEpochMilli();
+    }
+
+    return ((ZonedDateTime)
             ingestDocument
                 .getIngestMetadata()
-                .getOrDefault("timestamp", ZonedDateTime.now(ZoneOffset.UTC));
+                .getOrDefault("timestamp", ZonedDateTime.now(ZoneOffset.UTC)))
+        .toInstant()
+        .toEpochMilli();
+  }
+
+  protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
+
+    long timestampInMillis = getTimestampFromIngestDocument(ingestDocument);
 
     Map<String, Object> sourceAndMetadata = ingestDocument.getSourceAndMetadata();
     String id = (String) sourceAndMetadata.get(IngestDocument.Metadata.ID.getFieldName());
@@ -56,15 +90,19 @@ protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
     spanBuilder.setId(ByteString.copyFrom(id.getBytes()));
     // Trace.Span proto expects duration in microseconds today
     spanBuilder.setTimestamp(
-        TimeUnit.MICROSECONDS.convert(timestamp.toInstant().toEpochMilli(), TimeUnit.MILLISECONDS));
+        TimeUnit.MICROSECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS));
 
     // Remove the following internal metadata fields that OpenSearch adds
     sourceAndMetadata.remove(IngestDocument.Metadata.ROUTING.getFieldName());
     sourceAndMetadata.remove(IngestDocument.Metadata.VERSION.getFieldName());
     sourceAndMetadata.remove(IngestDocument.Metadata.VERSION_TYPE.getFieldName());
-    // these two fields don't need to be tags as they have been explicitly set already
+
+    // these fields don't need to be tags as they have been explicitly set already
     sourceAndMetadata.remove(IngestDocument.Metadata.ID.getFieldName());
     sourceAndMetadata.remove(IngestDocument.Metadata.INDEX.getFieldName());
+    sourceAndMetadata.remove("timestamp");
+    sourceAndMetadata.remove("_timestamp");
+    sourceAndMetadata.remove("@timestamp");
 
     sourceAndMetadata.forEach(
         (key, value) -> spanBuilder.addTags(SpanFormatter.convertKVtoProto(key, value)));
diff --git a/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java
index 9c3aaf4f5d..86228bd08b 100644
--- a/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java
@@ -15,6 +15,7 @@
 import java.util.concurrent.TimeUnit;
 import org.junit.jupiter.api.Test;
 import org.opensearch.action.index.IndexRequest;
+import org.opensearch.index.VersionType;
 import org.opensearch.ingest.IngestDocument;
 
 public class BulkApiRequestParserTest {
@@ -28,13 +29,13 @@ private byte[] getRawQueryBytes(String filename) throws IOException {
 
   @Test
   public void testSimpleIndexRequest() throws Exception {
-    byte[] rawRequest = getRawQueryBytes("index_simple");
+    byte[] rawRequest = getRawQueryBytes("index_simple_with_ts");
 
     List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
     assertThat(indexRequests.size()).isEqualTo(1);
     assertThat(indexRequests.get(0).index()).isEqualTo("test");
     assertThat(indexRequests.get(0).id()).isEqualTo("1");
-    assertThat(indexRequests.get(0).sourceAsMap().size()).isEqualTo(2);
+    assertThat(indexRequests.get(0).sourceAsMap().size()).isEqualTo(3);
 
     Map<String, List<Trace.Span>> indexDocs =
         BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
@@ -51,6 +52,7 @@ public void testSimpleIndexRequest() throws Exception {
                 && keyValue.getVStr().equals("test"))
             .count())
         .isEqualTo(1);
+    assertThat(indexDocs.get("test").get(0).getTimestamp()).isEqualTo(4739680479544000L);
   }
 
   @Test
@@ -210,5 +212,53 @@ public void testTraceSpanGeneratedTimestamp() throws IOException {
             TimeUnit.MILLISECONDS.convert(span.getTimestamp(), TimeUnit.MICROSECONDS));
     Instant oneMinuteBefore = Instant.now().minus(1, ChronoUnit.MINUTES);
     assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue();
+
+    Instant oneMinuteAfter = Instant.now().plus(1, ChronoUnit.MINUTES);
+    assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue();
+  }
+
+  @Test
+  public void testTimestampParsingFromIngestDocument() {
+    IngestDocument ingestDocument =
+        new IngestDocument("index", "1", "routing", 1L, VersionType.INTERNAL, Map.of());
+    long timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    Instant ingestDocumentTime = Instant.ofEpochMilli(timeInMillis);
+
+    // this tests that the parser inserted a timestamp close to the current time
+    Instant oneMinuteBefore = Instant.now().minus(1, ChronoUnit.MINUTES);
+    Instant oneMinuteAfter = Instant.now().plus(1, ChronoUnit.MINUTES);
+    assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue();
+    assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue();
+
+    // We respect the user provided @timestamp field
+    String ts = "2024-01-01T00:00:00Z";
+    Instant providedTimeStamp = Instant.parse(ts);
+    ingestDocument =
+        new IngestDocument(
+            "index", "1", "routing", 1L, VersionType.INTERNAL, Map.of("@timestamp", ts));
+    timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());
+
+    ingestDocument =
+        new IngestDocument(
+            "index",
+            "1",
+            "routing",
+            1L,
+            VersionType.INTERNAL,
+            Map.of("timestamp", providedTimeStamp.toEpochMilli()));
+    timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());
+
+    ingestDocument =
+        new IngestDocument(
+            "index",
+            "1",
+            "routing",
+            1L,
+            VersionType.INTERNAL,
+            Map.of("_timestamp", providedTimeStamp.toEpochMilli()));
+    timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());
   }
 }
diff --git a/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java b/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java
index b72bdb397d..cb4cf97e9d 100644
--- a/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java
@@ -284,7 +284,7 @@ public void testDeleteStaleDataDoesNothingWhenGivenLimitLessThan0()
             metricsRegistry, 10 * 1024 * 1024 * 1024L, 1000000L);
 
     KaldbConfigs.IndexerConfig indexerConfig =
-        KaldbConfigUtil.makeIndexerConfig(TEST_PORT, 1000, "log_message", 100, -1, 0);
+        KaldbConfigUtil.makeIndexerConfig(TEST_PORT, 1000, "log_message", 100, -1, 10_000);
     initChunkManager(
         chunkRollOverStrategy,
         S3_TEST_BUCKET,
@@ -312,8 +312,7 @@ public void testDeleteStaleDataDoesNothingWhenGivenLimitLessThan0()
 
     // Get the count of the amount of indices so that we can confirm we've cleaned them up
     // after the rollover
-    final File dataDirectory = new File(indexerConfig.getDataDirectory());
-    final File indexDirectory = new File(dataDirectory.getAbsolutePath() + "/indices");
+    final File indexDirectory = tmpPath.resolve("indices").toFile();
 
     // files before rollover may or may-not be null, depending on other test timing
     int filesBeforeRollover =
diff --git a/kaldb/src/test/resources/opensearchRequest/bulk/index_simple.ndjson b/kaldb/src/test/resources/opensearchRequest/bulk/index_simple.ndjson
index a20ef5041a..104192040c 100644
--- a/kaldb/src/test/resources/opensearchRequest/bulk/index_simple.ndjson
+++ b/kaldb/src/test/resources/opensearchRequest/bulk/index_simple.ndjson
@@ -1,2 +1,2 @@
 { "index" : { "_index" : "test", "_id" : "1" } }
-{ "field1" : "value1", "field2" : "value2" }
+{ "field1" : "value1", "field2" : "value2"}
diff --git a/kaldb/src/test/resources/opensearchRequest/bulk/index_simple_with_ts.ndjson b/kaldb/src/test/resources/opensearchRequest/bulk/index_simple_with_ts.ndjson
new file mode 100644
index 0000000000..b153fcda4b
--- /dev/null
+++ b/kaldb/src/test/resources/opensearchRequest/bulk/index_simple_with_ts.ndjson
@@ -0,0 +1,2 @@
+{ "index" : { "_index" : "test", "_id" : "1" } }
+{ "field1" : "value1", "field2" : "value2", "@timestamp": "2120-03-12T09:54:39.544Z" }