Skip to content

Commit

Permalink
Protocol cleanup (#374)
Browse files Browse the repository at this point in the history
* Remove ACK message

* Cleanup pull and subscriber

* Remove Put/Del from ResponseBody

* Cleanup flags

* Comment unusued variables in pull examples

* Fix alignment test
  • Loading branch information
Mallets authored Mar 20, 2024
1 parent 8e8c678 commit 9bd061c
Show file tree
Hide file tree
Showing 39 changed files with 250 additions and 862 deletions.
49 changes: 27 additions & 22 deletions examples/arduino/z_pull.ino
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,22 @@

#define KEYEXPR "demo/example/**"

z_owned_pull_subscriber_t sub;
// @TODO
// z_owned_pull_subscriber_t sub;

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
std::string val((const char *)sample->payload.start, sample->payload.len);
// @TODO
// void data_handler(const z_sample_t *sample, void *arg) {
// z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
// std::string val((const char *)sample->payload.start, sample->payload.len);

Serial.print(" >> [Subscription listener] Received (");
Serial.print(z_str_loan(&keystr));
Serial.print(", ");
Serial.print(val.c_str());
Serial.println(")");
// Serial.print(" >> [Subscription listener] Received (");
// Serial.print(z_str_loan(&keystr));
// Serial.print(", ");
// Serial.print(val.c_str());
// Serial.println(")");

z_str_drop(z_str_move(&keystr));
}
// z_str_drop(z_str_move(&keystr));
// }

