From 1fe714737e3285585016202af1c5202d43c64f63 Mon Sep 17 00:00:00 2001 From: Varun Thacker Date: Mon, 1 Apr 2024 13:16:44 -0700 Subject: [PATCH] Use the schema info while creating lucene documents in the indexer (#816) * utilize schema info at the indexer * PR feedback --- .../opensearch/BulkApiRequestParser.java | 23 +- .../com/slack/kaldb/logstore/LogMessage.java | 1 + .../opensearch/OpenSearchAdapter.java | 34 +- .../SchemaAwareLogDocumentBuilderImpl.java | 277 ++++++++------- .../logstore/search/SearchResultUtils.java | 45 +-- .../kaldb/metadata/schema/FieldType.java | 304 +++++++++++------ .../com/slack/kaldb/writer/SpanFormatter.java | 138 +------- kaldb/src/main/proto/trace.proto | 17 - .../opensearch/BulkApiRequestParserTest.java | 25 +- .../kaldb/chunk/IndexingChunkImplTest.java | 3 +- .../kaldb/chunk/RecoveryChunkImplTest.java | 3 +- .../IndexingChunkManagerTest.java | 5 +- .../RecoveryChunkManagerTest.java | 3 +- .../ElasticsearchApiServiceTest.java | 55 +++ .../slack/kaldb/logstore/LogMessageTest.java | 4 +- .../logstore/LuceneIndexStoreImplTest.java | 13 +- .../schema/FieldConflictStrategyTests.java | 21 +- .../schema/RaiseErrorFieldValueTest.java | 1 - .../search/LogIndexSearcherImplTest.java | 29 +- .../search/SearchResultUtilsTest.java | 4 +- ....java => SpanFormatterWithSchemaTest.java} | 292 +++++++++++++--- .../com/slack/kaldb/testlib/SpanUtil.java | 33 +- .../writer/LogMessageWriterImplTest.java | 118 ++++++- .../kaldb/writer/MurronLogFormatterTest.java | 16 +- .../slack/kaldb/writer/SpanFormatterTest.java | 321 ------------------ .../bulk/index_all_schema_fields.ndjson | 6 + 26 files changed, 912 insertions(+), 879 deletions(-) delete mode 100644 kaldb/src/test/java/com/slack/kaldb/logstore/schema/RaiseErrorFieldValueTest.java rename kaldb/src/test/java/com/slack/kaldb/schema/{SpanTagFormatterWithSchemaTest.java => SpanFormatterWithSchemaTest.java} (51%) delete mode 100644 kaldb/src/test/java/com/slack/kaldb/writer/SpanFormatterTest.java create mode 100644 kaldb/src/test/resources/opensearchRequest/bulk/index_all_schema_fields.ndjson diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java index e4e02c050d..bbc53f3ffe 100644 --- a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java +++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java @@ -2,6 +2,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.writer.SpanFormatter; import com.slack.service.murron.trace.Trace; @@ -103,6 +104,27 @@ public static Trace.Span fromIngestDocument( spanBuilder.setTimestamp( TimeUnit.MICROSECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS)); + if (sourceAndMetadata.get(LogMessage.ReservedField.PARENT_ID.fieldName) != null) { + spanBuilder.setParentId( + ByteString.copyFromUtf8( + (String) sourceAndMetadata.get(LogMessage.ReservedField.PARENT_ID.fieldName))); + sourceAndMetadata.remove(LogMessage.ReservedField.PARENT_ID.fieldName); + } + if (sourceAndMetadata.get(LogMessage.ReservedField.TRACE_ID.fieldName) != null) { + spanBuilder.setTraceId( + ByteString.copyFromUtf8( + (String) sourceAndMetadata.get(LogMessage.ReservedField.TRACE_ID.fieldName))); + sourceAndMetadata.remove(LogMessage.ReservedField.TRACE_ID.fieldName); + } + if (sourceAndMetadata.get(LogMessage.ReservedField.NAME.fieldName) != null) { + spanBuilder.setName((String) sourceAndMetadata.get(LogMessage.ReservedField.NAME.fieldName)); + sourceAndMetadata.remove(LogMessage.ReservedField.NAME.fieldName); + } + if (sourceAndMetadata.get("duration") != null) { + spanBuilder.setDuration(Long.parseLong(sourceAndMetadata.get("duration").toString())); + sourceAndMetadata.remove("duration"); + } + // Remove the following internal metadata fields that OpenSearch adds sourceAndMetadata.remove(IngestDocument.Metadata.ROUTING.getFieldName()); sourceAndMetadata.remove(IngestDocument.Metadata.VERSION.getFieldName()); @@ -123,7 +145,6 @@ public static Trace.Span fromIngestDocument( spanBuilder.addTags( Trace.KeyValue.newBuilder() .setKey(SERVICE_NAME_KEY) - .setVType(Trace.ValueType.STRING) .setFieldType(Schema.SchemaFieldType.KEYWORD) .setVStr(index) .build()); diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LogMessage.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LogMessage.java index 57824fc72e..a156ba4d34 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LogMessage.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LogMessage.java @@ -57,6 +57,7 @@ public enum ReservedField { NAME("name"), SERVICE_NAME("service_name"), DURATION_MS("duration_ms"), + DURATION("duration"), TRACE_ID("trace_id"), PARENT_ID("parent_id"), KALDB_INVALID_TIMESTAMP("kaldb_invalid_timestamp"); diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java index 02f7b429a6..1d074f2643 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java @@ -221,20 +221,36 @@ public void reloadSchema() { try { if (entry.getValue().fieldType == FieldType.TEXT) { tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "text")); - } else if (entry.getValue().fieldType == FieldType.STRING) { + } else if (entry.getValue().fieldType == FieldType.STRING + || entry.getValue().fieldType == FieldType.KEYWORD + || entry.getValue().fieldType == FieldType.ID) { tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "keyword")); - } else if (entry.getValue().fieldType == FieldType.ID) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "keyword")); - } else if (entry.getValue().fieldType == FieldType.INTEGER) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "integer")); - } else if (entry.getValue().fieldType == FieldType.LONG) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "long")); + } else if (entry.getValue().fieldType == FieldType.IP) { + tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "ip")); + } else if (entry.getValue().fieldType == FieldType.DATE) { + tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "date")); + } else if (entry.getValue().fieldType == FieldType.BOOLEAN) { + tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "boolean")); } else if (entry.getValue().fieldType == FieldType.DOUBLE) { tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "double")); } else if (entry.getValue().fieldType == FieldType.FLOAT) { tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "float")); - } else if (entry.getValue().fieldType == FieldType.BOOLEAN) { - tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "boolean")); + } else if (entry.getValue().fieldType == FieldType.HALF_FLOAT) { + tryRegisterField( + mapperService, entry.getValue().name, b -> b.field("type", "half_float")); + } else if (entry.getValue().fieldType == FieldType.INTEGER) { + tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "integer")); + } else if (entry.getValue().fieldType == FieldType.LONG) { + tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "long")); + } else if (entry.getValue().fieldType == FieldType.SCALED_LONG) { + tryRegisterField( + mapperService, entry.getValue().name, b -> b.field("type", "scaled_long")); + } else if (entry.getValue().fieldType == FieldType.SHORT) { + tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "short")); + } else if (entry.getValue().fieldType == FieldType.BYTE) { + tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "byte")); + } else if (entry.getValue().fieldType == FieldType.BINARY) { + tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "binary")); } else { LOG.warn( "Field type '{}' is not yet currently supported for field '{}'", diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java index d4bbe98ff8..812cdf0025 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java @@ -1,12 +1,12 @@ package com.slack.kaldb.logstore.schema; import static com.slack.kaldb.logstore.LogMessage.computedIndexName; +import static com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD; import static com.slack.kaldb.writer.SpanFormatter.DEFAULT_INDEX_NAME; import static com.slack.kaldb.writer.SpanFormatter.DEFAULT_LOG_MESSAGE_TYPE; import static com.slack.kaldb.writer.SpanFormatter.isValidTimestamp; import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.slack.kaldb.logstore.DocumentBuilder; import com.slack.kaldb.logstore.FieldDefMismatchException; @@ -14,13 +14,12 @@ import com.slack.kaldb.logstore.LogWireMessage; import com.slack.kaldb.metadata.schema.FieldType; import com.slack.kaldb.metadata.schema.LuceneFieldDef; +import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.util.JsonUtil; import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; -import java.time.Duration; import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -130,38 +129,16 @@ public enum FieldConflictPolicy { CONVERT_VALUE_AND_DUPLICATE_FIELD } - private static final String PLACEHOLDER_FIELD_NAME = "PLACEHOLDER_FIELD_NAME"; - private static final Map defaultPropDescriptionForType = - ImmutableMap.of( - FieldType.LONG, - new LuceneFieldDef(PLACEHOLDER_FIELD_NAME, FieldType.LONG.name, false, true, true), - FieldType.FLOAT, - new LuceneFieldDef(PLACEHOLDER_FIELD_NAME, FieldType.FLOAT.name, false, true, true), - FieldType.INTEGER, - new LuceneFieldDef(PLACEHOLDER_FIELD_NAME, FieldType.INTEGER.name, false, true, true), - FieldType.DOUBLE, - new LuceneFieldDef(PLACEHOLDER_FIELD_NAME, FieldType.DOUBLE.name, false, true, true), - FieldType.TEXT, - new LuceneFieldDef(PLACEHOLDER_FIELD_NAME, FieldType.TEXT.name, false, true, false), - FieldType.STRING, - new LuceneFieldDef(PLACEHOLDER_FIELD_NAME, FieldType.STRING.name, false, true, true), - FieldType.BOOLEAN, - new LuceneFieldDef(PLACEHOLDER_FIELD_NAME, FieldType.BOOLEAN.name, false, true, true)); - - @VisibleForTesting - public FieldConflictPolicy getIndexFieldConflictPolicy() { - return indexFieldConflictPolicy; - } - private void addField( final Document doc, final String key, final Object value, + final Schema.SchemaFieldType schemaFieldType, final String keyPrefix, int nestingDepth) { // If value is a list, convert the value to a String and index the field. if (value instanceof List) { - addField(doc, key, Strings.join((List) value, ','), keyPrefix, nestingDepth); + addField(doc, key, Strings.join((List) value, ','), schemaFieldType, keyPrefix, nestingDepth); return; } @@ -170,12 +147,13 @@ private void addField( if (value instanceof Map) { if (nestingDepth >= MAX_NESTING_DEPTH) { // Once max nesting depth is reached, index the field as a string. - addField(doc, key, value.toString(), keyPrefix, nestingDepth + 1); + addField(doc, key, value.toString(), schemaFieldType, keyPrefix, nestingDepth + 1); } else { Map mapValue = (Map) value; for (Object k : mapValue.keySet()) { if (k instanceof String) { - addField(doc, (String) k, mapValue.get(k), fieldName, nestingDepth + 1); + addField( + doc, (String) k, mapValue.get(k), schemaFieldType, fieldName, nestingDepth + 1); } else { throw new FieldDefMismatchException( String.format( @@ -186,9 +164,9 @@ private void addField( return; } - FieldType valueType = getJsonType(value); + FieldType valueType = FieldType.valueOf(schemaFieldType.name()); if (!fieldDefMap.containsKey(fieldName)) { - indexNewField(doc, fieldName, value, valueType); + indexNewField(doc, fieldName, value, schemaFieldType); } else { LuceneFieldDef registeredField = fieldDefMap.get(fieldName); // If the field types are same or the fields are type aliases @@ -222,7 +200,7 @@ private void addField( registeredField.fieldType); // Add new field with new type String newFieldName = makeNewFieldOfType(fieldName, valueType); - indexNewField(doc, newFieldName, value, valueType); + indexNewField(doc, newFieldName, value, schemaFieldType); LOG.debug( "Added new field {} of type {} due to type conflict", newFieldName, valueType); convertAndDuplicateFieldCounter.increment(); @@ -237,27 +215,41 @@ private void addField( } } - private void indexNewField(Document doc, String key, Object value, FieldType valueType) { - // If we are seeing a field for the first time index it with default template for the - // valueType and create a field def. - if (!defaultPropDescriptionForType.containsKey(valueType)) { - throw new RuntimeException("No default prop description"); - } - - final LuceneFieldDef defaultPropDescription = defaultPropDescriptionForType.get(valueType); - final LuceneFieldDef newFieldDef = - new LuceneFieldDef( - key, - defaultPropDescription.fieldType.name, - defaultPropDescription.isStored, - defaultPropDescription.isIndexed, - defaultPropDescription.storeDocValue); - // add the document to this field. + private void indexNewField( + Document doc, String key, Object value, Schema.SchemaFieldType schemaFieldType) { + final LuceneFieldDef newFieldDef = getLuceneFieldDef(key, schemaFieldType); totalFieldsCounter.increment(); fieldDefMap.put(key, newFieldDef); indexTypedField(doc, key, value, newFieldDef); } + private boolean isStored(String fieldName) { + return fieldName.equals(LogMessage.SystemField.SOURCE.fieldName); + } + + private boolean isDocValueField(Schema.SchemaFieldType schemaFieldType, String fieldName) { + return !fieldName.equals(LogMessage.SystemField.SOURCE.fieldName) + || !schemaFieldType.equals(Schema.SchemaFieldType.TEXT); + } + + private boolean isIndexed(Schema.SchemaFieldType schemaFieldType, String fieldName) { + return !fieldName.equals(LogMessage.SystemField.SOURCE.fieldName) + || !schemaFieldType.equals(Schema.SchemaFieldType.BINARY); + } + + // In the future, we need this to take SchemaField instead of FieldType + // that way we can make isIndexed/isStored etc. configurable + // we don't put it in th proto today but when we move to ZK we'll change the KeyValue to take + // SchemaField info in the future + private LuceneFieldDef getLuceneFieldDef(String key, Schema.SchemaFieldType schemaFieldType) { + return new LuceneFieldDef( + key, + schemaFieldType.name(), + isStored(key), + isIndexed(schemaFieldType, key), + isDocValueField(schemaFieldType, key)); + } + static String makeNewFieldOfType(String key, FieldType valueType) { return key + "_" + valueType.getName(); } @@ -291,29 +283,6 @@ private static void indexTypedField( fieldDef.fieldType.addField(doc, key, value, fieldDef); } - private static FieldType getJsonType(Object value) { - if (value instanceof Long) { - return FieldType.LONG; - } - if (value instanceof Integer) { - return FieldType.INTEGER; - } - if (value instanceof String) { - return FieldType.STRING; - } - if (value instanceof Float) { - return FieldType.FLOAT; - } - if (value instanceof Boolean) { - return FieldType.BOOLEAN; - } - if (value instanceof Double) { - return FieldType.DOUBLE; - } - - throw new RuntimeException("Unknown type"); - } - public static SchemaAwareLogDocumentBuilderImpl build( FieldConflictPolicy fieldConflictPolicy, boolean enableFullTextSearch, @@ -373,6 +342,7 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException { doc, LogMessage.ReservedField.PARENT_ID.fieldName, message.getParentId().toStringUtf8(), + Schema.SchemaFieldType.KEYWORD, "", 0); } @@ -382,26 +352,38 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException { doc, LogMessage.ReservedField.TRACE_ID.fieldName, message.getTraceId().toStringUtf8(), + Schema.SchemaFieldType.KEYWORD, "", 0); } if (!message.getName().isEmpty()) { jsonMap.put(LogMessage.ReservedField.NAME.fieldName, message.getName()); - addField(doc, LogMessage.ReservedField.NAME.fieldName, message.getName(), "", 0); + addField( + doc, + LogMessage.ReservedField.NAME.fieldName, + message.getName(), + Schema.SchemaFieldType.KEYWORD, + "", + 0); } if (message.getDuration() != 0) { - jsonMap.put( - LogMessage.ReservedField.DURATION_MS.fieldName, - Duration.of(message.getDuration(), ChronoUnit.MICROS).toMillis()); + jsonMap.put(LogMessage.ReservedField.DURATION.fieldName, message.getDuration()); addField( doc, - LogMessage.ReservedField.DURATION_MS.fieldName, - Duration.of(message.getDuration(), ChronoUnit.MICROS).toMillis(), + LogMessage.ReservedField.DURATION.fieldName, + message.getDuration(), + Schema.SchemaFieldType.LONG, "", 0); } if (!message.getId().isEmpty()) { - addField(doc, LogMessage.SystemField.ID.fieldName, message.getId().toStringUtf8(), "", 0); + addField( + doc, + LogMessage.SystemField.ID.fieldName, + message.getId().toStringUtf8(), + Schema.SchemaFieldType.KEYWORD, + "", + 0); } else { throw new IllegalArgumentException("Span id is empty"); } @@ -415,6 +397,7 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException { doc, LogMessage.ReservedField.KALDB_INVALID_TIMESTAMP.fieldName, message.getTimestamp(), + Schema.SchemaFieldType.LONG, "", 0); jsonMap.put( @@ -422,7 +405,12 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException { } addField( - doc, LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, timestamp.toEpochMilli(), "", 0); + doc, + LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, + timestamp.toEpochMilli(), + Schema.SchemaFieldType.LONG, + "", + 0); // todo - this should be removed once we simplify the time handling // this will be overridden below if a user provided value exists jsonMap.put(LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, timestamp.toString()); @@ -450,11 +438,29 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException { ? tags.get(LogMessage.ReservedField.TYPE.fieldName).getVStr() : DEFAULT_LOG_MESSAGE_TYPE; - addField(doc, LogMessage.ReservedField.TYPE.fieldName, msgType, "", 0); + addField( + doc, + LogMessage.ReservedField.TYPE.fieldName, + msgType, + Schema.SchemaFieldType.KEYWORD, + "", + 0); jsonMap.put(LogMessage.ReservedField.SERVICE_NAME.fieldName, indexName); - addField(doc, LogMessage.SystemField.INDEX.fieldName, indexName, "", 0); - addField(doc, LogMessage.ReservedField.SERVICE_NAME.fieldName, indexName, "", 0); + addField( + doc, + LogMessage.SystemField.INDEX.fieldName, + indexName, + Schema.SchemaFieldType.KEYWORD, + "", + 0); + addField( + doc, + LogMessage.ReservedField.SERVICE_NAME.fieldName, + indexName, + Schema.SchemaFieldType.KEYWORD, + "", + 0); tags.remove(LogMessage.ReservedField.SERVICE_NAME.fieldName); tags.remove(LogMessage.ReservedField.TYPE.fieldName); @@ -463,35 +469,70 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException { tags.remove(LogMessage.ReservedField.PARENT_ID.fieldName); tags.remove(LogMessage.ReservedField.TRACE_ID.fieldName); tags.remove(LogMessage.ReservedField.NAME.fieldName); - tags.remove(LogMessage.ReservedField.DURATION_MS.fieldName); + tags.remove(LogMessage.ReservedField.DURATION.fieldName); tags.remove(LogMessage.SystemField.ID.fieldName); for (Trace.KeyValue keyValue : tags.values()) { - if (keyValue.getVType() == Trace.ValueType.STRING) { - addField(doc, keyValue.getKey(), keyValue.getVStr(), "", 0); + Schema.SchemaFieldType schemaFieldType = keyValue.getFieldType(); + // move to switch statements + if (schemaFieldType == Schema.SchemaFieldType.STRING + || schemaFieldType == Schema.SchemaFieldType.KEYWORD) { + addField(doc, keyValue.getKey(), keyValue.getVStr(), Schema.SchemaFieldType.KEYWORD, "", 0); + jsonMap.put(keyValue.getKey(), keyValue.getVStr()); + } else if (schemaFieldType == Schema.SchemaFieldType.TEXT) { + addField(doc, keyValue.getKey(), keyValue.getVStr(), Schema.SchemaFieldType.TEXT, "", 0); jsonMap.put(keyValue.getKey(), keyValue.getVStr()); - } else if (keyValue.getVType() == Trace.ValueType.BOOL) { - addField(doc, keyValue.getKey(), keyValue.getVBool(), "", 0); + } else if (schemaFieldType == Schema.SchemaFieldType.IP) { + addField(doc, keyValue.getKey(), keyValue.getVStr(), Schema.SchemaFieldType.IP, "", 0); + jsonMap.put(keyValue.getKey(), keyValue.getVStr()); + } else if (schemaFieldType == Schema.SchemaFieldType.DATE) { + addField(doc, keyValue.getKey(), keyValue.getVInt64(), Schema.SchemaFieldType.DATE, "", 0); + jsonMap.put(keyValue.getKey(), keyValue.getVInt64()); + } else if (schemaFieldType == Schema.SchemaFieldType.BOOLEAN) { + addField( + doc, keyValue.getKey(), keyValue.getVBool(), Schema.SchemaFieldType.BOOLEAN, "", 0); jsonMap.put(keyValue.getKey(), keyValue.getVBool()); - } else if (keyValue.getVType() == Trace.ValueType.INT32) { - addField(doc, keyValue.getKey(), keyValue.getVInt32(), "", 0); + } else if (schemaFieldType == Schema.SchemaFieldType.DOUBLE) { + addField( + doc, keyValue.getKey(), keyValue.getVFloat64(), Schema.SchemaFieldType.DOUBLE, "", 0); + jsonMap.put(keyValue.getKey(), keyValue.getVFloat64()); + } else if (schemaFieldType == Schema.SchemaFieldType.FLOAT) { + addField( + doc, keyValue.getKey(), keyValue.getVFloat32(), Schema.SchemaFieldType.FLOAT, "", 0); + jsonMap.put(keyValue.getKey(), keyValue.getVFloat32()); + } else if (schemaFieldType == Schema.SchemaFieldType.HALF_FLOAT) { + addField( + doc, + keyValue.getKey(), + keyValue.getVFloat32(), + Schema.SchemaFieldType.HALF_FLOAT, + "", + 0); + jsonMap.put(keyValue.getKey(), keyValue.getVFloat32()); + } else if (schemaFieldType == Schema.SchemaFieldType.INTEGER) { + addField( + doc, keyValue.getKey(), keyValue.getVInt32(), Schema.SchemaFieldType.INTEGER, "", 0); jsonMap.put(keyValue.getKey(), keyValue.getVInt32()); - } else if (keyValue.getVType() == Trace.ValueType.INT64) { - addField(doc, keyValue.getKey(), keyValue.getVInt64(), "", 0); + } else if (schemaFieldType == Schema.SchemaFieldType.LONG) { + addField(doc, keyValue.getKey(), keyValue.getVInt64(), Schema.SchemaFieldType.LONG, "", 0); jsonMap.put(keyValue.getKey(), keyValue.getVInt64()); - } else if (keyValue.getVType() == Trace.ValueType.FLOAT32) { - addField(doc, keyValue.getKey(), keyValue.getVFloat32(), "", 0); - jsonMap.put(keyValue.getKey(), keyValue.getVFloat32()); - } else if (keyValue.getVType() == Trace.ValueType.FLOAT64) { - addField(doc, keyValue.getKey(), keyValue.getVFloat64(), "", 0); - jsonMap.put(keyValue.getKey(), keyValue.getVFloat64()); - } else if (keyValue.getVType() == Trace.ValueType.BINARY) { - addField(doc, keyValue.getKey(), keyValue.getVBinary().toStringUtf8(), "", 0); + } else if (schemaFieldType == Schema.SchemaFieldType.SCALED_LONG) { + addField(doc, keyValue.getKey(), keyValue.getVInt64(), Schema.SchemaFieldType.LONG, "", 0); + jsonMap.put(keyValue.getKey(), keyValue.getVInt64()); + } else if (schemaFieldType == Schema.SchemaFieldType.SHORT) { + addField(doc, keyValue.getKey(), keyValue.getVInt32(), Schema.SchemaFieldType.SHORT, "", 0); + jsonMap.put(keyValue.getKey(), keyValue.getVInt32()); + } else if (schemaFieldType == Schema.SchemaFieldType.BYTE) { + addField(doc, keyValue.getKey(), keyValue.getVInt32(), Schema.SchemaFieldType.BYTE, "", 0); + jsonMap.put(keyValue.getKey(), keyValue.getVInt32()); + } else if (schemaFieldType == Schema.SchemaFieldType.BINARY) { + addField( + doc, keyValue.getKey(), keyValue.getVBinary(), Schema.SchemaFieldType.BINARY, "", 0); jsonMap.put(keyValue.getKey(), keyValue.getVBinary().toStringUtf8()); } else { LOG.warn( - "Skipping field with unknown value type {} with key {}", - keyValue.getVType(), + "Skipping field with unknown field type {} with key {}", + schemaFieldType, keyValue.getKey()); } } @@ -499,42 +540,18 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException { LogWireMessage logWireMessage = new LogWireMessage(indexName, msgType, message.getId().toStringUtf8(), timestamp, jsonMap); final String msgString = JsonUtil.writeAsString(logWireMessage); - addField(doc, LogMessage.SystemField.SOURCE.fieldName, msgString, "", 0); - if (enableFullTextSearch) { - addField(doc, LogMessage.SystemField.ALL.fieldName, msgString, "", 0); - } - - return doc; - } - - // this should only have one reference from SpanFormatterTest - // We want to make sure none of our conversions changed subtly so keeping the old method alive for - // testing - // Once we get rid of LogMessage we can remove this and the associated test - @VisibleForTesting - @Deprecated - public Document fromMessage(LogMessage message) throws JsonProcessingException { - Document doc = new Document(); - addField(doc, LogMessage.SystemField.INDEX.fieldName, message.getIndex(), "", 0); addField( doc, - LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, - message.getTimestamp().toEpochMilli(), + LogMessage.SystemField.SOURCE.fieldName, + msgString, + Schema.SchemaFieldType.TEXT, "", 0); - addField(doc, LogMessage.ReservedField.TYPE.fieldName, message.getType(), "", 0); - addField(doc, LogMessage.SystemField.ID.fieldName, message.getId(), "", 0); - - final String msgString = JsonUtil.writeAsString(message.toWireMessage()); - addField(doc, LogMessage.SystemField.SOURCE.fieldName, msgString, "", 0); if (enableFullTextSearch) { - addField(doc, LogMessage.SystemField.ALL.fieldName, msgString, "", 0); + addField( + doc, LogMessage.SystemField.ALL.fieldName, msgString, Schema.SchemaFieldType.TEXT, "", 0); } - for (String key : message.getSource().keySet()) { - addField(doc, key, message.getSource().get(key), "", 0); - } - LOG.trace("Lucene document {} for message {}", doc, message); return doc; } diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/search/SearchResultUtils.java b/kaldb/src/main/java/com/slack/kaldb/logstore/search/SearchResultUtils.java index 200f27ef7c..b99f8e912e 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/search/SearchResultUtils.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/search/SearchResultUtils.java @@ -25,7 +25,6 @@ import com.slack.kaldb.logstore.search.aggregations.TermsAggBuilder; import com.slack.kaldb.logstore.search.aggregations.UniqueCountAggBuilder; import com.slack.kaldb.metadata.schema.FieldType; -import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.proto.service.KaldbSearch; import com.slack.kaldb.util.JsonUtil; import java.io.IOException; @@ -700,52 +699,12 @@ public static SearchResult fromSearchResultProto( public static FieldType fromSchemaDefinitionProto( KaldbSearch.SchemaDefinition protoSchemaDefinition) { - if (protoSchemaDefinition.getType().equals(Schema.SchemaFieldType.TEXT)) { - return FieldType.TEXT; - } else if (protoSchemaDefinition.getType().equals(Schema.SchemaFieldType.STRING)) { - return FieldType.STRING; - } else if (protoSchemaDefinition.getType().equals(Schema.SchemaFieldType.INTEGER)) { - return FieldType.INTEGER; - } else if (protoSchemaDefinition.getType().equals(Schema.SchemaFieldType.LONG)) { - return FieldType.LONG; - } else if (protoSchemaDefinition.getType().equals(Schema.SchemaFieldType.FLOAT)) { - return FieldType.FLOAT; - } else if (protoSchemaDefinition.getType().equals(Schema.SchemaFieldType.BOOLEAN)) { - return FieldType.BOOLEAN; - } else if (protoSchemaDefinition.getType().equals(Schema.SchemaFieldType.DOUBLE)) { - return FieldType.DOUBLE; - } else if (protoSchemaDefinition.getType().equals(Schema.SchemaFieldType.ID)) { - return FieldType.ID; - } else { - throw new IllegalArgumentException( - String.format("Field type %s is not a supported type", protoSchemaDefinition.getType())); - } + return FieldType.fromSchemaFieldType(protoSchemaDefinition.getType()); } public static KaldbSearch.SchemaDefinition toSchemaDefinitionProto(FieldType fieldType) { KaldbSearch.SchemaDefinition.Builder schemaBuilder = KaldbSearch.SchemaDefinition.newBuilder(); - - if (fieldType.equals(FieldType.TEXT)) { - schemaBuilder.setType(Schema.SchemaFieldType.TEXT); - } else if (fieldType.equals(FieldType.STRING)) { - schemaBuilder.setType(Schema.SchemaFieldType.STRING); - } else if (fieldType.equals(FieldType.INTEGER)) { - schemaBuilder.setType(Schema.SchemaFieldType.INTEGER); - } else if (fieldType.equals(FieldType.LONG)) { - schemaBuilder.setType(Schema.SchemaFieldType.LONG); - } else if (fieldType.equals(FieldType.FLOAT)) { - schemaBuilder.setType(Schema.SchemaFieldType.FLOAT); - } else if (fieldType.equals(FieldType.BOOLEAN)) { - schemaBuilder.setType(Schema.SchemaFieldType.BOOLEAN); - } else if (fieldType.equals(FieldType.DOUBLE)) { - schemaBuilder.setType(Schema.SchemaFieldType.DOUBLE); - } else if (fieldType.equals(FieldType.ID)) { - schemaBuilder.setType(Schema.SchemaFieldType.ID); - } else { - throw new IllegalArgumentException( - String.format("Field type %s is not a supported type", fieldType)); - } - + schemaBuilder.setType(fieldType.toSchemaFieldType()); return schemaBuilder.build(); } diff --git a/kaldb/src/main/java/com/slack/kaldb/metadata/schema/FieldType.java b/kaldb/src/main/java/com/slack/kaldb/metadata/schema/FieldType.java index 76fb1388da..ba10e5d624 100644 --- a/kaldb/src/main/java/com/slack/kaldb/metadata/schema/FieldType.java +++ b/kaldb/src/main/java/com/slack/kaldb/metadata/schema/FieldType.java @@ -1,21 +1,20 @@ package com.slack.kaldb.metadata.schema; -import static org.opensearch.common.lucene.Lucene.KEYWORD_ANALYZER; -import static org.opensearch.common.lucene.Lucene.STANDARD_ANALYZER; - import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.protobuf.ByteString; +import com.slack.kaldb.proto.schema.Schema; +import java.net.InetAddress; import java.util.List; import java.util.Set; -import org.apache.commons.lang3.NotImplementedException; -import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.DoubleDocValuesField; import org.apache.lucene.document.DoublePoint; import org.apache.lucene.document.Field; import org.apache.lucene.document.FloatDocValuesField; import org.apache.lucene.document.FloatPoint; +import org.apache.lucene.document.InetAddressPoint; import org.apache.lucene.document.IntPoint; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; @@ -24,13 +23,16 @@ import org.apache.lucene.document.StoredField; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; -import org.apache.lucene.index.Term; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.TermQuery; +import org.apache.lucene.sandbox.document.HalfFloatPoint; import org.apache.lucene.util.BytesRef; +import org.opensearch.common.network.InetAddresses; +import org.opensearch.index.mapper.BinaryFieldMapper; import org.opensearch.index.mapper.Uid; -/** The FieldType enum describes the types of fields in a chunk. */ +/** + * The FieldType enum describes the types of fields in a chunk. In the future we want to be able to + * leverage OpenSearch FieldMapper#createFields + */ public enum FieldType { TEXT("text") { @Override @@ -47,39 +49,39 @@ public void addField(Document doc, String name, Object value, LuceneFieldDef fie } @Override - public Query termQuery(String field, String queryText, Analyzer analyzer) { - throw new NotImplementedException( - "text fields parsing is currently implemented directly in KaldbQueryParser"); + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.TEXT; + } + }, + STRING("string") { + @Override + public void addField(Document doc, String name, Object value, LuceneFieldDef fieldDef) { + KEYWORD.addField(doc, name, value, fieldDef); } @Override - public Analyzer getAnalyzer(boolean quoted) { - return quoted ? KEYWORD_ANALYZER : STANDARD_ANALYZER; + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.KEYWORD; } }, - STRING("string") { + KEYWORD("keyword") { @Override public void addField(Document doc, String name, Object value, LuceneFieldDef fieldDef) { + String fieldValue = (String) value; if (fieldDef.isIndexed) { - doc.add(new StringField(name, (String) value, getStoreEnum(fieldDef.isStored))); + doc.add(new StringField(name, fieldValue, getStoreEnum(fieldDef.isStored))); } if (fieldDef.isStored) { - doc.add(new StoredField(name, (String) value)); + doc.add(new StoredField(name, fieldValue)); } if (fieldDef.storeDocValue) { - doc.add(new SortedDocValuesField(name, new BytesRef((String) value))); + doc.add(new SortedDocValuesField(name, new BytesRef(fieldValue))); } } @Override - public Query termQuery(String field, String queryText, Analyzer analyzer) { - final Term term = new Term(field, queryText); - return new TermQuery(term); - } - - @Override - public Analyzer getAnalyzer(boolean quoted) { - return KEYWORD_ANALYZER; + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.KEYWORD; } }, ID("id") { @@ -98,64 +100,90 @@ public void addField(Document doc, String name, Object value, LuceneFieldDef fie } @Override - public Query termQuery(String field, String queryText, Analyzer analyzer) { - final Term term = new Term(field, queryText); - return new TermQuery(term); + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.ID; + } + }, + IP("ip") { + @Override + public void addField(Document doc, String name, Object value, LuceneFieldDef fieldDef) { + try { + String addressAsString = (String) value; + InetAddress address = InetAddresses.forString(addressAsString); + if (fieldDef.isIndexed) { + doc.add(new InetAddressPoint(name, address)); + } + if (fieldDef.isStored) { + doc.add(new StoredField(name, new BytesRef(addressAsString))); + } + if (fieldDef.storeDocValue) { + doc.add(new SortedDocValuesField(name, new BytesRef(InetAddressPoint.encode(address)))); + } + + } catch (IllegalArgumentException e) { + // allow flag to say ignore or throw exception + } } @Override - public Analyzer getAnalyzer(boolean quoted) { - return KEYWORD_ANALYZER; + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.IP; } }, - INTEGER("integer") { + DATE("date") { @Override - public void addField(Document doc, String name, Object v, LuceneFieldDef fieldDef) { - int value = (int) v; + public void addField(Document doc, String name, Object value, LuceneFieldDef fieldDef) { + LONG.addField(doc, name, value, fieldDef); + } + + @Override + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.DATE; + } + }, + BOOLEAN("boolean") { + final BytesRef TRUE = new BytesRef("T"); + final BytesRef FALSE = new BytesRef("F"); + + @Override + public void addField(Document doc, String name, Object value, LuceneFieldDef fieldDef) { + // Lucene has no native support for Booleans so store that field as a bytes ref. + boolean valueBool = (boolean) value; if (fieldDef.isIndexed) { - doc.add(new IntPoint(name, value)); + doc.add(new StringField(name, valueBool ? TRUE : FALSE, getStoreEnum(fieldDef.isStored))); } if (fieldDef.isStored) { - doc.add(new StoredField(name, value)); + doc.add(new StoredField(name, valueBool ? TRUE : FALSE)); } if (fieldDef.storeDocValue) { - doc.add(new NumericDocValuesField(name, value)); + // TODO: SortedNumericDocValuesField is a long. Need a smaller field type for this? + doc.add(new SortedNumericDocValuesField(name, valueBool ? 1 : 0)); } } @Override - public Query termQuery(String field, String queryText, Analyzer analyzer) { - return IntPoint.newExactQuery(field, Integer.parseInt(queryText)); - } - - @Override - public Analyzer getAnalyzer(boolean quoted) { - return KEYWORD_ANALYZER; + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.BOOLEAN; } }, - LONG("long") { + DOUBLE("double") { @Override public void addField(Document doc, String name, Object v, LuceneFieldDef fieldDef) { - long value = (long) v; + double value = (double) v; if (fieldDef.isIndexed) { - doc.add(new LongPoint(name, value)); + doc.add(new DoublePoint(name, value)); } if (fieldDef.isStored) { doc.add(new StoredField(name, value)); } if (fieldDef.storeDocValue) { - doc.add(new NumericDocValuesField(name, value)); + doc.add(new DoubleDocValuesField(name, value)); } } @Override - public Query termQuery(String field, String queryText, Analyzer analyzer) { - return LongPoint.newExactQuery(field, Long.parseLong(queryText)); - } - - @Override - public Analyzer getAnalyzer(boolean quoted) { - return KEYWORD_ANALYZER; + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.DOUBLE; } }, FLOAT("float") { @@ -174,68 +202,120 @@ public void addField(Document doc, String name, Object v, LuceneFieldDef fieldDe } @Override - public Query termQuery(String field, String queryText, Analyzer analyzer) { - return FloatPoint.newExactQuery(field, Float.parseFloat(queryText)); - } - - @Override - public Analyzer getAnalyzer(boolean quoted) { - return KEYWORD_ANALYZER; + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.FLOAT; } }, - DOUBLE("double") { + HALF_FLOAT("half_float") { @Override public void addField(Document doc, String name, Object v, LuceneFieldDef fieldDef) { - double value = (double) v; + float value = (float) v; if (fieldDef.isIndexed) { - doc.add(new DoublePoint(name, value)); + doc.add(new HalfFloatPoint(name, value)); } if (fieldDef.isStored) { doc.add(new StoredField(name, value)); } if (fieldDef.storeDocValue) { - doc.add(new DoubleDocValuesField(name, value)); + doc.add( + new SortedNumericDocValuesField(name, HalfFloatPoint.halfFloatToSortableShort(value))); } } @Override - public Query termQuery(String field, String queryText, Analyzer analyzer) { - return DoublePoint.newExactQuery(field, Double.parseDouble(queryText)); + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.HALF_FLOAT; + } + }, + INTEGER("integer") { + @Override + public void addField(Document doc, String name, Object v, LuceneFieldDef fieldDef) { + int value = (int) v; + if (fieldDef.isIndexed) { + doc.add(new IntPoint(name, value)); + } + if (fieldDef.isStored) { + doc.add(new StoredField(name, value)); + } + if (fieldDef.storeDocValue) { + doc.add(new NumericDocValuesField(name, value)); + } } @Override - public Analyzer getAnalyzer(boolean quoted) { - return KEYWORD_ANALYZER; + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.INTEGER; } }, - BOOLEAN("boolean") { - final BytesRef TRUE = new BytesRef("T"); - final BytesRef FALSE = new BytesRef("F"); - + LONG("long") { @Override - public void addField(Document doc, String name, Object value, LuceneFieldDef fieldDef) { - // Lucene has no native support for Booleans so store that field as a bytes ref. - boolean valueBool = (boolean) value; + public void addField(Document doc, String name, Object v, LuceneFieldDef fieldDef) { + long value = (long) v; if (fieldDef.isIndexed) { - doc.add(new StringField(name, valueBool ? TRUE : FALSE, getStoreEnum(fieldDef.isStored))); + doc.add(new LongPoint(name, value)); } if (fieldDef.isStored) { - doc.add(new StoredField(name, valueBool ? TRUE : FALSE)); + doc.add(new StoredField(name, value)); } if (fieldDef.storeDocValue) { - doc.add(new SortedNumericDocValuesField(name, valueBool ? 1 : 0)); + doc.add(new NumericDocValuesField(name, value)); } } @Override - public Query termQuery(String field, String queryText, Analyzer analyzer) { - final Term term = new Term(field, queryText); - return new TermQuery(term); + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.LONG; + } + }, + SCALED_LONG("scaledlong") { + @Override + public void addField(Document doc, String name, Object v, LuceneFieldDef fieldDef) { + LONG.addField(doc, name, v, fieldDef); + } + + @Override + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.SCALED_LONG; + } + }, + SHORT("short") { + @Override + public void addField(Document doc, String name, Object v, LuceneFieldDef fieldDef) { + // TODO: + INTEGER.addField(doc, name, v, fieldDef); + } + + @Override + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.SHORT; + } + }, + BYTE("byte") { + @Override + public void addField(Document doc, String name, Object v, LuceneFieldDef fieldDef) { + INTEGER.addField(doc, name, v, fieldDef); + } + + @Override + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.BYTE; + } + }, + BINARY("binary") { + @Override + public void addField(Document doc, String name, Object v, LuceneFieldDef fieldDef) { + ByteString bytes = (ByteString) v; + if (fieldDef.isStored) { + doc.add(new StoredField(name, bytes.toByteArray())); + } + if (fieldDef.storeDocValue) { + doc.add(new BinaryFieldMapper.CustomBinaryDocValuesField(name, bytes.toByteArray())); + } } @Override - public Analyzer getAnalyzer(boolean quoted) { - return KEYWORD_ANALYZER; + public Schema.SchemaFieldType toSchemaFieldType() { + return Schema.SchemaFieldType.BINARY; } }; @@ -247,23 +327,52 @@ public Analyzer getAnalyzer(boolean quoted) { public abstract void addField(Document doc, String name, Object value, LuceneFieldDef fieldDef); - public abstract Query termQuery(String field, String queryText, Analyzer analyzer); + public abstract Schema.SchemaFieldType toSchemaFieldType(); + + public static FieldType fromSchemaFieldType(Schema.SchemaFieldType schemaFieldType) { + FieldType fieldType; + switch (schemaFieldType) { + case TEXT -> fieldType = TEXT; + case STRING, KEYWORD -> fieldType = KEYWORD; + case ID -> fieldType = ID; + case IP -> fieldType = IP; + case DATE -> fieldType = DATE; + case BOOLEAN -> fieldType = BOOLEAN; + case DOUBLE -> fieldType = DOUBLE; + case FLOAT -> fieldType = FLOAT; + case HALF_FLOAT -> fieldType = HALF_FLOAT; + case INTEGER -> fieldType = INTEGER; + case LONG -> fieldType = LONG; + case SCALED_LONG -> fieldType = SCALED_LONG; + case SHORT -> fieldType = SHORT; + case BYTE -> fieldType = BYTE; + case BINARY -> fieldType = BINARY; + default -> + throw new IllegalArgumentException("Unknown schema field type: " + schemaFieldType); + } + return fieldType; + } - public abstract Analyzer getAnalyzer(boolean quoted); + public LuceneFieldDef getFieldDefinition( + String name, String fieldType, boolean isStored, boolean isIndexed, boolean storeDocValue) { + return new LuceneFieldDef(name, fieldType, isStored, isIndexed, storeDocValue); + } public String getName() { return name; } + public static boolean isTexty(FieldType fieldType) { + return fieldType == TEXT || fieldType == STRING || fieldType == KEYWORD; + } + @VisibleForTesting public static Object convertFieldValue(Object value, FieldType fromType, FieldType toType) { - if ((fromType == toType) - || (fromType == FieldType.TEXT && toType == FieldType.STRING) - || (fromType == FieldType.STRING && toType == FieldType.TEXT)) { + if ((fromType == toType) || FieldType.areTypeAliasedFieldTypes(fromType, toType)) { return value; } - if (fromType == FieldType.TEXT || fromType == FieldType.STRING) { + if (isTexty(fromType)) { if (toType == FieldType.INTEGER) { try { return Integer.valueOf((String) value); @@ -299,7 +408,7 @@ public static Object convertFieldValue(Object value, FieldType fromType, FieldTy // Int type if (fromType == FieldType.INTEGER) { - if (toType == FieldType.TEXT || toType == FieldType.STRING) { + if (isTexty(toType)) { return ((Integer) value).toString(); } if (toType == FieldType.LONG) { @@ -318,7 +427,7 @@ public static Object convertFieldValue(Object value, FieldType fromType, FieldTy // Long type if (fromType == FieldType.LONG) { - if (toType == FieldType.TEXT || toType == FieldType.STRING) { + if (isTexty(toType)) { return ((Long) value).toString(); } if (toType == FieldType.INTEGER) { @@ -337,7 +446,7 @@ public static Object convertFieldValue(Object value, FieldType fromType, FieldTy // Float type if (fromType == FieldType.FLOAT) { - if (toType == FieldType.TEXT || toType == FieldType.STRING) { + if (isTexty(toType)) { return value.toString(); } if (toType == FieldType.INTEGER) { @@ -356,7 +465,7 @@ public static Object convertFieldValue(Object value, FieldType fromType, FieldTy // Double type if (fromType == FieldType.DOUBLE) { - if (toType == FieldType.TEXT || toType == FieldType.STRING) { + if (isTexty(toType)) { return value.toString(); } if (toType == FieldType.INTEGER) { @@ -374,7 +483,7 @@ public static Object convertFieldValue(Object value, FieldType fromType, FieldTy } if (fromType == FieldType.BOOLEAN) { - if (toType == FieldType.TEXT || toType == FieldType.STRING) { + if (isTexty(toType)) { return value.toString(); } if (toType == FieldType.INTEGER) { @@ -400,7 +509,8 @@ private static Field.Store getStoreEnum(boolean isStored) { // Aliased Field Types are FieldTypes that can be considered as same type from a field conflict // detection perspective public static final List> ALIASED_FIELD_TYPES = - ImmutableList.of(ImmutableSet.of(FieldType.STRING, FieldType.TEXT, FieldType.ID)); + ImmutableList.of( + ImmutableSet.of(FieldType.STRING, FieldType.TEXT, FieldType.ID, FieldType.KEYWORD)); public static boolean areTypeAliasedFieldTypes(FieldType type1, FieldType type2) { for (Set s : ALIASED_FIELD_TYPES) { diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java b/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java index 9d4ece12bb..0a1104d5fc 100644 --- a/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java +++ b/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java @@ -3,26 +3,18 @@ import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.slack.kaldb.logstore.LogMessage; -import com.slack.kaldb.logstore.LogWireMessage; import com.slack.kaldb.proto.schema.Schema; import com.slack.service.murron.Murron; import com.slack.service.murron.trace.Trace; -import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Base64; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** A utility class that converts a Span into a LogMessage, Json map to Span */ public class SpanFormatter { - private static final Logger LOG = LoggerFactory.getLogger(SpanFormatter.class); public static final String DEFAULT_LOG_MESSAGE_TYPE = "INFO"; public static final String DEFAULT_INDEX_NAME = "unknown"; @@ -71,7 +63,7 @@ public static Trace.Span toSpan( tags.add( Trace.KeyValue.newBuilder() .setKey(LogMessage.ReservedField.HOSTNAME.fieldName) - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .setVStr(s) .build())); } @@ -87,7 +79,7 @@ public static Trace.Span toSpan( tags.add( Trace.KeyValue.newBuilder() .setKey(LogMessage.ReservedField.SERVICE_NAME.fieldName) - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .setVStr(serviceName) .build()); } @@ -103,72 +95,58 @@ public static Trace.KeyValue convertKVtoProto( try { switch (schema.getFieldsMap().get(key).getType()) { case KEYWORD -> { - tagBuilder.setVType(Trace.ValueType.STRING); tagBuilder.setFieldType(Schema.SchemaFieldType.KEYWORD); tagBuilder.setVStr(value.toString()); } case TEXT -> { - tagBuilder.setVType(Trace.ValueType.STRING); tagBuilder.setFieldType(Schema.SchemaFieldType.TEXT); tagBuilder.setVStr(value.toString()); } case IP -> { - tagBuilder.setVType(Trace.ValueType.STRING); tagBuilder.setFieldType(Schema.SchemaFieldType.IP); tagBuilder.setVStr(value.toString()); } case DATE -> { - tagBuilder.setVType(Trace.ValueType.STRING); tagBuilder.setFieldType(Schema.SchemaFieldType.DATE); - tagBuilder.setVStr(value.toString()); + tagBuilder.setVInt64(Instant.parse(value.toString()).toEpochMilli()); } case BOOLEAN -> { - tagBuilder.setVType(Trace.ValueType.BOOL); tagBuilder.setFieldType(Schema.SchemaFieldType.BOOLEAN); tagBuilder.setVBool(Boolean.parseBoolean(value.toString())); } case DOUBLE -> { - tagBuilder.setVType(Trace.ValueType.FLOAT64); tagBuilder.setFieldType(Schema.SchemaFieldType.DOUBLE); tagBuilder.setVFloat64(Double.parseDouble(value.toString())); } case FLOAT -> { - tagBuilder.setVType(Trace.ValueType.FLOAT32); tagBuilder.setFieldType(Schema.SchemaFieldType.FLOAT); tagBuilder.setVFloat32(Float.parseFloat(value.toString())); } case HALF_FLOAT -> { - tagBuilder.setVType(Trace.ValueType.FLOAT32); tagBuilder.setFieldType(Schema.SchemaFieldType.HALF_FLOAT); tagBuilder.setVFloat32(Float.parseFloat(value.toString())); } case INTEGER -> { - tagBuilder.setVType(Trace.ValueType.INT32); tagBuilder.setFieldType(Schema.SchemaFieldType.INTEGER); tagBuilder.setVInt32(Integer.parseInt(value.toString())); } case LONG -> { - tagBuilder.setVType(Trace.ValueType.INT64); tagBuilder.setFieldType(Schema.SchemaFieldType.LONG); tagBuilder.setVInt64(Long.parseLong(value.toString())); } case SCALED_LONG -> { - tagBuilder.setVType(Trace.ValueType.INT64); tagBuilder.setFieldType(Schema.SchemaFieldType.SCALED_LONG); tagBuilder.setVInt64(Long.parseLong(value.toString())); } case SHORT -> { - tagBuilder.setVType(Trace.ValueType.INT32); tagBuilder.setFieldType(Schema.SchemaFieldType.SHORT); tagBuilder.setVInt32(Integer.parseInt(value.toString())); } case BYTE -> { - tagBuilder.setVType(Trace.ValueType.INT32); tagBuilder.setFieldType(Schema.SchemaFieldType.BYTE); tagBuilder.setVInt32(Integer.parseInt(value.toString())); } case BINARY -> { - tagBuilder.setVType(Trace.ValueType.BINARY); tagBuilder.setFieldType(Schema.SchemaFieldType.BINARY); tagBuilder.setVBinary(ByteString.copyFrom(value.toString().getBytes())); } @@ -176,7 +154,6 @@ public static Trace.KeyValue convertKVtoProto( return tagBuilder.build(); } catch (Exception e) { tagBuilder.setKey(STR."failed_\{key}"); - tagBuilder.setVType(Trace.ValueType.STRING); tagBuilder.setFieldType(Schema.SchemaFieldType.KEYWORD); tagBuilder.setVStr(value.toString()); return tagBuilder.build(); @@ -190,31 +167,24 @@ public static Trace.KeyValue convertKVtoProto(String key, Object value) { Trace.KeyValue.Builder tagBuilder = Trace.KeyValue.newBuilder(); tagBuilder.setKey(key); if (value instanceof String) { - tagBuilder.setVType(Trace.ValueType.STRING); tagBuilder.setFieldType(Schema.SchemaFieldType.KEYWORD); tagBuilder.setVStr(value.toString()); } else if (value instanceof Boolean) { - tagBuilder.setVType(Trace.ValueType.BOOL); tagBuilder.setFieldType(Schema.SchemaFieldType.BOOLEAN); tagBuilder.setVBool((boolean) value); } else if (value instanceof Integer) { - tagBuilder.setVType(Trace.ValueType.INT32); tagBuilder.setFieldType(Schema.SchemaFieldType.INTEGER); tagBuilder.setVInt32((int) value); } else if (value instanceof Long) { - tagBuilder.setVType(Trace.ValueType.INT64); tagBuilder.setFieldType(Schema.SchemaFieldType.LONG); tagBuilder.setVInt64((long) value); } else if (value instanceof Float) { - tagBuilder.setVType(Trace.ValueType.FLOAT32); tagBuilder.setFieldType(Schema.SchemaFieldType.FLOAT); tagBuilder.setVFloat32((float) value); } else if (value instanceof Double) { - tagBuilder.setVType(Trace.ValueType.FLOAT64); tagBuilder.setFieldType(Schema.SchemaFieldType.DOUBLE); tagBuilder.setVFloat64((double) value); } else if (value != null) { - tagBuilder.setVType(Trace.ValueType.BINARY); tagBuilder.setFieldType(Schema.SchemaFieldType.BINARY); tagBuilder.setVBinary(ByteString.copyFrom(value.toString().getBytes())); } @@ -226,10 +196,6 @@ public static Trace.ListOfSpans fromMurronMessage(Murron.MurronMessage message) return Trace.ListOfSpans.parseFrom(message.getMessage()); } - public static String encodeBinaryTagValue(ByteString binaryTagValue) { - return Base64.getEncoder().encodeToString(binaryTagValue.toByteArray()); - } - /** * Determines if provided timestamp is a reasonable value, or is too far in the past/future for * use. This can happen when using user-provided timestamp (such as on a mobile client). @@ -248,102 +214,4 @@ public static boolean isValidTimestamp(Instant timestamp) { } return true; } - - // TODO: Make this function more memory efficient? - public static LogMessage toLogMessage(Trace.Span span) { - if (span == null) return null; - - Map jsonMap = new HashMap<>(); - - String id = span.getId().toStringUtf8(); - - // Set these fields even if they are empty so we can always search these fields. - jsonMap.put(LogMessage.ReservedField.PARENT_ID.fieldName, span.getParentId().toStringUtf8()); - jsonMap.put(LogMessage.ReservedField.TRACE_ID.fieldName, span.getTraceId().toStringUtf8()); - jsonMap.put(LogMessage.ReservedField.NAME.fieldName, span.getName()); - jsonMap.put( - LogMessage.ReservedField.DURATION_MS.fieldName, - Duration.of(span.getDuration(), ChronoUnit.MICROS).toMillis()); - - // TODO: Use a microsecond resolution, instead of millisecond resolution. - Instant timestamp = Instant.ofEpochMilli(span.getTimestamp() / 1000); - if (!isValidTimestamp(timestamp)) { - // we use an attribute here so we can find the offending logs easily - jsonMap.put(LogMessage.ReservedField.KALDB_INVALID_TIMESTAMP.fieldName, span.getTimestamp()); - // set the timestamp to ingest time - timestamp = Instant.now(); - } - - String indexName = ""; - String msgType = DEFAULT_LOG_MESSAGE_TYPE; - for (Trace.KeyValue tag : span.getTagsList()) { - String key = tag.getKey(); - int valueType = tag.getVType().getNumber(); - if (valueType == 0) { - if (key.equals(LogMessage.ReservedField.TYPE.fieldName)) { - msgType = tag.getVStr(); - continue; - } - jsonMap.put(key, tag.getVStr()); - if (key.equals(LogMessage.ReservedField.SERVICE_NAME.fieldName)) { - indexName = tag.getVStr(); - // Also, add service name to the map so can search by service name also. - jsonMap.put(LogMessage.ReservedField.SERVICE_NAME.fieldName, indexName); - } - } else if (valueType == 1) { - jsonMap.put(key, tag.getVBool()); - } else if (valueType == 2) { - jsonMap.put(key, tag.getVInt64()); - } else if (valueType == 3) { - jsonMap.put(key, tag.getVFloat64()); - } else if (valueType == 4) { - jsonMap.put(key, tag.getVBinary().toStringUtf8()); - } else if (valueType == 5) { - jsonMap.put(key, tag.getVInt32()); - } else if (valueType == 6) { - jsonMap.put(key, tag.getVFloat32()); - } else { - LOG.warn("Skipping field with unknown value type {} with key {}", valueType, key); - } - } - - if (indexName.isEmpty()) { - indexName = DEFAULT_INDEX_NAME; - } - jsonMap.put(LogMessage.ReservedField.SERVICE_NAME.fieldName, indexName); - - // This logging is in place to debug span parsing exceptions. Once this pipeline - // is more stable remove this code. - // try { - // return LogMessage.fromWireMessage(new LogWireMessage(indexName, msgType, id, jsonMap)); - // } catch (Exception e) { - // try { - // LOG.info( - // "span conversion failed: " - // + JsonFormat.printer() - // .includingDefaultValueFields() - // .omittingInsignificantWhitespace() - // .print(span)); - // } catch (InvalidProtocolBufferException invalidProtocolBufferException) { - // invalidProtocolBufferException.printStackTrace(); - // } - // throw e; - // } - - // Drop the type field from LogMessage since with spans it doesn't make sense. - return LogMessage.fromWireMessage( - new LogWireMessage(indexName, msgType, id, timestamp, jsonMap)); - } - - // TODO: For now assuming that the tags in ListOfSpans is empty. Handle this case in future. - public static List toLogMessage(Trace.ListOfSpans protoSpans) { - if (protoSpans == null) return Collections.EMPTY_LIST; - - List spans = protoSpans.getSpansList(); - List messages = new ArrayList<>(spans.size()); - for (Trace.Span span : spans) { - messages.add(toLogMessage(span)); - } - return messages; - } } diff --git a/kaldb/src/main/proto/trace.proto b/kaldb/src/main/proto/trace.proto index 6de48da97e..c78f7a7312 100644 --- a/kaldb/src/main/proto/trace.proto +++ b/kaldb/src/main/proto/trace.proto @@ -10,25 +10,8 @@ option go_package = "com.slack/kaldb/gen/proto/tracepb"; import "google/protobuf/empty.proto"; import "schema.proto"; -// The KeyValue message defines a key and value pair. -// The key is always a string. The value for the field is determined by the ValueType. -// If the ValueType is STRING the field v_str should be set, for BOOL v_bool is used etc.. -// Only the type of valueType is used. Rest of the fields are ignored even if they are set. -// So, of v_type is ValueType.STRING, only the value v_str is used. Rest of the fields are ignored. -// We chose not to use OneOf field, since it's JSON encoding is not as straight forward. -enum ValueType { - STRING = 0; - BOOL = 1; - INT64 = 2; - FLOAT64 = 3; - BINARY = 4; - INT32 = 5; - FLOAT32 = 6; -}; - message KeyValue { string key = 1; - ValueType v_type = 2; string v_str = 3; bool v_bool = 4; int64 v_int64 = 5; diff --git a/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java index 596cf85227..6189c3c7bd 100644 --- a/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java @@ -21,7 +21,7 @@ public class BulkApiRequestParserTest { - private byte[] getRawQueryBytes(String filename) throws IOException { + public static byte[] getIndexRequestBytes(String filename) throws IOException { return Resources.toString( Resources.getResource(String.format("opensearchRequest/bulk/%s.ndjson", filename)), Charset.defaultCharset()) @@ -30,7 +30,7 @@ private byte[] getRawQueryBytes(String filename) throws IOException { @Test public void testSimpleIndexRequest() throws Exception { - byte[] rawRequest = getRawQueryBytes("index_simple_with_ts"); + byte[] rawRequest = getIndexRequestBytes("index_simple_with_ts"); List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); assertThat(indexRequests.size()).isEqualTo(1); @@ -59,7 +59,7 @@ public void testSimpleIndexRequest() throws Exception { @Test public void testIndexNoFields() throws Exception { - byte[] rawRequest = getRawQueryBytes("index_no_fields"); + byte[] rawRequest = getIndexRequestBytes("index_no_fields"); List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); @@ -83,7 +83,7 @@ public void testIndexNoFields() throws Exception { @Test public void testIndexNoFieldsNoId() throws Exception { - byte[] rawRequest = getRawQueryBytes("index_no_fields_no_id"); + byte[] rawRequest = getIndexRequestBytes("index_no_fields_no_id"); List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); @@ -107,7 +107,7 @@ public void testIndexNoFieldsNoId() throws Exception { @Test public void testIndexEmptyRequest() throws Exception { - byte[] rawRequest = getRawQueryBytes("index_empty_request"); + byte[] rawRequest = getIndexRequestBytes("index_empty_request"); List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); @@ -119,14 +119,14 @@ public void testIndexEmptyRequest() throws Exception { @Test public void testOtherBulkRequests() throws Exception { - byte[] rawRequest = getRawQueryBytes("non_index"); + byte[] rawRequest = getIndexRequestBytes("non_index"); List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); assertThat(indexRequests.size()).isEqualTo(0); } @Test public void testIndexRequestWithSpecialChars() throws Exception { - byte[] rawRequest = getRawQueryBytes("index_request_with_special_chars"); + byte[] rawRequest = getIndexRequestBytes("index_request_with_special_chars"); List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); assertThat(indexRequests.size()).isEqualTo(1); Map> indexDocs = @@ -149,7 +149,7 @@ public void testIndexRequestWithSpecialChars() throws Exception { @Test public void testBulkRequests() throws Exception { - byte[] rawRequest = getRawQueryBytes("bulk_requests"); + byte[] rawRequest = getIndexRequestBytes("bulk_requests"); List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); assertThat(indexRequests.size()).isEqualTo(2); @@ -189,7 +189,7 @@ public void testBulkRequests() throws Exception { @Test public void testUpdatesAgainstTwoIndexes() throws Exception { - byte[] rawRequest = getRawQueryBytes("two_indexes"); + byte[] rawRequest = getIndexRequestBytes("two_indexes"); List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); assertThat(indexRequests.size()).isEqualTo(2); @@ -206,7 +206,7 @@ public void testUpdatesAgainstTwoIndexes() throws Exception { @Test public void testSchemaFieldForTags() throws IOException { - byte[] rawRequest = getRawQueryBytes("index_simple"); + byte[] rawRequest = getIndexRequestBytes("index_simple"); List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); assertThat(indexRequests.size()).isEqualTo(1); @@ -228,14 +228,12 @@ public void testSchemaFieldForTags() throws IOException { span.getTagsList().stream().filter(keyValue -> keyValue.getKey().equals("field1")).toList(); assertThat(field1Def.size()).isEqualTo(1); assertThat(field1Def.getFirst().getVStr()).isEqualTo("value1"); - assertThat(field1Def.getFirst().getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(field1Def.getFirst().getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); field1Def = span.getTagsList().stream().filter(keyValue -> keyValue.getKey().equals("field2")).toList(); assertThat(field1Def.size()).isEqualTo(1); assertThat(field1Def.getFirst().getVStr()).isEqualTo("value2"); - assertThat(field1Def.getFirst().getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(field1Def.getFirst().getFieldType()).isEqualTo(Schema.SchemaFieldType.TEXT); field1Def = @@ -244,13 +242,12 @@ public void testSchemaFieldForTags() throws IOException { .toList(); assertThat(field1Def.size()).isEqualTo(1); assertThat(field1Def.getFirst().getVStr()).isEqualTo("test"); - assertThat(field1Def.getFirst().getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(field1Def.getFirst().getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); } @Test public void testTraceSpanGeneratedTimestamp() throws IOException { - byte[] rawRequest = getRawQueryBytes("index_simple"); + byte[] rawRequest = getIndexRequestBytes("index_simple"); List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); assertThat(indexRequests.size()).isEqualTo(1); diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java index 5008f943c8..5b7421ec7c 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java @@ -33,6 +33,7 @@ import com.slack.kaldb.metadata.snapshot.SnapshotMetadata; import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore; import com.slack.kaldb.proto.config.KaldbConfigs; +import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.testlib.MessageUtil; import com.slack.kaldb.testlib.SpanUtil; import com.slack.service.murron.trace.Trace; @@ -505,7 +506,7 @@ public void testAddInvalidMessagesToChunk() { Trace.KeyValue.newBuilder() .setVInt32(123) .setKey(LogMessage.ReservedField.MESSAGE.fieldName) - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .build()) .build(); diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java index 7dfe1b37fe..4e1552cccd 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java @@ -30,6 +30,7 @@ import com.slack.kaldb.metadata.snapshot.SnapshotMetadata; import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore; import com.slack.kaldb.proto.config.KaldbConfigs; +import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.testlib.MessageUtil; import com.slack.kaldb.testlib.SpanUtil; import com.slack.service.murron.trace.Trace; @@ -499,7 +500,7 @@ public void testAddInvalidMessagesToChunk() { Trace.KeyValue.newBuilder() .setVInt32(123) .setKey(LogMessage.ReservedField.MESSAGE.fieldName) - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .build()) .build(); chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1); diff --git a/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java b/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java index db7225a943..d322bf24ec 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java @@ -57,6 +57,7 @@ import com.slack.kaldb.metadata.snapshot.SnapshotMetadata; import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore; import com.slack.kaldb.proto.config.KaldbConfigs; +import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.proto.service.KaldbSearch; import com.slack.kaldb.testlib.KaldbConfigUtil; import com.slack.kaldb.testlib.MessageUtil; @@ -744,7 +745,7 @@ public void testAddMessageWithPropertyTypeConflicts() throws Exception { Trace.KeyValue.newBuilder() .setVInt32(20000) .setKey(LogMessage.ReservedField.HOSTNAME.fieldName) - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .build()) .build(); chunkManager.addMessage( @@ -1446,7 +1447,7 @@ public void testNewFieldAddedToSchema() throws IOException, TimeoutException { Trace.KeyValue schemaTestTag = Trace.KeyValue.newBuilder() .setKey("schemaTest") - .setVType(Trace.ValueType.BOOL) + .setFieldType(Schema.SchemaFieldType.BOOLEAN) .setVBool(true) .build(); Trace.Span logMessage = diff --git a/kaldb/src/test/java/com/slack/kaldb/chunkManager/RecoveryChunkManagerTest.java b/kaldb/src/test/java/com/slack/kaldb/chunkManager/RecoveryChunkManagerTest.java index 6c1ff485bc..41e43f51e2 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunkManager/RecoveryChunkManagerTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunkManager/RecoveryChunkManagerTest.java @@ -37,6 +37,7 @@ import com.slack.kaldb.metadata.snapshot.SnapshotMetadata; import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore; import com.slack.kaldb.proto.config.KaldbConfigs; +import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.testlib.KaldbConfigUtil; import com.slack.kaldb.testlib.MessageUtil; import com.slack.kaldb.testlib.SpanUtil; @@ -375,7 +376,7 @@ public void testAddMessageWithPropertyTypeConflicts() throws Exception { Trace.KeyValue.newBuilder() .setVInt32(20000) .setKey(LogMessage.ReservedField.HOSTNAME.fieldName) - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .build(); Trace.Span msg100 = SpanUtil.makeSpan(100, "Message100", Instant.now(), List.of(conflictTag)); chunkManager.addMessage(msg100, msg100.toString().length(), TEST_KAFKA_PARTITION_ID, offset); diff --git a/kaldb/src/test/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiServiceTest.java index 4ecacc9589..b9eb180fb1 100644 --- a/kaldb/src/test/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiServiceTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiServiceTest.java @@ -1,6 +1,12 @@ package com.slack.kaldb.elasticsearchApi; +import static com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser.convertRequestToDocument; +import static com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParserTest.getIndexRequestBytes; +import static com.slack.kaldb.logstore.LuceneIndexStoreImpl.MESSAGES_FAILED_COUNTER; +import static com.slack.kaldb.logstore.LuceneIndexStoreImpl.MESSAGES_RECEIVED_COUNTER; import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION; +import static com.slack.kaldb.testlib.MetricsUtil.getCount; +import static com.slack.kaldb.writer.LogMessageWriterImplTest.consumerRecordWithValue; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -16,16 +22,21 @@ import com.google.common.io.Resources; import com.linecorp.armeria.common.AggregatedHttpResponse; import com.linecorp.armeria.common.HttpResponse; +import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser; import com.slack.kaldb.chunkManager.IndexingChunkManager; import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.logstore.search.KaldbLocalQueryService; +import com.slack.kaldb.metadata.schema.SchemaUtil; +import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.proto.service.KaldbSearch; import com.slack.kaldb.server.KaldbQueryServiceBase; import com.slack.kaldb.testlib.ChunkManagerUtil; import com.slack.kaldb.testlib.KaldbConfigUtil; import com.slack.kaldb.testlib.SpanUtil; +import com.slack.kaldb.writer.LogMessageWriterImpl; import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.io.File; import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -33,13 +44,17 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeoutException; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.assertj.core.data.Offset; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.ingest.IngestDocument; @SuppressWarnings("UnstableApiUsage") public class ElasticsearchApiServiceTest { @@ -85,6 +100,46 @@ public void tearDown() throws TimeoutException, IOException { metricsRegistry.close(); } + @Test + public void testSchemaFields() throws Exception { + final File schemaFile = + new File(getClass().getClassLoader().getResource("schema/test_schema.yaml").getFile()); + Schema.IngestSchema schema = SchemaUtil.parseSchema(schemaFile.toPath()); + + byte[] rawRequest = getIndexRequestBytes("index_all_schema_fields"); + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); + assertThat(indexRequests.size()).isEqualTo(2); + + for (IndexRequest indexRequest : indexRequests) { + IngestDocument ingestDocument = convertRequestToDocument(indexRequest); + Trace.Span span = BulkApiRequestParser.fromIngestDocument(ingestDocument, schema); + ConsumerRecord spanRecord = consumerRecordWithValue(span.toByteArray()); + LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); + assertThat(messageWriter.insertRecord(spanRecord)).isTrue(); + } + + assertThat(getCount(MESSAGES_RECEIVED_COUNTER, metricsRegistry)).isEqualTo(2); + assertThat(getCount(MESSAGES_FAILED_COUNTER, metricsRegistry)).isEqualTo(0); + chunkManagerUtil.chunkManager.getActiveChunk().commit(); + + HttpResponse response = + elasticsearchApiService.mapping( + Optional.of("test"), Optional.of(0L), Optional.of(Long.MAX_VALUE)); + + // handle response + AggregatedHttpResponse aggregatedRes = response.aggregate().join(); + String body = aggregatedRes.content(StandardCharsets.UTF_8); + JsonNode jsonNode = new ObjectMapper().readTree(body); + assertThat(jsonNode).isNotNull(); + + ObjectMapper objectMapper = new ObjectMapper(); + Map map = + objectMapper.convertValue( + jsonNode.get("test").get("mappings").get("properties"), Map.class); + assertThat(map).isNotNull(); + assertThat(map.size()).isEqualTo(30); + } + // todo - test mapping @Test public void testResultsAreReturnedForValidQuery() throws Exception { diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/LogMessageTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/LogMessageTest.java index 7175804c4b..fc7def5e17 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/LogMessageTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/LogMessageTest.java @@ -27,8 +27,8 @@ public void testSystemField() { @Test public void testReservedField() { - assertThat(ReservedField.values().length).isEqualTo(13); - assertThat(ReservedField.reservedFieldNames.size()).isEqualTo(13); + assertThat(ReservedField.values().length).isEqualTo(14); + assertThat(ReservedField.reservedFieldNames.size()).isEqualTo(14); assertThat(ReservedField.isReservedField("hostname")).isTrue(); for (LogMessage.ReservedField f : LogMessage.ReservedField.values()) { assertThat(f.name().toLowerCase()).isEqualTo(f.fieldName); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java index 1bea0d45f4..17858a1cbf 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java @@ -27,6 +27,7 @@ import com.slack.kaldb.logstore.search.LogIndexSearcherImpl; import com.slack.kaldb.logstore.search.SearchResult; import com.slack.kaldb.logstore.search.aggregations.DateHistogramAggBuilder; +import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.testlib.MessageUtil; import com.slack.kaldb.testlib.SpanUtil; import com.slack.kaldb.testlib.TemporaryLogStoreAndSearcherExtension; @@ -87,25 +88,25 @@ public void testSearchAndQueryDocsWithNestedJson() { Trace.KeyValue.newBuilder() .setVStr("Test message") .setKey("message") - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .build()) .addTags( Trace.KeyValue.newBuilder() .setVStr("duplicate1") .setKey("duplicateproperty") - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .build()) .addTags( Trace.KeyValue.newBuilder() .setVStr("value1") .setKey("nested.key1") - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .build()) .addTags( Trace.KeyValue.newBuilder() .setVStr("2") .setKey("nested.duplicateproperty") - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .build()) .build(); logStore.logStore.addMessage(span); @@ -177,7 +178,7 @@ public void testIndexDocsWithTypeMismatchErrors() { Trace.KeyValue.newBuilder() .setKey(ReservedField.HOSTNAME.fieldName) .setVInt32(1) - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .build(); logStore.logStore.addMessage( SpanUtil.makeSpan(100, "test", Instant.now(), List.of(wrongField))); @@ -209,7 +210,7 @@ public void failIndexingDocsWithMismatchedTypeErrors() { Trace.KeyValue wrongField = Trace.KeyValue.newBuilder() .setKey(ReservedField.HOSTNAME.fieldName) - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .setVInt32(20000) .build(); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/schema/FieldConflictStrategyTests.java b/kaldb/src/test/java/com/slack/kaldb/logstore/schema/FieldConflictStrategyTests.java index ff8bceae62..ef6355c680 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/schema/FieldConflictStrategyTests.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/schema/FieldConflictStrategyTests.java @@ -13,6 +13,7 @@ import com.slack.kaldb.logstore.FieldDefMismatchException; import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.metadata.schema.FieldType; +import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.testlib.SpanUtil; import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; @@ -50,28 +51,28 @@ public void testFieldConflictNumericValues() throws JsonProcessingException { Trace.KeyValue hostField = Trace.KeyValue.newBuilder() .setKey(LogMessage.ReservedField.HOSTNAME.fieldName) - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .setVStr("host1-dc2.abc.com") .build(); Trace.KeyValue tagField = Trace.KeyValue.newBuilder() .setKey(LogMessage.ReservedField.TAG.fieldName) - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .setVStr("foo-bar") .build(); Trace.KeyValue conflictingTagStr = Trace.KeyValue.newBuilder() .setKey(conflictingFieldName) - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .setVStr("1") .build(); Trace.KeyValue conflictingTagInt = Trace.KeyValue.newBuilder() .setKey(conflictingFieldName) - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .setVInt32(1) .build(); @@ -125,35 +126,35 @@ public void testFieldConflictBooleanValues() throws JsonProcessingException { Trace.KeyValue hostField = Trace.KeyValue.newBuilder() .setKey(LogMessage.ReservedField.HOSTNAME.fieldName) - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .setVStr("host1-dc2.abc.com") .build(); Trace.KeyValue tagField = Trace.KeyValue.newBuilder() .setKey(LogMessage.ReservedField.TAG.fieldName) - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .setVStr("foo-bar") .build(); Trace.KeyValue conflictingTagBool = Trace.KeyValue.newBuilder() .setKey(conflictingFieldName) - .setVType(Trace.ValueType.BOOL) + .setFieldType(Schema.SchemaFieldType.BOOLEAN) .setVBool(true) .build(); Trace.KeyValue conflictingTagStr = Trace.KeyValue.newBuilder() .setKey(conflictingFieldName) - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .setVStr("random") .build(); Trace.KeyValue conflictingTagInt = Trace.KeyValue.newBuilder() .setKey(conflictingFieldName) - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .setVInt32(1) .build(); @@ -213,7 +214,7 @@ public void testFieldConflictBooleanValues() throws JsonProcessingException { msg2Doc = convertAndDuplicateFieldDocBuilder.fromMessage(doc2); assertThat(msg2Doc.getFields().size()).isEqualTo(31); - String additionalCreatedFieldName = makeNewFieldOfType(conflictingFieldName, FieldType.STRING); + String additionalCreatedFieldName = makeNewFieldOfType(conflictingFieldName, FieldType.KEYWORD); // Value converted and new field is added. assertThat(getFieldCount(msg2Doc, Set.of(conflictingFieldName, additionalCreatedFieldName))) .isEqualTo(4); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/schema/RaiseErrorFieldValueTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/schema/RaiseErrorFieldValueTest.java deleted file mode 100644 index 8b13789179..0000000000 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/schema/RaiseErrorFieldValueTest.java +++ /dev/null @@ -1 +0,0 @@ - diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java index b33fd62788..426e810780 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java @@ -24,6 +24,7 @@ import com.slack.kaldb.logstore.search.aggregations.MovingAvgAggBuilder; import com.slack.kaldb.logstore.search.aggregations.SumAggBuilder; import com.slack.kaldb.logstore.search.aggregations.TermsAggBuilder; +import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.testlib.SpanUtil; import com.slack.kaldb.testlib.TemporaryLogStoreAndSearcherExtension; import com.slack.service.murron.trace.Trace; @@ -259,7 +260,7 @@ public void testAllQueryWithFullTextSearchEnabled() { Trace.KeyValue.newBuilder() .setVStr("value") .setKey("customField") - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .build(); strictLogStore.logStore.addMessage(SpanUtil.makeSpan(1, "apple", time, List.of(customField))); @@ -307,7 +308,7 @@ public void testAllQueryWithFullTextSearchDisabled() { Trace.KeyValue.newBuilder() .setVStr("value") .setKey("customField") - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .build(); strictLogStoreWithoutFts.logStore.addMessage( SpanUtil.makeSpan(1, "apple", time, List.of(customField))); @@ -355,13 +356,13 @@ public void testExistsQuery() { Trace.KeyValue.newBuilder() .setVStr("value") .setKey("customField") - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .build(); Trace.KeyValue customField1 = Trace.KeyValue.newBuilder() .setVStr("value") .setKey("customField1") - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .build(); strictLogStore.logStore.addMessage(SpanUtil.makeSpan(1, "apple", time, List.of(customField))); strictLogStore.logStore.addMessage(SpanUtil.makeSpan(2, "apple", time, List.of(customField1))); @@ -409,19 +410,19 @@ public void testRangeQuery() { Trace.KeyValue valTag1 = Trace.KeyValue.newBuilder() .setKey("val") - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .setVInt32(1) .build(); Trace.KeyValue valTag2 = Trace.KeyValue.newBuilder() .setKey("val") - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .setVInt32(2) .build(); Trace.KeyValue valTag3 = Trace.KeyValue.newBuilder() .setKey("val") - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .setVInt32(3) .build(); strictLogStore.logStore.addMessage(SpanUtil.makeSpan(1, "apple", time, List.of(valTag1))); @@ -461,35 +462,35 @@ public void testQueryParsingFieldTypes() { Trace.KeyValue.newBuilder() .setVBool(true) .setKey("boolval") - .setVType(Trace.ValueType.BOOL) + .setFieldType(Schema.SchemaFieldType.BOOLEAN) .build(); Trace.KeyValue intTag = Trace.KeyValue.newBuilder() .setVInt32(1) .setKey("intval") - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .build(); Trace.KeyValue longTag = Trace.KeyValue.newBuilder() .setVInt64(2L) .setKey("longval") - .setVType(Trace.ValueType.INT64) + .setFieldType(Schema.SchemaFieldType.LONG) .build(); Trace.KeyValue floatTag = Trace.KeyValue.newBuilder() .setVFloat32(3F) .setKey("floatval") - .setVType(Trace.ValueType.FLOAT32) + .setFieldType(Schema.SchemaFieldType.FLOAT) .build(); Trace.KeyValue doubleTag = Trace.KeyValue.newBuilder() .setVFloat64(4D) .setKey("doubleval") - .setVType(Trace.ValueType.FLOAT64) + .setFieldType(Schema.SchemaFieldType.DOUBLE) .build(); Trace.Span span = @@ -1032,7 +1033,7 @@ public void testFullTextSearch() { Trace.KeyValue.newBuilder() .setVInt32(1234) .setKey("field1") - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .build(); strictLogStore.logStore.addMessage(SpanUtil.makeSpan(1, "apple", time, List.of(customField))); @@ -1280,7 +1281,7 @@ public void testDisabledFullTextSearch() { Trace.KeyValue.newBuilder() .setVInt32(1234) .setKey("field1") - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .build(); strictLogStoreWithoutFts.logStore.addMessage( diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/SearchResultUtilsTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/SearchResultUtilsTest.java index ae9f10bc16..a2b24477f3 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/SearchResultUtilsTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/SearchResultUtilsTest.java @@ -386,11 +386,11 @@ public void shouldConvertSchemaDefinitionToFromProto() { KaldbSearch.SchemaDefinition.newBuilder() .setType(Schema.SchemaFieldType.STRING) .build())) - .isEqualTo(FieldType.STRING); + .isEqualTo(FieldType.KEYWORD); assertThat(SearchResultUtils.toSchemaDefinitionProto(FieldType.STRING)) .isEqualTo( KaldbSearch.SchemaDefinition.newBuilder() - .setType(Schema.SchemaFieldType.STRING) + .setType(Schema.SchemaFieldType.KEYWORD) .build()); assertThat( diff --git a/kaldb/src/test/java/com/slack/kaldb/schema/SpanTagFormatterWithSchemaTest.java b/kaldb/src/test/java/com/slack/kaldb/schema/SpanFormatterWithSchemaTest.java similarity index 51% rename from kaldb/src/test/java/com/slack/kaldb/schema/SpanTagFormatterWithSchemaTest.java rename to kaldb/src/test/java/com/slack/kaldb/schema/SpanFormatterWithSchemaTest.java index 9762e07879..39b3cbf3ac 100644 --- a/kaldb/src/test/java/com/slack/kaldb/schema/SpanTagFormatterWithSchemaTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/schema/SpanFormatterWithSchemaTest.java @@ -1,36 +1,59 @@ package com.slack.kaldb.schema; +import static com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser.convertRequestToDocument; +import static com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser.fromIngestDocument; +import static com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParserTest.getIndexRequestBytes; import static com.slack.kaldb.logstore.LuceneIndexStoreImpl.MESSAGES_FAILED_COUNTER; import static com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl.build; import static com.slack.kaldb.testlib.MetricsUtil.getCount; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.Fail.fail; import com.google.common.io.Files; import com.google.protobuf.ByteString; +import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser; import com.slack.kaldb.logstore.LogStore; import com.slack.kaldb.logstore.LuceneIndexStoreConfig; import com.slack.kaldb.logstore.LuceneIndexStoreImpl; import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl; +import com.slack.kaldb.metadata.schema.FieldType; +import com.slack.kaldb.metadata.schema.SchemaUtil; import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.writer.SpanFormatter; import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.io.File; import java.time.Duration; +import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.DoubleDocValuesField; +import org.apache.lucene.document.FloatDocValuesField; +import org.apache.lucene.document.InetAddressPoint; +import org.apache.lucene.document.SortedDocValuesField; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.sandbox.document.HalfFloatPoint; +import org.assertj.core.api.Assertions; +import org.assertj.core.api.AssertionsForClassTypes; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.ingest.IngestDocument; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SpanTagFormatterWithSchemaTest { +public class SpanFormatterWithSchemaTest { - private static final Logger LOG = LoggerFactory.getLogger(SpanTagFormatterWithSchemaTest.class); + private static final Logger LOG = LoggerFactory.getLogger(SpanFormatterWithSchemaTest.class); static Schema.IngestSchema schema; @@ -104,110 +127,152 @@ public static void initializeSchema() { schema = Schema.IngestSchema.newBuilder().putAllFields(fields).build(); } + @Test + public void parseIndexRequestToTraceProtoTest() throws Exception { + final File schemaFile = + new File(getClass().getClassLoader().getResource("schema/test_schema.yaml").getFile()); + Schema.IngestSchema schema = SchemaUtil.parseSchema(schemaFile.toPath()); + + byte[] rawRequest = getIndexRequestBytes("index_all_schema_fields"); + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); + AssertionsForClassTypes.assertThat(indexRequests.size()).isEqualTo(2); + + Trace.Span doc1 = + BulkApiRequestParser.fromIngestDocument( + convertRequestToDocument(indexRequests.get(0)), schema); + Trace.Span doc2 = + BulkApiRequestParser.fromIngestDocument( + convertRequestToDocument(indexRequests.get(1)), schema); + assertThat(doc1.getId().toStringUtf8()).isEqualTo("1"); + assertThat(doc2.getId().toStringUtf8()).isEqualTo("2"); + + String[] fields = + new String[] { + "host", + "message", + "ip", + "my_date", + "success", + "cost", + "amount", + "amount_half_float", + "value", + "count", + "count_scaled_long", + "count_short", + "bucket" + }; + List doc1DefinedFields = new ArrayList<>(Arrays.stream(fields).toList()); + List doc1Tags = + doc1.getTagsList().stream().map(Trace.KeyValue::getKey).collect(Collectors.toList()); + + assertThat(doc1DefinedFields.removeAll(doc1Tags)).isEqualTo(true); + assertThat(doc1DefinedFields.size()).isEqualTo(0); + + // reset + doc1DefinedFields = new ArrayList<>(Arrays.stream(fields).toList()); + doc1Tags = doc1.getTagsList().stream().map(Trace.KeyValue::getKey).collect(Collectors.toList()); + + assertThat(doc1Tags.removeAll(doc1DefinedFields)).isEqualTo(true); + assertThat(doc1Tags.size()).isEqualTo(1); + assertThat(doc1Tags.get(0)).isEqualTo("service_name"); + + List doc2Tags = + doc2.getTagsList().stream().map(Trace.KeyValue::getKey).collect(Collectors.toList()); + assertThat(doc2Tags.size()).isEqualTo(3); // service_name and ip + assertThat(doc2Tags.contains("service_name")).isEqualTo(true); + assertThat(doc2Tags.contains("ip")).isEqualTo(true); + assertThat(doc2Tags.contains("username")).isEqualTo(true); + assertThat(doc2.getParentId().toStringUtf8()).isEqualTo("1"); + assertThat(doc2.getTraceId().toStringUtf8()).isEqualTo("2"); + assertThat(doc2.getName()).isEqualTo("check"); + assertThat(doc2.getDuration()).isEqualTo(20000L); + } + @Test public void testSimpleSchema() { Trace.KeyValue kv = SpanFormatter.convertKVtoProto("host", "host1", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getVStr()).isEqualTo("host1"); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); kv = SpanFormatter.convertKVtoProto("message", "my message", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getVStr()).isEqualTo("my message"); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.TEXT); kv = SpanFormatter.convertKVtoProto("ip", "8.8.8.8", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getVStr()).isEqualTo("8.8.8.8"); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.IP); - kv = SpanFormatter.convertKVtoProto("myTimestamp", "2021-01-01T00:00:00Z", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); - assertThat(kv.getVStr()).isEqualTo("2021-01-01T00:00:00Z"); + String myTimestamp = "2021-01-01T00:00:00Z"; + kv = SpanFormatter.convertKVtoProto("myTimestamp", myTimestamp, schema); + assertThat(kv.getVInt64()).isEqualTo(Instant.parse(myTimestamp).toEpochMilli()); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.DATE); kv = SpanFormatter.convertKVtoProto("success", "true", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.BOOL); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.BOOLEAN); assertThat(kv.getVBool()).isEqualTo(true); kv = SpanFormatter.convertKVtoProto("success", true, schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.BOOL); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.BOOLEAN); assertThat(kv.getVBool()).isEqualTo(true); kv = SpanFormatter.convertKVtoProto("cost", "10.0", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT64); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.DOUBLE); assertThat(kv.getVFloat64()).isEqualTo(10.0); kv = SpanFormatter.convertKVtoProto("cost", 10.0, schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT64); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.DOUBLE); assertThat(kv.getVFloat64()).isEqualTo(10.0); kv = SpanFormatter.convertKVtoProto("amount", "10.0", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT32); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.FLOAT); assertThat(kv.getVFloat32()).isEqualTo(10.0f); kv = SpanFormatter.convertKVtoProto("amount", 10.0, schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT32); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.FLOAT); assertThat(kv.getVFloat32()).isEqualTo(10.0f); kv = SpanFormatter.convertKVtoProto("amount_half_float", "10.0", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT32); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.HALF_FLOAT); assertThat(kv.getVFloat32()).isEqualTo(10.0f); kv = SpanFormatter.convertKVtoProto("amount_half_float", 10.0, schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT32); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.HALF_FLOAT); assertThat(kv.getVFloat32()).isEqualTo(10.0f); kv = SpanFormatter.convertKVtoProto("value", "10", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT32); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.INTEGER); assertThat(kv.getVInt32()).isEqualTo(10); kv = SpanFormatter.convertKVtoProto("value", 10, schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT32); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.INTEGER); assertThat(kv.getVInt32()).isEqualTo(10); kv = SpanFormatter.convertKVtoProto("count", "10", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT64); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.LONG); assertThat(kv.getVInt64()).isEqualTo(10L); kv = SpanFormatter.convertKVtoProto("count", 10, schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT64); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.LONG); assertThat(kv.getVInt64()).isEqualTo(10L); kv = SpanFormatter.convertKVtoProto("count_scaled_long", "10", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT64); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.SCALED_LONG); assertThat(kv.getVInt64()).isEqualTo(10); kv = SpanFormatter.convertKVtoProto("count_scaled_long", 10, schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT64); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.SCALED_LONG); assertThat(kv.getVInt64()).isEqualTo(10L); kv = SpanFormatter.convertKVtoProto("count_short", "10", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT32); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.SHORT); assertThat(kv.getVInt32()).isEqualTo(10L); kv = SpanFormatter.convertKVtoProto("count_short", 10, schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT32); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.SHORT); assertThat(kv.getVInt32()).isEqualTo(10); kv = SpanFormatter.convertKVtoProto("bucket", "e30=", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.BINARY); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.BINARY); assertThat(kv.getVBinary().toStringUtf8()).isEqualTo("e30="); } @@ -215,49 +280,41 @@ public void testSimpleSchema() { @Test public void testKeyValueWithWrongValues() { Trace.KeyValue kv = SpanFormatter.convertKVtoProto("success", "notBoolean", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.BOOL); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.BOOLEAN); assertThat(kv.getVBool()).isEqualTo(false); kv = SpanFormatter.convertKVtoProto("cost", "hello", schema); assertThat(kv.getKey()).isEqualTo("failed_cost"); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); assertThat(kv.getVStr()).isEqualTo("hello"); kv = SpanFormatter.convertKVtoProto("amount", "hello", schema); assertThat(kv.getKey()).isEqualTo("failed_amount"); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); assertThat(kv.getVStr()).isEqualTo("hello"); kv = SpanFormatter.convertKVtoProto("amount_half_float", "half_float_value", schema); assertThat(kv.getKey()).isEqualTo("failed_amount_half_float"); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); assertThat(kv.getVStr()).isEqualTo("half_float_value"); kv = SpanFormatter.convertKVtoProto("value", "int_value", schema); assertThat(kv.getKey()).isEqualTo("failed_value"); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); assertThat(kv.getVStr()).isEqualTo("int_value"); kv = SpanFormatter.convertKVtoProto("count", "long_value", schema); assertThat(kv.getKey()).isEqualTo("failed_count"); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); assertThat(kv.getVStr()).isEqualTo("long_value"); kv = SpanFormatter.convertKVtoProto("count_scaled_long", "scaled_long_val", schema); assertThat(kv.getKey()).isEqualTo("failed_count_scaled_long"); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); assertThat(kv.getVStr()).isEqualTo("scaled_long_val"); kv = SpanFormatter.convertKVtoProto("count_short", "my_short-Val", schema); assertThat(kv.getKey()).isEqualTo("failed_count_short"); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); assertThat(kv.getVStr()).isEqualTo("my_short-Val"); } @@ -267,56 +324,45 @@ public void testSimpleWithoutSchema() { Schema.IngestSchema schema = Schema.IngestSchema.getDefaultInstance(); Trace.KeyValue kv = SpanFormatter.convertKVtoProto("host", "host1", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getVStr()).isEqualTo("host1"); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); kv = SpanFormatter.convertKVtoProto("ip", "8.8.8.8", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getVStr()).isEqualTo("8.8.8.8"); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); kv = SpanFormatter.convertKVtoProto("myTimestamp", "2021-01-01T00:00:00Z", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getVStr()).isEqualTo("2021-01-01T00:00:00Z"); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); kv = SpanFormatter.convertKVtoProto("success", "true", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); assertThat(kv.getVStr()).isEqualTo("true"); kv = SpanFormatter.convertKVtoProto("success", true, schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.BOOL); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.BOOLEAN); assertThat(kv.getVBool()).isEqualTo(true); kv = SpanFormatter.convertKVtoProto("cost", "10.0", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); assertThat(kv.getVStr()).isEqualTo("10.0"); kv = SpanFormatter.convertKVtoProto("amount", 10.0f, schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT32); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.FLOAT); assertThat(kv.getVFloat32()).isEqualTo(10.0f); kv = SpanFormatter.convertKVtoProto("cost", 10.0, schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT64); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.DOUBLE); assertThat(kv.getVFloat64()).isEqualTo(10.0); kv = SpanFormatter.convertKVtoProto("value", 10, schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT32); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.INTEGER); kv = SpanFormatter.convertKVtoProto("count", 10L, schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT64); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.LONG); assertThat(kv.getVInt64()).isEqualTo(10L); kv = SpanFormatter.convertKVtoProto("bucket", "e30=", schema); - assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); assertThat(kv.getVStr()).isEqualTo("e30="); } @@ -331,7 +377,6 @@ public void testDuplicateFieldAsTag() { Trace.KeyValue.newBuilder() .setKey("name") .setVStr("service2") - .setVType(Trace.ValueType.STRING) .setFieldType(Schema.SchemaFieldType.KEYWORD)) .build(); @@ -347,17 +392,170 @@ public void testDuplicateFieldAsTag() { Trace.KeyValue.newBuilder() .setKey("tag1") .setVStr("value1") - .setVType(Trace.ValueType.STRING) .setFieldType(Schema.SchemaFieldType.KEYWORD)) .addTags( Trace.KeyValue.newBuilder() .setKey("tag1") .setVStr("value1") - .setVType(Trace.ValueType.STRING) .setFieldType(Schema.SchemaFieldType.KEYWORD)) .build(); logStore.addMessage(span); assertThat(getCount(MESSAGES_FAILED_COUNTER, meterRegistry)).isEqualTo(0); } + + @Test + public void testTraceProtoToLuceneDocumentTest() throws Exception { + final File schemaFile = + new File(getClass().getClassLoader().getResource("schema/test_schema.yaml").getFile()); + Schema.IngestSchema schema = SchemaUtil.parseSchema(schemaFile.toPath()); + + byte[] rawRequest = getIndexRequestBytes("index_all_schema_fields"); + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); + assertThat(indexRequests.size()).isEqualTo(2); + IngestDocument ingestDocument = convertRequestToDocument(indexRequests.get(0)); + + Trace.Span span = fromIngestDocument(ingestDocument, schema); + assertThat(span.getTagsCount()).isEqualTo(14); + Map tags = + span.getTagsList().stream() + .map(kv -> Map.entry(kv.getKey(), kv)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + assertThat(tags.get("amount").getVFloat32()).isEqualTo(1.1f); + assertThat(tags.get("cost").getVFloat64()).isEqualTo(4.0); + assertThat(tags.get("ip").getVStr()).isEqualTo("192.168.1.1"); + assertThat(tags.get("count").getVInt64()).isEqualTo(3); + assertThat(tags.get("count_short").getVInt32()).isEqualTo(10); + assertThat(tags.get("my_date").getVInt64()) + .isEqualTo(Instant.parse("2014-09-01T12:00:00Z").toEpochMilli()); + assertThat(tags.get("bucket").getVInt32()).isEqualTo(20); + assertThat(tags.get("success").getVBool()).isEqualTo(true); + assertThat(tags.get("count_scaled_long").getVInt64()).isEqualTo(80); + assertThat(tags.get("host").getVStr()).isEqualTo("host1"); + assertThat(tags.get("amount_half_float").getVFloat32()).isEqualTo(1.2f); + assertThat(tags.get("value").getVInt32()).isEqualTo(42); + assertThat(tags.get("service_name").getVStr()).isEqualTo("test"); + assertThat(tags.get("message").getVStr()).isEqualTo("foo bar"); + + SchemaAwareLogDocumentBuilderImpl dropFieldBuilder = + build( + SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.DROP_FIELD, true, meterRegistry); + Document luceneDocument = dropFieldBuilder.fromMessage(span); + // message is a tag, but is a TEXT field in schema, so it is indexed and not doc values + // 13 tags X 2(DV and indexed) + (message,_id,_timesinceepoch,type,_index) x2 + _source + _all + + assertThat(luceneDocument.getFields().size()).isEqualTo(37); + + for (Map.Entry keyAndTag : tags.entrySet()) { + String key = keyAndTag.getKey(); + Trace.KeyValue tag = keyAndTag.getValue(); + // Purposely against FieldType to ensure that conversion also works as expected + FieldType fieldType = FieldType.valueOf(tag.getFieldType().name()); + // list since same field will have two entries - indexed and docvalues + Arrays.asList(luceneDocument.getFields(key)) + .forEach( + field -> { + if (fieldType == FieldType.TEXT) { + assertThat(tag.getFieldType()).isEqualTo(Schema.SchemaFieldType.TEXT); + assertThat(field.stringValue()).isEqualTo(tag.getVStr()); + } else if (fieldType == FieldType.KEYWORD) { + assertThat(tag.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + if (field instanceof SortedDocValuesField) { + assertThat(field.binaryValue().utf8ToString()).isNotNull(); + } else { + assertThat(field.stringValue()).isEqualTo(tag.getVStr()); + } + } else if (fieldType == FieldType.BOOLEAN) { + assertThat(tag.getFieldType()).isEqualTo(Schema.SchemaFieldType.BOOLEAN); + + if (field instanceof SortedNumericDocValuesField) { + assertThat(field.numericValue()).isEqualTo(1L); + } else { + assertThat(field.binaryValue().utf8ToString()).isEqualTo("T"); + } + } else if (fieldType == FieldType.DATE) { + assertThat(tag.getFieldType()).isEqualTo(Schema.SchemaFieldType.DATE); + assertThat(field.numericValue().longValue()).isEqualTo(tag.getVInt64()); + } else if (fieldType == FieldType.DOUBLE) { + assertThat(tag.getFieldType()).isEqualTo(Schema.SchemaFieldType.DOUBLE); + if (field instanceof DoubleDocValuesField) { + // reverse of Double.doubleToRawLongBits(value) + assertThat(Double.longBitsToDouble(field.numericValue().longValue())) + .isEqualTo(tag.getVFloat64()); + } else { + assertThat(field.numericValue().doubleValue()).isEqualTo(tag.getVFloat64()); + } + } else if (fieldType == FieldType.FLOAT) { + assertThat(tag.getFieldType()).isEqualTo(Schema.SchemaFieldType.FLOAT); + if (field instanceof FloatDocValuesField) { + // reverse of Float.floatToRawIntBits(value) + assertThat(Float.intBitsToFloat(field.numericValue().intValue())) + .isEqualTo(tag.getVFloat32()); + } else { + assertThat(field.numericValue().floatValue()).isEqualTo(tag.getVFloat32()); + } + } else if (fieldType == FieldType.INTEGER) { + assertThat(tag.getFieldType()).isEqualTo(Schema.SchemaFieldType.INTEGER); + assertThat(field.numericValue().intValue()).isEqualTo(tag.getVInt32()); + } else if (fieldType == FieldType.LONG) { + assertThat(tag.getFieldType()).isEqualTo(Schema.SchemaFieldType.LONG); + assertThat(field.numericValue().longValue()).isEqualTo(tag.getVInt64()); + } else if (fieldType == FieldType.HALF_FLOAT) { + assertThat(tag.getFieldType()).isEqualTo(Schema.SchemaFieldType.HALF_FLOAT); + if (field instanceof HalfFloatPoint) { + assertThat(Math.abs(field.numericValue().floatValue() - tag.getVFloat32())) + .isLessThan(0.001F); + } else { + assertThat( + Math.abs( + HalfFloatPoint.sortableShortToHalfFloat( + field.numericValue().shortValue()) + - tag.getVFloat32())) + .isLessThan(0.001F); + } + } else if (fieldType == FieldType.SCALED_LONG) { + assertThat(tag.getFieldType()).isEqualTo(Schema.SchemaFieldType.SCALED_LONG); + assertThat(field.numericValue().longValue()).isEqualTo(tag.getVInt64()); + } else if (fieldType == FieldType.SHORT) { + assertThat(tag.getFieldType()).isEqualTo(Schema.SchemaFieldType.SHORT); + assertThat(field.numericValue().intValue()).isEqualTo(tag.getVInt32()); + } else if (fieldType == FieldType.BINARY) { + assertThat(tag.getFieldType()).isEqualTo(Schema.SchemaFieldType.BINARY); + assertThat(field.binaryValue().utf8ToString()).isEqualTo(tag.getVStr()); + } else if (fieldType == FieldType.IP) { + assertThat(tag.getFieldType()).isEqualTo(Schema.SchemaFieldType.IP); + if (field instanceof SortedDocValuesField) { + assertThat(InetAddressPoint.decode(field.binaryValue().bytes).getHostName()) + .isEqualTo(tag.getVStr()); + } else { + assertThat(InetAddressPoint.decode(field.binaryValue().bytes).getHostName()) + .isEqualTo(tag.getVStr()); + } + } else if (fieldType == FieldType.BYTE) { + assertThat(tag.getFieldType()).isEqualTo(Schema.SchemaFieldType.BYTE); + assertThat(field.numericValue().byteValue()).isEqualTo((byte) tag.getVInt32()); + } else { + fail("fieldType not defined for field: " + tag); + } + }); + } + } + + @Test + public void testValidateTimestamp() { + Assertions.assertThat(SpanFormatter.isValidTimestamp(Instant.ofEpochMilli(0))).isFalse(); + Assertions.assertThat( + SpanFormatter.isValidTimestamp(Instant.now().plus(61, ChronoUnit.MINUTES))) + .isFalse(); + Assertions.assertThat( + SpanFormatter.isValidTimestamp(Instant.now().plus(59, ChronoUnit.MINUTES))) + .isTrue(); + Assertions.assertThat( + SpanFormatter.isValidTimestamp(Instant.now().minus(167, ChronoUnit.HOURS))) + .isTrue(); + Assertions.assertThat( + SpanFormatter.isValidTimestamp(Instant.now().minus(169, ChronoUnit.HOURS))) + .isFalse(); + } } diff --git a/kaldb/src/test/java/com/slack/kaldb/testlib/SpanUtil.java b/kaldb/src/test/java/com/slack/kaldb/testlib/SpanUtil.java index eb174e9aca..2514052773 100644 --- a/kaldb/src/test/java/com/slack/kaldb/testlib/SpanUtil.java +++ b/kaldb/src/test/java/com/slack/kaldb/testlib/SpanUtil.java @@ -10,6 +10,7 @@ import com.google.protobuf.ByteString; import com.slack.kaldb.logstore.LogMessage; +import com.slack.kaldb.proto.schema.Schema; import com.slack.service.murron.trace.Trace; import java.time.Instant; import java.util.ArrayList; @@ -66,59 +67,57 @@ private static Trace.Span.Builder makeSpanBuilder( tags.add( Trace.KeyValue.newBuilder() .setKey(LogMessage.ReservedField.SERVICE_NAME.fieldName) - .setVTypeValue(Trace.ValueType.STRING.getNumber()) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .setVStr(serviceName) .build()); tags.add( Trace.KeyValue.newBuilder() .setKey("http_method") - .setVTypeValue(Trace.ValueType.STRING.getNumber()) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .setVStr("POST") .build()); tags.add( Trace.KeyValue.newBuilder() .setKey("method") - .setVTypeValue(Trace.ValueType.STRING.getNumber()) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .setVStr("callbacks.flannel") .build()); tags.add( Trace.KeyValue.newBuilder() .setKey("boolean") - .setVTypeValue(Trace.ValueType.BOOL.getNumber()) + .setFieldType(Schema.SchemaFieldType.BOOLEAN) .setVBool(true) .build()); tags.add( Trace.KeyValue.newBuilder() .setKey("int") - .setVTypeValue(Trace.ValueType.INT64.getNumber()) + .setFieldType(Schema.SchemaFieldType.LONG) .setVInt64(1000) - .setVFloat64(1001.2) .build()); tags.add( Trace.KeyValue.newBuilder() .setKey("float") - .setVTypeValue(Trace.ValueType.FLOAT64.getNumber()) + .setFieldType(Schema.SchemaFieldType.DOUBLE) .setVFloat64(1001.2) - .setVInt64(1000) .build()); tags.add( Trace.KeyValue.newBuilder() .setKey("binary") - .setVTypeValue(Trace.ValueType.BINARY.getNumber()) .setVBinary(ByteString.copyFromUtf8(BINARY_TAG_VALUE)) + .setFieldType(Schema.SchemaFieldType.BINARY) .setVStr("ignored") .build()); tags.add( Trace.KeyValue.newBuilder() .setKey(LogMessage.ReservedField.TYPE.fieldName) - .setVTypeValue(Trace.ValueType.STRING.getNumber()) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .setVStr(msgType) .build()); @@ -152,43 +151,43 @@ public static Trace.Span makeSpan( Trace.KeyValue.newBuilder() .setVStr(message) .setKey("message") - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .build()) .addTags( Trace.KeyValue.newBuilder() .setVInt32(i) .setKey(TEST_SOURCE_INT_PROPERTY) - .setVType(Trace.ValueType.INT32) + .setFieldType(Schema.SchemaFieldType.INTEGER) .build()) .addTags( Trace.KeyValue.newBuilder() .setVInt64(i) .setKey(TEST_SOURCE_LONG_PROPERTY) - .setVType(Trace.ValueType.INT64) + .setFieldType(Schema.SchemaFieldType.LONG) .build()) .addTags( Trace.KeyValue.newBuilder() .setVFloat32(i) .setKey(TEST_SOURCE_FLOAT_PROPERTY) - .setVType(Trace.ValueType.FLOAT32) + .setFieldType(Schema.SchemaFieldType.FLOAT) .build()) .addTags( Trace.KeyValue.newBuilder() .setVFloat64(i) .setKey(TEST_SOURCE_DOUBLE_PROPERTY) - .setVType(Trace.ValueType.FLOAT64) + .setFieldType(Schema.SchemaFieldType.DOUBLE) .build()) .addTags( Trace.KeyValue.newBuilder() .setVStr(String.format("String-%s", i)) .setKey(TEST_SOURCE_STRING_PROPERTY) - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .build()) .addTags( Trace.KeyValue.newBuilder() .setVStr(TEST_DATASET_NAME) .setKey(LogMessage.ReservedField.SERVICE_NAME.fieldName) - .setVType(Trace.ValueType.STRING) + .setFieldType(Schema.SchemaFieldType.KEYWORD) .build()) .build(); for (Trace.KeyValue additionalTag : additionalTags) { diff --git a/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java b/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java index 69c05caa5d..a4431ff02b 100644 --- a/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java @@ -1,6 +1,7 @@ package com.slack.kaldb.writer; import static com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser.convertRequestToDocument; +import static com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParserTest.getIndexRequestBytes; import static com.slack.kaldb.logstore.LuceneIndexStoreImpl.MESSAGES_FAILED_COUNTER; import static com.slack.kaldb.logstore.LuceneIndexStoreImpl.MESSAGES_RECEIVED_COUNTER; import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION; @@ -19,11 +20,13 @@ import com.slack.kaldb.logstore.search.SearchQuery; import com.slack.kaldb.logstore.search.SearchResult; import com.slack.kaldb.logstore.search.aggregations.DateHistogramAggBuilder; +import com.slack.kaldb.metadata.schema.SchemaUtil; import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.testlib.ChunkManagerUtil; import com.slack.kaldb.testlib.KaldbConfigUtil; import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -96,7 +99,7 @@ private SearchResult searchChunkManager(String indexName, String que Duration.ofMillis(3000)); } - private static ConsumerRecord consumerRecordWithValue(byte[] recordValue) { + public static ConsumerRecord consumerRecordWithValue(byte[] recordValue) { return new ConsumerRecord<>( "testTopic", 1, 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "testKey", recordValue); } @@ -289,6 +292,119 @@ public void parseAndIndexBulkApiRequestTest() throws IOException { assertThat(value).isEqualTo("{nestedField2=nestedValue2, nestedField1=nestedValue1}"); } + @Test + public void indexAndSearchAllFieldTypes() throws IOException { + final File schemaFile = + new File(getClass().getClassLoader().getResource("schema/test_schema.yaml").getFile()); + Schema.IngestSchema schema = SchemaUtil.parseSchema(schemaFile.toPath()); + + byte[] rawRequest = getIndexRequestBytes("index_all_schema_fields"); + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); + assertThat(indexRequests.size()).isEqualTo(2); + + for (IndexRequest indexRequest : indexRequests) { + IngestDocument ingestDocument = convertRequestToDocument(indexRequest); + Trace.Span span = BulkApiRequestParser.fromIngestDocument(ingestDocument, schema); + ConsumerRecord spanRecord = consumerRecordWithValue(span.toByteArray()); + LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); + assertThat(messageWriter.insertRecord(spanRecord)).isTrue(); + } + + assertThat(getCount(MESSAGES_RECEIVED_COUNTER, metricsRegistry)).isEqualTo(2); + assertThat(getCount(MESSAGES_FAILED_COUNTER, metricsRegistry)).isEqualTo(0); + chunkManagerUtil.chunkManager.getActiveChunk().commit(); + + SearchResult results = searchChunkManager("test", "_id:1"); + assertThat(results.hits.size()).isEqualTo(1); + + // message field + results = searchChunkManager("test", "message:foo"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "message:bar"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "message:\"foo bar\""); + assertThat(results.hits.size()).isEqualTo(1); + + results = searchChunkManager("test", "ip:\"192.168.0.0/16\""); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "ip:\"192.168.1.1/32\""); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "ip:192.168.1.1"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "ip:[192.168.0.0 TO 192.168.1.1]"); + assertThat(results.hits.size()).isEqualTo(1); + + results = searchChunkManager("test", "my_date:\"2014-09-01T12:00:00Z\""); + assertThat(results.hits.size()).isEqualTo(1); + results = + searchChunkManager( + "test", "my_date:[\"2014-09-01T12:00:00Z\" TO \"2014-09-01T12:00:00Z\"]"); + assertThat(results.hits.size()).isEqualTo(1); + + results = searchChunkManager("test", "success:true"); + assertThat(results.hits.size()).isEqualTo(1); + + results = searchChunkManager("test", "cost:4.0"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "cost:[0 TO 4.0]"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "cost:[4 TO 4.1]"); + assertThat(results.hits.size()).isEqualTo(1); + + results = searchChunkManager("test", "amount:1.1"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "amount:[1 TO 1.1]"); + assertThat(results.hits.size()).isEqualTo(1); + + results = searchChunkManager("test", "amount_half_float:1.2"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "amount_half_float:[1.0 TO 1.3]"); + assertThat(results.hits.size()).isEqualTo(1); + + results = searchChunkManager("test", "value:42"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "value:[0 TO 42]"); + assertThat(results.hits.size()).isEqualTo(1); + + results = searchChunkManager("test", "count:3"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "count:[3 TO 42]"); + assertThat(results.hits.size()).isEqualTo(1); + + results = searchChunkManager("test", "count_scaled_long:80"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "count_scaled_long:[79 TO 81}"); + assertThat(results.hits.size()).isEqualTo(1); + + results = searchChunkManager("test", "count_short:10"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "count_short:{9 TO 10]"); + assertThat(results.hits.size()).isEqualTo(1); + + results = searchChunkManager("test", "bucket:20"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "bucket:[0 TO 20]"); + assertThat(results.hits.size()).isEqualTo(1); + + // tests for doc2 + results = searchChunkManager("test", "_id:2"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "parent_id:1"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "trace_id:2"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "name:check"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "duration:20000"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "duration:[0 TO 20000]"); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "ip:\"::afff:4567:890a\""); + assertThat(results.hits.size()).isEqualTo(1); + results = searchChunkManager("test", "username:me"); + assertThat(results.hits.size()).isEqualTo(1); + } + @Test public void testNullTraceSpan() throws IOException { LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); diff --git a/kaldb/src/test/java/com/slack/kaldb/writer/MurronLogFormatterTest.java b/kaldb/src/test/java/com/slack/kaldb/writer/MurronLogFormatterTest.java index 76264498e0..e57ff0600b 100644 --- a/kaldb/src/test/java/com/slack/kaldb/writer/MurronLogFormatterTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/writer/MurronLogFormatterTest.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.exc.MismatchedInputException; import com.google.protobuf.ByteString; import com.slack.kaldb.logstore.LogMessage; +import com.slack.kaldb.proto.schema.Schema; import com.slack.service.murron.Murron; import com.slack.service.murron.trace.Trace; import java.nio.charset.StandardCharsets; @@ -19,23 +20,24 @@ public class MurronLogFormatterTest { public Object getTagValue(List tags, String key) { for (Trace.KeyValue tag : tags) { if (tag.getKey().equals(key)) { - Trace.ValueType valueType = tag.getVType(); - if (valueType.equals(Trace.ValueType.STRING)) { + Schema.SchemaFieldType schemaFieldType = tag.getFieldType(); + if (schemaFieldType.equals(Schema.SchemaFieldType.STRING) + || schemaFieldType.equals(Schema.SchemaFieldType.KEYWORD)) { return tag.getVStr(); } - if (valueType.equals(Trace.ValueType.INT32)) { + if (schemaFieldType.equals(Schema.SchemaFieldType.INTEGER)) { return tag.getVInt32(); } - if (valueType.equals(Trace.ValueType.INT64)) { + if (schemaFieldType.equals(Schema.SchemaFieldType.LONG)) { return tag.getVInt64(); } - if (valueType.equals(Trace.ValueType.FLOAT64)) { + if (schemaFieldType.equals(Schema.SchemaFieldType.DOUBLE)) { return tag.getVFloat64(); } - if (valueType.equals(Trace.ValueType.BOOL)) { + if (schemaFieldType.equals(Schema.SchemaFieldType.BOOLEAN)) { return tag.getVBool(); } - if (valueType.equals(Trace.ValueType.BINARY_VALUE)) { + if (schemaFieldType.equals(Schema.SchemaFieldType.BINARY)) { return tag.getVBinary(); } } diff --git a/kaldb/src/test/java/com/slack/kaldb/writer/SpanFormatterTest.java b/kaldb/src/test/java/com/slack/kaldb/writer/SpanFormatterTest.java deleted file mode 100644 index fea6f5e8dd..0000000000 --- a/kaldb/src/test/java/com/slack/kaldb/writer/SpanFormatterTest.java +++ /dev/null @@ -1,321 +0,0 @@ -package com.slack.kaldb.writer; - -import static com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_FIELD_VALUE; -import static com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl.build; -import static com.slack.kaldb.testlib.SpanUtil.BINARY_TAG_VALUE; -import static org.assertj.core.api.Assertions.assertThat; - -import com.slack.kaldb.logstore.LogMessage; -import com.slack.kaldb.logstore.LogWireMessage; -import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl; -import com.slack.kaldb.testlib.SpanUtil; -import com.slack.kaldb.util.JsonUtil; -import com.slack.service.murron.trace.Trace; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import java.io.IOException; -import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.apache.lucene.document.Document; -import org.assertj.core.data.Offset; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -public class SpanFormatterTest { - - private SimpleMeterRegistry meterRegistry; - - @BeforeEach - public void setup() throws Exception { - meterRegistry = new SimpleMeterRegistry(); - } - - @Test - public void testNonRootSpanToLogMessage() { - final String traceId = "t1"; - final String id = "i2"; - final String parentId = "p2"; - final Instant timestamp = Instant.now(); - final long durationMicros = 500000L; - final String serviceName = "test_service"; - final String name = "testSpanName"; - final Trace.Span span = - SpanUtil.makeSpan( - traceId, - id, - parentId, - TimeUnit.MICROSECONDS.convert(timestamp.toEpochMilli(), TimeUnit.MILLISECONDS), - durationMicros, - name, - serviceName, - SpanFormatter.DEFAULT_LOG_MESSAGE_TYPE); - - LogMessage logMsg = SpanFormatter.toLogMessage(span); - assertThat(logMsg.getTimestamp().toEpochMilli()).isEqualTo(timestamp.toEpochMilli()); - assertThat(logMsg.getId()).isEqualTo(id); - assertThat(logMsg.getType()).isEqualTo("INFO"); - assertThat(logMsg.getIndex()).isEqualTo(serviceName); - - Map source = logMsg.getSource(); - assertThat(source.get(LogMessage.ReservedField.PARENT_ID.fieldName)).isEqualTo(parentId); - assertThat(source.get(LogMessage.ReservedField.TRACE_ID.fieldName)).isEqualTo(traceId); - assertThat(source.get(LogMessage.ReservedField.SERVICE_NAME.fieldName)).isEqualTo(serviceName); - assertThat(source.get(LogMessage.ReservedField.NAME.fieldName)).isEqualTo(name); - assertThat(source.get(LogMessage.ReservedField.DURATION_MS.fieldName)) - .isEqualTo(Duration.of(durationMicros, ChronoUnit.MICROS).toMillis()); - assertThat(source.get("http_method")).isEqualTo("POST"); - assertThat(source.get("method")).isEqualTo("callbacks.flannel"); - assertThat(source.get("boolean")).isEqualTo(true); - assertThat(source.get("int")).isEqualTo(1000L); - assertThat(source.get("float")).isEqualTo(1001.2); - String binaryTagValue = (String) source.get("binary"); - assertThat(binaryTagValue).isEqualTo(BINARY_TAG_VALUE); - } - - @Test - public void testRootSpanToLogMessage() { - final String traceId = "traceid1"; - final String id = "1"; - final Instant timestamp = Instant.now(); - final String msgType = "test_message_type"; - final long durationMicros = 5000L; - final String serviceName = "test_service"; - final String name = "testSpanName"; - final Trace.Span span = - SpanUtil.makeSpan( - traceId, - id, - "", - TimeUnit.MICROSECONDS.convert(timestamp.toEpochMilli(), TimeUnit.MILLISECONDS), - durationMicros, - name, - serviceName, - msgType); - - LogMessage logMsg = SpanFormatter.toLogMessage(span); - assertThat(logMsg.getTimestamp().toEpochMilli()).isEqualTo(timestamp.toEpochMilli()); - assertThat(logMsg.getId()).isEqualTo(id); - assertThat(logMsg.getType()).isEqualTo(msgType); - assertThat(logMsg.getIndex()).isEqualTo(serviceName); - - Map source = logMsg.getSource(); - assertThat((String) source.get(LogMessage.ReservedField.PARENT_ID.fieldName)).isEmpty(); - assertThat(source.get(LogMessage.ReservedField.TRACE_ID.fieldName)).isEqualTo(traceId); - assertThat(source.get(LogMessage.ReservedField.SERVICE_NAME.fieldName)).isEqualTo(serviceName); - assertThat(source.get(LogMessage.ReservedField.NAME.fieldName)).isEqualTo(name); - assertThat(source.get(LogMessage.ReservedField.DURATION_MS.fieldName)) - .isEqualTo(Duration.of(durationMicros, ChronoUnit.MICROS).toMillis()); - assertThat(source.get("http_method")).isEqualTo("POST"); - assertThat(source.get("method")).isEqualTo("callbacks.flannel"); - assertThat(source.get("boolean")).isEqualTo(true); - assertThat(source.get("int")).isEqualTo(1000L); - assertThat(source.get("float")).isEqualTo(1001.2); - String binaryTagValue = (String) source.get("binary"); - assertThat(binaryTagValue).isEqualTo(BINARY_TAG_VALUE); - } - - @Test - public void testListOfSpansConversion() { - final String traceId = "t1"; - final String id = "i1"; - final String id2 = "i2"; - final Instant timestamp = Instant.now(); - final long durationMicros = 500000L; - final String serviceName = "test_service"; - final String name = "testSpanName"; - final Trace.Span span1 = - SpanUtil.makeSpan( - traceId, - id, - "", - TimeUnit.MICROSECONDS.convert(timestamp.toEpochMilli(), TimeUnit.MILLISECONDS), - durationMicros, - name, - serviceName, - SpanFormatter.DEFAULT_LOG_MESSAGE_TYPE); - final Trace.Span span2 = - SpanUtil.makeSpan( - traceId, - id2, - id, - TimeUnit.MICROSECONDS.convert(timestamp.toEpochMilli() + 1000, TimeUnit.MILLISECONDS), - durationMicros, - name + "2", - serviceName, - SpanFormatter.DEFAULT_LOG_MESSAGE_TYPE); - - // Test empty list - assertThat( - SpanFormatter.toLogMessage( - Trace.ListOfSpans.newBuilder().addAllSpans(Collections.emptyList()).build()) - .size()) - .isZero(); - - // List with 1 span. - assertThat( - SpanFormatter.toLogMessage( - Trace.ListOfSpans.newBuilder().addAllSpans(List.of(span1)).build()) - .size()) - .isEqualTo(1); - - // List with 2 spans - List logMessages = - SpanFormatter.toLogMessage( - Trace.ListOfSpans.newBuilder().addAllSpans(List.of(span1, span2)).build()); - assertThat(logMessages.size()).isEqualTo(2); - - for (LogMessage logMsg : logMessages) { - assertThat(logMsg.getTimestamp().toEpochMilli()) - .isIn(timestamp.toEpochMilli(), timestamp.toEpochMilli() + 1000); - assertThat(logMsg.getId()).isIn(id, id2); - assertThat(logMsg.getType()).isEqualTo("INFO"); - assertThat(logMsg.getIndex()).isEqualTo(serviceName); - Map source = logMsg.getSource(); - assertThat(source.get(LogMessage.ReservedField.PARENT_ID.fieldName)).isIn(id, ""); - assertThat(source.get(LogMessage.ReservedField.TRACE_ID.fieldName)).isEqualTo(traceId); - assertThat(source.get(LogMessage.ReservedField.SERVICE_NAME.fieldName)) - .isEqualTo(serviceName); - assertThat((String) source.get(LogMessage.ReservedField.NAME.fieldName)).startsWith(name); - assertThat(source.get(LogMessage.ReservedField.DURATION_MS.fieldName)) - .isEqualTo(Duration.of(durationMicros, ChronoUnit.MICROS).toMillis()); - assertThat(source.get("http_method")).isEqualTo("POST"); - assertThat(source.get("method")).isEqualTo("callbacks.flannel"); - assertThat(source.get("boolean")).isEqualTo(true); - assertThat(source.get("int")).isEqualTo(1000L); - assertThat(source.get("float")).isEqualTo(1001.2); - String binaryTagValue = (String) source.get("binary"); - assertThat(binaryTagValue).isEqualTo(BINARY_TAG_VALUE); - } - } - - @Test - public void testEmptyTimestamp() { - final Trace.Span span = - SpanUtil.makeSpan("", "", "", 0, 0, "", "", SpanFormatter.DEFAULT_LOG_MESSAGE_TYPE); - final LogMessage logMessage = SpanFormatter.toLogMessage(span); - assertThat(logMessage.getTimestamp().toEpochMilli()) - .isCloseTo(Instant.now().toEpochMilli(), Offset.offset(1000L)); - } - - @Test - public void testSpanWithoutKeyFieldsToLogMessage() { - Instant timestamp = Instant.now(); - final Trace.Span span = - SpanUtil.makeSpan( - "", - "", - "", - TimeUnit.MICROSECONDS.convert(timestamp.toEpochMilli(), TimeUnit.MILLISECONDS), - 0, - "", - "", - SpanFormatter.DEFAULT_LOG_MESSAGE_TYPE); - - LogMessage logMsg = SpanFormatter.toLogMessage(span); - // we convert any time by 1000 in SpanFormatter#toLogMessage - assertThat(logMsg.getTimestamp().toEpochMilli()).isEqualTo(timestamp.toEpochMilli()); - assertThat(logMsg.getId()).isEmpty(); - assertThat(logMsg.getType()).isEqualTo("INFO"); - assertThat(logMsg.getIndex()).isEqualTo(SpanFormatter.DEFAULT_INDEX_NAME); - - Map source = logMsg.getSource(); - assertThat((String) source.get(LogMessage.ReservedField.PARENT_ID.fieldName)).isEmpty(); - assertThat((String) source.get(LogMessage.ReservedField.TRACE_ID.fieldName)).isEmpty(); - assertThat(source.get(LogMessage.ReservedField.SERVICE_NAME.fieldName)) - .isEqualTo(SpanFormatter.DEFAULT_INDEX_NAME); - assertThat((String) source.get(LogMessage.ReservedField.NAME.fieldName)).isEmpty(); - assertThat((long) source.get(LogMessage.ReservedField.DURATION_MS.fieldName)).isZero(); - assertThat(source.get("http_method")).isEqualTo("POST"); - assertThat(source.get("method")).isEqualTo("callbacks.flannel"); - assertThat(source.get("boolean")).isEqualTo(true); - assertThat(source.get("int")).isEqualTo(1000L); - assertThat(source.get("float")).isEqualTo(1001.2); - String binaryTagValue = (String) source.get("binary"); - assertThat(binaryTagValue).isEqualTo(BINARY_TAG_VALUE); - } - - @Test - public void testValidateTimestamp() { - assertThat(SpanFormatter.isValidTimestamp(Instant.ofEpochMilli(0))).isFalse(); - assertThat(SpanFormatter.isValidTimestamp(Instant.now().plus(61, ChronoUnit.MINUTES))) - .isFalse(); - assertThat(SpanFormatter.isValidTimestamp(Instant.now().plus(59, ChronoUnit.MINUTES))).isTrue(); - assertThat(SpanFormatter.isValidTimestamp(Instant.now().minus(167, ChronoUnit.HOURS))).isTrue(); - assertThat(SpanFormatter.isValidTimestamp(Instant.now().minus(169, ChronoUnit.HOURS))) - .isFalse(); - } - - @Test - public void testLogMessageToTraceSpanCorrectness() throws IOException { - Trace.Span span = SpanUtil.makeSpan(1); - SchemaAwareLogDocumentBuilderImpl convertFieldBuilder = - build(CONVERT_FIELD_VALUE, true, meterRegistry); - Document documentFromSpan = convertFieldBuilder.fromMessage(span); - assertThat(documentFromSpan.getFields().size()).isEqualTo(23); - - LogMessage logMsg = SpanFormatter.toLogMessage(span); - Document documentFromOldLogMessage = convertFieldBuilder.fromMessage(logMsg); - - // why the +8? - // name, parent_id, trace_id, duration_ms were empty in the span - // the new method doesn't add empty fields which is the right thing to do. - // 4 fields X 2 ( DocValues ) = 8 - // manually verified _source and _all are the same. Tough to compare since ordering of data is - // not the same - assertThat(documentFromSpan.getFields().size() + 8) - .isEqualTo(documentFromOldLogMessage.getFields().size()); - - LogWireMessage wireMessage = - JsonUtil.read(documentFromSpan.get("_source"), LogWireMessage.class); - LogMessage logMessageReturn = - new LogMessage( - wireMessage.getIndex(), - wireMessage.getType(), - wireMessage.getId(), - wireMessage.getTimestamp(), - wireMessage.getSource()); - assertThat(wireMessage.getId()).isEqualTo(span.getId().toStringUtf8()); - assertThat(logMessageReturn.getId()).isEqualTo(span.getId().toStringUtf8()); - } - - @Test - public void testEncodingAndDecoding() throws IOException { - String error_message = - """ - Unexpected character (' ' (code 32)) in numeric value: expected digit (0-9) to follow minus sign, for valid numeric value\n at [Source: (byte[])\"+ NEBULA_HOST_DIR=/etc/nebula/host\"; line: 1, column: 3] - """; - Trace.Span span = SpanUtil.makeSpan(1, error_message, Instant.now()); - SchemaAwareLogDocumentBuilderImpl convertFieldBuilder = - build(CONVERT_FIELD_VALUE, true, meterRegistry); - Document documentFromSpan = convertFieldBuilder.fromMessage(span); - - assertThat(documentFromSpan.getFields().size()).isEqualTo(23); - - LogMessage logMsg = SpanFormatter.toLogMessage(span); - Document documentFromOldLogMessage = convertFieldBuilder.fromMessage(logMsg); - - // why the +8? - // name, parent_id, trace_id, duration_ms were empty in the span - // the new method doesn't add empty fields which is the right thing to do. - // 4 fields X 2 ( DocValues ) = 8 - // manually verified _source and _all are the same. Tough to compare since ordering of data is - // not the same - assertThat(documentFromSpan.getFields().size() + 8) - .isEqualTo(documentFromOldLogMessage.getFields().size()); - - LogWireMessage wireMessage = - JsonUtil.read(documentFromSpan.get("_source"), LogWireMessage.class); - LogMessage logMessageReturn = - new LogMessage( - wireMessage.getIndex(), - wireMessage.getType(), - wireMessage.getId(), - wireMessage.getTimestamp(), - wireMessage.getSource()); - assertThat(logMessageReturn.getSource().get("message")).isEqualTo(error_message); - } -} diff --git a/kaldb/src/test/resources/opensearchRequest/bulk/index_all_schema_fields.ndjson b/kaldb/src/test/resources/opensearchRequest/bulk/index_all_schema_fields.ndjson new file mode 100644 index 0000000000..3963cc0be8 --- /dev/null +++ b/kaldb/src/test/resources/opensearchRequest/bulk/index_all_schema_fields.ndjson @@ -0,0 +1,6 @@ +{ "index" : { "_index" : "test", "_id" : "1" } } +{ "host" : "host1", "message" : "foo bar", "ip" : "192.168.1.1", "my_date" : "2014-09-01T12:00:00Z", "success" : true, "cost": 4.0, "amount" : 1.1, "amount_half_float" : 1.2, "value" : 42, "count" : 3, "count_scaled_long" : 80, "count_short" : 10, "bucket" : "20" } +{ "index" : { "_index" : "test", "_id" : "2" } } +{ "parent_id" : "1", "trace_id" : "2", "name": "check", "ip" : "::afff:4567:890a", "duration" : 20000, "username": "me" } + +