Skip to content

Commit

Permalink
add a new schema config file
Browse files Browse the repository at this point in the history
  • Loading branch information
vthacker committed Mar 18, 2024
1 parent e308a86 commit 2260cbf
Show file tree
Hide file tree
Showing 22 changed files with 744 additions and 25 deletions.
1 change: 1 addition & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ preprocessorConfig:
kafkaTopic: ${KAFKA_TOPIC:-test-topic}
kafkaBootStrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}
schemaFile: ${PREPROCESSOR_SCHEMA_FILE:-schema.yaml}

serverConfig:
serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086}
Expand Down
1 change: 1 addition & 0 deletions config/schema.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fields:
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.annotation.Post;
import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.kaldb.proto.schema.Schema;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -34,12 +35,14 @@ public class BulkIngestApi {
private final String BULK_INGEST_INCOMING_BYTE_DOCS = "kaldb_preprocessor_incoming_docs";
private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest";
private final int rateLimitExceededErrorCode;
private final Schema.KaldbSchema kaldbSchema;

public BulkIngestApi(
BulkIngestKafkaProducer bulkIngestKafkaProducer,
DatasetRateLimitingService datasetRateLimitingService,
MeterRegistry meterRegistry,
int rateLimitExceededErrorCode) {
int rateLimitExceededErrorCode,
Schema.KaldbSchema kaldbSchema) {

this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
this.datasetRateLimitingService = datasetRateLimitingService;
Expand All @@ -52,6 +55,7 @@ public BulkIngestApi(
} else {
this.rateLimitExceededErrorCode = rateLimitExceededErrorCode;
}
this.kaldbSchema = kaldbSchema;
}

@Post("/_bulk")
Expand All @@ -65,7 +69,8 @@ public HttpResponse addDocument(String bulkRequest) {
try {
byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8);
incomingByteTotal.increment(bulkRequestBytes.length);
Map<String, List<Trace.Span>> docs = BulkApiRequestParser.parseRequest(bulkRequestBytes);
Map<String, List<Trace.Span>> docs =
BulkApiRequestParser.parseRequest(bulkRequestBytes, kaldbSchema);

// todo - our rate limiter doesn't have a way to acquire permits across multiple
// datasets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.slack.kaldb.proto.schema.Schema;
import com.slack.kaldb.writer.SpanFormatter;
import com.slack.service.murron.trace.Trace;
import java.io.IOException;
Expand Down Expand Up @@ -36,8 +37,9 @@ public class BulkApiRequestParser {

private static final String SERVICE_NAME_KEY = "service_name";

public static Map<String, List<Trace.Span>> parseRequest(byte[] postBody) throws IOException {
return convertIndexRequestToTraceFormat(parseBulkRequest(postBody));
public static Map<String, List<Trace.Span>> parseRequest(
byte[] postBody, Schema.KaldbSchema kaldbSchema) throws IOException {
return convertIndexRequestToTraceFormat(parseBulkRequest(postBody), kaldbSchema);
}

/**
Expand Down Expand Up @@ -81,7 +83,8 @@ public static long getTimestampFromIngestDocument(IngestDocument ingestDocument)
}

@VisibleForTesting
public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
public static Trace.Span fromIngestDocument(
IngestDocument ingestDocument, Schema.KaldbSchema kaldbSchema) {

long timestampInMillis = getTimestampFromIngestDocument(ingestDocument);

Expand Down Expand Up @@ -113,18 +116,20 @@ public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
sourceAndMetadata.remove("@timestamp");

sourceAndMetadata.forEach(
(key, value) -> spanBuilder.addTags(SpanFormatter.convertKVtoProto(key, value)));
(key, value) ->
spanBuilder.addTags(SpanFormatter.convertKVtoProto(key, value, kaldbSchema)));
spanBuilder.addTags(
Trace.KeyValue.newBuilder()
.setKey(SERVICE_NAME_KEY)
.setVType(Trace.ValueType.STRING)
.setVStr(index)
.build());

return spanBuilder.build();
}

protected static Map<String, List<Trace.Span>> convertIndexRequestToTraceFormat(
List<IndexRequest> indexRequests) {
List<IndexRequest> indexRequests, Schema.KaldbSchema kaldbSchema) {
// key - index. value - list of docs to be indexed
Map<String, List<Trace.Span>> indexDocs = new HashMap<>();

Expand All @@ -135,7 +140,7 @@ protected static Map<String, List<Trace.Span>> convertIndexRequestToTraceFormat(
}
IngestDocument ingestDocument = convertRequestToDocument(indexRequest);
List<Trace.Span> docs = indexDocs.computeIfAbsent(index, key -> new ArrayList<>());
docs.add(BulkApiRequestParser.fromIngestDocument(ingestDocument));
docs.add(BulkApiRequestParser.fromIngestDocument(ingestDocument, kaldbSchema));
}
return indexDocs;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.slack.kaldb.metadata.schema;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.slack.kaldb.proto.schema.Schema;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.text.StringSubstitutor;
import org.apache.commons.text.lookup.StringLookup;

/**
 * Static helpers that load a {@link Schema.KaldbSchema} from a schema file or from its YAML/JSON
 * text.
 */
public class KaldbSchemaUtil {

  // ObjectMapper is expensive to construct and safe to share once configured, so the two mappers
  // are built once instead of on every parse call.
  private static final ObjectMapper YAML_READER = new ObjectMapper(new YAMLFactory());
  private static final ObjectMapper JSON_WRITER = new ObjectMapper();

  /** Utility class; not meant to be instantiated. */
  private KaldbSchemaUtil() {}

  /**
   * Reads the schema file at {@code schemaPath} and parses it according to its extension.
   *
   * <p>{@code .yaml} files are parsed with variable references resolved through {@link
   * System#getenv(String)}; {@code .json} files are parsed as-is.
   *
   * @param schemaPath location of the schema file
   * @return the parsed schema
   * @throws IOException if the file cannot be read or its content cannot be parsed
   * @throws IllegalArgumentException if the file name ends with neither {@code .yaml} nor {@code
   *     .json}
   */
  public static Schema.KaldbSchema parseSchema(Path schemaPath) throws IOException {
    // getFileName() is null for paths with no name element (e.g. a filesystem root); treat that
    // like any other unsupported name rather than failing with a NullPointerException.
    Path fileName = schemaPath.getFileName();
    String filename = fileName == null ? "" : fileName.toString();

    if (filename.endsWith(".yaml")) {
      return parseSchemaYaml(Files.readString(schemaPath), System::getenv);
    }
    if (filename.endsWith(".json")) {
      return parseJsonSchema(Files.readString(schemaPath));
    }
    throw new IllegalArgumentException(
        "Invalid schema file format provided - must be either .json or .yaml: " + schemaPath);
  }

  /**
   * Parses a schema from YAML text.
   *
   * <p>Variable references in {@code yamlStr} are first substituted using {@code
   * variableResolver}; the resulting YAML is then re-serialized as JSON and handed to {@link
   * #parseJsonSchema(String)}.
   *
   * @param yamlStr schema definition in YAML
   * @param variableResolver lookup used to resolve variable references found in {@code yamlStr}
   * @return the parsed schema
   * @throws JsonProcessingException if the YAML cannot be read or re-serialized as JSON
   * @throws InvalidProtocolBufferException if the JSON does not match the schema proto
   */
  @VisibleForTesting
  public static Schema.KaldbSchema parseSchemaYaml(String yamlStr, StringLookup variableResolver)
      throws JsonProcessingException, InvalidProtocolBufferException {
    StringSubstitutor substitute = new StringSubstitutor(variableResolver);
    Object obj = YAML_READER.readValue(substitute.replace(yamlStr), Object.class);
    return parseJsonSchema(JSON_WRITER.writeValueAsString(obj));
  }

  /**
   * Parses a schema from its proto3 JSON representation.
   *
   * @param jsonStr schema definition in JSON
   * @return the parsed schema
   * @throws InvalidProtocolBufferException if the JSON does not match the schema proto
   */
  @VisibleForTesting
  public static Schema.KaldbSchema parseJsonSchema(String jsonStr)
      throws InvalidProtocolBufferException {
    Schema.KaldbSchema.Builder kaldbSchemaBuilder = Schema.KaldbSchema.newBuilder();
    JsonFormat.parser().merge(jsonStr, kaldbSchemaBuilder);
    Schema.KaldbSchema kaldbSchema = kaldbSchemaBuilder.build();
    // TODO: validate schema
    return kaldbSchema;
  }
}
8 changes: 7 additions & 1 deletion kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
import com.slack.kaldb.metadata.recovery.RecoveryNodeMetadataStore;
import com.slack.kaldb.metadata.recovery.RecoveryTaskMetadataStore;
import com.slack.kaldb.metadata.replica.ReplicaMetadataStore;
import com.slack.kaldb.metadata.schema.KaldbSchemaUtil;
import com.slack.kaldb.metadata.search.SearchMetadataStore;
import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore;
import com.slack.kaldb.preprocessor.PreprocessorService;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.proto.metadata.Metadata;
import com.slack.kaldb.proto.schema.Schema;
import com.slack.kaldb.recovery.RecoveryService;
import com.slack.kaldb.util.RuntimeHalterImpl;
import com.slack.kaldb.zipkinApi.ZipkinService;
Expand Down Expand Up @@ -398,12 +400,16 @@ private static Set<Service> getServices(
new DatasetRateLimitingService(datasetMetadataStore, preprocessorConfig, meterRegistry);
services.add(datasetRateLimitingService);

Schema.KaldbSchema kaldbSchema =
KaldbSchemaUtil.parseSchema(Path.of(preprocessorConfig.getSchemaFile()));

BulkIngestApi openSearchBulkApiService =
new BulkIngestApi(
bulkIngestKafkaProducer,
datasetRateLimitingService,
meterRegistry,
preprocessorConfig.getRateLimitExceededErrorCode());
preprocessorConfig.getRateLimitExceededErrorCode(),
kaldbSchema);
armeriaServiceBuilder.withAnnotatedService(openSearchBulkApiService);
} else {
PreprocessorService preprocessorService =
Expand Down
108 changes: 105 additions & 3 deletions kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.protobuf.InvalidProtocolBufferException;
import com.slack.kaldb.logstore.LogMessage;
import com.slack.kaldb.logstore.LogWireMessage;
import com.slack.kaldb.proto.schema.Schema;
import com.slack.service.murron.Murron;
import com.slack.service.murron.trace.Trace;
import java.time.Duration;
Expand Down Expand Up @@ -94,26 +95,127 @@ public static Trace.Span toSpan(
return spanBuilder.build();
}

public static Trace.KeyValue convertKVtoProto(
String key, Object value, Schema.KaldbSchema schema) {
if (schema.containsFields(key)) {
Trace.KeyValue.Builder tagBuilder = Trace.KeyValue.newBuilder();
tagBuilder.setKey(key);
try {
switch (schema.getFieldsMap().get(key).getType()) {
case KEYWORD -> {
tagBuilder.setVType(Trace.ValueType.STRING);
tagBuilder.setFieldType(Schema.SchemaFieldType.KEYWORD);
tagBuilder.setVStr(value.toString());
}
case TEXT -> {
tagBuilder.setVType(Trace.ValueType.STRING);
tagBuilder.setFieldType(Schema.SchemaFieldType.TEXT);
tagBuilder.setVStr(value.toString());
}
case IP -> {
tagBuilder.setVType(Trace.ValueType.STRING);
tagBuilder.setFieldType(Schema.SchemaFieldType.IP);
tagBuilder.setVStr(value.toString());
}
case DATE -> {
tagBuilder.setVType(Trace.ValueType.STRING);
tagBuilder.setFieldType(Schema.SchemaFieldType.DATE);
tagBuilder.setVStr(value.toString());
}
case BOOLEAN -> {
tagBuilder.setVType(Trace.ValueType.BOOL);
tagBuilder.setFieldType(Schema.SchemaFieldType.BOOLEAN);
tagBuilder.setVBool(Boolean.parseBoolean(value.toString()));
}
case DOUBLE -> {
tagBuilder.setVType(Trace.ValueType.FLOAT64);
tagBuilder.setFieldType(Schema.SchemaFieldType.DOUBLE);
tagBuilder.setVFloat64(Double.parseDouble(value.toString()));
}
case FLOAT -> {
tagBuilder.setVType(Trace.ValueType.FLOAT32);
tagBuilder.setFieldType(Schema.SchemaFieldType.FLOAT);
tagBuilder.setVFloat32(Float.parseFloat(value.toString()));
}
case HALF_FLOAT -> {
tagBuilder.setVType(Trace.ValueType.FLOAT32);
tagBuilder.setFieldType(Schema.SchemaFieldType.HALF_FLOAT);
tagBuilder.setVFloat32(Float.parseFloat(value.toString()));
}
case INTEGER -> {
tagBuilder.setVType(Trace.ValueType.INT32);
tagBuilder.setFieldType(Schema.SchemaFieldType.INTEGER);
tagBuilder.setVInt32(Integer.parseInt(value.toString()));
}
case LONG -> {
tagBuilder.setVType(Trace.ValueType.INT64);
tagBuilder.setFieldType(Schema.SchemaFieldType.LONG);
tagBuilder.setVInt64(Long.parseLong(value.toString()));
}
case SCALED_LONG -> {
tagBuilder.setVType(Trace.ValueType.INT64);
tagBuilder.setFieldType(Schema.SchemaFieldType.SCALED_LONG);
tagBuilder.setVInt64(Long.parseLong(value.toString()));
}
case SHORT -> {
tagBuilder.setVType(Trace.ValueType.INT32);
tagBuilder.setFieldType(Schema.SchemaFieldType.SHORT);
tagBuilder.setVInt32(Integer.parseInt(value.toString()));
}
case BYTE -> {
tagBuilder.setVType(Trace.ValueType.INT32);
tagBuilder.setFieldType(Schema.SchemaFieldType.BYTE);
tagBuilder.setVInt32(Integer.parseInt(value.toString()));
}
case BINARY -> {
tagBuilder.setVType(Trace.ValueType.BINARY);
tagBuilder.setFieldType(Schema.SchemaFieldType.BINARY);
tagBuilder.setVBinary(ByteString.copyFrom(value.toString().getBytes()));
}
}
return tagBuilder.build();
} catch (Exception e) {
tagBuilder.setKey(STR."failed_\{key}");
tagBuilder.setVType(Trace.ValueType.STRING);
tagBuilder.setFieldType(Schema.SchemaFieldType.KEYWORD);
tagBuilder.setVStr(value.toString());
return tagBuilder.build();
}
} else {
return SpanFormatter.convertKVtoProto(key, value);
}
}

public static Trace.KeyValue convertKVtoProto(String key, Object value) {
Trace.KeyValue.Builder tagBuilder = Trace.KeyValue.newBuilder();
tagBuilder.setKey(key);
if (value instanceof String) {
tagBuilder.setVType(Trace.ValueType.STRING);
tagBuilder.setFieldType(Schema.SchemaFieldType.KEYWORD);
tagBuilder.setVStr(value.toString());
} else if (value instanceof Boolean) {
tagBuilder.setVType(Trace.ValueType.BOOL);
tagBuilder.setFieldType(Schema.SchemaFieldType.BOOLEAN);
tagBuilder.setVBool((boolean) value);
} else if (value instanceof Integer) {
tagBuilder.setVType(Trace.ValueType.INT32);
tagBuilder.setFieldType(Schema.SchemaFieldType.INTEGER);
tagBuilder.setVInt32((int) value);
} else if (value instanceof Long) {
tagBuilder.setVType(Trace.ValueType.INT64);
tagBuilder.setVInt64((int) value);
tagBuilder.setFieldType(Schema.SchemaFieldType.LONG);
tagBuilder.setVInt64((long) value);
} else if (value instanceof Float) {
tagBuilder.setVType(Trace.ValueType.FLOAT64);
tagBuilder.setVFloat64((float) value);
tagBuilder.setVType(Trace.ValueType.FLOAT32);
tagBuilder.setFieldType(Schema.SchemaFieldType.FLOAT);
tagBuilder.setVFloat32((float) value);
} else if (value instanceof Double) {
tagBuilder.setVType(Trace.ValueType.FLOAT64);
tagBuilder.setFieldType(Schema.SchemaFieldType.DOUBLE);
tagBuilder.setVFloat64((double) value);
} else if (value != null) {
tagBuilder.setVType(Trace.ValueType.BINARY);
tagBuilder.setFieldType(Schema.SchemaFieldType.BINARY);
tagBuilder.setVBinary(ByteString.copyFrom(value.toString().getBytes()));
}
return tagBuilder.build();
Expand Down
2 changes: 2 additions & 0 deletions kaldb/src/main/proto/kaldb_configs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -274,4 +274,6 @@ message PreprocessorConfig {
// Set this to 429 for clients to retry the request after a delay
// Only used when we use the bulk API
int32 rate_limit_exceeded_error_code = 11;

string schema_file = 12;
}
35 changes: 35 additions & 0 deletions kaldb/src/main/proto/schema.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
syntax = "proto3";

package slack.proto.kaldb.schema;

option java_package = "com.slack.kaldb.proto.schema";

// Top-level schema: the set of fields that have an explicitly declared type.
message KaldbSchema {
// Field definitions keyed by field name.
map<string, SchemaField> fields = 1;
}

// Definition of a single field in a KaldbSchema.
message SchemaField {
// Declared data type of the field.
// NOTE(review): numbering starts at 2 and no field 1 is declared in this message -
// confirm whether 1 is intentionally left unused.
SchemaFieldType type = 2;
// other field definitions in the future
}

// Data types a schema field can be declared as. The names follow the OpenSearch
// supported field types:
// https://opensearch.org/docs/2.4/opensearch/supported-field-types/index/
// Add the remaining types as needed
//
// This file uses proto3, so an enum field that is not set reads back as the
// zero value; a SchemaField with no explicit type is therefore KEYWORD.
enum SchemaFieldType {
KEYWORD = 0;
TEXT = 1;
IP = 2;
// expose format in the future
DATE = 3;
BOOLEAN = 4;
DOUBLE = 5;
FLOAT = 6;
HALF_FLOAT = 7;
INTEGER = 8;
LONG = 9;
SCALED_LONG = 10;
SHORT = 11;
BYTE = 12;
BINARY = 13;
};
2 changes: 2 additions & 0 deletions kaldb/src/main/proto/trace.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ option java_package = "com.slack.service.murron.trace";
option go_package = "com.slack/kaldb/gen/proto/tracepb";

import "google/protobuf/empty.proto";
import "schema.proto";

// The KeyValue message defines a key and value pair.
// The key is always a string. The value for the field is determined by the ValueType.
Expand Down Expand Up @@ -35,6 +36,7 @@ message KeyValue {
bytes v_binary = 7;
int32 v_int32 = 8;
float v_float32 = 9;
slack.proto.kaldb.schema.SchemaFieldType fieldType = 10;
}

// A span defines a single event in a trace.
Expand Down
Loading

0 comments on commit 2260cbf

Please sign in to comment.