void setup() {
// Initialize Serial for debug
Expand Down Expand Up @@ -91,23 +93,26 @@ void setup() {
Serial.print("Declaring Subscriber on ");
Serial.print(KEYEXPR);
Serial.println(" ...");
z_owned_closure_sample_t callback = z_closure_sample(data_handler, NULL, NULL);
sub = z_declare_pull_subscriber(z_session_loan(&s), z_keyexpr(KEYEXPR), z_closure_sample_move(&callback), NULL);
if (!z_pull_subscriber_check(&sub)) {
Serial.println("Unable to declare subscriber.");
while (1) {
;
}
}
Serial.println("OK");
Serial.println("Zenoh setup finished!");
// @TODO
// z_owned_closure_sample_t callback = z_closure_sample(data_handler, NULL, NULL);
// @TODO
// sub = z_declare_pull_subscriber(z_session_loan(&s), z_keyexpr(KEYEXPR), z_closure_sample_move(&callback), NULL);
// if (!z_pull_subscriber_check(&sub)) {
// Serial.println("Unable to declare subscriber.");
// while (1) {
// ;
// }
// }
// Serial.println("OK");
// Serial.println("Zenoh setup finished!");
Serial.println("Pull Subscriber not supported... exiting");

delay(300);
}

void loop() {
delay(5000);
z_subscriber_pull(z_pull_subscriber_loan(&sub));
// z_subscriber_pull(z_pull_subscriber_loan(&sub));
}
#else
void setup() {
Expand Down
48 changes: 26 additions & 22 deletions examples/espidf/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,13 @@ void wifi_init_sta(void) {
vEventGroupDelete(s_event_group_handler);
}

void data_handler(const z_sample_t* sample, void* arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_drop(z_move(keystr));
}
// @TODO
// void data_handler(const z_sample_t* sample, void* arg) {
// z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
// printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
// sample->payload.start);
// z_drop(z_move(keystr));
// }

void app_main() {
esp_err_t ret = nvs_flash_init();
Expand Down Expand Up @@ -144,23 +145,26 @@ void app_main() {
zp_start_read_task(z_loan(s), NULL);
zp_start_lease_task(z_loan(s), NULL);

// @TODO
// z_owned_closure_sample_t callback = z_closure(data_handler);
printf("Declaring Subscriber on '%s'...", KEYEXPR);
z_owned_closure_sample_t callback = z_closure(data_handler);
z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL);
if (!z_check(sub)) {
printf("Unable to declare subscriber.\n");
exit(-1);
}
printf("OK!\n");

while (1) {
sleep(5);
printf("Pulling data from '%s'...\n", KEYEXPR);
z_subscriber_pull(z_loan(sub));
}

printf("Closing Zenoh Session...");
z_undeclare_pull_subscriber(z_move(sub));
// @TODO
// z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL);
// if (!z_check(sub)) {
// printf("Unable to declare subscriber.\n");
// exit(-1);
// }
// printf("OK!\n");

// while (1) {
// sleep(5);
// printf("Pulling data from '%s'...\n", KEYEXPR);
// z_subscriber_pull(z_loan(sub));
// }

// printf("Closing Zenoh Session...");
// z_undeclare_pull_subscriber(z_move(sub));
printf("Pull Subscriber not supported... exiting\n");

// Stop the receive and the session lease loop for zenoh-pico
zp_stop_read_task(z_loan(s));
Expand Down
42 changes: 23 additions & 19 deletions examples/freertos_plus_tcp/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@

#define KEYEXPR "demo/example/**"

void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_drop(z_move(keystr));
}
// @TODO
// void data_handler(const z_sample_t *sample, void *ctx) {
// (void)(ctx);
// z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
// printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
// sample->payload.start);
// z_drop(z_move(keystr));
// }

void app_main(void) {
z_owned_config_t config = z_config_default();
Expand All @@ -57,21 +58,24 @@ void app_main(void) {
return;
}

z_owned_closure_sample_t callback = z_closure(data_handler);
// @TODO
// z_owned_closure_sample_t callback = z_closure(data_handler);
printf("Declaring Subscriber on '%s'...\n", KEYEXPR);
z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL);
if (!z_check(sub)) {
printf("Unable to declare subscriber.\n");
return;
}
// @TODO
// z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL);
// if (!z_check(sub)) {
// printf("Unable to declare subscriber.\n");
// return;
// }

while (1) {
zp_sleep_s(5);
printf("Pulling data from '%s'...\n", KEYEXPR);
z_subscriber_pull(z_loan(sub));
}
// while (1) {
// zp_sleep_s(5);
// printf("Pulling data from '%s'...\n", KEYEXPR);
// z_subscriber_pull(z_loan(sub));
// }

z_undeclare_pull_subscriber(z_move(sub));
// z_undeclare_pull_subscriber(z_move(sub));
printf("Pull Subscriber not supported... exiting\n");

// Stop read and lease tasks for zenoh-pico
zp_stop_read_task(z_loan(s));
Expand Down
46 changes: 25 additions & 21 deletions examples/mbed/z_pull.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@

#define KEYEXPR "demo/example/**"

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len,
sample->payload.start);
z_str_drop(z_str_move(&keystr));
}
// @TODO
// void data_handler(const z_sample_t *sample, void *arg) {
// z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
// printf(" >> [Subscriber handler] Received ('%s': '%.*s')\n", z_str_loan(&keystr), (int)sample->payload.len,
// sample->payload.start);
// z_str_drop(z_str_move(&keystr));
// }

