Skip to content

Commit

Permalink
Refactor mqtt5 fleet provisioning sample further
Browse files Browse the repository at this point in the history
  • Loading branch information
sfodagain committed Jul 12, 2024
1 parent 3dd5dfb commit 67f2331
Showing 1 changed file with 157 additions and 122 deletions.
279 changes: 157 additions & 122 deletions samples/fleet_provisioning/mqtt5_fleet_provisioning/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ static std::string getFileData(std::string const &fileName)
return str;
}

/**
* Auxiliary structure for holding data used by MQTT connection.
*/
struct Mqtt5ClientContext
{
std::promise<bool> connectionPromise;
std::promise<void> stoppedPromise;
std::promise<void> disconnectPromise;
std::promise<bool> subscribeSuccess;
};

/**
* Auxiliary structure for holding data used when creating a certificate.
*/
Expand All @@ -47,14 +58,82 @@ struct CreateCertificateContext
String token;
};

/**
* Auxiliary structure for holding data used when registering a thing.
*/
struct RegisterThingContext
{
std::promise<void> pubAckPromise;
std::promise<void> acceptedSubAckPromise;
std::promise<void> rejectedSubAckPromise;
std::promise<void> thingCreatedPromise;
};

/**
* Create MQTT5 client.
*/
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> createMqtt5Client(Mqtt5ClientContext &ctx, const Utils::cmdData &cmdData)
{
// Create the MQTT5 builder and populate it with data from cmdData.
Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str());

// Check if the builder setup correctly.
if (builder == nullptr)
{
printf(
"Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError()));
exit(-1);
}

// Setup connection options
std::shared_ptr<Mqtt5::ConnectPacket> connectOptions = std::make_shared<Mqtt5::ConnectPacket>();
connectOptions->WithClientId(cmdData.input_clientId);
builder->WithConnectOptions(connectOptions);
if (cmdData.input_port != 0)
{
builder->WithPort(static_cast<uint32_t>(cmdData.input_port));
}

// Setup lifecycle callbacks
builder->WithClientConnectionSuccessCallback([&ctx](const Mqtt5::OnConnectionSuccessEventData &eventData) {
fprintf(
stdout,
"Mqtt5 Client connection succeed, clientid: %s.\n",
eventData.negotiatedSettings->getClientId().c_str());
ctx.connectionPromise.set_value(true);
});
builder->WithClientConnectionFailureCallback([&ctx](const Mqtt5::OnConnectionFailureEventData &eventData) {
fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode));
ctx.connectionPromise.set_value(false);
});
builder->WithClientStoppedCallback([&ctx](const Mqtt5::OnStoppedEventData &) {
fprintf(stdout, "Mqtt5 Client stopped.\n");
ctx.stoppedPromise.set_value();
});
builder->WithClientAttemptingConnectCallback([](const Mqtt5::OnAttemptingConnectEventData &) {
fprintf(stdout, "Mqtt5 Client attempting connection...\n");
});
builder->WithClientDisconnectionCallback([&ctx](const Mqtt5::OnDisconnectionEventData &eventData) {
fprintf(stdout, "Mqtt5 Client disconnection with reason: %s.\n", aws_error_debug_str(eventData.errorCode));
ctx.disconnectPromise.set_value();
});

// Create Mqtt5Client
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> client = builder->Build();
delete builder;

return client;
}

