Skip to content

Commit

Permalink
ENH: data-prepper-core support for secrets refreshment (opensearch-pr…
Browse files Browse the repository at this point in the history
…oject#3415)

* INIT: secrets refreshment infra

Signed-off-by: George Chen <[email protected]>

* MAINT: add interval and test validity

Signed-off-by: George Chen <[email protected]>

* MAINT: some more refactoring

Signed-off-by: George Chen <[email protected]>

* MAINT: delete unused classes

Signed-off-by: George Chen <[email protected]>

* TST: AwsSecretsPluginConfigPublisherExtensionProviderTest

Signed-off-by: George Chen <[email protected]>

* MAINT: inject PluginConfigPublisher into PluginCreator

Signed-off-by: George Chen <[email protected]>

* MAINT: complete test cases for AwsSecretPluginIT

Signed-off-by: George Chen <[email protected]>

* MAINT: test refresh secrets

Signed-off-by: George Chen <[email protected]>

* MAINT: refactoring and documentation

Signed-off-by: George Chen <[email protected]>

* STY: import

Signed-off-by: George Chen <[email protected]>

* MAINT: fix test cases

Signed-off-by: George Chen <[email protected]>

* MAINT: missing test case

Signed-off-by: George Chen <[email protected]>

* MAINT: address minor comments

Signed-off-by: George Chen <[email protected]>

* REF: PluginConfigurationObservableRegister

Signed-off-by: George Chen <[email protected]>

---------

Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 authored Oct 5, 2023
1 parent 52cbd03 commit 71a6956
Show file tree
Hide file tree
Showing 32 changed files with 860 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.opensearch.dataprepper.model.plugin;

/**
* An interface to be implemented by pipeline plugins, i.e. {@link org.opensearch.dataprepper.model.source.Source}
* /{@link org.opensearch.dataprepper.model.processor.Processor}/{@link org.opensearch.dataprepper.model.sink.Sink},
* to refresh their components, e.g. client connection.
* @since 2.5
*/
public interface PluginComponentRefresher<PluginComponent, PluginConfig> {
/**
* Returns the refreshed {@link PluginComponent}.
*
* @return An instance of {@link PluginComponent}.
*/
PluginComponent get();

/**
* Updates the {@link PluginComponent} with the new {@link PluginConfig}.
*
* @param pluginConfig The new pluginConfig used to refresh the {@link PluginComponent}.
*/
void update(PluginConfig pluginConfig);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.opensearch.dataprepper.model.plugin;

/**
* An interface used by pipeline plugins, i.e. {@link org.opensearch.dataprepper.model.source.Source}
* /{@link org.opensearch.dataprepper.model.processor.Processor}/{@link org.opensearch.dataprepper.model.sink.Sink},
* to onboard {@link PluginConfigObserver}.
* @since 2.5
*/
public interface PluginConfigObservable {

/**
* Onboard a new {@link PluginConfigObserver} within the plugin.
*/
boolean addPluginConfigObserver(PluginConfigObserver pluginConfigObserver);

/**
* Invoke all {@link PluginConfigObserver}.
*/
void update();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.opensearch.dataprepper.model.plugin;

/**
* An interface by pipeline plugins, i.e. {@link org.opensearch.dataprepper.model.source.Source}
* * /{@link org.opensearch.dataprepper.model.processor.Processor}/{@link org.opensearch.dataprepper.model.sink.Sink}
* to implement custom plugin component refreshment logic.
* @since 2.5
*/
public interface PluginConfigObserver<T> {
/**
* Update plugin components with a new pluginConfig.
*
* @param pluginConfig The plugin configuration object used as reference for update.
*/
void update(T pluginConfig);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.opensearch.dataprepper.model.plugin;

/**
* An interface used to onboard and notify all {@link PluginConfigObservable} to invoke update.
* @since 2.5
*/
public interface PluginConfigPublisher {
/**
* Onboard a new {@link PluginConfigObservable}.
*/
boolean addPluginConfigObservable(PluginConfigObservable pluginConfigObservable);

/**
* Notify all {@link PluginConfigObservable} to update.
*/
void notifyAllPluginConfigObservable();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugin;

import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
Expand Down Expand Up @@ -65,6 +66,11 @@ private ComponentPluginArgumentsContext(final Builder builder) {
if (builder.acknowledgementSetManager != null) {
typedArgumentsSuppliers.put(AcknowledgementSetManager.class, () -> builder.acknowledgementSetManager);
}

if (builder.pluginConfigObservable != null) {
typedArgumentsSuppliers.put(PluginConfigObservable.class, () -> builder.pluginConfigObservable);
}

if (builder.sinkContext != null) {
typedArgumentsSuppliers.put(SinkContext.class, () -> builder.sinkContext);
}
Expand Down Expand Up @@ -118,6 +124,7 @@ static class Builder {
private BeanFactory beanFactory;
private EventFactory eventFactory;
private AcknowledgementSetManager acknowledgementSetManager;
private PluginConfigObservable pluginConfigObservable;
private SinkContext sinkContext;

Builder withPluginConfiguration(final Object pluginConfiguration) {
Expand Down Expand Up @@ -160,6 +167,11 @@ Builder withBeanFactory(final BeanFactory beanFactory) {
return this;
}

Builder withPluginConfigurationObservable(final PluginConfigObservable pluginConfigObservable) {
this.pluginConfigObservable = pluginConfigObservable;
return this;
}

ComponentPluginArgumentsContext build() {
return new ComponentPluginArgumentsContext(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.opensearch.dataprepper.plugin;

import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginConfigObserver;
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DefaultPluginConfigObservable implements PluginConfigObservable {
private final Map<PluginConfigObserver, Boolean> pluginConfigObserverBooleanMap
= new ConcurrentHashMap<>();
private final PluginConfigurationConverter pluginConfigurationConverter;
private final Class<?> pluginConfigClass;
private final PluginSetting rawPluginSettings;

public DefaultPluginConfigObservable(final PluginConfigurationConverter pluginConfigurationConverter,
final Class<?> pluginConfigClass,
final PluginSetting rawPluginSettings) {
this.pluginConfigurationConverter = pluginConfigurationConverter;
this.pluginConfigClass = pluginConfigClass;
this.rawPluginSettings = rawPluginSettings;
}

@Override
public boolean addPluginConfigObserver(final PluginConfigObserver pluginConfigObserver) {
pluginConfigObserverBooleanMap.put(pluginConfigObserver, true);
return true;
}

@Override
public void update() {
final Object newPluginConfiguration = pluginConfigurationConverter.convert(
pluginConfigClass, rawPluginSettings);
pluginConfigObserverBooleanMap.keySet().forEach(
pluginConfigObserver -> pluginConfigObserver.update(newPluginConfiguration));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugin;

import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
Expand Down Expand Up @@ -41,6 +42,7 @@ public class DefaultPluginFactory implements PluginFactory {
private final PluginBeanFactoryProvider pluginBeanFactoryProvider;
private final DefaultEventFactory eventFactory;
private final DefaultAcknowledgementSetManager acknowledgementSetManager;
private final PluginConfigurationObservableFactory pluginConfigurationObservableFactory;

@Inject
DefaultPluginFactory(
Expand All @@ -49,16 +51,19 @@ public class DefaultPluginFactory implements PluginFactory {
final PluginConfigurationConverter pluginConfigurationConverter,
final PluginBeanFactoryProvider pluginBeanFactoryProvider,
final DefaultEventFactory eventFactory,
final DefaultAcknowledgementSetManager acknowledgementSetManager
final DefaultAcknowledgementSetManager acknowledgementSetManager,
final PluginConfigurationObservableFactory pluginConfigurationObservableFactory
) {
Objects.requireNonNull(pluginProviderLoader);
Objects.requireNonNull(pluginConfigurationObservableFactory);
this.pluginCreator = Objects.requireNonNull(pluginCreator);
this.pluginConfigurationConverter = Objects.requireNonNull(pluginConfigurationConverter);

this.pluginProviders = Objects.requireNonNull(pluginProviderLoader.getPluginProviders());
this.pluginBeanFactoryProvider = Objects.requireNonNull(pluginBeanFactoryProvider);
this.eventFactory = Objects.requireNonNull(eventFactory);
this.acknowledgementSetManager = Objects.requireNonNull(acknowledgementSetManager);
this.pluginConfigurationObservableFactory = pluginConfigurationObservableFactory;

if(pluginProviders.isEmpty()) {
throw new RuntimeException("Data Prepper requires at least one PluginProvider. " +
Expand Down Expand Up @@ -113,6 +118,8 @@ private <T> ComponentPluginArgumentsContext getConstructionContext(final PluginS

final Class<?> pluginConfigurationType = pluginAnnotation.pluginConfigurationType();
final Object configuration = pluginConfigurationConverter.convert(pluginConfigurationType, pluginSetting);
final PluginConfigObservable pluginConfigObservable = pluginConfigurationObservableFactory
.createDefaultPluginConfigObservable(pluginConfigurationConverter, pluginClass, pluginSetting);

return new ComponentPluginArgumentsContext.Builder()
.withPluginSetting(pluginSetting)
Expand All @@ -122,6 +129,7 @@ private <T> ComponentPluginArgumentsContext getConstructionContext(final PluginS
.withBeanFactory(pluginBeanFactoryProvider.get())
.withEventFactory(eventFactory)
.withAcknowledgementSetManager(acknowledgementSetManager)
.withPluginConfigurationObservable(pluginConfigObservable)
.withSinkContext(sinkContext)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.opensearch.dataprepper.plugin;

import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;

import javax.inject.Named;

@Named
public class PluginConfigurationObservableFactory {
public final PluginConfigObservable createDefaultPluginConfigObservable(
final PluginConfigurationConverter pluginConfigurationConverter, final Class<?> pluginConfigClass,
final PluginSetting rawPluginSettings) {
return new DefaultPluginConfigObservable(
pluginConfigurationConverter, pluginConfigClass, rawPluginSettings);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.opensearch.dataprepper.plugin;

import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.plugin.PluginConfigPublisher;

import javax.inject.Inject;
import javax.inject.Named;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;

@Named
public class PluginConfigurationObservableRegister {
private final Set<PluginConfigPublisher> pluginConfigPublishers;

@Inject
public PluginConfigurationObservableRegister(final Set<PluginConfigPublisher> pluginConfigPublishers) {
this.pluginConfigPublishers = pluginConfigPublishers;
}

public void registerPluginConfigurationObservables(final Object[] constructorArguments) {
Optional.ofNullable(constructorArguments).ifPresent(arguments -> Arrays.stream(arguments)
.filter(arg -> arg instanceof PluginConfigObservable)
.forEach(arg -> pluginConfigPublishers.forEach(pluginConfigPublisher ->
pluginConfigPublisher.addPluginConfigObservable(
(PluginConfigObservable) arg)))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Named;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
Expand All @@ -25,6 +26,13 @@
class PluginCreator {
private static final Logger LOG = LoggerFactory.getLogger(PluginCreator.class);

private final PluginConfigurationObservableRegister pluginConfigurationObservableRegister;

@Inject
PluginCreator(final PluginConfigurationObservableRegister pluginConfigurationObservableRegister) {
this.pluginConfigurationObservableRegister = pluginConfigurationObservableRegister;
}

<T> T newPluginInstance(final Class<T> pluginClass,
final PluginArgumentsContext pluginArgumentsContext,
final String pluginName) {
Expand All @@ -36,6 +44,8 @@ <T> T newPluginInstance(final Class<T> pluginClass,

final Object[] constructorArguments = pluginArgumentsContext.createArguments(constructor.getParameterTypes());

pluginConfigurationObservableRegister.registerPluginConfigurationObservables(constructorArguments);

try {
return (T) constructor.newInstance(constructorArguments);
} catch (final IllegalAccessException | InstantiationException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.InvalidPluginDefinitionException;
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -112,6 +113,19 @@ void createArguments_with_single_class_using_sink_context() {
equalTo(new Object[] { sinkContext}));
}

@Test
void createArguments_with_single_class_using_plugin_configuration_observable() {
final PluginConfigObservable pluginConfigObservable = mock(PluginConfigObservable.class);

final ComponentPluginArgumentsContext objectUnderTest = new ComponentPluginArgumentsContext.Builder()
.withPluginSetting(pluginSetting)
.withPluginConfigurationObservable(pluginConfigObservable)
.build();

assertThat(objectUnderTest.createArguments(new Class[] { PluginConfigObservable.class }),
equalTo(new Object[] {pluginConfigObservable}));
}

@Test
void createArguments_given_bean_not_available_with_single_class_using_bean_factory() {
doThrow(mock(BeansException.class)).when(beanFactory).getBean((Class<Object>) any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.NoPluginFoundException;
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager;
import org.opensearch.dataprepper.event.DefaultEventFactory;
Expand Down Expand Up @@ -57,6 +58,8 @@ class DefaultPluginFactoryTest {
private String pipelineName;
private DefaultAcknowledgementSetManager acknowledgementSetManager;
private DefaultEventFactory eventFactory;
private PluginConfigurationObservableFactory pluginConfigurationObservableFactory;
private PluginConfigObservable pluginConfigObservable;

@BeforeEach
void setUp() {
Expand All @@ -80,10 +83,18 @@ void setUp() {

beanFactoryProvider = mock(PluginBeanFactoryProvider.class);
beanFactory = mock(BeanFactory.class);
pluginConfigurationObservableFactory = mock(PluginConfigurationObservableFactory.class);
given(pluginConfigurationObservableFactory.createDefaultPluginConfigObservable(
eq(pluginConfigurationConverter),
any(Class.class),
any(PluginSetting.class)
)).willReturn(pluginConfigObservable);
}

private DefaultPluginFactory createObjectUnderTest() {
return new DefaultPluginFactory(pluginProviderLoader, pluginCreator, pluginConfigurationConverter, beanFactoryProvider, eventFactory, acknowledgementSetManager);
return new DefaultPluginFactory(
pluginProviderLoader, pluginCreator, pluginConfigurationConverter, beanFactoryProvider, eventFactory,
acknowledgementSetManager, pluginConfigurationObservableFactory);
}

@Test
Expand Down Expand Up @@ -113,6 +124,15 @@ void constructor_should_throw_if_pluginProviders_is_empty() {
verifyNoInteractions(beanFactoryProvider);
}

@Test
void constructor_should_throw_if_pluginConfigurationObservableFactory_is_null() {
pluginConfigurationObservableFactory = null;

assertThrows(NullPointerException.class,
this::createObjectUnderTest);
verifyNoInteractions(beanFactoryProvider);
}

@Test
void constructor_should_throw_if_pluginCreator_is_null() {
pluginCreator = null;
Expand Down
Loading

0 comments on commit 71a6956

Please sign in to comment.