Skip to content

Commit 0fd4b7c

Browse files
committed
Add overloaded request method to node for non-blocking calls using abstract types.
- Add overloaded method to Node. - Update specialisation of ReqHandler to handle callbacks using google::protobuf::Message. Signed-off-by: Rhys Mainwaring <[email protected]>
1 parent ec25fe9 commit 0fd4b7c

File tree

3 files changed

+180
-2
lines changed

3 files changed

+180
-2
lines changed

include/gz/transport/Node.hh

+21
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,27 @@ namespace gz
561561
std::function<void(const ReplyT &_reply,
562562
const bool _result)> &_callback);
563563

564+
/// \brief Request a new service using a non-blocking call.
565+
/// In this version the callback is a lambda function and the
566+
/// request and response may be abstract types.
567+
/// \param[in] _topic Service name requested.
568+
/// \param[in] _request Protobuf message containing the request's
569+
/// parameters.
570+
/// \param[in] _callback Lambda function executed when the response
571+
/// arrives. The callback has the following parameters:
572+
/// * _reply Protobuf message containing the response.
573+
/// * _result Result of the service call. If false, there was
574+
/// a problem executing your request.
575+
/// \param[in] _repType Message type used in the response.
576+
/// \return true when the service call was succesfully requested.
577+
public: template<typename RequestT, typename ReplyT>
578+
bool Request(
579+
const std::string &_topic,
580+
const RequestT &_request,
581+
std::function<void(const ReplyT &_reply,
582+
const bool _result)> &_callback,
583+
const char *_repType);
584+
564585
/// \brief Request a new service without input parameter using a
565586
/// non-blocking call.
566587
/// In this version the callback is a lambda function.

include/gz/transport/ReqHandler.hh

+66-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
#include <memory>
3333
#include <string>
3434

35+
#include <gz/msgs/Factory.hh>
36+
3537
#include "gz/transport/config.hh"
3638
#include "gz/transport/Export.hh"
3739
#include "gz/transport/TransportTypes.hh"
@@ -318,6 +320,43 @@ namespace gz
318320
{
319321
}
320322

323+
/// \brief Create a specific protobuf message given its serialized data.
324+
/// \param[in] _data The serialized data.
325+
/// \return Pointer to the specific protobuf message.
326+
public: std::shared_ptr<google::protobuf::Message>
327+
CreateMsg(const std::string &_data) const
328+
{
329+
// Instantiate a specific protobuf message
330+
std::shared_ptr<google::protobuf::Message> msgPtr =
331+
gz::msgs::Factory::New(this->RepTypeName());
332+
if (!msgPtr)
333+
{
334+
std::cerr << "Unable to create response of type["
335+
<< this->RepTypeName() << "].\n";
336+
return nullptr;
337+
}
338+
339+
// Create the message using some serialized data
340+
if (!msgPtr->ParseFromString(_data))
341+
{
342+
std::cerr << "ReqHandler::CreateMsg() error: ParseFromString failed"
343+
<< std::endl;
344+
}
345+
346+
return msgPtr;
347+
}
348+
349+
/// \brief Set the callback for this handler.
350+
/// \param[in] _cb The callback with the following parameters:
351+
/// * _rep Protobuf message containing the service response.
352+
/// * _result True when the service request was successful or
353+
/// false otherwise.
354+
public: void SetCallback(const std::function <void(
355+
const google::protobuf::Message &_rep, const bool _result)> &_cb)
356+
{
357+
this->cb = _cb;
358+
}
359+
321360
/// \brief Set the REQ protobuf message for this handler.
322361
/// \param[in] _reqMsg Protofub message containing the input parameters of
323362
/// of the service request.
@@ -371,8 +410,25 @@ namespace gz
371410
// Documentation inherited.
372411
public: void NotifyResult(const std::string &_rep, const bool _result)
373412
{
374-
this->rep = _rep;
375-
this->result = _result;
413+
// Execute the callback (if existing).
414+
if (this->cb)
415+
{
416+
// Instantiate the specific protobuf message associated to this topic.
417+
auto msg = this->CreateMsg(_rep);
418+
if (!msg)
419+
{
420+
/// \todo(srmainwaring) verify this is the correct fail behaviour
421+
this->result = false;
422+
this->repAvailable = false;
423+
this->condition.notify_one();
424+
}
425+
this->cb(*msg, _result);
426+
}
427+
else
428+
{
429+
this->rep = _rep;
430+
this->result = _result;
431+
}
376432

377433
this->repAvailable = true;
378434
this->condition.notify_one();
@@ -409,6 +465,14 @@ namespace gz
409465

410466
/// \brief Protobuf message containing the response.
411467
private: google::protobuf::Message *repMsg = nullptr;
468+
469+
/// \brief Callback to the function registered for this handler with the
470+
/// following parameters:
471+
/// \param[in] _rep Protobuf message containing the service response.
472+
/// \param[in] _result True when the service request was successful or
473+
/// false otherwise.
474+
private: std::function<void(
475+
const google::protobuf::Message &_rep, const bool _result)> cb;
412476
};
413477
}
414478
}

