Skip to content

Commit

Permalink
Fix query queue for multiple clients (#159)
Browse files Browse the repository at this point in the history
* Small update to a comment.

Signed-off-by: Chris Lalancette <[email protected]>

* Don't return an error if we can't find a number in the sequence map.

I'm not really sure that this is correct, but do it for now.

Signed-off-by: Chris Lalancette <[email protected]>

* Fix query queue for multiple clients.

In particular, make sure that we track requests from
individual clients separately so that we don't mix them
up.  To do that, we store the client gid in the server
set along with the sequence_number and Query itself.

Signed-off-by: Chris Lalancette <[email protected]>

* Finish changes

Signed-off-by: Yadunund <[email protected]>

* Tweak api to store and retrieve query

Signed-off-by: Yadunund <[email protected]>

* Lint

Signed-off-by: Yadunund <[email protected]>

---------

Signed-off-by: Chris Lalancette <[email protected]>
Signed-off-by: Yadunund <[email protected]>
Co-authored-by: Chris Lalancette <[email protected]>
  • Loading branch information
Yadunund and clalancette authored Apr 18, 2024
1 parent 655a961 commit 12ebc2e
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 27 deletions.
60 changes: 51 additions & 9 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <utility>

Expand Down Expand Up @@ -208,31 +209,72 @@ void rmw_service_data_t::add_new_query(std::unique_ptr<ZenohQuery> query)
notify();
}

static size_t hash_gid(const rmw_request_id_t & request_id)
{
std::stringstream hash_str;
hash_str << std::hex;
size_t i = 0;
for (; i < (RMW_GID_STORAGE_SIZE - 1); i++) {
hash_str << static_cast<int>(request_id.writer_guid[i]);
}
return std::hash<std::string>{}(hash_str.str());
}

