Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for the configuration watcher to shut down the application to refresh the application #1799

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ change to a ConfigMap or Secret occurs then the HTTP implementation will use the
instances of the application which match the name of the ConfigMap or Secret and send an HTTP POST request to the application's actuator
`/refresh` endpoint. By default, it will send the post request to `/actuator/refresh` using the port registered in the discovery client.

You can also configure the configuration watcher to call the instances `shutdown` actuator endpoint. To do this you can set
`spring.cloud.kubernetes.configuration.watcher.refresh-strategy=shutdown`.

### Non-Default Management Port and Actuator Path

If the application is using a non-default actuator path and/or using a different port for the management endpoints, the Kubernetes service for the application
Expand Down Expand Up @@ -224,7 +227,13 @@ Another way you can choose to configure the actuator path and/or management port
## Messaging Implementation

The messaging implementation can be enabled by setting profile to either `bus-amqp` (RabbitMQ) or `bus-kafka` (Kafka) when the Spring Cloud Kubernetes Configuration Watcher
application is deployed to Kubernetes.
application is deployed to Kubernetes. By default, when using the messaging implementation the configuration watcher will send a `RefreshRemoteApplicationEvent` using
Spring Cloud Bus to all application instances. This will cause the application instances to refresh the application's configuration properties without
restarting the instance.

You can also configure the configuration to shut down the application instances in order to refresh the application's configuration properties.
When the application shuts down, Kubernetes will restart the application instance and the new configuration properties will be loaded. To use
this strategy set `spring.cloud.kubernetes.configuration.watcher.refresh-strategy=shutdown`.

## Configuring RabbitMQ

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

/**
* @param enabled if kubernetes discovery is enabled
* @param allNamespaces if discover is enabled for all namespaces
* @param allNamespaces if discovery is enabled for all namespaces
* @param namespaces If set and allNamespaces is false, then only the services and
* endpoints matching these namespaces will be fetched from the Kubernetes API server.
* @param waitCacheReady wait for the discovery cache (service and endpoints) to be fully
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@

import org.springframework.cloud.bus.event.PathDestinationFactory;
import org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.cloud.bus.event.ShutdownRemoteApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;

import static org.springframework.cloud.kubernetes.configuration.watcher.ConfigurationWatcherConfigurationProperties.RefreshStrategy.SHUTDOWN;

/**
* An event publisher for an 'event bus' type of application.
*
Expand All @@ -34,16 +38,28 @@ final class BusRefreshTrigger implements RefreshTrigger {

private final String busId;

BusRefreshTrigger(ApplicationEventPublisher applicationEventPublisher, String busId) {
private final ConfigurationWatcherConfigurationProperties watcherConfigurationProperties;

BusRefreshTrigger(ApplicationEventPublisher applicationEventPublisher, String busId,
ConfigurationWatcherConfigurationProperties watcherConfigurationProperties) {
this.applicationEventPublisher = applicationEventPublisher;
this.busId = busId;
this.watcherConfigurationProperties = watcherConfigurationProperties;
}

@Override
public Mono<Void> triggerRefresh(KubernetesObject configMap, String appName) {
applicationEventPublisher.publishEvent(new RefreshRemoteApplicationEvent(configMap, busId,
new PathDestinationFactory().getDestination(appName)));
applicationEventPublisher.publishEvent(createRefreshApplicationEvent(configMap, appName));
return Mono.empty();
}

private RemoteApplicationEvent createRefreshApplicationEvent(KubernetesObject configMap, String appName) {
if (watcherConfigurationProperties.getRefreshStrategy() == SHUTDOWN) {
return new ShutdownRemoteApplicationEvent(configMap, busId,
new PathDestinationFactory().getDestination(appName));
}
return new RefreshRemoteApplicationEvent(configMap, busId,
new PathDestinationFactory().getDestination(appName));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class ConfigurationWatcherConfigurationProperties {
@DurationUnit(ChronoUnit.MILLIS)
private Duration refreshDelay = Duration.ofMillis(120000);

private RefreshStrategy refreshStrategy = RefreshStrategy.REFRESH;

private int threadPoolSize = 1;

private String actuatorPath = "/actuator";
Expand Down Expand Up @@ -115,4 +117,28 @@ public void setThreadPoolSize(int threadPoolSize) {
this.threadPoolSize = threadPoolSize;
}

public RefreshStrategy getRefreshStrategy() {
return refreshStrategy;
}

public void setRefreshStrategy(RefreshStrategy refreshStrategy) {
this.refreshStrategy = refreshStrategy;
}

public enum RefreshStrategy {

/**
* Call the Actuator refresh endpoint or send a refresh event over Spring Cloud
* Bus.
*/
REFRESH,

