Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flaky test pass1 #693

Merged
merged 6 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions tests/IotServiceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <condition_variable>
#include <fstream>
#include <mutex>
#include <thread>
#include <utility>

#include <aws/io/logging.h>
Expand Down Expand Up @@ -599,13 +600,19 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx)
subscriberCv.wait(lock, [&]() { return subscriberSubscribed; });
}

// there appears to be a race condition broker-side in IoT Core such that the interrupter is occasionally
// rejected rather than the existing connection. We try and work around this in two ways:
// (1) Wait a couple seconds to let things settle in terms of eventual consistency.
// (2) Have the interrupter make connection attempts in a loop until a successful interruption occurs.
std::this_thread::sleep_for(std::chrono::seconds(2));

// Disconnect the client by interrupting it with another client with the same ID
// which will cause the will to be sent
auto interruptConnection = mqttClient.NewConnection(envVars.inputHost.c_str(), 8883, socketOptions, tlsContext);
interruptConnection->SetWill(topicStr.c_str(), QOS::AWS_MQTT_QOS_AT_LEAST_ONCE, false, payload);
std::mutex interruptMutex;
std::condition_variable interruptCv;
bool interruptConnected = false;
bool interruptConnectionAttemptComplete = false;
auto interruptOnConnectionCompleted =
[&](MqttConnection &, int errorCode, ReturnCode returnCode, bool sessionPresent)
{
Expand All @@ -614,7 +621,11 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx)
(void)sessionPresent;
{
std::lock_guard<std::mutex> lock(interruptMutex);
interruptConnected = true;
interruptConnectionAttemptComplete = true;
if (errorCode == AWS_ERROR_SUCCESS && returnCode == AWS_MQTT_CONNECT_ACCEPTED)
{
interruptConnected = true;
}
}
interruptCv.notify_one();
};
Expand All @@ -628,10 +639,16 @@ static int s_TestIotWillTest(Aws::Crt::Allocator *allocator, void *ctx)
};
interruptConnection->OnConnectionCompleted = interruptOnConnectionCompleted;
interruptConnection->OnDisconnect = interruptOnDisconnect;
interruptConnection->Connect((Aws::Crt::String("test-01-") + uuidStr).c_str(), true);

bool continueConnecting = true;
while (continueConnecting)
{
std::unique_lock<std::mutex> lock(interruptMutex);
interruptCv.wait(lock, [&]() { return interruptConnected; });
interruptConnection->Connect((Aws::Crt::String("test-01-") + uuidStr).c_str(), true);
{
std::unique_lock<std::mutex> lock(interruptMutex);
interruptCv.wait(lock, [&]() { return interruptConnectionAttemptComplete; });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really an issue for this PR, while I just wonder if we have any mechanism that help to improve the infinitely waiting on the mutex/future? We used futures and mutex in plenty of places in the unit tests, and did not have a proper way to timeout for the wait so far.

Copy link
Contributor Author

@bretambrose bretambrose Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd need to update the condition variable wait to be time based and then check the result (rather than assume it's set) and fail the test.

So here, it would change to something like:

...
interruptCv.wait(lock, [&]() { return interruptConnectionAttemptComplete; }, std::chrono::seconds(60));
ASSERT_TRUE(interruptConnectionAttemptComplete);

continueConnecting = !interruptConnected;
}
}

// wait for message received callback - meaning the will was sent
Expand Down
87 changes: 26 additions & 61 deletions tests/Mqtt5ClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1974,16 +1974,10 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
const String TEST_TOPIC = "test/MQTT5_Binding_CPP_" + currentUUID;
const String sharedTopicFilter = "$share/crttest/test/MQTT5_Binding_CPP_" + currentUUID;

const int MESSAGE_NUMBER = 10;
std::atomic<int> client_messages(0);
bool client1_received = false;
bool client2_received = false;

std::vector<int> receivedMessages(MESSAGE_NUMBER);
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
receivedMessages.push_back(0);
}
static const int MESSAGE_COUNT = 10;
std::mutex receivedMessagesLock;
std::condition_variable receivedMessagesSignal;
std::vector<int> receivedMessages;

Aws::Iot::Mqtt5ClientBuilder *subscribe_builder =
Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
Expand All @@ -1993,39 +1987,29 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
allocator);
ASSERT_TRUE(subscribe_builder);

std::promise<void> client_received;

