From d2abcb52ed46821955f1140200c285cdd7fc2adf Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Wed, 30 Oct 2024 16:24:09 -0700 Subject: [PATCH 01/19] Checkpoint, shadow sample --- codebuild/samples/shadow-linux.sh | 30 +- samples/CMakeLists.txt | 3 +- samples/README.md | 3 +- .../mqtt5_shadow_sync/CMakeLists.txt | 4 +- .../mqtt5_shadow_sync/README.md | 0 .../mqtt5_shadow_sync/main.cpp | 2 +- .../shadow_sync}/CMakeLists.txt | 8 +- .../{v2 => deprecated/shadow_sync}/README.md | 46 +- .../shadow/deprecated/shadow_sync/main.cpp | 502 ++++++++++++ samples/shadow/shadow_sync/CMakeLists.txt | 4 +- samples/shadow/shadow_sync/README.md | 46 +- samples/shadow/shadow_sync/main.cpp | 734 ++++++++---------- samples/shadow/v2/main.cpp | 459 ----------- 13 files changed, 894 insertions(+), 947 deletions(-) rename samples/shadow/{ => deprecated}/mqtt5_shadow_sync/CMakeLists.txt (88%) rename samples/shadow/{ => deprecated}/mqtt5_shadow_sync/README.md (100%) rename samples/shadow/{ => deprecated}/mqtt5_shadow_sync/main.cpp (99%) rename samples/shadow/{v2 => deprecated/shadow_sync}/CMakeLists.txt (80%) rename samples/shadow/{v2 => deprecated/shadow_sync}/README.md (71%) create mode 100644 samples/shadow/deprecated/shadow_sync/main.cpp delete mode 100644 samples/shadow/v2/main.cpp diff --git a/codebuild/samples/shadow-linux.sh b/codebuild/samples/shadow-linux.sh index a04b05f1c..2d32b626c 100755 --- a/codebuild/samples/shadow-linux.sh +++ b/codebuild/samples/shadow-linux.sh @@ -4,7 +4,8 @@ set -e env -pushd $CODEBUILD_SRC_DIR/samples/shadow/shadow_sync +# v1 MQTT311 shadow sample +pushd $CODEBUILD_SRC_DIR/samples/shadow/deprecated/shadow_sync mkdir _build cd _build @@ -17,3 +18,30 @@ echo "Shadow-Sync test" ./shadow-sync --endpoint $ENDPOINT --key /tmp/privatekey.pem --cert /tmp/certificate.pem --thing_name CI_CodeBuild_Thing --is_ci true popd + +# v1 MQTT5 shadow sample +pushd $CODEBUILD_SRC_DIR/samples/shadow/deprecated/mqtt5_shadow_sync + +mkdir _build +cd _build +cmake -DCMAKE_PREFIX_PATH=/tmp/install .. +make -j + +ENDPOINT=$(aws secretsmanager get-secret-value --secret-id "ci/endpoint" --query "SecretString" | cut -f2 -d":" | sed -e 's/[\\\"\}]//g') + +echo "Shadow-Sync test" +./mqtt5-shadow-sync --endpoint $ENDPOINT --key /tmp/privatekey.pem --cert /tmp/certificate.pem --thing_name CI_CodeBuild_Thing --is_ci true + + +popd + +# v2 MQTT5 shadow sample - smaple is interactive so build but don't run + +pushd $CODEBUILD_SRC_DIR/samples/shadow/shadow_sync + +mkdir _build +cd _build +cmake -DCMAKE_PREFIX_PATH=/tmp/install .. +make -j + +popd \ No newline at end of file diff --git a/samples/CMakeLists.txt b/samples/CMakeLists.txt index 600180149..af1fb480e 100644 --- a/samples/CMakeLists.txt +++ b/samples/CMakeLists.txt @@ -24,5 +24,6 @@ add_subdirectory(pub_sub/basic_pub_sub) add_subdirectory(pub_sub/cycle_pub_sub) add_subdirectory(secure_tunneling/secure_tunnel) add_subdirectory(secure_tunneling/tunnel_notification) +add_subdirectory(shadow/deprecated/shadow_sync) +add_subdirectory(shadow/deprecated/mqtt5_shadow_sync) add_subdirectory(shadow/shadow_sync) -add_subdirectory(shadow/mqtt5_shadow_sync) diff --git a/samples/README.md b/samples/README.md index cc90592e0..d3001076e 100644 --- a/samples/README.md +++ b/samples/README.md @@ -10,7 +10,7 @@ + [HTTP Proxy](./mqtt5/mqtt5_pubsub/README.md#http-proxy) * [Mqtt5 Shared Subscription](./mqtt5/mqtt5_shared_subscription/README.md) * [Mqtt5 Jobs](./jobs/mqtt5_job_execution/README.md) -* [Mqtt5 Shadow](./shadow/mqtt5_shadow_sync/README.md) +* [Shadow](./shadow/shadow_sync/README.md) * [Mqtt5 Fleet Provisioning](./fleet_provisioning/mqtt5_fleet_provisioning/README.md) ## MQTT311 Samples * [Basic Pub-Sub](./pub_sub/basic_pub_sub/README.md) @@ -22,7 +22,6 @@ * [Windows Certificate MQTT Connect](./mqtt/windows_cert_connect/README.md) * [Custom Authorizer Connect](./mqtt/custom_authorizer_connect/README.md) * [Cognito Connect](./mqtt/cognito_connect/README.md) -* [Shadow](./shadow/shadow_sync/README.md) * [Jobs](./jobs/job_execution/README.md) * [Fleet provisioning](./fleet_provisioning/fleet_provisioning/README.md) ## Other Samples diff --git a/samples/shadow/mqtt5_shadow_sync/CMakeLists.txt b/samples/shadow/deprecated/mqtt5_shadow_sync/CMakeLists.txt similarity index 88% rename from samples/shadow/mqtt5_shadow_sync/CMakeLists.txt rename to samples/shadow/deprecated/mqtt5_shadow_sync/CMakeLists.txt index 3dc9c1f07..99e94ca9f 100644 --- a/samples/shadow/mqtt5_shadow_sync/CMakeLists.txt +++ b/samples/shadow/deprecated/mqtt5_shadow_sync/CMakeLists.txt @@ -4,8 +4,8 @@ project(mqtt5-shadow-sync CXX) file(GLOB SRC_FILES "*.cpp" - "../../utils/CommandLineUtils.cpp" - "../../utils/CommandLineUtils.h" + "../../../utils/CommandLineUtils.cpp" + "../../../utils/CommandLineUtils.h" ) add_executable(${PROJECT_NAME} ${SRC_FILES}) diff --git a/samples/shadow/mqtt5_shadow_sync/README.md b/samples/shadow/deprecated/mqtt5_shadow_sync/README.md similarity index 100% rename from samples/shadow/mqtt5_shadow_sync/README.md rename to samples/shadow/deprecated/mqtt5_shadow_sync/README.md diff --git a/samples/shadow/mqtt5_shadow_sync/main.cpp b/samples/shadow/deprecated/mqtt5_shadow_sync/main.cpp similarity index 99% rename from samples/shadow/mqtt5_shadow_sync/main.cpp rename to samples/shadow/deprecated/mqtt5_shadow_sync/main.cpp index b55b1d1cf..fdd301dd2 100644 --- a/samples/shadow/mqtt5_shadow_sync/main.cpp +++ b/samples/shadow/deprecated/mqtt5_shadow_sync/main.cpp @@ -29,7 +29,7 @@ #include #include -#include "../../utils/CommandLineUtils.h" +#include "../../../utils/CommandLineUtils.h" using namespace Aws::Crt; using namespace Aws::Iotshadow; diff --git a/samples/shadow/v2/CMakeLists.txt b/samples/shadow/deprecated/shadow_sync/CMakeLists.txt similarity index 80% rename from samples/shadow/v2/CMakeLists.txt rename to samples/shadow/deprecated/shadow_sync/CMakeLists.txt index d42494ffb..a34e4d581 100644 --- a/samples/shadow/v2/CMakeLists.txt +++ b/samples/shadow/deprecated/shadow_sync/CMakeLists.txt @@ -1,11 +1,11 @@ -cmake_minimum_required(VERSION 3.1) +cmake_minimum_required(VERSION 3.9) # note: cxx-17 requires cmake 3.8, cxx-20 requires cmake 3.12 -project(shadowv2 CXX) +project(shadow-sync CXX) file(GLOB SRC_FILES "*.cpp" - "../../utils/CommandLineUtils.cpp" - "../../utils/CommandLineUtils.h" + "../../../utils/CommandLineUtils.cpp" + "../../../utils/CommandLineUtils.h" ) add_executable(${PROJECT_NAME} ${SRC_FILES}) diff --git a/samples/shadow/v2/README.md b/samples/shadow/deprecated/shadow_sync/README.md similarity index 71% rename from samples/shadow/v2/README.md rename to samples/shadow/deprecated/shadow_sync/README.md index 0c614a9e5..4c687c71d 100644 --- a/samples/shadow/v2/README.md +++ b/samples/shadow/deprecated/shadow_sync/README.md @@ -75,53 +75,11 @@ Note that in a real application, you may want to avoid the use of wildcards in y To run the Shadow sample use the following command: ``` sh -./mqtt5-shadow-sync --endpoint --cert --key --thing_name --shadow_property +./shadow-sync --endpoint --cert --key --thing_name --shadow_property ``` You can also pass a Certificate Authority file (CA) if your certificate and key combination requires it: ``` sh -./mqtt5-shadow-sync --endpoint --cert --key --thing_name --shadow_property --ca_file +./shadow-sync --endpoint --cert --key --thing_name --shadow_property --ca_file ``` - - -## Service Client Notes -### Difference between MQTT5 and MQTT311 IotShadowClient -The IotShadowClient with Mqtt5 client is identical to Mqtt3 one. We wrapped the Mqtt5Client into MqttClientConnection so that we could keep the same interface for IotShadowClient. -The only difference is that you would need setup up a Mqtt5 Client for the IotShadowClient. For how to setup a Mqtt5 Client, please refer to [MQTT5 UserGuide](../../../documents/MQTT5_Userguide.md) and [MQTT5 PubSub Sample](../../mqtt5/mqtt5_pubsub/) - - - - - - - - - - -
Create a IotShadowClient with Mqtt5Create a IotShadowClient with Mqtt311
- -```Cpp - // Build Mqtt5Client - std::shared_ptr client = builder->Build(); - - // Create shadow client with mqtt5 client - Aws::Iotshadow::IotShadowClient shadowClient(client); -``` - - - -```Cpp - // Create mqtt311 connection - Aws::Iot::MqttClient client = Aws::Iot::MqttClient(); - auto connection = client.NewConnection(clientConfig); - - // Create shadow client with mqtt311 connection - Aws::Iotshadow::IotShadowClient shadowClient(connection); - -``` - -
- -### Mqtt::QOS v.s. Mqtt5::QOS -As the service client interface is unchanged for Mqtt3 Connection and Mqtt5 Client,the IotShadowClient will use Mqtt::QOS instead of Mqtt5::QOS even with a Mqtt5 Client. diff --git a/samples/shadow/deprecated/shadow_sync/main.cpp b/samples/shadow/deprecated/shadow_sync/main.cpp new file mode 100644 index 000000000..5b047006b --- /dev/null +++ b/samples/shadow/deprecated/shadow_sync/main.cpp @@ -0,0 +1,502 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "../../../utils/CommandLineUtils.h" + +using namespace Aws::Crt; +using namespace Aws::Iotshadow; + +static const char *SHADOW_VALUE_DEFAULT = "off"; + +static void s_changeShadowValue( + IotShadowClient &client, + const String &thingName, + const String &shadowProperty, + const String &value) +{ + fprintf(stdout, "Changing local shadow value to %s.\n", value.c_str()); + + ShadowState state; + JsonObject desired; + JsonObject reported; + + if (value == "null") + { + JsonObject nullObject; + nullObject.AsNull(); + desired.WithObject(shadowProperty, nullObject); + reported.WithObject(shadowProperty, nullObject); + } + else if (value == "clear_shadow") + { + desired.AsNull(); + reported.AsNull(); + } + else + { + desired.WithString(shadowProperty, value); + reported.WithString(shadowProperty, value); + } + state.Desired = desired; + state.Reported = reported; + + UpdateShadowRequest updateShadowRequest; + Aws::Crt::UUID uuid; + updateShadowRequest.ClientToken = uuid.ToString(); + updateShadowRequest.ThingName = thingName; + updateShadowRequest.State = state; + + auto publishCompleted = [thingName, value](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Failed to update %s shadow state: error %s\n", thingName.c_str(), ErrorDebugString(ioErr)); + return; + } + + fprintf(stdout, "Successfully updated shadow state for %s, to %s\n", thingName.c_str(), value.c_str()); + }; + + client.PublishUpdateShadow(updateShadowRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, std::move(publishCompleted)); +} + +int main(int argc, char *argv[]) +{ + /************************ Setup ****************************/ + + // Do the global initialization for the API. + ApiHandle apiHandle; + + String currentShadowValue(""); + + /** + * cmdData is the arguments/input from the command line placed into a single struct for + * use in this sample. This handles all of the command line parsing, validating, etc. + * See the Utils/CommandLineUtils for more information. + */ + Utils::cmdData cmdData = Utils::parseSampleInputShadow(argc, argv, &apiHandle); + + // Create the MQTT builder and populate it with data from cmdData. + auto clientConfigBuilder = + Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str()); + clientConfigBuilder.WithEndpoint(cmdData.input_endpoint); + if (cmdData.input_ca != "") + { + clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str()); + } + if (cmdData.input_proxyHost != "") + { + Aws::Crt::Http::HttpClientConnectionProxyOptions proxyOptions; + proxyOptions.HostName = cmdData.input_proxyHost; + proxyOptions.Port = static_cast(cmdData.input_proxyPort); + proxyOptions.AuthType = Aws::Crt::Http::AwsHttpProxyAuthenticationType::None; + clientConfigBuilder.WithHttpProxyOptions(proxyOptions); + } + if (cmdData.input_port != 0) + { + clientConfigBuilder.WithPortOverride(static_cast(cmdData.input_port)); + } + + // Create the MQTT connection from the MQTT builder + auto clientConfig = clientConfigBuilder.Build(); + if (!clientConfig) + { + fprintf( + stderr, + "Client Configuration initialization failed with error %s\n", + Aws::Crt::ErrorDebugString(clientConfig.LastError())); + exit(-1); + } + Aws::Iot::MqttClient client = Aws::Iot::MqttClient(); + auto connection = client.NewConnection(clientConfig); + if (!*connection) + { + fprintf( + stderr, + "MQTT Connection Creation failed with error %s\n", + Aws::Crt::ErrorDebugString(connection->LastError())); + exit(-1); + } + + /** + * In a real world application you probably don't want to enforce synchronous behavior + * but this is a sample console application, so we'll just do that with a condition variable. + */ + std::promise connectionCompletedPromise; + std::promise connectionClosedPromise; + + // Invoked when a MQTT connect has completed or failed + auto onConnectionCompleted = + [&](Aws::Crt::Mqtt::MqttConnection &, int errorCode, Aws::Crt::Mqtt::ReturnCode returnCode, bool) { + if (errorCode) + { + fprintf(stdout, "Connection failed with error %s\n", Aws::Crt::ErrorDebugString(errorCode)); + connectionCompletedPromise.set_value(false); + } + else + { + fprintf(stdout, "Connection completed with return code %d\n", returnCode); + connectionCompletedPromise.set_value(true); + } + }; + + // Invoked when a disconnect message has completed. + auto onDisconnect = [&](Aws::Crt::Mqtt::MqttConnection &) { + fprintf(stdout, "Disconnect completed\n"); + connectionClosedPromise.set_value(); + }; + + // Assign callbacks + connection->OnConnectionCompleted = std::move(onConnectionCompleted); + connection->OnDisconnect = std::move(onDisconnect); + + /************************ Run the sample ****************************/ + + // Connect + fprintf(stdout, "Connecting...\n"); + if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0)) + { + fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError())); + exit(-1); + } + + if (connectionCompletedPromise.get_future().get()) + { + Aws::Iotshadow::IotShadowClient shadowClient(connection); + + /********************** Shadow Delta Updates ********************/ + // This section is for when a Shadow document updates/changes, whether it is on the server side or client side. + + std::promise subscribeDeltaCompletedPromise; + std::promise subscribeDeltaAcceptedCompletedPromise; + std::promise subscribeDeltaRejectedCompletedPromise; + + auto onDeltaUpdatedSubAck = [&](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to shadow delta: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + subscribeDeltaCompletedPromise.set_value(); + }; + + auto onDeltaUpdatedAcceptedSubAck = [&](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to shadow delta accepted: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + subscribeDeltaAcceptedCompletedPromise.set_value(); + }; + + auto onDeltaUpdatedRejectedSubAck = [&](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to shadow delta rejected: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + subscribeDeltaRejectedCompletedPromise.set_value(); + }; + + auto onDeltaUpdated = [&](ShadowDeltaUpdatedEvent *event, int ioErr) { + if (ioErr) + { + fprintf(stdout, "Error processing shadow delta: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + + if (event) + { + fprintf(stdout, "Received shadow delta event.\n"); + if (event->State && event->State->View().ValueExists(cmdData.input_shadowProperty)) + { + JsonView objectView = event->State->View().GetJsonObject(cmdData.input_shadowProperty); + if (objectView.IsNull()) + { + fprintf( + stdout, + "Delta reports that %s was deleted. Resetting defaults...\n", + cmdData.input_shadowProperty.c_str()); + s_changeShadowValue( + shadowClient, cmdData.input_thingName, cmdData.input_shadowProperty, SHADOW_VALUE_DEFAULT); + } + else + { + fprintf( + stdout, + "Delta reports that \"%s\" has a desired value of \"%s\", Changing local value...\n", + cmdData.input_shadowProperty.c_str(), + event->State->View().GetString(cmdData.input_shadowProperty).c_str()); + s_changeShadowValue( + shadowClient, + cmdData.input_thingName, + cmdData.input_shadowProperty, + event->State->View().GetString(cmdData.input_shadowProperty)); + } + + if (event->ClientToken) + { + fprintf(stdout, " ClientToken: %s\n", event->ClientToken->c_str()); + } + } + else + { + fprintf(stdout, "Delta did not report a change in \"%s\".\n", cmdData.input_shadowProperty.c_str()); + } + } + }; + + auto onUpdateShadowAccepted = [&](UpdateShadowResponse *response, int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + + if (response->State->Reported) + { + currentShadowValue = response->State->Reported->View().GetString(cmdData.input_shadowProperty); + } + else + { + fprintf(stdout, "Finished clearing shadow properties\n"); + currentShadowValue = ""; + } + + if (cmdData.input_isCI == false) + { + fprintf(stdout, "Enter Desired state of %s:\n", cmdData.input_shadowProperty.c_str()); + } + }; + + auto onUpdateShadowRejected = [&](ErrorResponse *error, int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + fprintf( + stdout, + "Update of shadow state failed with message %s and code %d.", + error->Message->c_str(), + *error->Code); + }; + + ShadowDeltaUpdatedSubscriptionRequest shadowDeltaUpdatedRequest; + shadowDeltaUpdatedRequest.ThingName = cmdData.input_thingName; + + shadowClient.SubscribeToShadowDeltaUpdatedEvents( + shadowDeltaUpdatedRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onDeltaUpdated, onDeltaUpdatedSubAck); + + UpdateShadowSubscriptionRequest updateShadowSubscriptionRequest; + updateShadowSubscriptionRequest.ThingName = cmdData.input_thingName; + + shadowClient.SubscribeToUpdateShadowAccepted( + updateShadowSubscriptionRequest, + AWS_MQTT_QOS_AT_LEAST_ONCE, + onUpdateShadowAccepted, + onDeltaUpdatedAcceptedSubAck); + + shadowClient.SubscribeToUpdateShadowRejected( + updateShadowSubscriptionRequest, + AWS_MQTT_QOS_AT_LEAST_ONCE, + onUpdateShadowRejected, + onDeltaUpdatedRejectedSubAck); + + subscribeDeltaCompletedPromise.get_future().wait(); + subscribeDeltaAcceptedCompletedPromise.get_future().wait(); + subscribeDeltaRejectedCompletedPromise.get_future().wait(); + + /********************** Shadow Value Get ********************/ + // This section is to get the initial value of the Shadow document. + + std::promise subscribeGetShadowAcceptedCompletedPromise; + std::promise subscribeGetShadowRejectedCompletedPromise; + std::promise onGetShadowRequestCompletedPromise; + std::promise gotInitialShadowPromise; + + auto onGetShadowUpdatedAcceptedSubAck = [&](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to get shadow document accepted: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + subscribeGetShadowAcceptedCompletedPromise.set_value(); + }; + + auto onGetShadowUpdatedRejectedSubAck = [&](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error subscribing to get shadow document rejected: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + subscribeGetShadowRejectedCompletedPromise.set_value(); + }; + + auto onGetShadowRequestSubAck = [&](int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error getting shadow document: %s\n", ErrorDebugString(ioErr)); + exit(-1); + } + onGetShadowRequestCompletedPromise.set_value(); + }; + + auto onGetShadowAccepted = [&](GetShadowResponse *response, int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error getting shadow value from document: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + if (response) + { + fprintf(stdout, "Received shadow document.\n"); + if (response->State && response->State->Reported->View().ValueExists(cmdData.input_shadowProperty)) + { + JsonView objectView = response->State->Reported->View().GetJsonObject(cmdData.input_shadowProperty); + if (objectView.IsNull()) + { + fprintf(stdout, "Shadow contains \"%s\" but is null.\n", cmdData.input_shadowProperty.c_str()); + currentShadowValue = ""; + } + else + { + currentShadowValue = response->State->Reported->View().GetString(cmdData.input_shadowProperty); + fprintf( + stdout, + "Shadow contains \"%s\". Updating local value to \"%s\"...\n", + cmdData.input_shadowProperty.c_str(), + currentShadowValue.c_str()); + } + } + else + { + fprintf( + stdout, "Shadow currently does not contain \"%s\".\n", cmdData.input_shadowProperty.c_str()); + currentShadowValue = ""; + } + gotInitialShadowPromise.set_value(); + } + }; + + auto onGetShadowRejected = [&](ErrorResponse *error, int ioErr) { + if (ioErr != AWS_OP_SUCCESS) + { + fprintf(stderr, "Error on getting shadow document: %s.\n", ErrorDebugString(ioErr)); + exit(-1); + } + fprintf( + stdout, + "Getting shadow document failed with message %s and code %d.\n", + error->Message->c_str(), + *error->Code); + gotInitialShadowPromise.set_value(); + }; + + GetShadowSubscriptionRequest shadowSubscriptionRequest; + shadowSubscriptionRequest.ThingName = cmdData.input_thingName; + + shadowClient.SubscribeToGetShadowAccepted( + shadowSubscriptionRequest, + AWS_MQTT_QOS_AT_LEAST_ONCE, + onGetShadowAccepted, + onGetShadowUpdatedAcceptedSubAck); + + shadowClient.SubscribeToGetShadowRejected( + shadowSubscriptionRequest, + AWS_MQTT_QOS_AT_LEAST_ONCE, + onGetShadowRejected, + onGetShadowUpdatedRejectedSubAck); + + subscribeGetShadowAcceptedCompletedPromise.get_future().wait(); + subscribeGetShadowRejectedCompletedPromise.get_future().wait(); + + GetShadowRequest shadowGetRequest; + shadowGetRequest.ThingName = cmdData.input_thingName; + + // Get the current shadow document so we start with the correct value + shadowClient.PublishGetShadow(shadowGetRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onGetShadowRequestSubAck); + + onGetShadowRequestCompletedPromise.get_future().wait(); + gotInitialShadowPromise.get_future().wait(); + + /********************** Shadow change value input loop ********************/ + /** + * This section is to getting user input and changing the shadow value passed to that input. + * If in CI, then input is automatically passed + */ + + if (cmdData.input_isCI == false) + { + fprintf(stdout, "Enter Desired state of %s:\n", cmdData.input_shadowProperty.c_str()); + while (true) + { + String input; + std::cin >> input; + + if (input == "exit" || input == "quit") + { + fprintf(stdout, "Exiting..."); + break; + } + + if (input == currentShadowValue) + { + fprintf(stdout, "Shadow is already set to \"%s\"\n", currentShadowValue.c_str()); + fprintf(stdout, "Enter Desired state of %s:\n", cmdData.input_shadowProperty.c_str()); + } + else + { + s_changeShadowValue(shadowClient, cmdData.input_thingName, cmdData.input_shadowProperty, input); + } + } + } + else + { + int messagesSent = 0; + while (messagesSent < 5) + { + String input = "Shadow_Value_"; + input.append(std::to_string(messagesSent).c_str()); + s_changeShadowValue(shadowClient, cmdData.input_thingName, cmdData.input_shadowProperty, input); + // Sleep so there is a gap between shadow updates + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + messagesSent += 1; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + } + + // Disconnect + if (connection->Disconnect()) + { + connectionClosedPromise.get_future().wait(); + } + return 0; +} diff --git a/samples/shadow/shadow_sync/CMakeLists.txt b/samples/shadow/shadow_sync/CMakeLists.txt index cf2270af4..d42494ffb 100644 --- a/samples/shadow/shadow_sync/CMakeLists.txt +++ b/samples/shadow/shadow_sync/CMakeLists.txt @@ -1,6 +1,6 @@ -cmake_minimum_required(VERSION 3.9) +cmake_minimum_required(VERSION 3.1) # note: cxx-17 requires cmake 3.8, cxx-20 requires cmake 3.12 -project(shadow-sync CXX) +project(shadowv2 CXX) file(GLOB SRC_FILES "*.cpp" diff --git a/samples/shadow/shadow_sync/README.md b/samples/shadow/shadow_sync/README.md index 4c687c71d..0c614a9e5 100644 --- a/samples/shadow/shadow_sync/README.md +++ b/samples/shadow/shadow_sync/README.md @@ -75,11 +75,53 @@ Note that in a real application, you may want to avoid the use of wildcards in y To run the Shadow sample use the following command: ``` sh -./shadow-sync --endpoint --cert --key --thing_name --shadow_property +./mqtt5-shadow-sync --endpoint --cert --key --thing_name --shadow_property ``` You can also pass a Certificate Authority file (CA) if your certificate and key combination requires it: ``` sh -./shadow-sync --endpoint --cert --key --thing_name --shadow_property --ca_file +./mqtt5-shadow-sync --endpoint --cert --key --thing_name --shadow_property --ca_file ``` + + +## Service Client Notes +### Difference between MQTT5 and MQTT311 IotShadowClient +The IotShadowClient with Mqtt5 client is identical to Mqtt3 one. We wrapped the Mqtt5Client into MqttClientConnection so that we could keep the same interface for IotShadowClient. +The only difference is that you would need setup up a Mqtt5 Client for the IotShadowClient. For how to setup a Mqtt5 Client, please refer to [MQTT5 UserGuide](../../../documents/MQTT5_Userguide.md) and [MQTT5 PubSub Sample](../../mqtt5/mqtt5_pubsub/) + + + + + + + + + + +
Create a IotShadowClient with Mqtt5Create a IotShadowClient with Mqtt311
+ +```Cpp + // Build Mqtt5Client + std::shared_ptr client = builder->Build(); + + // Create shadow client with mqtt5 client + Aws::Iotshadow::IotShadowClient shadowClient(client); +``` + + + +```Cpp + // Create mqtt311 connection + Aws::Iot::MqttClient client = Aws::Iot::MqttClient(); + auto connection = client.NewConnection(clientConfig); + + // Create shadow client with mqtt311 connection + Aws::Iotshadow::IotShadowClient shadowClient(connection); + +``` + +
+ +### Mqtt::QOS v.s. Mqtt5::QOS +As the service client interface is unchanged for Mqtt3 Connection and Mqtt5 Client,the IotShadowClient will use Mqtt::QOS instead of Mqtt5::QOS even with a Mqtt5 Client. diff --git a/samples/shadow/shadow_sync/main.cpp b/samples/shadow/shadow_sync/main.cpp index 234a58e42..044148da3 100644 --- a/samples/shadow/shadow_sync/main.cpp +++ b/samples/shadow/shadow_sync/main.cpp @@ -3,29 +3,25 @@ * SPDX-License-Identifier: Apache-2.0. */ #include -#include -#include -#include +#include +#include -#include - -#include -#include -#include +#include +#include +#include +#include +#include #include +#include +#include +#include #include #include -#include - -#include -#include -#include -#include +#include #include #include #include -#include #include #include "../../utils/CommandLineUtils.h" @@ -33,470 +29,350 @@ using namespace Aws::Crt; using namespace Aws::Iotshadow; -static const char *SHADOW_VALUE_DEFAULT = "off"; +struct ApplicationContext { -static void s_changeShadowValue( - IotShadowClient &client, - const String &thingName, - const String &shadowProperty, - const String &value) -{ - fprintf(stdout, "Changing local shadow value to %s.\n", value.c_str()); + ApplicationContext() : + m_isConnected(false) + {} - ShadowState state; - JsonObject desired; - JsonObject reported; + std::shared_ptr m_protocolClient; + + std::shared_ptr m_shadowClient; + + + std::shared_ptr m_shadowDeltaUpdatedStream; + std::shared_ptr m_shadowUpdatedStream; + + String m_thingName; + + std::mutex m_connectedLock; + std::condition_variable m_connectedSignal; + bool m_isConnected; +}; + + +static void s_onConnectionSuccess(struct ApplicationContext &context) { + fprintf( + stdout, + "Mqtt5 Client connection succeeded!\n"); - if (value == "null") - { - JsonObject nullObject; - nullObject.AsNull(); - desired.WithObject(shadowProperty, nullObject); - reported.WithObject(shadowProperty, nullObject); - } - else if (value == "clear_shadow") - { - desired.AsNull(); - reported.AsNull(); - } - else { - desired.WithString(shadowProperty, value); - reported.WithString(shadowProperty, value); + std::lock_guard lock(context.m_connectedLock); + context.m_isConnected = true; } - state.Desired = desired; - state.Reported = reported; - UpdateShadowRequest updateShadowRequest; - Aws::Crt::UUID uuid; - updateShadowRequest.ClientToken = uuid.ToString(); - updateShadowRequest.ThingName = thingName; - updateShadowRequest.State = state; + context.m_connectedSignal.notify_all(); +} + +static void s_onConnectionFailure(const Mqtt5::OnConnectionFailureEventData &eventData) { + fprintf(stdout, "Mqtt5 client connection attempt failed with error: %s.\n", aws_error_debug_str(eventData.errorCode)); +} - auto publishCompleted = [thingName, value](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Failed to update %s shadow state: error %s\n", thingName.c_str(), ErrorDebugString(ioErr)); - return; +static String s_nibbleNextToken(String &input) { + String token; + String remaining; + auto delimPosition = input.find(' '); + if (delimPosition != Aws::Crt::String::npos) { + token = input.substr(0, delimPosition); + + auto untrimmedRemaining = input.substr(delimPosition, Aws::Crt::String::npos); + auto firstNonSpacePosition = untrimmedRemaining.find_first_not_of(' '); + if (firstNonSpacePosition != Aws::Crt::String::npos) { + remaining = untrimmedRemaining.substr(firstNonSpacePosition, Aws::Crt::String::npos); + } else { + remaining = ""; } + } else { + token = input; + remaining = ""; + } - fprintf(stdout, "Successfully updated shadow state for %s, to %s\n", thingName.c_str(), value.c_str()); - }; + input = remaining; + return token; +} - client.PublishUpdateShadow(updateShadowRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, std::move(publishCompleted)); +static void s_printHelp() { + fprintf(stdout, "\nShadow sandbox:\n\n"); + fprintf(stdout, " quit -- quits the program\n"); + fprintf(stdout, " get -- gets the state of a named shadow belonging to the specified thing\n"); + fprintf(stdout, " delete -- deletes a named shadow belonging to the specified thing\n"); + fprintf(stdout, " update-desired -- updates the desired state of a named shadow belonging to the specified thing\n"); + fprintf(stdout, " update-reported -- updates the reported state a named shadow belonging to the specified thing\n\n"); } -int main(int argc, char *argv[]) -{ - /************************ Setup ****************************/ +static void s_onServiceError(const ServiceErrorV2 &serviceError, String operationName) { + fprintf(stdout, "%s failed with error code: %s\n", operationName.c_str(), aws_error_debug_str(serviceError.GetErrorCode())); + if (serviceError.HasModeledError()) { + const auto &modeledError = serviceError.GetModeledError(); - // Do the global initialization for the API. - ApiHandle apiHandle; + JsonObject jsonObject; + modeledError.SerializeToObject(jsonObject); + String outgoingJson = jsonObject.View().WriteCompact(true); + fprintf(stdout, "modeled error: %s\n", outgoingJson.c_str()); + } +} - String currentShadowValue(""); +static void s_onGetShadowResult(GetShadowResult &&result) { + if (result.IsSuccess()) { + const auto &response = result.GetResponse(); + + JsonObject jsonObject; + response.SerializeToObject(jsonObject); + String outgoingJson = jsonObject.View().WriteCompact(true); + fprintf(stdout, "get result: %s\n", outgoingJson.c_str()); + } else { + const auto &serviceError = result.GetError(); + s_onServiceError(serviceError, "get"); + } +} - /** - * cmdData is the arguments/input from the command line placed into a single struct for - * use in this sample. This handles all of the command line parsing, validating, etc. - * See the Utils/CommandLineUtils for more information. - */ - Utils::cmdData cmdData = Utils::parseSampleInputShadow(argc, argv, &apiHandle); +static void s_handleGetShadow(const ApplicationContext &context) { + GetShadowRequest request; + request.ThingName = context.m_thingName; - // Create the MQTT builder and populate it with data from cmdData. - auto clientConfigBuilder = - Aws::Iot::MqttClientConnectionConfigBuilder(cmdData.input_cert.c_str(), cmdData.input_key.c_str()); - clientConfigBuilder.WithEndpoint(cmdData.input_endpoint); - if (cmdData.input_ca != "") - { - clientConfigBuilder.WithCertificateAuthority(cmdData.input_ca.c_str()); - } - if (cmdData.input_proxyHost != "") - { - Aws::Crt::Http::HttpClientConnectionProxyOptions proxyOptions; - proxyOptions.HostName = cmdData.input_proxyHost; - proxyOptions.Port = static_cast(cmdData.input_proxyPort); - proxyOptions.AuthType = Aws::Crt::Http::AwsHttpProxyAuthenticationType::None; - clientConfigBuilder.WithHttpProxyOptions(proxyOptions); + context.m_shadowClient->GetShadow(request, [](GetShadowResult &&result){ + s_onGetShadowResult(std::move(result)); + }); +} + +static void s_onDeleteShadowResult(DeleteShadowResult &&result) { + if (result.IsSuccess()) { + const auto &response = result.GetResponse(); + + JsonObject jsonObject; + response.SerializeToObject(jsonObject); + String outgoingJson = jsonObject.View().WriteCompact(true); + fprintf(stdout, "delete result: %s\n", outgoingJson.c_str()); + } else { + s_onServiceError(result.GetError(), "delete"); } - if (cmdData.input_port != 0) - { - clientConfigBuilder.WithPortOverride(static_cast(cmdData.input_port)); +} + +static void s_handleDeleteShadow(const ApplicationContext &context) { + DeleteShadowRequest request; + request.ThingName = context.m_thingName; + + context.m_shadowClient->DeleteShadow(request, [](DeleteShadowResult &&result){ + s_onDeleteShadowResult(std::move(result)); + }); +} + +static void s_onUpdateShadowResult(UpdateShadowResult &&result, String operationName) { + if (result.IsSuccess()) { + const auto &response = result.GetResponse(); + + JsonObject jsonObject; + response.SerializeToObject(jsonObject); + String outgoingJson = jsonObject.View().WriteCompact(true); + fprintf(stdout, "%s result: %s\n", operationName.c_str(), outgoingJson.c_str()); + } else { + s_onServiceError(result.GetError(), operationName); } +} - // Create the MQTT connection from the MQTT builder - auto clientConfig = clientConfigBuilder.Build(); - if (!clientConfig) - { - fprintf( - stderr, - "Client Configuration initialization failed with error %s\n", - Aws::Crt::ErrorDebugString(clientConfig.LastError())); - exit(-1); +static void s_handleUpdateDesiredShadow(const String ¶ms, const ApplicationContext &context) { + JsonObject stateAsJson(params); + if (!stateAsJson.WasParseSuccessful()) { + fprintf(stdout, "desired state was not valid JSON!\n\n"); + s_printHelp(); + return; } - Aws::Iot::MqttClient client = Aws::Iot::MqttClient(); - auto connection = client.NewConnection(clientConfig); - if (!*connection) - { - fprintf( - stderr, - "MQTT Connection Creation failed with error %s\n", - Aws::Crt::ErrorDebugString(connection->LastError())); - exit(-1); + + Aws::Iotshadow::UpdateShadowRequest request; + request.ThingName = context.m_thingName; + request.State = ShadowState(); + request.State.value().Desired = stateAsJson; + + context.m_shadowClient->UpdateShadow(request, [](UpdateShadowResult &&result){ + s_onUpdateShadowResult(std::move(result), "update-desired"); + }); +} + +static void s_handleUpdateReportedShadow(const String ¶ms, const ApplicationContext &context) { + JsonObject stateAsJson(params); + if (!stateAsJson.WasParseSuccessful()) { + fprintf(stdout, "reported state was not valid JSON!\n\n"); + s_printHelp(); + return; } - /** - * In a real world application you probably don't want to enforce synchronous behavior - * but this is a sample console application, so we'll just do that with a condition variable. - */ - std::promise connectionCompletedPromise; - std::promise connectionClosedPromise; - - // Invoked when a MQTT connect has completed or failed - auto onConnectionCompleted = - [&](Aws::Crt::Mqtt::MqttConnection &, int errorCode, Aws::Crt::Mqtt::ReturnCode returnCode, bool) { - if (errorCode) + UpdateShadowRequest request; + request.ThingName = context.m_thingName; + request.State = ShadowState(); + request.State.value().Reported = stateAsJson; + + context.m_shadowClient->UpdateShadow(request, [](UpdateShadowResult &&result){ + s_onUpdateShadowResult(std::move(result), "update-reported"); + }); +} + +static std::shared_ptr s_createShadowUpdatedStream(const ApplicationContext &context) { + std::mutex subscribedMutex; + std::condition_variable subscribedSignal; + bool subscribed = false; + + ShadowUpdatedSubscriptionRequest request; + request.ThingName = context.m_thingName; + + Aws::Iot::RequestResponse::StreamingOperationOptions options; + options.WithSubscriptionStatusEventHandler([&subscribedMutex, &subscribedSignal, &subscribed](Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) { + if (event.GetType() == Aws::Iot::RequestResponse::SubscriptionStatusEventType::SubscriptionEstablished) { { - fprintf(stdout, "Connection failed with error %s\n", Aws::Crt::ErrorDebugString(errorCode)); - connectionCompletedPromise.set_value(false); + std::lock_guard subscribedLock(subscribedMutex); + subscribed = true; } - else + subscribedSignal.notify_all(); + } + }); + options.WithStreamHandler([&context](Aws::Iotshadow::ShadowUpdatedEvent &&event) { + Aws::Crt::JsonObject jsonObject; + event.SerializeToObject(jsonObject); + Aws::Crt::String json = jsonObject.View().WriteCompact(true); + fprintf(stdout, "Received shadow updated event: %s\n", json.c_str()); + }); + + auto stream = context.m_shadowClient->CreateShadowUpdatedStream(request, options); + stream->Open(); + + { + std::unique_lock subscribedLock; + subscribedSignal.wait(subscribedLock, [&subscribed](){ return subscribed; }); + } + + return stream; +} + +static std::shared_ptr s_createShadowDeltaUpdatedStream(const ApplicationContext &context) { + std::mutex subscribedMutex; + std::condition_variable subscribedSignal; + bool subscribed = false; + + ShadowDeltaUpdatedSubscriptionRequest request; + request.ThingName = context.m_thingName; + + Aws::Iot::RequestResponse::StreamingOperationOptions options; + options.WithSubscriptionStatusEventHandler([&subscribedMutex, &subscribedSignal, &subscribed](Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) { + if (event.GetType() == Aws::Iot::RequestResponse::SubscriptionStatusEventType::SubscriptionEstablished) { { - fprintf(stdout, "Connection completed with return code %d\n", returnCode); - connectionCompletedPromise.set_value(true); + std::lock_guard subscribedLock(subscribedMutex); + subscribed = true; } - }; + subscribedSignal.notify_all(); + } + }); + options.WithStreamHandler([&context](Aws::Iotshadow::ShadowDeltaUpdatedEvent &&event) { + Aws::Crt::JsonObject jsonObject; + event.SerializeToObject(jsonObject); + Aws::Crt::String json = jsonObject.View().WriteCompact(true); + fprintf(stdout, "Received shadow delta updated event: %s\n", json.c_str()); + }); - // Invoked when a disconnect message has completed. - auto onDisconnect = [&](Aws::Crt::Mqtt::MqttConnection &) { - fprintf(stdout, "Disconnect completed\n"); - connectionClosedPromise.set_value(); - }; + auto stream = context.m_shadowClient->CreateShadowDeltaUpdatedStream(request, options); + stream->Open(); - // Assign callbacks - connection->OnConnectionCompleted = std::move(onConnectionCompleted); - connection->OnDisconnect = std::move(onDisconnect); + { + std::unique_lock subscribedLock; + subscribedSignal.wait(subscribedLock, [&subscribed](){ return subscribed; }); + } - /************************ Run the sample ****************************/ + return stream; +} - // Connect - fprintf(stdout, "Connecting...\n"); - if (!connection->Connect(cmdData.input_clientId.c_str(), true, 0)) - { - fprintf(stderr, "MQTT Connection failed with error %s\n", ErrorDebugString(connection->LastError())); - exit(-1); +static bool s_handleInput(const Aws::Crt::String &input, ApplicationContext &context) { + Aws::Crt::String remaining = input; + Aws::Crt::String command = s_nibbleNextToken(remaining); + + if (command == "quit") { + fprintf(stdout, "Quitting!\n"); + return true; + } else if (command == "get") { + s_handleGetShadow(context); + } else if (command == "delete") { + s_handleDeleteShadow(context); + } else if (command == "update-desired") { + s_handleUpdateDesiredShadow(remaining, context); + } else if (command == "update-reported") { + s_handleUpdateReportedShadow(remaining, context); + } else { + s_printHelp(); } - if (connectionCompletedPromise.get_future().get()) - { - Aws::Iotshadow::IotShadowClient shadowClient(connection); + return false; +} - /********************** Shadow Delta Updates ********************/ - // This section is for when a Shadow document updates/changes, whether it is on the server side or client side. +int main(int argc, char *argv[]) +{ + /************************ Setup ****************************/ - std::promise subscribeDeltaCompletedPromise; - std::promise subscribeDeltaAcceptedCompletedPromise; - std::promise subscribeDeltaRejectedCompletedPromise; + // Do the global initialization for the API. + ApiHandle apiHandle; - auto onDeltaUpdatedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error subscribing to shadow delta: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - subscribeDeltaCompletedPromise.set_value(); - }; + Utils::cmdData cmdData = Utils::parseSampleInputShadow(argc, argv, &apiHandle); - auto onDeltaUpdatedAcceptedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error subscribing to shadow delta accepted: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - subscribeDeltaAcceptedCompletedPromise.set_value(); - }; + ApplicationContext context; + context.m_thingName = cmdData.input_thingName; - auto onDeltaUpdatedRejectedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error subscribing to shadow delta rejected: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - subscribeDeltaRejectedCompletedPromise.set_value(); - }; + // Create the MQTT5 builder and populate it with data from cmdData. + auto builder = std::unique_ptr( + Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( + cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str())); + // Check if the builder setup correctly. + if (builder == nullptr) + { + printf( + "Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError())); + return -1; + } - auto onDeltaUpdated = [&](ShadowDeltaUpdatedEvent *event, int ioErr) { - if (ioErr) - { - fprintf(stdout, "Error processing shadow delta: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } + // Setup lifecycle callbacks + builder->WithClientConnectionSuccessCallback([&context](const Mqtt5::OnConnectionSuccessEventData &){ s_onConnectionSuccess(context); }); + builder->WithClientConnectionFailureCallback(s_onConnectionFailure); - if (event) - { - fprintf(stdout, "Received shadow delta event.\n"); - if (event->State && event->State->View().ValueExists(cmdData.input_shadowProperty)) - { - JsonView objectView = event->State->View().GetJsonObject(cmdData.input_shadowProperty); - if (objectView.IsNull()) - { - fprintf( - stdout, - "Delta reports that %s was deleted. Resetting defaults...\n", - cmdData.input_shadowProperty.c_str()); - s_changeShadowValue( - shadowClient, cmdData.input_thingName, cmdData.input_shadowProperty, SHADOW_VALUE_DEFAULT); - } - else - { - fprintf( - stdout, - "Delta reports that \"%s\" has a desired value of \"%s\", Changing local value...\n", - cmdData.input_shadowProperty.c_str(), - event->State->View().GetString(cmdData.input_shadowProperty).c_str()); - s_changeShadowValue( - shadowClient, - cmdData.input_thingName, - cmdData.input_shadowProperty, - event->State->View().GetString(cmdData.input_shadowProperty)); - } - - if (event->ClientToken) - { - fprintf(stdout, " ClientToken: %s\n", event->ClientToken->c_str()); - } - } - else - { - fprintf(stdout, "Delta did not report a change in \"%s\".\n", cmdData.input_shadowProperty.c_str()); - } - } - }; + auto protocolClient = builder->Build(); + if (!protocolClient) { + printf( + "Failed to create mqtt5 client with error code %d: %s", LastError(), ErrorDebugString(LastError())); + return -1; + } - auto onUpdateShadowAccepted = [&](UpdateShadowResponse *response, int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } + context.m_protocolClient = protocolClient; - if (response->State->Reported) - { - currentShadowValue = response->State->Reported->View().GetString(cmdData.input_shadowProperty); - } - else - { - fprintf(stdout, "Finished clearing shadow properties\n"); - currentShadowValue = ""; - } + Aws::Iot::RequestResponse::RequestResponseClientOptions requestResponseOptions; + requestResponseOptions.WithMaxRequestResponseSubscriptions(4); + requestResponseOptions.WithMaxStreamingSubscriptions(10); + requestResponseOptions.WithOperationTimeoutInSeconds(30); - if (cmdData.input_isCI == false) - { - fprintf(stdout, "Enter Desired state of %s:\n", cmdData.input_shadowProperty.c_str()); - } - }; + auto shadowClient = Aws::Iotshadow::NewClientFrom5(*context.m_protocolClient, requestResponseOptions); + if (!shadowClient) { + printf( + "Failed to create shadow client with error code %d: %s", LastError(), ErrorDebugString(LastError())); + return -1; + } - auto onUpdateShadowRejected = [&](ErrorResponse *error, int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error on subscription: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - fprintf( - stdout, - "Update of shadow state failed with message %s and code %d.", - error->Message->c_str(), - *error->Code); - }; - - ShadowDeltaUpdatedSubscriptionRequest shadowDeltaUpdatedRequest; - shadowDeltaUpdatedRequest.ThingName = cmdData.input_thingName; - - shadowClient.SubscribeToShadowDeltaUpdatedEvents( - shadowDeltaUpdatedRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onDeltaUpdated, onDeltaUpdatedSubAck); - - UpdateShadowSubscriptionRequest updateShadowSubscriptionRequest; - updateShadowSubscriptionRequest.ThingName = cmdData.input_thingName; - - shadowClient.SubscribeToUpdateShadowAccepted( - updateShadowSubscriptionRequest, - AWS_MQTT_QOS_AT_LEAST_ONCE, - onUpdateShadowAccepted, - onDeltaUpdatedAcceptedSubAck); - - shadowClient.SubscribeToUpdateShadowRejected( - updateShadowSubscriptionRequest, - AWS_MQTT_QOS_AT_LEAST_ONCE, - onUpdateShadowRejected, - onDeltaUpdatedRejectedSubAck); - - subscribeDeltaCompletedPromise.get_future().wait(); - subscribeDeltaAcceptedCompletedPromise.get_future().wait(); - subscribeDeltaRejectedCompletedPromise.get_future().wait(); - - /********************** Shadow Value Get ********************/ - // This section is to get the initial value of the Shadow document. - - std::promise subscribeGetShadowAcceptedCompletedPromise; - std::promise subscribeGetShadowRejectedCompletedPromise; - std::promise onGetShadowRequestCompletedPromise; - std::promise gotInitialShadowPromise; - - auto onGetShadowUpdatedAcceptedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error subscribing to get shadow document accepted: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - subscribeGetShadowAcceptedCompletedPromise.set_value(); - }; + context.m_shadowClient = shadowClient; - auto onGetShadowUpdatedRejectedSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error subscribing to get shadow document rejected: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - subscribeGetShadowRejectedCompletedPromise.set_value(); - }; + fprintf(stdout, "Starting protocol client!\n"); + context.m_protocolClient->Start(); - auto onGetShadowRequestSubAck = [&](int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error getting shadow document: %s\n", ErrorDebugString(ioErr)); - exit(-1); - } - onGetShadowRequestCompletedPromise.set_value(); - }; + std::unique_lock connect_lock(context.m_connectedLock); + context.m_connectedSignal.wait(connect_lock, [&context](){ return context.m_isConnected; }); - auto onGetShadowAccepted = [&](GetShadowResponse *response, int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error getting shadow value from document: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - if (response) - { - fprintf(stdout, "Received shadow document.\n"); - if (response->State && response->State->Reported->View().ValueExists(cmdData.input_shadowProperty)) - { - JsonView objectView = response->State->Reported->View().GetJsonObject(cmdData.input_shadowProperty); - if (objectView.IsNull()) - { - fprintf(stdout, "Shadow contains \"%s\" but is null.\n", cmdData.input_shadowProperty.c_str()); - currentShadowValue = ""; - } - else - { - currentShadowValue = response->State->Reported->View().GetString(cmdData.input_shadowProperty); - fprintf( - stdout, - "Shadow contains \"%s\". Updating local value to \"%s\"...\n", - cmdData.input_shadowProperty.c_str(), - currentShadowValue.c_str()); - } - } - else - { - fprintf( - stdout, "Shadow currently does not contain \"%s\".\n", cmdData.input_shadowProperty.c_str()); - currentShadowValue = ""; - } - gotInitialShadowPromise.set_value(); - } - }; + context.m_shadowUpdatedStream = s_createShadowUpdatedStream(context); + context.m_shadowDeltaUpdatedStream = s_createShadowDeltaUpdatedStream(context); - auto onGetShadowRejected = [&](ErrorResponse *error, int ioErr) { - if (ioErr != AWS_OP_SUCCESS) - { - fprintf(stderr, "Error on getting shadow document: %s.\n", ErrorDebugString(ioErr)); - exit(-1); - } - fprintf( - stdout, - "Getting shadow document failed with message %s and code %d.\n", - error->Message->c_str(), - *error->Code); - gotInitialShadowPromise.set_value(); - }; - - GetShadowSubscriptionRequest shadowSubscriptionRequest; - shadowSubscriptionRequest.ThingName = cmdData.input_thingName; - - shadowClient.SubscribeToGetShadowAccepted( - shadowSubscriptionRequest, - AWS_MQTT_QOS_AT_LEAST_ONCE, - onGetShadowAccepted, - onGetShadowUpdatedAcceptedSubAck); - - shadowClient.SubscribeToGetShadowRejected( - shadowSubscriptionRequest, - AWS_MQTT_QOS_AT_LEAST_ONCE, - onGetShadowRejected, - onGetShadowUpdatedRejectedSubAck); - - subscribeGetShadowAcceptedCompletedPromise.get_future().wait(); - subscribeGetShadowRejectedCompletedPromise.get_future().wait(); - - GetShadowRequest shadowGetRequest; - shadowGetRequest.ThingName = cmdData.input_thingName; - - // Get the current shadow document so we start with the correct value - shadowClient.PublishGetShadow(shadowGetRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onGetShadowRequestSubAck); - - onGetShadowRequestCompletedPromise.get_future().wait(); - gotInitialShadowPromise.get_future().wait(); - - /********************** Shadow change value input loop ********************/ - /** - * This section is to getting user input and changing the shadow value passed to that input. - * If in CI, then input is automatically passed - */ - - if (cmdData.input_isCI == false) - { - fprintf(stdout, "Enter Desired state of %s:\n", cmdData.input_shadowProperty.c_str()); - while (true) - { - String input; - std::cin >> input; - - if (input == "exit" || input == "quit") - { - fprintf(stdout, "Exiting..."); - break; - } - - if (input == currentShadowValue) - { - fprintf(stdout, "Shadow is already set to \"%s\"\n", currentShadowValue.c_str()); - fprintf(stdout, "Enter Desired state of %s:\n", cmdData.input_shadowProperty.c_str()); - } - else - { - s_changeShadowValue(shadowClient, cmdData.input_thingName, cmdData.input_shadowProperty, input); - } - } - } - else + while (true) + { + fprintf(stdout, "\nEnter command:\n"); + + String input; + std::getline(std::cin, input); + + if (s_handleInput(input, context)) { - int messagesSent = 0; - while (messagesSent < 5) - { - String input = "Shadow_Value_"; - input.append(std::to_string(messagesSent).c_str()); - s_changeShadowValue(shadowClient, cmdData.input_thingName, cmdData.input_shadowProperty, input); - // Sleep so there is a gap between shadow updates - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - messagesSent += 1; - } - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + fprintf(stdout, "Exiting..."); + break; } } - // Disconnect - if (connection->Disconnect()) - { - connectionClosedPromise.get_future().wait(); - } return 0; } diff --git a/samples/shadow/v2/main.cpp b/samples/shadow/v2/main.cpp deleted file mode 100644 index 575ded785..000000000 --- a/samples/shadow/v2/main.cpp +++ /dev/null @@ -1,459 +0,0 @@ -/** - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include "../../utils/CommandLineUtils.h" - -using namespace Aws::Crt; -using namespace Aws::Iotshadow; - -struct StreamingOperationWrapper { - Aws::Crt::String m_thingName; - - Aws::Crt::String m_shadowName; - - Aws::Crt::String m_type; - - std::shared_ptr m_stream; -}; - -struct ApplicationContext { - - std::shared_ptr m_protocolClient; - - std::shared_ptr m_shadowClient; - - uint64_t m_nextStreamId; - - std::unordered_map m_streams; -}; - - -static void s_onConnectionSuccess(const Mqtt5::OnConnectionSuccessEventData &eventData) { - fprintf( - stdout, - "Mqtt5 Client connection succeeded!\n"); -} - -static void s_onConnectionFailure(const Mqtt5::OnConnectionFailureEventData &eventData) { - fprintf(stdout, "Mqtt5 client connection attempt failed with error: %s.\n", aws_error_debug_str(eventData.errorCode)); -} - -static void s_onStopped(const Mqtt5::OnStoppedEventData &event) { - fprintf(stdout, "Protocol client stopped.\n"); -} - -static Aws::Crt::String s_nibbleNextToken(Aws::Crt::String &input) { - Aws::Crt::String token; - Aws::Crt::String remaining; - auto delimPosition = input.find(' '); - if (delimPosition != Aws::Crt::String::npos) { - token = input.substr(0, delimPosition); - - auto untrimmedRemaining = input.substr(delimPosition, Aws::Crt::String::npos); - auto firstNonSpacePosition = untrimmedRemaining.find_first_not_of(' '); - if (firstNonSpacePosition != Aws::Crt::String::npos) { - remaining = untrimmedRemaining.substr(firstNonSpacePosition, Aws::Crt::String::npos); - } else { - remaining = ""; - } - } else { - token = input; - remaining = ""; - } - - input = remaining; - return token; -} - -static void s_printHelp() { - fprintf(stdout, "\nShadow sandbox:\n\n"); - fprintf(stdout, " quit -- quits the program\n"); - fprintf(stdout, " start -- starts the protocol client\n"); - fprintf(stdout, " stop -- stops the protocol client\n\n"); - fprintf(stdout, " get -- gets the state of a named shadow belonging to the specified thing\n"); - fprintf(stdout, " delete -- deletes a named shadow belonging to the specified thing\n"); - fprintf(stdout, " update-desired -- updates the desired state of a named shadow belonging to the specified thing\n"); - fprintf(stdout, " update-reported -- updates the reported state a named shadow belonging to the specified thing\n\n"); - fprintf(stdout, " list-streams -- lists all open streaming operations\n"); - fprintf(stdout, " open-delta-stream -- opens a new streaming operation that receives delta events about changes to a particular shadow belonging to a thing\n"); - fprintf(stdout, " open-document-stream -- opens a new streaming operation that receives document events about changes to a particular shadow belonging to a thing\n"); - fprintf(stdout, " close-stream -- closes a streaming operation\n"); -} - -static void s_onServiceError(const Aws::Iotshadow::ServiceErrorV2 &serviceError, Aws::Crt::String operationName) { - fprintf(stdout, "%s failed with error code: %s\n", operationName.c_str(), aws_error_debug_str(serviceError.GetErrorCode())); - if (serviceError.HasModeledError()) { - const auto &modeledError = serviceError.GetModeledError(); - - Aws::Crt::JsonObject jsonObject; - modeledError.SerializeToObject(jsonObject); - Aws::Crt::String outgoingJson = jsonObject.View().WriteCompact(true); - fprintf(stdout, "modeled error: %s\n", outgoingJson.c_str()); - } -} - -static void s_onGetShadowResult(GetShadowResult &&result) { - if (result.IsSuccess()) { - const auto &response = result.GetResponse(); - - Aws::Crt::JsonObject jsonObject; - response.SerializeToObject(jsonObject); - Aws::Crt::String outgoingJson = jsonObject.View().WriteCompact(true); - fprintf(stdout, "get result: %s\n", outgoingJson.c_str()); - } else { - s_onServiceError(result.GetError(), "get"); - } -} - -static void s_handleGetNamedShadow(const Aws::Crt::String params, const std::shared_ptr &shadowClient) { - Aws::Crt::String remaining = params; - Aws::Crt::String thing = s_nibbleNextToken(remaining); - Aws::Crt::String shadow = s_nibbleNextToken(remaining); - - if (thing.length() == 0 || shadow.length() == 0) { - fprintf(stdout, "Invalid arguments to get command!\n\n"); - s_printHelp(); - return; - } - - Aws::Iotshadow::GetNamedShadowRequest request; - request.ThingName = thing; - request.ShadowName = shadow; - - shadowClient->GetNamedShadow(request, [](GetShadowResult &&result){ - s_onGetShadowResult(std::move(result)); - }); -} - -static void s_onDeleteShadowResult(DeleteShadowResult &&result) { - if (result.IsSuccess()) { - const auto &response = result.GetResponse(); - - Aws::Crt::JsonObject jsonObject; - response.SerializeToObject(jsonObject); - Aws::Crt::String outgoingJson = jsonObject.View().WriteCompact(true); - fprintf(stdout, "delete result: %s\n", outgoingJson.c_str()); - } else { - s_onServiceError(result.GetError(), "delete"); - } -} - -static void s_handleDeleteNamedShadow(const Aws::Crt::String params, const std::shared_ptr &shadowClient) { - Aws::Crt::String remaining = params; - Aws::Crt::String thing = s_nibbleNextToken(remaining); - Aws::Crt::String shadow = s_nibbleNextToken(remaining); - - if (thing.length() == 0 || shadow.length() == 0) { - fprintf(stdout, "Invalid arguments to delete command!\n\n"); - s_printHelp(); - return; - } - - Aws::Iotshadow::DeleteNamedShadowRequest request; - request.ThingName = thing; - request.ShadowName = shadow; - - shadowClient->DeleteNamedShadow(request, [](DeleteShadowResult &&result){ - s_onDeleteShadowResult(std::move(result)); - }); -} - -static void s_onUpdateShadowResult(UpdateShadowResult &&result, Aws::Crt::String operationName) { - if (result.IsSuccess()) { - const auto &response = result.GetResponse(); - - Aws::Crt::JsonObject jsonObject; - response.SerializeToObject(jsonObject); - Aws::Crt::String outgoingJson = jsonObject.View().WriteCompact(true); - fprintf(stdout, "%s result: %s\n", operationName.c_str(), outgoingJson.c_str()); - } else { - s_onServiceError(result.GetError(), operationName); - } -} - -static void s_handleUpdateDesiredNamedShadow(const Aws::Crt::String params, const std::shared_ptr &shadowClient) { - Aws::Crt::String remaining = params; - Aws::Crt::String thing = s_nibbleNextToken(remaining); - Aws::Crt::String shadow = s_nibbleNextToken(remaining); - - if (thing.length() == 0 || shadow.length() == 0) { - fprintf(stdout, "Invalid arguments to update-desired command!\n\n"); - s_printHelp(); - return; - } - - Aws::Crt::JsonObject stateAsJson(remaining); - if (!stateAsJson.WasParseSuccessful()) { - fprintf(stdout, "desired state was not valid JSON!\n\n"); - s_printHelp(); - return; - } - - Aws::Iotshadow::UpdateNamedShadowRequest request; - request.ThingName = thing; - request.ShadowName = shadow; - request.State = Aws::Iotshadow::ShadowState(); - request.State.value().Desired = stateAsJson; - - shadowClient->UpdateNamedShadow(request, [](UpdateShadowResult &&result){ - s_onUpdateShadowResult(std::move(result), "update-desired"); - }); -} - -static void s_handleUpdateReportedNamedShadow(const Aws::Crt::String params, const std::shared_ptr &shadowClient) { - Aws::Crt::String remaining = params; - Aws::Crt::String thing = s_nibbleNextToken(remaining); - Aws::Crt::String shadow = s_nibbleNextToken(remaining); - - if (thing.length() == 0 || shadow.length() == 0) { - fprintf(stdout, "Invalid arguments to update-reported command!\n\n"); - s_printHelp(); - return; - } - - Aws::Crt::JsonObject stateAsJson(remaining); - if (!stateAsJson.WasParseSuccessful()) { - fprintf(stdout, "reported state was not valid JSON!\n\n"); - s_printHelp(); - return; - } - - Aws::Iotshadow::UpdateNamedShadowRequest request; - request.ThingName = thing; - request.ShadowName = shadow; - request.State = Aws::Iotshadow::ShadowState(); - request.State.value().Reported = stateAsJson; - - shadowClient->UpdateNamedShadow(request, [](UpdateShadowResult &&result){ - s_onUpdateShadowResult(std::move(result), "update-reported"); - }); -} - -static void s_handleListStreams(const ApplicationContext &context) { - fprintf(stdout, "Streams:\n"); - for (const auto &iter : context.m_streams) { - uint64_t streamId = iter.first; - const StreamingOperationWrapper &wrapper = iter.second; - fprintf(stdout, " %" PRIu64": type '%s', thing '%s', shadow '%s'\n", streamId, wrapper.m_type.c_str(), wrapper.m_thingName.c_str(), wrapper.m_shadowName.c_str()); - } -} - -static void s_handleCloseStream(const Aws::Crt::String params, ApplicationContext &context) { - Aws::Crt::String remaining = params; - Aws::Crt::String streamId = s_nibbleNextToken(remaining); - - if (streamId.length() == 0) { - fprintf(stdout, "Invalid arguments to close-stream command!\n\n"); - s_printHelp(); - return; - } - - uint64_t id = std::stoull(streamId.c_str()); - fprintf(stdout, "Closing stream %" PRIu64 "\n", id); - context.m_streams.erase(id); -} - -static void s_registerStream(ApplicationContext &context, uint64_t id, std::shared_ptr operation, Aws::Crt::String type, Aws::Crt::String thing, Aws::Crt::String shadow) { - StreamingOperationWrapper wrapper; - wrapper.m_stream = operation; - wrapper.m_type = type; - wrapper.m_thingName = thing; - wrapper.m_shadowName = shadow; - - context.m_streams[id] = wrapper; - - operation->Open(); -} - -static void s_onSubscriptionStatusEvent(uint64_t id, Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) { - fprintf(stdout, "Stream %" PRIu64 ": subscription status event with type %d and error %s\n", id, event.GetErrorCode(), Aws::Crt::ErrorDebugString(event.GetErrorCode())); -} - -static void s_onShadowDeltaUpdatedEvent(uint64_t id, Aws::Iotshadow::ShadowDeltaUpdatedEvent &&event) { - fprintf(stdout, "Stream %" PRIu64 ": received shadow delta updated event:\n", id); - - Aws::Crt::JsonObject jsonObject; - event.SerializeToObject(jsonObject); - Aws::Crt::String json = jsonObject.View().WriteCompact(true); - fprintf(stdout, " %s\n", json.c_str()); -} - -static void s_handleOpenDeltaStream(const Aws::Crt::String params, ApplicationContext &context) { - Aws::Crt::String remaining = params; - Aws::Crt::String thing = s_nibbleNextToken(remaining); - Aws::Crt::String shadow = s_nibbleNextToken(remaining); - - if (thing.length() == 0 || shadow.length() == 0) { - fprintf(stdout, "Invalid arguments to open-delta-stream command!\n\n"); - s_printHelp(); - return; - } - - uint64_t streamId = context.m_nextStreamId++; - - Aws::Iotshadow::NamedShadowDeltaUpdatedSubscriptionRequest request; - request.ThingName = thing; - request.ShadowName = shadow; - - Aws::Iot::RequestResponse::StreamingOperationOptions options; - options.WithSubscriptionStatusEventHandler([streamId](Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) { - s_onSubscriptionStatusEvent(streamId, std::move(event)); - }); - options.WithStreamHandler([streamId](Aws::Iotshadow::ShadowDeltaUpdatedEvent &&event) { - s_onShadowDeltaUpdatedEvent(streamId, std::move(event)); - }); - - auto operation = context.m_shadowClient->CreateNamedShadowDeltaUpdatedStream(request, options); - s_registerStream(context, streamId, operation, "Delta", thing, shadow); -} - -static void s_onShadowUpdatedEvent(uint64_t id, Aws::Iotshadow::ShadowUpdatedEvent &&event) { - fprintf(stdout, "Stream %" PRIu64 ": received shadow updated event:\n", id); - - Aws::Crt::JsonObject jsonObject; - event.SerializeToObject(jsonObject); - Aws::Crt::String json = jsonObject.View().WriteCompact(true); - fprintf(stdout, " %s\n", json.c_str()); -} - -static void s_handleOpenDocumentStream(const Aws::Crt::String params, ApplicationContext &context) { - Aws::Crt::String remaining = params; - Aws::Crt::String thing = s_nibbleNextToken(remaining); - Aws::Crt::String shadow = s_nibbleNextToken(remaining); - - if (thing.length() == 0 || shadow.length() == 0) { - fprintf(stdout, "Invalid arguments to open-document-stream command!\n\n"); - s_printHelp(); - return; - } - - uint64_t streamId = context.m_nextStreamId++; - - Aws::Iotshadow::NamedShadowUpdatedSubscriptionRequest request; - request.ThingName = thing; - request.ShadowName = shadow; - - Aws::Iot::RequestResponse::StreamingOperationOptions options; - options.WithSubscriptionStatusEventHandler([streamId](Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) { - s_onSubscriptionStatusEvent(streamId, std::move(event)); - }); - options.WithStreamHandler([streamId](Aws::Iotshadow::ShadowUpdatedEvent &&event) { - s_onShadowUpdatedEvent(streamId, std::move(event)); - }); - - auto operation = context.m_shadowClient->CreateNamedShadowUpdatedStream(request, options); - s_registerStream(context, streamId, operation, "Document", thing, shadow); -} - -static bool s_handleInput(const Aws::Crt::String &input, ApplicationContext &context) { - Aws::Crt::String remaining = input; - Aws::Crt::String command = s_nibbleNextToken(remaining); - - if (command == "quit") { - fprintf(stdout, "Quitting!\n"); - return true; - } else if (command == "start") { - fprintf(stdout, "Starting protocol client!\n"); - context.m_protocolClient->Start(); - } else if (command == "stop") { - fprintf(stdout, "Stopping protocol client!\n"); - context.m_protocolClient->Stop(); - } else if (command == "get") { - s_handleGetNamedShadow(remaining, context.m_shadowClient); - } else if (command == "delete") { - s_handleDeleteNamedShadow(remaining, context.m_shadowClient); - } else if (command == "update-desired") { - s_handleUpdateDesiredNamedShadow(remaining, context.m_shadowClient); - } else if (command == "update-reported") { - s_handleUpdateReportedNamedShadow(remaining, context.m_shadowClient); - } else if (command == "list-streams") { - s_handleListStreams(context); - } else if (command == "open-delta-stream") { - s_handleOpenDeltaStream(remaining, context); - } else if (command == "open-document-stream") { - s_handleOpenDocumentStream(remaining, context); - } else if (command == "close-stream") { - s_handleCloseStream(remaining, context); - } else { - s_printHelp(); - } - - return false; -} - -int main(int argc, char *argv[]) -{ - /************************ Setup ****************************/ - - // Do the global initialization for the API. - ApiHandle apiHandle; - - Utils::cmdData cmdData = Utils::parseSampleInputBasicConnect(argc, argv, &apiHandle); - - // Create the MQTT5 builder and populate it with data from cmdData. - auto builder = std::unique_ptr( - Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( - cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str())); - // Check if the builder setup correctly. - if (builder == nullptr) - { - printf( - "Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError())); - return -1; - } - - // Setup lifecycle callbacks - builder->WithClientConnectionSuccessCallback(s_onConnectionSuccess); - builder->WithClientConnectionFailureCallback(s_onConnectionFailure); - builder->WithClientStoppedCallback(s_onStopped); - - Aws::Iot::RequestResponse::RequestResponseClientOptions requestResponseOptions; - requestResponseOptions.WithMaxRequestResponseSubscriptions(4); - requestResponseOptions.WithMaxStreamingSubscriptions(10); - requestResponseOptions.WithOperationTimeoutInSeconds(30); - - ApplicationContext context; - context.m_protocolClient = builder->Build();; - context.m_shadowClient = Aws::Iotshadow::NewClientFrom5(*context.m_protocolClient, requestResponseOptions); - context.m_nextStreamId = 1; - - while (true) - { - fprintf(stdout, "\nEnter command:\n"); - - String input; - std::getline(std::cin, input); - - if (s_handleInput(input, context)) - { - fprintf(stdout, "Exiting..."); - break; - } - } - - return 0; -} From bdab63ba03e506887e42fbd99be440145259926c Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 31 Oct 2024 15:09:59 -0700 Subject: [PATCH 02/19] Updates --- samples/shadow/shadow_sync/main.cpp | 135 ++++++++++++++++------------ 1 file changed, 77 insertions(+), 58 deletions(-) diff --git a/samples/shadow/shadow_sync/main.cpp b/samples/shadow/shadow_sync/main.cpp index 044148da3..447c73917 100644 --- a/samples/shadow/shadow_sync/main.cpp +++ b/samples/shadow/shadow_sync/main.cpp @@ -29,40 +29,57 @@ using namespace Aws::Crt; using namespace Aws::Iotshadow; -struct ApplicationContext { +template +class SimpleWaiter { + public: - ApplicationContext() : - m_isConnected(false) - {} + SimpleWaiter() = default; - std::shared_ptr m_protocolClient; + bool hasValue() { + std::lock_guard lock(m_lock); - std::shared_ptr m_shadowClient; + return m_value.has_value(); + } + const T &getValue() { + std::lock_guard lock(m_lock); - std::shared_ptr m_shadowDeltaUpdatedStream; - std::shared_ptr m_shadowUpdatedStream; + return m_value.value(); + } - String m_thingName; + void waitForValue() { + std::unique_lock lock(m_lock); + m_signal.wait(lock, [this](){ return m_value.has_value(); }); + } - std::mutex m_connectedLock; - std::condition_variable m_connectedSignal; - bool m_isConnected; + void setValue(T &&value) { + { + std::lock_guard lock(m_lock); + m_value = std::move(value); + } + + m_signal.notify_all(); + } + + private: + + std::mutex m_lock; + std::condition_variable m_signal; + + Optional m_value; }; +struct ApplicationContext { -static void s_onConnectionSuccess(struct ApplicationContext &context) { - fprintf( - stdout, - "Mqtt5 Client connection succeeded!\n"); + std::shared_ptr m_protocolClient; - { - std::lock_guard lock(context.m_connectedLock); - context.m_isConnected = true; - } + std::shared_ptr m_shadowClient; - context.m_connectedSignal.notify_all(); -} + std::shared_ptr m_shadowDeltaUpdatedStream; + std::shared_ptr m_shadowUpdatedStream; + + String m_thingName; +}; static void s_onConnectionFailure(const Mqtt5::OnConnectionFailureEventData &eventData) { fprintf(stdout, "Mqtt5 client connection attempt failed with error: %s.\n", aws_error_debug_str(eventData.errorCode)); @@ -130,9 +147,13 @@ static void s_handleGetShadow(const ApplicationContext &context) { GetShadowRequest request; request.ThingName = context.m_thingName; - context.m_shadowClient->GetShadow(request, [](GetShadowResult &&result){ - s_onGetShadowResult(std::move(result)); + SimpleWaiter getWaiter; + context.m_shadowClient->GetShadow(request, [&getWaiter](GetShadowResult &&result){ + s_onGetShadowResult(std::move(result)); + getWaiter.setValue(true); }); + + getWaiter.waitForValue(); } static void s_onDeleteShadowResult(DeleteShadowResult &&result) { @@ -152,9 +173,13 @@ static void s_handleDeleteShadow(const ApplicationContext &context) { DeleteShadowRequest request; request.ThingName = context.m_thingName; - context.m_shadowClient->DeleteShadow(request, [](DeleteShadowResult &&result){ + SimpleWaiter deleteWaiter; + context.m_shadowClient->DeleteShadow(request, [&deleteWaiter](DeleteShadowResult &&result){ s_onDeleteShadowResult(std::move(result)); + deleteWaiter.setValue(true); }); + + deleteWaiter.waitForValue(); } static void s_onUpdateShadowResult(UpdateShadowResult &&result, String operationName) { @@ -183,9 +208,13 @@ static void s_handleUpdateDesiredShadow(const String ¶ms, const ApplicationC request.State = ShadowState(); request.State.value().Desired = stateAsJson; - context.m_shadowClient->UpdateShadow(request, [](UpdateShadowResult &&result){ + SimpleWaiter updateWaiter; + context.m_shadowClient->UpdateShadow(request, [&updateWaiter](UpdateShadowResult &&result){ s_onUpdateShadowResult(std::move(result), "update-desired"); + updateWaiter.setValue(true); }); + + updateWaiter.waitForValue(); } static void s_handleUpdateReportedShadow(const String ¶ms, const ApplicationContext &context) { @@ -201,30 +230,28 @@ static void s_handleUpdateReportedShadow(const String ¶ms, const Application request.State = ShadowState(); request.State.value().Reported = stateAsJson; - context.m_shadowClient->UpdateShadow(request, [](UpdateShadowResult &&result){ + SimpleWaiter updateWaiter; + context.m_shadowClient->UpdateShadow(request, [&updateWaiter](UpdateShadowResult &&result){ s_onUpdateShadowResult(std::move(result), "update-reported"); + updateWaiter.setValue(true); }); + + updateWaiter.waitForValue(); } static std::shared_ptr s_createShadowUpdatedStream(const ApplicationContext &context) { - std::mutex subscribedMutex; - std::condition_variable subscribedSignal; - bool subscribed = false; + SimpleWaiter subscribedWaiter; ShadowUpdatedSubscriptionRequest request; request.ThingName = context.m_thingName; Aws::Iot::RequestResponse::StreamingOperationOptions options; - options.WithSubscriptionStatusEventHandler([&subscribedMutex, &subscribedSignal, &subscribed](Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) { + options.WithSubscriptionStatusEventHandler([&subscribedWaiter](Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) { if (event.GetType() == Aws::Iot::RequestResponse::SubscriptionStatusEventType::SubscriptionEstablished) { - { - std::lock_guard subscribedLock(subscribedMutex); - subscribed = true; - } - subscribedSignal.notify_all(); + subscribedWaiter.setValue(true); } }); - options.WithStreamHandler([&context](Aws::Iotshadow::ShadowUpdatedEvent &&event) { + options.WithStreamHandler([](Aws::Iotshadow::ShadowUpdatedEvent &&event) { Aws::Crt::JsonObject jsonObject; event.SerializeToObject(jsonObject); Aws::Crt::String json = jsonObject.View().WriteCompact(true); @@ -234,33 +261,24 @@ static std::shared_ptr s_createS auto stream = context.m_shadowClient->CreateShadowUpdatedStream(request, options); stream->Open(); - { - std::unique_lock subscribedLock; - subscribedSignal.wait(subscribedLock, [&subscribed](){ return subscribed; }); - } + subscribedWaiter.waitForValue(); return stream; } static std::shared_ptr s_createShadowDeltaUpdatedStream(const ApplicationContext &context) { - std::mutex subscribedMutex; - std::condition_variable subscribedSignal; - bool subscribed = false; + SimpleWaiter subscribedWaiter; ShadowDeltaUpdatedSubscriptionRequest request; request.ThingName = context.m_thingName; Aws::Iot::RequestResponse::StreamingOperationOptions options; - options.WithSubscriptionStatusEventHandler([&subscribedMutex, &subscribedSignal, &subscribed](Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) { + options.WithSubscriptionStatusEventHandler([&subscribedWaiter](Aws::Iot::RequestResponse::SubscriptionStatusEvent &&event) { if (event.GetType() == Aws::Iot::RequestResponse::SubscriptionStatusEventType::SubscriptionEstablished) { - { - std::lock_guard subscribedLock(subscribedMutex); - subscribed = true; - } - subscribedSignal.notify_all(); + subscribedWaiter.setValue(true); } }); - options.WithStreamHandler([&context](Aws::Iotshadow::ShadowDeltaUpdatedEvent &&event) { + options.WithStreamHandler([](Aws::Iotshadow::ShadowDeltaUpdatedEvent &&event) { Aws::Crt::JsonObject jsonObject; event.SerializeToObject(jsonObject); Aws::Crt::String json = jsonObject.View().WriteCompact(true); @@ -270,10 +288,7 @@ static std::shared_ptr s_createS auto stream = context.m_shadowClient->CreateShadowDeltaUpdatedStream(request, options); stream->Open(); - { - std::unique_lock subscribedLock; - subscribedSignal.wait(subscribedLock, [&subscribed](){ return subscribed; }); - } + subscribedWaiter.waitForValue(); return stream; } @@ -324,8 +339,13 @@ int main(int argc, char *argv[]) return -1; } + SimpleWaiter connectedWaiter; + // Setup lifecycle callbacks - builder->WithClientConnectionSuccessCallback([&context](const Mqtt5::OnConnectionSuccessEventData &){ s_onConnectionSuccess(context); }); + builder->WithClientConnectionSuccessCallback([&connectedWaiter](const Mqtt5::OnConnectionSuccessEventData &){ + fprintf(stdout, "Mqtt5 Client connection succeeded!\n"); + connectedWaiter.setValue(true); + }); builder->WithClientConnectionFailureCallback(s_onConnectionFailure); auto protocolClient = builder->Build(); @@ -354,8 +374,7 @@ int main(int argc, char *argv[]) fprintf(stdout, "Starting protocol client!\n"); context.m_protocolClient->Start(); - std::unique_lock connect_lock(context.m_connectedLock); - context.m_connectedSignal.wait(connect_lock, [&context](){ return context.m_isConnected; }); + connectedWaiter.waitForValue(); context.m_shadowUpdatedStream = s_createShadowUpdatedStream(context); context.m_shadowDeltaUpdatedStream = s_createShadowDeltaUpdatedStream(context); From d9ac0cd92d1ed1abeaeb46876c47453185920beb Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Fri, 1 Nov 2024 11:02:47 -0700 Subject: [PATCH 03/19] Interactive shadow sample and readme with walkthrough --- samples/shadow/shadow_sync/CMakeLists.txt | 2 +- samples/shadow/shadow_sync/README.md | 275 ++++++++++++++++++---- samples/shadow/shadow_sync/main.cpp | 23 +- 3 files changed, 245 insertions(+), 55 deletions(-) diff --git a/samples/shadow/shadow_sync/CMakeLists.txt b/samples/shadow/shadow_sync/CMakeLists.txt index d42494ffb..bcf22adb6 100644 --- a/samples/shadow/shadow_sync/CMakeLists.txt +++ b/samples/shadow/shadow_sync/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.1) # note: cxx-17 requires cmake 3.8, cxx-20 requires cmake 3.12 -project(shadowv2 CXX) +project(shadow_sync CXX) file(GLOB SRC_FILES "*.cpp" diff --git a/samples/shadow/shadow_sync/README.md b/samples/shadow/shadow_sync/README.md index 0c614a9e5..2321a84e1 100644 --- a/samples/shadow/shadow_sync/README.md +++ b/samples/shadow/shadow_sync/README.md @@ -2,12 +2,21 @@ [**Return to main sample list**](../../README.md) -This sample uses the AWS IoT [Device Shadow](https://docs.aws.amazon.com/iot/latest/developerguide/iot-device-shadows.html) Service to keep a property in sync between device and server. Imagine a light whose color may be changed through an app, or set by a local user. +This is an interactive sample that supports a set of commands that allow you to interact with "classic" (unnamed) shadows of the AWS IoT [Device Shadow](https://docs.aws.amazon.com/iot/latest/developerguide/iot-device-shadows.html) Service. -Once connected, type a value in the terminal and press Enter to update the property's "reported" value. The sample also responds when the "desired" value changes on the server. To observe this, edit the Shadow document in the AWS Console and set a new "desired" value. +### Commands +Once connected, the sample supports the following shadow-related commands: -On startup, the sample requests the shadow document to learn the property's initial state. The sample also subscribes to "delta" events from the server, which are sent when a property's "desired" value differs from its "reported" value. When the sample learns of a new desired value, that value is changed on the device and an update is sent to the server with the new "reported" value. +* `get` - gets the current full state of the classic (unnamed) shadow. This includes both a "desired" state component and a "reported" state component. +* `delete` - deletes the classic (unnamed) shadow completely +* `update-desired ` - applies an update to the classic shadow's desired state component. Properties in the JSON document set to non-null will be set to new values. Properties in the JSON document set to null will be removed. +* `update-reported ` - applies an update to the classic shadow's reported state component. Properties in the JSON document set to non-null will be set to new values. Properties in the JSON document set to null will be removed. +Two additional commands are supported: +* `help` - prints the set of supported commands +* `quit` - quits the sample application + +### Prerequisites Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect, subscribe, publish, and receive. Below is a sample policy that can be used on your IoT Core Thing that will allow this sample to run as intended.
@@ -23,6 +32,7 @@ Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerg ], "Resource": [ "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/get", + "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/delete", "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/update" ] }, @@ -32,11 +42,9 @@ Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerg "iot:Receive" ], "Resource": [ - "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/get/accepted", - "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/get/rejected", - "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/update/accepted", - "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/update/rejected", - "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/update/delta" + "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/get/*", + "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/delete/*", + "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/update/*" ] }, { @@ -45,11 +53,9 @@ Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerg "iot:Subscribe" ], "Resource": [ - "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/get/accepted", - "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/get/rejected", - "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/update/accepted", - "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/update/rejected", - "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/update/delta" + "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/get/*", + "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/delete/*", + "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/update/*" ] }, { @@ -70,58 +76,233 @@ Note that in a real application, you may want to avoid the use of wildcards in y
-## How to run +## Walkthrough -To run the Shadow sample use the following command: +Before building and running the sample, you must first build and install the SDK: ``` sh -./mqtt5-shadow-sync --endpoint --cert --key --thing_name --shadow_property +cd +mkdir _build +cd _build +cmake -DCMAKE_INSTALL_PREFIX= .. +make && make install ``` -You can also pass a Certificate Authority file (CA) if your certificate and key combination requires it: +Now build the sample: ``` sh -./mqtt5-shadow-sync --endpoint --cert --key --thing_name --shadow_property --ca_file +cd samples/shadow/shadow_sync +mkdir _build +cd _build +cmake -DCMAKE_PREFIX_PATH= .. +make +``` + +To run the sample: + +``` sh +./shadow_sync --endpoint --cert --key --thing_name +``` + +The sample also listens to a pair of event streams related to the classic (unnamed) shadow state of your thing, so in addition to responses, you will occasionally see output from these streaming operations as they receive events from the shadow service. + +Once successfully connected, you can issue commands. + +### Initialization + +Start off by getting the shadow state: + +``` +get +``` + +If your thing does have shadow state, you will get its current value, which this sample has no control over. Let's assume it was initialized +like what is described below: + +``` +get result: {"clientToken":"","state":{"reported":{"Color":"green"}},"metadata":{"reported":{"Color":{"timestamp":1730481958}}},"timestamp":1730482166,"version":1} +``` + +If your thing does not have any shadow state yet, you'll get a ResourceNotFound error: + +``` +get failed with error code: libaws-c-mqtt: AWS_ERROR_MQTT_REQUEST_RESPONSE_MODELED_SERVICE_ERROR, Request-response operation failed with a modeled service error. +modeled error: {"clientToken":"","code":404,"message":"No shadow exists with name: ''"} +``` + +To create a shadow, you can issue an update call that will initialize the shadow to a starting state: + +``` +update-reported {"Color":"green"} +``` + +which will yield output similar to: + +``` +update-reported result: {"clientToken":"58de9848-e4db-e2ac-9622-d4c1197a5a14","state":{"reported":{"Color":"green"}},"metadata":{"reported":{"Color":{"timestamp":1730481958}}},"timestamp":1730481958,"version":1} + +Received ShadowUpdated event: {"current":{"state":{"reported":{"Color":"green"}},"metadata":{"reported":{"Color":{"timestamp":1730481958}}},"version":1},"timestamp":1730481958} +``` + +Notice that in addition to receiving a response to the update request, you also receive a `ShadowUpdated` event containing what changed about +the shadow plus additional metadata (version, update timestamps, etc...). Every time a shadow is updated, this +event is triggered. If you wish to listen and react to this event, use the `CreateShadowUpdatedStream` API in the shadow client to create a +streaming operation that converts the raw MQTT publish messages into modeled data that the streaming operation invokes a user-supplied callback with. + +Issue one more update to get the shadow's reported and desired states in sync: + +``` +update-desired {"Color":"green"} +``` + +yielding output similar to: + +``` +update-desired result: {"clientToken":"7f53091e-16a7-74d9-201b-b2d45a4d1ae7","state":{"desired":{"Color":"green"}},"metadata":{"desired":{"Color":{"timestamp":1730482304}}},"timestamp":1730482304,"version":2} + + +``` + +### Changing Properties +A device shadow contains two independent states: reported and desired. "Reported" represents the device's last-known local state, while +"desired" represents the state that control application(s) would like the device to change to. In general, each application (whether on the device or running +remotely as a control process) will only update one of these two state components. + +Let's walk through the multi-step process to coordinate a change-of-state on the device. First, a control application needs to update the shadow's desired +state with the change it would like applied: + +``` +update-desired {"Color":"red"} +``` + +For our sample, this yields output similar to: + +``` +update-desired result: {"clientToken":"35624091-7be2-3ab3-c193-bac8f4c4e9c8","state":{"desired":{"Color":"red"}},"metadata":{"desired":{"Color":{"timestamp":1730482794}}},"timestamp":1730482794,"version":3} + +Received ShadowUpdated event: {"previous":{"state":{"desired":{"Color":"green"},"reported":{"Color":"green"}},"metadata":{"desired":{"Color":{"timestamp":1730482304}},"reported":{"Color":{"timestamp":1730481958}}},"version":2},"current":{"state":{"desired":{"Color":"red"},"reported":{"Color":"green"}},"metadata":{"desired":{"Color":{"timestamp":1730482794}},"reported":{"Color":{"timestamp":1730481958}}},"version":3},"timestamp":1730482794} + +Received ShadowDeltaUpdated event: {"state":{"Color":"red"},"metadata":{"Color":{"timestamp":1730482794}},"timestamp":1730482794,"version":3,"clientToken":"35624091-7be2-3ab3-c193-bac8f4c4e9c8"} +``` + +The key thing to notice here is that in addition to the update response (which only the control application would see) and the ShadowUpdated event, +there is a new event, ShadowDeltaUpdated, which indicates properties on the shadow that are out-of-sync between desired and reported. All out-of-sync +properties will be included in this event, including properties that became out-of-sync due to a previous update. + +Like the ShadowUpdated event, ShadowDeltaUpdated events can be listened to by creating and configuring a streaming operation, this time by using +the `CreateShadowDeltaUpdatedStream` API. Using the ShadowDeltaUpdated events (rather than ShadowUpdated) lets a device focus on just what has +changed without having to do complex JSON diffs between the desired and reported states of the shadow. + +Assuming that the change expressed in the desired state is reasonable, the device should apply it internally and then let the service know it +has done so by updating the reported state of the shadow: + ``` +update-reported {"Color":"red"} +``` + +yielding +``` +update-reported result: {"clientToken":"5741d710-fe6b-7f4f-ece1-7767498a38c8","state":{"reported":{"Color":"red"}},"metadata":{"reported":{"Color":{"timestamp":1730482948}}},"timestamp":1730482948,"version":4} + +Received ShadowUpdated event: {"previous":{"state":{"desired":{"Color":"red"},"reported":{"Color":"green"}},"metadata":{"desired":{"Color":{"timestamp":1730482794}},"reported":{"Color":{"timestamp":1730481958}}},"version":3},"current":{"state":{"desired":{"Color":"red"},"reported":{"Color":"red"}},"metadata":{"desired":{"Color":{"timestamp":1730482794}},"reported":{"Color":{"timestamp":1730482948}}},"version":4},"timestamp":1730482948} +``` -## Service Client Notes -### Difference between MQTT5 and MQTT311 IotShadowClient -The IotShadowClient with Mqtt5 client is identical to Mqtt3 one. We wrapped the Mqtt5Client into MqttClientConnection so that we could keep the same interface for IotShadowClient. -The only difference is that you would need setup up a Mqtt5 Client for the IotShadowClient. For how to setup a Mqtt5 Client, please refer to [MQTT5 UserGuide](../../../documents/MQTT5_Userguide.md) and [MQTT5 PubSub Sample](../../mqtt5/mqtt5_pubsub/) +Notice that no ShadowDeltaUpdated event is generated because the reported and desired states are now back in sync. - - - - - - - - - -
Create a IotShadowClient with Mqtt5Create a IotShadowClient with Mqtt311
+### Multiple Properties +Not all shadow properties represent device configuration. To illustrate several more aspects of the Shadow service, let's add a second property to our shadow document, +starting out in sync (output omitted): -```Cpp - // Build Mqtt5Client - std::shared_ptr client = builder->Build(); +``` +update-reported {"Status":"Great"} +``` - // Create shadow client with mqtt5 client - Aws::Iotshadow::IotShadowClient shadowClient(client); +``` +update-desired {"Status":"Great"} ``` - +Notice that shadow updates work by deltas rather than by complete state changes. Updating the "Status" property to a value had no effect on the shadow's +"Color" property: -```Cpp - // Create mqtt311 connection - Aws::Iot::MqttClient client = Aws::Iot::MqttClient(); - auto connection = client.NewConnection(clientConfig); +``` +get +``` - // Create shadow client with mqtt311 connection - Aws::Iotshadow::IotShadowClient shadowClient(connection); +yields ``` +get result: {"clientToken":"2b689730-8144-c20f-07dd-60bdf4e3f2b7","state":{"desired":{"Color":"red","Status":"Great"},"reported":{"Color":"red","Status":"Great"}},"metadata":{"desired":{"Color":{"timestamp":1730482794},"Status":{"timestamp":1730483069}},"reported":{"Color":{"timestamp":1730482948},"Status":{"timestamp":1730483062}}},"timestamp":1730483086,"version":6} +``` + +Suppose something goes wrong with the device and its status is no longer "Great" -
+``` +update-reported {"Status":"Awful"} +``` + +which yields something similar to: + +``` +update-reported result: {"clientToken":"af54adca-85c9-c4c9-52b0-7349337f57d5","state":{"reported":{"Status":"Awful"}},"metadata":{"reported":{"Status":{"timestamp":1730483858}}},"timestamp":1730483858,"version":7} + +Received ShadowUpdated event: {"previous":{"state":{"desired":{"Color":"red","Status":"Great"},"reported":{"Color":"red","Status":"Great"}},"metadata":{"desired":{"Color":{"timestamp":1730482794},"Status":{"timestamp":1730483069}},"reported":{"Color":{"timestamp":1730482948},"Status":{"timestamp":1730483062}}},"version":6},"current":{"state":{"desired":{"Color":"red","Status":"Great"},"reported":{"Color":"red","Status":"Awful"}},"metadata":{"desired":{"Color":{"timestamp":1730482794},"Status":{"timestamp":1730483069}},"reported":{"Color":{"timestamp":1730482948},"Status":{"timestamp":1730483858}}},"version":7},"timestamp":1730483858} + +Received ShadowDeltaUpdated event: {"state":{"Status":"Great"},"metadata":{"Status":{"timestamp":1730483069}},"timestamp":1730483858,"version":7,"clientToken":"af54adca-85c9-c4c9-52b0-7349337f57d5"} +``` + +Similar to how updates are delta-based, notice how the ShadowDeltaUpdated event only includes the "Status" property, leaving the "Color" property out because it +is still in sync between desired and reported. + +### Removing properties +Properties can be removed from a shadow by setting them to null. Removing a property completely would require its removal from both the +reported and desired states of the shadow (output omitted): + +``` +update-reported {"Status":null} +``` -### Mqtt::QOS v.s. Mqtt5::QOS -As the service client interface is unchanged for Mqtt3 Connection and Mqtt5 Client,the IotShadowClient will use Mqtt::QOS instead of Mqtt5::QOS even with a Mqtt5 Client. +``` +update-desired {"Status":null} +``` + +If you now get the shadow state: + +``` +get +``` + +its output yields something like + +``` +get result: {"clientToken":"2dccc7b8-47ad-80c5-299f-3de5d9b553fa","state":{"desired":{"Color":"red"},"reported":{"Color":"red"}},"metadata":{"desired":{"Color":{"timestamp":1730482794}},"reported":{"Color":{"timestamp":1730482948}}},"timestamp":1730483940,"version":9} +``` + +The Status property has been fully removed from the shadow state. + +### Removing a shadow +To remove a shadow, you must invoke the DeleteShadow API (setting the reported and desired +states to null will only clear the states, but not delete the shadow resource itself). + +``` +delete +``` + +yields something like + +``` +delete result: {"clientToken":"495844bb-65c9-58a6-bd16-099fa3021512","timestamp":1730483990,"version":9} +``` + +Subsequent attempts to get the shadow return an error: + +``` +get +``` + +results in something similar to + +``` +get failed with error code: libaws-c-mqtt: AWS_ERROR_MQTT_REQUEST_RESPONSE_MODELED_SERVICE_ERROR, Request-response operation failed with a modeled service error. +modeled error: {"clientToken":"0228fa77-2bbf-bf91-9915-128668692dbb","code":404,"message":"No shadow exists with name: ''"} +``` \ No newline at end of file diff --git a/samples/shadow/shadow_sync/main.cpp b/samples/shadow/shadow_sync/main.cpp index 447c73917..cf35a4aa3 100644 --- a/samples/shadow/shadow_sync/main.cpp +++ b/samples/shadow/shadow_sync/main.cpp @@ -4,6 +4,7 @@ */ #include #include +#include #include #include @@ -22,7 +23,6 @@ #include #include #include -#include #include "../../utils/CommandLineUtils.h" @@ -111,10 +111,10 @@ static String s_nibbleNextToken(String &input) { static void s_printHelp() { fprintf(stdout, "\nShadow sandbox:\n\n"); fprintf(stdout, " quit -- quits the program\n"); - fprintf(stdout, " get -- gets the state of a named shadow belonging to the specified thing\n"); - fprintf(stdout, " delete -- deletes a named shadow belonging to the specified thing\n"); - fprintf(stdout, " update-desired -- updates the desired state of a named shadow belonging to the specified thing\n"); - fprintf(stdout, " update-reported -- updates the reported state a named shadow belonging to the specified thing\n\n"); + fprintf(stdout, " get -- gets the current value of the IoT thing's shadow\n"); + fprintf(stdout, " delete -- deletes the IoT thing's shadow\n"); + fprintf(stdout, " update-desired -- updates the desired state of the IoT thing's shadow. If the shadow does not exist, it will be created.\n"); + fprintf(stdout, " update-reported -- updates the reported state of the IoT thing's shadow. If the shadow does not exist, it will be created.\n\n"); } static void s_onServiceError(const ServiceErrorV2 &serviceError, String operationName) { @@ -255,7 +255,7 @@ static std::shared_ptr s_createS Aws::Crt::JsonObject jsonObject; event.SerializeToObject(jsonObject); Aws::Crt::String json = jsonObject.View().WriteCompact(true); - fprintf(stdout, "Received shadow updated event: %s\n", json.c_str()); + fprintf(stdout, "Received ShadowUpdated event: %s\n", json.c_str()); }); auto stream = context.m_shadowClient->CreateShadowUpdatedStream(request, options); @@ -282,7 +282,7 @@ static std::shared_ptr s_createS Aws::Crt::JsonObject jsonObject; event.SerializeToObject(jsonObject); Aws::Crt::String json = jsonObject.View().WriteCompact(true); - fprintf(stdout, "Received shadow delta updated event: %s\n", json.c_str()); + fprintf(stdout, "Received ShadowDeltaUpdated event: %s\n", json.c_str()); }); auto stream = context.m_shadowClient->CreateShadowDeltaUpdatedStream(request, options); @@ -309,6 +309,9 @@ static bool s_handleInput(const Aws::Crt::String &input, ApplicationContext &con } else if (command == "update-reported") { s_handleUpdateReportedShadow(remaining, context); } else { + if (command != "help") { + fprintf(stdout, "Command not recognized: %s\n", command.c_str()); + } s_printHelp(); } @@ -339,6 +342,12 @@ int main(int argc, char *argv[]) return -1; } + auto clientId = "test-" + UUID().ToString(); + auto connectPacket = MakeShared(DefaultAllocatorImplementation()); + connectPacket->WithClientId(clientId); + + builder->WithConnectOptions(connectPacket); + SimpleWaiter connectedWaiter; // Setup lifecycle callbacks From e2e0f34bdb1b42307cf0b7c2399a69a9a17b1ab7 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Sun, 3 Nov 2024 07:24:41 -0800 Subject: [PATCH 04/19] Checkpoint --- codebuild/samples/shadow-linux.sh | 5 +- samples/CMakeLists.txt | 12 +- samples/README.md | 5 +- .../fleet_provisioning/CMakeLists.txt | 28 ++ .../fleet_provisioning/README.md | 2 +- .../fleet_provisioning/main.cpp | 0 .../mqtt5_fleet_provisioning/CMakeLists.txt | 4 +- .../mqtt5_fleet_provisioning/README.md | 348 ++++++++++++++++++ .../mqtt5_fleet_provisioning/main.cpp | 0 .../shadow}/mqtt5_shadow_sync/CMakeLists.txt | 0 .../shadow}/mqtt5_shadow_sync/README.md | 0 .../shadow}/mqtt5_shadow_sync/main.cpp | 0 .../shadow}/shadow_sync/CMakeLists.txt | 0 .../shadow}/shadow_sync/README.md | 0 .../shadow}/shadow_sync/main.cpp | 0 .../CMakeLists.txt | 2 +- .../README.md | 0 samples/fleet_provisioning/basic/main.cpp | 161 ++++++++ samples/shadow/shadow_sync/main.cpp | 82 ++--- 19 files changed, 575 insertions(+), 74 deletions(-) create mode 100644 samples/deprecated/fleet_provisioning/fleet_provisioning/CMakeLists.txt rename samples/{ => deprecated}/fleet_provisioning/fleet_provisioning/README.md (99%) rename samples/{ => deprecated}/fleet_provisioning/fleet_provisioning/main.cpp (100%) rename samples/{ => deprecated}/fleet_provisioning/mqtt5_fleet_provisioning/CMakeLists.txt (88%) create mode 100644 samples/deprecated/fleet_provisioning/mqtt5_fleet_provisioning/README.md rename samples/{ => deprecated}/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp (100%) rename samples/{shadow/deprecated => deprecated/shadow}/mqtt5_shadow_sync/CMakeLists.txt (100%) rename samples/{shadow/deprecated => deprecated/shadow}/mqtt5_shadow_sync/README.md (100%) rename samples/{shadow/deprecated => deprecated/shadow}/mqtt5_shadow_sync/main.cpp (100%) rename samples/{shadow/deprecated => deprecated/shadow}/shadow_sync/CMakeLists.txt (100%) rename samples/{shadow/deprecated => deprecated/shadow}/shadow_sync/README.md (100%) rename samples/{shadow/deprecated => deprecated/shadow}/shadow_sync/main.cpp (100%) rename samples/fleet_provisioning/{fleet_provisioning => basic}/CMakeLists.txt (95%) rename samples/fleet_provisioning/{mqtt5_fleet_provisioning => basic}/README.md (100%) create mode 100644 samples/fleet_provisioning/basic/main.cpp diff --git a/codebuild/samples/shadow-linux.sh b/codebuild/samples/shadow-linux.sh index 2d32b626c..b2daa82ce 100755 --- a/codebuild/samples/shadow-linux.sh +++ b/codebuild/samples/shadow-linux.sh @@ -5,7 +5,7 @@ set -e env # v1 MQTT311 shadow sample -pushd $CODEBUILD_SRC_DIR/samples/shadow/deprecated/shadow_sync +pushd $CODEBUILD_SRC_DIR/samples/deprecated/shadow/shadow_sync mkdir _build cd _build @@ -20,7 +20,7 @@ echo "Shadow-Sync test" popd # v1 MQTT5 shadow sample -pushd $CODEBUILD_SRC_DIR/samples/shadow/deprecated/mqtt5_shadow_sync +pushd $CODEBUILD_SRC_DIR/samples/deprecated/shadow/mqtt5_shadow_sync mkdir _build cd _build @@ -32,7 +32,6 @@ ENDPOINT=$(aws secretsmanager get-secret-value --secret-id "ci/endpoint" --query echo "Shadow-Sync test" ./mqtt5-shadow-sync --endpoint $ENDPOINT --key /tmp/privatekey.pem --cert /tmp/certificate.pem --thing_name CI_CodeBuild_Thing --is_ci true - popd # v2 MQTT5 shadow sample - smaple is interactive so build but don't run diff --git a/samples/CMakeLists.txt b/samples/CMakeLists.txt index af1fb480e..d1bf6f093 100644 --- a/samples/CMakeLists.txt +++ b/samples/CMakeLists.txt @@ -4,10 +4,10 @@ project(aws-iot-device-sdk-cpp-v2-samples) add_subdirectory(device_defender/basic_report) add_subdirectory(device_defender/mqtt5_basic_report) +add_subdirectory(fleet_provisioning/basic) +add_subdirectory(fleet_provisioning/csr) add_subdirectory(greengrass/ipc) add_subdirectory(greengrass/basic_discovery) -add_subdirectory(fleet_provisioning/fleet_provisioning) -add_subdirectory(fleet_provisioning/mqtt5_fleet_provisioning) add_subdirectory(jobs/job_execution) add_subdirectory(jobs/mqtt5_job_execution) add_subdirectory(mqtt/basic_connect) @@ -24,6 +24,10 @@ add_subdirectory(pub_sub/basic_pub_sub) add_subdirectory(pub_sub/cycle_pub_sub) add_subdirectory(secure_tunneling/secure_tunnel) add_subdirectory(secure_tunneling/tunnel_notification) -add_subdirectory(shadow/deprecated/shadow_sync) -add_subdirectory(shadow/deprecated/mqtt5_shadow_sync) add_subdirectory(shadow/shadow_sync) + + +add_subdirectory(deprecated/shadow/shadow_sync) +add_subdirectory(deprecated/shadow/mqtt5_shadow_sync) +add_subdirectory(deprecated/fleet_provisioning/fleet_provisioning) +add_subdirectory(deprecated/fleet_provisioning/mqtt5_fleet_provisioning) diff --git a/samples/README.md b/samples/README.md index d3001076e..27c145870 100644 --- a/samples/README.md +++ b/samples/README.md @@ -10,7 +10,6 @@ + [HTTP Proxy](./mqtt5/mqtt5_pubsub/README.md#http-proxy) * [Mqtt5 Shared Subscription](./mqtt5/mqtt5_shared_subscription/README.md) * [Mqtt5 Jobs](./jobs/mqtt5_job_execution/README.md) -* [Shadow](./shadow/shadow_sync/README.md) * [Mqtt5 Fleet Provisioning](./fleet_provisioning/mqtt5_fleet_provisioning/README.md) ## MQTT311 Samples * [Basic Pub-Sub](./pub_sub/basic_pub_sub/README.md) @@ -23,8 +22,10 @@ * [Custom Authorizer Connect](./mqtt/custom_authorizer_connect/README.md) * [Cognito Connect](./mqtt/cognito_connect/README.md) * [Jobs](./jobs/job_execution/README.md) -* [Fleet provisioning](./fleet_provisioning/fleet_provisioning/README.md) ## Other Samples +* [Shadow](./shadow/shadow_sync/README.md) +* [Basic Fleet Provisioning](./fleet_provisioning/basic/README.md) +* [CSR Fleet Provisioning](./fleet_provisioning/csr/README.md) * [Secure Tunnel](./secure_tunneling/secure_tunnel/README.md) * [Secure Tunnel Notification](./secure_tunneling/tunnel_notification/README.md) * [Cycle Pub-Sub](./pub_sub/cycle_pub_sub/README.md) diff --git a/samples/deprecated/fleet_provisioning/fleet_provisioning/CMakeLists.txt b/samples/deprecated/fleet_provisioning/fleet_provisioning/CMakeLists.txt new file mode 100644 index 000000000..8105d7a43 --- /dev/null +++ b/samples/deprecated/fleet_provisioning/fleet_provisioning/CMakeLists.txt @@ -0,0 +1,28 @@ +cmake_minimum_required(VERSION 3.9) +# note: cxx-17 requires cmake 3.8, cxx-20 requires cmake 3.12 +project(fleet-provisioning CXX) + +file(GLOB SRC_FILES + "*.cpp" + "../../../utils/CommandLineUtils.cpp" + "../../../utils/CommandLineUtils.h" +) + +add_executable(${PROJECT_NAME} ${SRC_FILES}) + +set_target_properties(${PROJECT_NAME} PROPERTIES + CXX_STANDARD 14) + +#set warnings +if (MSVC) + target_compile_options(${PROJECT_NAME} PRIVATE /W4 /WX) +else () + target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wno-long-long -pedantic -Werror) +endif () + +find_package(aws-crt-cpp REQUIRED) +find_package(IotIdentity-cpp REQUIRED) + +install(TARGETS ${PROJECT_NAME} DESTINATION bin) + +target_link_libraries(${PROJECT_NAME} PRIVATE AWS::aws-crt-cpp AWS::IotIdentity-cpp) diff --git a/samples/fleet_provisioning/fleet_provisioning/README.md b/samples/deprecated/fleet_provisioning/fleet_provisioning/README.md similarity index 99% rename from samples/fleet_provisioning/fleet_provisioning/README.md rename to samples/deprecated/fleet_provisioning/fleet_provisioning/README.md index d631083f7..4226783c8 100644 --- a/samples/fleet_provisioning/fleet_provisioning/README.md +++ b/samples/deprecated/fleet_provisioning/fleet_provisioning/README.md @@ -1,6 +1,6 @@ # Fleet provisioning -[**Return to main sample list**](../../README.md) +[**Return to main sample list**](../../../README.md) This sample uses the AWS IoT [Fleet provisioning](https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html) to provision devices using either a CSR or Keys-And-Certificate and subsequently calls RegisterThing. This allows you to create new AWS IoT Core things using a Fleet Provisioning Template. diff --git a/samples/fleet_provisioning/fleet_provisioning/main.cpp b/samples/deprecated/fleet_provisioning/fleet_provisioning/main.cpp similarity index 100% rename from samples/fleet_provisioning/fleet_provisioning/main.cpp rename to samples/deprecated/fleet_provisioning/fleet_provisioning/main.cpp diff --git a/samples/fleet_provisioning/mqtt5_fleet_provisioning/CMakeLists.txt b/samples/deprecated/fleet_provisioning/mqtt5_fleet_provisioning/CMakeLists.txt similarity index 88% rename from samples/fleet_provisioning/mqtt5_fleet_provisioning/CMakeLists.txt rename to samples/deprecated/fleet_provisioning/mqtt5_fleet_provisioning/CMakeLists.txt index 644023257..81fcaaa0a 100644 --- a/samples/fleet_provisioning/mqtt5_fleet_provisioning/CMakeLists.txt +++ b/samples/deprecated/fleet_provisioning/mqtt5_fleet_provisioning/CMakeLists.txt @@ -4,8 +4,8 @@ project(mqtt5-fleet-provisioning CXX) file(GLOB SRC_FILES "*.cpp" - "../../utils/CommandLineUtils.cpp" - "../../utils/CommandLineUtils.h" + "../../../utils/CommandLineUtils.cpp" + "../../../utils/CommandLineUtils.h" ) add_executable(${PROJECT_NAME} ${SRC_FILES}) diff --git a/samples/deprecated/fleet_provisioning/mqtt5_fleet_provisioning/README.md b/samples/deprecated/fleet_provisioning/mqtt5_fleet_provisioning/README.md new file mode 100644 index 000000000..d04d7d1e7 --- /dev/null +++ b/samples/deprecated/fleet_provisioning/mqtt5_fleet_provisioning/README.md @@ -0,0 +1,348 @@ +# Fleet provisioning + +[**Return to main sample list**](../../../README.md) + +This sample uses the AWS IoT [Fleet provisioning](https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html) to provision devices using either a CSR or Keys-And-Certificate and subsequently calls RegisterThing. This allows you to create new AWS IoT Core things using a Fleet Provisioning Template. + +On startup, the script subscribes to topics based on the request type of either CSR or Keys topics, publishes the request to corresponding topic and calls RegisterThing. + +Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect, subscribe, publish, and receive. Below is a sample policy that can be used on your IoT Core Thing that will allow this sample to run as intended. + +
+(see sample policy) +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": "iot:Publish",
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/certificates/create/json",
+        "arn:aws:iot:region:account:topic/$aws/certificates/create-from-csr/json",
+        "arn:aws:iot:region:account:topic/$aws/provisioning-templates/templatename/provision/json"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Receive"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/certificates/create/json/accepted",
+        "arn:aws:iot:region:account:topic/$aws/certificates/create/json/rejected",
+        "arn:aws:iot:region:account:topic/$aws/certificates/create-from-csr/json/accepted",
+        "arn:aws:iot:region:account:topic/$aws/certificates/create-from-csr/json/rejected",
+        "arn:aws:iot:region:account:topic/$aws/provisioning-templates/templatename/provision/json/accepted",
+        "arn:aws:iot:region:account:topic/$aws/provisioning-templates/templatename/provision/json/rejected"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Subscribe"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topicfilter/$aws/certificates/create/json/accepted",
+        "arn:aws:iot:region:account:topicfilter/$aws/certificates/create/json/rejected",
+        "arn:aws:iot:region:account:topicfilter/$aws/certificates/create-from-csr/json/accepted",
+        "arn:aws:iot:region:account:topicfilter/$aws/certificates/create-from-csr/json/rejected",
+        "arn:aws:iot:region:account:topicfilter/$aws/provisioning-templates/templatename/provision/json/accepted",
+        "arn:aws:iot:region:account:topicfilter/$aws/provisioning-templates/templatename/provision/json/rejected"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Connect",
+      "Resource": "arn:aws:iot:region:account:client/test-*"
+    }
+  ]
+}
+
+ +Replace with the following with the data from your AWS account: +* ``: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`. +* ``: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website. +* ``: The name of your AWS Fleet Provisioning template you want to use to create new AWS IoT Core Things. + +Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id ` to send the client ID your policy supports. + +
+ +## How to run + +There are many different ways to run the Fleet Provisioning sample because of how many different ways there are to setup a Fleet Provisioning template in AWS IoT Core. **The easiest and most common way is to run the sample with the following**: + +``` sh +./mqtt5-fleet-provisioning --endpoint --cert --key --template_name