include/gz/transport/detail/Node.hh

+93
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,99 @@ namespace gz
476476
return true;
477477
}
478478

479+
//////////////////////////////////////////////////
480+
template<typename RequestT, typename ReplyT>
481+
bool Node::Request(
482+
const std::string &_topic,
483+
const RequestT &_request,
484+
std::function<void(const ReplyT &_reply, const bool _result)> &_cb,
485+
const char *_repType)
486+
{
487+
auto rep = gz::msgs::Factory::New(_repType);
488+
if (!rep)
489+
{
490+
std::cerr << "Unable to create response of type["
491+
<< _repType << "].\n";
492+
return false;
493+
}
494+
495+
// Topic remapping.
496+
std::string topic = _topic;
497+
this->Options().TopicRemap(_topic, topic);
498+
499+
std::string fullyQualifiedTopic;
500+
if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
501+
this->Options().NameSpace(), topic, fullyQualifiedTopic))
502+
{
503+
std::cerr << "Service [" << topic << "] is not valid." << std::endl;
504+
return false;
505+
}
506+
507+
bool localResponserFound;
508+
IRepHandlerPtr repHandler;
509+
{
510+
std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
511+
localResponserFound = this->Shared()->repliers.FirstHandler(
512+
fullyQualifiedTopic,
513+
_request.GetTypeName(),
514+
rep->GetTypeName(),
515+
repHandler);
516+
}
517+
518+
// If the responser is within my process.
519+
if (localResponserFound)
520+
{
521+
// There is a responser in my process, let's use it.
522+
bool result = repHandler->RunLocalCallback(_request, *rep);
523+
524+
_cb(*rep, result);
525+
return true;
526+
}
527+
528+
// Create a new request handler.
529+
std::shared_ptr<ReqHandler<RequestT, ReplyT>> reqHandlerPtr(
530+
new ReqHandler<RequestT, ReplyT>(this->NodeUuid()));
531+
532+
// Insert the request's parameters.
533+
reqHandlerPtr->SetMessage(&_request);
534+
535+
// Set the response message (to set the type info).
536+
reqHandlerPtr->SetResponse(rep.get());
537+
538+
// Insert the callback into the handler.
539+
reqHandlerPtr->SetCallback(_cb);
540+
541+
{
542+
std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
543+
544+
// Store the request handler.
545+
this->Shared()->requests.AddHandler(
546+
fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);
547+
548+
// If the responser's address is known, make the request.
549+
SrvAddresses_M addresses;
550+
if (this->Shared()->TopicPublishers(fullyQualifiedTopic, addresses))
551+
{
552+
this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
553+
_request.GetTypeName(), rep->GetTypeName());
554+
}
555+
else
556+
{
557+
// Discover the service responser.
558+
if (!this->Shared()->DiscoverService(fullyQualifiedTopic))
559+
{
560+
std::cerr << "Node::Request(): Error discovering service ["
561+
<< topic
562+
<< "]. Did you forget to start the discovery service?"
563+
<< std::endl;
564+
return false;
565+
}
566+
}
567+
}
568+
569+
return true;
570+
}
571+
479572
//////////////////////////////////////////////////
480573
template<typename ReplyT>
481574
bool Node::Request(

0 commit comments

Comments
 (0)