Skip to content

Commit 73ef858

Browse files
committed
Add overloaded request method to node for non-blocking calls using abstract types.
- Add overloaded method to Node. - Update specialisation of ReqHandler to handle callbacks using google::protobuf::Message. Signed-off-by: Rhys Mainwaring <[email protected]>
1 parent 685ad65 commit 73ef858

File tree

3 files changed

+180
-2
lines changed

3 files changed

+180
-2
lines changed

include/gz/transport/Node.hh

+21
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,27 @@ namespace gz
561561
std::function<void(const ReplyT &_reply,
562562
const bool _result)> &_callback);
563563

564+
/// \brief Request a new service using a non-blocking call.
565+
/// In this version the callback is a lambda function and the
566+
/// request and response may be abstract types.
567+
/// \param[in] _topic Service name requested.
568+
/// \param[in] _request Protobuf message containing the request's
569+
/// parameters.
570+
/// \param[in] _callback Lambda function executed when the response
571+
/// arrives. The callback has the following parameters:
572+
/// * _reply Protobuf message containing the response.
573+
/// * _result Result of the service call. If false, there was
574+
/// a problem executing your request.
575+
/// \param[in] _repType Message type used in the response.
576+
/// \return true when the service call was succesfully requested.
577+
public: template<typename RequestT, typename ReplyT>
578+
bool Request(
579+
const std::string &_topic,
580+
const RequestT &_request,
581+
std::function<void(const ReplyT &_reply,
582+
const bool _result)> &_callback,
583+
const char *_repType);
584+
564585
/// \brief Request a new service without input parameter using a
565586
/// non-blocking call.
566587
/// In this version the callback is a lambda function.

include/gz/transport/ReqHandler.hh

+66-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
#include <memory>
3333
#include <string>
3434

35+
#include <gz/msgs/Factory.hh>
36+
3537
#include "gz/transport/config.hh"
3638
#include "gz/transport/Export.hh"
3739
#include "gz/transport/TransportTypes.hh"
@@ -318,6 +320,43 @@ namespace gz
318320
{
319321
}
320322

323+
/// \brief Create a specific protobuf message given its serialized data.
324+
/// \param[in] _data The serialized data.
325+
/// \return Pointer to the specific protobuf message.
326+
public: std::shared_ptr<google::protobuf::Message>
327+
CreateMsg(const std::string &_data) const
328+
{
329+
// Instantiate a specific protobuf message
330+
std::shared_ptr<google::protobuf::Message> msgPtr =
331+
gz::msgs::Factory::New(this->RepTypeName());
332+
if (!msgPtr)
333+
{
334+
std::cerr << "Unable to create response of type["
335+
<< this->RepTypeName() << "].\n";
336+
return nullptr;
337+
}
338+
339+
// Create the message using some serialized data
340+
if (!msgPtr->ParseFromString(_data))
341+
{
342+
std::cerr << "ReqHandler::CreateMsg() error: ParseFromString failed"
343+
<< std::endl;
344+
}
345+
346+
return msgPtr;
347+
}
348+
349+
/// \brief Set the callback for this handler.
350+
/// \param[in] _cb The callback with the following parameters:
351+
/// * _rep Protobuf message containing the service response.
352+
/// * _result True when the service request was successful or
353+
/// false otherwise.
354+
public: void SetCallback(const std::function <void(
355+
const google::protobuf::Message &_rep, const bool _result)> &_cb)
356+
{
357+
this->cb = _cb;
358+
}
359+
321360
/// \brief Set the REQ protobuf message for this handler.
322361
/// \param[in] _reqMsg Protofub message containing the input parameters of
323362
/// of the service request.
@@ -371,8 +410,25 @@ namespace gz
371410
// Documentation inherited.
372411
public: void NotifyResult(const std::string &_rep, const bool _result)
373412
{
374-
this->rep = _rep;
375-
this->result = _result;
413+
// Execute the callback (if existing).
414+
if (this->cb)
415+
{
416+
// Instantiate the specific protobuf message associated to this topic.
417+
auto msg = this->CreateMsg(_rep);
418+
if (!msg)
419+
{
420+
/// \todo(srmainwaring) verify this is the correct fail behaviour
421+
this->result = false;
422+
this->repAvailable = false;
423+
this->condition.notify_one();
424+
}
425+
this->cb(*msg, _result);
426+
}
427+
else
428+
{
429+
this->rep = _rep;
430+
this->result = _result;
431+
}
376432

377433
this->repAvailable = true;
378434
this->condition.notify_one();
@@ -409,6 +465,14 @@ namespace gz
409465

410466
/// \brief Protobuf message containing the response.
411467
private: google::protobuf::Message *repMsg = nullptr;
468+
469+
/// \brief Callback to the function registered for this handler with the
470+
/// following parameters:
471+
/// \param[in] _rep Protobuf message containing the service response.
472+
/// \param[in] _result True when the service request was successful or
473+
/// false otherwise.
474+
private: std::function<void(
475+
const google::protobuf::Message &_rep, const bool _result)> cb;
412476
};
413477
}
414478
}

