From 71a6956e2a29c00094bbb859b1ce2fbbd6ee411e Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Thu, 5 Oct 2023 09:46:44 -0700 Subject: [PATCH] ENH: data-prepper-core support for secrets refreshment (#3415) * INIT: secrets refreshment infra Signed-off-by: George Chen * MAINT: add interval and test validity Signed-off-by: George Chen * MAINT: some more refactoring Signed-off-by: George Chen * MAINT: delete unused classes Signed-off-by: George Chen * TST: AwsSecretsPluginConfigPublisherExtensionProviderTest Signed-off-by: George Chen * MAINT: inject PluginConfigPublisher into PluginCreator Signed-off-by: George Chen * MAINT: complete test cases for AwsSecretPluginIT Signed-off-by: George Chen * MAINT: test refresh secrets Signed-off-by: George Chen * MAINT: refactoring and documentation Signed-off-by: George Chen * STY: import Signed-off-by: George Chen * MAINT: fix test cases Signed-off-by: George Chen * MAINT: missing test case Signed-off-by: George Chen * MAINT: address minor comments Signed-off-by: George Chen * REF: PluginConfigurationObservableRegister Signed-off-by: George Chen --------- Signed-off-by: George Chen --- .../plugin/PluginComponentRefresher.java | 23 +++ .../model/plugin/PluginConfigObservable.java | 20 ++ .../model/plugin/PluginConfigObserver.java | 16 ++ .../model/plugin/PluginConfigPublisher.java | 17 ++ .../ComponentPluginArgumentsContext.java | 12 ++ .../plugin/DefaultPluginConfigObservable.java | 38 ++++ .../plugin/DefaultPluginFactory.java | 10 +- .../PluginConfigurationObservableFactory.java | 16 ++ ...PluginConfigurationObservableRegister.java | 29 +++ .../dataprepper/plugin/PluginCreator.java | 10 + .../ComponentPluginArgumentsContextTest.java | 14 ++ .../plugin/DefaultPluginFactoryTest.java | 22 ++- .../PluginConfigObservableFactoryTest.java | 28 +++ ...inConfigurationObservableRegisterTest.java | 40 ++++ .../dataprepper/plugin/PluginCreatorTest.java | 36 +++- data-prepper-plugins/aws-plugin/build.gradle | 3 +- .../aws/AwsSecretManagerConfiguration.java | 12 +- .../plugins/aws/AwsSecretPlugin.java | 52 +++++- .../aws/AwsSecretsPluginConfigPublisher.java | 23 +++ ...luginConfigPublisherExtensionProvider.java | 25 +++ ...nfigValueTranslatorExtensionProvider.java} | 4 +- .../plugins/aws/AwsSecretsSupplier.java | 72 ++++--- .../plugins/aws/SecretsRefreshJob.java | 23 +++ .../plugins/aws/SecretsSupplier.java | 2 + .../AwsSecretManagerConfigurationTest.java | 46 ++++- .../plugins/aws/AwsSecretPluginIT.java | 175 ++++++++++++++++-- ...nConfigPublisherExtensionProviderTest.java | 51 +++++ .../AwsSecretsPluginConfigPublisherTest.java | 24 +++ ...ValueTranslatorExtensionProviderTest.java} | 8 +- .../plugins/aws/AwsSecretsSupplierTest.java | 27 +++ .../plugins/aws/SecretsRefreshJobTest.java | 35 ++++ ...onfiguration-invalid-refresh-interval.yaml | 2 + 32 files changed, 860 insertions(+), 55 deletions(-) create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginComponentRefresher.java create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigObservable.java create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigObserver.java create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigPublisher.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginConfigObservable.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableFactory.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableRegister.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigObservableFactoryTest.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableRegisterTest.java create mode 100644 data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisher.java create mode 100644 data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisherExtensionProvider.java rename data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/{AwsSecretExtensionProvider.java => AwsSecretsPluginConfigValueTranslatorExtensionProvider.java} (72%) create mode 100644 data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/SecretsRefreshJob.java create mode 100644 data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisherExtensionProviderTest.java create mode 100644 data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisherTest.java rename data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/{AwsSecretExtensionProviderTest.java => AwsSecretsPluginConfigValueTranslatorExtensionProviderTest.java} (83%) create mode 100644 data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/SecretsRefreshJobTest.java create mode 100644 data-prepper-plugins/aws-plugin/src/test/resources/test-aws-secret-manager-configuration-invalid-refresh-interval.yaml diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginComponentRefresher.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginComponentRefresher.java new file mode 100644 index 0000000000..a11b318b02 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginComponentRefresher.java @@ -0,0 +1,23 @@ +package org.opensearch.dataprepper.model.plugin; + +/** + * An interface to be implemented by pipeline plugins, i.e. {@link org.opensearch.dataprepper.model.source.Source} + * /{@link org.opensearch.dataprepper.model.processor.Processor}/{@link org.opensearch.dataprepper.model.sink.Sink}, + * to refresh their components, e.g. client connection. + * @since 2.5 + */ +public interface PluginComponentRefresher { + /** + * Returns the refreshed {@link PluginComponent}. + * + * @return An instance of {@link PluginComponent}. + */ + PluginComponent get(); + + /** + * Updates the {@link PluginComponent} with the new {@link PluginConfig}. + * + * @param pluginConfig The new pluginConfig used to refresh the {@link PluginComponent}. + */ + void update(PluginConfig pluginConfig); +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigObservable.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigObservable.java new file mode 100644 index 0000000000..6a62e2647a --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigObservable.java @@ -0,0 +1,20 @@ +package org.opensearch.dataprepper.model.plugin; + +/** + * An interface used by pipeline plugins, i.e. {@link org.opensearch.dataprepper.model.source.Source} + * /{@link org.opensearch.dataprepper.model.processor.Processor}/{@link org.opensearch.dataprepper.model.sink.Sink}, + * to onboard {@link PluginConfigObserver}. + * @since 2.5 + */ +public interface PluginConfigObservable { + + /** + * Onboard a new {@link PluginConfigObserver} within the plugin. + */ + boolean addPluginConfigObserver(PluginConfigObserver pluginConfigObserver); + + /** + * Invoke all {@link PluginConfigObserver}. + */ + void update(); +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigObserver.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigObserver.java new file mode 100644 index 0000000000..58c885da58 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigObserver.java @@ -0,0 +1,16 @@ +package org.opensearch.dataprepper.model.plugin; + +/** + * An interface by pipeline plugins, i.e. {@link org.opensearch.dataprepper.model.source.Source} + * * /{@link org.opensearch.dataprepper.model.processor.Processor}/{@link org.opensearch.dataprepper.model.sink.Sink} + * to implement custom plugin component refreshment logic. + * @since 2.5 + */ +public interface PluginConfigObserver { + /** + * Update plugin components with a new pluginConfig. + * + * @param pluginConfig The plugin configuration object used as reference for update. + */ + void update(T pluginConfig); +} diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigPublisher.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigPublisher.java new file mode 100644 index 0000000000..66982a8db6 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/plugin/PluginConfigPublisher.java @@ -0,0 +1,17 @@ +package org.opensearch.dataprepper.model.plugin; + +/** + * An interface used to onboard and notify all {@link PluginConfigObservable} to invoke update. + * @since 2.5 + */ +public interface PluginConfigPublisher { + /** + * Onboard a new {@link PluginConfigObservable}. + */ + boolean addPluginConfigObservable(PluginConfigObservable pluginConfigObservable); + + /** + * Notify all {@link PluginConfigObservable} to update. + */ + void notifyAllPluginConfigObservable(); +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContext.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContext.java index 807de63367..7f68405f3b 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContext.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContext.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugin; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PipelineDescription; @@ -65,6 +66,11 @@ private ComponentPluginArgumentsContext(final Builder builder) { if (builder.acknowledgementSetManager != null) { typedArgumentsSuppliers.put(AcknowledgementSetManager.class, () -> builder.acknowledgementSetManager); } + + if (builder.pluginConfigObservable != null) { + typedArgumentsSuppliers.put(PluginConfigObservable.class, () -> builder.pluginConfigObservable); + } + if (builder.sinkContext != null) { typedArgumentsSuppliers.put(SinkContext.class, () -> builder.sinkContext); } @@ -118,6 +124,7 @@ static class Builder { private BeanFactory beanFactory; private EventFactory eventFactory; private AcknowledgementSetManager acknowledgementSetManager; + private PluginConfigObservable pluginConfigObservable; private SinkContext sinkContext; Builder withPluginConfiguration(final Object pluginConfiguration) { @@ -160,6 +167,11 @@ Builder withBeanFactory(final BeanFactory beanFactory) { return this; } + Builder withPluginConfigurationObservable(final PluginConfigObservable pluginConfigObservable) { + this.pluginConfigObservable = pluginConfigObservable; + return this; + } + ComponentPluginArgumentsContext build() { return new ComponentPluginArgumentsContext(this); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginConfigObservable.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginConfigObservable.java new file mode 100644 index 0000000000..d3d65ab0fc --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginConfigObservable.java @@ -0,0 +1,38 @@ +package org.opensearch.dataprepper.plugin; + +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginConfigObserver; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class DefaultPluginConfigObservable implements PluginConfigObservable { + private final Map pluginConfigObserverBooleanMap + = new ConcurrentHashMap<>(); + private final PluginConfigurationConverter pluginConfigurationConverter; + private final Class pluginConfigClass; + private final PluginSetting rawPluginSettings; + + public DefaultPluginConfigObservable(final PluginConfigurationConverter pluginConfigurationConverter, + final Class pluginConfigClass, + final PluginSetting rawPluginSettings) { + this.pluginConfigurationConverter = pluginConfigurationConverter; + this.pluginConfigClass = pluginConfigClass; + this.rawPluginSettings = rawPluginSettings; + } + + @Override + public boolean addPluginConfigObserver(final PluginConfigObserver pluginConfigObserver) { + pluginConfigObserverBooleanMap.put(pluginConfigObserver, true); + return true; + } + + @Override + public void update() { + final Object newPluginConfiguration = pluginConfigurationConverter.convert( + pluginConfigClass, rawPluginSettings); + pluginConfigObserverBooleanMap.keySet().forEach( + pluginConfigObserver -> pluginConfigObserver.update(newPluginConfiguration)); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java index 7560b98e10..5a26c9518e 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/DefaultPluginFactory.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugin; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -41,6 +42,7 @@ public class DefaultPluginFactory implements PluginFactory { private final PluginBeanFactoryProvider pluginBeanFactoryProvider; private final DefaultEventFactory eventFactory; private final DefaultAcknowledgementSetManager acknowledgementSetManager; + private final PluginConfigurationObservableFactory pluginConfigurationObservableFactory; @Inject DefaultPluginFactory( @@ -49,9 +51,11 @@ public class DefaultPluginFactory implements PluginFactory { final PluginConfigurationConverter pluginConfigurationConverter, final PluginBeanFactoryProvider pluginBeanFactoryProvider, final DefaultEventFactory eventFactory, - final DefaultAcknowledgementSetManager acknowledgementSetManager + final DefaultAcknowledgementSetManager acknowledgementSetManager, + final PluginConfigurationObservableFactory pluginConfigurationObservableFactory ) { Objects.requireNonNull(pluginProviderLoader); + Objects.requireNonNull(pluginConfigurationObservableFactory); this.pluginCreator = Objects.requireNonNull(pluginCreator); this.pluginConfigurationConverter = Objects.requireNonNull(pluginConfigurationConverter); @@ -59,6 +63,7 @@ public class DefaultPluginFactory implements PluginFactory { this.pluginBeanFactoryProvider = Objects.requireNonNull(pluginBeanFactoryProvider); this.eventFactory = Objects.requireNonNull(eventFactory); this.acknowledgementSetManager = Objects.requireNonNull(acknowledgementSetManager); + this.pluginConfigurationObservableFactory = pluginConfigurationObservableFactory; if(pluginProviders.isEmpty()) { throw new RuntimeException("Data Prepper requires at least one PluginProvider. " + @@ -113,6 +118,8 @@ private ComponentPluginArgumentsContext getConstructionContext(final PluginS final Class pluginConfigurationType = pluginAnnotation.pluginConfigurationType(); final Object configuration = pluginConfigurationConverter.convert(pluginConfigurationType, pluginSetting); + final PluginConfigObservable pluginConfigObservable = pluginConfigurationObservableFactory + .createDefaultPluginConfigObservable(pluginConfigurationConverter, pluginClass, pluginSetting); return new ComponentPluginArgumentsContext.Builder() .withPluginSetting(pluginSetting) @@ -122,6 +129,7 @@ private ComponentPluginArgumentsContext getConstructionContext(final PluginS .withBeanFactory(pluginBeanFactoryProvider.get()) .withEventFactory(eventFactory) .withAcknowledgementSetManager(acknowledgementSetManager) + .withPluginConfigurationObservable(pluginConfigObservable) .withSinkContext(sinkContext) .build(); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableFactory.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableFactory.java new file mode 100644 index 0000000000..942b152101 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableFactory.java @@ -0,0 +1,16 @@ +package org.opensearch.dataprepper.plugin; + +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; + +import javax.inject.Named; + +@Named +public class PluginConfigurationObservableFactory { + public final PluginConfigObservable createDefaultPluginConfigObservable( + final PluginConfigurationConverter pluginConfigurationConverter, final Class pluginConfigClass, + final PluginSetting rawPluginSettings) { + return new DefaultPluginConfigObservable( + pluginConfigurationConverter, pluginConfigClass, rawPluginSettings); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableRegister.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableRegister.java new file mode 100644 index 0000000000..7691e37c53 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableRegister.java @@ -0,0 +1,29 @@ +package org.opensearch.dataprepper.plugin; + +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; +import org.opensearch.dataprepper.model.plugin.PluginConfigPublisher; + +import javax.inject.Inject; +import javax.inject.Named; +import java.util.Arrays; +import java.util.Optional; +import java.util.Set; + +@Named +public class PluginConfigurationObservableRegister { + private final Set pluginConfigPublishers; + + @Inject + public PluginConfigurationObservableRegister(final Set pluginConfigPublishers) { + this.pluginConfigPublishers = pluginConfigPublishers; + } + + public void registerPluginConfigurationObservables(final Object[] constructorArguments) { + Optional.ofNullable(constructorArguments).ifPresent(arguments -> Arrays.stream(arguments) + .filter(arg -> arg instanceof PluginConfigObservable) + .forEach(arg -> pluginConfigPublishers.forEach(pluginConfigPublisher -> + pluginConfigPublisher.addPluginConfigObservable( + (PluginConfigObservable) arg))) + ); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginCreator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginCreator.java index df6e223ef0..947b8543ba 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginCreator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/PluginCreator.java @@ -12,6 +12,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.inject.Inject; import javax.inject.Named; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -25,6 +26,13 @@ class PluginCreator { private static final Logger LOG = LoggerFactory.getLogger(PluginCreator.class); + private final PluginConfigurationObservableRegister pluginConfigurationObservableRegister; + + @Inject + PluginCreator(final PluginConfigurationObservableRegister pluginConfigurationObservableRegister) { + this.pluginConfigurationObservableRegister = pluginConfigurationObservableRegister; + } + T newPluginInstance(final Class pluginClass, final PluginArgumentsContext pluginArgumentsContext, final String pluginName) { @@ -36,6 +44,8 @@ T newPluginInstance(final Class pluginClass, final Object[] constructorArguments = pluginArgumentsContext.createArguments(constructor.getParameterTypes()); + pluginConfigurationObservableRegister.registerPluginConfigurationObservables(constructorArguments); + try { return (T) constructor.newInstance(constructorArguments); } catch (final IllegalAccessException | InstantiationException ex) { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContextTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContextTest.java index ec3dc17644..9d3b9bdf0b 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContextTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/ComponentPluginArgumentsContextTest.java @@ -9,6 +9,7 @@ import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginDefinitionException; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.sink.SinkContext; import org.junit.jupiter.api.Test; @@ -112,6 +113,19 @@ void createArguments_with_single_class_using_sink_context() { equalTo(new Object[] { sinkContext})); } + @Test + void createArguments_with_single_class_using_plugin_configuration_observable() { + final PluginConfigObservable pluginConfigObservable = mock(PluginConfigObservable.class); + + final ComponentPluginArgumentsContext objectUnderTest = new ComponentPluginArgumentsContext.Builder() + .withPluginSetting(pluginSetting) + .withPluginConfigurationObservable(pluginConfigObservable) + .build(); + + assertThat(objectUnderTest.createArguments(new Class[] { PluginConfigObservable.class }), + equalTo(new Object[] {pluginConfigObservable})); + } + @Test void createArguments_given_bean_not_available_with_single_class_using_bean_factory() { doThrow(mock(BeansException.class)).when(beanFactory).getBean((Class) any()); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java index 3a5b1f3bdd..86b57af90b 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryTest.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.NoPluginFoundException; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager; import org.opensearch.dataprepper.event.DefaultEventFactory; @@ -57,6 +58,8 @@ class DefaultPluginFactoryTest { private String pipelineName; private DefaultAcknowledgementSetManager acknowledgementSetManager; private DefaultEventFactory eventFactory; + private PluginConfigurationObservableFactory pluginConfigurationObservableFactory; + private PluginConfigObservable pluginConfigObservable; @BeforeEach void setUp() { @@ -80,10 +83,18 @@ void setUp() { beanFactoryProvider = mock(PluginBeanFactoryProvider.class); beanFactory = mock(BeanFactory.class); + pluginConfigurationObservableFactory = mock(PluginConfigurationObservableFactory.class); + given(pluginConfigurationObservableFactory.createDefaultPluginConfigObservable( + eq(pluginConfigurationConverter), + any(Class.class), + any(PluginSetting.class) + )).willReturn(pluginConfigObservable); } private DefaultPluginFactory createObjectUnderTest() { - return new DefaultPluginFactory(pluginProviderLoader, pluginCreator, pluginConfigurationConverter, beanFactoryProvider, eventFactory, acknowledgementSetManager); + return new DefaultPluginFactory( + pluginProviderLoader, pluginCreator, pluginConfigurationConverter, beanFactoryProvider, eventFactory, + acknowledgementSetManager, pluginConfigurationObservableFactory); } @Test @@ -113,6 +124,15 @@ void constructor_should_throw_if_pluginProviders_is_empty() { verifyNoInteractions(beanFactoryProvider); } + @Test + void constructor_should_throw_if_pluginConfigurationObservableFactory_is_null() { + pluginConfigurationObservableFactory = null; + + assertThrows(NullPointerException.class, + this::createObjectUnderTest); + verifyNoInteractions(beanFactoryProvider); + } + @Test void constructor_should_throw_if_pluginCreator_is_null() { pluginCreator = null; diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigObservableFactoryTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigObservableFactoryTest.java new file mode 100644 index 0000000000..6fbe53d089 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigObservableFactoryTest.java @@ -0,0 +1,28 @@ +package org.opensearch.dataprepper.plugin; + +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; + +class PluginConfigObservableFactoryTest { + @Mock + private PluginConfigurationConverter pluginConfigurationConverter; + + @Mock + private PluginSetting pluginSetting; + + private final Class baseClass = Object.class; + + private final PluginConfigurationObservableFactory objectUnderTest = new PluginConfigurationObservableFactory(); + + @Test + void testCreateDefaultPluginConfigurationObservableFactory() { + assertThat(objectUnderTest.createDefaultPluginConfigObservable( + pluginConfigurationConverter, baseClass, pluginSetting), + instanceOf(PluginConfigObservable.class)); + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableRegisterTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableRegisterTest.java new file mode 100644 index 0000000000..407c2c6e0e --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationObservableRegisterTest.java @@ -0,0 +1,40 @@ +package org.opensearch.dataprepper.plugin; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; +import org.opensearch.dataprepper.model.plugin.PluginConfigPublisher; + +import java.util.Set; + +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +class PluginConfigurationObservableRegisterTest { + @Mock + private PluginConfigPublisher pluginConfigPublisher; + + @Mock + private PluginSetting pluginSetting; + + @Mock + private PluginConfigObservable pluginConfigObservable; + + private PluginConfigurationObservableRegister objectUnderTest; + + @BeforeEach + void setup() { + objectUnderTest = new PluginConfigurationObservableRegister(Set.of(pluginConfigPublisher)); + } + + @Test + void testRegisterPluginConfigurationObservables() { + final Object[] constructorArgs = new Object[] { pluginSetting, pluginConfigObservable }; + objectUnderTest.registerPluginConfigurationObservables(constructorArgs); + verify(pluginConfigPublisher).addPluginConfigObservable(pluginConfigObservable); + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginCreatorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginCreatorTest.java index b43dd40ea0..daf5a2c57d 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginCreatorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugin/PluginCreatorTest.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginDefinitionException; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; import org.opensearch.dataprepper.model.plugin.PluginInvocationException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -22,12 +23,14 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; class PluginCreatorTest { private PluginSetting pluginSetting; private String pluginName; private ComponentPluginArgumentsContext pluginConstructionContext; + private PluginConfigurationObservableRegister pluginConfigurationObservableRegister; public static class ValidPluginClass { private final PluginSetting pluginSetting; @@ -72,6 +75,18 @@ public PluginClassWithMultipleConstructors(final PluginSetting pluginSetting, fi } } + public static class PluginClassWithPluginConfigurationObservableConstructor { + private PluginSetting pluginSetting; + private PluginConfigObservable pluginConfigObservable; + + @DataPrepperPluginConstructor + public PluginClassWithPluginConfigurationObservableConstructor( + final PluginSetting pluginSetting, final PluginConfigObservable pluginConfigObservable) { + this.pluginSetting = pluginSetting; + this.pluginConfigObservable = pluginConfigObservable; + } + } + public static class InvalidPluginClassDueToMultipleAnnotatedConstructors { @DataPrepperPluginConstructor public InvalidPluginClassDueToMultipleAnnotatedConstructors() {} @@ -87,10 +102,12 @@ void setUp() { pluginName = UUID.randomUUID().toString(); pluginConstructionContext = mock(ComponentPluginArgumentsContext.class); + + pluginConfigurationObservableRegister = mock(PluginConfigurationObservableRegister.class); } private PluginCreator createObjectUnderTest() { - return new PluginCreator(); + return new PluginCreator(pluginConfigurationObservableRegister); } @Test @@ -108,6 +125,23 @@ void newPluginInstance_should_create_new_instance_from_annotated_constructor() { assertThat(instance.alternatePluginConfig, equalTo(alternatePluginConfig)); } + @Test + void newPluginInstance_should_register_pluginConfigurationObservable() { + final PluginCreator objectUnderTest = new PluginCreator(pluginConfigurationObservableRegister); + final PluginConfigObservable pluginConfigObservable = mock(PluginConfigObservable.class); + final Object[] constructorArgs = new Object[] { pluginSetting, pluginConfigObservable }; + given(pluginConstructionContext.createArguments(new Class[] {PluginSetting.class, PluginConfigObservable.class})) + .willReturn(constructorArgs); + + final PluginClassWithPluginConfigurationObservableConstructor instance = objectUnderTest + .newPluginInstance(PluginClassWithPluginConfigurationObservableConstructor.class, pluginConstructionContext, pluginName); + + verify(pluginConfigurationObservableRegister).registerPluginConfigurationObservables(constructorArgs); + assertThat(instance, notNullValue()); + assertThat(instance.pluginSetting, equalTo(pluginSetting)); + assertThat(instance.pluginConfigObservable, equalTo(pluginConfigObservable)); + } + @Test void newPluginInstance_should_create_new_instance_from_PluginSetting_if_the_constructor() { given(pluginConstructionContext.createArguments(new Class[] {PluginSetting.class})) diff --git a/data-prepper-plugins/aws-plugin/build.gradle b/data-prepper-plugins/aws-plugin/build.gradle index bb14699ee6..298c6dbe41 100644 --- a/data-prepper-plugins/aws-plugin/build.gradle +++ b/data-prepper-plugins/aws-plugin/build.gradle @@ -7,8 +7,9 @@ dependencies { implementation 'software.amazon.awssdk:secretsmanager' implementation 'software.amazon.awssdk:sts' implementation 'software.amazon.awssdk:arns' + implementation 'org.hibernate.validator:hibernate-validator:8.0.0.Final' testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' - testImplementation 'org.hibernate.validator:hibernate-validator:8.0.0.Final' + testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' } test { diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretManagerConfiguration.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretManagerConfiguration.java index 2f44fc2033..08acfc2824 100644 --- a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretManagerConfiguration.java +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretManagerConfiguration.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.Size; +import org.hibernate.validator.constraints.time.DurationMin; import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; @@ -13,11 +14,12 @@ import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import java.time.Duration; import java.util.Optional; import java.util.UUID; public class AwsSecretManagerConfiguration { - private static final String DEFAULT_AWS_REGION = "us-east-1"; + static final String DEFAULT_AWS_REGION = "us-east-1"; private static final String AWS_IAM_ROLE = "role"; private static final String AWS_IAM = "iam"; @@ -34,6 +36,10 @@ public class AwsSecretManagerConfiguration { @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") private String awsStsRoleArn; + @JsonProperty("refresh_interval") + @DurationMin(hours = 1L, message = "Refresh interval must be at least 1 hour.") + private Duration refreshInterval = Duration.ofHours(1L); + public String getAwsSecretId() { return awsSecretId; } @@ -42,6 +48,10 @@ public Region getAwsRegion() { return Region.of(awsRegion); } + public Duration getRefreshInterval() { + return refreshInterval; + } + public SecretsManagerClient createSecretManagerClient() { return SecretsManagerClient.builder() .credentialsProvider(authenticateAwsConfiguration()) diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPlugin.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPlugin.java index 58dd819014..69f68b6819 100644 --- a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPlugin.java +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPlugin.java @@ -5,19 +5,36 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.plugin.ExtensionPlugin; import org.opensearch.dataprepper.model.plugin.ExtensionPoints; +import org.opensearch.dataprepper.model.plugin.PluginConfigPublisher; import org.opensearch.dataprepper.model.plugin.PluginConfigValueTranslator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; @DataPrepperExtensionPlugin(modelType = AwsSecretPluginConfig.class, rootKeyJsonPath = "/aws/secrets", allowInPipelineConfigurations = true) public class AwsSecretPlugin implements ExtensionPlugin { + static final int PERIOD_IN_SECONDS = 60; + private static final Logger LOG = LoggerFactory.getLogger(AwsSecretPlugin.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private ScheduledExecutorService scheduledExecutorService; + private PluginConfigPublisher pluginConfigPublisher; + private SecretsSupplier secretsSupplier; private final PluginConfigValueTranslator pluginConfigValueTranslator; @DataPrepperPluginConstructor public AwsSecretPlugin(final AwsSecretPluginConfig awsSecretPluginConfig) { if (awsSecretPluginConfig != null) { - final SecretsSupplier secretsSupplier = new AwsSecretsSupplier(awsSecretPluginConfig, OBJECT_MAPPER); + secretsSupplier = new AwsSecretsSupplier(awsSecretPluginConfig, OBJECT_MAPPER); + this.pluginConfigPublisher = new AwsSecretsPluginConfigPublisher(); pluginConfigValueTranslator = new AwsSecretsPluginConfigValueTranslator(secretsSupplier); + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + submitSecretsRefreshJobs(awsSecretPluginConfig, secretsSupplier); + Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown)); } else { pluginConfigValueTranslator = null; } @@ -25,6 +42,37 @@ public AwsSecretPlugin(final AwsSecretPluginConfig awsSecretPluginConfig) { @Override public void apply(final ExtensionPoints extensionPoints) { - extensionPoints.addExtensionProvider(new AwsSecretExtensionProvider(pluginConfigValueTranslator)); + extensionPoints.addExtensionProvider(new AwsSecretsPluginConfigValueTranslatorExtensionProvider(pluginConfigValueTranslator)); + extensionPoints.addExtensionProvider(new AwsSecretsPluginConfigPublisherExtensionProvider( + pluginConfigPublisher)); + } + + private void submitSecretsRefreshJobs(final AwsSecretPluginConfig awsSecretPluginConfig, + final SecretsSupplier secretsSupplier) { + awsSecretPluginConfig.getAwsSecretManagerConfigurationMap().forEach((key, value) -> { + final SecretsRefreshJob secretsRefreshJob = new SecretsRefreshJob( + key, secretsSupplier, pluginConfigPublisher); + final long period = value.getRefreshInterval().toSeconds(); + final long jitterDelay = ThreadLocalRandom.current().nextLong(60L); + scheduledExecutorService.scheduleAtFixedRate(secretsRefreshJob, period + jitterDelay, + period, TimeUnit.SECONDS); + }); + } + + void shutdown() { + if (scheduledExecutorService != null) { + LOG.info("Shutting down secrets refreshing tasks."); + scheduledExecutorService.shutdown(); + try { + if (!scheduledExecutorService.awaitTermination(PERIOD_IN_SECONDS, TimeUnit.SECONDS)) { + LOG.warn("Secrets refreshing tasks did not terminate in time, forcing termination"); + scheduledExecutorService.shutdownNow(); + } + } catch (InterruptedException ex) { + LOG.info("Encountered interruption terminating the secrets refreshing tasks execution, " + + "attempting to force the termination"); + scheduledExecutorService.shutdownNow(); + } + } } } diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisher.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisher.java new file mode 100644 index 0000000000..456a34b014 --- /dev/null +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisher.java @@ -0,0 +1,23 @@ +package org.opensearch.dataprepper.plugins.aws; + +import org.opensearch.dataprepper.model.plugin.PluginConfigPublisher; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class AwsSecretsPluginConfigPublisher implements PluginConfigPublisher { + private final Map pluginConfigurationObservableBooleanMap + = new ConcurrentHashMap<>(); + + @Override + public boolean addPluginConfigObservable(final PluginConfigObservable pluginConfigObservable) { + pluginConfigurationObservableBooleanMap.put(pluginConfigObservable, true); + return true; + } + + @Override + public void notifyAllPluginConfigObservable() { + pluginConfigurationObservableBooleanMap.keySet().forEach(PluginConfigObservable::update); + } +} diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisherExtensionProvider.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisherExtensionProvider.java new file mode 100644 index 0000000000..f11e83cb18 --- /dev/null +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisherExtensionProvider.java @@ -0,0 +1,25 @@ +package org.opensearch.dataprepper.plugins.aws; + +import org.opensearch.dataprepper.model.plugin.ExtensionProvider; +import org.opensearch.dataprepper.model.plugin.PluginConfigPublisher; + +import java.util.Optional; + +public class AwsSecretsPluginConfigPublisherExtensionProvider implements ExtensionProvider { + + private final PluginConfigPublisher pluginConfigPublisher; + + AwsSecretsPluginConfigPublisherExtensionProvider(final PluginConfigPublisher pluginConfigPublisher) { + this.pluginConfigPublisher = pluginConfigPublisher; + } + + @Override + public Optional provideInstance(final Context context) { + return Optional.ofNullable(pluginConfigPublisher); + } + + @Override + public Class supportedClass() { + return PluginConfigPublisher.class; + } +} diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretExtensionProvider.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigValueTranslatorExtensionProvider.java similarity index 72% rename from data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretExtensionProvider.java rename to data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigValueTranslatorExtensionProvider.java index ff911f0211..b2a03b66c1 100644 --- a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretExtensionProvider.java +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigValueTranslatorExtensionProvider.java @@ -5,10 +5,10 @@ import java.util.Optional; -public class AwsSecretExtensionProvider implements ExtensionProvider { +public class AwsSecretsPluginConfigValueTranslatorExtensionProvider implements ExtensionProvider { private final PluginConfigValueTranslator pluginConfigValueTranslator; - AwsSecretExtensionProvider(final PluginConfigValueTranslator pluginConfigValueTranslator) { + AwsSecretsPluginConfigValueTranslatorExtensionProvider(final PluginConfigValueTranslator pluginConfigValueTranslator) { this.pluginConfigValueTranslator = pluginConfigValueTranslator; } @Override diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsSupplier.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsSupplier.java index 7919324a6e..64c0df277b 100644 --- a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsSupplier.java +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsSupplier.java @@ -3,52 +3,43 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient; import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest; import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueResponse; import java.util.Map; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; public class AwsSecretsSupplier implements SecretsSupplier { + private static final Logger LOG = LoggerFactory.getLogger(AwsSecretsSupplier.class); static final TypeReference> MAP_TYPE_REFERENCE = new TypeReference<>() { }; private final ObjectMapper objectMapper; - private final Map secretIdToValue; + private final Map awsSecretManagerConfigurationMap; + private final Map secretsManagerClientMap; + private final ConcurrentMap secretIdToValue; public AwsSecretsSupplier(final AwsSecretPluginConfig awsSecretPluginConfig, final ObjectMapper objectMapper) { this.objectMapper = objectMapper; - secretIdToValue = toSecretMap(awsSecretPluginConfig); + awsSecretManagerConfigurationMap = awsSecretPluginConfig + .getAwsSecretManagerConfigurationMap(); + secretsManagerClientMap = toSecretsManagerClientMap(awsSecretPluginConfig); + secretIdToValue = toSecretMap(awsSecretManagerConfigurationMap); } - private Map toSecretMap(final AwsSecretPluginConfig awsSecretPluginConfig) { - final Map secretsManagerClientMap = toSecretsManagerClientMap( - awsSecretPluginConfig); - final Map awsSecretManagerConfigurationMap = awsSecretPluginConfig - .getAwsSecretManagerConfigurationMap(); + private ConcurrentMap toSecretMap( + final Map awsSecretManagerConfigurationMap) { return secretsManagerClientMap.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> { + .collect(Collectors.toConcurrentMap(Map.Entry::getKey, entry -> { final String secretConfigurationId = entry.getKey(); final AwsSecretManagerConfiguration awsSecretManagerConfiguration = awsSecretManagerConfigurationMap.get(secretConfigurationId); final SecretsManagerClient secretsManagerClient = entry.getValue(); - final GetSecretValueRequest getSecretValueRequest = awsSecretManagerConfiguration - .createGetSecretValueRequest(); - final GetSecretValueResponse getSecretValueResponse; - try { - getSecretValueResponse = secretsManagerClient.getSecretValue(getSecretValueRequest); - } catch (Exception e) { - throw new RuntimeException( - String.format("Unable to retrieve secret: %s", - awsSecretManagerConfiguration.getAwsSecretId()), e); - } - - try { - return objectMapper.readValue(getSecretValueResponse.secretString(), MAP_TYPE_REFERENCE); - } catch (JsonProcessingException e) { - return getSecretValueResponse.secretString(); - } + return retrieveSecretsFromSecretManager(awsSecretManagerConfiguration, secretsManagerClient); })); } @@ -93,4 +84,37 @@ public Object retrieveValue(String secretId) { secretId)); } } + + @Override + public void refresh(String secretConfigId) { + LOG.info("Retrieving latest secrets in aws:secrets:{}.", secretConfigId); + secretIdToValue.compute(secretConfigId, (key, oldValue) -> { + final AwsSecretManagerConfiguration awsSecretManagerConfiguration = + awsSecretManagerConfigurationMap.get(key); + final SecretsManagerClient secretsManagerClient = + secretsManagerClientMap.get(key); + return retrieveSecretsFromSecretManager(awsSecretManagerConfiguration, secretsManagerClient); + }); + LOG.info("Finished retrieving latest secret in aws:secrets:{}.", secretConfigId); + } + + private Object retrieveSecretsFromSecretManager(final AwsSecretManagerConfiguration awsSecretManagerConfiguration, + final SecretsManagerClient secretsManagerClient) { + final GetSecretValueRequest getSecretValueRequest = awsSecretManagerConfiguration + .createGetSecretValueRequest(); + final GetSecretValueResponse getSecretValueResponse; + try { + getSecretValueResponse = secretsManagerClient.getSecretValue(getSecretValueRequest); + } catch (Exception e) { + throw new RuntimeException( + String.format("Unable to retrieve secret: %s", + awsSecretManagerConfiguration.getAwsSecretId()), e); + } + + try { + return objectMapper.readValue(getSecretValueResponse.secretString(), MAP_TYPE_REFERENCE); + } catch (JsonProcessingException e) { + return getSecretValueResponse.secretString(); + } + } } diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/SecretsRefreshJob.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/SecretsRefreshJob.java new file mode 100644 index 0000000000..66bf6d3aad --- /dev/null +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/SecretsRefreshJob.java @@ -0,0 +1,23 @@ +package org.opensearch.dataprepper.plugins.aws; + +import org.opensearch.dataprepper.model.plugin.PluginConfigPublisher; + +public class SecretsRefreshJob implements Runnable { + private final String secretConfigId; + private final SecretsSupplier secretsSupplier; + private final PluginConfigPublisher pluginConfigPublisher; + + public SecretsRefreshJob(final String secretConfigId, + final SecretsSupplier secretsSupplier, + final PluginConfigPublisher pluginConfigPublisher) { + this.secretConfigId = secretConfigId; + this.secretsSupplier = secretsSupplier; + this.pluginConfigPublisher = pluginConfigPublisher; + } + + @Override + public void run() { + secretsSupplier.refresh(secretConfigId); + pluginConfigPublisher.notifyAllPluginConfigObservable(); + } +} diff --git a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/SecretsSupplier.java b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/SecretsSupplier.java index 467649775b..c24621b4fa 100644 --- a/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/SecretsSupplier.java +++ b/data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/SecretsSupplier.java @@ -4,4 +4,6 @@ public interface SecretsSupplier { Object retrieveValue(String secretId, String key); Object retrieveValue(String secretId); + + void refresh(String secretId); } diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretManagerConfigurationTest.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretManagerConfigurationTest.java index 7daed30307..14e49b42fc 100644 --- a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretManagerConfigurationTest.java +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretManagerConfigurationTest.java @@ -2,10 +2,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import jakarta.validation.ConstraintViolation; import jakarta.validation.Validation; import jakarta.validation.Validator; import org.hibernate.validator.messageinterpolation.ParameterMessageInterpolator; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -25,6 +27,7 @@ import java.io.IOException; import java.io.InputStream; +import java.time.Duration; import java.util.Set; import static org.hamcrest.CoreMatchers.equalTo; @@ -40,12 +43,14 @@ @ExtendWith(MockitoExtension.class) class AwsSecretManagerConfigurationTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory()); + private static final Validator VALIDATOR = Validation.byDefaultProvider() .configure() .messageInterpolator(new ParameterMessageInterpolator()) .buildValidatorFactory().getValidator(); + private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory()); + @Mock private GetSecretValueRequest.Builder getSecretValueRequestBuilder; @@ -61,13 +66,42 @@ class AwsSecretManagerConfigurationTest { @Captor private ArgumentCaptor awsCredentialsProviderArgumentCaptor; + @BeforeEach + void setup() { + objectMapper.registerModule(new JavaTimeModule()); + } + + @Test + void testAwsSecretManagerConfigurationDefault() throws IOException { + final InputStream inputStream = AwsSecretPluginConfigTest.class.getResourceAsStream( + "/test-aws-secret-manager-configuration-default.yaml"); + final AwsSecretManagerConfiguration awsSecretManagerConfiguration = objectMapper.readValue( + inputStream, AwsSecretManagerConfiguration.class); + assertThat(awsSecretManagerConfiguration.getAwsSecretId(), equalTo("test-secret")); + assertThat(awsSecretManagerConfiguration.getAwsRegion(), equalTo(Region.US_EAST_1)); + assertThat(awsSecretManagerConfiguration.getRefreshInterval(), equalTo(Duration.ofHours(1))); + } + + @Test + void testAwsSecretManagerConfigurationInvalidRefreshInterval() throws IOException { + final InputStream inputStream = AwsSecretPluginConfigTest.class.getResourceAsStream( + "/test-aws-secret-manager-configuration-invalid-refresh-interval.yaml"); + final AwsSecretManagerConfiguration awsSecretManagerConfiguration = objectMapper.readValue( + inputStream, AwsSecretManagerConfiguration.class); + final Set> violations = VALIDATOR.validate( + awsSecretManagerConfiguration); + assertThat(violations.size(), equalTo(1)); + final ConstraintViolation violation = violations.stream().findFirst().get(); + assertThat(violation.getMessage(), equalTo("Refresh interval must be at least 1 hour.")); + } + @Test void testCreateGetSecretValueRequest() throws IOException { when(getSecretValueRequestBuilder.secretId(anyString())).thenReturn(getSecretValueRequestBuilder); when(getSecretValueRequestBuilder.build()).thenReturn(getSecretValueRequest); final InputStream inputStream = AwsSecretPluginConfigTest.class.getResourceAsStream( "/test-aws-secret-manager-configuration-default.yaml"); - final AwsSecretManagerConfiguration awsSecretManagerConfiguration = OBJECT_MAPPER.readValue( + final AwsSecretManagerConfiguration awsSecretManagerConfiguration = objectMapper.readValue( inputStream, AwsSecretManagerConfiguration.class); try (final MockedStatic getSecretValueRequestMockedStatic = mockStatic(GetSecretValueRequest.class)) { @@ -82,7 +116,7 @@ void testCreateGetSecretValueRequest() throws IOException { void testCreateSecretManagerClientWithDefaultCredential() throws IOException { final InputStream inputStream = AwsSecretPluginConfigTest.class.getResourceAsStream( "/test-aws-secret-manager-configuration-default.yaml"); - final AwsSecretManagerConfiguration awsSecretManagerConfiguration = OBJECT_MAPPER.readValue( + final AwsSecretManagerConfiguration awsSecretManagerConfiguration = objectMapper.readValue( inputStream, AwsSecretManagerConfiguration.class); assertThat(awsSecretManagerConfiguration.getAwsSecretId(), equalTo("test-secret")); when(secretsManagerClientBuilder.region(any(Region.class))).thenReturn(secretsManagerClientBuilder); @@ -103,7 +137,7 @@ void testCreateSecretManagerClientWithDefaultCredential() throws IOException { void testCreateSecretManagerClientWithStsCredential() throws IOException { final InputStream inputStream = AwsSecretPluginConfigTest.class.getResourceAsStream( "/test-aws-secret-manager-configuration-with-sts.yaml"); - final AwsSecretManagerConfiguration awsSecretManagerConfiguration = OBJECT_MAPPER.readValue( + final AwsSecretManagerConfiguration awsSecretManagerConfiguration = objectMapper.readValue( inputStream, AwsSecretManagerConfiguration.class); assertThat(awsSecretManagerConfiguration.getAwsSecretId(), equalTo("test-secret")); when(secretsManagerClientBuilder.region(any(Region.class))).thenReturn(secretsManagerClientBuilder); @@ -128,7 +162,7 @@ void testCreateSecretManagerClientWithStsCredential() throws IOException { }) void testCreateSecretManagerClientWithInvalidStsRoleArn(final String testFileName) throws IOException { final InputStream inputStream = AwsSecretPluginConfigTest.class.getResourceAsStream(testFileName); - final AwsSecretManagerConfiguration awsSecretManagerConfiguration = OBJECT_MAPPER.readValue( + final AwsSecretManagerConfiguration awsSecretManagerConfiguration = objectMapper.readValue( inputStream, AwsSecretManagerConfiguration.class); try (final MockedStatic secretsManagerClientMockedStatic = mockStatic( SecretsManagerClient.class)) { @@ -142,7 +176,7 @@ void testCreateSecretManagerClientWithInvalidStsRoleArn(final String testFileNam void testDeserializationMissingName() throws IOException { final InputStream inputStream = AwsSecretPluginConfigTest.class.getResourceAsStream( "/test-aws-secret-manager-configuration-missing-secret-id.yaml"); - final AwsSecretManagerConfiguration awsSecretManagerConfiguration = OBJECT_MAPPER.readValue( + final AwsSecretManagerConfiguration awsSecretManagerConfiguration = objectMapper.readValue( inputStream, AwsSecretManagerConfiguration.class); final Set> violations = VALIDATOR.validate( awsSecretManagerConfiguration); diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPluginIT.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPluginIT.java index b0cefab287..b47b0fc02f 100644 --- a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPluginIT.java +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretPluginIT.java @@ -3,48 +3,117 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.plugin.ExtensionPoints; import org.opensearch.dataprepper.model.plugin.ExtensionProvider; +import org.opensearch.dataprepper.model.plugin.PluginConfigPublisher; import org.opensearch.dataprepper.model.plugin.PluginConfigValueTranslator; +import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient; +import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest; +import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueResponse; +import java.time.Duration; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; -import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.aws.AwsSecretPlugin.PERIOD_IN_SECONDS; @ExtendWith(MockitoExtension.class) class AwsSecretPluginIT { + private static String TEST_SECRET_CONFIG_ID = "testSecretConfig"; @Mock private AwsSecretPluginConfig awsSecretPluginConfig; + @Mock + private AwsSecretManagerConfiguration awsSecretManagerConfiguration; + + @Mock + private SecretsManagerClient secretsManagerClient; + + @Mock + private GetSecretValueRequest getSecretValueRequest; + + @Mock + private GetSecretValueResponse getSecretValueResponse; + @Mock private ExtensionPoints extensionPoints; @Mock private ExtensionProvider.Context context; + @Mock + private ScheduledExecutorService scheduledExecutorService; + + @Mock + private Runtime runtime; + + @Captor + private ArgumentCaptor initialDelayCaptor; + + @Captor + private ArgumentCaptor periodCaptor; + private AwsSecretPlugin objectUnderTest; @Test void testInitializationWithNonNullConfig() { - when(awsSecretPluginConfig.getAwsSecretManagerConfigurationMap()).thenReturn(Collections.emptyMap()); - objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig); - objectUnderTest.apply(extensionPoints); + final Duration testInterval = Duration.ofHours(2); + when(awsSecretPluginConfig.getAwsSecretManagerConfigurationMap()).thenReturn( + Map.of(TEST_SECRET_CONFIG_ID, awsSecretManagerConfiguration)); + when(awsSecretManagerConfiguration.getRefreshInterval()).thenReturn(testInterval); + when(awsSecretManagerConfiguration.createSecretManagerClient()).thenReturn(secretsManagerClient); + when(awsSecretManagerConfiguration.createGetSecretValueRequest()).thenReturn(getSecretValueRequest); + when(secretsManagerClient.getSecretValue(eq(getSecretValueRequest))).thenReturn(getSecretValueResponse); + when(getSecretValueResponse.secretString()).thenReturn(UUID.randomUUID().toString()); + try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class); + final MockedStatic runtimeMockedStatic = mockStatic(Runtime.class) + ) { + executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor) + .thenReturn(scheduledExecutorService); + runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime); + objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig); + objectUnderTest.apply(extensionPoints); + } final ArgumentCaptor extensionProviderArgumentCaptor = ArgumentCaptor.forClass(ExtensionProvider.class); - verify(extensionPoints).addExtensionProvider(extensionProviderArgumentCaptor.capture()); - final ExtensionProvider actualExtensionProvider = extensionProviderArgumentCaptor.getValue(); - assertThat(actualExtensionProvider, instanceOf(AwsSecretExtensionProvider.class)); + verify(extensionPoints, times(2)).addExtensionProvider(extensionProviderArgumentCaptor.capture()); + final List actualExtensionProviders = extensionProviderArgumentCaptor.getAllValues(); + assertThat(actualExtensionProviders.get(0), instanceOf(AwsSecretsPluginConfigValueTranslatorExtensionProvider.class)); final Optional optionalPluginConfigValueTranslator = - actualExtensionProvider.provideInstance(context); + actualExtensionProviders.get(0).provideInstance(context); assertThat(optionalPluginConfigValueTranslator.isPresent(), is(true)); assertThat(optionalPluginConfigValueTranslator.get(), instanceOf(AwsSecretsPluginConfigValueTranslator.class)); + assertThat(actualExtensionProviders.get(1), instanceOf(AwsSecretsPluginConfigPublisherExtensionProvider.class)); + final Optional optionalPluginConfigPublisher = + actualExtensionProviders.get(1).provideInstance(context); + assertThat(optionalPluginConfigPublisher.isPresent(), is(true)); + verify(scheduledExecutorService).scheduleAtFixedRate( + any(), initialDelayCaptor.capture(), periodCaptor.capture(), eq(TimeUnit.SECONDS)); + assertThat(initialDelayCaptor.getValue() >= testInterval.toSeconds(), is(true)); + assertThat(periodCaptor.getValue(), equalTo(testInterval.toSeconds())); + verify(runtime).addShutdownHook(any()); } @Test @@ -53,11 +122,93 @@ void testInitializationWithNullConfig() { objectUnderTest.apply(extensionPoints); final ArgumentCaptor extensionProviderArgumentCaptor = ArgumentCaptor.forClass(ExtensionProvider.class); - verify(extensionPoints).addExtensionProvider(extensionProviderArgumentCaptor.capture()); - final ExtensionProvider actualExtensionProvider = extensionProviderArgumentCaptor.getValue(); - assertThat(actualExtensionProvider, instanceOf(AwsSecretExtensionProvider.class)); + verify(extensionPoints, times(2)).addExtensionProvider(extensionProviderArgumentCaptor.capture()); + final List actualExtensionProviders = extensionProviderArgumentCaptor.getAllValues(); + assertThat(actualExtensionProviders.get(0), instanceOf(AwsSecretsPluginConfigValueTranslatorExtensionProvider.class)); final Optional optionalPluginConfigValueTranslator = - actualExtensionProvider.provideInstance(context); + actualExtensionProviders.get(0).provideInstance(context); assertThat(optionalPluginConfigValueTranslator.isEmpty(), is(true)); + assertThat(actualExtensionProviders.get(1), instanceOf(AwsSecretsPluginConfigPublisherExtensionProvider.class)); + final Optional optionalPluginConfigPublisher = + actualExtensionProviders.get(1).provideInstance(context); + assertThat(optionalPluginConfigPublisher.isEmpty(), is(true)); + } + + @Test + void testShutdownAwaitTerminationSuccess() throws InterruptedException { + when(awsSecretPluginConfig.getAwsSecretManagerConfigurationMap()).thenReturn( + Collections.emptyMap()); + try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class); + final MockedStatic runtimeMockedStatic = mockStatic(Runtime.class) + ) { + executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor) + .thenReturn(scheduledExecutorService); + runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime); + objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig); + } + when(scheduledExecutorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true); + objectUnderTest.shutdown(); + + verify(scheduledExecutorService).shutdown(); + verify(scheduledExecutorService).awaitTermination( + eq(Integer.valueOf(PERIOD_IN_SECONDS).longValue()), eq(TimeUnit.SECONDS)); + verify(scheduledExecutorService, times(0)).shutdownNow(); + } + + @Test + void testShutdownAwaitTerminationTimeout() throws InterruptedException { + when(awsSecretPluginConfig.getAwsSecretManagerConfigurationMap()).thenReturn( + Collections.emptyMap()); + try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class); + final MockedStatic runtimeMockedStatic = mockStatic(Runtime.class) + ) { + executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor) + .thenReturn(scheduledExecutorService); + runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime); + objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig); + } + when(scheduledExecutorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(false); + objectUnderTest.shutdown(); + + verify(scheduledExecutorService).shutdown(); + verify(scheduledExecutorService).awaitTermination( + eq(Integer.valueOf(PERIOD_IN_SECONDS).longValue()), eq(TimeUnit.SECONDS)); + verify(scheduledExecutorService).shutdownNow(); + } + + @Test + void testShutdownAwaitTerminationInterrupted() throws InterruptedException { + when(awsSecretPluginConfig.getAwsSecretManagerConfigurationMap()).thenReturn( + Collections.emptyMap()); + try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class); + final MockedStatic runtimeMockedStatic = mockStatic(Runtime.class) + ) { + executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor) + .thenReturn(scheduledExecutorService); + runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime); + objectUnderTest = new AwsSecretPlugin(awsSecretPluginConfig); + } + when(scheduledExecutorService.awaitTermination(anyLong(), any(TimeUnit.class))) + .thenThrow(new InterruptedException()); + objectUnderTest.shutdown(); + + verify(scheduledExecutorService).shutdown(); + verify(scheduledExecutorService).awaitTermination( + eq(Integer.valueOf(PERIOD_IN_SECONDS).longValue()), eq(TimeUnit.SECONDS)); + verify(scheduledExecutorService).shutdownNow(); + } + + @Test + void testShutdownWithNullScheduledExecutorService() { + try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class); + final MockedStatic runtimeMockedStatic = mockStatic(Runtime.class) + ) { + executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor) + .thenReturn(scheduledExecutorService); + runtimeMockedStatic.when(Runtime::getRuntime).thenReturn(runtime); + objectUnderTest = new AwsSecretPlugin(null); + } + objectUnderTest.shutdown(); + verifyNoInteractions(scheduledExecutorService); } } \ No newline at end of file diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisherExtensionProviderTest.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisherExtensionProviderTest.java new file mode 100644 index 0000000000..ab919d7e20 --- /dev/null +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisherExtensionProviderTest.java @@ -0,0 +1,51 @@ +package org.opensearch.dataprepper.plugins.aws; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.plugin.ExtensionProvider; +import org.opensearch.dataprepper.model.plugin.PluginConfigPublisher; + +import java.util.Optional; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; + +@ExtendWith(MockitoExtension.class) +class AwsSecretsPluginConfigPublisherExtensionProviderTest { + @Mock + private AwsSecretsPluginConfigPublisher awsSecretsPluginConfigPublisher; + + @Mock + private ExtensionProvider.Context context; + + private AwsSecretsPluginConfigPublisherExtensionProvider createObjectUnderTest() { + return new AwsSecretsPluginConfigPublisherExtensionProvider(awsSecretsPluginConfigPublisher); + } + + @Test + void supportedClass_returns_PluginConfigPublisher() { + assertThat(createObjectUnderTest().supportedClass(), equalTo(PluginConfigPublisher.class)); + } + + @Test + void provideInstance_returns_the_PluginConfigPublisher_from_the_constructor() { + final AwsSecretsPluginConfigPublisherExtensionProvider objectUnderTest = createObjectUnderTest(); + + final Optional optionalPluginConfigPublisher = + objectUnderTest.provideInstance(context); + assertThat(optionalPluginConfigPublisher, notNullValue()); + assertThat(optionalPluginConfigPublisher.isPresent(), equalTo(true)); + assertThat(optionalPluginConfigPublisher.get(), equalTo(awsSecretsPluginConfigPublisher)); + + final Optional anotherOptionalPluginConfigPublisher = + objectUnderTest.provideInstance(context); + assertThat(anotherOptionalPluginConfigPublisher, notNullValue()); + assertThat(anotherOptionalPluginConfigPublisher.isPresent(), equalTo(true)); + assertThat(anotherOptionalPluginConfigPublisher.get(), sameInstance( + anotherOptionalPluginConfigPublisher.get())); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisherTest.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisherTest.java new file mode 100644 index 0000000000..57b0e75c9e --- /dev/null +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigPublisherTest.java @@ -0,0 +1,24 @@ +package org.opensearch.dataprepper.plugins.aws; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +class AwsSecretsPluginConfigPublisherTest { + + private AwsSecretsPluginConfigPublisher objectUnderTest; + + @Test + void testAddPluginConfigurationObservableAndThenNotifyAll() { + final PluginConfigObservable pluginConfigObservable1 = mock(PluginConfigObservable.class); + final PluginConfigObservable pluginConfigObservable2 = mock(PluginConfigObservable.class); + objectUnderTest = new AwsSecretsPluginConfigPublisher(); + objectUnderTest.addPluginConfigObservable(pluginConfigObservable1); + objectUnderTest.addPluginConfigObservable(pluginConfigObservable2); + objectUnderTest.notifyAllPluginConfigObservable(); + verify(pluginConfigObservable1).update(); + verify(pluginConfigObservable2).update(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretExtensionProviderTest.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigValueTranslatorExtensionProviderTest.java similarity index 83% rename from data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretExtensionProviderTest.java rename to data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigValueTranslatorExtensionProviderTest.java index c1e84e9c7b..fe033c7f9b 100644 --- a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretExtensionProviderTest.java +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsPluginConfigValueTranslatorExtensionProviderTest.java @@ -15,15 +15,15 @@ import static org.hamcrest.MatcherAssert.assertThat; @ExtendWith(MockitoExtension.class) -class AwsSecretExtensionProviderTest { +class AwsSecretsPluginConfigValueTranslatorExtensionProviderTest { @Mock private PluginConfigValueTranslator pluginConfigValueTranslator; @Mock private ExtensionProvider.Context context; - private AwsSecretExtensionProvider createObjectUnderTest() { - return new AwsSecretExtensionProvider(pluginConfigValueTranslator); + private AwsSecretsPluginConfigValueTranslatorExtensionProvider createObjectUnderTest() { + return new AwsSecretsPluginConfigValueTranslatorExtensionProvider(pluginConfigValueTranslator); } @Test @@ -33,7 +33,7 @@ void supportedClass_returns_PluginConfigValueTranslator() { @Test void provideInstance_returns_the_PluginConfigValueTranslator_from_the_constructor() { - final AwsSecretExtensionProvider objectUnderTest = createObjectUnderTest(); + final AwsSecretsPluginConfigValueTranslatorExtensionProvider objectUnderTest = createObjectUnderTest(); final Optional optionalPluginConfigValueTranslator = objectUnderTest.provideInstance(context); diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsSupplierTest.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsSupplierTest.java index f9047086f6..b9ac484bbb 100644 --- a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsSupplierTest.java +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsSecretsSupplierTest.java @@ -16,6 +16,7 @@ import software.amazon.awssdk.services.secretsmanager.model.SecretsManagerException; import java.util.Map; +import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -127,4 +128,30 @@ void testConstructorWithGetSecretValueFailure() { when(secretsManagerClient.getSecretValue(eq(getSecretValueRequest))).thenThrow(secretsManagerException); assertThrows(RuntimeException.class, () -> new AwsSecretsSupplier(awsSecretPluginConfig, OBJECT_MAPPER)); } + + @Test + void testRefreshSecretsWithKey() { + final String testValue = "{\"key\":\"oldValue\"}"; + when(getSecretValueResponse.secretString()).thenReturn(testValue); + objectUnderTest = new AwsSecretsSupplier(awsSecretPluginConfig, OBJECT_MAPPER); + assertThat(objectUnderTest.retrieveValue(TEST_AWS_SECRET_CONFIGURATION_NAME, "key"), + equalTo("oldValue")); + final String newTestValue = "{\"key\":\"newValue\"}"; + when(getSecretValueResponse.secretString()).thenReturn(newTestValue); + objectUnderTest.refresh(TEST_AWS_SECRET_CONFIGURATION_NAME); + assertThat(objectUnderTest.retrieveValue(TEST_AWS_SECRET_CONFIGURATION_NAME, "key"), + equalTo("newValue")); + } + + @Test + void testRefreshSecretsWithoutKey() { + final String testValue = UUID.randomUUID().toString(); + when(getSecretValueResponse.secretString()).thenReturn(testValue); + objectUnderTest = new AwsSecretsSupplier(awsSecretPluginConfig, OBJECT_MAPPER); + assertThat(objectUnderTest.retrieveValue(TEST_AWS_SECRET_CONFIGURATION_NAME), equalTo(testValue)); + final String newTestValue = testValue + "-mutated"; + when(getSecretValueResponse.secretString()).thenReturn(newTestValue); + objectUnderTest.refresh(TEST_AWS_SECRET_CONFIGURATION_NAME); + assertThat(objectUnderTest.retrieveValue(TEST_AWS_SECRET_CONFIGURATION_NAME), equalTo(newTestValue)); + } } \ No newline at end of file diff --git a/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/SecretsRefreshJobTest.java b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/SecretsRefreshJobTest.java new file mode 100644 index 0000000000..08b4f51e1d --- /dev/null +++ b/data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/SecretsRefreshJobTest.java @@ -0,0 +1,35 @@ +package org.opensearch.dataprepper.plugins.aws; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.plugin.PluginConfigPublisher; + +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +class SecretsRefreshJobTest { + private static final String TEST_SECRET_CONFIG_ID = "test_secret_config_id"; + @Mock + private SecretsSupplier secretsSupplier; + + @Mock + private PluginConfigPublisher pluginConfigPublisher; + + private SecretsRefreshJob objectUnderTest; + + @BeforeEach + void setUp() { + objectUnderTest = new SecretsRefreshJob(TEST_SECRET_CONFIG_ID, secretsSupplier, pluginConfigPublisher); + } + + @Test + void testRun() { + objectUnderTest.run(); + + verify(secretsSupplier).refresh(TEST_SECRET_CONFIG_ID); + verify(pluginConfigPublisher).notifyAllPluginConfigObservable(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/aws-plugin/src/test/resources/test-aws-secret-manager-configuration-invalid-refresh-interval.yaml b/data-prepper-plugins/aws-plugin/src/test/resources/test-aws-secret-manager-configuration-invalid-refresh-interval.yaml new file mode 100644 index 0000000000..28b38d6f2c --- /dev/null +++ b/data-prepper-plugins/aws-plugin/src/test/resources/test-aws-secret-manager-configuration-invalid-refresh-interval.yaml @@ -0,0 +1,2 @@ +secret_id: test-secret +refresh_interval: PT1m \ No newline at end of file