Skip to content

Commit

Permalink
support multi-fields (#839)
Browse files Browse the repository at this point in the history
  • Loading branch information
vthacker authored Apr 10, 2024
1 parent f83c7d9 commit 74da443
Show file tree
Hide file tree
Showing 12 changed files with 376 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public static Trace.Span fromIngestDocument(
if (!tagsContainServiceName && kv.getKey().equals(SERVICE_NAME_KEY)) {
tagsContainServiceName = true;
}
spanBuilder.addTags(SpanFormatter.convertKVtoProto(kv.getKey(), kv.getValue(), schema));
spanBuilder.addAllTags(SpanFormatter.convertKVtoProto(kv.getKey(), kv.getValue(), schema));
}
if (!tagsContainServiceName) {
spanBuilder.addTags(
Expand Down
158 changes: 90 additions & 68 deletions astra/src/main/java/com/slack/astra/writer/SpanFormatter.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,79 +87,101 @@ public static Trace.Span toSpan(
return spanBuilder.build();
}

public static Trace.KeyValue convertKVtoProto(
/**
 * Builds one {@link Trace.KeyValue} tag named {@code key} whose payload is {@code value}
 * converted to the representation that {@code type} calls for.
 *
 * <p>Conversion always goes through the value's string form: string-like types store it as-is,
 * numeric and boolean types parse it, {@code DATE} parses it with {@link Instant#parse} and stores
 * epoch milliseconds, and {@code BINARY} stores the bytes of the string.
 *
 * <p>If the conversion throws (unparseable text, a {@code null} value, a {@code null} type), the
 * tag is instead emitted under the key {@code failed_<key>} as a {@code KEYWORD} holding the
 * value's string form, so the caller still receives a tag rather than an exception.
 *
 * <p>A {@code type} that has no case below leaves the tag with only its key set.
 *
 * @param key tag name
 * @param value raw field value; a {@code null} value yields a {@code failed_<key>} tag whose
 *     string value is {@code "null"}
 * @param type schema type that decides which value slot of the tag is populated
 * @return the built tag
 */
public static Trace.KeyValue makeTraceKV(String key, Object value, Schema.SchemaFieldType type) {
  Trace.KeyValue.Builder tagBuilder = Trace.KeyValue.newBuilder();
  tagBuilder.setKey(key);
  try {
    switch (type) {
      case KEYWORD, TEXT, IP -> {
        tagBuilder.setFieldType(type);
        tagBuilder.setVStr(value.toString());
      }
      case DATE -> {
        tagBuilder.setFieldType(type);
        tagBuilder.setVInt64(Instant.parse(value.toString()).toEpochMilli());
      }
      case BOOLEAN -> {
        tagBuilder.setFieldType(type);
        tagBuilder.setVBool(Boolean.parseBoolean(value.toString()));
      }
      case DOUBLE -> {
        tagBuilder.setFieldType(type);
        tagBuilder.setVFloat64(Double.parseDouble(value.toString()));
      }
      case FLOAT, HALF_FLOAT -> {
        tagBuilder.setFieldType(type);
        tagBuilder.setVFloat32(Float.parseFloat(value.toString()));
      }
      case INTEGER, SHORT, BYTE -> {
        tagBuilder.setFieldType(type);
        tagBuilder.setVInt32(Integer.parseInt(value.toString()));
      }
      case LONG, SCALED_LONG -> {
        tagBuilder.setFieldType(type);
        tagBuilder.setVInt64(Long.parseLong(value.toString()));
      }
      case BINARY -> {
        tagBuilder.setFieldType(type);
        tagBuilder.setVBinary(ByteString.copyFrom(value.toString().getBytes()));
      }
    }
    return tagBuilder.build();
  } catch (Exception e) {
    // Fall back to a keyword tag under a "failed_" key. String.valueOf keeps this handler from
    // throwing a second NullPointerException when the failure was caused by a null value.
    tagBuilder.setKey(STR."failed_\{key}");
    tagBuilder.setFieldType(Schema.SchemaFieldType.KEYWORD);
    tagBuilder.setVStr(String.valueOf(value));
    return tagBuilder.build();
  }
}

/**
 * Converts one document field into the tag(s) to attach to a span, using {@code schema} when it
 * defines {@code key}.
 *
 * <p>When the schema contains {@code key}, the result holds the primary tag built by
 * {@code makeTraceKV} with the field's declared type, followed by one additional tag per entry
 * in the field's sub-field map, keyed {@code <key>.<subFieldName>} and built from the same value
 * with the sub-field's type. A {@code KEYWORD} sub-field with a positive {@code ignore_above} is
 * skipped when the value's string form is longer than that limit.
 *
 * <p>When the schema does not contain {@code key}, the result is a single-element list holding
 * the tag produced by the two-argument {@code convertKVtoProto(key, value)}.
 *
 * @param key field name
 * @param value raw field value
 * @param schema ingest schema consulted for the field's type and sub-fields
 * @return the tags produced for this field
 */
// NOTE(review): this span is a rendered diff hunk without +/- markers. The inline switch on
// tagBuilder and the makeTraceKV-based lines appear to be the removed and added versions
// interleaved; confirm against the real file before relying on the exact statement order.
public static List<Trace.KeyValue> convertKVtoProto(
String key, Object value, Schema.IngestSchema schema) {
if (schema.containsFields(key)) {
Trace.KeyValue.Builder tagBuilder = Trace.KeyValue.newBuilder();
tagBuilder.setKey(key);
try {
switch (schema.getFieldsMap().get(key).getType()) {
case KEYWORD -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.KEYWORD);
tagBuilder.setVStr(value.toString());
}
case TEXT -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.TEXT);
tagBuilder.setVStr(value.toString());
}
case IP -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.IP);
tagBuilder.setVStr(value.toString());
}
case DATE -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.DATE);
tagBuilder.setVInt64(Instant.parse(value.toString()).toEpochMilli());
}
case BOOLEAN -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.BOOLEAN);
tagBuilder.setVBool(Boolean.parseBoolean(value.toString()));
}
case DOUBLE -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.DOUBLE);
tagBuilder.setVFloat64(Double.parseDouble(value.toString()));
}
case FLOAT -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.FLOAT);
tagBuilder.setVFloat32(Float.parseFloat(value.toString()));
}
case HALF_FLOAT -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.HALF_FLOAT);
tagBuilder.setVFloat32(Float.parseFloat(value.toString()));
}
case INTEGER -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.INTEGER);
tagBuilder.setVInt32(Integer.parseInt(value.toString()));
}
case LONG -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.LONG);
tagBuilder.setVInt64(Long.parseLong(value.toString()));
}
case SCALED_LONG -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.SCALED_LONG);
tagBuilder.setVInt64(Long.parseLong(value.toString()));
}
case SHORT -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.SHORT);
tagBuilder.setVInt32(Integer.parseInt(value.toString()));
}
case BYTE -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.BYTE);
tagBuilder.setVInt32(Integer.parseInt(value.toString()));
}
case BINARY -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.BINARY);
tagBuilder.setVBinary(ByteString.copyFrom(value.toString().getBytes()));
}
// Primary tag first, typed by the field's own schema definition.
List<Trace.KeyValue> tags = new ArrayList<>();
Schema.SchemaField schemaFieldDef = schema.getFieldsMap().get(key);
tags.add(makeTraceKV(key, value, schemaFieldDef.getType()));
// Then one tag per sub-field declared under this field.
for (Map.Entry<String, Schema.SchemaField> additionalField :
schemaFieldDef.getFieldsMap().entrySet()) {
// skip conditions
// Only KEYWORD sub-fields with a positive ignore_above are length-checked.
if (additionalField.getValue().getIgnoreAbove() > 0
&& additionalField.getValue().getType() == Schema.SchemaFieldType.KEYWORD
&& value.toString().length() > additionalField.getValue().getIgnoreAbove()) {
continue;
}
return tagBuilder.build();
} catch (Exception e) {
tagBuilder.setKey(STR."failed_\{key}");
tagBuilder.setFieldType(Schema.SchemaFieldType.KEYWORD);
tagBuilder.setVStr(value.toString());
return tagBuilder.build();
// The sub-field tag reuses the parent's raw value under the key "<key>.<subFieldName>".
Trace.KeyValue additionalKV =
makeTraceKV(
STR."\{key}.\{additionalField.getKey()}",
value,
additionalField.getValue().getType());
tags.add(additionalKV);
}
return tags;
} else {
// Key is not in the schema: delegate to the two-argument overload.
return SpanFormatter.convertKVtoProto(key, value);
return List.of(SpanFormatter.convertKVtoProto(key, value));
}
}

