Skip to content

Commit

Permalink
Add support for the configuration watcher to shut down the applicatio…
Browse files Browse the repository at this point in the history
…n to refresh the application (#1799)

See #1772
  • Loading branch information
ryanjbaxter authored Nov 21, 2024
1 parent 890af8f commit 16eec95
Show file tree
Hide file tree
Showing 12 changed files with 343 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ change to a ConfigMap or Secret occurs then the HTTP implementation will use the
instances of the application which match the name of the ConfigMap or Secret and send an HTTP POST request to the application's actuator
`/refresh` endpoint. By default, it will send the post request to `/actuator/refresh` using the port registered in the discovery client.

You can also configure the configuration watcher to call the instances `shutdown` actuator endpoint. To do this you can set
`spring.cloud.kubernetes.configuration.watcher.refresh-strategy=shutdown`.

### Non-Default Management Port and Actuator Path

If the application is using a non-default actuator path and/or using a different port for the management endpoints, the Kubernetes service for the application
Expand Down Expand Up @@ -224,7 +227,13 @@ Another way you can choose to configure the actuator path and/or management port
## Messaging Implementation

The messaging implementation can be enabled by setting profile to either `bus-amqp` (RabbitMQ) or `bus-kafka` (Kafka) when the Spring Cloud Kubernetes Configuration Watcher
application is deployed to Kubernetes.
application is deployed to Kubernetes. By default, when using the messaging implementation the configuration watcher will send a `RefreshRemoteApplicationEvent` using
Spring Cloud Bus to all application instances. This will cause the application instances to refresh the application's configuration properties without
restarting the instance.

You can also configure the configuration to shut down the application instances in order to refresh the application's configuration properties.
When the application shuts down, Kubernetes will restart the application instance and the new configuration properties will be loaded. To use
this strategy set `spring.cloud.kubernetes.configuration.watcher.refresh-strategy=shutdown`.

## Configuring RabbitMQ

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

/**
* @param enabled if kubernetes discovery is enabled
* @param allNamespaces if discover is enabled for all namespaces
* @param allNamespaces if discovery is enabled for all namespaces
* @param namespaces If set and allNamespaces is false, then only the services and
* endpoints matching these namespaces will be fetched from the Kubernetes API server.
* @param waitCacheReady wait for the discovery cache (service and endpoints) to be fully
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@

import org.springframework.cloud.bus.event.PathDestinationFactory;
import org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.cloud.bus.event.ShutdownRemoteApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;

import static org.springframework.cloud.kubernetes.configuration.watcher.ConfigurationWatcherConfigurationProperties.RefreshStrategy.SHUTDOWN;

/**
* An event publisher for an 'event bus' type of application.
*
Expand All @@ -34,16 +38,28 @@ final class BusRefreshTrigger implements RefreshTrigger {

private final String busId;

BusRefreshTrigger(ApplicationEventPublisher applicationEventPublisher, String busId) {
private final ConfigurationWatcherConfigurationProperties watcherConfigurationProperties;

BusRefreshTrigger(ApplicationEventPublisher applicationEventPublisher, String busId,
ConfigurationWatcherConfigurationProperties watcherConfigurationProperties) {
this.applicationEventPublisher = applicationEventPublisher;
this.busId = busId;
this.watcherConfigurationProperties = watcherConfigurationProperties;
}

@Override
public Mono<Void> triggerRefresh(KubernetesObject configMap, String appName) {
applicationEventPublisher.publishEvent(new RefreshRemoteApplicationEvent(configMap, busId,
new PathDestinationFactory().getDestination(appName)));
applicationEventPublisher.publishEvent(createRefreshApplicationEvent(configMap, appName));
return Mono.empty();
}

private RemoteApplicationEvent createRefreshApplicationEvent(KubernetesObject configMap, String appName) {
if (watcherConfigurationProperties.getRefreshStrategy() == SHUTDOWN) {
return new ShutdownRemoteApplicationEvent(configMap, busId,
new PathDestinationFactory().getDestination(appName));
}
return new RefreshRemoteApplicationEvent(configMap, busId,
new PathDestinationFactory().getDestination(appName));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class ConfigurationWatcherConfigurationProperties {
@DurationUnit(ChronoUnit.MILLIS)
private Duration refreshDelay = Duration.ofMillis(120000);

private RefreshStrategy refreshStrategy = RefreshStrategy.REFRESH;

private int threadPoolSize = 1;

private String actuatorPath = "/actuator";
Expand Down Expand Up @@ -115,4 +117,28 @@ public void setThreadPoolSize(int threadPoolSize) {
this.threadPoolSize = threadPoolSize;
}

public RefreshStrategy getRefreshStrategy() {
return refreshStrategy;
}

public void setRefreshStrategy(RefreshStrategy refreshStrategy) {
this.refreshStrategy = refreshStrategy;
}

public enum RefreshStrategy {

/**
* Call the Actuator refresh endpoint or send a refresh event over Spring Cloud
* Bus.
*/
REFRESH,

/**
* Call the Actuator shutdown endpoint or send a shutdown event over Spring Cloud
* Bus.
*/
SHUTDOWN

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;

import static org.springframework.cloud.kubernetes.configuration.watcher.ConfigurationWatcherConfigurationProperties.RefreshStrategy.SHUTDOWN;

/**
* @author wind57
*/
Expand Down Expand Up @@ -91,15 +93,15 @@ private URI getActuatorUri(ServiceInstance si, String actuatorPath, int actuator
}
else {
int port = actuatorPort < 0 ? si.getPort() : actuatorPort;
actuatorUriBuilder = actuatorUriBuilder.path(actuatorPath + "/refresh").port(port);
actuatorUriBuilder = actuatorUriBuilder.path(actuatorPath + getRefreshStrategyEndpoint()).port(port);
}

return actuatorUriBuilder.build().toUri();
}

