Skip to content

Commit

Permalink
feat: add nucleus connectivity validation
Browse files Browse the repository at this point in the history
  • Loading branch information
Tianci Shen committed Mar 13, 2024
1 parent 2f671e3 commit 5446985
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException;
import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.logging.impl.LogManager;
import com.aws.greengrass.mqttclient.MqttClient;
import com.aws.greengrass.util.Coerce;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import software.amazon.awssdk.services.greengrassv2.model.DeploymentComponentUpdatePolicyAction;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -66,6 +68,8 @@ public class DeploymentConfigMerger {
private Kernel kernel;
private DeviceConfiguration deviceConfiguration;
private DynamicComponentConfigurationValidator validator;
private MqttClient mqttClient;
private ThingGroupHelper thingGroupHelper;

/**
* Merge in new configuration values and new services.
Expand Down Expand Up @@ -143,7 +147,7 @@ private void updateActionForDeployment(Map<String, Object> newConfig, Deployment
}

// Validate the AWS region, IoT credentials endpoint as well as the IoT data endpoint.
if (!validateNucleusConfig(totallyCompleteFuture, nucleusConfig)) {
if (!validateNucleusConfig(totallyCompleteFuture, activator, deployment, nucleusConfig)) {
return;
}

Expand All @@ -153,6 +157,7 @@ private void updateActionForDeployment(Map<String, Object> newConfig, Deployment
}

private boolean validateNucleusConfig(CompletableFuture<DeploymentResult> totallyCompleteFuture,
DeploymentActivator activator, Deployment deployment,
Map<String, Object> nucleusConfig) {
if (nucleusConfig != null) {
String awsRegion = tryGetAwsRegionFromNewConfig(nucleusConfig);
Expand All @@ -166,10 +171,79 @@ private boolean validateNucleusConfig(CompletableFuture<DeploymentResult> totall
.complete(new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_NO_STATE_CHANGE, e));
return false;
}

Integer timeoutSec = deployment.getDeploymentDocumentObj().getConfigurationValidationPolicy().timeoutInSeconds();
long configTimeout = Duration.ofSeconds(20).toMillis();
if (timeoutSec != null) {
configTimeout = Duration.ofSeconds(timeoutSec).toMillis();
}

Map<String, Object> currentConfig = kernel.getConfig().toPOJO();
if (configTimeout == 0
|| !deviceConfiguration.isDeviceConfiguredToTalkToCloud()
|| !hasNucleusConfigurationChange(currentConfig, nucleusConfig)) {
logger.atInfo().log("Skipping connectivity validation");
return true;
}
try {
// Update Nucleus with new config
logger.atInfo().log("Applying Nucleus config");
Map<String, Object> modifiedConfig = kernel.getConfig().toPOJO();
replaceNucleusConfiguration(modifiedConfig, nucleusConfig);
activator.mergeConfig(modifiedConfig, deployment);
TimeUnit.SECONDS.sleep(5);

// Check that MQTT client has reconnected
logger.atInfo().log("Checking MQTT Reconnected");
mqttClient.waitForReconnect(configTimeout);

// Check that HTTP client works
logger.atInfo().log("Checking HTTP Reconnected");
thingGroupHelper.waitForReconnect(configTimeout);
} catch (Exception e) {
activator.mergeConfig(currentConfig, deployment);
logger.atError().cause(e).log("Nucleus connectivity validation failed");
totallyCompleteFuture
.complete(new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_NO_STATE_CHANGE, e));
return false;
}
}
return true;
}

private void replaceNucleusConfiguration(Map<String, Object> kernelConfig, Map<String, Object> nucleusConfig) {
Map<String, Object> currentNucleusConfig = getNucleusConfiguration(kernelConfig);
if (nucleusConfig != null){
currentNucleusConfig.put(CONFIGURATION_CONFIG_KEY, nucleusConfig);
}
}

private boolean hasNucleusConfigurationChange(Map<String, Object> kernelConfig, Map<String, Object> nucleusConfig) {
Map<String, Object> currentNucleusConfig = getNucleusConfiguration(kernelConfig);
if (nucleusConfig != null) {
return currentNucleusConfig.equals(nucleusConfig);
}
return false;
}

private Map<String, Object> getNucleusConfiguration(Map<String, Object> kernelConfig){
Map<String, Object> nucleusConfig = null;
if (kernelConfig.containsKey(SERVICES_NAMESPACE_TOPIC)) {
// Get services config if it exists
Map<String, Object> serviceConfig = (Map<String, Object>) kernelConfig.get(SERVICES_NAMESPACE_TOPIC);
if (serviceConfig.containsKey(deviceConfiguration.getNucleusComponentName())) {
// Get nucleus config if it exists
Map<String, Object> currentNucleusConfig = (Map<String, Object>) serviceConfig
.get(deviceConfiguration.getNucleusComponentName());
if (currentNucleusConfig.containsKey(CONFIGURATION_CONFIG_KEY)){
// Replace nucleus config new the new values
nucleusConfig = (Map<String, Object>) currentNucleusConfig.get(CONFIGURATION_CONFIG_KEY);
}
}
}
return nucleusConfig;
}

/**
* Completes the provided future when all of the listed services are running.
*
Expand Down Expand Up @@ -307,7 +381,6 @@ public AggregateServicesChangeManager createRollbackManager() {

/**
* Start the new services the merge intends to add.
*
*/
public void startNewServices() {
for (String serviceName : servicesToAdd) {
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/com/aws/greengrass/deployment/ThingGroupHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package com.aws.greengrass.deployment;

import com.aws.greengrass.deployment.errorcode.DeploymentErrorCode;
import com.aws.greengrass.deployment.exceptions.ComponentConfigurationValidationException;
import com.aws.greengrass.deployment.exceptions.DeviceConfigurationException;
import com.aws.greengrass.deployment.exceptions.RetryableServerErrorException;
import com.aws.greengrass.logging.api.Logger;
Expand All @@ -26,6 +28,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Inject;

Expand Down Expand Up @@ -102,4 +105,41 @@ public Optional<Set<String>> listThingGroupsForDevice(int maxAttemptCount) throw
return Optional.of(thingGroupNames);
}, "get-thing-group-hierarchy", logger);
}

