Skip to content

Commit

Permalink
Flaky test pass1 (#693)
Browse files Browse the repository at this point in the history
* Update shared subscription test to remove invalid assertions on a property that does not necessarily hold (all subscriptions receive messages)
* Increase socket timeout on request-response 311 tests
* Update 311 will test to work around broker race condition wrt which connection gets terminated on duplicate client id

Co-authored-by: Bret Ambrose <[email protected]>
  • Loading branch information
bretambrose and Bret Ambrose authored Jan 6, 2025
1 parent a947367 commit 9ad3311
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 67 deletions.
27 changes: 22 additions & 5 deletions tests/IotServiceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <condition_variable>
#include <fstream>
#include <mutex>
#include <thread>
#include <utility>

#include <aws/io/logging.h>
Expand Down Expand Up @@ -599,13 +600,19 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx)
subscriberCv.wait(lock, [&]() { return subscriberSubscribed; });
}

// there appears to be a race condition broker-side in IoT Core such that the interrupter is occasionally
// rejected rather than the existing connection. We try and work around this in two ways:
// (1) Wait a couple seconds to let things settle in terms of eventual consistency.
// (2) Have the interrupter make connection attempts in a loop until a successful interruption occurs.
std::this_thread::sleep_for(std::chrono::seconds(2));

// Disconnect the client by interrupting it with another client with the same ID
// which will cause the will to be sent
auto interruptConnection = mqttClient.NewConnection(envVars.inputHost.c_str(), 8883, socketOptions, tlsContext);
interruptConnection->SetWill(topicStr.c_str(), QOS::AWS_MQTT_QOS_AT_LEAST_ONCE, false, payload);
std::mutex interruptMutex;
std::condition_variable interruptCv;
bool interruptConnected = false;
bool interruptConnectionAttemptComplete = false;
auto interruptOnConnectionCompleted =
[&](MqttConnection &, int errorCode, ReturnCode returnCode, bool sessionPresent)
{
Expand All @@ -614,7 +621,11 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx)
(void)sessionPresent;
{
std::lock_guard<std::mutex> lock(interruptMutex);
interruptConnected = true;
interruptConnectionAttemptComplete = true;
if (errorCode == AWS_ERROR_SUCCESS && returnCode == AWS_MQTT_CONNECT_ACCEPTED)
{
interruptConnected = true;
}
}
interruptCv.notify_one();
};
Expand All @@ -628,10 +639,16 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx)
};
interruptConnection->OnConnectionCompleted = interruptOnConnectionCompleted;
interruptConnection->OnDisconnect = interruptOnDisconnect;
interruptConnection->Connect((Aws::Crt::String("test-01-") + uuidStr).c_str(), true);

bool continueConnecting = true;
while (continueConnecting)
{
std::unique_lock<std::mutex> lock(interruptMutex);
interruptCv.wait(lock, [&]() { return interruptConnected; });
interruptConnection->Connect((Aws::Crt::String("test-01-") + uuidStr).c_str(), true);
{
std::unique_lock<std::mutex> lock(interruptMutex);
interruptCv.wait(lock, [&]() { return interruptConnectionAttemptComplete; });
continueConnecting = !interruptConnected;
}
}

// wait for message received callback - meaning the will was sent
Expand Down
87 changes: 26 additions & 61 deletions tests/Mqtt5ClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1974,16 +1974,10 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
const String TEST_TOPIC = "test/MQTT5_Binding_CPP_" + currentUUID;
const String sharedTopicFilter = "$share/crttest/test/MQTT5_Binding_CPP_" + currentUUID;

const int MESSAGE_NUMBER = 10;
std::atomic<int> client_messages(0);
bool client1_received = false;
bool client2_received = false;

std::vector<int> receivedMessages(MESSAGE_NUMBER);
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
receivedMessages.push_back(0);
}
static const int MESSAGE_COUNT = 10;
std::mutex receivedMessagesLock;
std::condition_variable receivedMessagesSignal;
std::vector<int> receivedMessages;

Aws::Iot::Mqtt5ClientBuilder *subscribe_builder =
Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
Expand All @@ -1993,39 +1987,29 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
allocator);
ASSERT_TRUE(subscribe_builder);

std::promise<void> client_received;

