Skip to content

Commit

Permalink
Test Shared Subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
alfred2g committed Dec 19, 2023
1 parent 0bd6389 commit 7357838
Showing 1 changed file with 170 additions and 0 deletions.
170 changes: 170 additions & 0 deletions tests/Mqtt5ClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1914,7 +1914,9 @@ static int s_TestMqtt5WillTest(Aws::Crt::Allocator *allocator, void *)
/* Subscribe to test topic */
Mqtt5::Subscription subscription(TEST_TOPIC, Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE, allocator);
std::shared_ptr<Mqtt5::SubscribePacket> subscribe = std::make_shared<Mqtt5::SubscribePacket>(allocator);

subscribe->WithSubscription(std::move(subscription));

std::promise<void> subscribed;
ASSERT_TRUE(subscriber->Subscribe(
subscribe, [&subscribed](int, std::shared_ptr<Mqtt5::SubAckPacket>) { subscribed.set_value(); }));
Expand All @@ -1936,6 +1938,174 @@ static int s_TestMqtt5WillTest(Aws::Crt::Allocator *allocator, void *)
}
AWS_TEST_CASE(Mqtt5WillTest, s_TestMqtt5WillTest)

/*
* Shared Subscription test
*/
static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, void *)
{
Mqtt5TestEnvVars mqtt5TestVars(allocator, MQTT5CONNECT_IOT_CORE);
if (!mqtt5TestVars)
{
printf("Environment Variables are not set for the test, skip the test");
return AWS_OP_SKIP;
}

ApiHandle apiHandle(allocator);

String currentUUID = Aws::Crt::UUID().ToString();
const String TEST_TOPIC = "test/MQTTT5_Binding_CPP_ss" + currentUUID;
const String sharedTopicFilter = "$share/crttest/test/MQTT5_Binding_CPP_ss" + currentUUID;
const int MESSAGE_NUMBER = 10;
int client1_messages = 0;
int client2_messages = 0;


Aws::Iot::Mqtt5ClientBuilder *builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
mqtt5TestVars.m_hostname_string,
mqtt5TestVars.m_certificate_path_string.c_str(),
mqtt5TestVars.m_private_key_path_string.c_str(),
allocator);
ASSERT_TRUE(builder);

Aws::Iot::Mqtt5ClientBuilder *builder2 = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
mqtt5TestVars.m_hostname_string,
mqtt5TestVars.m_certificate_path_string.c_str(),
mqtt5TestVars.m_private_key_path_string.c_str(),
allocator);
ASSERT_TRUE(builder2);

Aws::Iot::Mqtt5ClientBuilder *publish_builder = Aws::Iot::Mqtt5ClientBuilder::NewMqtt5ClientBuilderWithMtlsFromPath(
mqtt5TestVars.m_hostname_string,
mqtt5TestVars.m_certificate_path_string.c_str(),
mqtt5TestVars.m_private_key_path_string.c_str(),
allocator);
ASSERT_TRUE(publish_builder);

std::promise<void> client1_received;
auto onMessage_client1 = [&](const PublishReceivedEventData &eventData)->int {
String topic = eventData.publishPacket->getTopic();
if (topic == TEST_TOPIC)
{
ByteCursor payload = eventData.publishPacket->getPayload();
String message_string = String((const char *)payload.ptr, payload.len);
int message_int = atoi(message_string.c_str());
ASSERT_TRUE(message_int < MESSAGE_NUMBER);
//++receivedMessages[message_int];
client1_messages++;
if (client1_messages == 5)
{
client1_received.set_value();
}
}
return 0;
};
builder->WithPublishReceivedCallback(onMessage_client1);