public void waitForReconnect(long timeoutMillis) throws Exception {
if (!deviceConfiguration.isDeviceConfiguredToTalkToCloud()) {
return;
}
Duration initialInterval = Duration.ofMillis(timeoutMillis / 8);
Duration maxRetryInterval = Duration.ofMillis(timeoutMillis / 4);

try {
RetryUtils.runWithRetry(clientExceptionRetryConfig.toBuilder()
.maxAttempt(3)
.initialRetryInterval(initialInterval)
.maxRetryInterval(maxRetryInterval)
.build(),
() -> {
ListThingGroupsForCoreDeviceRequest request = ListThingGroupsForCoreDeviceRequest.builder()
.coreDeviceThingName(Coerce.toString(deviceConfiguration.getThingName()))
.build();

ListThingGroupsForCoreDeviceResponse response;
try {
response =
clientFactory.fetchGreengrassV2DataClient().listThingGroupsForCoreDevice(request);
} catch (GreengrassV2DataException e) {
if (RetryUtils.retryErrorCodes(e.statusCode())) {
throw new RetryableServerErrorException("Failed with retryable error " + e.statusCode()
+ " when calling listThingGroupsForCoreDevice", e);
}
throw e;
}
return response;
}, "get-thing-group-hierarchy", logger);
} catch (Exception e) {
throw new ComponentConfigurationValidationException("HTTP client failed to reconnect with new configuration",
DeploymentErrorCode.FAILED_TO_RECONNECT);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ protected DeploymentActivator(Kernel kernel) {
public abstract void activate(Map<String, Object> newConfig, Deployment deployment,
CompletableFuture<DeploymentResult> totallyCompleteFuture);

public void mergeConfig(Map<String, Object> newConfig, Deployment deployment) {
DeploymentDocument deploymentDocument = deployment.getDeploymentDocumentObj();
updateConfiguration(deploymentDocument.getTimestamp(), newConfig);
}

protected boolean takeConfigSnapshot(CompletableFuture<DeploymentResult> totallyCompleteFuture) {
try {
deploymentDirectoryManager.takeConfigSnapshot(deploymentDirectoryManager.getSnapshotFilePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public enum DeploymentErrorCode {
UNSUPPORTED_REGION(DeploymentErrorType.REQUEST_ERROR),
IOT_CRED_ENDPOINT_FORMAT_NOT_VALID(DeploymentErrorType.REQUEST_ERROR),
IOT_DATA_ENDPOINT_FORMAT_NOT_VALID(DeploymentErrorType.REQUEST_ERROR),
FAILED_TO_RECONNECT(DeploymentErrorType.REQUEST_ERROR),

/* Docker issues */
DOCKER_ERROR(DeploymentErrorType.DEPENDENCY_ERROR),
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/aws/greengrass/mqttclient/MqttClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import com.aws.greengrass.config.Topics;
import com.aws.greengrass.config.WhatHappened;
import com.aws.greengrass.deployment.DeviceConfiguration;
import com.aws.greengrass.deployment.errorcode.DeploymentErrorCode;
import com.aws.greengrass.deployment.exceptions.ComponentConfigurationValidationException;
import com.aws.greengrass.lifecyclemanager.Kernel;
import com.aws.greengrass.logging.api.LogEventBuilder;
import com.aws.greengrass.logging.api.Logger;
Expand Down Expand Up @@ -159,6 +161,7 @@ public class MqttClient implements Closeable {
private int maxPublishRetryCount;
private int maxPublishMessageSize;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private boolean isReconnected = true;

@Getter(AccessLevel.PROTECTED)
private final MqttClientConnectionEvents callbacks = new MqttClientConnectionEvents() {
Expand Down Expand Up @@ -312,6 +315,7 @@ protected MqttClient(DeviceConfiguration deviceConfiguration,

logger.atDebug().kv("modifiedNode", node.getFullName()).kv("changeType", what)
.log("Reconfiguring MQTT clients");
isReconnected = false;
return false;
}, (what) -> {
validateAndSetMqttPublishConfiguration();
Expand Down Expand Up @@ -351,6 +355,9 @@ protected MqttClient(DeviceConfiguration deviceConfiguration,
.log("Error while reconnecting MQTT client");
}
}
if (brokenConnections.isEmpty()){
isReconnected = true;
}
} while (!brokenConnections.isEmpty());
}, 1, TimeUnit.SECONDS));

Expand Down Expand Up @@ -1041,4 +1048,16 @@ public int getMqttOperationTimeoutMillis() {
private String getMqttVersion() {
return Coerce.toString(mqttTopics.findOrDefault(DEFAULT_MQTT_VERSION, MQTT_VERSION_KEY));
}

public void waitForReconnect(long timeoutMillis) throws Exception {
long timout = System.currentTimeMillis() + timeoutMillis;
while(System.currentTimeMillis() < timout) {
if (isReconnected) {
return;
}
TimeUnit.SECONDS.sleep(1);
}
throw new ComponentConfigurationValidationException("MQTT client failed to reconnect with new configuration",
DeploymentErrorCode.FAILED_TO_RECONNECT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException;
import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.logging.impl.LogManager;
import com.aws.greengrass.mqttclient.MqttClient;
import com.aws.greengrass.testcommons.testutilities.GGExtension;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -90,6 +91,10 @@ class DeploymentConfigMergerTest {
@Mock
private DynamicComponentConfigurationValidator validator;
@Mock
private MqttClient mqttClient;
@Mock
private ThingGroupHelper thingGroupHelper;
@Mock
private Context context;

@BeforeEach
Expand Down Expand Up @@ -307,7 +312,7 @@ void GIVEN_deployment_WHEN_check_safety_selected_THEN_check_safety_before_update
when(deploymentActivatorFactory.getDeploymentActivator(any())).thenReturn(deploymentActivator);
when(context.get(DeploymentActivatorFactory.class)).thenReturn(deploymentActivatorFactory);

DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);

DeploymentDocument doc = new DeploymentDocument();
doc.setConfigurationArn("NoSafetyCheckDeploy");
Expand Down Expand Up @@ -345,7 +350,7 @@ void GIVEN_deployment_WHEN_task_cancelled_THEN_update_is_cancelled() throws Thro
});

// GIVEN
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);
DeploymentDocument doc = mock(DeploymentDocument.class);
when(doc.getDeploymentId()).thenReturn("DeploymentId");
when(doc.getComponentUpdatePolicy()).thenReturn(
Expand Down Expand Up @@ -381,7 +386,7 @@ void GIVEN_deployment_WHEN_task_not_cancelled_THEN_update_is_continued() throws
when(context.get(DefaultActivator.class)).thenReturn(defaultActivator);

// GIVEN
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);
DeploymentDocument doc = mock(DeploymentDocument.class);
when(doc.getDeploymentId()).thenReturn("DeploymentId");
when(doc.getComponentUpdatePolicy()).thenReturn(
Expand Down Expand Up @@ -437,7 +442,7 @@ void GIVEN_deployment_activate_WHEN_deployment_has_new_config_THEN_new_config_is
newConfig2.put(DEFAULT_NUCLEUS_COMPONENT_NAME, newConfig3);
newConfig.put(SERVICES_NAMESPACE_TOPIC, newConfig2);
// GIVEN
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);
DeploymentDocument doc = mock(DeploymentDocument.class);
when(doc.getDeploymentId()).thenReturn("DeploymentId");
when(doc.getComponentUpdatePolicy()).thenReturn(
Expand Down Expand Up @@ -498,7 +503,7 @@ void GIVEN_deployment_activate_WHEN_deployment_has_some_new_config_THEN_old_conf
newConfig2.put(DEFAULT_NUCLEUS_COMPONENT_NAME, newConfig3);
newConfig.put(SERVICES_NAMESPACE_TOPIC, newConfig2);
// GIVEN
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);
DeploymentDocument doc = mock(DeploymentDocument.class);
when(doc.getDeploymentId()).thenReturn("DeploymentId");
when(doc.getComponentUpdatePolicy()).thenReturn(
Expand Down

0 comments on commit 5446985

Please sign in to comment.