Skip to content

Commit

Permalink
Use the schema info while creating lucene documents in the indexer (s…
Browse files Browse the repository at this point in the history
…lackhq#816)

* utilize schema info at the indexer

* PR feedback
  • Loading branch information
vthacker authored Apr 1, 2024
1 parent abc5016 commit 1fe7147
Show file tree
Hide file tree
Showing 26 changed files with 912 additions and 879 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.slack.kaldb.logstore.LogMessage;
import com.slack.kaldb.proto.schema.Schema;
import com.slack.kaldb.writer.SpanFormatter;
import com.slack.service.murron.trace.Trace;
Expand Down Expand Up @@ -103,6 +104,27 @@ public static Trace.Span fromIngestDocument(
spanBuilder.setTimestamp(
TimeUnit.MICROSECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS));

if (sourceAndMetadata.get(LogMessage.ReservedField.PARENT_ID.fieldName) != null) {
spanBuilder.setParentId(
ByteString.copyFromUtf8(
(String) sourceAndMetadata.get(LogMessage.ReservedField.PARENT_ID.fieldName)));
sourceAndMetadata.remove(LogMessage.ReservedField.PARENT_ID.fieldName);
}
if (sourceAndMetadata.get(LogMessage.ReservedField.TRACE_ID.fieldName) != null) {
spanBuilder.setTraceId(
ByteString.copyFromUtf8(
(String) sourceAndMetadata.get(LogMessage.ReservedField.TRACE_ID.fieldName)));
sourceAndMetadata.remove(LogMessage.ReservedField.TRACE_ID.fieldName);
}
if (sourceAndMetadata.get(LogMessage.ReservedField.NAME.fieldName) != null) {
spanBuilder.setName((String) sourceAndMetadata.get(LogMessage.ReservedField.NAME.fieldName));
sourceAndMetadata.remove(LogMessage.ReservedField.NAME.fieldName);
}
if (sourceAndMetadata.get("duration") != null) {
spanBuilder.setDuration(Long.parseLong(sourceAndMetadata.get("duration").toString()));
sourceAndMetadata.remove("duration");
}

// Remove the following internal metadata fields that OpenSearch adds
sourceAndMetadata.remove(IngestDocument.Metadata.ROUTING.getFieldName());
sourceAndMetadata.remove(IngestDocument.Metadata.VERSION.getFieldName());
Expand All @@ -123,7 +145,6 @@ public static Trace.Span fromIngestDocument(
spanBuilder.addTags(
Trace.KeyValue.newBuilder()
.setKey(SERVICE_NAME_KEY)
.setVType(Trace.ValueType.STRING)
.setFieldType(Schema.SchemaFieldType.KEYWORD)
.setVStr(index)
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public enum ReservedField {
NAME("name"),
SERVICE_NAME("service_name"),
DURATION_MS("duration_ms"),
DURATION("duration"),
TRACE_ID("trace_id"),
PARENT_ID("parent_id"),
KALDB_INVALID_TIMESTAMP("kaldb_invalid_timestamp");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,20 +221,36 @@ public void reloadSchema() {
try {
if (entry.getValue().fieldType == FieldType.TEXT) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "text"));
} else if (entry.getValue().fieldType == FieldType.STRING) {
} else if (entry.getValue().fieldType == FieldType.STRING
|| entry.getValue().fieldType == FieldType.KEYWORD
|| entry.getValue().fieldType == FieldType.ID) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "keyword"));
} else if (entry.getValue().fieldType == FieldType.ID) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "keyword"));
} else if (entry.getValue().fieldType == FieldType.INTEGER) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "integer"));
} else if (entry.getValue().fieldType == FieldType.LONG) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "long"));
} else if (entry.getValue().fieldType == FieldType.IP) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "ip"));
} else if (entry.getValue().fieldType == FieldType.DATE) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "date"));
} else if (entry.getValue().fieldType == FieldType.BOOLEAN) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "boolean"));
} else if (entry.getValue().fieldType == FieldType.DOUBLE) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "double"));
} else if (entry.getValue().fieldType == FieldType.FLOAT) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "float"));
} else if (entry.getValue().fieldType == FieldType.BOOLEAN) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "boolean"));
} else if (entry.getValue().fieldType == FieldType.HALF_FLOAT) {
tryRegisterField(
mapperService, entry.getValue().name, b -> b.field("type", "half_float"));
} else if (entry.getValue().fieldType == FieldType.INTEGER) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "integer"));
} else if (entry.getValue().fieldType == FieldType.LONG) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "long"));
} else if (entry.getValue().fieldType == FieldType.SCALED_LONG) {
tryRegisterField(
mapperService, entry.getValue().name, b -> b.field("type", "scaled_long"));
} else if (entry.getValue().fieldType == FieldType.SHORT) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "short"));
} else if (entry.getValue().fieldType == FieldType.BYTE) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "byte"));
} else if (entry.getValue().fieldType == FieldType.BINARY) {
tryRegisterField(mapperService, entry.getValue().name, b -> b.field("type", "binary"));
} else {
LOG.warn(
"Field type '{}' is not yet currently supported for field '{}'",
Expand Down
Loading

0 comments on commit 1fe7147

Please sign in to comment.