From e19ae8f2302b096363229abad77c1ab5a2042569 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Fri, 6 Oct 2023 11:17:22 -0500 Subject: [PATCH] Add inline template_content support to the opensearch sink (#3431) Signed-off-by: Taylor Gray --- .../opensearch/index/IndexConfiguration.java | 38 +++++++- .../index/IndexConfigurationTests.java | 91 +++++++++++++++---- 2 files changed, 107 insertions(+), 22 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index 80bc3e3f06..343a2c27da 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -5,18 +5,18 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.index; -import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.event.JacksonEvent; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.EnumUtils; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.BulkAction; import org.opensearch.dataprepper.plugins.sink.opensearch.s3.FileReader; import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3ClientProvider; import org.opensearch.dataprepper.plugins.sink.opensearch.s3.S3FileReader; -import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.arns.Arn; @@ -27,8 +27,8 @@ import java.io.InputStream; import java.net.URL; import java.util.HashMap; -import java.util.Map; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -44,6 +44,7 @@ public class IndexConfiguration { public static final String INDEX_TYPE = "index_type"; public static final String TEMPLATE_TYPE = "template_type"; public static final String TEMPLATE_FILE = "template_file"; + public static final String TEMPLATE_CONTENT = "template_content"; public static final String NUM_SHARDS = "number_of_shards"; public static final String NUM_REPLICAS = "number_of_replicas"; public static final String BULK_SIZE = "bulk_size"; @@ -105,7 +106,8 @@ private IndexConfiguration(final Builder builder) { this.s3Client = builder.s3Client; determineTemplateType(builder); - this.indexTemplate = readIndexTemplate(builder.templateFile, indexType, templateType); + + this.indexTemplate = builder.templateContent != null ? readTemplateContent(builder.templateContent) : readIndexTemplate(builder.templateFile, indexType, templateType); if (builder.numReplicas > 0) { indexTemplate.putIfAbsent(SETTINGS, new HashMap<>()); @@ -187,6 +189,16 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti if (templateFile != null) { builder = builder.withTemplateFile(templateFile); } + + final String templateContent = pluginSetting.getStringOrDefault(TEMPLATE_CONTENT, null); + if (templateContent != null) { + builder = builder.withTemplateContent(templateContent); + } + + if (templateContent != null && templateFile != null) { + LOG.warn("Both template_content and template_file are configured. Only template_content will be used"); + } + builder = builder.withNumShards(pluginSetting.getIntegerOrDefault(NUM_SHARDS, 0)); builder = builder.withNumReplicas(pluginSetting.getIntegerOrDefault(NUM_REPLICAS, 0)); final Long batchSize = pluginSetting.getLongOrDefault(BULK_SIZE, DEFAULT_BULK_SIZE); @@ -365,6 +377,7 @@ private Map readIndexTemplate(final String templateFile, final I templateURL = new File(templateFile).toURI().toURL(); } } + if (templateURL != null) { return new ObjectMapper().readValue(templateURL, new TypeReference>() { }); @@ -379,6 +392,14 @@ private Map readIndexTemplate(final String templateFile, final I } } + private Map readTemplateContent(final String templateContent) { + try { + return OBJECT_MAPPER.readValue(templateContent, new TypeReference>() {}); + } catch (IOException ex) { + throw new InvalidPluginConfigurationException(String.format("template_content is invalid: %s", ex.getMessage())); + } + } + private URL loadExistingTemplate(TemplateType templateType, String predefinedTemplateName) { String resourcePath = templateType == TemplateType.V1 ? predefinedTemplateName : templateType.getTypeName() + "/" + predefinedTemplateName; return getClass().getClassLoader() @@ -390,6 +411,7 @@ public static class Builder { private String indexType; private TemplateType templateType; private String templateFile; + private String templateContent; private int numShards; private int numReplicas; private String routingField; @@ -437,6 +459,12 @@ public Builder withTemplateFile(final String templateFile) { return this; } + public Builder withTemplateContent(final String templateContent) { + checkArgument(templateContent != null, "templateContent cannot be null."); + this.templateContent = templateContent; + return this; + } + public Builder withDocumentIdField(final String documentIdField) { checkNotNull(documentIdField, "document_id_field cannot be null"); this.documentIdField = documentIdField; diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java index cda9476743..cbf30df859 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java @@ -5,11 +5,15 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.index; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.http.AbortableInputStream; @@ -34,6 +38,7 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; @@ -52,6 +57,8 @@ @SuppressWarnings("unchecked") public class IndexConfigurationTests { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String DEFAULT_TEMPLATE_FILE = "test-template-withshards.json"; private static final String TEST_CUSTOM_INDEX_POLICY_FILE = "test-custom-index-policy-file.json"; @@ -193,7 +200,7 @@ public void testValidCustom_from_s3() { } @Test - public void testValidCustomWithNoTemplateFile() throws MalformedURLException { + public void testValidCustomWithNoTemplateFile() { final String testIndexAlias = "foo"; IndexConfiguration indexConfiguration = new IndexConfiguration.Builder() .withIndexAlias(testIndexAlias) @@ -263,6 +270,43 @@ public void testValidCustomWithTemplateFileAndShards() { assertEquals(-1, indexConfiguration.getBulkSize()); } + @Test + public void testValidCustomWithTemplateContent() throws JsonProcessingException { + final String testIndexAlias = "test"; + IndexConfiguration indexConfiguration = new IndexConfiguration.Builder() + .withIndexAlias(testIndexAlias) + .withTemplateContent(createTemplateContent()) + .withBulkSize(10) + .build(); + + assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType()); + assertEquals(testIndexAlias, indexConfiguration.getIndexAlias()); + assertEquals(10, indexConfiguration.getBulkSize()); + assertFalse(indexConfiguration.getIndexTemplate().isEmpty()); + assertThat(indexConfiguration.getIndexTemplate(), equalTo(OBJECT_MAPPER.readValue(createTemplateContent(), new TypeReference<>() {}))); + } + + @Test + public void readIndexConfigWithTemplateFileAndTemplateContentUsesTemplateContent() throws JsonProcessingException { + final PluginSetting pluginSetting = generatePluginSetting("custom", "test", "test-file", createTemplateContent(), null, null, null); + + final IndexConfiguration objectUnderTest = IndexConfiguration.readIndexConfig(pluginSetting); + + assertThat(objectUnderTest, notNullValue()); + assertThat(objectUnderTest.getIndexTemplate(), notNullValue()); + assertThat(objectUnderTest.getIndexTemplate(), equalTo(OBJECT_MAPPER.readValue(createTemplateContent(), new TypeReference<>() {}))); + } + + @Test + public void invalidTemplateContentThrowsInvalidPluginConfigurationException() { + final String invalidTemplateContent = UUID.randomUUID().toString(); + + final PluginSetting pluginSetting = generatePluginSetting("custom", null, null, invalidTemplateContent, null, null, null); + + assertThrows(InvalidPluginConfigurationException.class, () -> IndexConfiguration.readIndexConfig(pluginSetting)); + + } + @Test public void testInvalidCustom() { // Missing index alias @@ -274,7 +318,7 @@ public void testInvalidCustom() { @Test public void testReadIndexConfig_RawIndexType() { final Map metadata = initializeConfigMetaData( - IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null, null, null, null); + IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null, null, null, null, null); final PluginSetting pluginSetting = getPluginSetting(metadata); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); final URL expTemplateFile = indexConfiguration @@ -292,7 +336,7 @@ public void testReadIndexConfig_RawIndexType() { @Test public void testReadIndexConfig_InvalidIndexTypeValueString() { final Map metadata = initializeConfigMetaData( - "i-am-an-illegitimate-index-type", null, null, null, null, null); + "i-am-an-illegitimate-index-type", null, null, null, null, null, null); final PluginSetting pluginSetting = getPluginSetting(metadata); assertThrows(IllegalArgumentException.class, () -> IndexConfiguration.readIndexConfig(pluginSetting)); } @@ -300,7 +344,7 @@ public void testReadIndexConfig_InvalidIndexTypeValueString() { @Test public void testReadIndexConfig_ServiceMapIndexType() { final Map metadata = initializeConfigMetaData( - IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null, null, null, null); + IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null, null, null, null, null); final PluginSetting pluginSetting = getPluginSetting(metadata); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); final URL expTemplateFile = indexConfiguration @@ -324,7 +368,7 @@ public void testReadIndexConfigCustom() { final long testFlushTimeout = 30_000L; final String testIdField = "someId"; final PluginSetting pluginSetting = generatePluginSetting( - null, testIndexAlias, defaultTemplateFilePath, testBulkSize, testFlushTimeout, testIdField); + null, testIndexAlias, defaultTemplateFilePath, null, testBulkSize, testFlushTimeout, testIdField); pluginSetting.getSettings().put(IndexConfiguration.ESTIMATE_BULK_SIZE_USING_COMPRESSION, true); pluginSetting.getSettings().put(IndexConfiguration.MAX_LOCAL_COMPRESSIONS_FOR_ESTIMATION, 5); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); @@ -348,7 +392,7 @@ public void testReadIndexConfig_ExplicitCustomIndexType() { final long testFlushTimeout = 30_000L; final String testIdField = "someId"; final Map metadata = initializeConfigMetaData( - testIndexType, testIndexAlias, defaultTemplateFilePath, testBulkSize, testFlushTimeout, testIdField); + testIndexType, testIndexAlias, defaultTemplateFilePath, null, testBulkSize, testFlushTimeout, testIdField); final PluginSetting pluginSetting = getPluginSetting(metadata); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType()); @@ -363,7 +407,7 @@ public void testReadIndexConfig_ExplicitCustomIndexType() { public void testReadIndexConfig_awsOptionServerlessDefault() { final String testIndexAlias = "foo"; final Map metadata = initializeConfigMetaData( - null, testIndexAlias, null, null, null, null); + null, testIndexAlias, null, null, null, null, null); metadata.put(AWS_OPTION, Map.of(SERVERLESS, true)); final PluginSetting pluginSetting = getPluginSetting(metadata); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); @@ -375,7 +419,7 @@ public void testReadIndexConfig_awsOptionServerlessDefault() { public void testReadIndexConfig_awsServerlessIndexTypeOverride() { final String testIndexAlias = "foo"; final Map metadata = initializeConfigMetaData( - IndexType.CUSTOM.getValue(), testIndexAlias, null, null, null, null); + IndexType.CUSTOM.getValue(), testIndexAlias, null, null, null, null, null); metadata.put(AWS_OPTION, Map.of(SERVERLESS, true)); final PluginSetting pluginSetting = getPluginSetting(metadata); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); @@ -387,7 +431,7 @@ public void testReadIndexConfig_awsServerlessIndexTypeOverride() { @Test public void testReadIndexConfig_distributionVersionDefault() { final Map metadata = initializeConfigMetaData( - null, "foo", null, null, null, null); + null, "foo", null,null, null, null, null); final PluginSetting pluginSetting = getPluginSetting(metadata); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); assertEquals(indexConfiguration.getDistributionVersion(), DistributionVersion.DEFAULT); @@ -396,7 +440,7 @@ public void testReadIndexConfig_distributionVersionDefault() { @Test public void testReadIndexConfig_es6Override() { final Map metadata = initializeConfigMetaData( - null, "foo", null, null, null, null); + null, "foo", null, null, null, null, null); metadata.put(DISTRIBUTION_VERSION, "es6"); metadata.put(TEMPLATE_TYPE, TemplateType.INDEX_TEMPLATE.getTypeName()); final PluginSetting pluginSetting = getPluginSetting(metadata); @@ -409,7 +453,7 @@ public void testReadIndexConfig_es6Override() { @Test public void testReadIndexConfig_documentRootKey() { final Map metadata = initializeConfigMetaData( - IndexType.CUSTOM.getValue(), "foo", null, null, null, null); + IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null); final String expectedRootKey = UUID.randomUUID().toString(); metadata.put(DOCUMENT_ROOT_KEY, expectedRootKey); final PluginSetting pluginSetting = getPluginSetting(metadata); @@ -420,7 +464,7 @@ public void testReadIndexConfig_documentRootKey() { @Test public void testReadIndexConfig_emptyDocumentRootKey() { final Map metadata = initializeConfigMetaData( - IndexType.CUSTOM.getValue(), "foo", null, null, null, null); + IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null); metadata.put(DOCUMENT_ROOT_KEY, ""); final PluginSetting pluginSetting = getPluginSetting(metadata); assertThrows(IllegalArgumentException.class, () -> IndexConfiguration.readIndexConfig(pluginSetting)); @@ -429,7 +473,7 @@ public void testReadIndexConfig_emptyDocumentRootKey() { @Test void getTemplateType_defaults_to_V1() { final Map metadata = initializeConfigMetaData( - IndexType.CUSTOM.getValue(), "foo", null, null, null, null); + IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null); final PluginSetting pluginSetting = getPluginSetting(metadata); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); assertThat(indexConfiguration.getTemplateType(), equalTo(TemplateType.V1)); @@ -439,7 +483,7 @@ void getTemplateType_defaults_to_V1() { @EnumSource(TemplateType.class) void getTemplateType_with_configured_templateType(final TemplateType templateType) { final Map metadata = initializeConfigMetaData( - IndexType.CUSTOM.getValue(), "foo", null, null, null, null); + IndexType.CUSTOM.getValue(), "foo", null, null, null, null, null); metadata.put(TEMPLATE_TYPE, templateType.getTypeName()); final PluginSetting pluginSetting = getPluginSetting(metadata); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); @@ -447,9 +491,9 @@ void getTemplateType_with_configured_templateType(final TemplateType templateTyp } private PluginSetting generatePluginSetting( - final String indexType, final String indexAlias, final String templateFilePath, + final String indexType, final String indexAlias, final String templateFilePath, final String templateContent, final Long bulkSize, final Long flushTimeout, final String documentIdField) { - final Map metadata = initializeConfigMetaData(indexType, indexAlias, templateFilePath, bulkSize, flushTimeout, documentIdField); + final Map metadata = initializeConfigMetaData(indexType, indexAlias, templateFilePath, templateContent, bulkSize, flushTimeout, documentIdField); return getPluginSetting(metadata); } @@ -458,7 +502,7 @@ private PluginSetting getPluginSetting(Map metadata) { } private Map initializeConfigMetaData( - String indexType, String indexAlias, String templateFilePath, Long bulkSize, Long flushTimeout, String documentId) { + String indexType, String indexAlias, String templateFilePath, String templateContent, Long bulkSize, Long flushTimeout, String documentId) { final Map metadata = new HashMap<>(); if (indexType != null) { metadata.put(IndexConfiguration.INDEX_TYPE, indexType); @@ -469,6 +513,11 @@ private Map initializeConfigMetaData( if (templateFilePath != null) { metadata.put(IndexConfiguration.TEMPLATE_FILE, templateFilePath); } + + if (templateContent != null) { + metadata.put(IndexConfiguration.TEMPLATE_CONTENT, templateContent); + } + if (bulkSize != null) { metadata.put(IndexConfiguration.BULK_SIZE, bulkSize); } @@ -480,4 +529,12 @@ private Map initializeConfigMetaData( } return metadata; } + + private String createTemplateContent() { + return "{\"index_patterns\":[\"test-*\"]," + + "\"template\":{\"aliases\":{\"my_test_logs\":{}}," + + "\"settings\":{\"number_of_shards\":5,\"number_of_replicas\":2,\"refresh_interval\":-1}," + + "\"mappings\":{\"properties\":{\"timestamp\":{\"type\":\"date\",\"format\":\"yyyy-MM-ddHH:mm:ss||yyyy-MM-dd||epoch_millis\"}," + + "\"value\":{\"type\":\"double\"}}}}}"; + } }