Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Feat/attachements #190

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ serde_yaml = "0.9.19"

[lib]
path="src/lib.rs"
name = "zenohc"
name = "zenohcd"
p-avital marked this conversation as resolved.
Show resolved Hide resolved
crate-type = ["cdylib", "staticlib"]
doctest = false

Expand Down
67 changes: 67 additions & 0 deletions examples/z_get_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <stdio.h>
#include <string.h>

#include "zenoh.h"

int main(int argc, char **argv) {
char *expr = "group1/**";
if (argc > 1) {
expr = argv[1];
}

z_keyexpr_t keyexpr = z_keyexpr(expr);
if (!z_check(keyexpr)) {
printf("%s is not a valid key expression\n", expr);
exit(-1);
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

printf("Sending liveliness query '%s'...\n", expr);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
zc_liveliness_get(z_loan(s), keyexpr, z_move(channel.send), NULL);
z_owned_reply_t reply = z_reply_null();
for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) {
if (z_reply_is_ok(&reply)) {
z_sample_t sample = z_reply_ok(&reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Alive token ('%s')\n", z_loan(keystr));
z_drop(z_move(keystr));
} else {
printf("Received an error\n");
}
}
z_drop(z_move(reply));
z_drop(z_move(channel));
z_close(z_move(s));
return 0;
}
76 changes: 76 additions & 0 deletions examples/z_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <stdio.h>
#include <string.h>
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__)
#include <windows.h>
#define sleep(x) Sleep(x * 1000)
#else
#include <unistd.h>
#endif
#include "zenoh.h"

int main(int argc, char **argv) {
char *expr = "group1/zenoh-rs";
if (argc > 1) {
expr = argv[1];
}

z_keyexpr_t keyexpr = z_keyexpr(expr);
if (!z_check(keyexpr)) {
printf("%s is not a valid key expression\n", expr);
exit(-1);
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

printf("Declaring liveliness token '%s'...\n", expr);
zc_owned_liveliness_token_t token = zc_liveliness_declare_token(z_loan(s), keyexpr, NULL);
if (!z_check(token)) {
printf("Unable to create liveliness token!\n");
exit(-1);
}

printf("Enter 'd' to undeclare liveliness token, 'q' to quit...\n");
char c = 0;
while (c != 'q') {
c = getchar();
if (c == -1) {
sleep(1);
} else if (c == 'd') {
printf("Undeclaring liveliness token...\n");
z_drop(z_move(token));
}
}

z_drop(z_move(token));
z_close(z_move(s));
return 0;
}
5 changes: 5 additions & 0 deletions examples/z_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ int main(int argc, char **argv) {
if (argc > 1) keyexpr = argv[1];
if (argc > 2) value = argv[2];

z_owned_bytes_map_t attachements = z_bytes_map_new();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a usability point of view, I was thinking about having something like this:

// Options.attachment is z_attachment_t and allocated on the first insert
// I would avoid copying the values on the insert since they are going to be copied anyway for serialization
z_attachment_insert(z_loan(&options.attachment), z_attachment_key("hello"), z_attachment_value(value, strlen(value))); 

z_bytes_map_insert_by_copy(&attachements, z_bytes_new("hello"), z_bytes_new("there"));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Is providing raw bytes supported or should it be only strings?
  2. is it possible to provide the buffer and not allocate buffers ? i would like to reduce the number of allocations during run time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Yes, z_bytes_new is just a constructor for a bytes view from a null terminated string, but you may construct z_bytes_t in other ways, aliasing whatever memory you like.
  2. It is possible:
  • The map itself is able to alias buffers rather than copy them, but it's still a map that will allocate as part of its functioning.
  • z_attachment_t is what actually provides Zenoh with the data to send, and does so by being a v-tabled iterator over aliased byte views. If you like, you can provide that v-table for any type you want, allowing you to have 0 need for allocations in the whole process.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @p-avital can you please explain why do you need a map?
What not just provide a way to provide a pointer to a header buffer ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to provide a flexible way for the user to provide metadata to be attached to the payload. Having one single pointer to a buffer forces the user to potentially allocate a buffer and serialize himself multiple metadata in it first. From a usability perspective, we believe a key-value semantics would be more flexible so as to cover more diverse use cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because the abstraction for metadata is a map of byte view to byte view. Notably because we've seen the need for multiple metadata fields, and that a single buffer would force the user to serialise their metadata on their own before we serialise it again into a packet. This would be both wasteful and less user friendly. This is why we chose this specific abstraction.

You can choose to have a single buffer and have they key to it be an empty slice, the iterator for that wouldn't be hard to write, and would never need to allocate.


z_owned_config_t config = z_config_default();
if (argc > 3) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[3]) < 0) {
Expand All @@ -44,11 +47,13 @@ int main(int argc, char **argv) {
printf("Putting Data ('%s': '%s')...\n", keyexpr, value);
z_put_options_t options = z_put_options_default();
options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL);
options.attachements = z_bytes_map_as_attachement(&attachements);
int res = z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, strlen(value), &options);
if (res < 0) {
printf("Put failed...\n");
}

