From edfcf43940e0f187ae0aebe62b4dbbe71f57643a Mon Sep 17 00:00:00 2001 From: vthacker Date: Thu, 1 Feb 2024 13:51:33 -0800 Subject: [PATCH] parse timestamp field in a try-catch --- .../opensearch/BulkApiRequestParser.java | 36 +++++++++++-------- .../opensearch/BulkApiRequestParserTest.java | 35 ++++++++++++++++++ 2 files changed, 56 insertions(+), 15 deletions(-) diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java index d32ba7d6ed..1f815fc64e 100644 --- a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java +++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java @@ -46,23 +46,29 @@ public static Map> parseRequest(byte[] postBody) throws * logstash sets that) 3. Use the current time from the ingestMetadata */ public static long getTimestampFromIngestDocument(IngestDocument ingestDocument) { - // assumption that the provided timestamp is in millis - // at some point both th unit and field need to be configurable - // when we do that, remember to change the called to appropriately remove the field - if (ingestDocument.hasField("timestamp")) { - return ingestDocument.getFieldValue("timestamp", Long.class); - } - if (ingestDocument.hasField("_timestamp")) { - return ingestDocument.getFieldValue("_timestamp", Long.class); - } + try { + if (ingestDocument.hasField("@timestamp")) { + String dateString = ingestDocument.getFieldValue("@timestamp", String.class); + LocalDateTime localDateTime = + LocalDateTime.parse(dateString, DateTimeFormatter.ISO_DATE_TIME); + Instant instant = localDateTime.toInstant(ZoneOffset.UTC); + return instant.toEpochMilli(); + } - if (ingestDocument.hasField("@timestamp")) { - String dateString = ingestDocument.getFieldValue("@timestamp", String.class); - LocalDateTime localDateTime = - LocalDateTime.parse(dateString, DateTimeFormatter.ISO_DATE_TIME); - Instant instant = localDateTime.toInstant(ZoneOffset.UTC); - return instant.toEpochMilli(); + // assumption that the provided timestamp is in millis + // at some point both th unit and field need to be configurable + // when we do that, remember to change the called to appropriately remove the field + if (ingestDocument.hasField("timestamp")) { + return ingestDocument.getFieldValue("timestamp", Long.class); + } + + if (ingestDocument.hasField("_timestamp")) { + return ingestDocument.getFieldValue("_timestamp", Long.class); + } + } catch (Exception e) { + LOG.warn( + "Unable to parse timestamp from ingest document. Using current time as timestamp", e); } return ((ZonedDateTime) diff --git a/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java index 86228bd08b..973b943397 100644 --- a/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java @@ -239,6 +239,41 @@ public void testTimestampParsingFromIngestDocument() { timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument); assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli()); + // we put a long in the @timestamp field, which today we don't parse + // so it won't be 2024-01-01 but be the current timestamp + ingestDocument = + new IngestDocument( + "index", + "1", + "routing", + 1L, + VersionType.INTERNAL, + Map.of("@timestamp", providedTimeStamp.toEpochMilli())); + timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument); + ingestDocumentTime = Instant.ofEpochMilli(timeInMillis); + assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue(); + assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue(); + + // we put a string in the timestamp field, which today we don't parse + // so it won't be 2024-01-01 but be the current timestamp + ingestDocument = + new IngestDocument( + "index", "1", "routing", 1L, VersionType.INTERNAL, Map.of("_timestamp", ts)); + timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument); + ingestDocumentTime = Instant.ofEpochMilli(timeInMillis); + assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue(); + assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue(); + + // we put a string in the _timestamp field, which today we don't parse + // so it won't be 2024-01-01 but be the current timestamp + ingestDocument = + new IngestDocument( + "index", "1", "routing", 1L, VersionType.INTERNAL, Map.of("timestamp", ts)); + timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument); + ingestDocumentTime = Instant.ofEpochMilli(timeInMillis); + assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue(); + assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue(); + ingestDocument = new IngestDocument( "index",