Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Type mappings transformation #1154

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
21fb34b
Initial checkin for type mapping removal transformer and experimentin…
gregschohn Nov 13, 2024
926e6e8
Further building out the jinjava and index type mappings transformers…
gregschohn Nov 18, 2024
9bd5c60
Checkpoint on jinjava and loading template transformations.
gregschohn Nov 19, 2024
f1e1d50
Checkpoint add support for macros and feature flags for different par…
gregschohn Nov 20, 2024
dd98377
Cleanup + added a "preserve" flag to let templates inline source node…
gregschohn Nov 21, 2024
357a6a6
Add jinja-style regex matching to jinjava transformations.
gregschohn Nov 22, 2024
12c0cfa
Checkpoint - I've added a new invoke_macro java function to lookup a …
gregschohn Nov 23, 2024
e8dfb3f
Get a test that changes some contents via the route macro to pass.
gregschohn Nov 24, 2024
93dc5c8
Assorted jinjava template changes for replayer...
gregschohn Nov 24, 2024
4b32d42
Setup regex type mappings where one can specify index and types as re…
gregschohn Nov 25, 2024
0712eae
Further work to support bulk delete, index create with type mapping m…
gregschohn Dec 2, 2024
e6c0229
Updated READMEs and finished implementing a basic version of bulk tra…
gregschohn Dec 3, 2024
071368a
Transformation cleanup in light of new type mappings transformation.
gregschohn Dec 3, 2024
5c885fe
Merge branch 'main' into TypeMappingsTransformation
gregschohn Dec 3, 2024
59fd4d9
Simple bugfixes, especially around more carefully staying away from r…
gregschohn Dec 4, 2024
fb9c20c
Remove YAML support for feature flags - nobody is using it and no rea…
gregschohn Dec 4, 2024
f5c4fed
Change how the transformation netty pipeline discovers if the payload…
gregschohn Dec 4, 2024
5be3ddc
Minor cleanup + the addition of a resource cache for jinjava java res…
gregschohn Dec 4, 2024
73c8e13
Lots of improvements for jinjava and type mappings transformations an…
gregschohn Dec 7, 2024
739d5d7
Implement a simple take on translating RFS bulk requests to use index…
gregschohn Dec 8, 2024
95b71f4
Checkpoint with further refactoring improvements, mostly around tests.
gregschohn Dec 9, 2024
2b1f3e0
Add a new preserve tag (preserveWhenMissing) to copy only when the ke…
gregschohn Dec 9, 2024
4e59965
Enable type mappings transform for the replayer in the docker-compose…
gregschohn Dec 9, 2024
09377de
Flip the default should throw behavior for HttpJsonMessageWithFaultin…
gregschohn Dec 4, 2024
fcf6bbc
Update type mapping sanitization READMEs to use backslash backreferen…
gregschohn Dec 9, 2024
39ead16
Merge branch 'main' into TypeMappingsTransformation
gregschohn Dec 9, 2024
1df1642
Merge branch 'SaferPayloadFaults' into TypeMappingsTransformation
gregschohn Dec 9, 2024
fbc0291
Update remaining preserve calls so that protocol will be carried forw…
gregschohn Dec 9, 2024
c13ef4a
Bugfixes for type mappings removal transformations, README + other si…
gregschohn Dec 10, 2024
00968cd
Merge branch 'main' into TypeMappingsTransformation
gregschohn Dec 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies {
implementation project(":RFS")
implementation project(":transformation")
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

implementation group: 'org.apache.logging.log4j', name: 'log4j-api'
implementation group: 'org.apache.logging.log4j', name: 'log4j-core'
Expand Down
4 changes: 3 additions & 1 deletion RFS/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ dependencies {

implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerInterface')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJinjavaTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

implementation group: 'org.jcommander', name: 'jcommander'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
Expand Down Expand Up @@ -62,7 +64,7 @@ dependencies {
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter'

testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
testRuntimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
testRuntimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ services:
condition: service_started
opensearchtarget:
condition: service_started
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317" #--transformer-config-base64 W3sgIkpzb25Kb2x0VHJhbnNmb3JtZXJQcm92aWRlciI6ClsKICB7CiAgICAic2NyaXB0IjogewogICAgICAib3BlcmF0aW9uIjogInNoaWZ0IiwKICAgICAgInNwZWMiOiB7CiAgICAgICAgIm1ldGhvZCI6ICJtZXRob2QiLAogICAgICAgICJVUkkiOiAiVVJJIiwKICAgICAgICAiaGVhZGVycyI6ICJoZWFkZXJzIiwKICAgICAgICAicGF5bG9hZCI6IHsKICAgICAgICAgICJpbmxpbmVkSnNvbkJvZHkiOiB7CiAgICAgICAgICAgICJ0b3AiOiB7CiAgICAgICAgICAgICAgInRhZ1RvRXhjaXNlIjogewogICAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiIAogICAgICAgICAgICAgIH0sCiAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAiKiI6ICJwYXlsb2FkLmlubGluZWRKc29uQm9keS4mIgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfQogICAgfQogIH0sIAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPXNwbGl0KCcvZXh0cmFUaGluZ1RvUmVtb3ZlJyxAKDEsJikpIgogICAgIH0KICB9CiB9LAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPWpvaW4oJycsQCgxLCYpKSIKICAgICB9CiAgfQogfQpdCn1dCg=="

# command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 "
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 --transformer-config '[{\"TypeMappingSanitizationTransformerProvider\":{\"sourceProperties\":{\"version\":{\"major\":7,\"minor\":10}}}}]'"
opensearchtarget:
image: 'opensearchproject/opensearch:2.15.0'
environment:
Expand Down
4 changes: 2 additions & 2 deletions TrafficCapture/trafficReplayer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ transform to add GZIP encoding and another to apply a new header would be config
```

To run only one transformer without any configuration, the `--transformer-config` argument can simply
be set to the name of the transformer (e.g. 'JsonTransformerForOpenSearch23PlusTargetTransformerProvider',
be set to the name of the transformer (e.g. 'TypeMappingSanitizationTransformerProvider',
without quotes or any json surrounding it).

The user can also specify a file to read the transformations from using the `--transformer-config-file`. Users can
also pass the script as an argument via `--transformer-config-base64`. Each of the `transformer-config` options
is mutually exclusive.

Some simple transformations are included to change headers to add compression or to force an HTTP message payload to
be chunked. Another transformer, [JsonTypeMappingTransformer.java](../../transformation/transformationPlugins/jsonMessageTransformers/openSearch23PlusTargetTransformerProvider/src/main/java/org/opensearch/migrations/transform/JsonTypeMappingTransformer.java),
be chunked. Another transformer, [TypeMappingSanitizationTransformer.java](../../transformation/transformationPlugins/jsonMessageTransformers/jsonTypeMappingsSanitizationTransformer/src/main/java/org/opensearch/migrations/transform/TypeMappingsSanitizationTransformer.java),
is a work-in-progress to excise type mapping references from URIs and message payloads since versions of OpenSource
greater than 2.3 do not support them.

Expand Down
5 changes: 3 additions & 2 deletions TrafficCapture/trafficReplayer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ dependencies {
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerInterface')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJinjavaTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

implementation group: 'org.jcommander', name: 'jcommander'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
Expand Down Expand Up @@ -59,7 +60,7 @@ dependencies {
testImplementation testFixtures(project(path: ':coreUtilities'))
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:openSearch23PlusTargetTransformerProvider')
testImplementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

testImplementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5'
testImplementation group: 'org.junit.jupiter', name:'junit-jupiter-api'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
} else {
content.release();
}
} else if (msg instanceof HttpMessage) {
}
if (msg instanceof HttpMessage) { // this & HttpContent are interfaces & 'Full' messages implement both
message = (HttpMessage) msg;
}
if (msg instanceof LastHttpContent) {
Expand All @@ -206,16 +207,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
var finalMsg = (message instanceof HttpRequest)
? new DefaultFullHttpRequest(message.protocolVersion(),
((HttpRequest) message).method(),
((HttpRequest) message).uri(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders())
((HttpRequest) message).method(),
((HttpRequest) message).uri(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders())
: new DefaultFullHttpResponse(message.protocolVersion(),
((HttpResponse)message).status(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders());
((HttpResponse)message).status(),
aggregatedContents,
message.headers(),
((LastHttpContent) msg).trailingHeaders());
super.channelRead(ctx, finalMsg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public class ParsedHttpMessagesAsDicts {
public static final String STATUS_CODE_KEY = "Status-Code";
public static final String RESPONSE_TIME_MS_KEY = "response_time_ms";
public static final String EXCEPTION_KEY_STRING = "Exception";
public static final String REQUEST_URI_KEY = "Request-URI";
public static final String METHOD_KEY = "Method";
public static final String HTTP_VERSION_KEY = "HTTP-Version";
public static final String PAYLOAD_KEY = "payload";

public final Optional<Map<String, Object>> sourceRequestOp;
public final Optional<Map<String, Object>> sourceResponseOp;
Expand Down Expand Up @@ -183,15 +187,15 @@ private static Map<String, Object> convertRequest(
var message = (HttpJsonRequestWithFaultingPayload) messageHolder.get();
if (message != null) {
var map = new LinkedHashMap<>(message.headers());
map.put("Request-URI", message.path());
map.put("Method", message.method());
map.put("HTTP-Version", message.protocol());
map.put(REQUEST_URI_KEY, message.path());
map.put(METHOD_KEY, message.method());
map.put(HTTP_VERSION_KEY, message.protocol());
context.setMethod(message.method());
context.setEndpoint(message.path());
context.setHttpVersion(message.protocol());
encodeBinaryPayloadIfExists(message);
if (!message.payload().isEmpty()) {
map.put("payload", message.payload());
map.put(PAYLOAD_KEY, message.payload());
}
return map;
} else {
Expand Down Expand Up @@ -223,14 +227,14 @@ private static Map<String, Object> convertResponse(
var message = (HttpJsonResponseWithFaultingPayload) messageHolder.get();
if (message != null) {
var map = new LinkedHashMap<>(message.headers());
map.put("HTTP-Version", message.protocol());
map.put(HTTP_VERSION_KEY, message.protocol());
map.put(STATUS_CODE_KEY, Integer.parseInt(message.code()));
map.put("Reason-Phrase", message.reason());
map.put(RESPONSE_TIME_MS_KEY, latency.toMillis());
context.setHttpVersion(message.protocol());
encodeBinaryPayloadIfExists(message);
if (!message.payload().isEmpty()) {
map.put("payload", message.payload());
map.put(PAYLOAD_KEY, message.payload());
}
return map;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ public static String getTransactionSummaryStringPreamble() {
.add("SOURCE_STATUS_CODE/TARGET_STATUS_CODE...")
.add("SOURCE_RESPONSE_SIZE_BYTES/TARGET_RESPONSE_SIZE_BYTES...")
.add("SOURCE_LATENCY_MS/TARGET_LATENCY_MS...")
.add("METHOD...")
.add("URI...")
.toString();
}

Expand Down Expand Up @@ -218,6 +220,16 @@ public static String toTransactionSummaryString(
transformStreamToString(parsed.targetResponseList.stream(),
r -> "" + r.get(ParsedHttpMessagesAsDicts.RESPONSE_TIME_MS_KEY))
)
// method
.add(
parsed.sourceRequestOp
.map(r -> (String) r.get(ParsedHttpMessagesAsDicts.METHOD_KEY))
.orElse(MISSING_STR))
// uri
.add(
parsed.sourceRequestOp
.map(r -> (String) r.get(ParsedHttpMessagesAsDicts.REQUEST_URI_KEY))
.orElse(MISSING_STR))
.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ public Object getNextTopLevelObject() throws IOException {
pushCompletedValue(parser.getText());
break;
case VALUE_NUMBER_INT:
pushCompletedValue(parser.getIntValue());
pushCompletedValue(parser.getLongValue());
break;
case VALUE_NUMBER_FLOAT:
pushCompletedValue(parser.getFloatValue());
pushCompletedValue(parser.getDoubleValue());
break;
case NOT_AVAILABLE:
// pipeline stall - need more data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ public class PayloadAccessFaultingMap extends AbstractMap<String, Object> {
@Getter
@Setter
private boolean disableThrowingPayloadNotLoaded;
private boolean payloadWasAccessed;

public PayloadAccessFaultingMap(StrictCaseInsensitiveHttpHeadersMap headers) {
disableThrowingPayloadNotLoaded = true;
underlyingMap = new TreeMap<>();
isJson = Optional.ofNullable(headers.get("content-type"))
.map(list -> list.stream().anyMatch(s -> s.startsWith("application/json")))
Expand All @@ -51,19 +53,19 @@ public Iterator<Map.Entry<String, Object>> iterator() {
return new Iterator<>() {
@Override
public boolean hasNext() {
throw PayloadNotLoadedException.getInstance();
throw makeFault();
}

@Override
public Map.Entry<String, Object> next() {
throw PayloadNotLoadedException.getInstance();
throw makeFault();
}
};
}

@Override
public int size() {
throw PayloadNotLoadedException.getInstance();
throw makeFault();
}
};
} else {
Expand All @@ -80,8 +82,21 @@ public Object put(String key, Object value) {
public Object get(Object key) {
var value = super.get(key);
if (value == null && !disableThrowingPayloadNotLoaded) {
throw PayloadNotLoadedException.getInstance();
throw makeFault();
}
return value;
}

public boolean missingPayloadWasAccessed() {
return payloadWasAccessed;
}

public void resetMissingPayloadWasAccessed() {
payloadWasAccessed = false;
}

private PayloadNotLoadedException makeFault() throws PayloadNotLoadedException {
payloadWasAccessed = true;
return PayloadNotLoadedException.getInstance();
}
}
Loading
Loading