auto get_on_message_callback = [&](bool &received)
auto on_message_callback1 = [&](const PublishReceivedEventData &eventData)
{
return [&](const PublishReceivedEventData &eventData) -> int
String topic = eventData.publishPacket->getTopic();
if (topic == TEST_TOPIC)
{
String topic = eventData.publishPacket->getTopic();
if (topic == TEST_TOPIC)
ByteCursor payload = eventData.publishPacket->getPayload();
String message_string = String((const char *)payload.ptr, payload.len);
int message_int = atoi(message_string.c_str());

{
ByteCursor payload = eventData.publishPacket->getPayload();
String message_string = String((const char *)payload.ptr, payload.len);
int message_int = atoi(message_string.c_str());
ASSERT_TRUE(message_int < MESSAGE_NUMBER);
++receivedMessages[message_int];
received = true; // this line has changed

bool exchanged = false;
int desired = 11;
int tested = 10;
client_messages++;
exchanged = client_messages.compare_exchange_strong(tested, desired);
if (exchanged == true)
std::lock_guard<std::mutex> guard(receivedMessagesLock);
receivedMessages.push_back(message_int);

if (receivedMessages.size() == MESSAGE_COUNT)
{
client_received.set_value();
receivedMessagesSignal.notify_all();
}
}
return 0;
};
}
};
auto onMessage_client1 = get_on_message_callback(client1_received);
auto onMessage_client2 = get_on_message_callback(client2_received);
auto on_message_callback2 = on_message_callback1;

subscribe_builder->WithPublishReceivedCallback(onMessage_client1);
subscribe_builder->WithPublishReceivedCallback(on_message_callback1);

Aws::Iot::Mqtt5ClientBuilder *subscribe_builder2 =
Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
Expand All @@ -2034,8 +2018,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
mqtt5TestVars.m_private_key_path_string.c_str(),
allocator);
ASSERT_TRUE(subscribe_builder2);

subscribe_builder2->WithPublishReceivedCallback(onMessage_client2);
subscribe_builder2->WithPublishReceivedCallback(on_message_callback2);

Aws::Iot::Mqtt5ClientBuilder *publish_builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
mqtt5TestVars.m_hostname_string,
Expand Down Expand Up @@ -2113,7 +2096,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
suback.get_future().wait();

/* Publish message 10 to test topic */
for (int i = 0; i < MESSAGE_NUMBER; i++)
for (int i = 0; i < MESSAGE_COUNT; i++)
{
std::string payload = std::to_string(i);
std::shared_ptr<Mqtt5::PublishPacket> publish = std::make_shared<Mqtt5::PublishPacket>(
Expand All @@ -2122,32 +2105,14 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
}

/* Wait for all packets to be received on both clients */
client_received.get_future().wait();

/* Unsubscribe from the topic from both clients*/
Vector<String> unsubList;
unsubList.push_back(TEST_TOPIC);
std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client1 =
std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
unsubscribe_client1->WithTopicFilters(unsubList);
ASSERT_TRUE(mqtt5Client->Unsubscribe(unsubscribe_client1));

std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client2 =
std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
unsubscribe_client2->WithTopicFilters(unsubList);
ASSERT_TRUE(mqtt5Client2->Unsubscribe(unsubscribe_client2));

/* make sure all messages are received */
ASSERT_INT_EQUALS(MESSAGE_NUMBER + 1, client_messages); /* We are adding one at the end, so 10 messages received */

/* makes sure both clients received at least one message */
ASSERT_TRUE(client1_received);
ASSERT_TRUE(client2_received);
std::unique_lock<std::mutex> receivedLock(receivedMessagesLock);
receivedMessagesSignal.wait(receivedLock, [&]() { return receivedMessages.size() == MESSAGE_COUNT; });

std::sort(receivedMessages.begin(), receivedMessages.end());
/* make sure all messages are received with no duplicates*/
for (int i = 0; i < MESSAGE_NUMBER; i++)
for (int i = 0; i < MESSAGE_COUNT; i++)
{
ASSERT_TRUE(receivedMessages[i] > 0);
ASSERT_INT_EQUALS(i, receivedMessages[i]);
}
/* Stop all clients */
ASSERT_TRUE(mqtt5Client->Stop());
Expand Down
2 changes: 1 addition & 1 deletion tests/MqttRequestResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ static TestContext s_CreateClient(
Aws::Crt::Io::TlsContext tlsContext(tlsCtxOptions, Aws::Crt::Io::TlsMode::CLIENT, allocator);

Aws::Crt::Io::SocketOptions socketOptions;
socketOptions.SetConnectTimeoutMs(3000);
socketOptions.SetConnectTimeoutMs(10000);

Aws::Crt::Mqtt::MqttClient client;
context.protocolClient311 =
Expand Down

0 comments on commit 9ad3311

Please sign in to comment.