Skip to content

Commit ec25fe9

Browse files
iche033mjcarroll
andauthored
Prevent invoking callbacks after a node unsubscribes to a topic (#381)
Signed-off-by: Ian Chen <[email protected]> Co-authored-by: Michael Carroll <[email protected]>
1 parent 725841d commit ec25fe9

File tree

6 files changed

+139
-5
lines changed

6 files changed

+139
-5
lines changed

src/Node.cc

+51-1
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,8 @@ bool Node::Publisher::Publish(const ProtoMsg &_msg)
400400
{
401401
std::unique_lock<std::mutex> queueLock(
402402
this->dataPtr->shared->dataPtr->pubThreadMutex);
403-
this->dataPtr->shared->dataPtr->pubQueue.push(std::move(pubMsgDetails));
403+
this->dataPtr->shared->dataPtr->pubQueue.push_back(
404+
std::move(pubMsgDetails));
404405
}
405406

406407
this->dataPtr->shared->dataPtr->signalNewPub.notify_one();
@@ -601,6 +602,15 @@ bool Node::Unsubscribe(const std::string &_topic)
601602
return false;
602603
}
603604

605+
// Remove handlers from shared pubQueue to avoid invoking callbacks after
606+
// unsuscribing to the topic
607+
if (!this->dataPtr->RemoveHandlersFromPubQueue(topic))
608+
{
609+
std::cerr << "Error removing subscription handlers from publish queue "
610+
<< "when unsubscribing from Topic [" << _topic << "]"
611+
<< std::endl;
612+
}
613+
604614
std::lock_guard<std::recursive_mutex> lk(this->dataPtr->shared->mutex);
605615

606616
// Remove the subscribers for the given topic that belong to this node.
@@ -1030,3 +1040,43 @@ bool Node::SubscribeHelper(const std::string &_fullyQualifiedTopic)
10301040
{
10311041
return this->dataPtr->SubscribeHelper(_fullyQualifiedTopic);
10321042
}
1043+
1044+
//////////////////////////////////////////////////
1045+
bool NodePrivate::RemoveHandlersFromPubQueue(const std::string &_topic)
1046+
{
1047+
// Remove from pubQueue
1048+
std::unique_lock<std::mutex> queueLock(
1049+
this->shared->dataPtr->pubThreadMutex);
1050+
for (auto &msgDetails : this->shared->dataPtr->pubQueue)
1051+
{
1052+
// check if there is a pub queue with message details that has topic
1053+
// which the node unsubscribes to
1054+
if (msgDetails->info.Topic() != _topic)
1055+
continue;
1056+
1057+
// remove local handler if it is a handler for this node
1058+
for (auto handlerIt = msgDetails->localHandlers.begin();
1059+
handlerIt != msgDetails->localHandlers.end();)
1060+
{
1061+
if ((*handlerIt)->NodeUuid() == this->nUuid)
1062+
{
1063+
msgDetails->localHandlers.erase(handlerIt);
1064+
}
1065+
else
1066+
++handlerIt;
1067+
}
1068+
1069+
// remove raw handler if it is a handler for this node
1070+
for (auto handlerIt = msgDetails->rawHandlers.begin();
1071+
handlerIt != msgDetails->rawHandlers.end();)
1072+
{
1073+
if ((*handlerIt)->NodeUuid() == this->nUuid)
1074+
{
1075+
msgDetails->rawHandlers.erase(handlerIt);
1076+
}
1077+
else
1078+
++handlerIt;
1079+
}
1080+
}
1081+
return true;
1082+
}

src/NodePrivate.hh

+6
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ namespace gz
5050
/// \sa TopicUtils::FullyQualifiedName
5151
public: bool SubscribeHelper(const std::string &_fullyQualifiedTopic);
5252

53+
/// \brief Helper function to remove handlers from the shared publish
54+
/// queue. This is called when the node unsubscribes to a topic
55+
/// \param[in] _topic Topic that the node unsubcribed to.
56+
/// \return True on success.
57+
public: bool RemoveHandlersFromPubQueue(const std::string &_topic);
58+
5359
/// \brief The list of topics subscribed by this node.
5460
public: std::unordered_set<std::string> topicsSubscribed;
5561