private void setActuatorUriFromAnnotation(UriComponentsBuilder actuatorUriBuilder, String metadataUri) {
URI annotationUri = URI.create(metadataUri);
actuatorUriBuilder.path(annotationUri.getPath() + "/refresh");
actuatorUriBuilder.path(annotationUri.getPath() + getRefreshStrategyEndpoint());

// The URI may not contain a host so if that is the case the port in the URI will
// be -1. The authority of the URI will be :<port> for example :9090, we just need
Expand All @@ -114,4 +116,11 @@ private void setActuatorUriFromAnnotation(UriComponentsBuilder actuatorUriBuilde
}
}

private String getRefreshStrategyEndpoint() {
if (k8SConfigurationProperties.getRefreshStrategy() == SHUTDOWN) {
return "/shutdown";
}
return "/refresh";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class RefreshTriggerAutoConfiguration {
@ConditionalOnMissingBean
@Profile({ AMQP, KAFKA })
BusRefreshTrigger busRefreshTrigger(ApplicationEventPublisher applicationEventPublisher,
BusProperties busProperties) {
return new BusRefreshTrigger(applicationEventPublisher, busProperties.getId());
BusProperties busProperties, ConfigurationWatcherConfigurationProperties properties) {
return new BusRefreshTrigger(applicationEventPublisher, busProperties.getId(), properties);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import org.springframework.cloud.bus.BusProperties;
import org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.cloud.bus.event.ShutdownRemoteApplicationEvent;
import org.springframework.cloud.kubernetes.client.config.KubernetesClientConfigMapPropertySourceLocator;
import org.springframework.cloud.kubernetes.commons.KubernetesNamespaceProvider;
import org.springframework.cloud.kubernetes.commons.config.reload.ConfigReloadProperties;
Expand All @@ -39,6 +41,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import static org.springframework.cloud.kubernetes.commons.KubernetesNamespaceProvider.NAMESPACE_PROPERTY;
import static org.springframework.cloud.kubernetes.configuration.watcher.ConfigurationWatcherConfigurationProperties.RefreshStrategy;

/**
* @author Ryan Baxter
Expand Down Expand Up @@ -68,31 +71,52 @@ class BusEventBasedConfigMapWatcherChangeDetectorTests {

private BusProperties busProperties;

private MockEnvironment mockEnvironment;

@BeforeEach
void setup() {
MockEnvironment mockEnvironment = new MockEnvironment();
mockEnvironment = new MockEnvironment();
mockEnvironment.setProperty(NAMESPACE_PROPERTY, "default");
ConfigurationWatcherConfigurationProperties configurationWatcherConfigurationProperties = new ConfigurationWatcherConfigurationProperties();
busProperties = new BusProperties();
changeDetector = new BusEventBasedConfigMapWatcherChangeDetector(coreV1Api, mockEnvironment,
ConfigReloadProperties.DEFAULT, UPDATE_STRATEGY, configMapPropertySourceLocator,
new KubernetesNamespaceProvider(mockEnvironment), configurationWatcherConfigurationProperties,
threadPoolTaskExecutor, new BusRefreshTrigger(applicationEventPublisher, busProperties.getId()));
}

@Test
void triggerRefreshWithConfigMap() {
V1ObjectMeta objectMeta = new V1ObjectMeta();
objectMeta.setName("foo");
V1ConfigMap configMap = new V1ConfigMap();
configMap.setMetadata(objectMeta);
changeDetector.triggerRefresh(configMap, configMap.getMetadata().getName());
ArgumentCaptor<RefreshRemoteApplicationEvent> argumentCaptor = ArgumentCaptor
.forClass(RefreshRemoteApplicationEvent.class);
triggerRefreshWithConfigMap(RefreshStrategy.REFRESH, argumentCaptor);
}

@Test
void triggerRefreshWithConfigMapUsingShutdown() {
ArgumentCaptor<ShutdownRemoteApplicationEvent> argumentCaptor = ArgumentCaptor
.forClass(ShutdownRemoteApplicationEvent.class);
triggerRefreshWithConfigMap(RefreshStrategy.SHUTDOWN, argumentCaptor);
}

void triggerRefreshWithConfigMap(RefreshStrategy strategy,
ArgumentCaptor<? extends RemoteApplicationEvent> argumentCaptor) {
V1ObjectMeta objectMeta = new V1ObjectMeta();
objectMeta.setName("foo");
V1ConfigMap configMap = getV1ConfigMap(objectMeta, strategy);
verify(applicationEventPublisher).publishEvent(argumentCaptor.capture());
assertThat(argumentCaptor.getValue().getSource()).isEqualTo(configMap);
assertThat(argumentCaptor.getValue().getOriginService()).isEqualTo(busProperties.getId());
assertThat(argumentCaptor.getValue().getDestinationService()).isEqualTo("foo:**");
}

private V1ConfigMap getV1ConfigMap(V1ObjectMeta objectMeta, RefreshStrategy refreshStrategy) {
V1ConfigMap configMap = new V1ConfigMap();
configMap.setMetadata(objectMeta);
ConfigurationWatcherConfigurationProperties configurationWatcherConfigurationProperties = new ConfigurationWatcherConfigurationProperties();
configurationWatcherConfigurationProperties.setRefreshStrategy(refreshStrategy);
BusEventBasedConfigMapWatcherChangeDetector changeDetector = new BusEventBasedConfigMapWatcherChangeDetector(
coreV1Api, mockEnvironment, ConfigReloadProperties.DEFAULT, UPDATE_STRATEGY,
configMapPropertySourceLocator, new KubernetesNamespaceProvider(mockEnvironment),
configurationWatcherConfigurationProperties, threadPoolTaskExecutor, new BusRefreshTrigger(
applicationEventPublisher, busProperties.getId(), configurationWatcherConfigurationProperties));
changeDetector.triggerRefresh(configMap, configMap.getMetadata().getName());
return configMap;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.springframework.cloud.bus.BusProperties;
import org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.cloud.kubernetes.client.config.KubernetesClientSecretsPropertySourceLocator;
import org.springframework.cloud.kubernetes.commons.KubernetesNamespaceProvider;
import org.springframework.cloud.kubernetes.commons.config.reload.ConfigReloadProperties;
Expand All @@ -39,6 +40,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import static org.springframework.cloud.kubernetes.commons.KubernetesNamespaceProvider.NAMESPACE_PROPERTY;
import static org.springframework.cloud.kubernetes.configuration.watcher.ConfigurationWatcherConfigurationProperties.RefreshStrategy;

/**
* @author Ryan Baxter
Expand All @@ -64,35 +66,55 @@ class BusEventBasedSecretsWatcherChangeDetectorTests {
@Mock
private ApplicationEventPublisher applicationEventPublisher;

private BusEventBasedSecretsWatcherChangeDetector changeDetector;

private BusProperties busProperties;

private MockEnvironment mockEnvironment;

@BeforeEach
void setup() {
MockEnvironment mockEnvironment = new MockEnvironment();
mockEnvironment = new MockEnvironment();
mockEnvironment.setProperty(NAMESPACE_PROPERTY, "default");
ConfigurationWatcherConfigurationProperties configurationWatcherConfigurationProperties = new ConfigurationWatcherConfigurationProperties();
busProperties = new BusProperties();
changeDetector = new BusEventBasedSecretsWatcherChangeDetector(coreV1Api, mockEnvironment,
ConfigReloadProperties.DEFAULT, UPDATE_STRATEGY, secretsPropertySourceLocator,
new KubernetesNamespaceProvider(mockEnvironment), configurationWatcherConfigurationProperties,
threadPoolTaskExecutor, new BusRefreshTrigger(applicationEventPublisher, busProperties.getId()));
}

@Test
void triggerRefreshWithSecret() {
V1ObjectMeta objectMeta = new V1ObjectMeta();
objectMeta.setName("foo");
V1Secret secret = new V1Secret();
secret.setMetadata(objectMeta);
changeDetector.triggerRefresh(secret, secret.getMetadata().getName());
ArgumentCaptor<RefreshRemoteApplicationEvent> argumentCaptor = ArgumentCaptor
.forClass(RefreshRemoteApplicationEvent.class);
triggerRefreshWithSecret(ConfigurationWatcherConfigurationProperties.RefreshStrategy.REFRESH, argumentCaptor);
}

@Test
void triggerRefreshWithSecretWithShutdown() {
ArgumentCaptor<RefreshRemoteApplicationEvent> argumentCaptor = ArgumentCaptor
.forClass(RefreshRemoteApplicationEvent.class);
triggerRefreshWithSecret(ConfigurationWatcherConfigurationProperties.RefreshStrategy.REFRESH, argumentCaptor);
}

void triggerRefreshWithSecret(RefreshStrategy strategy,
ArgumentCaptor<? extends RemoteApplicationEvent> argumentCaptor) {
V1ObjectMeta objectMeta = new V1ObjectMeta();
objectMeta.setName("foo");
V1Secret secret = getV1Secret(objectMeta, strategy);
verify(applicationEventPublisher).publishEvent(argumentCaptor.capture());
assertThat(argumentCaptor.getValue().getSource()).isEqualTo(secret);
assertThat(argumentCaptor.getValue().getOriginService()).isEqualTo(busProperties.getId());
assertThat(argumentCaptor.getValue().getDestinationService()).isEqualTo("foo:**");
}

private V1Secret getV1Secret(V1ObjectMeta objectMeta,
ConfigurationWatcherConfigurationProperties.RefreshStrategy refreshStrategy) {
V1Secret secret = new V1Secret();
secret.setMetadata(objectMeta);
ConfigurationWatcherConfigurationProperties configurationWatcherConfigurationProperties = new ConfigurationWatcherConfigurationProperties();
configurationWatcherConfigurationProperties.setRefreshStrategy(refreshStrategy);
BusEventBasedSecretsWatcherChangeDetector changeDetector = new BusEventBasedSecretsWatcherChangeDetector(
coreV1Api, mockEnvironment, ConfigReloadProperties.DEFAULT, UPDATE_STRATEGY,
secretsPropertySourceLocator, new KubernetesNamespaceProvider(mockEnvironment),
configurationWatcherConfigurationProperties, threadPoolTaskExecutor, new BusRefreshTrigger(
applicationEventPublisher, busProperties.getId(), configurationWatcherConfigurationProperties));
changeDetector.triggerRefresh(secret, secret.getMetadata().getName());
return secret;
}

}
Loading

0 comments on commit 16eec95

Please sign in to comment.