Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metadata add 1:many custom transformations #1225

Merged
merged 2 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void multiTypeTransformationTest_union() {
var dataFilterArgs = new DataFilterArgs();
dataFilterArgs.indexAllowlist = List.of(originalIndexName);
arguments.dataFilterArgs = dataFilterArgs;
arguments.sourceVersion = upgradedSourceCluster.getContainerVersion().getVersion();
AndreKurait marked this conversation as resolved.
Show resolved Hide resolved

// Use union method for multi-type mappings
arguments.metadataTransformationParams.multiTypeResolutionBehavior = IndexMappingTypeRemoval.MultiTypeResolutionBehavior.UNION;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package org.opensearch.migrations.bulkload.transformers;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.opensearch.migrations.bulkload.models.GlobalMetadata;
import org.opensearch.migrations.bulkload.models.IndexMetadata;

Expand All @@ -19,10 +23,13 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
}

@Override
public IndexMetadata transformIndexMetadata(IndexMetadata indexData) {
for (Transformer transformer : transformers) {
indexData = transformer.transformIndexMetadata(indexData);
}
return indexData;
public List<IndexMetadata> transformIndexMetadata(IndexMetadata indexData) {
return Stream.of(transformers)
.reduce(
AndreKurait marked this conversation as resolved.
Show resolved Hide resolved
Stream.of(indexData),
(stream, transformer) -> stream.flatMap(data -> transformer.transformIndexMetadata(data).stream()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spirit of this call - slicing the output into separate subsequent transforms is really weird. For the JSON Transformer composite transformer code, we'll do direct composition. In this case, there's an implicit fanout because the type for the per-index metadata transform calls is a scalar Index - not a collection of Indices.

My opinion here, from the limited code that I've seen, would be to abstain from allowing the RFS-MetaData CompositeTransformer to host transforms that can return lists. Keep two types of transforms. Merge the outputs of the list one with a specific class that does fanout across another sequence of 'simple' transforms. That might be a few more lines of code, but it might be a lot clearer.

Stream::concat
)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opensearch.migrations.bulkload.transformers;

import java.util.List;

import org.opensearch.migrations.bulkload.models.GlobalMetadata;
import org.opensearch.migrations.bulkload.models.IndexMetadata;

Expand All @@ -16,5 +18,5 @@ public interface Transformer {
/**
* Takes the raw JSON representing the Index Metadata of one version and returns a new, transformed copy of the JSON
*/
public IndexMetadata transformIndexMetadata(IndexMetadata indexData);
public List<IndexMetadata> transformIndexMetadata(IndexMetadata indexData);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the comment above about the composability problem. It's hard to get one's head around this once you know that we need to chain some transforms together.

What if you keep all of the "simple transforms" that are IndexMetadata->IndexMetadata. For when you need to return a list or take a list in, use an adapter that does the wrapping and any fanout. It feels capricious/fragile in the code below when we're boxing within a list.

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Adapts an {@link IJsonTransformer} to a {@link Transformer}.
*/
@Slf4j
public class TransformerToIJsonTransformerAdapter implements Transformer {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the name of this class backward? It feels like the IJsonTransformerAdapter is being transformed into a Metadata Transformer interface, not the other way around.

public static final String OUTPUT_TRANSFORMATION_JSON_LOGGER = "OutputTransformationJsonLogger";
Expand Down Expand Up @@ -74,12 +76,21 @@ private static String printMap(Object map) {
}

@SuppressWarnings("unchecked")
private MigrationItem transformMigrationItem(MigrationItem migrationItem) {
private List<MigrationItem> transformMigrationItem(MigrationItem migrationItem) {
// Keep untouched original for logging
final Map<String, Object> originalMap = MAPPER.convertValue(migrationItem, Map.class);
Object transformedMigrationItem = transformer.transformJson(MAPPER.convertValue(migrationItem, Map.class));
logTransformation(originalMap, transformedMigrationItem);
return MAPPER.convertValue(transformedMigrationItem, MigrationItem.class);
Object transformedResult = transformer.transformJson(MAPPER.convertValue(migrationItem, Map.class));
logTransformation(originalMap, transformedResult);

List<MigrationItem> transformedItems = new ArrayList<>();
if (transformedResult instanceof List) {
for (Object item : (List<Object>) transformedResult) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: map/collect would work too - this and the next comment could return immutable lists. If you can't do that, you should add why (as I think of it, you might not be able to do that - so it is tempting).

transformedItems.add(MAPPER.convertValue(item, MigrationItem.class));
}
} else {
transformedItems.add(MAPPER.convertValue(transformedResult, MigrationItem.class));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: List.of() would work too

}
return transformedItems;
}

void updateTemplates(Collection<? extends MigrationItem> transformedItems, ObjectNode itemsRoot) {
Expand All @@ -103,7 +114,6 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
Object afterJson = transformer.transformJson(inputJson);
log.atInfo().setMessage("AfterJsonGlobal: {}").addArgument(() -> printMap(afterJson)).log();


final List<LegacyTemplate> legacyTemplates = new ArrayList<>();
globalData.getTemplates().fields().forEachRemaining(
entry -> legacyTemplates.add(new LegacyTemplate(entry.getKey(), (ObjectNode) entry.getValue()))
Expand All @@ -122,7 +132,8 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
indexTemplates.stream()),
componentTemplates.stream()
)
.map(this::transformMigrationItem).collect(Collectors.toList());
.flatMap(item -> transformMigrationItem(item).stream())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converting this calling pattern to be a transform on the whole list would be safer (no collisions), give the user more flexibility, & probably also make it easier for genAI to create complete transforms.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.collect(Collectors.toList());
Comment on lines +133 to +134
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's some risk here. If a custom transform maps some components to OS2 and leaves others behind. We'd really want/need to let version transforms run for some things and not others. As discussed w/ Andre, adding a version key to each item would help us identify the version in the transform and also let subsequent transforms know that it's already been updated to that schema.


var transformedLegacy = transformedTemplates.stream()
.filter(LegacyTemplate.class::isInstance)
Expand Down Expand Up @@ -151,12 +162,19 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {

@SuppressWarnings("unchecked")
@Override
public IndexMetadata transformIndexMetadata(IndexMetadata indexData) {
public List<IndexMetadata> transformIndexMetadata(IndexMetadata indexData) {
final Map<String, Object> originalInput = MAPPER.convertValue(indexData, Map.class);
final Map<String, Object> inputJson = MAPPER.convertValue(indexData, Map.class);
Object afterJson = transformer.transformJson(inputJson);
logTransformation(originalInput, afterJson);
return MAPPER.convertValue(inputJson, IndexMetadata.class);
if (afterJson instanceof List) {
return ((List<Map<String, Object>>) afterJson).stream()
.map(json -> MAPPER.convertValue(json, IndexMetadata.class))
.collect(Collectors.toList());
} else if (afterJson instanceof Map) {
return List.of(MAPPER.convertValue(afterJson, IndexMetadata.class));
} else {
throw new IllegalArgumentException("Unexpected transformation result type: " + afterJson.getClass());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ public ObjectNode getRawJson() {
}

@Override
public IndexMetadata transformIndexMetadata(IndexMetadata index) {
public List<IndexMetadata> transformIndexMetadata(IndexMetadata index) {
var copy = index.deepCopy();
transformIndex(copy, IndexType.CONCRETE);
return new IndexMetadataData_OS_2_11(copy.getRawJson(), copy.getId(), copy.getName());
return List.of(new IndexMetadataData_OS_2_11(copy.getRawJson(), copy.getId(), copy.getName()));
}

private void transformIndex(Index index, IndexType type) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opensearch.migrations.bulkload.transformers;

import java.util.List;

import org.opensearch.migrations.bulkload.models.GlobalMetadata;
import org.opensearch.migrations.bulkload.models.IndexMetadata;
import org.opensearch.migrations.bulkload.version_os_2_11.GlobalMetadataData_OS_2_11;
Expand Down Expand Up @@ -84,7 +86,7 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata metaData) {
}

@Override
public IndexMetadata transformIndexMetadata(IndexMetadata indexData) {
public List<IndexMetadata> transformIndexMetadata(IndexMetadata indexData) {
log.atDebug().setMessage("Original Object: {}").addArgument(indexData::getRawJson).log();
var copy = indexData.deepCopy();
var newRoot = copy.getRawJson();
Expand All @@ -96,6 +98,6 @@ public IndexMetadata transformIndexMetadata(IndexMetadata indexData) {
TransformFunctions.fixReplicasForDimensionality(newRoot, awarenessAttributeDimensionality);

log.atDebug().setMessage("Transformed Object: {}").addArgument(newRoot).log();
return copy;
return List.of(copy);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.migrations.bulkload.worker;

import java.util.ArrayList;
import java.util.List;

import org.opensearch.migrations.MigrationMode;
Expand Down Expand Up @@ -33,44 +34,56 @@
repoDataProvider.getIndicesInSnapshot(snapshotName)
.stream()
.forEach(index -> {
CreationResult creationResult;
List<CreationResult> creationResults;
if (skipCreation.test(index.getName())) {
log.atInfo()
.setMessage("Index {} was not part of the allowlist and will not be migrated.")
.addArgument(index.getName())
.log();
creationResult = CreationResult.builder()
creationResults = List.of(CreationResult.builder()
.name(index.getName())
.failureType(CreationFailureType.SKIPPED_DUE_TO_FILTER)
.build();
.build());
} else {
creationResult = createIndex(index.getName(), mode, context);
creationResults = createIndex(index.getName(), mode, context);
}

results.index(creationResult);
creationResults.forEach(results::index);

var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
indexMetadata.getAliases().fieldNames().forEachRemaining(alias -> {
var aliasResult = CreationResult.builder().name(alias);
aliasResult.failureType(creationResult.getFailureType());
aliasResult.failureType(creationResults.get(0).getFailureType());
Comment on lines +37 to +56
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, this code might be quicker to read if it were tighter (it would be easy to compress w/out the log statement - which could be moved to outside the construction.

results.alias(aliasResult.build());
});
});
return results.build();
}

private CreationResult createIndex(String indexName, MigrationMode mode, ICreateIndexContext context) {
private List<CreationResult> createIndex(String indexName, MigrationMode mode, ICreateIndexContext context) {
var originalIndexMetadata = metadataFactory.fromRepo(snapshotName, indexName);
var indexMetadata = originalIndexMetadata.deepCopy();
List<CreationResult> creationResults = new ArrayList<>();
try {
indexMetadata = transformer.transformIndexMetadata(indexMetadata);
return indexCreator.create(indexMetadata, mode, context);
List<IndexMetadata> transformedMetadataList = transformer.transformIndexMetadata(indexMetadata);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree w/ the SonarQube warning - if this fails, just return a List.of(...).
Then, if it succeeds, go into the for loop w/ the separate try-catch blocks

for (IndexMetadata transformedMetadata : transformedMetadataList) {
try {

Check failure on line 70 in RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java

View workflow job for this annotation

GitHub Actions / Run SonarQube Analysis

java:S1141

Extract this nested try block into a separate method.
creationResults.add(indexCreator.create(transformedMetadata, mode, context));
} catch (Exception e) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did sonarqube not flag catching a raw Exception?

creationResults.add(CreationResult.builder()
.name(indexName)
.exception(new IndexTransformationException(indexName, e))
.failureType(CreationFailureType.UNABLE_TO_TRANSFORM_FAILURE)
.build());
}
}
} catch (Exception e) {
return CreationResult.builder()
creationResults.add(CreationResult.builder()
.name(indexName)
.exception(new IndexTransformationException(indexName, e))
.failureType(CreationFailureType.UNABLE_TO_TRANSFORM_FAILURE)
.build();
.build());
}
return creationResults;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ public void transformIndexMetadata_AsExpected() throws Exception {
Transformer_ES_7_10_OS_2_11 transformer = new Transformer_ES_7_10_OS_2_11(2);

IndexMetadata indexMetadataBwc = sourceResourceProvider.getIndexMetadata().fromRepo(snapshot.name, "bwc_index_1");
IndexMetadata transformedIndexBwc = transformer.transformIndexMetadata(indexMetadataBwc);
IndexMetadata transformedIndexBwc = transformer.transformIndexMetadata(indexMetadataBwc).get(0);
IndexMetadataData_OS_2_11 finalIndexBwc =new IndexMetadataData_OS_2_11(transformedIndexBwc.getRawJson(), transformedIndexBwc.getId(), transformedIndexBwc.getName());

IndexMetadata indexMetadataFwc = sourceResourceProvider.getIndexMetadata().fromRepo(snapshot.name, "fwc_index_1");
IndexMetadata transformedIndexFwc = transformer.transformIndexMetadata(indexMetadataFwc);
IndexMetadata transformedIndexFwc = transformer.transformIndexMetadata(indexMetadataFwc).get(0);
IndexMetadataData_OS_2_11 finalIndexFwc =new IndexMetadataData_OS_2_11(transformedIndexFwc.getRawJson(), transformedIndexFwc.getId(), transformedIndexFwc.getName());

IndexMetadata indexMetadataNoMappingNoDocs = sourceResourceProvider.getIndexMetadata().fromRepo(snapshot.name, "no_mappings_no_docs");
IndexMetadata transformedIndexNoMappingNoDocs = transformer.transformIndexMetadata(indexMetadataNoMappingNoDocs);
IndexMetadata transformedIndexNoMappingNoDocs = transformer.transformIndexMetadata(indexMetadataNoMappingNoDocs).get(0);
IndexMetadataData_OS_2_11 finalIndexNoMappingNoDocs = new IndexMetadataData_OS_2_11(transformedIndexNoMappingNoDocs.getRawJson(), transformedIndexNoMappingNoDocs.getId(), transformedIndexNoMappingNoDocs.getName());

IndexMetadata indexMetadataEmptyMappingNoDocs = sourceResourceProvider.getIndexMetadata().fromRepo(snapshot.name, "empty_mappings_no_docs");
IndexMetadata transformedIndexEmptyMappingNoDocs = transformer.transformIndexMetadata(indexMetadataEmptyMappingNoDocs);
IndexMetadata transformedIndexEmptyMappingNoDocs = transformer.transformIndexMetadata(indexMetadataEmptyMappingNoDocs).get(0);
IndexMetadataData_OS_2_11 finalIndexEmptyMappingNoDocs = new IndexMetadataData_OS_2_11(transformedIndexEmptyMappingNoDocs.getRawJson(), transformedIndexEmptyMappingNoDocs.getId(), transformedIndexEmptyMappingNoDocs.getName());

String expectedIndexBwc = "{\"version\":3,\"mapping_version\":1,\"settings_version\":1,\"aliases_version\":1,\"routing_num_shards\":1024,\"state\":\"open\",\"settings\":{\"creation_date\":\"1727459371883\",\"number_of_replicas\":1,\"number_of_shards\":\"1\",\"provided_name\":\"bwc_index_1\",\"uuid\":\"tBmFXxGhTeiDlznQiKfNCA\",\"version\":{\"created\":\"7100299\"}},\"mappings\":{\"properties\":{\"content\":{\"type\":\"text\"},\"title\":{\"type\":\"text\"}}},\"aliases\":{\"bwc_alias\":{}},\"primary_terms\":[1],\"in_sync_allocations\":{\"0\":[\"jSYEePXYTka3EJ0vvdPGJA\"]},\"rollover_info\":{},\"system\":false}";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.opensearch.migrations.transformation.rules;

import org.opensearch.migrations.transformation.CanApplyResult;
import org.opensearch.migrations.transformation.CanApplyResult.Unsupported;
import org.opensearch.migrations.transformation.TransformationRule;
import org.opensearch.migrations.transformation.entity.Index;

Expand Down Expand Up @@ -77,10 +76,10 @@ public CanApplyResult canApply(final Index index) {
// 2. <pre>{"mappings": [{ "foo": {...}, "bar": {...} }]}</pre>
if (mappingNode.isArray() && (mappingNode.size() > 1 || mappingNode.get(0).size() > 1)) {
if (MultiTypeResolutionBehavior.NONE.equals(multiTypeResolutionBehavior)) {
return new Unsupported("No multi type resolution behavior declared, specify --multi-type-behavior to process");
throw new IllegalArgumentException("No multi type resolution behavior declared, specify --multi-type-behavior to process");
}
if (MultiTypeResolutionBehavior.SPLIT.equals(multiTypeResolutionBehavior)) {
return new Unsupported("Split on multiple mapping types is not supported");
throw new IllegalArgumentException("Split on multiple mapping types is not supported");
}
// Support UNION
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.function.Function;

import org.opensearch.migrations.transformation.CanApplyResult;
import org.opensearch.migrations.transformation.CanApplyResult.Unsupported;
import org.opensearch.migrations.transformation.entity.Index;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -15,7 +14,6 @@
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mockito;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -240,14 +238,11 @@ void testApplyTransformation_twoCustomTypes(String resolutionBehavior, String ex

var behavior = IndexMappingTypeRemoval.MultiTypeResolutionBehavior.valueOf(resolutionBehavior);

// Action
var wasChanged = applyTransformation(behavior, indexJson);
var canApply = canApply(behavior, originalJson);
assertThat(canApply, instanceOf(Unsupported.class));
assertThat(((Unsupported) canApply).getReason(), equalTo(expectedReason));
// Action & Assertion
var exception = assertThrows(IllegalArgumentException.class, () -> canApply(behavior, originalJson));
assertThat(exception.getMessage(), equalTo(expectedReason));

// Verification
assertThat(wasChanged, equalTo(false));
assertThat(originalJson.toPrettyString(), equalTo(indexJson.toPrettyString()));
}

Expand All @@ -263,14 +258,11 @@ void testApplyTransformation_twoMappingEntries(String resolutionBehavior, String
var indexJson = originalJson.deepCopy();
var behavior = IndexMappingTypeRemoval.MultiTypeResolutionBehavior.valueOf(resolutionBehavior);

// Action
var wasChanged = applyTransformation(behavior, indexJson);
var canApply = canApply(behavior, originalJson);
assertThat(canApply, instanceOf(Unsupported.class));
assertThat(((Unsupported) canApply).getReason(), equalTo(expectedReason));
// Action & Assertion
var exception = assertThrows(IllegalArgumentException.class, () -> canApply(behavior, originalJson));
assertThat(exception.getMessage(), equalTo(expectedReason));

// Verification
assertThat(wasChanged, equalTo(false));
assertThat(originalJson.toPrettyString(), equalTo(indexJson.toPrettyString()));
}

Expand Down
Loading