Skip to content

Commit

Permalink
Publisher
Browse files Browse the repository at this point in the history
Signed-off-by: Alejandro Hernández Cordero <[email protected]>
  • Loading branch information
ahcorde committed Nov 20, 2024
1 parent e16ded3 commit cf03d9c
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 84 deletions.
32 changes: 32 additions & 0 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,38 @@
namespace rmw_zenoh_cpp
{

AttachementData::AttachementData(
const int64_t _sequence_number,
const int64_t _source_timestamp,
const uint8_t _source_gid[RMW_GID_STORAGE_SIZE])
{
sequence_number = _sequence_number;
source_timestamp = _source_timestamp;
for (size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i)
{
source_gid.push_back(_source_gid[RMW_GID_STORAGE_SIZE - 1 - i]);
}
}

AttachementData::AttachementData(AttachementData && data)
{
sequence_number = std::move(data.sequence_number);
source_timestamp = std::move(data.source_timestamp);
source_gid = data.source_gid;
}

zenoh::Bytes AttachementData::serialize_to_zbytes()
{
auto serializer = zenoh::ext::Serializer();
serializer.serialize(std::string("sequence_number"));
serializer.serialize(this->sequence_number);
serializer.serialize(std::string("source_timestamp"));
serializer.serialize(this->source_timestamp);
serializer.serialize(std::string("source_gid"));
serializer.serialize(this->source_gid);
return std::move(serializer).finish();
}

attachement_data_t::attachement_data_t(
const int64_t _sequence_number,
const int64_t _source_timestamp,
Expand Down
20 changes: 19 additions & 1 deletion rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,31 @@
#ifndef DETAIL__ATTACHMENT_HELPERS_HPP_
#define DETAIL__ATTACHMENT_HELPERS_HPP_

#include <zenoh.h>
#include <zenoh.hxx>

#include "rmw/types.h"

namespace rmw_zenoh_cpp
{

class AttachementData final
{
public:
explicit AttachementData(
const int64_t _sequence_number,
const int64_t _source_timestamp,
const uint8_t _source_gid[RMW_GID_STORAGE_SIZE]);

// explicit AttachementData(const zenoh::Bytes & bytes);
explicit AttachementData(AttachementData && data);

int64_t sequence_number;
int64_t source_timestamp;
std::vector<uint8_t> source_gid;

zenoh::Bytes serialize_to_zbytes();
};

class attachement_data_t final
{
public:
Expand Down
127 changes: 49 additions & 78 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,74 +99,50 @@ std::shared_ptr<PublisherData> PublisherData::make(
return nullptr;
}

std::string topic_keyexpr = entity->topic_info()->topic_keyexpr_;
z_view_keyexpr_t pub_ke;
if (z_view_keyexpr_from_str(&pub_ke, topic_keyexpr.c_str()) != Z_OK) {
RMW_SET_ERROR_MSG("unable to create zenoh keyexpr.");
return nullptr;
}

zenoh::ZResult err;
std::optional<zenoh::ext::PublicationCache> pub_cache;
zenoh::KeyExpr pub_ke(entity->topic_info()->topic_keyexpr_);
// Create a Publication Cache if durability is transient_local.
std::optional<ze_owned_publication_cache_t> pub_cache = std::nullopt;
auto undeclare_z_publisher_cache = rcpputils::make_scope_exit(
[&pub_cache]() {
if (pub_cache.has_value()) {
z_drop(z_move(pub_cache.value()));
}
});
if (adapted_qos_profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
ze_publication_cache_options_t pub_cache_opts;
ze_publication_cache_options_default(&pub_cache_opts);

zenoh::Session::PublicationCacheOptions pub_cache_opts =
zenoh::Session::PublicationCacheOptions::create_default();

pub_cache_opts.history = adapted_qos_profile.depth;
pub_cache_opts.queryable_complete = true;
// Set the queryable_prefix to the session id so that querying subscribers can specify this
// session id to obtain latest data from this specific publication caches when querying over
// the same keyexpression.
// When such a prefix is added to the PublicationCache, it listens to queries with this extra
// prefix (allowing to be queried in a unique way), but still replies with the original
// publications' key expressions.

std::string queryable_prefix = entity->zid();
z_view_keyexpr_t prefix_ke;
z_view_keyexpr_from_str(&prefix_ke, queryable_prefix.c_str());
pub_cache_opts.queryable_prefix = z_loan(prefix_ke);
pub_cache_opts.queryable_prefix = zenoh::KeyExpr(queryable_prefix);

pub_cache = session->declare_publication_cache(pub_ke, std::move(pub_cache_opts), &err);

ze_owned_publication_cache_t pub_cache_;
if (ze_declare_publication_cache(
z_loan(session->_0), &pub_cache_, z_loan(pub_ke), &pub_cache_opts))
if (err != Z_OK)
{
RMW_SET_ERROR_MSG("unable to create zenoh publisher cache");
return nullptr;
}
pub_cache = pub_cache_;
}

// Set congestion_control to BLOCK if appropriate.
z_publisher_options_t opts;
z_publisher_options_default(&opts);
opts.congestion_control = Z_CONGESTION_CONTROL_DROP;

zenoh::Session::PublisherOptions opts = zenoh::Session::PublisherOptions::create_default();
opts.congestion_control = Z_CONGESTION_CONTROL_DROP;
if (adapted_qos_profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
opts.reliability = Z_RELIABILITY_RELIABLE;

if (adapted_qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) {
opts.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
}
}
z_owned_publisher_t pub;
// TODO(clalancette): What happens if the key name is a valid but empty string?
auto undeclare_z_publisher = rcpputils::make_scope_exit(
[&pub]() {
z_undeclare_publisher(z_move(pub));
});
if (z_declare_publisher(
z_loan(session->_0), &pub, z_loan(pub_ke), &opts) != Z_OK)
auto pub = session->declare_publisher(pub_ke, std::move(opts), &err);

if (err != Z_OK)
{
RMW_SET_ERROR_MSG("Unable to create Zenoh publisher.");
return nullptr;
}

std::string liveliness_keyexpr = entity->liveliness_keyexpr();
zenoh::ZResult err;
auto token = session->liveliness_declare_token(
zenoh::KeyExpr(liveliness_keyexpr),
zenoh::Session::LivelinessDeclarationOptions::create_default(),
Expand All @@ -179,9 +155,6 @@ std::shared_ptr<PublisherData> PublisherData::make(
return nullptr;
}

undeclare_z_publisher_cache.cancel();
undeclare_z_publisher.cancel();

return std::shared_ptr<PublisherData>(
new PublisherData{
node,
Expand All @@ -198,8 +171,8 @@ std::shared_ptr<PublisherData> PublisherData::make(
PublisherData::PublisherData(
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
z_owned_publisher_t pub,
std::optional<ze_owned_publication_cache_t> pub_cache,
zenoh::Publisher pub,
std::optional<zenoh::ext::PublicationCache> pub_cache,
zenoh::LivelinessToken token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
Expand Down Expand Up @@ -295,24 +268,21 @@ rmw_ret_t PublisherData::publish(
// The encoding is simply forwarded and is useful when key expressions in the
// session use different encoding formats. In our case, all key expressions
// will be encoded with CDR so it does not really matter.
z_publisher_put_options_t options;
z_publisher_put_options_default(&options);
z_owned_bytes_t attachment;
uint8_t local_gid[RMW_GID_STORAGE_SIZE];
entity_->copy_gid(local_gid);
create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid);
options.attachment = z_move(attachment);

z_owned_bytes_t payload;
if (shmbuf.has_value()) {
z_bytes_from_shm_mut(&payload, z_move(shmbuf.value()));
} else {
z_bytes_copy_from_buf(&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length);
}

z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options);
if (res != Z_OK) {
if (res == Z_ESESSION_CLOSED) {
zenoh::ZResult err;
auto options = zenoh::Publisher::PutOptions::create_default();
options.attachment = create_map_and_set_sequence_num(sequence_number_++, local_gid);

// TODO(ahcorde): shmbuf
std::vector<uint8_t> raw_image(
reinterpret_cast<const uint8_t *>(msg_bytes),
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length);
zenoh::Bytes payload(raw_image);

pub_.put(std::move(payload), std::move(options), &err);
if (err != Z_OK) {
if (err == Z_ESESSION_CLOSED) {
RMW_ZENOH_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"unable to publish message since the zenoh session is closed");
Expand Down Expand Up @@ -340,27 +310,25 @@ rmw_ret_t PublisherData::publish_serialized_message(

std::lock_guard<std::mutex> lock(mutex_);


const size_t data_length = ser.get_serialized_data_length();

// The encoding is simply forwarded and is useful when key expressions in the
// session use different encoding formats. In our case, all key expressions
// will be encoded with CDR so it does not really matter.
z_publisher_put_options_t options;
z_publisher_put_options_default(&options);
uint8_t local_gid[RMW_GID_STORAGE_SIZE];
entity_->copy_gid(local_gid);
z_owned_bytes_t attachment;
create_map_and_set_sequence_num(&attachment, sequence_number_++, local_gid);

options.attachment = z_move(attachment);
zenoh::ZResult err;
auto options = zenoh::Publisher::PutOptions::create_default();
options.attachment = create_map_and_set_sequence_num(sequence_number_++, local_gid);

z_owned_bytes_t payload;
z_bytes_copy_from_buf(&payload, serialized_message->buffer, data_length);
std::vector<uint8_t> raw_image(
serialized_message->buffer,
serialized_message->buffer + data_length);
zenoh::Bytes payload(raw_image);

z_result_t res = z_publisher_put(z_loan(pub_), z_move(payload), &options);
if (res != Z_OK) {
if (res == Z_ESESSION_CLOSED) {
pub_.put(std::move(payload), std::move(options), &err);
if (err != Z_OK) {
if (err == Z_ESESSION_CLOSED) {
RMW_ZENOH_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"unable to publish message since the zenoh session is closed");
Expand All @@ -369,7 +337,6 @@ rmw_ret_t PublisherData::publish_serialized_message(
return RMW_RET_ERROR;
}
}

return RMW_RET_OK;
}

Expand Down Expand Up @@ -432,10 +399,14 @@ rmw_ret_t PublisherData::shutdown()
"Unable to undeclare liveliness token");
return RMW_RET_ERROR;
}
if (pub_cache_.has_value()) {
z_drop(z_move(pub_cache_.value()));
std::move(pub_).undeclare(&err);
if (err != Z_OK)
{
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to undeclare publisher");
return RMW_RET_ERROR;
}
z_undeclare_publisher(z_move(pub_));

is_shutdown_ = true;
return RMW_RET_OK;
Expand Down
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ class PublisherData final
PublisherData(
const rmw_node_t * rmw_node,
std::shared_ptr<liveliness::Entity> entity,
z_owned_publisher_t pub,
std::optional<ze_owned_publication_cache_t> pub_cache,
zenoh::Publisher pub,
std::optional<zenoh::ext::PublicationCache> pub_cache,
zenoh::LivelinessToken token,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support);
Expand All @@ -96,9 +96,9 @@ class PublisherData final
// The Entity generated for the publisher.
std::shared_ptr<liveliness::Entity> entity_;
// An owned publisher.
z_owned_publisher_t pub_;
zenoh::Publisher pub_;
// Optional publication cache when durability is transient_local.
std::optional<ze_owned_publication_cache_t> pub_cache_;
std::optional<zenoh::ext::PublicationCache> pub_cache_;
// Liveliness token for the publisher.
std::optional<zenoh::LivelinessToken> token_;
// Type support fields
Expand Down
12 changes: 12 additions & 0 deletions rmw_zenoh_cpp/src/detail/zenoh_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ void create_map_and_set_sequence_num(
data.serialize_to_zbytes(out_bytes);
}

///=============================================================================
zenoh::Bytes create_map_and_set_sequence_num(
int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE])
{
auto now = std::chrono::system_clock::now().time_since_epoch();
auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now);
int64_t source_timestamp = now_ns.count();

rmw_zenoh_cpp::AttachementData data(sequence_number, source_timestamp, gid);
return std::move(data.serialize_to_zbytes());
}

///=============================================================================
ZenohQuery::ZenohQuery(z_owned_query_t query) {query_ = query;}

Expand Down
6 changes: 5 additions & 1 deletion rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#ifndef DETAIL__ZENOH_UTILS_HPP_
#define DETAIL__ZENOH_UTILS_HPP_

#include <zenoh.h>
#include <zenoh.hxx>

#include <chrono>
#include <optional>
Expand All @@ -30,6 +30,10 @@ create_map_and_set_sequence_num(
z_owned_bytes_t * out_bytes, int64_t sequence_number,
uint8_t gid[RMW_GID_STORAGE_SIZE]);

///=============================================================================
zenoh::Bytes create_map_and_set_sequence_num(
int64_t sequence_number, uint8_t gid[RMW_GID_STORAGE_SIZE]);

///=============================================================================
// A class to store the replies to service requests.
class ZenohReply final
Expand Down

0 comments on commit cf03d9c

Please sign in to comment.