respect user provided timestamp (#753)
* respect user provided timestamp

* remove from tag list
vthacker authored Jan 31, 2024
1 parent 0adba74 commit abc781b
Showing 6 changed files with 105 additions and 12 deletions.
BulkIngestApi.java
@@ -28,8 +28,10 @@ public class BulkIngestApi {
   private final DatasetRateLimitingService datasetRateLimitingService;
   private final MeterRegistry meterRegistry;
   private final Counter incomingByteTotal;
+  private final Counter incomingDocsTotal;
   private final Timer bulkIngestTimer;
   private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "kaldb_preprocessor_incoming_byte";
+  private final String BULK_INGEST_INCOMING_BYTE_DOCS = "kaldb_preprocessor_incoming_docs";
   private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest";
 
   public BulkIngestApi(
@@ -41,6 +43,7 @@ public BulkIngestApi(
     this.datasetRateLimitingService = datasetRateLimitingService;
     this.meterRegistry = meterRegistry;
     this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
+    this.incomingDocsTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_DOCS);
     this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
   }
 
@@ -70,6 +73,7 @@ public HttpResponse addDocument(String bulkRequest) {
       }
 
       for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
+        incomingDocsTotal.increment(indexDocs.getValue().size());
         final String index = indexDocs.getKey();
         if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
           BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
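Aside: the hunk above counts documents per bulk request with a Micrometer counter. A minimal, self-contained sketch of that behavior, assuming only the micrometer-core dependency (the class name IncomingDocsCounterSketch is ours, not kaldb's):

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class IncomingDocsCounterSketch {
  public static void main(String[] args) {
    MeterRegistry meterRegistry = new SimpleMeterRegistry();
    Counter incomingDocsTotal = meterRegistry.counter("kaldb_preprocessor_incoming_docs");

    // A bulk request carrying three documents increments by the batch size,
    // mirroring incomingDocsTotal.increment(indexDocs.getValue().size()) above.
    incomingDocsTotal.increment(3);
    System.out.println(incomingDocsTotal.count()); // 3.0
  }
}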
BulkApiRequestParser.java
@@ -4,8 +4,11 @@
 import com.slack.kaldb.writer.SpanFormatter;
 import com.slack.service.murron.trace.Trace;
 import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -36,12 +39,43 @@ public static Map<String, List<Trace.Span>> parseRequest(byte[] postBody) throws
     return convertIndexRequestToTraceFormat(parseBulkRequest(postBody));
   }
 
-  protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
-    ZonedDateTime timestamp =
-        (ZonedDateTime)
+  /**
+   * We need users to be able to specify the timestamp field and unit. For now we will do the
+   * following: 1. Check if a "timestamp" or "_timestamp" field exists and, if it does, parse it
+   * as a long in millis 2. Check if a field called `@timestamp` exists and parse that as a date
+   * (since logstash sets that) 3. Use the current time from the ingestMetadata
+   */
+  public static long getTimestampFromIngestDocument(IngestDocument ingestDocument) {
+    // assumption that the provided timestamp is in millis
+    // at some point both the unit and the field need to be configurable
+    // when we do that, remember to change the caller to appropriately remove the field
+    if (ingestDocument.hasField("timestamp")) {
+      return ingestDocument.getFieldValue("timestamp", Long.class);
+    }
+
+    if (ingestDocument.hasField("_timestamp")) {
+      return ingestDocument.getFieldValue("_timestamp", Long.class);
+    }
+
+    if (ingestDocument.hasField("@timestamp")) {
+      String dateString = ingestDocument.getFieldValue("@timestamp", String.class);
+      LocalDateTime localDateTime =
+          LocalDateTime.parse(dateString, DateTimeFormatter.ISO_DATE_TIME);
+      Instant instant = localDateTime.toInstant(ZoneOffset.UTC);
+      return instant.toEpochMilli();
+    }
+
+    return ((ZonedDateTime)
             ingestDocument
                 .getIngestMetadata()
-                .getOrDefault("timestamp", ZonedDateTime.now(ZoneOffset.UTC));
+                .getOrDefault("timestamp", ZonedDateTime.now(ZoneOffset.UTC)))
+        .toInstant()
+        .toEpochMilli();
+  }
+
+  protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
+
+    long timestampInMillis = getTimestampFromIngestDocument(ingestDocument);
 
     Map<String, Object> sourceAndMetadata = ingestDocument.getSourceAndMetadata();
     String id = (String) sourceAndMetadata.get(IngestDocument.Metadata.ID.getFieldName());
@@ -56,15 +90,19 @@ protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
     spanBuilder.setId(ByteString.copyFrom(id.getBytes()));
     // Trace.Span proto expects duration in microseconds today
     spanBuilder.setTimestamp(
-        TimeUnit.MICROSECONDS.convert(timestamp.toInstant().toEpochMilli(), TimeUnit.MILLISECONDS));
+        TimeUnit.MICROSECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS));
 
     // Remove the following internal metadata fields that OpenSearch adds
     sourceAndMetadata.remove(IngestDocument.Metadata.ROUTING.getFieldName());
     sourceAndMetadata.remove(IngestDocument.Metadata.VERSION.getFieldName());
     sourceAndMetadata.remove(IngestDocument.Metadata.VERSION_TYPE.getFieldName());
-    // these two fields don't need to be tags as they have been explicitly set already
+
+    // these fields don't need to be tags as they have been explicitly set already
     sourceAndMetadata.remove(IngestDocument.Metadata.ID.getFieldName());
     sourceAndMetadata.remove(IngestDocument.Metadata.INDEX.getFieldName());
+    sourceAndMetadata.remove("timestamp");
+    sourceAndMetadata.remove("_timestamp");
+    sourceAndMetadata.remove("@timestamp");
 
     sourceAndMetadata.forEach(
         (key, value) -> spanBuilder.addTags(SpanFormatter.convertKVtoProto(key, value)));
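To make the fallback order concrete, here is a hedged sketch of the same resolution logic against a plain Map instead of OpenSearch's IngestDocument (resolveTimestampMillis and the sample values are illustrative only; assumes Java 16+ for pattern-matching instanceof):

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

public class TimestampFallbackSketch {
  // Mirrors getTimestampFromIngestDocument: epoch-millis fields win, then an
  // ISO-8601 "@timestamp" string, then the current time as a last resort.
  static long resolveTimestampMillis(Map<String, Object> source) {
    if (source.get("timestamp") instanceof Long millis) {
      return millis;
    }
    if (source.get("_timestamp") instanceof Long millis) {
      return millis;
    }
    if (source.get("@timestamp") instanceof String dateString) {
      return LocalDateTime.parse(dateString, DateTimeFormatter.ISO_DATE_TIME)
          .toInstant(ZoneOffset.UTC)
          .toEpochMilli();
    }
    return Instant.now().toEpochMilli();
  }

  public static void main(String[] args) {
    // "@timestamp" is parsed as a UTC date string; "timestamp" is taken as epoch
    // millis verbatim. Both calls print 1704067200000.
    System.out.println(resolveTimestampMillis(Map.of("@timestamp", "2024-01-01T00:00:00Z")));
    System.out.println(resolveTimestampMillis(Map.of("timestamp", 1704067200000L)));
  }
}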
BulkApiRequestParserTest.java
@@ -15,6 +15,7 @@
 import java.util.concurrent.TimeUnit;
 import org.junit.jupiter.api.Test;
 import org.opensearch.action.index.IndexRequest;
+import org.opensearch.index.VersionType;
 import org.opensearch.ingest.IngestDocument;
 
 public class BulkApiRequestParserTest {
@@ -28,13 +29,13 @@ private byte[] getRawQueryBytes(String filename) throws IOException {

   @Test
   public void testSimpleIndexRequest() throws Exception {
-    byte[] rawRequest = getRawQueryBytes("index_simple");
+    byte[] rawRequest = getRawQueryBytes("index_simple_with_ts");
 
     List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
     assertThat(indexRequests.size()).isEqualTo(1);
     assertThat(indexRequests.get(0).index()).isEqualTo("test");
     assertThat(indexRequests.get(0).id()).isEqualTo("1");
-    assertThat(indexRequests.get(0).sourceAsMap().size()).isEqualTo(2);
+    assertThat(indexRequests.get(0).sourceAsMap().size()).isEqualTo(3);
 
     Map<String, List<Trace.Span>> indexDocs =
         BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests);
@@ -51,6 +52,7 @@ public void testSimpleIndexRequest() throws Exception {
                         && keyValue.getVStr().equals("test"))
                 .count())
         .isEqualTo(1);
+    assertThat(indexDocs.get("test").get(0).getTimestamp()).isEqualTo(4739680479544000L);
   }
 
   @Test
@@ -210,5 +212,53 @@ public void testTraceSpanGeneratedTimestamp() throws IOException {
             TimeUnit.MILLISECONDS.convert(span.getTimestamp(), TimeUnit.MICROSECONDS));
     Instant oneMinuteBefore = Instant.now().minus(1, ChronoUnit.MINUTES);
     assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue();
+
+    Instant oneMinuteAfter = Instant.now().plus(1, ChronoUnit.MINUTES);
+    assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue();
   }
+
+  @Test
+  public void testTimestampParsingFromIngestDocument() {
+    IngestDocument ingestDocument =
+        new IngestDocument("index", "1", "routing", 1L, VersionType.INTERNAL, Map.of());
+    long timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    Instant ingestDocumentTime = Instant.ofEpochMilli(timeInMillis);
+
+    // this tests that the parser inserted a timestamp close to the current time
+    Instant oneMinuteBefore = Instant.now().minus(1, ChronoUnit.MINUTES);
+    Instant oneMinuteAfter = Instant.now().plus(1, ChronoUnit.MINUTES);
+    assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue();
+    assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue();
+
+    // We respect the user-provided @timestamp field
+    String ts = "2024-01-01T00:00:00Z";
+    Instant providedTimeStamp = Instant.parse(ts);
+    ingestDocument =
+        new IngestDocument(
+            "index", "1", "routing", 1L, VersionType.INTERNAL, Map.of("@timestamp", ts));
+    timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());
+
+    ingestDocument =
+        new IngestDocument(
+            "index",
+            "1",
+            "routing",
+            1L,
+            VersionType.INTERNAL,
+            Map.of("timestamp", providedTimeStamp.toEpochMilli()));
+    timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());
+
+    ingestDocument =
+        new IngestDocument(
+            "index",
+            "1",
+            "routing",
+            1L,
+            VersionType.INTERNAL,
+            Map.of("_timestamp", providedTimeStamp.toEpochMilli()));
+    timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());
+  }
 }
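As a cross-check on the 4739680479544000L literal asserted in testSimpleIndexRequest above, the value follows from the fixture's @timestamp and the parser's millis-to-micros conversion; a small derivation sketch (not part of the commit):

import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class ExpectedTimestampSketch {
  public static void main(String[] args) {
    // The index_simple_with_ts fixture carries "@timestamp": "2120-03-12T09:54:39.544Z".
    long millis = Instant.parse("2120-03-12T09:54:39.544Z").toEpochMilli();
    // Trace.Span stores timestamps in microseconds, hence the extra factor of 1000.
    long micros = TimeUnit.MICROSECONDS.convert(millis, TimeUnit.MILLISECONDS);
    System.out.println(micros); // 4739680479544000
  }
}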
@@ -284,7 +284,7 @@ public void testDeleteStaleDataDoesNothingWhenGivenLimitLessThan0()
             metricsRegistry, 10 * 1024 * 1024 * 1024L, 1000000L);
 
     KaldbConfigs.IndexerConfig indexerConfig =
-        KaldbConfigUtil.makeIndexerConfig(TEST_PORT, 1000, "log_message", 100, -1, 0);
+        KaldbConfigUtil.makeIndexerConfig(TEST_PORT, 1000, "log_message", 100, -1, 10_000);
     initChunkManager(
         chunkRollOverStrategy,
         S3_TEST_BUCKET,
@@ -312,8 +312,7 @@

     // Get the count of indices so that we can confirm we've cleaned them up
     // after the rollover
-    final File dataDirectory = new File(indexerConfig.getDataDirectory());
-    final File indexDirectory = new File(dataDirectory.getAbsolutePath() + "/indices");
+    final File indexDirectory = tmpPath.resolve("indices").toFile();
 
     // files before rollover may or may not be null, depending on other test timing
     int filesBeforeRollover =
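The change above also swaps manual string concatenation for java.nio.file path resolution when locating the indices directory; a tiny sketch of the difference, with a made-up tmpPath:

import java.io.File;
import java.nio.file.Path;

public class IndexDirSketch {
  public static void main(String[] args) {
    Path tmpPath = Path.of("/tmp/indexer-test"); // stands in for the test's tmpPath
    // Before: new File(new File(...).getAbsolutePath() + "/indices")
    // After: Path#resolve handles the separator for us.
    File indexDirectory = tmpPath.resolve("indices").toFile();
    System.out.println(indexDirectory); // /tmp/indexer-test/indices
  }
}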
index_simple
@@ -1,2 +1,2 @@
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1", "field2" : "value2" }
{ "field1" : "value1", "field2" : "value2"}
index_simple_with_ts
@@ -0,0 +1,2 @@
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1", "field2" : "value2", "@timestamp": "2120-03-12T09:54:39.544Z" }