auto get_on_message_callback = [&](bool &received)
auto on_message_callback1 = [&](const PublishReceivedEventData &eventData)
{
return [&](const PublishReceivedEventData &eventData) -> int
String topic = eventData.publishPacket->getTopic();
if (topic == TEST_TOPIC)
{
String topic = eventData.publishPacket->getTopic();
if (topic == TEST_TOPIC)
ByteCursor payload = eventData.publishPacket->getPayload();
String message_string = String((const char *)payload.ptr, payload.len);
int message_int = atoi(message_string.c_str());

{
ByteCursor payload = eventData.publishPacket->getPayload();
String message_string = String((const char *)payload.ptr, payload.len);
int message_int = atoi(message_string.c_str());
ASSERT_TRUE(message_int < MESSAGE_NUMBER);
++receivedMessages[message_int];
received = true; // this line has changed

bool exchanged = false;
int desired = 11;
int tested = 10;
client_messages++;
exchanged = client_messages.compare_exchange_strong(tested, desired);
if (exchanged == true)
std::lock_guard<std::mutex> guard(receivedMessagesLock);
receivedMessages.push_back(message_int);

if (receivedMessages.size() == MESSAGE_COUNT)
{
client_received.set_value();
receivedMessagesSignal.notify_all();
}
}
return 0;
};
}
};
auto onMessage_client1 = get_on_message_callback(client1_received);
auto onMessage_client2 = get_on_message_callback(client2_received);
auto on_message_callback2 = on_message_callback1;

subscribe_builder->WithPublishReceivedCallback(onMessage_client1);
subscribe_builder->WithPublishReceivedCallback(on_message_callback1);

Aws::Iot::Mqtt5ClientBuilder *subscribe_builder2 =
Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
Expand All @@ -2034,8 +2018,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
mqtt5TestVars.m_private_key_path_string.c_str(),
allocator);
ASSERT_TRUE(subscribe_builder2);

subscribe_builder2->WithPublishReceivedCallback(onMessage_client2);
subscribe_builder2->WithPublishReceivedCallback(on_message_callback2);

Aws::Iot::Mqtt5ClientBuilder *publish_builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
mqtt5TestVars.m_hostname_string,
Expand Down Expand Up @@ -2113,7 +2096,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
suback.get_future().wait();

/* Publish message 10 to test topic */
for (int i = 0; i < MESSAGE_NUMBER; i++)
for (int i = 0; i < MESSAGE_COUNT; i++)
{
std::string payload = std::to_string(i);
std::shared_ptr<Mqtt5::PublishPacket> publish = std::make_shared<Mqtt5::PublishPacket>(
Expand All @@ -2122,32 +2105,14 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
}

/* Wait for all packets to be received on both clients */
client_received.get_future().wait();

/* Unsubscribe from the topic from both clients*/
Vector<String> unsubList;
unsubList.push_back(TEST_TOPIC);
std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client1 =
std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
unsubscribe_client1->WithTopicFilters(unsubList);
ASSERT_TRUE(mqtt5Client->Unsubscribe(unsubscribe_client1));

std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client2 =
std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
unsubscribe_client2->WithTopicFilters(unsubList);
ASSERT_TRUE(mqtt5Client2->Unsubscribe(unsubscribe_client2));

/* make sure all messages are received */
ASSERT_INT_EQUALS(MESSAGE_NUMBER + 1, client_messages); /* We are adding one at the end, so 10 messages received */

/* makes sure both clients received at least one message */
ASSERT_TRUE(client1_received);
ASSERT_TRUE(client2_received);
std::unique_lock<std::mutex> receivedLock(receivedMessagesLock);
receivedMessagesSignal.wait(receivedLock, [&]() { return receivedMessages.size() == MESSAGE_COUNT; });

std::sort(receivedMessages.begin(), receivedMessages.end());
/* make sure all messages are received with no duplicates*/
for (int i = 0; i < MESSAGE_NUMBER; i++)
for (int i = 0; i < MESSAGE_COUNT; i++)
{
ASSERT_TRUE(receivedMessages[i] > 0);
ASSERT_INT_EQUALS(i, receivedMessages[i]);
}
/* Stop all clients */
ASSERT_TRUE(mqtt5Client->Stop());
Expand Down
2 changes: 1 addition & 1 deletion tests/MqttRequestResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ static TestContext s_CreateClient(
Aws::Crt::Io::TlsContext tlsContext(tlsCtxOptions, Aws::Crt::Io::TlsMode::CLIENT, allocator);

Aws::Crt::Io::SocketOptions socketOptions;
socketOptions.SetConnectTimeoutMs(3000);
socketOptions.SetConnectTimeoutMs(10000);

Aws::Crt::Mqtt::MqttClient client;
context.protocolClient311 =
Expand Down