///=============================================================================
bool rmw_service_data_t::add_to_query_map(
int64_t sequence_number, std::unique_ptr<ZenohQuery> query)
const rmw_request_id_t & request_id, std::unique_ptr<ZenohQuery> query)
{
size_t hash = hash_gid(request_id);

std::lock_guard<std::mutex> lock(sequence_to_query_map_mutex_);
if (sequence_to_query_map_.find(sequence_number) != sequence_to_query_map_.end()) {
return false;

std::unordered_map<size_t, SequenceToQuery>::iterator it = sequence_to_query_map_.find(hash);

if (it == sequence_to_query_map_.end()) {
SequenceToQuery stq;

sequence_to_query_map_.insert(std::make_pair(hash, std::move(stq)));

it = sequence_to_query_map_.find(hash);
} else {
// Client already in the map

if (it->second.find(request_id.sequence_number) != it->second.end()) {
return false;
}
}
sequence_to_query_map_.emplace(
std::pair(sequence_number, std::move(query)));

it->second.insert(std::make_pair(request_id.sequence_number, std::move(query)));

return true;
}

///=============================================================================
std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(int64_t sequence_number)
std::unique_ptr<ZenohQuery> rmw_service_data_t::take_from_query_map(
const rmw_request_id_t & request_id)
{
size_t hash = hash_gid(request_id);

std::lock_guard<std::mutex> lock(sequence_to_query_map_mutex_);
auto query_it = sequence_to_query_map_.find(sequence_number);
if (query_it == sequence_to_query_map_.end()) {

std::unordered_map<size_t, SequenceToQuery>::iterator it = sequence_to_query_map_.find(hash);

if (it == sequence_to_query_map_.end()) {
return nullptr;
}

SequenceToQuery::iterator query_it = it->second.find(request_id.sequence_number);

if (query_it == it->second.end()) {
return nullptr;
}

std::unique_ptr<ZenohQuery> query = std::move(query_it->second);
sequence_to_query_map_.erase(query_it);
it->second.erase(query_it);

if (sequence_to_query_map_[hash].size() == 0) {
sequence_to_query_map_.erase(hash);
}

return query;
}
Expand Down
10 changes: 5 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <variant>
#include <vector>
Expand Down Expand Up @@ -252,9 +251,9 @@ class rmw_service_data_t final

void add_new_query(std::unique_ptr<ZenohQuery> query);

bool add_to_query_map(int64_t sequence_number, std::unique_ptr<ZenohQuery> query);
bool add_to_query_map(const rmw_request_id_t & request_id, std::unique_ptr<ZenohQuery> query);

std::unique_ptr<ZenohQuery> take_from_query_map(int64_t sequence_number);
std::unique_ptr<ZenohQuery> take_from_query_map(const rmw_request_id_t & request_id);

DataCallbackManager data_callback_mgr;

Expand All @@ -265,8 +264,9 @@ class rmw_service_data_t final
std::deque<std::unique_ptr<ZenohQuery>> query_queue_;
mutable std::mutex query_queue_mutex_;

// Map to store the sequence_number -> query_id
std::unordered_map<int64_t, std::unique_ptr<ZenohQuery>> sequence_to_query_map_;
// Map to store the sequence_number (as given by the client) -> ZenohQuery
using SequenceToQuery = std::unordered_map<int64_t, std::unique_ptr<ZenohQuery>>;
std::unordered_map<size_t, SequenceToQuery> sequence_to_query_map_;
std::mutex sequence_to_query_map_mutex_;

std::condition_variable * condition_{nullptr};
Expand Down
25 changes: 12 additions & 13 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2811,9 +2811,7 @@ rmw_take_request(
request_header->received_timestamp = now_ns.count();

// Add this query to the map, so that rmw_send_response can quickly look it up later
if (!service_data->add_to_query_map(
request_header->request_id.sequence_number, std::move(query)))
{
if (!service_data->add_to_query_map(request_header->request_id, std::move(query))) {
RMW_SET_ERROR_MSG("duplicate sequence number in the map");
return RMW_RET_ERROR;
}
Expand Down Expand Up @@ -2849,6 +2847,15 @@ rmw_send_response(

rmw_service_data_t * service_data = static_cast<rmw_service_data_t *>(service->data);

// Create the queryable payload
std::unique_ptr<ZenohQuery> query =
service_data->take_from_query_map(*request_header);
if (query == nullptr) {
// If there is no data associated with this request, the higher layers of
// ROS 2 seem to expect that we just silently return with no work.
return RMW_RET_OK;
}

rcutils_allocator_t * allocator = &(service_data->context->options.allocator);

size_t max_data_length = (
Expand All @@ -2860,7 +2867,7 @@ rmw_send_response(
max_data_length,
allocator->state));
if (!response_bytes) {
RMW_SET_ERROR_MSG("failed allocate response message bytes");
RMW_SET_ERROR_MSG("failed to allocate response message bytes");
return RMW_RET_ERROR;
}
auto free_response_bytes = rcpputils::make_scope_exit(
Expand All @@ -2883,14 +2890,6 @@ rmw_send_response(

size_t data_length = ser.get_serialized_data_length();

// Create the queryable payload
std::unique_ptr<ZenohQuery> query =
service_data->take_from_query_map(request_header->sequence_number);
if (query == nullptr) {
RMW_SET_ERROR_MSG("Unable to find taken request. Report this bug.");
return RMW_RET_ERROR;
}

const z_query_t loaned_query = query->get_query();
z_query_reply_options_t options = z_query_reply_options_default();

Expand Down Expand Up @@ -3417,7 +3416,7 @@ rmw_get_node_names(
}

//==============================================================================
/// Return the name, namespae, and enclave name of all nodes in the ROS graph.
/// Return the name, namespace, and enclave name of all nodes in the ROS graph.
rmw_ret_t
rmw_get_node_names_with_enclaves(
const rmw_node_t * node,
Expand Down

0 comments on commit 12ebc2e

Please sign in to comment.