Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Metadata add 1:many custom transformations
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
AndreKurait committed Jan 8, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 9781a9b commit 9e2899f
Showing 10 changed files with 89 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -81,6 +81,7 @@ public void multiTypeTransformationTest_union() {
var dataFilterArgs = new DataFilterArgs();
dataFilterArgs.indexAllowlist = List.of(originalIndexName);
arguments.dataFilterArgs = dataFilterArgs;
arguments.sourceVersion = upgradedSourceCluster.getContainerVersion().getVersion();

// Use union method for multi-type mappings
arguments.metadataTransformationParams.multiTypeResolutionBehavior = IndexMappingTypeRemoval.MultiTypeResolutionBehavior.UNION;
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package org.opensearch.migrations.bulkload.transformers;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.opensearch.migrations.bulkload.models.GlobalMetadata;
import org.opensearch.migrations.bulkload.models.IndexMetadata;

@@ -19,10 +23,13 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
}

@Override
public IndexMetadata transformIndexMetadata(IndexMetadata indexData) {
for (Transformer transformer : transformers) {
indexData = transformer.transformIndexMetadata(indexData);
}
return indexData;
public List<IndexMetadata> transformIndexMetadata(IndexMetadata indexData) {
return Stream.of(transformers)
.reduce(
Stream.of(indexData),
(stream, transformer) -> stream.flatMap(data -> transformer.transformIndexMetadata(data).stream()),
Stream::concat
)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opensearch.migrations.bulkload.transformers;

import java.util.List;

import org.opensearch.migrations.bulkload.models.GlobalMetadata;
import org.opensearch.migrations.bulkload.models.IndexMetadata;

@@ -16,5 +18,5 @@ public interface Transformer {
/**
* Takes the raw JSON representing the Index Metadata of one version and returns a new, transformed copy of the JSON
*/
public IndexMetadata transformIndexMetadata(IndexMetadata indexData);
public List<IndexMetadata> transformIndexMetadata(IndexMetadata indexData);
}
Original file line number Diff line number Diff line change
@@ -24,7 +24,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Adapts an {@link IJsonTransformer} to a {@link Transformer}.
*/
@Slf4j
public class TransformerToIJsonTransformerAdapter implements Transformer {
public static final String OUTPUT_TRANSFORMATION_JSON_LOGGER = "OutputTransformationJsonLogger";
@@ -74,12 +76,21 @@ private static String printMap(Object map) {
}

@SuppressWarnings("unchecked")
private MigrationItem transformMigrationItem(MigrationItem migrationItem) {
private List<MigrationItem> transformMigrationItem(MigrationItem migrationItem) {
// Keep untouched original for logging
final Map<String, Object> originalMap = MAPPER.convertValue(migrationItem, Map.class);
Object transformedMigrationItem = transformer.transformJson(MAPPER.convertValue(migrationItem, Map.class));
logTransformation(originalMap, transformedMigrationItem);
return MAPPER.convertValue(transformedMigrationItem, MigrationItem.class);
Object transformedResult = transformer.transformJson(MAPPER.convertValue(migrationItem, Map.class));
logTransformation(originalMap, transformedResult);

List<MigrationItem> transformedItems = new ArrayList<>();
if (transformedResult instanceof List) {
for (Object item : (List<Object>) transformedResult) {
transformedItems.add(MAPPER.convertValue(item, MigrationItem.class));
}
} else {
transformedItems.add(MAPPER.convertValue(transformedResult, MigrationItem.class));
}
return transformedItems;
}

void updateTemplates(Collection<? extends MigrationItem> transformedItems, ObjectNode itemsRoot) {
@@ -103,7 +114,6 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
Object afterJson = transformer.transformJson(inputJson);
log.atInfo().setMessage("AfterJsonGlobal: {}").addArgument(() -> printMap(afterJson)).log();


final List<LegacyTemplate> legacyTemplates = new ArrayList<>();
globalData.getTemplates().fields().forEachRemaining(
entry -> legacyTemplates.add(new LegacyTemplate(entry.getKey(), (ObjectNode) entry.getValue()))
@@ -122,7 +132,8 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {
indexTemplates.stream()),
componentTemplates.stream()
)
.map(this::transformMigrationItem).collect(Collectors.toList());
.flatMap(item -> transformMigrationItem(item).stream())
.collect(Collectors.toList());

var transformedLegacy = transformedTemplates.stream()
.filter(LegacyTemplate.class::isInstance)
@@ -151,12 +162,19 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) {

@SuppressWarnings("unchecked")
@Override
public IndexMetadata transformIndexMetadata(IndexMetadata indexData) {
public List<IndexMetadata> transformIndexMetadata(IndexMetadata indexData) {
final Map<String, Object> originalInput = MAPPER.convertValue(indexData, Map.class);
final Map<String, Object> inputJson = MAPPER.convertValue(indexData, Map.class);
Object afterJson = transformer.transformJson(inputJson);
logTransformation(originalInput, afterJson);
return MAPPER.convertValue(inputJson, IndexMetadata.class);
if (afterJson instanceof List) {
return ((List<Map<String, Object>>) afterJson).stream()
.map(json -> MAPPER.convertValue(json, IndexMetadata.class))
.collect(Collectors.toList());
} else if (afterJson instanceof Map) {
return List.of(MAPPER.convertValue(afterJson, IndexMetadata.class));
} else {
throw new IllegalArgumentException("Unexpected transformation result type: " + afterJson.getClass());
}
}

}
Original file line number Diff line number Diff line change
@@ -85,10 +85,10 @@ public ObjectNode getRawJson() {
}

@Override
public IndexMetadata transformIndexMetadata(IndexMetadata index) {
public List<IndexMetadata> transformIndexMetadata(IndexMetadata index) {
var copy = index.deepCopy();
transformIndex(copy, IndexType.CONCRETE);
return new IndexMetadataData_OS_2_11(copy.getRawJson(), copy.getId(), copy.getName());
return List.of(new IndexMetadataData_OS_2_11(copy.getRawJson(), copy.getId(), copy.getName()));
}

private void transformIndex(Index index, IndexType type) {
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opensearch.migrations.bulkload.transformers;

import java.util.List;

import org.opensearch.migrations.bulkload.models.GlobalMetadata;
import org.opensearch.migrations.bulkload.models.IndexMetadata;
import org.opensearch.migrations.bulkload.version_os_2_11.GlobalMetadataData_OS_2_11;
@@ -84,7 +86,7 @@ public GlobalMetadata transformGlobalMetadata(GlobalMetadata metaData) {
}

@Override
public IndexMetadata transformIndexMetadata(IndexMetadata indexData) {
public List<IndexMetadata> transformIndexMetadata(IndexMetadata indexData) {
log.atDebug().setMessage("Original Object: {}").addArgument(indexData::getRawJson).log();
var copy = indexData.deepCopy();
var newRoot = copy.getRawJson();
@@ -96,6 +98,6 @@ public IndexMetadata transformIndexMetadata(IndexMetadata indexData) {
TransformFunctions.fixReplicasForDimensionality(newRoot, awarenessAttributeDimensionality);

log.atDebug().setMessage("Transformed Object: {}").addArgument(newRoot).log();
return copy;
return List.of(copy);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.opensearch.migrations.bulkload.worker;

import java.util.ArrayList;
import java.util.List;

import org.opensearch.migrations.MigrationMode;
@@ -33,44 +34,56 @@ public IndexMetadataResults migrateIndices(MigrationMode mode, ICreateIndexConte
repoDataProvider.getIndicesInSnapshot(snapshotName)
.stream()
.forEach(index -> {
CreationResult creationResult;
List<CreationResult> creationResults;
if (skipCreation.test(index.getName())) {
log.atInfo()
.setMessage("Index {} was not part of the allowlist and will not be migrated.")
.addArgument(index.getName())
.log();
creationResult = CreationResult.builder()
creationResults = List.of(CreationResult.builder()
.name(index.getName())
.failureType(CreationFailureType.SKIPPED_DUE_TO_FILTER)
.build();
.build());
} else {
creationResult = createIndex(index.getName(), mode, context);
creationResults = createIndex(index.getName(), mode, context);
}

results.index(creationResult);
creationResults.forEach(results::index);

var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
indexMetadata.getAliases().fieldNames().forEachRemaining(alias -> {
var aliasResult = CreationResult.builder().name(alias);
aliasResult.failureType(creationResult.getFailureType());
aliasResult.failureType(creationResults.get(0).getFailureType());
results.alias(aliasResult.build());
});
});
return results.build();
}

private CreationResult createIndex(String indexName, MigrationMode mode, ICreateIndexContext context) {
private List<CreationResult> createIndex(String indexName, MigrationMode mode, ICreateIndexContext context) {
var originalIndexMetadata = metadataFactory.fromRepo(snapshotName, indexName);
var indexMetadata = originalIndexMetadata.deepCopy();
List<CreationResult> creationResults = new ArrayList<>();
try {
indexMetadata = transformer.transformIndexMetadata(indexMetadata);
return indexCreator.create(indexMetadata, mode, context);
List<IndexMetadata> transformedMetadataList = transformer.transformIndexMetadata(indexMetadata);
for (IndexMetadata transformedMetadata : transformedMetadataList) {
try {

Check failure on line 70 in RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java

GitHub Actions / Run SonarQube Analysis

java:S1141

Extract this nested try block into a separate method.
creationResults.add(indexCreator.create(transformedMetadata, mode, context));
} catch (Exception e) {
creationResults.add(CreationResult.builder()
.name(indexName)
.exception(new IndexTransformationException(indexName, e))
.failureType(CreationFailureType.UNABLE_TO_TRANSFORM_FAILURE)
.build());
}
}
} catch (Exception e) {
return CreationResult.builder()
creationResults.add(CreationResult.builder()
.name(indexName)
.exception(new IndexTransformationException(indexName, e))
.failureType(CreationFailureType.UNABLE_TO_TRANSFORM_FAILURE)
.build();
.build());
}
return creationResults;
}
}
Original file line number Diff line number Diff line change
@@ -51,19 +51,19 @@ public void transformIndexMetadata_AsExpected() throws Exception {
Transformer_ES_7_10_OS_2_11 transformer = new Transformer_ES_7_10_OS_2_11(2);

IndexMetadata indexMetadataBwc = sourceResourceProvider.getIndexMetadata().fromRepo(snapshot.name, "bwc_index_1");
IndexMetadata transformedIndexBwc = transformer.transformIndexMetadata(indexMetadataBwc);
IndexMetadata transformedIndexBwc = transformer.transformIndexMetadata(indexMetadataBwc).get(0);
IndexMetadataData_OS_2_11 finalIndexBwc =new IndexMetadataData_OS_2_11(transformedIndexBwc.getRawJson(), transformedIndexBwc.getId(), transformedIndexBwc.getName());

IndexMetadata indexMetadataFwc = sourceResourceProvider.getIndexMetadata().fromRepo(snapshot.name, "fwc_index_1");
IndexMetadata transformedIndexFwc = transformer.transformIndexMetadata(indexMetadataFwc);
IndexMetadata transformedIndexFwc = transformer.transformIndexMetadata(indexMetadataFwc).get(0);
IndexMetadataData_OS_2_11 finalIndexFwc =new IndexMetadataData_OS_2_11(transformedIndexFwc.getRawJson(), transformedIndexFwc.getId(), transformedIndexFwc.getName());

IndexMetadata indexMetadataNoMappingNoDocs = sourceResourceProvider.getIndexMetadata().fromRepo(snapshot.name, "no_mappings_no_docs");
IndexMetadata transformedIndexNoMappingNoDocs = transformer.transformIndexMetadata(indexMetadataNoMappingNoDocs);
IndexMetadata transformedIndexNoMappingNoDocs = transformer.transformIndexMetadata(indexMetadataNoMappingNoDocs).get(0);
IndexMetadataData_OS_2_11 finalIndexNoMappingNoDocs = new IndexMetadataData_OS_2_11(transformedIndexNoMappingNoDocs.getRawJson(), transformedIndexNoMappingNoDocs.getId(), transformedIndexNoMappingNoDocs.getName());

IndexMetadata indexMetadataEmptyMappingNoDocs = sourceResourceProvider.getIndexMetadata().fromRepo(snapshot.name, "empty_mappings_no_docs");
IndexMetadata transformedIndexEmptyMappingNoDocs = transformer.transformIndexMetadata(indexMetadataEmptyMappingNoDocs);
IndexMetadata transformedIndexEmptyMappingNoDocs = transformer.transformIndexMetadata(indexMetadataEmptyMappingNoDocs).get(0);
IndexMetadataData_OS_2_11 finalIndexEmptyMappingNoDocs = new IndexMetadataData_OS_2_11(transformedIndexEmptyMappingNoDocs.getRawJson(), transformedIndexEmptyMappingNoDocs.getId(), transformedIndexEmptyMappingNoDocs.getName());

String expectedIndexBwc = "{\"version\":3,\"mapping_version\":1,\"settings_version\":1,\"aliases_version\":1,\"routing_num_shards\":1024,\"state\":\"open\",\"settings\":{\"creation_date\":\"1727459371883\",\"number_of_replicas\":1,\"number_of_shards\":\"1\",\"provided_name\":\"bwc_index_1\",\"uuid\":\"tBmFXxGhTeiDlznQiKfNCA\",\"version\":{\"created\":\"7100299\"}},\"mappings\":{\"properties\":{\"content\":{\"type\":\"text\"},\"title\":{\"type\":\"text\"}}},\"aliases\":{\"bwc_alias\":{}},\"primary_terms\":[1],\"in_sync_allocations\":{\"0\":[\"jSYEePXYTka3EJ0vvdPGJA\"]},\"rollover_info\":{},\"system\":false}";
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.opensearch.migrations.transformation.rules;

import org.opensearch.migrations.transformation.CanApplyResult;
import org.opensearch.migrations.transformation.CanApplyResult.Unsupported;
import org.opensearch.migrations.transformation.TransformationRule;
import org.opensearch.migrations.transformation.entity.Index;

@@ -77,12 +76,12 @@ public CanApplyResult canApply(final Index index) {
// 2. <pre>{"mappings": [{ "foo": {...}, "bar": {...} }]}</pre>
if (mappingNode.isArray() && (mappingNode.size() > 1 || mappingNode.get(0).size() > 1)) {
if (MultiTypeResolutionBehavior.NONE.equals(multiTypeResolutionBehavior)) {
return new Unsupported("No multi type resolution behavior declared, specify --multi-type-behavior to process");
throw new IllegalArgumentException("No multi type resolution behavior declared, specify --multi-type-behavior to process");
}
if (MultiTypeResolutionBehavior.SPLIT.equals(multiTypeResolutionBehavior)) {
return new Unsupported("Split on multiple mapping types is not supported");
throw new IllegalArgumentException("Split on multiple mapping types is not supported");
}
// Support UNION
// Support UNION, SPLIT
}

// There is a type under mappings
@@ -140,6 +139,8 @@ public boolean applyTransformation(final Index index) {
});
}
);
} else if (MultiTypeResolutionBehavior.SPLIT.equals(multiTypeResolutionBehavior)) {

}
index.getRawJson().set(MAPPINGS_KEY, resolvedMappingsNode);
}
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@
import java.util.function.Function;

import org.opensearch.migrations.transformation.CanApplyResult;
import org.opensearch.migrations.transformation.CanApplyResult.Unsupported;
import org.opensearch.migrations.transformation.entity.Index;

import com.fasterxml.jackson.databind.ObjectMapper;
@@ -15,7 +14,6 @@
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mockito;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -240,14 +238,11 @@ void testApplyTransformation_twoCustomTypes(String resolutionBehavior, String ex

var behavior = IndexMappingTypeRemoval.MultiTypeResolutionBehavior.valueOf(resolutionBehavior);

// Action
var wasChanged = applyTransformation(behavior, indexJson);
var canApply = canApply(behavior, originalJson);
assertThat(canApply, instanceOf(Unsupported.class));
assertThat(((Unsupported) canApply).getReason(), equalTo(expectedReason));
// Action & Assertion
var exception = assertThrows(IllegalArgumentException.class, () -> canApply(behavior, originalJson));
assertThat(exception.getMessage(), equalTo(expectedReason));

// Verification
assertThat(wasChanged, equalTo(false));
assertThat(originalJson.toPrettyString(), equalTo(indexJson.toPrettyString()));
}

@@ -263,14 +258,11 @@ void testApplyTransformation_twoMappingEntries(String resolutionBehavior, String
var indexJson = originalJson.deepCopy();
var behavior = IndexMappingTypeRemoval.MultiTypeResolutionBehavior.valueOf(resolutionBehavior);

// Action
var wasChanged = applyTransformation(behavior, indexJson);
var canApply = canApply(behavior, originalJson);
assertThat(canApply, instanceOf(Unsupported.class));
assertThat(((Unsupported) canApply).getReason(), equalTo(expectedReason));
// Action & Assertion
var exception = assertThrows(IllegalArgumentException.class, () -> canApply(behavior, originalJson));
assertThat(exception.getMessage(), equalTo(expectedReason));

// Verification
assertThat(wasChanged, equalTo(false));
assertThat(originalJson.toPrettyString(), equalTo(indexJson.toPrettyString()));
}

0 comments on commit 9e2899f

Please sign in to comment.