include/gz/transport/detail/Node.hh

+93
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,99 @@ namespace gz
477477
return true;
478478
}
479479

480+
//////////////////////////////////////////////////
481+
template<typename RequestT, typename ReplyT>
482+
bool Node::Request(
483+
const std::string &_topic,
484+
const RequestT &_request,
485+
std::function<void(const ReplyT &_reply, const bool _result)> &_cb,
486+
const char *_repType)
487+
{
488+
auto rep = gz::msgs::Factory::New(_repType);
489+
if (!rep)
490+
{
491+
std::cerr << "Unable to create response of type["
492+
<< _repType << "].\n";
493+
return false;
494+
}
495+
496+
// Topic remapping.
497+
std::string topic = _topic;
498+
this->Options().TopicRemap(_topic, topic);
499+
500+
std::string fullyQualifiedTopic;
501+
if (!TopicUtils::FullyQualifiedName(this->Options().Partition(),
502+
this->Options().NameSpace(), topic, fullyQualifiedTopic))
503+
{
504+
std::cerr << "Service [" << topic << "] is not valid." << std::endl;
505+
return false;
506+
}
507+
508+
bool localResponserFound;
509+
IRepHandlerPtr repHandler;
510+
{
511+
std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
512+
localResponserFound = this->Shared()->repliers.FirstHandler(
513+
fullyQualifiedTopic,
514+
_request.GetTypeName(),
515+
rep->GetTypeName(),
516+
repHandler);
517+
}
518+
519+
// If the responser is within my process.
520+
if (localResponserFound)
521+
{
522+
// There is a responser in my process, let's use it.
523+
bool result = repHandler->RunLocalCallback(_request, *rep);
524+
525+
_cb(*rep, result);
526+
return true;
527+
}
528+
529+
// Create a new request handler.
530+
std::shared_ptr<ReqHandler<RequestT, ReplyT>> reqHandlerPtr(
531+
new ReqHandler<RequestT, ReplyT>(this->NodeUuid()));
532+
533+
// Insert the request's parameters.
534+
reqHandlerPtr->SetMessage(&_request);
535+
536+
// Set the response message (to set the type info).
537+
reqHandlerPtr->SetResponse(rep.get());
538+
539+
// Insert the callback into the handler.
540+
reqHandlerPtr->SetCallback(_cb);
541+
542+
{
543+
std::lock_guard<std::recursive_mutex> lk(this->Shared()->mutex);
544+
545+
// Store the request handler.
546+
this->Shared()->requests.AddHandler(
547+
fullyQualifiedTopic, this->NodeUuid(), reqHandlerPtr);
548+
549+
// If the responser's address is known, make the request.
550+
SrvAddresses_M addresses;
551+
if (this->Shared()->TopicPublishers(fullyQualifiedTopic, addresses))
552+
{
553+
this->Shared()->SendPendingRemoteReqs(fullyQualifiedTopic,
554+
_request.GetTypeName(), rep->GetTypeName());
555+
}
556+
else
557+
{
558+
// Discover the service responser.
559+
if (!this->Shared()->DiscoverService(fullyQualifiedTopic))
560+
{
561+
std::cerr << "Node::Request(): Error discovering service ["
562+
<< topic
563+
<< "]. Did you forget to start the discovery service?"
564+
<< std::endl;
565+
return false;
566+
}
567+
}
568+
}
569+
570+
return true;
571+
}
572+
480573
//////////////////////////////////////////////////
481574
template<typename ReplyT>
482575
bool Node::Request(

0 commit comments

Comments
 (0)