z_close(z_move(s));
z_drop(z_move(attachements));
return 0;
}
5 changes: 5 additions & 0 deletions examples/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdint.h>
#include <stdio.h>
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__)
#include <windows.h>
Expand All @@ -26,6 +27,10 @@ void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received %s ('%s': '%.*s')\n", kind_to_str(sample->kind), z_loan(keystr),
(int)sample->payload.len, sample->payload.start);
if (z_check(sample->attachements)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the put example I propose to have z_attachment_insert(..). In the sub, what about something like:

z_attachment_value v = z_attachment_get(sample->attachment, z_attachment_key("hello"));
if (z_check(v)) {
  // Do your stuff
}

We might still implement the internals of a z_attachment_t as a z_bytes_map, but that would be abstracted by the z_attachment_key and z_attachment_value. By doing so, we reserve a bit of freedom to change the internal implementation without changing the API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal here is for z_attachement_t to be a generic, read-only type. This lets users pick whatever suits them best as a concrete implementation. The map is provided as the default way to make up an attachment, but that way of working lets us have lazy deserializers on the read side, and if the user want noalloc attachements, they can have them

z_owned_bytes_map_t map = z_bytes_map_from_attachement(sample->attachements);
z_bytes_t there = z_bytes_map_get(&map, z_bytes_new("hello"));
}
z_drop(z_move(keystr));
}

Expand Down
86 changes: 86 additions & 0 deletions examples/z_sub_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdio.h>
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__)
#include <windows.h>
#define sleep(x) Sleep(x * 1000)
#else
#include <unistd.h>
#endif
#include "zenoh.h"

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
switch (sample->kind) {
case Z_SAMPLE_KIND_PUT:
printf(">> [LivelinessSubscriber] New alive token ('%s')\n", z_loan(keystr));
break;
case Z_SAMPLE_KIND_DELETE:
printf(">> [LivelinessSubscriber] Dropped token ('%s')\n", z_loan(keystr));
break;
}
z_drop(z_move(keystr));
}

int main(int argc, char **argv) {
char *expr = "group1/**";
if (argc > 1) {
expr = argv[1];
}

z_keyexpr_t keyexpr = z_keyexpr(expr);
if (!z_check(keyexpr)) {
printf("%s is not a valid key expression\n", expr);
exit(-1);
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_LISTEN_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_LISTEN_KEY, Z_CONFIG_LISTEN_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

printf("Declaring liveliness subscriber on '%s'...\n", expr);
z_owned_closure_sample_t callback = z_closure(data_handler);
z_owned_subscriber_t sub = zc_liveliness_declare_subscriber(z_loan(s), keyexpr, z_move(callback), NULL);
if (!z_check(sub)) {
printf("Unable to declare liveliness subscriber.\n");
exit(-1);
}

printf("Enter 'q' to quit...\n");
char c = 0;
while (c != 'q') {
c = getchar();
if (c == -1) {
sleep(1);
}
}

z_undeclare_subscriber(z_move(sub));
z_close(z_move(s));
return 0;
}
Loading
Loading