src/NodeShared.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -1888,7 +1888,7 @@ void NodeSharedPrivate::PublishThread()
18881888

18891889
// Get the message
18901890
msgDetails = std::move(this->pubQueue.front());
1891-
this->pubQueue.pop();
1891+
this->pubQueue.pop_front();
18921892
}
18931893

18941894
// Send the message to all the local handlers.

src/NodeSharedPrivate.hh

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
#include <zmq.hpp>
2222

2323
#include <atomic>
24+
#include <list>
2425
#include <map>
2526
#include <memory>
26-
#include <queue>
2727
#include <string>
2828
#include <vector>
2929

@@ -175,9 +175,9 @@ namespace gz
175175
/// \brief Mutex to protect the pubThread and pubQueue.
176176
public: std::mutex pubThreadMutex;
177177

178-
/// \brief Queue onto which new messages are pushed. The pubThread
178+
/// \brief List onto which new messages are pushed. The pubThread
179179
/// will pop off the messages and send them to local subscribers.
180-
public: std::queue<std::unique_ptr<PublishMsgDetails>> pubQueue;
180+
public: std::list<std::unique_ptr<PublishMsgDetails>> pubQueue;
181181

182182
/// \brief used to signal when new work is available
183183
public: std::condition_variable signalNewPub;

test/integration/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ set(TEST_TYPE "INTEGRATION")
33
set(tests
44
authPubSub.cc
55
scopedTopic.cc
6+
callback_scope_TEST.cc
67
statistics.cc
78
twoProcsPubSub.cc
89
twoProcsSrvCall.cc
+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright (C) 2023 Open Source Robotics Foundation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
#include <gz/msgs/int32.pb.h>
18+
19+
#include <gtest/gtest.h>
20+
21+
#include <gz/transport/Node.hh>
22+
23+
TEST(CallbackScope, CleanupCorrectly)
24+
{
25+
// test cleanup for serialized msgs
26+
gz::transport::Node node;
27+
auto publisher = node.Advertise<gz::msgs::Int32>("/my_topic");
28+
gz::msgs::Int32 msg;
29+
{
30+
auto msg2 = std::make_unique<gz::msgs::Int32>();
31+
32+
std::function<void(const gz::msgs::Int32&)> callback =
33+
[&msg2](const gz::msgs::Int32 &) {
34+
35+
if (nullptr == msg2)
36+
{
37+
FAIL();
38+
}
39+
return;
40+
};
41+
42+
node.Subscribe("/my_topic", callback);
43+
publisher.Publish(msg);
44+
publisher.Publish(msg);
45+
node.Unsubscribe("/my_topic");
46+
47+
// Clear msg2
48+
msg2.reset();
49+
}
50+
51+
// test cleanup for raw msgs
52+
gz::transport::Node nodeRaw;
53+
auto publisherRaw = nodeRaw.Advertise<gz::msgs::Int32>("/my_topic_raw");
54+
gz::msgs::Int32 msgRaw;
55+
{
56+
auto msg2Raw = std::make_unique<gz::msgs::Int32>();
57+
std::function<void(const char *_msg, const size_t,
58+
const gz::transport::MessageInfo &)> rawCallback =
59+
[&msg2Raw](const char *, const size_t,
60+
const gz::transport::MessageInfo &) {
61+
62+
if (nullptr == msg2Raw)
63+
{
64+
FAIL();
65+
}
66+
return;
67+
};
68+
69+
nodeRaw.SubscribeRaw("/my_topic_raw", rawCallback);
70+
// use Publish intead of PublishRaw so the msgs end up in the pub queue
71+
publisherRaw.Publish(msgRaw);
72+
publisherRaw.Publish(msgRaw);
73+
nodeRaw.Unsubscribe("/my_topic_raw");
74+
75+
msg2Raw.reset();
76+
}
77+
}

0 commit comments

Comments
 (0)