Skip to content

Commit

Permalink
add unsubscribe from topic
Browse files Browse the repository at this point in the history
  • Loading branch information
alfred2g committed Dec 20, 2023
1 parent 652ba1d commit c8ccf90
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions tests/Mqtt5ClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1979,7 +1979,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
std::promise<void> client1_received;
auto onMessage_client1 = [&](const PublishReceivedEventData &eventData) -> int {
String topic = eventData.publishPacket->getTopic();
printf("========= packet 1 received %s\n", topic.c_str());
fprintf(stderr, "========= packet 1 received %s\n", topic.c_str());
if (topic == TEST_TOPIC)
{
ByteCursor payload = eventData.publishPacket->getPayload();
Expand All @@ -1991,7 +1991,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
if (client1_messages == 5)
{
fprintf(stderr, "client 1 future set ======\n");
//client1_received.set_value();
client1_received.set_value();
}
}
return 0;
Expand All @@ -2009,7 +2009,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
std::promise<void> client2_received;
auto onMessage_client2 = [&](const PublishReceivedEventData &eventData) -> int {
String topic = eventData.publishPacket->getTopic();
printf("========= packet 2 received %s\n", topic.c_str());
fprintf(stderr, "========= packet 2 received %s\n", topic.c_str());
if (topic == TEST_TOPIC)
{
ByteCursor payload = eventData.publishPacket->getPayload();
Expand All @@ -2021,7 +2021,7 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
if (client2_messages == 5)
{
fprintf(stderr, " client 2 future set=======\n");
//client2_received.set_value();
client2_received.set_value();
}
}
return 0;
Expand Down Expand Up @@ -2120,6 +2120,19 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
client2_received.get_future().wait();
fprintf(stderr, "all packets received =========\n");

Vector<String> unsubList;
unsubList.clear();
std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client1 = std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
unsubscribe_client1->WithTopicFilters(unsubList);
ASSERT_FALSE(mqtt5Client->Unsubscribe(unsubscribe_client1));

std::shared_ptr<Mqtt5::UnsubscribePacket> unsubscribe_client2 = std::make_shared<Mqtt5::UnsubscribePacket>(allocator);
unsubscribe_client2->WithTopicFilters(unsubList);
ASSERT_FALSE(mqtt5Client->Unsubscribe(unsubscribe_client2));

client1_received = std::promise<void>();
client2_received = std::promise<void>();

/* makes sure messages are distrubuted evenly between the two clients*/
ASSERT_INT_EQUALS(5, client1_messages);
ASSERT_INT_EQUALS(5, client2_messages);
Expand All @@ -2137,9 +2150,9 @@ static int s_TestMqtt5SharedSubscriptionTest(Aws::Crt::Allocator *allocator, voi
ASSERT_TRUE(mqtt5Publisher->Stop());

/* Wait for all clents to disconnect */
stoppedPromise.get_future().get();
stoppedPromise2.get_future().get();
stoppedPromise3.get_future().get();
//stoppedPromise.get_future().get();
//stoppedPromise2.get_future().get();
//stoppedPromise3.get_future().get();
fprintf(stderr, "all connections stopped =========\n");

delete subscribe_builder;
Expand Down

0 comments on commit c8ccf90

Please sign in to comment.