diff --git a/config/config.yaml b/config/config.yaml index 6f53a74f47..b33b89c90e 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -128,6 +128,7 @@ preprocessorConfig: kafkaTopic: ${KAFKA_TOPIC:-test-topic} kafkaBootStrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092} additionalProps: ${KAFKA_ADDITIONAL_PROPS:-} + schemaFile: ${PREPROCESSOR_SCHEMA_FILE:-schema.yaml} serverConfig: serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086} diff --git a/config/schema.yaml b/config/schema.yaml new file mode 100644 index 0000000000..c4392b9a1d --- /dev/null +++ b/config/schema.yaml @@ -0,0 +1 @@ +fields: diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java index 37cb47e00f..90000f8c9c 100644 --- a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java +++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java @@ -6,6 +6,7 @@ import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.server.annotation.Post; import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser; +import com.slack.kaldb.proto.schema.Schema; import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; @@ -34,12 +35,14 @@ public class BulkIngestApi { private final String BULK_INGEST_INCOMING_BYTE_DOCS = "kaldb_preprocessor_incoming_docs"; private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest"; private final int rateLimitExceededErrorCode; + private final Schema.KaldbSchema kaldbSchema; public BulkIngestApi( BulkIngestKafkaProducer bulkIngestKafkaProducer, DatasetRateLimitingService datasetRateLimitingService, MeterRegistry meterRegistry, - int rateLimitExceededErrorCode) { + int rateLimitExceededErrorCode, + Schema.KaldbSchema kaldbSchema) { this.bulkIngestKafkaProducer = bulkIngestKafkaProducer; this.datasetRateLimitingService = datasetRateLimitingService; @@ -52,6 +55,7 @@ public BulkIngestApi( } else { this.rateLimitExceededErrorCode = rateLimitExceededErrorCode; } + this.kaldbSchema = kaldbSchema; } @Post("/_bulk") @@ -65,7 +69,8 @@ public HttpResponse addDocument(String bulkRequest) { try { byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8); incomingByteTotal.increment(bulkRequestBytes.length); - Map> docs = BulkApiRequestParser.parseRequest(bulkRequestBytes); + Map> docs = + BulkApiRequestParser.parseRequest(bulkRequestBytes, kaldbSchema); // todo - our rate limiter doesn't have a way to acquire permits across multiple // datasets diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java index 850b657438..54d6185a82 100644 --- a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java +++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java @@ -2,6 +2,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.writer.SpanFormatter; import com.slack.service.murron.trace.Trace; import java.io.IOException; @@ -36,8 +37,9 @@ public class BulkApiRequestParser { private static final String SERVICE_NAME_KEY = "service_name"; - public static Map> parseRequest(byte[] postBody) throws IOException { - return convertIndexRequestToTraceFormat(parseBulkRequest(postBody)); + public static Map> parseRequest( + byte[] postBody, Schema.KaldbSchema kaldbSchema) throws IOException { + return convertIndexRequestToTraceFormat(parseBulkRequest(postBody), kaldbSchema); } /** @@ -81,7 +83,8 @@ public static long getTimestampFromIngestDocument(IngestDocument ingestDocument) } @VisibleForTesting - public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) { + public static Trace.Span fromIngestDocument( + IngestDocument ingestDocument, Schema.KaldbSchema kaldbSchema) { long timestampInMillis = getTimestampFromIngestDocument(ingestDocument); @@ -113,18 +116,20 @@ public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) { sourceAndMetadata.remove("@timestamp"); sourceAndMetadata.forEach( - (key, value) -> spanBuilder.addTags(SpanFormatter.convertKVtoProto(key, value))); + (key, value) -> + spanBuilder.addTags(SpanFormatter.convertKVtoProto(key, value, kaldbSchema))); spanBuilder.addTags( Trace.KeyValue.newBuilder() .setKey(SERVICE_NAME_KEY) .setVType(Trace.ValueType.STRING) .setVStr(index) .build()); + return spanBuilder.build(); } protected static Map> convertIndexRequestToTraceFormat( - List indexRequests) { + List indexRequests, Schema.KaldbSchema kaldbSchema) { // key - index. value - list of docs to be indexed Map> indexDocs = new HashMap<>(); @@ -135,7 +140,7 @@ protected static Map> convertIndexRequestToTraceFormat( } IngestDocument ingestDocument = convertRequestToDocument(indexRequest); List docs = indexDocs.computeIfAbsent(index, key -> new ArrayList<>()); - docs.add(BulkApiRequestParser.fromIngestDocument(ingestDocument)); + docs.add(BulkApiRequestParser.fromIngestDocument(ingestDocument, kaldbSchema)); } return indexDocs; } diff --git a/kaldb/src/main/java/com/slack/kaldb/metadata/schema/KaldbSchemaUtil.java b/kaldb/src/main/java/com/slack/kaldb/metadata/schema/KaldbSchemaUtil.java new file mode 100644 index 0000000000..5f48bddcca --- /dev/null +++ b/kaldb/src/main/java/com/slack/kaldb/metadata/schema/KaldbSchemaUtil.java @@ -0,0 +1,50 @@ +package com.slack.kaldb.metadata.schema; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; +import com.slack.kaldb.proto.schema.Schema; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import org.apache.commons.text.StringSubstitutor; +import org.apache.commons.text.lookup.StringLookup; + +public class KaldbSchemaUtil { + + public static Schema.KaldbSchema parseSchema(Path schemaPath) throws IOException { + String filename = schemaPath.getFileName().toString(); + if (filename.endsWith(".yaml")) { + return parseSchemaYaml(Files.readString(schemaPath), System::getenv); + } else if (filename.endsWith(".json")) { + return parseJsonSchema(Files.readString(schemaPath)); + } else { + throw new RuntimeException( + "Invalid config file format provided - must be either .json or .yaml"); + } + } + + @VisibleForTesting + public static Schema.KaldbSchema parseSchemaYaml(String yamlStr, StringLookup variableResolver) + throws JsonProcessingException, InvalidProtocolBufferException { + StringSubstitutor substitute = new StringSubstitutor(variableResolver); + ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory()); + ObjectMapper jsonWriter = new ObjectMapper(); + + Object obj = yamlReader.readValue(substitute.replace(yamlStr), Object.class); + return parseJsonSchema(jsonWriter.writeValueAsString(obj)); + } + + @VisibleForTesting + public static Schema.KaldbSchema parseJsonSchema(String jsonStr) + throws InvalidProtocolBufferException { + Schema.KaldbSchema.Builder kaldbSchemaBuilder = Schema.KaldbSchema.newBuilder(); + JsonFormat.parser().merge(jsonStr, kaldbSchemaBuilder); + Schema.KaldbSchema kaldbSchema = kaldbSchemaBuilder.build(); + // TODO: validate schema + return kaldbSchema; + } +} diff --git a/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java b/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java index 8c8a6fbf53..9b1fa7d295 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java +++ b/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java @@ -31,11 +31,13 @@ import com.slack.kaldb.metadata.recovery.RecoveryNodeMetadataStore; import com.slack.kaldb.metadata.recovery.RecoveryTaskMetadataStore; import com.slack.kaldb.metadata.replica.ReplicaMetadataStore; +import com.slack.kaldb.metadata.schema.KaldbSchemaUtil; import com.slack.kaldb.metadata.search.SearchMetadataStore; import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore; import com.slack.kaldb.preprocessor.PreprocessorService; import com.slack.kaldb.proto.config.KaldbConfigs; import com.slack.kaldb.proto.metadata.Metadata; +import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.recovery.RecoveryService; import com.slack.kaldb.util.RuntimeHalterImpl; import com.slack.kaldb.zipkinApi.ZipkinService; @@ -398,12 +400,16 @@ private static Set getServices( new DatasetRateLimitingService(datasetMetadataStore, preprocessorConfig, meterRegistry); services.add(datasetRateLimitingService); + Schema.KaldbSchema kaldbSchema = + KaldbSchemaUtil.parseSchema(Path.of(preprocessorConfig.getSchemaFile())); + BulkIngestApi openSearchBulkApiService = new BulkIngestApi( bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry, - preprocessorConfig.getRateLimitExceededErrorCode()); + preprocessorConfig.getRateLimitExceededErrorCode(), + kaldbSchema); armeriaServiceBuilder.withAnnotatedService(openSearchBulkApiService); } else { PreprocessorService preprocessorService = diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java b/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java index 2db5d61be2..cd0f235297 100644 --- a/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java +++ b/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java @@ -4,6 +4,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.logstore.LogWireMessage; +import com.slack.kaldb.proto.schema.Schema; import com.slack.service.murron.Murron; import com.slack.service.murron.trace.Trace; import java.time.Duration; @@ -94,26 +95,127 @@ public static Trace.Span toSpan( return spanBuilder.build(); } + public static Trace.KeyValue convertKVtoProto( + String key, Object value, Schema.KaldbSchema schema) { + if (schema.containsFields(key)) { + Trace.KeyValue.Builder tagBuilder = Trace.KeyValue.newBuilder(); + tagBuilder.setKey(key); + try { + switch (schema.getFieldsMap().get(key).getType()) { + case KEYWORD -> { + tagBuilder.setVType(Trace.ValueType.STRING); + tagBuilder.setFieldType(Schema.SchemaFieldType.KEYWORD); + tagBuilder.setVStr(value.toString()); + } + case TEXT -> { + tagBuilder.setVType(Trace.ValueType.STRING); + tagBuilder.setFieldType(Schema.SchemaFieldType.TEXT); + tagBuilder.setVStr(value.toString()); + } + case IP -> { + tagBuilder.setVType(Trace.ValueType.STRING); + tagBuilder.setFieldType(Schema.SchemaFieldType.IP); + tagBuilder.setVStr(value.toString()); + } + case DATE -> { + tagBuilder.setVType(Trace.ValueType.STRING); + tagBuilder.setFieldType(Schema.SchemaFieldType.DATE); + tagBuilder.setVStr(value.toString()); + } + case BOOLEAN -> { + tagBuilder.setVType(Trace.ValueType.BOOL); + tagBuilder.setFieldType(Schema.SchemaFieldType.BOOLEAN); + tagBuilder.setVBool(Boolean.parseBoolean(value.toString())); + } + case DOUBLE -> { + tagBuilder.setVType(Trace.ValueType.FLOAT64); + tagBuilder.setFieldType(Schema.SchemaFieldType.DOUBLE); + tagBuilder.setVFloat64(Double.parseDouble(value.toString())); + } + case FLOAT -> { + tagBuilder.setVType(Trace.ValueType.FLOAT32); + tagBuilder.setFieldType(Schema.SchemaFieldType.FLOAT); + tagBuilder.setVFloat32(Float.parseFloat(value.toString())); + } + case HALF_FLOAT -> { + tagBuilder.setVType(Trace.ValueType.FLOAT32); + tagBuilder.setFieldType(Schema.SchemaFieldType.HALF_FLOAT); + tagBuilder.setVFloat32(Float.parseFloat(value.toString())); + } + case INTEGER -> { + tagBuilder.setVType(Trace.ValueType.INT32); + tagBuilder.setFieldType(Schema.SchemaFieldType.INTEGER); + tagBuilder.setVInt32(Integer.parseInt(value.toString())); + } + case LONG -> { + tagBuilder.setVType(Trace.ValueType.INT64); + tagBuilder.setFieldType(Schema.SchemaFieldType.LONG); + tagBuilder.setVInt64(Long.parseLong(value.toString())); + } + case SCALED_LONG -> { + tagBuilder.setVType(Trace.ValueType.INT64); + tagBuilder.setFieldType(Schema.SchemaFieldType.SCALED_LONG); + tagBuilder.setVInt64(Long.parseLong(value.toString())); + } + case SHORT -> { + tagBuilder.setVType(Trace.ValueType.INT32); + tagBuilder.setFieldType(Schema.SchemaFieldType.SHORT); + tagBuilder.setVInt32(Integer.parseInt(value.toString())); + } + case BYTE -> { + tagBuilder.setVType(Trace.ValueType.INT32); + tagBuilder.setFieldType(Schema.SchemaFieldType.BYTE); + tagBuilder.setVInt32(Integer.parseInt(value.toString())); + } + case BINARY -> { + tagBuilder.setVType(Trace.ValueType.BINARY); + tagBuilder.setFieldType(Schema.SchemaFieldType.BINARY); + tagBuilder.setVBinary(ByteString.copyFrom(value.toString().getBytes())); + } + } + return tagBuilder.build(); + } catch (Exception e) { + tagBuilder.setKey(STR."failed_\{key}"); + tagBuilder.setVType(Trace.ValueType.STRING); + tagBuilder.setFieldType(Schema.SchemaFieldType.KEYWORD); + tagBuilder.setVStr(value.toString()); + return tagBuilder.build(); + } + } else { + return SpanFormatter.convertKVtoProto(key, value); + } + } + public static Trace.KeyValue convertKVtoProto(String key, Object value) { Trace.KeyValue.Builder tagBuilder = Trace.KeyValue.newBuilder(); tagBuilder.setKey(key); if (value instanceof String) { tagBuilder.setVType(Trace.ValueType.STRING); + tagBuilder.setFieldType(Schema.SchemaFieldType.KEYWORD); tagBuilder.setVStr(value.toString()); } else if (value instanceof Boolean) { tagBuilder.setVType(Trace.ValueType.BOOL); + tagBuilder.setFieldType(Schema.SchemaFieldType.BOOLEAN); tagBuilder.setVBool((boolean) value); } else if (value instanceof Integer) { + tagBuilder.setVType(Trace.ValueType.INT32); + tagBuilder.setFieldType(Schema.SchemaFieldType.INTEGER); + tagBuilder.setVInt32((int) value); + } else if (value instanceof Long) { tagBuilder.setVType(Trace.ValueType.INT64); - tagBuilder.setVInt64((int) value); + tagBuilder.setFieldType(Schema.SchemaFieldType.LONG); + tagBuilder.setVInt64((long) value); } else if (value instanceof Float) { - tagBuilder.setVType(Trace.ValueType.FLOAT64); - tagBuilder.setVFloat64((float) value); + tagBuilder.setVType(Trace.ValueType.FLOAT32); + tagBuilder.setFieldType(Schema.SchemaFieldType.FLOAT); + tagBuilder.setVFloat32((float) value); } else if (value instanceof Double) { tagBuilder.setVType(Trace.ValueType.FLOAT64); + tagBuilder.setFieldType(Schema.SchemaFieldType.DOUBLE); tagBuilder.setVFloat64((double) value); } else if (value != null) { tagBuilder.setVType(Trace.ValueType.BINARY); + tagBuilder.setFieldType(Schema.SchemaFieldType.BINARY); tagBuilder.setVBinary(ByteString.copyFrom(value.toString().getBytes())); } return tagBuilder.build(); diff --git a/kaldb/src/main/proto/kaldb_configs.proto b/kaldb/src/main/proto/kaldb_configs.proto index f6fd33b9c8..084ff514ec 100644 --- a/kaldb/src/main/proto/kaldb_configs.proto +++ b/kaldb/src/main/proto/kaldb_configs.proto @@ -274,4 +274,6 @@ message PreprocessorConfig { // Set this to 429 for clients to retry the request after a delay // Only used when we use the bulk API int32 rate_limit_exceeded_error_code = 11; + + string schema_file = 12; } diff --git a/kaldb/src/main/proto/schema.proto b/kaldb/src/main/proto/schema.proto new file mode 100644 index 0000000000..f8f0280ca9 --- /dev/null +++ b/kaldb/src/main/proto/schema.proto @@ -0,0 +1,35 @@ +syntax = "proto3"; + +package slack.proto.kaldb.schema; + +option java_package = "com.slack.kaldb.proto.schema"; + +message KaldbSchema { + // convert to a map + map fields = 1; +} + +message SchemaField { + SchemaFieldType type = 2; + // other field definitions in the future +} + +// https://opensearch.org/docs/2.4/opensearch/supported-field-types/index/ +// Add the remaining types as needed +enum SchemaFieldType { + KEYWORD = 0; + TEXT = 1; + IP = 2; + // expose format in the future + DATE = 3; + BOOLEAN = 4; + DOUBLE = 5; + FLOAT = 6; + HALF_FLOAT = 7; + INTEGER = 8; + LONG = 9; + SCALED_LONG = 10; + SHORT = 11; + BYTE = 12; + BINARY = 13; +}; diff --git a/kaldb/src/main/proto/trace.proto b/kaldb/src/main/proto/trace.proto index cef333d567..6de48da97e 100644 --- a/kaldb/src/main/proto/trace.proto +++ b/kaldb/src/main/proto/trace.proto @@ -8,6 +8,7 @@ option java_package = "com.slack.service.murron.trace"; option go_package = "com.slack/kaldb/gen/proto/tracepb"; import "google/protobuf/empty.proto"; +import "schema.proto"; // The KeyValue message defines a key and value pair. // The key is always a string. The value for the field is determined by the ValueType. @@ -35,6 +36,7 @@ message KeyValue { bytes v_binary = 7; int32 v_int32 = 8; float v_float32 = 9; + slack.proto.kaldb.schema.SchemaFieldType fieldType = 10; } // A span defines a single event in a trace. diff --git a/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java index 973b943397..45a545ddbd 100644 --- a/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java @@ -4,6 +4,7 @@ import static org.assertj.core.api.Assertions.assertThat; import com.google.common.io.Resources; +import com.slack.kaldb.proto.schema.Schema; import com.slack.service.murron.trace.Trace; import java.io.IOException; import java.nio.charset.Charset; @@ -38,7 +39,8 @@ public void testSimpleIndexRequest() throws Exception { assertThat(indexRequests.get(0).sourceAsMap().size()).isEqualTo(3); Map> indexDocs = - BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); + BulkApiRequestParser.convertIndexRequestToTraceFormat( + indexRequests, Schema.KaldbSchema.newBuilder().build()); assertThat(indexDocs.keySet().size()).isEqualTo(1); assertThat(indexDocs.get("test").size()).isEqualTo(1); @@ -62,7 +64,8 @@ public void testIndexNoFields() throws Exception { List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); Map> indexDocs = - BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); + BulkApiRequestParser.convertIndexRequestToTraceFormat( + indexRequests, Schema.KaldbSchema.newBuilder().build()); assertThat(indexDocs.keySet().size()).isEqualTo(1); assertThat(indexDocs.get("test").size()).isEqualTo(1); @@ -85,7 +88,8 @@ public void testIndexNoFieldsNoId() throws Exception { List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); Map> indexDocs = - BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); + BulkApiRequestParser.convertIndexRequestToTraceFormat( + indexRequests, Schema.KaldbSchema.newBuilder().build()); assertThat(indexDocs.keySet().size()).isEqualTo(1); assertThat(indexDocs.get("test").size()).isEqualTo(1); @@ -108,7 +112,8 @@ public void testIndexEmptyRequest() throws Exception { List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); Map> indexDocs = - BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); + BulkApiRequestParser.convertIndexRequestToTraceFormat( + indexRequests, Schema.KaldbSchema.newBuilder().build()); assertThat(indexDocs.keySet().size()).isEqualTo(0); } @@ -125,7 +130,8 @@ public void testIndexRequestWithSpecialChars() throws Exception { List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); assertThat(indexRequests.size()).isEqualTo(1); Map> indexDocs = - BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); + BulkApiRequestParser.convertIndexRequestToTraceFormat( + indexRequests, Schema.KaldbSchema.newBuilder().build()); assertThat(indexDocs.keySet().size()).isEqualTo(1); assertThat(indexDocs.get("index_name").size()).isEqualTo(1); @@ -148,7 +154,8 @@ public void testBulkRequests() throws Exception { assertThat(indexRequests.size()).isEqualTo(2); Map> indexDocs = - BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); + BulkApiRequestParser.convertIndexRequestToTraceFormat( + indexRequests, Schema.KaldbSchema.newBuilder().build()); assertThat(indexDocs.keySet().size()).isEqualTo(2); assertThat(indexDocs.get("test1").size()).isEqualTo(1); assertThat(indexDocs.get("test3").size()).isEqualTo(1); @@ -187,7 +194,8 @@ public void testUpdatesAgainstTwoIndexes() throws Exception { assertThat(indexRequests.size()).isEqualTo(2); Map> indexDocs = - BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); + BulkApiRequestParser.convertIndexRequestToTraceFormat( + indexRequests, Schema.KaldbSchema.newBuilder().build()); assertThat(indexDocs.keySet().size()).isEqualTo(2); assertThat(indexDocs.get("test1").size()).isEqualTo(1); assertThat(indexDocs.get("test2").size()).isEqualTo(1); @@ -196,6 +204,50 @@ public void testUpdatesAgainstTwoIndexes() throws Exception { // however we throw an exception if that happens in practice } + @Test + public void testSchemaFieldForTags() throws IOException { + byte[] rawRequest = getRawQueryBytes("index_simple"); + + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); + assertThat(indexRequests.size()).isEqualTo(1); + + Schema.SchemaField type1 = + Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.KEYWORD).build(); + Schema.SchemaField type2 = + Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.TEXT).build(); + Schema.KaldbSchema schema = + Schema.KaldbSchema.newBuilder() + .putFields("field1", type1) + .putFields("field2", type2) + .build(); + + IngestDocument ingestDocument = convertRequestToDocument(indexRequests.get(0)); + Trace.Span span = BulkApiRequestParser.fromIngestDocument(ingestDocument, schema); + + List field1Def = + span.getTagsList().stream().filter(keyValue -> keyValue.getKey().equals("field1")).toList(); + assertThat(field1Def.size()).isEqualTo(1); + assertThat(field1Def.getFirst().getVStr()).isEqualTo("value1"); + assertThat(field1Def.getFirst().getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(field1Def.getFirst().getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + + field1Def = + span.getTagsList().stream().filter(keyValue -> keyValue.getKey().equals("field2")).toList(); + assertThat(field1Def.size()).isEqualTo(1); + assertThat(field1Def.getFirst().getVStr()).isEqualTo("value2"); + assertThat(field1Def.getFirst().getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(field1Def.getFirst().getFieldType()).isEqualTo(Schema.SchemaFieldType.TEXT); + + field1Def = + span.getTagsList().stream() + .filter(keyValue -> keyValue.getKey().equals("service_name")) + .toList(); + assertThat(field1Def.size()).isEqualTo(1); + assertThat(field1Def.getFirst().getVStr()).isEqualTo("test"); + assertThat(field1Def.getFirst().getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(field1Def.getFirst().getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + } + @Test public void testTraceSpanGeneratedTimestamp() throws IOException { byte[] rawRequest = getRawQueryBytes("index_simple"); @@ -204,7 +256,9 @@ public void testTraceSpanGeneratedTimestamp() throws IOException { assertThat(indexRequests.size()).isEqualTo(1); IngestDocument ingestDocument = convertRequestToDocument(indexRequests.get(0)); - Trace.Span span = BulkApiRequestParser.fromIngestDocument(ingestDocument); + Trace.Span span = + BulkApiRequestParser.fromIngestDocument( + ingestDocument, Schema.KaldbSchema.newBuilder().build()); // timestamp is in microseconds based on the trace.proto definition Instant ingestDocumentTime = diff --git a/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/schema/KaldbSchemaTest.java b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/schema/KaldbSchemaTest.java new file mode 100644 index 0000000000..ede23b2df0 --- /dev/null +++ b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/schema/KaldbSchemaTest.java @@ -0,0 +1,269 @@ +package com.slack.kaldb.bulkIngestApi.schema; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser; +import com.slack.kaldb.proto.schema.Schema; +import com.slack.service.murron.trace.Trace; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class KaldbSchemaTest { + + static Schema.KaldbSchema schema; + + @BeforeAll + public static void initializeSchema() { + Map fields = new HashMap<>(); + fields.put( + "host", Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.KEYWORD).build()); + fields.put( + "message", Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.TEXT).build()); + fields.put("ip", Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.IP).build()); + fields.put( + "myTimestamp", + Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.DATE).build()); + fields.put( + "success", Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.BOOLEAN).build()); + fields.put( + "cost", Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.DOUBLE).build()); + fields.put( + "amount", Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.FLOAT).build()); + fields.put( + "amount_half_float", + Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.HALF_FLOAT).build()); + fields.put( + "value", Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.INTEGER).build()); + fields.put( + "count", Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.LONG).build()); + fields.put( + "count_scaled_long", + Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.SCALED_LONG).build()); + fields.put( + "count_short", + Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.SHORT).build()); + fields.put( + "bucket", Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.BINARY).build()); + + schema = Schema.KaldbSchema.newBuilder().putAllFields(fields).build(); + } + + @Test + public void testSimpleSchema() { + Trace.KeyValue kv = BulkApiRequestParser.convertFieldsToProto("host", "host1", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getVStr()).isEqualTo("host1"); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + + kv = BulkApiRequestParser.convertFieldsToProto("message", "my message", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getVStr()).isEqualTo("my message"); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.TEXT); + + kv = BulkApiRequestParser.convertFieldsToProto("ip", "8.8.8.8", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getVStr()).isEqualTo("8.8.8.8"); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.IP); + + kv = BulkApiRequestParser.convertFieldsToProto("myTimestamp", "2021-01-01T00:00:00Z", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getVStr()).isEqualTo("2021-01-01T00:00:00Z"); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.DATE); + + kv = BulkApiRequestParser.convertFieldsToProto("success", "true", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.BOOL); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.BOOLEAN); + assertThat(kv.getVBool()).isEqualTo(true); + + kv = BulkApiRequestParser.convertFieldsToProto("success", true, schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.BOOL); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.BOOLEAN); + assertThat(kv.getVBool()).isEqualTo(true); + + kv = BulkApiRequestParser.convertFieldsToProto("cost", "10.0", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT64); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.DOUBLE); + assertThat(kv.getVFloat64()).isEqualTo(10.0); + + kv = BulkApiRequestParser.convertFieldsToProto("cost", 10.0, schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT64); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.DOUBLE); + assertThat(kv.getVFloat64()).isEqualTo(10.0); + + kv = BulkApiRequestParser.convertFieldsToProto("amount", "10.0", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT32); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.FLOAT); + assertThat(kv.getVFloat32()).isEqualTo(10.0f); + + kv = BulkApiRequestParser.convertFieldsToProto("amount", 10.0, schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT32); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.FLOAT); + assertThat(kv.getVFloat32()).isEqualTo(10.0f); + + kv = BulkApiRequestParser.convertFieldsToProto("amount_half_float", "10.0", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT32); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.HALF_FLOAT); + assertThat(kv.getVFloat32()).isEqualTo(10.0f); + + kv = BulkApiRequestParser.convertFieldsToProto("amount_half_float", 10.0, schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT32); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.HALF_FLOAT); + assertThat(kv.getVFloat32()).isEqualTo(10.0f); + + kv = BulkApiRequestParser.convertFieldsToProto("value", "10", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT32); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.INTEGER); + assertThat(kv.getVInt32()).isEqualTo(10); + + kv = BulkApiRequestParser.convertFieldsToProto("value", 10, schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT32); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.INTEGER); + assertThat(kv.getVInt32()).isEqualTo(10); + + kv = BulkApiRequestParser.convertFieldsToProto("count", "10", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT64); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.LONG); + assertThat(kv.getVInt64()).isEqualTo(10L); + + kv = BulkApiRequestParser.convertFieldsToProto("count", 10, schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT64); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.LONG); + assertThat(kv.getVInt64()).isEqualTo(10L); + + kv = BulkApiRequestParser.convertFieldsToProto("count_scaled_long", "10", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT64); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.SCALED_LONG); + assertThat(kv.getVInt64()).isEqualTo(10); + + kv = BulkApiRequestParser.convertFieldsToProto("count_scaled_long", 10, schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT64); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.SCALED_LONG); + assertThat(kv.getVInt64()).isEqualTo(10L); + + kv = BulkApiRequestParser.convertFieldsToProto("count_short", "10", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT32); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.SHORT); + assertThat(kv.getVInt32()).isEqualTo(10L); + + kv = BulkApiRequestParser.convertFieldsToProto("count_short", 10, schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT32); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.SHORT); + assertThat(kv.getVInt32()).isEqualTo(10); + + kv = BulkApiRequestParser.convertFieldsToProto("bucket", "e30=", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.BINARY); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.BINARY); + assertThat(kv.getVBinary().toStringUtf8()).isEqualTo("e30="); + } + + @Test + public void testKeyValueWithWrongValues() { + Trace.KeyValue kv = BulkApiRequestParser.convertFieldsToProto("success", "notBoolean", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.BOOL); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.BOOLEAN); + assertThat(kv.getVBool()).isEqualTo(false); + + kv = BulkApiRequestParser.convertFieldsToProto("cost", "hello", schema); + assertThat(kv.getKey()).isEqualTo("failed_cost"); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + assertThat(kv.getVStr()).isEqualTo("hello"); + + kv = BulkApiRequestParser.convertFieldsToProto("amount", "hello", schema); + assertThat(kv.getKey()).isEqualTo("failed_amount"); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + assertThat(kv.getVStr()).isEqualTo("hello"); + + kv = BulkApiRequestParser.convertFieldsToProto("amount_half_float", "half_float_value", schema); + assertThat(kv.getKey()).isEqualTo("failed_amount_half_float"); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + assertThat(kv.getVStr()).isEqualTo("half_float_value"); + + kv = BulkApiRequestParser.convertFieldsToProto("value", "int_value", schema); + assertThat(kv.getKey()).isEqualTo("failed_value"); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + assertThat(kv.getVStr()).isEqualTo("int_value"); + + kv = BulkApiRequestParser.convertFieldsToProto("count", "long_value", schema); + assertThat(kv.getKey()).isEqualTo("failed_count"); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + assertThat(kv.getVStr()).isEqualTo("long_value"); + + kv = BulkApiRequestParser.convertFieldsToProto("count_scaled_long", "scaled_long_val", schema); + assertThat(kv.getKey()).isEqualTo("failed_count_scaled_long"); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + assertThat(kv.getVStr()).isEqualTo("scaled_long_val"); + + kv = BulkApiRequestParser.convertFieldsToProto("count_short", "my_short-Val", schema); + assertThat(kv.getKey()).isEqualTo("failed_count_short"); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + assertThat(kv.getVStr()).isEqualTo("my_short-Val"); + } + + @Test + public void testSimpleWithoutSchema() { + + Schema.KaldbSchema schema = Schema.KaldbSchema.getDefaultInstance(); + Trace.KeyValue kv = BulkApiRequestParser.convertFieldsToProto("host", "host1", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getVStr()).isEqualTo("host1"); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + + kv = BulkApiRequestParser.convertFieldsToProto("ip", "8.8.8.8", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getVStr()).isEqualTo("8.8.8.8"); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + + kv = BulkApiRequestParser.convertFieldsToProto("myTimestamp", "2021-01-01T00:00:00Z", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getVStr()).isEqualTo("2021-01-01T00:00:00Z"); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + + kv = BulkApiRequestParser.convertFieldsToProto("success", "true", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + assertThat(kv.getVStr()).isEqualTo("true"); + + kv = BulkApiRequestParser.convertFieldsToProto("success", true, schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.BOOL); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.BOOLEAN); + assertThat(kv.getVBool()).isEqualTo(true); + + kv = BulkApiRequestParser.convertFieldsToProto("cost", "10.0", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + assertThat(kv.getVStr()).isEqualTo("10.0"); + + kv = BulkApiRequestParser.convertFieldsToProto("amount", 10.0f, schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT32); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.FLOAT); + assertThat(kv.getVFloat32()).isEqualTo(10.0f); + + kv = BulkApiRequestParser.convertFieldsToProto("cost", 10.0, schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.FLOAT64); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.DOUBLE); + assertThat(kv.getVFloat64()).isEqualTo(10.0); + + kv = BulkApiRequestParser.convertFieldsToProto("value", 10, schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT32); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.INTEGER); + + kv = BulkApiRequestParser.convertFieldsToProto("count", 10L, schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.INT64); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.LONG); + assertThat(kv.getVInt64()).isEqualTo(10L); + + kv = BulkApiRequestParser.convertFieldsToProto("bucket", "e30=", schema); + assertThat(kv.getVType()).isEqualTo(Trace.ValueType.STRING); + assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD); + assertThat(kv.getVStr()).isEqualTo("e30="); + } +} diff --git a/kaldb/src/test/java/com/slack/kaldb/schema/KaldbSchemaTest.java b/kaldb/src/test/java/com/slack/kaldb/schema/KaldbSchemaTest.java new file mode 100644 index 0000000000..58ae971b35 --- /dev/null +++ b/kaldb/src/test/java/com/slack/kaldb/schema/KaldbSchemaTest.java @@ -0,0 +1,97 @@ +package com.slack.kaldb.schema; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import com.slack.kaldb.metadata.schema.KaldbSchemaUtil; +import com.slack.kaldb.proto.schema.Schema; +import java.io.File; +import java.io.IOException; +import java.util.List; +import org.junit.jupiter.api.Test; + +public class KaldbSchemaTest { + + @Test + public void testParsingEmptySchema() { + List.of("json", "yaml") + .forEach( + ext -> { + try { + final File cfgFile = + new File( + getClass() + .getClassLoader() + .getResource("schema/test_schema_empty." + ext) + .getFile()); + Schema.KaldbSchema kaldbSchema = KaldbSchemaUtil.parseSchema(cfgFile.toPath()); + assertThat(kaldbSchema).isNotNull(); + assertThat(kaldbSchema.getFieldsCount()).isEqualTo(0); + } catch (IOException e) { + fail("Failed to parse schema", e); + } + }); + } + + @Test + public void testParsingSchema() { + List.of("json", "yaml") + .forEach( + ext -> { + try { + final File cfgFile = + new File( + getClass() + .getClassLoader() + .getResource("schema/test_schema." + ext) + .getFile()); + Schema.KaldbSchema kaldbSchema = KaldbSchemaUtil.parseSchema(cfgFile.toPath()); + assertThat(kaldbSchema).isNotNull(); + + assertThat(kaldbSchema).isNotNull(); + assertThat(kaldbSchema.getFieldsCount()).isEqualTo(13); + + assertThat(kaldbSchema.getFieldsMap().get("host").getType()) + .isEqualTo(Schema.SchemaFieldType.KEYWORD); + + assertThat(kaldbSchema.getFieldsMap().get("message").getType()) + .isEqualTo(Schema.SchemaFieldType.TEXT); + + assertThat(kaldbSchema.getFieldsMap().get("ip").getType()) + .isEqualTo(Schema.SchemaFieldType.IP); + + assertThat(kaldbSchema.getFieldsMap().get("timestamp").getType()) + .isEqualTo(Schema.SchemaFieldType.DATE); + + assertThat(kaldbSchema.getFieldsMap().get("success").getType()) + .isEqualTo(Schema.SchemaFieldType.BOOLEAN); + + assertThat(kaldbSchema.getFieldsMap().get("cost").getType()) + .isEqualTo(Schema.SchemaFieldType.DOUBLE); + + assertThat(kaldbSchema.getFieldsMap().get("amount").getType()) + .isEqualTo(Schema.SchemaFieldType.FLOAT); + + assertThat(kaldbSchema.getFieldsMap().get("amount_half_float").getType()) + .isEqualTo(Schema.SchemaFieldType.HALF_FLOAT); + + assertThat(kaldbSchema.getFieldsMap().get("value").getType()) + .isEqualTo(Schema.SchemaFieldType.INTEGER); + + assertThat(kaldbSchema.getFieldsMap().get("count").getType()) + .isEqualTo(Schema.SchemaFieldType.LONG); + + assertThat(kaldbSchema.getFieldsMap().get("count_scaled_long").getType()) + .isEqualTo(Schema.SchemaFieldType.SCALED_LONG); + + assertThat(kaldbSchema.getFieldsMap().get("count_short").getType()) + .isEqualTo(Schema.SchemaFieldType.SHORT); + + assertThat(kaldbSchema.getFieldsMap().get("bucket").getType()) + .isEqualTo(Schema.SchemaFieldType.BYTE); + } catch (IOException e) { + fail("Failed to parse schema", e); + } + }); + } +} diff --git a/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java b/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java index fca2e5e8be..08e421fefe 100644 --- a/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java @@ -22,6 +22,7 @@ import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata; import com.slack.kaldb.preprocessor.PreprocessorRateLimiter; import com.slack.kaldb.proto.config.KaldbConfigs; +import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.testlib.MetricsUtil; import com.slack.kaldb.testlib.TestKafkaServer; import com.slack.kaldb.util.JsonUtil; @@ -126,7 +127,12 @@ public void bootstrapCluster() throws Exception { bulkIngestKafkaProducer.awaitRunning(DEFAULT_START_STOP_DURATION); bulkApi = - new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry, 400); + new BulkIngestApi( + bulkIngestKafkaProducer, + datasetRateLimitingService, + meterRegistry, + 400, + Schema.KaldbSchema.newBuilder().build()); } // I looked at making this a @BeforeEach. it's possible if you annotate a test with a @Tag and @@ -201,7 +207,8 @@ public void testBulkApiBasic() throws Exception { """; // use the way we calculate the throughput in the rate limiter to get the exact bytes Map> docs = - BulkApiRequestParser.parseRequest(request1.getBytes(StandardCharsets.UTF_8)); + BulkApiRequestParser.parseRequest( + request1.getBytes(StandardCharsets.UTF_8), Schema.KaldbSchema.newBuilder().build()); int limit = PreprocessorRateLimiter.getSpanBytes(docs.get("testindex")); // for some reason if we pass the exact limit, the rate limiter doesn't work as expected updateDatasetThroughput(limit / 2); @@ -263,7 +270,8 @@ public void testBulkApiBasic() throws Exception { bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry, - TOO_MANY_REQUESTS.code()); + TOO_MANY_REQUESTS.code(), + Schema.KaldbSchema.newBuilder().build()); httpResponse = bulkApi2.addDocument(request1).aggregate().join(); assertThat(httpResponse.status().isSuccess()).isEqualTo(false); assertThat(httpResponse.status().code()).isEqualTo(TOO_MANY_REQUESTS.code()); diff --git a/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java b/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java index 067f2a4583..9457bcb136 100644 --- a/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java @@ -275,6 +275,7 @@ public void testParseKaldbJsonConfigFile() throws IOException { assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2); assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(false); assertThat(preprocessorConfig.getRateLimitExceededErrorCode()).isEqualTo(400); + assertThat(preprocessorConfig.getSchemaFile()).isEqualTo("schema/test_schema.yaml"); final KaldbConfigs.KafkaConfig preprocessorKafkaConfig = config.getPreprocessorConfig().getKafkaConfig(); @@ -455,6 +456,7 @@ public void testParseKaldbYamlConfigFile() throws IOException { assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(true); assertThat(preprocessorConfig.getRateLimitExceededErrorCode()).isEqualTo(429); + assertThat(preprocessorConfig.getSchemaFile()).isEqualTo("schema/test_schema.yaml"); final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig(); assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085); diff --git a/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java b/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java index 84fc9291d4..57930cbad7 100644 --- a/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java @@ -19,6 +19,7 @@ import com.slack.kaldb.logstore.search.SearchQuery; import com.slack.kaldb.logstore.search.SearchResult; import com.slack.kaldb.logstore.search.aggregations.DateHistogramAggBuilder; +import com.slack.kaldb.proto.schema.Schema; import com.slack.kaldb.testlib.ChunkManagerUtil; import com.slack.kaldb.testlib.KaldbConfigUtil; import com.slack.service.murron.trace.Trace; @@ -253,7 +254,9 @@ public void parseAndIndexBulkApiRequestTest() throws IOException { for (IndexRequest indexRequest : indexRequests) { IngestDocument ingestDocument = convertRequestToDocument(indexRequest); - Trace.Span span = BulkApiRequestParser.fromIngestDocument(ingestDocument); + Trace.Span span = + BulkApiRequestParser.fromIngestDocument( + ingestDocument, Schema.KaldbSchema.newBuilder().build()); ConsumerRecord spanRecord = consumerRecordWithValue(span.toByteArray()); LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); assertThat(messageWriter.insertRecord(spanRecord)).isTrue(); diff --git a/kaldb/src/test/resources/schema/test_schema.json b/kaldb/src/test/resources/schema/test_schema.json new file mode 100644 index 0000000000..62b83a27af --- /dev/null +++ b/kaldb/src/test/resources/schema/test_schema.json @@ -0,0 +1,43 @@ +{ + "fields": { + "host": { + "type": "KEYWORD" + }, + "message": { + "type": "TEXT" + }, + "ip": { + "type": "IP" + }, + "timestamp": { + "type": "DATE" + }, + "success": { + "type": "BOOLEAN" + }, + "cost": { + "type": "DOUBLE" + }, + "amount": { + "type": "FLOAT" + }, + "amount_half_float": { + "type": "HALF_FLOAT" + }, + "value": { + "type": "INTEGER" + }, + "count": { + "type": "LONG" + }, + "count_scaled_long": { + "type": "SCALED_LONG" + }, + "count_short": { + "type": "SHORT" + }, + "bucket": { + "type": "BYTE" + } + } +} diff --git a/kaldb/src/test/resources/schema/test_schema.yaml b/kaldb/src/test/resources/schema/test_schema.yaml new file mode 100644 index 0000000000..9b8f279de5 --- /dev/null +++ b/kaldb/src/test/resources/schema/test_schema.yaml @@ -0,0 +1,27 @@ +fields: + host: + type: KEYWORD + message: + type: TEXT + ip: + type: IP + timestamp: + type: DATE + success: + type: BOOLEAN + cost: + type: DOUBLE + amount: + type: FLOAT + amount_half_float: + type: HALF_FLOAT + value: + type: INTEGER + count: + type: LONG + count_scaled_long: + type: SCALED_LONG + count_short: + type: SHORT + bucket: + type: BYTE diff --git a/kaldb/src/test/resources/schema/test_schema_empty.json b/kaldb/src/test/resources/schema/test_schema_empty.json new file mode 100644 index 0000000000..bacdee7aba --- /dev/null +++ b/kaldb/src/test/resources/schema/test_schema_empty.json @@ -0,0 +1,4 @@ +{ + "fields": { + } +} diff --git a/kaldb/src/test/resources/schema/test_schema_empty.yaml b/kaldb/src/test/resources/schema/test_schema_empty.yaml new file mode 100644 index 0000000000..c4392b9a1d --- /dev/null +++ b/kaldb/src/test/resources/schema/test_schema_empty.yaml @@ -0,0 +1 @@ +fields: diff --git a/kaldb/src/test/resources/test_config.json b/kaldb/src/test/resources/test_config.json index 109a765160..0749e2d580 100644 --- a/kaldb/src/test/resources/test_config.json +++ b/kaldb/src/test/resources/test_config.json @@ -157,7 +157,8 @@ "rateLimiterMaxBurstSeconds": 2, "bootstrapServers": "localhost:9092", "useBulkApi": false, - "rateLimitExceededErrorCode": 400 + "rateLimitExceededErrorCode": 400, + "kaldbSchemaFile": "schema/test_schema.yaml" }, "clusterConfig": { "clusterName": "test_kaldb_json_cluster", diff --git a/kaldb/src/test/resources/test_config.yaml b/kaldb/src/test/resources/test_config.yaml index 8d6e74b0c9..84f1b3c0f8 100644 --- a/kaldb/src/test/resources/test_config.yaml +++ b/kaldb/src/test/resources/test_config.yaml @@ -128,6 +128,7 @@ preprocessorConfig: bootstrapServers: localhost:9092 useBulkApi: true rateLimitExceededErrorCode: 429 + schemaFile: "schema/test_schema.yaml" clusterConfig: clusterName: "test_kaldb_cluster"