int main(int argc, char **argv) {
randLIB_seed_random();
Expand Down Expand Up @@ -64,24 +65,27 @@ int main(int argc, char **argv) {
zp_start_read_task(z_session_loan(&s), NULL);
zp_start_lease_task(z_session_loan(&s), NULL);

// @TODO
// z_owned_closure_sample_t callback = z_closure_sample(data_handler, NULL, NULL);
printf("Declaring Subscriber on '%s'...", KEYEXPR);
z_owned_closure_sample_t callback = z_closure_sample(data_handler, NULL, NULL);
z_owned_pull_subscriber_t sub =
z_declare_pull_subscriber(z_session_loan(&s), z_keyexpr(KEYEXPR), z_closure_sample_move(&callback), NULL);
if (!z_pull_subscriber_check(&sub)) {
printf("Unable to declare subscriber.\n");
exit(-1);
}
printf("OK!\n");
// @TODO
// z_owned_pull_subscriber_t sub =
// z_declare_pull_subscriber(z_session_loan(&s), z_keyexpr(KEYEXPR), z_closure_sample_move(&callback), NULL);
// if (!z_pull_subscriber_check(&sub)) {
// printf("Unable to declare subscriber.\n");
// exit(-1);
// }
// printf("OK!\n");

while (1) {
zp_sleep_s(5);
printf("Pulling data from '%s'...\n", KEYEXPR);
z_subscriber_pull(z_pull_subscriber_loan(&sub));
}
// while (1) {
// zp_sleep_s(5);
// printf("Pulling data from '%s'...\n", KEYEXPR);
// z_subscriber_pull(z_pull_subscriber_loan(&sub));
// }

printf("Closing Zenoh Session...");
z_undeclare_pull_subscriber(z_pull_subscriber_move(&sub));
// printf("Closing Zenoh Session...");
// z_undeclare_pull_subscriber(z_pull_subscriber_move(&sub));
printf("Pull Subscriber not supported... exiting\n");

// Stop the receive and the session lease loop for zenoh-pico
zp_stop_read_task(z_session_loan(&s));
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c11/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ int main(int argc, char **argv) {
return -1;
}

printf("Enter any key to pull data or 'q' to quit...\n");
printf("Enter any key to get data or 'q' to quit...\n");
char c = '\0';
while (1) {
fflush(stdin);
Expand Down
55 changes: 30 additions & 25 deletions examples/unix/c11/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
#include <zenoh-pico.h>

#if Z_FEATURE_SUBSCRIPTION == 1
void data_handler(const z_sample_t *sample, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
sample->payload.start);
z_drop(z_move(keystr));
}
// @TODO
// void data_handler(const z_sample_t *sample, void *ctx) {
// (void)(ctx);
// z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
// printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len,
// sample->payload.start);
// z_drop(z_move(keystr));
// }

int main(int argc, char **argv) {
const char *keyexpr = "demo/example/**";
Expand Down Expand Up @@ -71,27 +72,31 @@ int main(int argc, char **argv) {
return -1;
}

z_owned_closure_sample_t callback = z_closure(data_handler);
// @TODO
// z_owned_closure_sample_t callback = z_closure(data_handler);
printf("Declaring Subscriber on '%s'...\n", keyexpr);
z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL);
if (!z_check(sub)) {
printf("Unable to declare subscriber.\n");
return -1;
}

printf("Enter any key to pull data or 'q' to quit...\n");
char c = '\0';
while (1) {
fflush(stdin);
int ret = scanf("%c", &c);
(void)ret; // Remove unused result warning
if (c == 'q') {
break;
}
z_subscriber_pull(z_loan(sub));
}
// @TODO
// z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL);
// if (!z_check(sub)) {
// printf("Unable to declare subscriber.\n");
// return -1;
// }

// printf("Enter any key to pull data or 'q' to quit...\n");
// char c = '\0';
// while (1) {
// fflush(stdin);
// int ret = scanf("%c", &c);
// (void)ret; // Remove unused result warning
// if (c == 'q') {
// break;
// }
// z_subscriber_pull(z_loan(sub));
// }

z_undeclare_pull_subscriber(z_move(sub));
// z_undeclare_pull_subscriber(z_move(sub));
printf("Pull Subscriber not supported... exiting\n");

// Stop read and lease tasks for zenoh-pico
zp_stop_read_task(z_loan(s));
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c99/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ int main(int argc, char **argv) {
return -1;
}

printf("Enter any key to pull data or 'q' to quit...\n");
printf("Enter any key to get data or 'q' to quit...\n");
char c = '\0';
while (1) {
fflush(stdin);
Expand Down
Loading

0 comments on commit 9bd061c

Please sign in to comment.