diff --git a/tests/IotServiceTest.cpp b/tests/IotServiceTest.cpp index 1945d7dfc..7f3f5c8c1 100644 --- a/tests/IotServiceTest.cpp +++ b/tests/IotServiceTest.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -599,13 +600,19 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx) subscriberCv.wait(lock, [&]() { return subscriberSubscribed; }); } + // there appears to be a race condition broker-side in IoT Core such that the interrupter is occasionally + // rejected rather than the existing connection. We try and work around this in two ways: + // (1) Wait a couple seconds to let things settle in terms of eventual consistency. + // (2) Have the interrupter make connection attempts in a loop until a successful interruption occurs. + std::this_thread::sleep_for(std::chrono::seconds(2)); + // Disconnect the client by interrupting it with another client with the same ID // which will cause the will to be sent auto interruptConnection = mqttClient.NewConnection(envVars.inputHost.c_str(), 8883, socketOptions, tlsContext); - interruptConnection->SetWill(topicStr.c_str(), QOS::AWS_MQTT_QOS_AT_LEAST_ONCE, false, payload); std::mutex interruptMutex; std::condition_variable interruptCv; bool interruptConnected = false; + bool interruptConnectionAttemptComplete = false; auto interruptOnConnectionCompleted = [&](MqttConnection &, int errorCode, ReturnCode returnCode, bool sessionPresent) { @@ -614,7 +621,11 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx) (void)sessionPresent; { std::lock_guard lock(interruptMutex); - interruptConnected = true; + interruptConnectionAttemptComplete = true; + if (errorCode == AWS_ERROR_SUCCESS && returnCode == AWS_MQTT_CONNECT_ACCEPTED) + { + interruptConnected = true; + } } interruptCv.notify_one(); }; @@ -628,10 +639,16 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx) }; interruptConnection->OnConnectionCompleted = interruptOnConnectionCompleted; interruptConnection->OnDisconnect = interruptOnDisconnect; - interruptConnection->Connect((Aws::Crt::String("test-01-") + uuidStr).c_str(), true); + + bool continueConnecting = true; + while (continueConnecting) { - std::unique_lock lock(interruptMutex); - interruptCv.wait(lock, [&]() { return interruptConnected; }); + interruptConnection->Connect((Aws::Crt::String("test-01-") + uuidStr).c_str(), true); + { + std::unique_lock lock(interruptMutex); + interruptCv.wait(lock, [&]() { return interruptConnectionAttemptComplete; }); + continueConnecting = !interruptConnected; + } } // wait for message received callback - meaning the will was sent diff --git a/tests/Mqtt5ClientTest.cpp b/tests/Mqtt5ClientTest.cpp index ad14dfcc0..5d721a7b1 100644 --- a/tests/Mqtt5ClientTest.cpp +++ b/tests/Mqtt5ClientTest.cpp @@ -1974,16 +1974,10 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi const String TEST_TOPIC = "test/MQTT5_Binding_CPP_" + currentUUID; const String sharedTopicFilter = "$share/crttest/test/MQTT5_Binding_CPP_" + currentUUID; - const int MESSAGE_NUMBER = 10; - std::atomic client_messages(0); - bool client1_received = false; - bool client2_received = false; - - std::vector receivedMessages(MESSAGE_NUMBER); - for (int i = 0; i < MESSAGE_NUMBER; i++) - { - receivedMessages.push_back(0); - } + static const int MESSAGE_COUNT = 10; + std::mutex receivedMessagesLock; + std::condition_variable receivedMessagesSignal; + std::vector receivedMessages; Aws::Iot::Mqtt5ClientBuilder *subscribe_builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( @@ -1993,39 +1987,29 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi allocator); ASSERT_TRUE(subscribe_builder); - std::promise client_received; - - auto get_on_message_callback = [&](bool &received) + auto on_message_callback1 = [&](const PublishReceivedEventData &eventData) { - return [&](const PublishReceivedEventData &eventData) -> int + String topic = eventData.publishPacket->getTopic(); + if (topic == TEST_TOPIC) { - String topic = eventData.publishPacket->getTopic(); - if (topic == TEST_TOPIC) + ByteCursor payload = eventData.publishPacket->getPayload(); + String message_string = String((const char *)payload.ptr, payload.len); + int message_int = atoi(message_string.c_str()); + { - ByteCursor payload = eventData.publishPacket->getPayload(); - String message_string = String((const char *)payload.ptr, payload.len); - int message_int = atoi(message_string.c_str()); - ASSERT_TRUE(message_int < MESSAGE_NUMBER); - ++receivedMessages[message_int]; - received = true; // this line has changed - - bool exchanged = false; - int desired = 11; - int tested = 10; - client_messages++; - exchanged = client_messages.compare_exchange_strong(tested, desired); - if (exchanged == true) + std::lock_guard guard(receivedMessagesLock); + receivedMessages.push_back(message_int); + + if (receivedMessages.size() == MESSAGE_COUNT) { - client_received.set_value(); + receivedMessagesSignal.notify_all(); } } - return 0; - }; + } }; - auto onMessage_client1 = get_on_message_callback(client1_received); - auto onMessage_client2 = get_on_message_callback(client2_received); + auto on_message_callback2 = on_message_callback1; - subscribe_builder->WithPublishReceivedCallback(onMessage_client1); + subscribe_builder->WithPublishReceivedCallback(on_message_callback1); Aws::Iot::Mqtt5ClientBuilder *subscribe_builder2 = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( @@ -2034,8 +2018,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi mqtt5TestVars.m_private_key_path_string.c_str(), allocator); ASSERT_TRUE(subscribe_builder2); - - subscribe_builder2->WithPublishReceivedCallback(onMessage_client2); + subscribe_builder2->WithPublishReceivedCallback(on_message_callback2); Aws::Iot::Mqtt5ClientBuilder *publish_builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath( mqtt5TestVars.m_hostname_string, @@ -2113,7 +2096,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi suback.get_future().wait(); /* Publish message 10 to test topic */ - for (int i = 0; i < MESSAGE_NUMBER; i++) + for (int i = 0; i < MESSAGE_COUNT; i++) { std::string payload = std::to_string(i); std::shared_ptr publish = std::make_shared( @@ -2122,32 +2105,14 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi } /* Wait for all packets to be received on both clients */ - client_received.get_future().wait(); - - /* Unsubscribe from the topic from both clients*/ - Vector unsubList; - unsubList.push_back(TEST_TOPIC); - std::shared_ptr unsubscribe_client1 = - std::make_shared(allocator); - unsubscribe_client1->WithTopicFilters(unsubList); - ASSERT_TRUE(mqtt5Client->Unsubscribe(unsubscribe_client1)); - - std::shared_ptr unsubscribe_client2 = - std::make_shared(allocator); - unsubscribe_client2->WithTopicFilters(unsubList); - ASSERT_TRUE(mqtt5Client2->Unsubscribe(unsubscribe_client2)); - - /* make sure all messages are received */ - ASSERT_INT_EQUALS(MESSAGE_NUMBER + 1, client_messages); /* We are adding one at the end, so 10 messages received */ - - /* makes sure both clients received at least one message */ - ASSERT_TRUE(client1_received); - ASSERT_TRUE(client2_received); + std::unique_lock receivedLock(receivedMessagesLock); + receivedMessagesSignal.wait(receivedLock, [&]() { return receivedMessages.size() == MESSAGE_COUNT; }); + std::sort(receivedMessages.begin(), receivedMessages.end()); /* make sure all messages are received with no duplicates*/ - for (int i = 0; i < MESSAGE_NUMBER; i++) + for (int i = 0; i < MESSAGE_COUNT; i++) { - ASSERT_TRUE(receivedMessages[i] > 0); + ASSERT_INT_EQUALS(i, receivedMessages[i]); } /* Stop all clients */ ASSERT_TRUE(mqtt5Client->Stop()); diff --git a/tests/MqttRequestResponse.cpp b/tests/MqttRequestResponse.cpp index bf02f29fc..f701d208d 100644 --- a/tests/MqttRequestResponse.cpp +++ b/tests/MqttRequestResponse.cpp @@ -287,7 +287,7 @@ static TestContext s_CreateClient( Aws::Crt::Io::TlsContext tlsContext(tlsCtxOptions, Aws::Crt::Io::TlsMode::CLIENT, allocator); Aws::Crt::Io::SocketOptions socketOptions; - socketOptions.SetConnectTimeoutMs(3000); + socketOptions.SetConnectTimeoutMs(10000); Aws::Crt::Mqtt::MqttClient client; context.protocolClient311 =