Expand Down
3 changes: 2 additions & 1 deletion astra/src/main/proto/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ message IngestSchema {

// Definition of a single field in the ingest schema.
message SchemaField {
// Type the field's value is converted to when a tag is built for it.
SchemaFieldType type = 2;
// Sub-fields (multi-fields) of this field, keyed by sub-field name. Each entry yields an
// additional tag named "<field>.<sub-field name>" built from the same value.
map<string, SchemaField> fields = 3;
// For a KEYWORD sub-field: when greater than 0, the sub-field tag is skipped if the value's
// string form is longer than this many characters. Values <= 0 disable the check.
int32 ignore_above = 4;
}

// https://opensearch.org/docs/2.4/opensearch/supported-field-types/index/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void testSchemaFields() throws Exception {
objectMapper.convertValue(
jsonNode.get("test").get("mappings").get("properties"), Map.class);
assertThat(map).isNotNull();
assertThat(map.size()).isEqualTo(30);
assertThat(map.size()).isEqualTo(31);
}

// todo - test mapping
Expand Down
57 changes: 57 additions & 0 deletions astra/src/test/java/com/slack/astra/schema/IgnoreAboveTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.slack.astra.schema;

import static com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParser.convertRequestToDocument;
import static com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParser.fromIngestDocument;
import static com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParserTest.getIndexRequestBytes;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.astra.proto.schema.Schema;
import com.slack.service.murron.trace.Trace;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.ingest.IngestDocument;