/**
* Call the Actuator shutdown endpoint or send a shutdown event over Spring Cloud
* Bus.
*/
SHUTDOWN

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;

import static org.springframework.cloud.kubernetes.configuration.watcher.ConfigurationWatcherConfigurationProperties.RefreshStrategy.SHUTDOWN;

/**
* @author wind57
*/
Expand Down Expand Up @@ -91,15 +93,15 @@ private URI getActuatorUri(ServiceInstance si, String actuatorPath, int actuator
}
else {
int port = actuatorPort < 0 ? si.getPort() : actuatorPort;
actuatorUriBuilder = actuatorUriBuilder.path(actuatorPath + "/refresh").port(port);
actuatorUriBuilder = actuatorUriBuilder.path(actuatorPath + getRefreshStrategyEndpoint()).port(port);
}

return actuatorUriBuilder.build().toUri();
}

private void setActuatorUriFromAnnotation(UriComponentsBuilder actuatorUriBuilder, String metadataUri) {
URI annotationUri = URI.create(metadataUri);
actuatorUriBuilder.path(annotationUri.getPath() + "/refresh");
actuatorUriBuilder.path(annotationUri.getPath() + getRefreshStrategyEndpoint());

// The URI may not contain a host so if that is the case the port in the URI will
// be -1. The authority of the URI will be :<port> for example :9090, we just need
Expand All @@ -114,4 +116,11 @@ private void setActuatorUriFromAnnotation(UriComponentsBuilder actuatorUriBuilde
}
}

