Skip to content

Commit

Permalink
Remove ingest processor supports field patterns and excluding fields
Browse files Browse the repository at this point in the history
  • Loading branch information
gaobinlong committed Oct 27, 2023
1 parent e9affea commit 768f629
Show file tree
Hide file tree
Showing 6 changed files with 579 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote cluster state] Make index and global metadata upload timeout dynamic cluster settings ([#10814](https://github.com/opensearch-project/OpenSearch/pull/10814))
- Added cluster setting cluster.restrict.index.replication_type to restrict setting of index setting replication type ([#10866](https://github.com/opensearch-project/OpenSearch/pull/10866))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- Remove ingest processor supports field patterns and excluding fields

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.ingest.common;

import org.opensearch.common.ValidationException;
import org.opensearch.common.regex.Regex;
import org.opensearch.core.common.Strings;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.ConfigurationUtils;
Expand All @@ -41,10 +43,14 @@
import org.opensearch.script.TemplateScript;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* Processor that removes existing fields. Nothing happens if the field is not present.
*/
Expand All @@ -53,34 +59,118 @@ public final class RemoveProcessor extends AbstractProcessor {
public static final String TYPE = "remove";

private final List<TemplateScript.Factory> fields;
private final List<String> fieldPatterns;
private final List<TemplateScript.Factory> excludeFields;
private final List<String> excludeFieldPatterns;
private final boolean ignoreMissing;

RemoveProcessor(String tag, String description, List<TemplateScript.Factory> fields, boolean ignoreMissing) {
RemoveProcessor(
String tag,
String description,
List<TemplateScript.Factory> fields,
List<String> fieldPatterns,
List<TemplateScript.Factory> excludeFields,
List<String> excludeFieldPatterns,
boolean ignoreMissing
) {
super(tag, description);
this.fields = new ArrayList<>(fields);
this.fieldPatterns = new ArrayList<>(fieldPatterns);
this.excludeFields = new ArrayList<>(excludeFields);
this.excludeFieldPatterns = new ArrayList<>(excludeFieldPatterns);
this.ignoreMissing = ignoreMissing;
}

public List<TemplateScript.Factory> getFields() {
return fields;
}

public List<String> getFieldPatterns() {
return fieldPatterns;
}

public List<TemplateScript.Factory> getExcludeFields() {
return excludeFields;
}

public List<String> getExcludeFieldPatterns() {
return excludeFieldPatterns;
}

@Override
public IngestDocument execute(IngestDocument document) {
fields.forEach(field -> {
String path = document.renderTemplate(field);
final boolean fieldPathIsNullOrEmpty = Strings.isNullOrEmpty(path);
if (fieldPathIsNullOrEmpty || document.hasField(path) == false) {
if (ignoreMissing) {
return;
} else if (fieldPathIsNullOrEmpty) {
throw new IllegalArgumentException("field path cannot be null nor empty");
} else {
throw new IllegalArgumentException("field [" + path + "] doesn't exist");
if (!fields.isEmpty()) {
fields.forEach(field -> {
String path = document.renderTemplate(field);
final boolean fieldPathIsNullOrEmpty = Strings.isNullOrEmpty(path);
if (fieldPathIsNullOrEmpty || document.hasField(path) == false) {
if (ignoreMissing) {
return;
} else if (fieldPathIsNullOrEmpty) {
throw new IllegalArgumentException("field path cannot be null nor empty");
} else {
throw new IllegalArgumentException("field [" + path + "] doesn't exist");
}
}
}
document.removeField(path);
});

document.removeField(path);
});
}

if (!fieldPatterns.isEmpty()) {
Set<String> existingFields = new HashSet<>(document.getSourceAndMetadata().keySet());
Set<String> metadataFields = document.getMetadata()
.keySet()
.stream()
.map(IngestDocument.Metadata::getFieldName)
.collect(Collectors.toSet());
existingFields.forEach(field -> {
// ignore metadata fields such as _index, _id, etc.
if (!metadataFields.contains(field)) {
final boolean matched = fieldPatterns.stream().anyMatch(pattern -> Regex.simpleMatch(pattern, field));
if (matched) {
document.removeField(field);
}
}
});
}

Set<String> excludeFieldSet = new HashSet<>();
if (!excludeFields.isEmpty()) {
excludeFields.forEach(field -> {
String path = document.renderTemplate(field);
// ignore the empty or null field path
if (!Strings.isNullOrEmpty(path)) {
excludeFieldSet.add(path);
}
});
}

if (!excludeFieldSet.isEmpty() || !excludeFieldPatterns.isEmpty()) {
Set<String> existingFields = new HashSet<>(document.getSourceAndMetadata().keySet());
Set<String> metadataFields = document.getMetadata()
.keySet()
.stream()
.map(IngestDocument.Metadata::getFieldName)
.collect(Collectors.toSet());
existingFields.forEach(field -> {
// ignore metadata fields such as _index, _id, etc.
if (!metadataFields.contains(field)) {
// when both exclude_field and exclude_field_pattern are not empty, remove the field if it doesn't exist in both of them
// if not, remove the field if it doesn't exist in the non-empty one
if (!excludeFieldPatterns.isEmpty()) {
final boolean matched = excludeFieldPatterns.stream().anyMatch(pattern -> Regex.simpleMatch(pattern, field));
if (!excludeFieldSet.isEmpty() && !excludeFieldSet.contains(field) && !matched
|| excludeFieldSet.isEmpty() && !matched) {
document.removeField(field);
}
} else if (!excludeFieldSet.isEmpty() && !excludeFieldSet.contains(field)) {
document.removeField(field);
}
}
});
}

return document;
}

Expand All @@ -105,20 +195,125 @@ public RemoveProcessor create(
Map<String, Object> config
) throws Exception {
final List<String> fields = new ArrayList<>();
final Object field = ConfigurationUtils.readObject(TYPE, processorTag, config, "field");
if (field instanceof List) {
@SuppressWarnings("unchecked")
List<String> stringList = (List<String>) field;
fields.addAll(stringList);
} else {
fields.add((String) field);
final List<String> fieldPatterns = new ArrayList<>();
final List<String> excludeFields = new ArrayList<>();
final List<String> excludeFieldPatterns = new ArrayList<>();

final Object field = ConfigurationUtils.readOptionalObject(config, "field");
final Object fieldPattern = ConfigurationUtils.readOptionalObject(config, "field_pattern");
final Object excludeField = ConfigurationUtils.readOptionalObject(config, "exclude_field");
final Object excludeFieldPattern = ConfigurationUtils.readOptionalObject(config, "exclude_field_pattern");

if (field == null && fieldPattern == null && excludeField == null && excludeFieldPattern == null) {
throw newConfigurationException(
TYPE,
processorTag,
"field",
"at least one of the parameters field, field_pattern, exclude_field and exclude_field_pattern need to be set"
);
}

if ((field != null || fieldPattern != null) && (excludeField != null || excludeFieldPattern != null)) {
throw newConfigurationException(
TYPE,
processorTag,
"field",
"ether (field,field_pattern) or (exclude_field,exclude_field_pattern) can be set"
);
}

List<TemplateScript.Factory> fieldCompiledTemplates = new ArrayList<>();
if (field != null) {
if (field instanceof List) {
@SuppressWarnings("unchecked")
List<String> stringList = (List<String>) field;
fields.addAll(stringList);
} else {
fields.add((String) field);
}
fieldCompiledTemplates = fields.stream()
.map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", f, scriptService))
.collect(Collectors.toList());
}

if (fieldPattern != null) {
if (fieldPattern instanceof List) {
@SuppressWarnings("unchecked")
List<String> fieldPatternList = (List<String>) fieldPattern;
fieldPatterns.addAll(fieldPatternList);
} else {
fieldPatterns.add((String) fieldPattern);
}
validateFieldPatterns(processorTag, fieldPatterns, "field_pattern");
}

List<TemplateScript.Factory> excludeFieldCompiledTemplates = new ArrayList<>();
if (excludeField != null) {
if (excludeField instanceof List) {
@SuppressWarnings("unchecked")
List<String> stringList = (List<String>) excludeField;
excludeFields.addAll(stringList);
} else {
excludeFields.add((String) excludeField);
}
excludeFieldCompiledTemplates = excludeFields.stream()
.map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, "exclude_field", f, scriptService))
.collect(Collectors.toList());
}

if (excludeFieldPattern != null) {
if (excludeFieldPattern instanceof List) {
@SuppressWarnings("unchecked")
List<String> excludeFieldPatternList = (List<String>) excludeFieldPattern;
excludeFieldPatterns.addAll(excludeFieldPatternList);
} else {
excludeFieldPatterns.add((String) excludeFieldPattern);
}
validateFieldPatterns(processorTag, excludeFieldPatterns, "exclude_field_pattern");
}

final List<TemplateScript.Factory> compiledTemplates = fields.stream()
.map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", f, scriptService))
.collect(Collectors.toList());
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
return new RemoveProcessor(processorTag, description, compiledTemplates, ignoreMissing);
return new RemoveProcessor(
processorTag,
description,
fieldCompiledTemplates,
fieldPatterns,
excludeFieldCompiledTemplates,
excludeFieldPatterns,
ignoreMissing
);
}

private void validateFieldPatterns(String processorTag, List<String> patterns, String patternKey) {
List<String> validationErrors = new ArrayList<>();
for (String fieldPattern : patterns) {
if (fieldPattern.contains(" ")) {
validationErrors.add(patternKey + " [" + fieldPattern + "] must not contain a space");
}
if (fieldPattern.contains(",")) {
validationErrors.add(patternKey + " [" + fieldPattern + "] must not contain a ','");
}
if (fieldPattern.contains("#")) {
validationErrors.add(patternKey + " [" + fieldPattern + "] must not contain a '#'");
}
if (fieldPattern.contains(":")) {
validationErrors.add(patternKey + " [" + fieldPattern + "] must not contain a ':'");
}
if (fieldPattern.startsWith("_")) {
validationErrors.add(patternKey + " [" + fieldPattern + "] must not start with '_'");
}
if (Strings.validFileNameExcludingAstrix(fieldPattern) == false) {
validationErrors.add(
patternKey + " [" + fieldPattern + "] must not contain the following characters " + Strings.INVALID_FILENAME_CHARS
);
}
}

if (validationErrors.size() > 0) {
ValidationException validationException = new ValidationException();
validationException.addValidationErrors(validationErrors);
throw newConfigurationException(TYPE, processorTag, patternKey, validationException.getMessage());
}
}
}
}
Loading

0 comments on commit 768f629

Please sign in to comment.