Fix bug where ingestion failed for input document containing list of nested objects (#1040)

* Fix bug where ingestion failed for input document containing list of nested objects

Signed-off-by: Yizhe Liu <[email protected]>

* Address comments: use a better method name and implementation

Signed-off-by: Yizhe Liu <[email protected]>

* Address comments: modify the test case to use a doc with various fields

Signed-off-by: Yizhe Liu <[email protected]>

---------

Signed-off-by: Yizhe Liu <[email protected]>
yizheliu-amazon authored and martin-gaievski committed Jan 13, 2025
1 parent 0afd102 commit d5a176e
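
For context on the bug described above, here is a minimal sketch of an ingest document source that contains a list of nested objects, the document shape named in the commit title and handled by the List branch of buildNestedMap in the diff below. All field names and values are invented for illustration and are not taken from the commit.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical document shape; field names are invented for illustration.
public class NestedListDocExample {
    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("title", "a plain text field");
        // A list of nested objects, the case the commit title describes.
        source.put("paragraphs", List.of(
            Map.of("text", "first paragraph"),
            Map.of("text", "second paragraph")
        ));
        System.out.println(source);
        // {title=a plain text field, paragraphs=[{text=first paragraph}, {text=second paragraph}]}
    }
}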
Showing 4 changed files with 137 additions and 365 deletions.
[Changed file 1 of 4]
@@ -13,8 +13,6 @@

import org.opensearch.index.query.MatchQueryBuilder;

import static org.opensearch.knn.index.query.KNNQueryBuilder.EXPAND_NESTED_FIELD;
import static org.opensearch.neuralsearch.common.MinClusterVersionUtil.isClusterOnOrAfterMinReqVersion;
import static org.opensearch.neuralsearch.util.TestUtils.getModelId;
import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER;
import static org.opensearch.neuralsearch.util.TestUtils.PARAM_NAME_WEIGHTS;
[Changed file 2 of 4]
@@ -137,7 +137,6 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
     @Override
     public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
         try {
-            preprocessIngestDocument(ingestDocument);
             validateEmbeddingFieldsValue(ingestDocument);
             Map<String, Object> processMap = buildMapWithTargetKeys(ingestDocument);
             List<String> inferenceList = createInferenceList(processMap);
@@ -151,15 +150,6 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
         }
     }
 
-    @VisibleForTesting
-    void preprocessIngestDocument(IngestDocument ingestDocument) {
-        if (ingestDocument == null || ingestDocument.getSourceAndMetadata() == null) return;
-        Map<String, Object> sourceAndMetadataMap = ingestDocument.getSourceAndMetadata();
-        Map<String, Object> unflattened = ProcessorDocumentUtils.unflattenJson(sourceAndMetadataMap);
-        unflattened.forEach(ingestDocument::setFieldValue);
-        sourceAndMetadataMap.keySet().removeIf(key -> key.contains("."));
-    }
-
     /**
      * This is the function which does actual inference work for batchExecute interface.
      * @param inferenceList a list of String for inference.
@@ -254,14 +244,12 @@ private List<DataForInference> getDataForInference(List<IngestDocumentWrapper> ingestDocumentWrappers) {
         for (IngestDocumentWrapper ingestDocumentWrapper : ingestDocumentWrappers) {
             Map<String, Object> processMap = null;
             List<String> inferenceList = null;
-            IngestDocument ingestDocument = ingestDocumentWrapper.getIngestDocument();
             try {
-                preprocessIngestDocument(ingestDocument);
-                validateEmbeddingFieldsValue(ingestDocument);
-                processMap = buildMapWithTargetKeys(ingestDocument);
+                validateEmbeddingFieldsValue(ingestDocumentWrapper.getIngestDocument());
+                processMap = buildMapWithTargetKeys(ingestDocumentWrapper.getIngestDocument());
                 inferenceList = createInferenceList(processMap);
             } catch (Exception e) {
-                ingestDocumentWrapper.update(ingestDocument, e);
+                ingestDocumentWrapper.update(ingestDocumentWrapper.getIngestDocument(), e);
             } finally {
                 dataForInferences.add(new DataForInference(ingestDocumentWrapper, processMap, inferenceList));
             }
@@ -319,7 +307,7 @@ Map<String, Object> buildMapWithTargetKeys(IngestDocument ingestDocument) {
                 buildNestedMap(originalKey, targetKey, sourceAndMetadataMap, treeRes);
                 mapWithProcessorKeys.put(originalKey, treeRes.get(originalKey));
             } else {
-                mapWithProcessorKeys.put(String.valueOf(targetKey), normalizeSourceValue(sourceAndMetadataMap.get(originalKey)));
+                mapWithProcessorKeys.put(String.valueOf(targetKey), sourceAndMetadataMap.get(originalKey));
             }
         }
         return mapWithProcessorKeys;
@@ -345,33 +333,17 @@ void buildNestedMap(String parentKey, Object processorKey, Map<String, Object> sourceAndMetadataMap, Map<String, Object> treeRes) {
             } else if (sourceAndMetadataMap.get(parentKey) instanceof List) {
                 for (Map.Entry<String, Object> nestedFieldMapEntry : ((Map<String, Object>) processorKey).entrySet()) {
                     List<Map<String, Object>> list = (List<Map<String, Object>>) sourceAndMetadataMap.get(parentKey);
-                    Pair<String, Object> processedNestedKey = processNestedKey(nestedFieldMapEntry);
-                    List<Object> listOfStrings = list.stream().map(x -> {
-                        Object nestedSourceValue = x.get(processedNestedKey.getKey());
-                        return normalizeSourceValue(nestedSourceValue);
-                    }).collect(Collectors.toList());
+                    List<Object> listOfStrings = list.stream().map(x -> x.get(nestedFieldMapEntry.getKey())).collect(Collectors.toList());
                     Map<String, Object> map = new LinkedHashMap<>();
-                    map.put(processedNestedKey.getKey(), listOfStrings);
-                    buildNestedMap(processedNestedKey.getKey(), processedNestedKey.getValue(), map, next);
+                    map.put(nestedFieldMapEntry.getKey(), listOfStrings);
+                    buildNestedMap(nestedFieldMapEntry.getKey(), nestedFieldMapEntry.getValue(), map, next);
                 }
             }
             treeRes.merge(parentKey, next, REMAPPING_FUNCTION);
         } else {
-            Object parentValue = sourceAndMetadataMap.get(parentKey);
             String key = String.valueOf(processorKey);
-            treeRes.put(key, normalizeSourceValue(parentValue));
-        }
-    }
-
-    private boolean isBlankString(Object object) {
-        return object instanceof String && StringUtils.isBlank((String) object);
-    }
-
-    private Object normalizeSourceValue(Object value) {
-        if (isBlankString(value)) {
-            return null;
+            treeRes.put(key, sourceAndMetadataMap.get(parentKey));
         }
-        return value;
     }
 
     /**
@@ -400,11 +372,11 @@ private void validateEmbeddingFieldsValue(IngestDocument ingestDocument) {
         ProcessorDocumentUtils.validateMapTypeValue(
             FIELD_MAP_FIELD,
             sourceAndMetadataMap,
-            ProcessorDocumentUtils.unflattenJson(fieldMap),
+            fieldMap,
             indexName,
             clusterService,
             environment,
-            true
+            false
         );
     }
 
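To make the List branch of buildNestedMap above concrete, here is a small, self-contained sketch of the collection step that the list.stream().map(...) line performs: one value is pulled from each nested object and gathered into a single list keyed for inference. The field names and the implied field_map are assumptions for illustration, not taken from the commit.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch mirroring the List branch shown in the diff above;
// names are invented and this is not the plugin's actual class.
public class BuildNestedMapSketch {
    public static void main(String[] args) {
        List<Map<String, Object>> paragraphs = List.of(
            Map.of("text", "first paragraph"),
            Map.of("text", "second paragraph")
        );
        String nestedKey = "text"; // key that an (invented) field_map would target
        // Same collection step as the diff: one value per nested object.
        List<Object> listOfStrings = paragraphs.stream().map(x -> x.get(nestedKey)).collect(Collectors.toList());
        Map<String, Object> map = new LinkedHashMap<>();
        map.put(nestedKey, listOfStrings);
        System.out.println(map); // {text=[first paragraph, second paragraph]}
    }
}
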
[Changed file 3 of 4]
@@ -4,18 +4,9 @@
*/
package org.opensearch.neuralsearch.processor;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import com.google.common.collect.ImmutableList;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicHeader;
import org.junit.Before;
import org.opensearch.client.Response;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.neuralsearch.BaseNeuralSearchIT;

import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder;
[Changed file 4 of 4: diff not loaded]
