Skip to content

Commit

Permalink
Metadata add 1:many custom transformations (#1225)
Browse files Browse the repository at this point in the history
* Metadata add 1:many custom transformations

Signed-off-by: Andre Kurait <[email protected]>

* Fix FanOutCompositeTransformer

Signed-off-by: Andre Kurait <[email protected]>

---------

Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait authored Jan 9, 2025
1 parent 9781a9b commit ee3fcc5
Show file tree
Hide file tree
Showing 12 changed files with 91 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import org.opensearch.migrations.MigrateOrEvaluateArgs;
import org.opensearch.migrations.MigrationMode;
import org.opensearch.migrations.bulkload.transformers.CompositeTransformer;
import org.opensearch.migrations.bulkload.transformers.FanOutCompositeTransformer;
import org.opensearch.migrations.bulkload.transformers.TransformFunctions;
import org.opensearch.migrations.bulkload.transformers.Transformer;
import org.opensearch.migrations.bulkload.transformers.TransformerToIJsonTransformerAdapter;
Expand Down Expand Up @@ -76,7 +76,7 @@ protected Transformer selectTransformer(Clusters clusters) {
arguments.metadataTransformationParams
);
var customTransformer = getCustomTransformer();
var compositeTransformer = new CompositeTransformer(customTransformer, versionTransformer);
var compositeTransformer = new FanOutCompositeTransformer(customTransformer, versionTransformer);
log.atInfo().setMessage("Selected transformer: {}").addArgument(compositeTransformer).log();
return compositeTransformer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void multiTypeTransformationTest_union() {
var dataFilterArgs = new DataFilterArgs();
dataFilterArgs.indexAllowlist = List.of(originalIndexName);
arguments.dataFilterArgs = dataFilterArgs;
arguments.sourceVersion = upgradedSourceCluster.getContainerVersion().getVersion();

// Use union method for multi-type mappings
arguments.metadataTransformationParams.multiTypeResolutionBehavior = IndexMappingTypeRemoval.MultiTypeResolutionBehavior.UNION;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package org.opensearch.migrations.bulkload.transformers;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.opensearch.migrations.bulkload.models.GlobalMetadata;
import org.opensearch.migrations.bulkload.models.IndexMetadata;

public class CompositeTransformer implements Transformer {
public class FanOutCompositeTransformer implements Transformer {
private final Transformer[] transformers;

/**
 * Builds a composite over the given transformers.
 * The varargs array is stored as-is (no defensive copy is made here).
 */
public CompositeTransformer(Transformer... transformers) {
public FanOutCompositeTransformer(Transformer... transformers) {
this.transformers = transformers;
}

Expand All @@ -19,10 +23,11 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
}

/**
 * Applies each configured transformer, in array order, to the index metadata.
 * In the List-returning form, every output of one transformer is fed to the next
 * transformer and the per-item result lists are flattened, so one input may fan
 * out into many outputs. If a transformer returns an empty list for an item, that
 * item contributes nothing to the final result.
 */
@Override
public IndexMetadata transformIndexMetadata(IndexMetadata indexData) {
public List<IndexMetadata> transformIndexMetadata(IndexMetadata indexData) {
// Start from a single-element stream holding the input.
var indexDataStream = Stream.of(indexData);
for (Transformer transformer : transformers) {
indexData = transformer.transformIndexMetadata(indexData);
// flatMap is lazy: this only appends a stage; transformers are actually invoked during collect below.
indexDataStream = indexDataStream.flatMap(data -> transformer.transformIndexMetadata(data).stream());
}
return indexData;
// Terminal operation: runs the whole pipeline; any exception thrown by a transformer surfaces here.
return indexDataStream.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opensearch.migrations.bulkload.transformers;

import java.util.List;

import org.opensearch.migrations.bulkload.models.GlobalMetadata;
import org.opensearch.migrations.bulkload.models.IndexMetadata;

Expand All @@ -16,5 +18,5 @@ public interface Transformer {
/**
* Takes the raw JSON representing the Index Metadata of one version and returns the transformed Index Metadata.
* With the List return type, a single input may yield zero, one, or many transformed outputs
* (1:many transformations).
*/
public IndexMetadata transformIndexMetadata(IndexMetadata indexData);
public List<IndexMetadata> transformIndexMetadata(IndexMetadata indexData);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Adapts an {@link IJsonTransformer} to a {@link Transformer}.
*/
@Slf4j
public class TransformerToIJsonTransformerAdapter implements Transformer {
public static final String OUTPUT_TRANSFORMATION_JSON_LOGGER = "OutputTransformationJsonLogger";
Expand Down Expand Up @@ -74,12 +76,21 @@ private static String printMap(Object map) {
}

/**
 * Runs a single migration item through the wrapped JSON transformer and converts the
 * result back into MigrationItem objects.
 * In the List-returning form, a List result is converted element by element (fan-out);
 * any other result is converted as one item and returned in a single-element list.
 */
@SuppressWarnings("unchecked")
private MigrationItem transformMigrationItem(MigrationItem migrationItem) {
private List<MigrationItem> transformMigrationItem(MigrationItem migrationItem) {
// Keep untouched original for logging
final Map<String, Object> originalMap = MAPPER.convertValue(migrationItem, Map.class);
Object transformedMigrationItem = transformer.transformJson(MAPPER.convertValue(migrationItem, Map.class));
logTransformation(originalMap, transformedMigrationItem);
return MAPPER.convertValue(transformedMigrationItem, MigrationItem.class);
// The transformer receives its own freshly converted map, separate from originalMap above.
Object transformedResult = transformer.transformJson(MAPPER.convertValue(migrationItem, Map.class));
logTransformation(originalMap, transformedResult);

List<MigrationItem> transformedItems = new ArrayList<>();
if (transformedResult instanceof List) {
// Unchecked cast: list elements are handed to MAPPER.convertValue without further type checks.
for (Object item : (List<Object>) transformedResult) {
transformedItems.add(MAPPER.convertValue(item, MigrationItem.class));
}
} else {
// Non-list result is treated as exactly one transformed item.
transformedItems.add(MAPPER.convertValue(transformedResult, MigrationItem.class));
}
return transformedItems;
}

void updateTemplates(Collection<? extends MigrationItem> transformedItems, ObjectNode itemsRoot) {
Expand All @@ -98,11 +109,8 @@ void updateTemplates(Collection<? extends MigrationItem> transformedItems, Objec

@Override
public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
var inputJson = objectNodeToMap(globalData.toObjectNode());
log.atInfo().setMessage("BeforeJsonGlobal: {}").addArgument(() -> printMap(inputJson)).log();
Object afterJson = transformer.transformJson(inputJson);
log.atInfo().setMessage("AfterJsonGlobal: {}").addArgument(() -> printMap(afterJson)).log();

log.atInfo().setMessage("BeforeJsonGlobal: {}")
.addArgument(() -> printMap(objectNodeToMap(globalData.toObjectNode()))).log();

final List<LegacyTemplate> legacyTemplates = new ArrayList<>();
globalData.getTemplates().fields().forEachRemaining(
Expand All @@ -122,7 +130,8 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
indexTemplates.stream()),
componentTemplates.stream()
)
.map(this::transformMigrationItem).collect(Collectors.toList());
.flatMap(item -> transformMigrationItem(item).stream())
.collect(Collectors.toList());

var transformedLegacy = transformedTemplates.stream()
.filter(LegacyTemplate.class::isInstance)
Expand All @@ -139,24 +148,30 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
.map(ComponentTemplate.class::cast)
.collect(Collectors.toList());

assert transformedLegacy.size() + transformedIndex.size() + transformedComponent.size() == transformedTemplates.size();

updateTemplates(transformedLegacy, globalData.getTemplates());
updateTemplates(transformedIndex, globalData.getIndexTemplates());
updateTemplates(transformedComponent, globalData.getComponentTemplates());

log.atInfo().setMessage("GlobalOutput: {}").addArgument(() -> printMap(objectNodeToMap(globalData.toObjectNode()))).log();
log.atInfo().setMessage("AfterJsonGlobal: {}")
.addArgument(() -> printMap(objectNodeToMap(globalData.toObjectNode()))).log();
return globalData;
}

/**
 * Converts the index metadata to a map, runs it through the wrapped JSON transformer,
 * logs the before/after pair, and converts the result back to IndexMetadata.
 * In the List-returning form: a List result yields one IndexMetadata per element, a Map
 * result yields a single-element list, and any other non-null type raises
 * IllegalArgumentException.
 */
@SuppressWarnings("unchecked")
@Override
public IndexMetadata transformIndexMetadata(IndexMetadata indexData) {
public List<IndexMetadata> transformIndexMetadata(IndexMetadata indexData) {
// Two independent conversions: originalInput is kept for logging, inputJson is what the transformer receives.
final Map<String, Object> originalInput = MAPPER.convertValue(indexData, Map.class);
final Map<String, Object> inputJson = MAPPER.convertValue(indexData, Map.class);
Object afterJson = transformer.transformJson(inputJson);
logTransformation(originalInput, afterJson);
return MAPPER.convertValue(inputJson, IndexMetadata.class);
if (afterJson instanceof List) {
// Unchecked cast: elements are assumed to be JSON maps; this is not verified before conversion.
return ((List<Map<String, Object>>) afterJson).stream()
.map(json -> MAPPER.convertValue(json, IndexMetadata.class))
.collect(Collectors.toList());
} else if (afterJson instanceof Map) {
return List.of(MAPPER.convertValue(afterJson, IndexMetadata.class));
} else {
// NOTE(review): a null afterJson also lands here (instanceof is false for null), and
// afterJson.getClass() would then throw NullPointerException rather than IllegalArgumentException.
throw new IllegalArgumentException("Unexpected transformation result type: " + afterJson.getClass());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ public ObjectNode getRawJson() {
}

/**
 * Transforms a deep copy of the given index metadata as a CONCRETE index and rewraps the
 * result as IndexMetadataData_OS_2_11. The List-returning form always produces exactly one
 * element (a 1:1 transformation wrapped in List.of).
 */
@Override
public IndexMetadata transformIndexMetadata(IndexMetadata index) {
public List<IndexMetadata> transformIndexMetadata(IndexMetadata index) {
// Work on a copy so the transformation is not applied to the object passed in.
var copy = index.deepCopy();
transformIndex(copy, IndexType.CONCRETE);
return new IndexMetadataData_OS_2_11(copy.getRawJson(), copy.getId(), copy.getName());
return List.of(new IndexMetadataData_OS_2_11(copy.getRawJson(), copy.getId(), copy.getName()));
}

private void transformIndex(Index index, IndexType type) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opensearch.migrations.bulkload.transformers;

import java.util.List;

import org.opensearch.migrations.bulkload.models.GlobalMetadata;
import org.opensearch.migrations.bulkload.models.IndexMetadata;
import org.opensearch.migrations.bulkload.version_os_2_11.GlobalMetadataData_OS_2_11;
Expand Down Expand Up @@ -84,7 +86,7 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata metaData) {
}

@Override
public IndexMetadata transformIndexMetadata(IndexMetadata indexData) {
public List<IndexMetadata> transformIndexMetadata(IndexMetadata indexData) {
log.atDebug().setMessage("Original Object: {}").addArgument(indexData::getRawJson).log();
var copy = indexData.deepCopy();
var newRoot = copy.getRawJson();
Expand All @@ -96,6 +98,6 @@ public IndexMetadata transformIndexMetadata(IndexMetadata indexData) {
TransformFunctions.fixReplicasForDimensionality(newRoot, awarenessAttributeDimensionality);

log.atDebug().setMessage("Transformed Object: {}").addArgument(newRoot).log();
return copy;
return List.of(copy);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.migrations.bulkload.worker;

import java.util.ArrayList;
import java.util.List;

import org.opensearch.migrations.MigrationMode;
Expand Down Expand Up @@ -33,44 +34,56 @@ public IndexMetadataResults migrateIndices(MigrationMode mode, ICreateIndexConte
repoDataProvider.getIndicesInSnapshot(snapshotName)
.stream()
.forEach(index -> {
CreationResult creationResult;
List<CreationResult> creationResults;
if (skipCreation.test(index.getName())) {
log.atInfo()
.setMessage("Index {} was not part of the allowlist and will not be migrated.")
.addArgument(index.getName())
.log();
creationResult = CreationResult.builder()
creationResults = List.of(CreationResult.builder()
.name(index.getName())
.failureType(CreationFailureType.SKIPPED_DUE_TO_FILTER)
.build();
.build());
} else {
creationResult = createIndex(index.getName(), mode, context);
creationResults = createIndex(index.getName(), mode, context);
}

results.index(creationResult);
creationResults.forEach(results::index);

var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
indexMetadata.getAliases().fieldNames().forEachRemaining(alias -> {
var aliasResult = CreationResult.builder().name(alias);
aliasResult.failureType(creationResult.getFailureType());
aliasResult.failureType(creationResults.get(0).getFailureType());
results.alias(aliasResult.build());
});
});
return results.build();
}

/**
 * Loads the named index's metadata from the snapshot, transforms a deep copy of it, and
 * attempts to create an index for each transformed metadata entry.
 * In the List-returning form, one CreationResult is collected per transformed entry; if the
 * transformer returns an empty list without throwing, the returned list is empty, so callers
 * that index into the result must handle that case.
 * Note that metadataFactory.fromRepo is called outside the try block, so an exception from it
 * is not converted into a CreationResult.
 */
private CreationResult createIndex(String indexName, MigrationMode mode, ICreateIndexContext context) {
private List<CreationResult> createIndex(String indexName, MigrationMode mode, ICreateIndexContext context) {
var originalIndexMetadata = metadataFactory.fromRepo(snapshotName, indexName);
var indexMetadata = originalIndexMetadata.deepCopy();
List<CreationResult> creationResults = new ArrayList<>();
try {
indexMetadata = transformer.transformIndexMetadata(indexMetadata);
return indexCreator.create(indexMetadata, mode, context);
List<IndexMetadata> transformedMetadataList = transformer.transformIndexMetadata(indexMetadata);
for (IndexMetadata transformedMetadata : transformedMetadataList) {
// Inner try: a failure creating one transformed index is recorded and the loop continues with the next.
try {

Check failure on line 70 in RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java

View workflow job for this annotation

GitHub Actions / Run SonarQube Analysis

java:S1141

Extract this nested try block into a separate method.
creationResults.add(indexCreator.create(transformedMetadata, mode, context));
} catch (Exception e) {
// NOTE(review): a failure thrown by indexCreator.create is reported with the source indexName and
// the UNABLE_TO_TRANSFORM_FAILURE type, the same as a transformer failure below.
creationResults.add(CreationResult.builder()
.name(indexName)
.exception(new IndexTransformationException(indexName, e))
.failureType(CreationFailureType.UNABLE_TO_TRANSFORM_FAILURE)
.build());
}
}
} catch (Exception e) {
// Outer catch: reached when the transformer call itself throws; yields a single failure result.
return CreationResult.builder()
creationResults.add(CreationResult.builder()
.name(indexName)
.exception(new IndexTransformationException(indexName, e))
.failureType(CreationFailureType.UNABLE_TO_TRANSFORM_FAILURE)
.build();
.build());
}
return creationResults;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

public class CompositeTransformerTest {
public class FanOutCompositeTransformerTest {

@Test
public void testCompositeTransformer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ public void transformIndexMetadata_AsExpected() throws Exception {
Transformer_ES_7_10_OS_2_11 transformer = new Transformer_ES_7_10_OS_2_11(2);

IndexMetadata indexMetadataBwc = sourceResourceProvider.getIndexMetadata().fromRepo(snapshot.name, "bwc_index_1");
IndexMetadata transformedIndexBwc = transformer.transformIndexMetadata(indexMetadataBwc);
IndexMetadata transformedIndexBwc = transformer.transformIndexMetadata(indexMetadataBwc).get(0);
IndexMetadataData_OS_2_11 finalIndexBwc =new IndexMetadataData_OS_2_11(transformedIndexBwc.getRawJson(), transformedIndexBwc.getId(), transformedIndexBwc.getName());

IndexMetadata indexMetadataFwc = sourceResourceProvider.getIndexMetadata().fromRepo(snapshot.name, "fwc_index_1");
IndexMetadata transformedIndexFwc = transformer.transformIndexMetadata(indexMetadataFwc);
IndexMetadata transformedIndexFwc = transformer.transformIndexMetadata(indexMetadataFwc).get(0);
IndexMetadataData_OS_2_11 finalIndexFwc =new IndexMetadataData_OS_2_11(transformedIndexFwc.getRawJson(), transformedIndexFwc.getId(), transformedIndexFwc.getName());

IndexMetadata indexMetadataNoMappingNoDocs = sourceResourceProvider.getIndexMetadata().fromRepo(snapshot.name, "no_mappings_no_docs");
IndexMetadata transformedIndexNoMappingNoDocs = transformer.transformIndexMetadata(indexMetadataNoMappingNoDocs);
IndexMetadata transformedIndexNoMappingNoDocs = transformer.transformIndexMetadata(indexMetadataNoMappingNoDocs).get(0);
IndexMetadataData_OS_2_11 finalIndexNoMappingNoDocs = new IndexMetadataData_OS_2_11(transformedIndexNoMappingNoDocs.getRawJson(), transformedIndexNoMappingNoDocs.getId(), transformedIndexNoMappingNoDocs.getName());

IndexMetadata indexMetadataEmptyMappingNoDocs = sourceResourceProvider.getIndexMetadata().fromRepo(snapshot.name, "empty_mappings_no_docs");
IndexMetadata transformedIndexEmptyMappingNoDocs = transformer.transformIndexMetadata(indexMetadataEmptyMappingNoDocs);
IndexMetadata transformedIndexEmptyMappingNoDocs = transformer.transformIndexMetadata(indexMetadataEmptyMappingNoDocs).get(0);
IndexMetadataData_OS_2_11 finalIndexEmptyMappingNoDocs = new IndexMetadataData_OS_2_11(transformedIndexEmptyMappingNoDocs.getRawJson(), transformedIndexEmptyMappingNoDocs.getId(), transformedIndexEmptyMappingNoDocs.getName());

String expectedIndexBwc = "{\"version\":3,\"mapping_version\":1,\"settings_version\":1,\"aliases_version\":1,\"routing_num_shards\":1024,\"state\":\"open\",\"settings\":{\"creation_date\":\"1727459371883\",\"number_of_replicas\":1,\"number_of_shards\":\"1\",\"provided_name\":\"bwc_index_1\",\"uuid\":\"tBmFXxGhTeiDlznQiKfNCA\",\"version\":{\"created\":\"7100299\"}},\"mappings\":{\"properties\":{\"content\":{\"type\":\"text\"},\"title\":{\"type\":\"text\"}}},\"aliases\":{\"bwc_alias\":{}},\"primary_terms\":[1],\"in_sync_allocations\":{\"0\":[\"jSYEePXYTka3EJ0vvdPGJA\"]},\"rollover_info\":{},\"system\":false}";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.opensearch.migrations.transformation.rules;

import org.opensearch.migrations.transformation.CanApplyResult;
import org.opensearch.migrations.transformation.CanApplyResult.Unsupported;
import org.opensearch.migrations.transformation.TransformationRule;
import org.opensearch.migrations.transformation.entity.Index;

Expand Down Expand Up @@ -77,10 +76,10 @@ public CanApplyResult canApply(final Index index) {
// 2. <pre>{"mappings": [{ "foo": {...}, "bar": {...} }]}</pre>
if (mappingNode.isArray() && (mappingNode.size() > 1 || mappingNode.get(0).size() > 1)) {
if (MultiTypeResolutionBehavior.NONE.equals(multiTypeResolutionBehavior)) {
return new Unsupported("No multi type resolution behavior declared, specify --multi-type-behavior to process");
throw new IllegalArgumentException("No multi type resolution behavior declared, specify --multi-type-behavior to process");
}
if (MultiTypeResolutionBehavior.SPLIT.equals(multiTypeResolutionBehavior)) {
return new Unsupported("Split on multiple mapping types is not supported");
throw new IllegalArgumentException("Split on multiple mapping types is not supported");
}
// Support UNION
}
Expand Down
Loading

0 comments on commit ee3fcc5

Please sign in to comment.