/**
 * Checks that a {@code KEYWORD} sub-field with {@code ignore_above} set produces no tag when the
 * ingested value is longer than the limit, while the parent field is still emitted.
 */
public class IgnoreAboveTest {

  @Test
  public void testIgnoreAbove() throws IOException {
    // "message" is TEXT with a "keyword" sub-field that ignores values longer than 2 characters.
    Schema.SchemaField keywordSubField =
        Schema.SchemaField.newBuilder()
            .setType(Schema.SchemaFieldType.KEYWORD)
            .setIgnoreAbove(2)
            .build();
    Schema.SchemaField messageField =
        Schema.SchemaField.newBuilder()
            .setType(Schema.SchemaFieldType.TEXT)
            .putFields("keyword", keywordSubField)
            .build();

    Map<String, Schema.SchemaField> schemaFields = new HashMap<>();
    schemaFields.put("message", messageField);
    Schema.IngestSchema schema =
        Schema.IngestSchema.newBuilder().putAllFields(schemaFields).build();

    // The fixture holds two index requests; only the first document is converted.
    List<IndexRequest> requests =
        BulkApiRequestParser.parseBulkRequest(getIndexRequestBytes("index_all_schema_fields"));
    assertThat(requests.size()).isEqualTo(2);
    IngestDocument firstDocument = convertRequestToDocument(requests.get(0));

    Trace.Span span = fromIngestDocument(firstDocument, schema);
    Map<String, Trace.KeyValue> tagsByKey =
        span.getTagsList().stream().collect(Collectors.toMap(Trace.KeyValue::getKey, tag -> tag));

    // The parent field is kept as TEXT.
    Trace.KeyValue messageTag = tagsByKey.get("message");
    assertThat(messageTag.getFieldType()).isEqualTo(Schema.SchemaFieldType.TEXT);
    assertThat(messageTag.getVStr()).isEqualTo("foo bar");

    // "foo bar" is longer than ignore_above = 2, so no keyword sub-field tag is emitted.
    assertThat(tagsByKey.get("message.keyword")).isNull();
  }
}
78 changes: 78 additions & 0 deletions astra/src/test/java/com/slack/astra/schema/MultiFieldsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.slack.astra.schema;