/**
* Keys-and-Certificate workflow.
*
* @note Subscriptions created here will be active even after the function completes. So, all variables accessed in the
* callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of
* CreateCertificateContext is used to store variables used by the callbacks.
*/
void useKeysAndCertificate(IotIdentityClient &identityClient, CreateCertificateContext &ctx)
void createKeysAndCertificate(IotIdentityClient &identityClient, CreateCertificateContext &ctx)
{
auto onKeysPublishPubAck = [&ctx](int ioErr) {
if (ioErr != AWS_OP_SUCCESS)
Expand Down Expand Up @@ -144,7 +223,7 @@ void useKeysAndCertificate(IotIdentityClient &identityClient, CreateCertificateC
* callbacks must be alive for the whole duration of the identityClient's lifetime. An instance of
* CreateCertificateContext is used to store variables used by the callbacks.
*/
void useCsr(IotIdentityClient &identityClient, CreateCertificateContext &ctx, const String &csrFile)
void createCertificateFromCsr(IotIdentityClient &identityClient, CreateCertificateContext &ctx, const String &csrFile)
{
auto onCsrPublishPubAck = [&ctx](int ioErr) {
if (ioErr != AWS_OP_SUCCESS)
Expand Down Expand Up @@ -230,111 +309,22 @@ void useCsr(IotIdentityClient &identityClient, CreateCertificateContext &ctx, co
ctx.tokenReceivedPromise.get_future().wait();
}

int main(int argc, char *argv[])
/**
* Provision an AWS IoT thing using a pre-defined template.
*/
void registerThing(
IotIdentityClient &identityClient,
RegisterThingContext &ctx,
const Utils::cmdData &cmdData,
const String &token)
{
/************************ Setup ****************************/

// Do the global initialization for the API
ApiHandle apiHandle;
// Variables for the sample
String csrFile;
RegisterThingResponse registerThingResponse;

/**
* cmdData is the arguments/input from the command line placed into a single struct for
* use in this sample. This handles all of the command line parsing, validating, etc.
* See the Utils/CommandLineUtils for more information.
*/
Utils::cmdData cmdData = Utils::parseSampleInputFleetProvisioning(argc, argv, &apiHandle);

if (cmdData.input_csrPath != "")
{
csrFile = getFileData(cmdData.input_csrPath.c_str()).c_str();
}

// Create the MQTT5 builder and populate it with data from cmdData.
Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
cmdData.input_endpoint, cmdData.input_cert.c_str(), cmdData.input_key.c_str());

// Check if the builder setup correctly.
if (builder == nullptr)
{
printf(
"Failed to setup mqtt5 client builder with error code %d: %s", LastError(), ErrorDebugString(LastError()));
return -1;
}

// Setup connection options
std::shared_ptr<Mqtt5::ConnectPacket> connectOptions = std::make_shared<Mqtt5::ConnectPacket>();
connectOptions->WithClientId(cmdData.input_clientId);
builder->WithConnectOptions(connectOptions);
if (cmdData.input_port != 0)
{
builder->WithPort(static_cast<uint32_t>(cmdData.input_port));
}

std::promise<bool> connectionPromise;
std::promise<void> stoppedPromise;
std::promise<void> disconnectPromise;
std::promise<bool> subscribeSuccess;

// Setup lifecycle callbacks
builder->WithClientConnectionSuccessCallback(
[&connectionPromise](const Mqtt5::OnConnectionSuccessEventData &eventData) {
fprintf(
stdout,
"Mqtt5 Client connection succeed, clientid: %s.\n",
eventData.negotiatedSettings->getClientId().c_str());
connectionPromise.set_value(true);
});
builder->WithClientConnectionFailureCallback([&connectionPromise](
const Mqtt5::OnConnectionFailureEventData &eventData) {
fprintf(stdout, "Mqtt5 Client connection failed with error: %s.\n", aws_error_debug_str(eventData.errorCode));
connectionPromise.set_value(false);
});
builder->WithClientStoppedCallback([&stoppedPromise](const Mqtt5::OnStoppedEventData &) {
fprintf(stdout, "Mqtt5 Client stopped.\n");
stoppedPromise.set_value();
});
builder->WithClientAttemptingConnectCallback([](const Mqtt5::OnAttemptingConnectEventData &) {
fprintf(stdout, "Mqtt5 Client attempting connection...\n");
});
builder->WithClientDisconnectionCallback([&disconnectPromise](const Mqtt5::OnDisconnectionEventData &eventData) {
fprintf(stdout, "Mqtt5 Client disconnection with reason: %s.\n", aws_error_debug_str(eventData.errorCode));
disconnectPromise.set_value();
});

// Create Mqtt5Client
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> client = builder->Build();
delete builder;
/************************ Run the sample ****************************/

fprintf(stdout, "Connecting...\n");
if (!client->Start())
{
fprintf(stderr, "MQTT5 Connection failed to start");
exit(-1);
}

if (!connectionPromise.get_future().get())
{
return -1;
}
IotIdentityClient identityClient(client);

std::promise<void> registerPublishPubAckCompletedPromise;
std::promise<void> registerAcceptedSubAckCompletedPromise;
std::promise<void> registerRejectedSubAckCompletedPromise;
std::promise<void> registerAcceptedCompletedPromise;

auto onRegisterAcceptedSubAck = [&](int ioErr) {
if (ioErr != AWS_OP_SUCCESS)
{
fprintf(stderr, "Error subscribing to RegisterThing accepted: %s\n", ErrorDebugString(ioErr));
exit(-1);
}

registerAcceptedSubAckCompletedPromise.set_value();
ctx.acceptedSubAckPromise.set_value();
};

auto onRegisterRejectedSubAck = [&](int ioErr) {
Expand All @@ -343,14 +333,14 @@ int main(int argc, char *argv[])
fprintf(stderr, "Error subscribing to RegisterThing rejected: %s\n", ErrorDebugString(ioErr));
exit(-1);
}
registerRejectedSubAckCompletedPromise.set_value();
ctx.rejectedSubAckPromise.set_value();
};

auto onRegisterAccepted = [&](RegisterThingResponse *response, int ioErr) {
if (ioErr == AWS_OP_SUCCESS)
{
fprintf(stdout, "RegisterThingResponse ThingName: %s.\n", response->ThingName->c_str());
registerAcceptedCompletedPromise.set_value();
ctx.thingCreatedPromise.set_value();
}
else
{
Expand Down Expand Up @@ -382,22 +372,9 @@ int main(int argc, char *argv[])
fprintf(stderr, "Error publishing to RegisterThing: %s\n", ErrorDebugString(ioErr));
exit(-1);
}

registerPublishPubAckCompletedPromise.set_value();
ctx.pubAckPromise.set_value();
};

// Create certificate.
CreateCertificateContext certificateContext;
if (csrFile.empty())
{
useKeysAndCertificate(identityClient, certificateContext);
}
else
{
useCsr(identityClient, certificateContext, csrFile);
}

// After certificate is obtained, it's time to register a thing.
fprintf(stdout, "Subscribing to RegisterThing Accepted and Rejected topics\n");
RegisterThingSubscriptionRequest registerSubscriptionRequest;
registerSubscriptionRequest.TemplateName = cmdData.input_templateName;
Expand All @@ -409,8 +386,8 @@ int main(int argc, char *argv[])
registerSubscriptionRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterRejected, onRegisterRejectedSubAck);

// Wait for the subscriptions to the accept and reject RegisterThing topics to be established.
registerAcceptedSubAckCompletedPromise.get_future().wait();
registerRejectedSubAckCompletedPromise.get_future().wait();
ctx.acceptedSubAckPromise.get_future().wait();
ctx.rejectedSubAckPromise.get_future().wait();

fprintf(stdout, "Publishing to RegisterThing topic\n");
RegisterThingRequest registerThingRequest;
Expand All @@ -429,18 +406,76 @@ int main(int argc, char *argv[])
registerThingRequest.Parameters = params;
// NOTE: In a real application creating multiple certificates you'll probably need to protect token var with
// a critical section. This sample makes only one request for a certificate, so no data race is possible.
registerThingRequest.CertificateOwnershipToken = certificateContext.token;
registerThingRequest.CertificateOwnershipToken = token;

identityClient.PublishRegisterThing(registerThingRequest, AWS_MQTT_QOS_AT_LEAST_ONCE, onRegisterPublishPubAck);
registerPublishPubAckCompletedPromise.get_future().wait();
ctx.pubAckPromise.get_future().wait();

// Wait for registering a thing to succeed.
registerAcceptedCompletedPromise.get_future().wait();
ctx.thingCreatedPromise.get_future().wait();
}

int main(int argc, char *argv[])
{
/************************ Setup ****************************/

// Do the global initialization for the API
ApiHandle apiHandle;
// Variables for the sample
String csrFile;
RegisterThingResponse registerThingResponse;

/**
* cmdData is the arguments/input from the command line placed into a single struct for
* use in this sample. This handles all of the command line parsing, validating, etc.
* See the Utils/CommandLineUtils for more information.
*/
Utils::cmdData cmdData = Utils::parseSampleInputFleetProvisioning(argc, argv, &apiHandle);

if (cmdData.input_csrPath != "")
{
csrFile = getFileData(cmdData.input_csrPath.c_str()).c_str();
}

Mqtt5ClientContext mqtt5ClientContext;
auto client = createMqtt5Client(mqtt5ClientContext, cmdData);

/************************ Run the sample ****************************/

fprintf(stdout, "Connecting...\n");
if (!client->Start())
{
fprintf(stderr, "MQTT5 Connection failed to start");
exit(-1);
}

if (!mqtt5ClientContext.connectionPromise.get_future().get())
{
return -1;
}

// Create fleet provisioning client.
IotIdentityClient identityClient(client);

// Create certificate.
CreateCertificateContext certificateContext;
if (csrFile.empty())
{
createKeysAndCertificate(identityClient, certificateContext);
}
else
{
createCertificateFromCsr(identityClient, certificateContext, csrFile);
}

// After certificate is obtained, it's time to register a thing.
RegisterThingContext registerThingContext;
registerThing(identityClient, registerThingContext, cmdData, certificateContext.token);

// Disconnect
if (client->Stop())
{
stoppedPromise.get_future().wait();
mqtt5ClientContext.stoppedPromise.get_future().wait();
}

return 0;
Expand Down

0 comments on commit 67f2331

Please sign in to comment.