Type mappings transformation #1154

Merged
Commits (30)
21fb34b
Initial checkin for type mapping removal transformer and experimentin…
gregschohn Nov 13, 2024
926e6e8
Further building out the jinjava and index type mappings transformers…
gregschohn Nov 18, 2024
9bd5c60
Checkpoint on jinjava and loading template transformations.
gregschohn Nov 19, 2024
f1e1d50
Checkpoint add support for macros and feature flags for different par…
gregschohn Nov 20, 2024
dd98377
Cleanup + added a "preserve" flag to let templates inline source node…
gregschohn Nov 21, 2024
357a6a6
Add jinja-style regex matching to jinjava transformations.
gregschohn Nov 22, 2024
12c0cfa
Checkpoint - I've added a new invoke_macro java function to lookup a …
gregschohn Nov 23, 2024
e8dfb3f
Get a test that changes some contents via the route macro to pass.
gregschohn Nov 24, 2024
93dc5c8
Assorted jinjava template changes for replayer...
gregschohn Nov 24, 2024
4b32d42
Setup regex type mappings where one can specify index and types as re…
gregschohn Nov 25, 2024
0712eae
Further work to support bulk delete, index create with type mapping m…
gregschohn Dec 2, 2024
e6c0229
Updated READMEs and finished implementing a basic version of bulk tra…
gregschohn Dec 3, 2024
071368a
Transformation cleanup in light of new type mappings transformation.
gregschohn Dec 3, 2024
5c885fe
Merge branch 'main' into TypeMappingsTransformation
gregschohn Dec 3, 2024
59fd4d9
Simple bugfixes, especially around more carefully staying away from r…
gregschohn Dec 4, 2024
fb9c20c
Remove YAML support for feature flags - nobody is using it and no rea…
gregschohn Dec 4, 2024
f5c4fed
Change how the transformation netty pipeline discovers if the payload…
gregschohn Dec 4, 2024
5be3ddc
Minor cleanup + the addition of a resource cache for jinjava java res…
gregschohn Dec 4, 2024
73c8e13
Lots of improvements for jinjava and type mappings transformations an…
gregschohn Dec 7, 2024
739d5d7
Implement a simple take on translating RFS bulk requests to use index…
gregschohn Dec 8, 2024
95b71f4
Checkpoint with further refactoring improvements, mostly around tests.
gregschohn Dec 9, 2024
2b1f3e0
Add a new preserve tag (preserveWhenMissing) to copy only when the ke…
gregschohn Dec 9, 2024
4e59965
Enable type mappings transform for the replayer in the docker-compose…
gregschohn Dec 9, 2024
09377de
Flip the default should throw behavior for HttpJsonMessageWithFaultin…
gregschohn Dec 4, 2024
fcf6bbc
Update type mapping sanitization READMEs to use backslash backreferen…
gregschohn Dec 9, 2024
39ead16
Merge branch 'main' into TypeMappingsTransformation
gregschohn Dec 9, 2024
1df1642
Merge branch 'SaferPayloadFaults' into TypeMappingsTransformation
gregschohn Dec 9, 2024
fbc0291
Update remaining preserve calls so that protocol will be carried forw…
gregschohn Dec 9, 2024
c13ef4a
Bugfixes for type mappings removal transformations, README + other si…
gregschohn Dec 10, 2024
00968cd
Merge branch 'main' into TypeMappingsTransformation
gregschohn Dec 10, 2024
2 changes: 2 additions & 0 deletions RFS/build.gradle
@@ -21,7 +21,9 @@ dependencies {

implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerInterface')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJinjavaTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')

implementation group: 'org.jcommander', name: 'jcommander'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'

@@ -78,8 +78,8 @@ services:
condition: service_started
opensearchtarget:
condition: service_started
# command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 --transformer-config "
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 --transformer-config '[{\"TypeMappingSanitizationTransformerProvider\":\"\"}]'"
# command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 "
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 --transformer-config '[{\"TypeMappingSanitizationTransformerProvider\":{\"sourceProperties\":{\"version\":{\"major\":7,\"minor\":10}}}}]'"
opensearchtarget:
image: 'opensearchproject/opensearch:2.15.0'
environment:
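
For reference, the value passed to --transformer-config in the command above is a JSON array of single-entry objects, each keyed by a transformer provider name and carrying that provider's options. A minimal sketch of producing the same string with Jackson (illustrative only, not part of this PR; the 7.10 sourceProperties values are simply the ones this compose file uses):

import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TransformerConfigExample {
    public static void main(String[] args) throws Exception {
        // Builds the same JSON handed to --transformer-config in the compose command above.
        String config = new ObjectMapper().writeValueAsString(List.of(
            Map.of("TypeMappingSanitizationTransformerProvider",
                Map.of("sourceProperties",
                    Map.of("version", Map.of("major", 7, "minor", 10))))));
        System.out.println(config);
        // [{"TypeMappingSanitizationTransformerProvider":{"sourceProperties":{"version":{"major":7,"minor":10}}}}]
    }
}

Building the string programmatically, as the replayer tests later in this diff do, avoids hand-escaping the nested quotes inside the docker-compose command.
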
1 change: 1 addition & 0 deletions TrafficCapture/trafficReplayer/build.gradle
@@ -22,6 +22,7 @@ dependencies {
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerInterface')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJMESPathMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJinjavaTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonJoltMessageTransformerProvider')
runtimeOnly project(':transformation:transformationPlugins:jsonMessageTransformers:jsonTypeMappingsSanitizationTransformerProvider')


@@ -42,6 +42,10 @@ public class ParsedHttpMessagesAsDicts {
public static final String STATUS_CODE_KEY = "Status-Code";
public static final String RESPONSE_TIME_MS_KEY = "response_time_ms";
public static final String EXCEPTION_KEY_STRING = "Exception";
public static final String REQUEST_URI_KEY = "Request-URI";
public static final String METHOD_KEY = "Method";
public static final String HTTP_VERSION_KEY = "HTTP-Version";
public static final String PAYLOAD_KEY = "payload";

public final Optional<Map<String, Object>> sourceRequestOp;
public final Optional<Map<String, Object>> sourceResponseOp;
@@ -183,15 +187,15 @@ private static Map<String, Object> convertRequest(
var message = (HttpJsonRequestWithFaultingPayload) messageHolder.get();
if (message != null) {
var map = new LinkedHashMap<>(message.headers());
map.put("Request-URI", message.path());
map.put("Method", message.method());
map.put("HTTP-Version", message.protocol());
map.put(REQUEST_URI_KEY, message.path());
map.put(METHOD_KEY, message.method());
map.put(HTTP_VERSION_KEY, message.protocol());
context.setMethod(message.method());
context.setEndpoint(message.path());
context.setHttpVersion(message.protocol());
encodeBinaryPayloadIfExists(message);
if (!message.payload().isEmpty()) {
map.put("payload", message.payload());
map.put(PAYLOAD_KEY, message.payload());
}
return map;
} else {
@@ -223,14 +227,14 @@ private static Map<String, Object> convertResponse(
var message = (HttpJsonResponseWithFaultingPayload) messageHolder.get();
if (message != null) {
var map = new LinkedHashMap<>(message.headers());
map.put("HTTP-Version", message.protocol());
map.put(HTTP_VERSION_KEY, message.protocol());
map.put(STATUS_CODE_KEY, Integer.parseInt(message.code()));
map.put("Reason-Phrase", message.reason());
map.put(RESPONSE_TIME_MS_KEY, latency.toMillis());
context.setHttpVersion(message.protocol());
encodeBinaryPayloadIfExists(message);
if (!message.payload().isEmpty()) {
map.put("payload", message.payload());
map.put(PAYLOAD_KEY, message.payload());
}
return map;
} else {

@@ -161,6 +161,7 @@ public static String getTransactionSummaryStringPreamble() {
.add("SOURCE_STATUS_CODE/TARGET_STATUS_CODE...")
.add("SOURCE_RESPONSE_SIZE_BYTES/TARGET_RESPONSE_SIZE_BYTES...")
.add("SOURCE_LATENCY_MS/TARGET_LATENCY_MS...")
.add("URI...")
.toString();
}

@@ -218,6 +219,11 @@ public static String toTransactionSummaryString(
transformStreamToString(parsed.targetResponseList.stream(),
r -> "" + r.get(ParsedHttpMessagesAsDicts.RESPONSE_TIME_MS_KEY))
)
// uri
.add(
parsed.sourceRequestOp
.map(r -> (String) r.get(ParsedHttpMessagesAsDicts.REQUEST_URI_KEY))
.orElse(MISSING_STR))
.toString();
}


@@ -116,10 +116,10 @@ public Object getNextTopLevelObject() throws IOException {
pushCompletedValue(parser.getText());
break;
case VALUE_NUMBER_INT:
pushCompletedValue(parser.getIntValue());
pushCompletedValue(parser.getLongValue());
break;
case VALUE_NUMBER_FLOAT:
pushCompletedValue(parser.getFloatValue());
pushCompletedValue(parser.getDoubleValue());
break;
case NOT_AVAILABLE:
// pipeline stall - need more data
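
The switch above from getIntValue()/getFloatValue() to getLongValue()/getDoubleValue() matters for documents whose numeric fields fall outside the 32-bit ranges (epoch-millis timestamps, large IDs): the int accessor fails with an out-of-range coercion error, while the long accessor keeps the value intact. A small sketch of the difference, assuming only jackson-core (not part of this PR):

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;

public class WideNumberSketch {
    public static void main(String[] args) throws Exception {
        JsonParser parser = new JsonFactory().createParser("[3000000000]");
        parser.nextToken(); // START_ARRAY
        parser.nextToken(); // VALUE_NUMBER_INT
        // parser.getIntValue() would fail here: 3000000000 is outside the int range.
        System.out.println(parser.getLongValue()); // prints 3000000000, value preserved
    }
}
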
@@ -35,6 +35,7 @@ public class PayloadAccessFaultingMap extends AbstractMap<String, Object> {
private boolean payloadWasAccessed;

public PayloadAccessFaultingMap(StrictCaseInsensitiveHttpHeadersMap headers) {
disableThrowingPayloadNotLoaded = true;
underlyingMap = new TreeMap<>();
isJson = Optional.ofNullable(headers.get("content-type"))
.map(list -> list.stream().anyMatch(s -> s.startsWith("application/json")))
@@ -86,11 +87,11 @@ public Object get(Object key) {
return value;
}

public boolean missingPaylaodWasAccessed() {
public boolean missingPayloadWasAccessed() {
return payloadWasAccessed;
}

public void resetMissingPaylaodWasAccessed() {
public void resetMissingPayloadWasAccessed() {
payloadWasAccessed = false;
}

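
A consequence of setting disableThrowingPayloadNotLoaded to true in the constructor above is that payload-access faulting is now opt-in: callers that want to detect a touch of the not-yet-parsed payload must flip the flag themselves, which is what the handler hunk below and the updated test near the end of this diff do. A boiled-down sketch of that contract, using only names visible in this diff (imports and message wiring omitted):

// Illustrative fragment, not a drop-in replacement for the handler code below.
static void runTransformWithFaulting(PayloadAccessFaultingMap payload, Runnable transform) {
    payload.setDisableThrowingPayloadNotLoaded(false);    // opt in: missing-payload access is recorded
    try {
        transform.run();
    } catch (RuntimeException e) {
        if (payload.missingPayloadWasAccessed()) {
            payload.resetMissingPayloadWasAccessed();
            // the transform genuinely needed the body - reparse with content handlers loaded
        } else {
            throw e;                                       // unrelated failure, surface it
        }
    } finally {
        payload.setDisableThrowingPayloadNotLoaded(true);  // restore the no-throw default
    }
}
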
@@ -67,12 +67,14 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg)
httpJsonMessage
);
HttpJsonRequestWithFaultingPayload transformedMessage = null;
final var payloadMap = (PayloadAccessFaultingMap) httpJsonMessage.payload();
try {
payloadMap.setDisableThrowingPayloadNotLoaded(false);
transformedMessage = transform(transformer, httpJsonMessage);
} catch (Exception e) {
var payload = (PayloadAccessFaultingMap) httpJsonMessage.payload();
if (payload.missingPaylaodWasAccessed()) {
payload.resetMissingPaylaodWasAccessed();
if (payload.missingPayloadWasAccessed()) {
payload.resetMissingPayloadWasAccessed();
log.atDebug().setMessage("The transforms for this message require payload manipulation, "
+ "all content handlers are being loaded.").log();
// make a fresh message and its headers
@@ -85,6 +87,8 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg)
} else{
throw new TransformationException(e);
}
} finally {
payloadMap.setDisableThrowingPayloadNotLoaded(true);
}

if (transformedMessage != null) {
@@ -139,32 +143,40 @@ private void handlePayloadNeutralTransformationOrThrow(

var pipeline = ctx.pipeline();
if (streamingAuthTransformer != null) {
log.atInfo().setMessage("{} An Authorization Transformation is required for this message. "
+ "The headers and payload will be parsed and reformatted.")
.addArgument(diagnosticLabel).log();
log.info(
diagnosticLabel
+ "An Authorization Transformation is required for this message. "
+ "The headers and payload will be parsed and reformatted."
);
requestPipelineOrchestrator.addContentRepackingHandlers(ctx, streamingAuthTransformer);
ctx.fireChannelRead(httpJsonMessage);
} else if (headerFieldsAreIdentical(originalRequest, httpJsonMessage)) {
log.atInfo().setMessage("{} Transformation isn't necessary. "
+ "Resetting the processing pipeline to let the caller send the original network bytes as-is.")
.addArgument(diagnosticLabel)
.log();
log.info(
diagnosticLabel
+ "Transformation isn't necessary. "
+ "Resetting the processing pipeline to let the caller send the original network bytes as-is."
);
RequestPipelineOrchestrator.removeAllHandlers(pipeline);

} else if (headerFieldIsIdentical("content-encoding", originalRequest, httpJsonMessage)
&& headerFieldIsIdentical("transfer-encoding", originalRequest, httpJsonMessage)) {
log.atInfo().setMessage("{} There were changes to the headers that require the message to be reformatted "
+ "but the payload doesn't need to be transformed.")
.addArgument(diagnosticLabel).log();
log.info(
diagnosticLabel
+ "There were changes to the headers that require the message to be reformatted "
+ "but the payload doesn't need to be transformed."
);
// By adding the baseline handlers and removing this and previous handlers in reverse order,
// we will cause the upstream handlers to flush their in-progress accumulated ByteBufs downstream
// to be processed accordingly
requestPipelineOrchestrator.addBaselineHandlers(pipeline);
ctx.fireChannelRead(httpJsonMessage);
RequestPipelineOrchestrator.removeThisAndPreviousHandlers(pipeline, this);
} else {
log.atInfo().setMessage("{} New headers have been specified that require the payload stream to be "
+ "reformatted. Setting up the processing pipeline to parse and reformat the request payload.")
.addArgument(diagnosticLabel).log();
log.info(
diagnosticLabel
+ "New headers have been specified that require the payload stream to be "
+ "reformatted. Setting up the processing pipeline to parse and reformat the request payload."
);
requestPipelineOrchestrator.addContentRepackingHandlers(ctx, streamingAuthTransformer);
ctx.fireChannelRead(httpJsonMessage);
}

@@ -8,6 +8,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

import org.opensearch.migrations.replay.datahandlers.JsonAccumulator;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
@@ -24,6 +25,7 @@
import io.netty.util.ReferenceCountUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.event.Level;

/**
* This accumulates HttpContent messages through a JsonAccumulator and eventually fires off a
@@ -93,7 +95,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
}
} catch (JacksonException e) {
log.atInfo().setCause(e).setMessage("Error parsing json body. " +
log.atLevel(hasRequestContentTypeMatching(capturedHttpJsonMessage,
// a JacksonException for non-json data doesn't need to be surfaced to a user
v -> v.startsWith("application/json")) ? Level.INFO : Level.TRACE)
.setCause(e).setMessage("Error parsing json body. " +
"Will pass all payload bytes directly as a ByteBuf within the payload map").log();
jsonWasInvalid = true;
parsedJsonObjects.clear();
@@ -123,7 +128,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

var leftoverBody = accumulatedBody.slice(jsonBodyByteLength,
accumulatedBody.readableBytes() - jsonBodyByteLength);
if (jsonBodyByteLength == 0 && isRequestContentTypeNotText(capturedHttpJsonMessage)) {
if (jsonBodyByteLength == 0 &&
hasRequestContentTypeMatching(capturedHttpJsonMessage, v -> !v.startsWith("text/")))
{
context.onPayloadSetBinary();
capturedHttpJsonMessage.payload()
.put(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY,
@@ -157,12 +164,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
}

private boolean isRequestContentTypeNotText(HttpJsonMessageWithFaultingPayload message) {
private boolean hasRequestContentTypeMatching(HttpJsonMessageWithFaultingPayload message,
Predicate<String> contentTypeFilter) {
// ContentType not text if specified and has a value with / and that value does not start with text/
return Optional.ofNullable(capturedHttpJsonMessage.headers().insensitiveGet(HttpHeaderNames.CONTENT_TYPE.toString()))
.map(s -> s.stream()
.filter(v -> v.contains("/"))
.filter(v -> !v.startsWith("text/"))
.filter(contentTypeFilter)
.count() > 1
)
.orElse(false);
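
The refactor above generalizes the old isRequestContentTypeNotText check into hasRequestContentTypeMatching(message, predicate), letting one helper serve two different intents. Spelled out as a sketch (the predicates are copied from the hunks above; everything else is illustrative):

import java.util.function.Predicate;

public class ContentTypePredicateSketch {
    // Decides whether the accumulated body should be inlined as a binary payload entry.
    static final Predicate<String> NON_TEXT = v -> !v.startsWith("text/");
    // Decides whether a JSON parse failure is worth an INFO log (otherwise it drops to TRACE).
    static final Predicate<String> IS_JSON = v -> v.startsWith("application/json");

    public static void main(String[] args) {
        System.out.println(NON_TEXT.test("application/octet-stream")); // true  -> keep body as binary
        System.out.println(IS_JSON.test("text/plain"));                // false -> parse errors stay at TRACE
    }
}
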
@@ -19,7 +19,9 @@ public void testTransformsPropagateExceptionProperly() throws JsonProcessingExce
FAULTING_MAP.setPath("/_bulk");
FAULTING_MAP.setHeaders(new ListKeyAdaptingCaseInsensitiveHeadersMap(new StrictCaseInsensitiveHttpHeadersMap()));
FAULTING_MAP.headers().put("Content-Type", "application/json");
FAULTING_MAP.setPayloadFaultMap(new PayloadAccessFaultingMap(FAULTING_MAP.headers().asStrictMap()));
var payloadMap = new PayloadAccessFaultingMap(FAULTING_MAP.headers().asStrictMap());
FAULTING_MAP.setPayloadFaultMap(payloadMap);
payloadMap.setDisableThrowingPayloadNotLoaded(false);
final String EXPECTED = "{\n"
+ " \"method\": \"PUT\",\n"
+ " \"URI\": \"/_bulk\",\n"
@@ -32,6 +34,6 @@
"[{\"TypeMappingSanitizationTransformerProvider\":\"\"}]");
var e = Assertions.assertThrows(Exception.class,
() -> transformer.transformJson(FAULTING_MAP));
Assertions.assertTrue(((PayloadAccessFaultingMap)FAULTING_MAP.payload()).missingPaylaodWasAccessed());
Assertions.assertTrue(((PayloadAccessFaultingMap)FAULTING_MAP.payload()).missingPayloadWasAccessed());
}
}

@@ -25,6 +25,7 @@
import org.opensearch.migrations.transform.TransformationLoader;
import org.opensearch.migrations.utils.TrackedFuture;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
@@ -331,8 +332,11 @@ public void testMalformedPayload_andThrowingTransformation_IsPassedThrough() thr
new TransformationLoader().getTransformerFactoryLoader(
HOST_NAME,
null,
"[{\"TypeMappingSanitizationTransformerProvider\":\"\"}]"
),
new ObjectMapper().writeValueAsString(List.of(
Map.of("JsonJinjavaTransformerProvider", Map.of(
"template", "{%- throw \"intentional exception\" -%}"
))
))),
null,
testPacketCapture,
rootContext.getTestConnectionRequestContext(0)
@@ -362,10 +366,7 @@ public void testMalformedPayload_andThrowingTransformation_IsPassedThrough() thr
);
var outputAndResult = finalizationFuture.get();
Assertions.assertInstanceOf(TransformationException.class,
TrackedFuture.unwindPossibleCompletionException(outputAndResult.transformationStatus.getException()),
"It's acceptable for now that the OpenSearch upgrade transformation can't handle non-json " +
"content. If that Transform wants to handle this on its own, we'll need to use another transform " +
"configuration so that it throws and we can do this test.");
TrackedFuture.unwindPossibleCompletionException(outputAndResult.transformationStatus.getException()));
var combinedOutputBuf = outputAndResult.transformedOutput.getResponseAsByteBuf();
Assertions.assertTrue(combinedOutputBuf.readableBytes() == 0);
combinedOutputBuf.release();