private String getRefreshStrategyEndpoint() {
if (k8SConfigurationProperties.getRefreshStrategy() == SHUTDOWN) {
return "/shutdown";
}
return "/refresh";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class RefreshTriggerAutoConfiguration {
@ConditionalOnMissingBean
@Profile({ AMQP, KAFKA })
BusRefreshTrigger busRefreshTrigger(ApplicationEventPublisher applicationEventPublisher,
BusProperties busProperties) {
return new BusRefreshTrigger(applicationEventPublisher, busProperties.getId());
BusProperties busProperties, ConfigurationWatcherConfigurationProperties properties) {
return new BusRefreshTrigger(applicationEventPublisher, busProperties.getId(), properties);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import org.springframework.cloud.bus.BusProperties;
import org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.cloud.bus.event.ShutdownRemoteApplicationEvent;
import org.springframework.cloud.kubernetes.client.config.KubernetesClientConfigMapPropertySourceLocator;
import org.springframework.cloud.kubernetes.commons.KubernetesNamespaceProvider;
import org.springframework.cloud.kubernetes.commons.config.reload.ConfigReloadProperties;
Expand All @@ -39,6 +41,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import static org.springframework.cloud.kubernetes.commons.KubernetesNamespaceProvider.NAMESPACE_PROPERTY;
import static org.springframework.cloud.kubernetes.configuration.watcher.ConfigurationWatcherConfigurationProperties.RefreshStrategy;

/**
* @author Ryan Baxter
Expand Down Expand Up @@ -68,31 +71,52 @@ class BusEventBasedConfigMapWatcherChangeDetectorTests {

private BusProperties busProperties;

private MockEnvironment mockEnvironment;

@BeforeEach
void setup() {
MockEnvironment mockEnvironment = new MockEnvironment();
mockEnvironment = new MockEnvironment();
mockEnvironment.setProperty(NAMESPACE_PROPERTY, "default");
ConfigurationWatcherConfigurationProperties configurationWatcherConfigurationProperties = new ConfigurationWatcherConfigurationProperties();
busProperties = new BusProperties();
changeDetector = new BusEventBasedConfigMapWatcherChangeDetector(coreV1Api, mockEnvironment,
ConfigReloadProperties.DEFAULT, UPDATE_STRATEGY, configMapPropertySourceLocator,
new KubernetesNamespaceProvider(mockEnvironment), configurationWatcherConfigurationProperties,
threadPoolTaskExecutor, new BusRefreshTrigger(applicationEventPublisher, busProperties.getId()));
}

@Test
void triggerRefreshWithConfigMap() {
V1ObjectMeta objectMeta = new V1ObjectMeta();
objectMeta.setName("foo");
V1ConfigMap configMap = new V1ConfigMap();
configMap.setMetadata(objectMeta);
changeDetector.triggerRefresh(configMap, configMap.getMetadata().getName());
ArgumentCaptor<RefreshRemoteApplicationEvent> argumentCaptor = ArgumentCaptor
.forClass(RefreshRemoteApplicationEvent.class);
triggerRefreshWithConfigMap(RefreshStrategy.REFRESH, argumentCaptor);
}

@Test
void triggerRefreshWithConfigMapUsingShutdown() {
ArgumentCaptor<ShutdownRemoteApplicationEvent> argumentCaptor = ArgumentCaptor
.forClass(ShutdownRemoteApplicationEvent.class);
triggerRefreshWithConfigMap(RefreshStrategy.SHUTDOWN, argumentCaptor);
}

void triggerRefreshWithConfigMap(RefreshStrategy strategy,
ArgumentCaptor<? extends RemoteApplicationEvent> argumentCaptor) {
V1ObjectMeta objectMeta = new V1ObjectMeta();
objectMeta.setName("foo");
V1ConfigMap configMap = getV1ConfigMap(objectMeta, strategy);
verify(applicationEventPublisher).publishEvent(argumentCaptor.capture());
assertThat(argumentCaptor.getValue().getSource()).isEqualTo(configMap);
assertThat(argumentCaptor.getValue().getOriginService()).isEqualTo(busProperties.getId());
assertThat(argumentCaptor.getValue().getDestinationService()).isEqualTo("foo:**");
}

private V1ConfigMap getV1ConfigMap(V1ObjectMeta objectMeta, RefreshStrategy refreshStrategy) {
V1ConfigMap configMap = new V1ConfigMap();
configMap.setMetadata(objectMeta);
ConfigurationWatcherConfigurationProperties configurationWatcherConfigurationProperties = new ConfigurationWatcherConfigurationProperties();
configurationWatcherConfigurationProperties.setRefreshStrategy(refreshStrategy);
BusEventBasedConfigMapWatcherChangeDetector changeDetector = new BusEventBasedConfigMapWatcherChangeDetector(
coreV1Api, mockEnvironment, ConfigReloadProperties.DEFAULT, UPDATE_STRATEGY,
configMapPropertySourceLocator, new KubernetesNamespaceProvider(mockEnvironment),
configurationWatcherConfigurationProperties, threadPoolTaskExecutor, new BusRefreshTrigger(
applicationEventPublisher, busProperties.getId(), configurationWatcherConfigurationProperties));
changeDetector.triggerRefresh(configMap, configMap.getMetadata().getName());
return configMap;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.springframework.cloud.bus.BusProperties;
import org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.cloud.kubernetes.client.config.KubernetesClientSecretsPropertySourceLocator;
import org.springframework.cloud.kubernetes.commons.KubernetesNamespaceProvider;
import org.springframework.cloud.kubernetes.commons.config.reload.ConfigReloadProperties;
Expand All @@ -39,6 +40,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import static org.springframework.cloud.kubernetes.commons.KubernetesNamespaceProvider.NAMESPACE_PROPERTY;
import static org.springframework.cloud.kubernetes.configuration.watcher.ConfigurationWatcherConfigurationProperties.RefreshStrategy;

/**
* @author Ryan Baxter
Expand All @@ -64,35 +66,55 @@ class BusEventBasedSecretsWatcherChangeDetectorTests {
@Mock
private ApplicationEventPublisher applicationEventPublisher;

private BusEventBasedSecretsWatcherChangeDetector changeDetector;

private BusProperties busProperties;

private MockEnvironment mockEnvironment;

@BeforeEach
void setup() {
MockEnvironment mockEnvironment = new MockEnvironment();
mockEnvironment = new MockEnvironment();
mockEnvironment.setProperty(NAMESPACE_PROPERTY, "default");
ConfigurationWatcherConfigurationProperties configurationWatcherConfigurationProperties = new ConfigurationWatcherConfigurationProperties();
busProperties = new BusProperties();
changeDetector = new BusEventBasedSecretsWatcherChangeDetector(coreV1Api, mockEnvironment,
ConfigReloadProperties.DEFAULT, UPDATE_STRATEGY, secretsPropertySourceLocator,
new KubernetesNamespaceProvider(mockEnvironment), configurationWatcherConfigurationProperties,
threadPoolTaskExecutor, new BusRefreshTrigger(applicationEventPublisher, busProperties.getId()));
}

@Test
void triggerRefreshWithSecret() {
V1ObjectMeta objectMeta = new V1ObjectMeta();
objectMeta.setName("foo");
V1Secret secret = new V1Secret();
secret.setMetadata(objectMeta);
changeDetector.triggerRefresh(secret, secret.getMetadata().getName());
ArgumentCaptor<RefreshRemoteApplicationEvent> argumentCaptor = ArgumentCaptor
.forClass(RefreshRemoteApplicationEvent.class);
triggerRefreshWithSecret(ConfigurationWatcherConfigurationProperties.RefreshStrategy.REFRESH, argumentCaptor);
}

@Test
void triggerRefreshWithSecretWithShutdown() {
ArgumentCaptor<RefreshRemoteApplicationEvent> argumentCaptor = ArgumentCaptor
.forClass(RefreshRemoteApplicationEvent.class);
triggerRefreshWithSecret(ConfigurationWatcherConfigurationProperties.RefreshStrategy.REFRESH, argumentCaptor);
}

void triggerRefreshWithSecret(RefreshStrategy strategy,
ArgumentCaptor<? extends RemoteApplicationEvent> argumentCaptor) {
V1ObjectMeta objectMeta = new V1ObjectMeta();
objectMeta.setName("foo");
V1Secret secret = getV1Secret(objectMeta, strategy);
verify(applicationEventPublisher).publishEvent(argumentCaptor.capture());
assertThat(argumentCaptor.getValue().getSource()).isEqualTo(secret);
assertThat(argumentCaptor.getValue().getOriginService()).isEqualTo(busProperties.getId());
assertThat(argumentCaptor.getValue().getDestinationService()).isEqualTo("foo:**");
}

private V1Secret getV1Secret(V1ObjectMeta objectMeta,
ConfigurationWatcherConfigurationProperties.RefreshStrategy refreshStrategy) {
V1Secret secret = new V1Secret();
secret.setMetadata(objectMeta);
ConfigurationWatcherConfigurationProperties configurationWatcherConfigurationProperties = new ConfigurationWatcherConfigurationProperties();
configurationWatcherConfigurationProperties.setRefreshStrategy(refreshStrategy);
BusEventBasedSecretsWatcherChangeDetector changeDetector = new BusEventBasedSecretsWatcherChangeDetector(
coreV1Api, mockEnvironment, ConfigReloadProperties.DEFAULT, UPDATE_STRATEGY,
secretsPropertySourceLocator, new KubernetesNamespaceProvider(mockEnvironment),
configurationWatcherConfigurationProperties, threadPoolTaskExecutor, new BusRefreshTrigger(
applicationEventPublisher, busProperties.getId(), configurationWatcherConfigurationProperties));
changeDetector.triggerRefresh(secret, secret.getMetadata().getName());
return secret;
}

}
Loading
Loading