import static com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParser.convertRequestToDocument;
import static com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParser.fromIngestDocument;
import static com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParserTest.getIndexRequestBytes;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.astra.proto.schema.Schema;
import com.slack.service.murron.trace.Trace;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.ingest.IngestDocument;

/**
 * Checks that a schema field declaring sub-fields (multi-fields) yields one tag for the field
 * itself plus one tag per sub-field, each converted with its own type.
 */
public class MultiFieldsTest {

  @Test
  public void testMultiFields() throws IOException {
    Map<String, Schema.SchemaField> fields = new HashMap<>();

    // "amount" is FLOAT with a KEYWORD sub-field and an INTEGER sub-field.
    Schema.SchemaField amountMultiField =
        Schema.SchemaField.newBuilder()
            .setType(Schema.SchemaFieldType.FLOAT)
            .putFields(
                "keyword",
                Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.KEYWORD).build())
            .putFields(
                "integer",
                Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.INTEGER).build())
            .build();
    // "message" is TEXT with a KEYWORD sub-field.
    Schema.SchemaField messageMultiField =
        Schema.SchemaField.newBuilder()
            .setType(Schema.SchemaFieldType.TEXT)
            .putFields(
                "keyword",
                Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.KEYWORD).build())
            .build();

    fields.put("amount", amountMultiField);
    fields.put("message", messageMultiField);

    Schema.IngestSchema schema = Schema.IngestSchema.newBuilder().putAllFields(fields).build();

    // The fixture holds two index requests; only the first document is converted.
    byte[] rawRequest = getIndexRequestBytes("index_all_schema_fields");
    List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
    assertThat(indexRequests.size()).isEqualTo(2);
    IngestDocument ingestDocument = convertRequestToDocument(indexRequests.get(0));

    Trace.Span span = fromIngestDocument(ingestDocument, schema);
    Map<String, Trace.KeyValue> tags =
        span.getTagsList().stream()
            .map(kv -> Map.entry(kv.getKey(), kv))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    assertThat(tags.get("amount").getFieldType()).isEqualTo(Schema.SchemaFieldType.FLOAT);
    assertThat(tags.get("amount").getVFloat32()).isEqualTo(1.1f);

    assertThat(tags.get("amount.keyword").getFieldType()).isEqualTo(Schema.SchemaFieldType.KEYWORD);
    assertThat(tags.get("amount.keyword").getVStr()).isEqualTo("1.1");

    // cannot parse 1.1 as integer, so the sub-field is emitted under a "failed_" key as KEYWORD
    assertThat(tags.get("failed_amount.integer").getFieldType())
        .isEqualTo(Schema.SchemaFieldType.KEYWORD);
    assertThat(tags.get("failed_amount.integer").getVStr()).isEqualTo("1.1");

    assertThat(tags.get("message").getFieldType()).isEqualTo(Schema.SchemaFieldType.TEXT);
    assertThat(tags.get("message").getVStr()).isEqualTo("foo bar");

    assertThat(tags.get("message.keyword").getFieldType())
        .isEqualTo(Schema.SchemaFieldType.KEYWORD);
    assertThat(tags.get("message.keyword").getVStr()).isEqualTo("foo bar");
  }
}
10 changes: 10 additions & 0 deletions astra/src/test/java/com/slack/astra/schema/SchemaConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ public void testParsingSchema() {
assertThat(schema.getFieldsMap().get("message").getType())
.isEqualTo(Schema.SchemaFieldType.TEXT);

assertThat(schema.getFieldsMap().get("message").getFieldsMap().size()).isEqualTo(1);
assertThat(
schema
.getFieldsMap()
.get("message")
.getFieldsMap()
.get("keyword")
.getType())
.isEqualTo(Schema.SchemaFieldType.KEYWORD);

assertThat(schema.getFieldsMap().get("ip").getType())
.isEqualTo(Schema.SchemaFieldType.IP);

Expand Down
Loading

0 comments on commit 74da443

Please sign in to comment.