std::promise<void> client2_received;
auto onMessage_client2 = [&](const PublishReceivedEventData &eventData)->int {
String topic = eventData.publishPacket->getTopic();
if (topic == TEST_TOPIC)
{
ByteCursor payload = eventData.publishPacket->getPayload();
String message_string = String((const char *)payload.ptr, payload.len);
int message_int = atoi(message_string.c_str());
ASSERT_TRUE(message_int < MESSAGE_NUMBER);
//++receivedMessages[message_int];
client2_messages++;
if (client2_messages == 5)
{
client2_received.set_value();
}
}
return 0;
};
builder2->WithPublishReceivedCallback(onMessage_client1);


std::promise<bool> connectionPromise;
std::promise<void> stoppedPromise;

std::promise<bool> connectionPromise2;
std::promise<void> stoppedPromise2;

std::promise<bool> connectionPromise3;
std::promise<void> stoppedPromise3;

/* first subscriber */
s_setupConnectionLifeCycle(builder, connectionPromise, stoppedPromise, "Subscriber 1");
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Client = builder->Build();

ASSERT_TRUE(mqtt5Client);
ASSERT_TRUE(mqtt5Client->Start());


/* second subscriber */
s_setupConnectionLifeCycle(builder2, connectionPromise2, stoppedPromise2, "Subscriber 2");
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Client2 = builder->Build();

ASSERT_TRUE(mqtt5Client2);
ASSERT_TRUE(mqtt5Client2->Start());

/* publisher */
s_setupConnectionLifeCycle(publish_builder, connectionPromise3, stoppedPromise3, "Publisher");
std::shared_ptr<Aws::Crt::Mqtt5::Mqtt5Client> mqtt5Publisher = builder->Build();

ASSERT_TRUE(mqtt5Publisher);
ASSERT_TRUE(mqtt5Publisher->Start());

/* Wait for all clents to connect */
ASSERT_TRUE(connectionPromise.get_future().get());
ASSERT_TRUE(connectionPromise2.get_future().get());
ASSERT_TRUE(connectionPromise3.get_future().get());

/* Subscribe to test topic */
Mqtt5::Subscription subscription(sharedTopicFilter, Mqtt5::QOS::AWS_MQTT5_QOS_AT_MOST_ONCE, allocator);
std::shared_ptr<Mqtt5::SubscribePacket> subscribe = std::make_shared<Mqtt5::SubscribePacket>(allocator);
subscribe->WithSubscription(std::move(subscription));

std::promise<void> suback;
auto onSubAck = [&](int ioErr, std::shared_ptr<SubAckPacket> packet) {
suback.set_value();
};

/*subscribe both clients */
ASSERT_TRUE(mqtt5Client->Subscribe(subscribe, onSubAck));
suback.get_future().wait();
suback = std::promise<void>();
ASSERT_TRUE(mqtt5Client2->Subscribe(subscribe, onSubAck));
suback.get_future().wait();

/* Publish message 10 to test topic */
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
std::string payload = std::to_string(i);
std::shared_ptr<Mqtt5::PublishPacket> publish = std::make_shared<Mqtt5::PublishPacket>(
TEST_TOPIC, ByteCursorFromCString(payload.c_str()), Mqtt5::QOS::AWS_MQTT5_QOS_AT_LEAST_ONCE, allocator);
ASSERT_TRUE(mqtt5Publisher->Publish(publish));
}


client1_received.get_future().wait();
client2_received.get_future().wait();

ASSERT_INT_EQUALS(5, client1_messages);
ASSERT_INT_EQUALS(5, client2_messages);

/* Stop all clients */
mqtt5Client->Stop();
mqtt5Client2->Stop();
mqtt5Publisher->Stop();

/* Wait for all clents to disconnect */
stoppedPromise.get_future().get();
stoppedPromise2.get_future().get();
stoppedPromise3.get_future().get();

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(Mqtt5SharedSubscriptionTest, s_TestMqtt5SharedSubscriptionTest)

//////////////////////////////////////////////////////////
// Error Operation Tests [ErrorOp-UC]
//////////////////////////////////////////////////////////
Expand Down

0 comments on commit 7357838

Please sign in to comment.