From 448695da544dadd3b26351344bdb5c49f5603047 Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Fri, 21 Jun 2024 10:21:48 +0200 Subject: [PATCH] Put API rework (#436) * feat: rework put api * feat: update tests * feat: update examples --- examples/arduino/z_pub.ino | 7 +++- examples/espidf/z_pub.c | 7 +++- examples/freertos_plus_tcp/z_pub.c | 6 +++- examples/freertos_plus_tcp/z_pub_st.c | 7 +++- examples/freertos_plus_tcp/z_put.c | 7 +++- examples/mbed/z_pub.cpp | 7 +++- examples/unix/c11/z_ping.c | 12 +++++-- examples/unix/c11/z_pong.c | 4 ++- examples/unix/c11/z_pub.c | 6 +++- examples/unix/c11/z_pub_attachment.c | 12 ++++--- examples/unix/c11/z_pub_st.c | 7 +++- examples/unix/c11/z_pub_thr.c | 10 ++++-- examples/unix/c11/z_put.c | 6 +++- examples/unix/c99/z_ping.c | 13 +++++-- examples/unix/c99/z_pong.c | 7 +++- examples/unix/c99/z_pub.c | 6 +++- examples/unix/c99/z_pub_st.c | 7 +++- examples/unix/c99/z_put.c | 6 +++- examples/windows/z_ping.c | 13 +++++-- examples/windows/z_pong.c | 7 +++- examples/windows/z_pub.c | 6 +++- examples/windows/z_pub_st.c | 7 +++- examples/windows/z_put.c | 7 +++- examples/zephyr/z_pub.c | 7 +++- include/zenoh-pico/api/primitives.h | 10 +++--- src/api/api.c | 22 ++++++------ tests/z_api_alignment_test.c | 51 +++++++++++++++------------ tests/z_client_test.c | 28 ++++++++++----- tests/z_peer_multicast_test.c | 15 +++++--- tests/z_perf_tx.c | 12 +++++-- tests/z_test_fragment_tx.c | 6 +++- 31 files changed, 242 insertions(+), 86 deletions(-) diff --git a/examples/arduino/z_pub.ino b/examples/arduino/z_pub.ino index df03b0bcf..c487c976b 100644 --- a/examples/arduino/z_pub.ino +++ b/examples/arduino/z_pub.ino @@ -99,13 +99,18 @@ void loop() { delay(1000); char buf[256]; sprintf(buf, "[%4d] %s", idx++, VALUE); + Serial.print("Writing Data ('"); Serial.print(KEYEXPR); Serial.print("': '"); Serial.print(buf); Serial.println("')"); - if (z_publisher_put(z_publisher_loan(&pub), (const uint8_t *)buf, strlen(buf), NULL) < 0) { + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, buf); + + if (z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), NULL) < 0) { Serial.println("Error while publishing data"); } } diff --git a/examples/espidf/z_pub.c b/examples/espidf/z_pub.c index 53cb2be99..5d1407239 100644 --- a/examples/espidf/z_pub.c +++ b/examples/espidf/z_pub.c @@ -154,7 +154,12 @@ void app_main() { sleep(1); sprintf(buf, "[%4d] %s", idx, VALUE); printf("Putting Data ('%s': '%s')...\n", KEYEXPR, buf); - z_publisher_put(z_loan(pub), (const uint8_t*)buf, strlen(buf), NULL); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, buf); + + z_publisher_put(z_loan(pub), z_move(payload), NULL); } printf("Closing Zenoh Session..."); diff --git a/examples/freertos_plus_tcp/z_pub.c b/examples/freertos_plus_tcp/z_pub.c index f19ddc23e..e86ce07bd 100644 --- a/examples/freertos_plus_tcp/z_pub.c +++ b/examples/freertos_plus_tcp/z_pub.c @@ -97,9 +97,13 @@ void app_main(void) { snprintf(buf, 256, "[%4d] %s", idx, VALUE); printf("Putting Data ('%s': '%s')...\n", KEYEXPR, buf); + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, buf); + z_publisher_put_options_t options; z_publisher_put_options_default(&options); - z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), &options); + z_publisher_put(z_loan(pub), z_move(payload), &options); } // Clean-up diff --git a/examples/freertos_plus_tcp/z_pub_st.c b/examples/freertos_plus_tcp/z_pub_st.c index bfdd22692..6239171b7 100644 --- a/examples/freertos_plus_tcp/z_pub_st.c +++ b/examples/freertos_plus_tcp/z_pub_st.c @@ -62,7 +62,12 @@ void app_main(void) { if (z_clock_elapsed_ms(&now) > 1000) { snprintf(buf, 256, "[%4d] %s", idx, VALUE); printf("Putting Data ('%s': '%s')...\n", KEYEXPR, buf); - z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), NULL); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, buf); + + z_publisher_put(z_loan(pub), z_move(payload), NULL); ++idx; now = z_clock_now(); diff --git a/examples/freertos_plus_tcp/z_put.c b/examples/freertos_plus_tcp/z_put.c index 092ed503b..192b441bf 100644 --- a/examples/freertos_plus_tcp/z_put.c +++ b/examples/freertos_plus_tcp/z_put.c @@ -66,7 +66,12 @@ void app_main(void) { printf("Putting Data ('%s': '%s')...\n", KEYEXPR, VALUE); z_put_options_t options; z_put_options_default(&options); - if (z_put(z_loan(s), z_loan(ke), (const uint8_t *)VALUE, strlen(VALUE), &options) < 0) { + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, VALUE); + + if (z_put(z_loan(s), z_loan(ke), z_move(payload), &options) < 0) { printf("Oh no! Put has failed...\n"); } diff --git a/examples/mbed/z_pub.cpp b/examples/mbed/z_pub.cpp index 08bfd624b..c709e584a 100644 --- a/examples/mbed/z_pub.cpp +++ b/examples/mbed/z_pub.cpp @@ -74,7 +74,12 @@ int main(int argc, char **argv) { z_sleep_s(1); sprintf(buf, "[%4d] %s", idx, VALUE); printf("Putting Data ('%s': '%s')...\n", KEYEXPR, buf); - z_publisher_put(z_publisher_loan(&pub), (const uint8_t *)buf, strlen(buf), NULL); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, buf); + + z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), NULL); } printf("Closing Zenoh Session..."); diff --git a/examples/unix/c11/z_ping.c b/examples/unix/c11/z_ping.c index fa7744d64..f12e3b44f 100644 --- a/examples/unix/c11/z_ping.c +++ b/examples/unix/c11/z_ping.c @@ -106,7 +106,11 @@ int main(int argc, char** argv) { z_clock_t warmup_start = z_clock_now(); unsigned long elapsed_us = 0; while (elapsed_us < args.warmup_ms * 1000) { - z_publisher_put(z_loan(pub), data, args.size, NULL); + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, data, args.size); + + z_publisher_put(z_loan(pub), z_move(payload), NULL); z_condvar_wait(&cond, &mutex); elapsed_us = z_clock_elapsed_us(&warmup_start); } @@ -114,7 +118,11 @@ int main(int argc, char** argv) { unsigned long* results = z_malloc(sizeof(unsigned long) * args.number_of_pings); for (unsigned int i = 0; i < args.number_of_pings; i++) { z_clock_t measure_start = z_clock_now(); - z_publisher_put(z_loan(pub), data, args.size, NULL); + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, data, args.size); + + z_publisher_put(z_loan(pub), z_move(payload), NULL); z_condvar_wait(&cond, &mutex); results[i] = z_clock_elapsed_us(&measure_start); } diff --git a/examples/unix/c11/z_pong.c b/examples/unix/c11/z_pong.c index c7e56e5d0..838da05d6 100644 --- a/examples/unix/c11/z_pong.c +++ b/examples/unix/c11/z_pong.c @@ -20,7 +20,9 @@ void callback(const z_loaned_sample_t* sample, void* context) { const z_loaned_publisher_t* pub = z_loan(*(z_owned_publisher_t*)context); z_owned_slice_t value; z_bytes_deserialize_into_slice(z_sample_payload(sample), &value); - z_publisher_put(pub, z_slice_data(z_loan(value)), z_slice_len(z_loan(value)), NULL); + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, z_slice_data(z_loan(value)), z_slice_len(z_loan(value))); + z_publisher_put(pub, z_move(payload), NULL); z_drop(z_move(value)); } void drop(void* context) { diff --git a/examples/unix/c11/z_pub.c b/examples/unix/c11/z_pub.c index 576edf915..f65f7a172 100644 --- a/examples/unix/c11/z_pub.c +++ b/examples/unix/c11/z_pub.c @@ -112,6 +112,10 @@ int main(int argc, char **argv) { sprintf(buf, "[%4d] %s", idx, value); printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, buf); + // Create encoding z_owned_encoding_t encoding; zp_encoding_make(&encoding, Z_ENCODING_ID_TEXT_PLAIN, "utf8"); @@ -119,7 +123,7 @@ int main(int argc, char **argv) { z_publisher_put_options_default(&options); options.encoding = z_move(encoding); - z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), &options); + z_publisher_put(z_loan(pub), z_move(payload), &options); } // Clean up z_undeclare_publisher(z_move(pub)); diff --git a/examples/unix/c11/z_pub_attachment.c b/examples/unix/c11/z_pub_attachment.c index 5db64ca4e..2376ffd7f 100644 --- a/examples/unix/c11/z_pub_attachment.c +++ b/examples/unix/c11/z_pub_attachment.c @@ -151,13 +151,19 @@ int main(int argc, char **argv) { z_owned_bytes_t attachment; // Allocate buffer - char buf[256]; char buf_ind[16]; // Publish data printf("Press CTRL-C to quit...\n"); + char buf[256]; for (int idx = 0; idx < n; ++idx) { z_sleep_s(1); + sprintf(buf, "[%4d] %s", idx, value); + printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, buf); // Add attachment value sprintf(buf_ind, "%d", idx); @@ -166,9 +172,7 @@ int main(int argc, char **argv) { zp_bytes_serialize_from_iter(&attachment, create_attachment_iter, (void *)&ctx, kv_pairs_size(&ctx)); options.attachment = z_move(attachment); - sprintf(buf, "[%4d] %s", idx, value); - printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); - z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), &options); + z_publisher_put(z_loan(pub), z_move(payload), &options); } // Clean up z_undeclare_publisher(z_move(pub)); diff --git a/examples/unix/c11/z_pub_st.c b/examples/unix/c11/z_pub_st.c index 51037f4a4..0c6fbe697 100644 --- a/examples/unix/c11/z_pub_st.c +++ b/examples/unix/c11/z_pub_st.c @@ -95,7 +95,12 @@ int main(int argc, char **argv) { sleep(1); sprintf(buf, "[%4d] %s", idx, value); printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); - z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), NULL); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, buf); + + z_publisher_put(z_loan(pub), z_move(payload), NULL); zp_read(z_loan(s), NULL); zp_send_keep_alive(z_loan(s), NULL); diff --git a/examples/unix/c11/z_pub_thr.c b/examples/unix/c11/z_pub_thr.c index 228e70f01..a31911ee7 100644 --- a/examples/unix/c11/z_pub_thr.c +++ b/examples/unix/c11/z_pub_thr.c @@ -26,7 +26,7 @@ int main(int argc, char **argv) { } char *keyexpr = "test/thr"; size_t len = (size_t)atoi(argv[1]); - uint8_t *value = (uint8_t *)malloc(len); + uint8_t *value = (uint8_t *)z_malloc(len); memset(value, 1, len); // Set config @@ -61,14 +61,18 @@ int main(int argc, char **argv) { // Send packets while (1) { - z_publisher_put(z_loan(pub), (const uint8_t *)value, len, NULL); + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, (char *)value); + + z_publisher_put(z_loan(pub), z_move(payload), NULL); } // Clean up z_undeclare_publisher(z_move(pub)); zp_stop_read_task(z_loan_mut(s)); zp_stop_lease_task(z_loan_mut(s)); z_close(z_move(s)); - free(value); + z_free(value); exit(0); } #else diff --git a/examples/unix/c11/z_put.c b/examples/unix/c11/z_put.c index de8b39176..90efbb848 100644 --- a/examples/unix/c11/z_put.c +++ b/examples/unix/c11/z_put.c @@ -97,6 +97,10 @@ int main(int argc, char **argv) { return -1; } + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, value); + // Create encoding z_owned_encoding_t encoding; zp_encoding_make(&encoding, Z_ENCODING_ID_TEXT_PLAIN, NULL); @@ -105,7 +109,7 @@ int main(int argc, char **argv) { z_put_options_t options; z_put_options_default(&options); options.encoding = z_move(encoding); - if (z_put(z_loan(s), z_loan(ke), (const uint8_t *)value, strlen(value), &options) < 0) { + if (z_put(z_loan(s), z_loan(ke), z_move(payload), &options) < 0) { printf("Oh no! Put has failed...\n"); } // Clean up diff --git a/examples/unix/c99/z_ping.c b/examples/unix/c99/z_ping.c index 17fe7833c..d74a5b988 100644 --- a/examples/unix/c99/z_ping.c +++ b/examples/unix/c99/z_ping.c @@ -109,7 +109,11 @@ int main(int argc, char** argv) { z_clock_t warmup_start = z_clock_now(); unsigned long elapsed_us = 0; while (elapsed_us < args.warmup_ms * 1000) { - z_publisher_put(z_publisher_loan(&pub), data, args.size, NULL); + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, data, args.size); + + z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), NULL); z_condvar_wait(&cond, &mutex); elapsed_us = z_clock_elapsed_us(&warmup_start); } @@ -117,7 +121,12 @@ int main(int argc, char** argv) { unsigned long* results = z_malloc(sizeof(unsigned long) * args.number_of_pings); for (unsigned int i = 0; i < args.number_of_pings; i++) { z_clock_t measure_start = z_clock_now(); - z_publisher_put(z_publisher_loan(&pub), data, args.size, NULL); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, data, args.size); + + z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), NULL); z_condvar_wait(&cond, &mutex); results[i] = z_clock_elapsed_us(&measure_start); } diff --git a/examples/unix/c99/z_pong.c b/examples/unix/c99/z_pong.c index f5060147f..7ec6528f9 100644 --- a/examples/unix/c99/z_pong.c +++ b/examples/unix/c99/z_pong.c @@ -21,7 +21,12 @@ void callback(const z_loaned_sample_t* sample, void* context) { const z_loaned_publisher_t* pub = z_publisher_loan((z_owned_publisher_t*)context); z_owned_slice_t value; z_bytes_deserialize_into_slice(z_sample_payload(sample), &value); - z_publisher_put(pub, z_slice_data(z_slice_loan(&value)), z_slice_len(z_slice_loan(&value)), NULL); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, z_slice_data(z_slice_loan(&value)), z_slice_len(z_slice_loan(&value))); + + z_publisher_put(pub, z_bytes_move(&payload), NULL); z_slice_drop(z_slice_move(&value)); } void drop(void* context) { diff --git a/examples/unix/c99/z_pub.c b/examples/unix/c99/z_pub.c index 91e5799c2..5395865c3 100644 --- a/examples/unix/c99/z_pub.c +++ b/examples/unix/c99/z_pub.c @@ -102,6 +102,10 @@ int main(int argc, char **argv) { snprintf(buf, 256, "[%4d] %s", idx, value); printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, buf); + // Create encoding z_owned_encoding_t encoding; zp_encoding_make(&encoding, Z_ENCODING_ID_TEXT_PLAIN, NULL); @@ -109,7 +113,7 @@ int main(int argc, char **argv) { z_publisher_put_options_default(&options); options.encoding = z_encoding_move(&encoding); - z_publisher_put(z_publisher_loan(&pub), (const uint8_t *)buf, strlen(buf), &options); + z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), &options); } // Clean up z_undeclare_publisher(z_publisher_move(&pub)); diff --git a/examples/unix/c99/z_pub_st.c b/examples/unix/c99/z_pub_st.c index d5d8220d8..22fc17e79 100644 --- a/examples/unix/c99/z_pub_st.c +++ b/examples/unix/c99/z_pub_st.c @@ -95,7 +95,12 @@ int main(int argc, char **argv) { if (z_clock_elapsed_ms(&now) > 1000) { snprintf(buf, 256, "[%4d] %s", idx, value); printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); - z_publisher_put(z_publisher_loan(&pub), (const uint8_t *)buf, strlen(buf), NULL); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, buf); + + z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), NULL); ++idx; now = z_clock_now(); diff --git a/examples/unix/c99/z_put.c b/examples/unix/c99/z_put.c index 701432fa6..dff0186e1 100644 --- a/examples/unix/c99/z_put.c +++ b/examples/unix/c99/z_put.c @@ -93,6 +93,10 @@ int main(int argc, char **argv) { return -1; } + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, value); + // Create encoding z_owned_encoding_t encoding; zp_encoding_make(&encoding, Z_ENCODING_ID_TEXT_PLAIN, NULL); @@ -101,7 +105,7 @@ int main(int argc, char **argv) { z_put_options_t options; z_put_options_default(&options); options.encoding = z_encoding_move(&encoding); - if (z_put(z_session_loan(&s), z_keyexpr_loan(&ke), (const uint8_t *)value, strlen(value), &options) < 0) { + if (z_put(z_session_loan(&s), z_keyexpr_loan(&ke), z_bytes_move(&payload), &options) < 0) { printf("Oh no! Put has failed...\n"); } diff --git a/examples/windows/z_ping.c b/examples/windows/z_ping.c index 8dfd075ca..e45f7cdc6 100644 --- a/examples/windows/z_ping.c +++ b/examples/windows/z_ping.c @@ -105,7 +105,11 @@ int main(int argc, char** argv) { z_clock_t warmup_start = z_clock_now(); unsigned long elapsed_us = 0; while (elapsed_us < args.warmup_ms * 1000) { - z_publisher_put(z_loan(pub), data, args.size, NULL); + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, data, args.size); + + z_publisher_put(z_loan(pub), z_move(payload), NULL); z_condvar_wait(&cond, &mutex); elapsed_us = z_clock_elapsed_us(&warmup_start); } @@ -113,7 +117,12 @@ int main(int argc, char** argv) { unsigned long* results = z_malloc(sizeof(unsigned long) * args.number_of_pings); for (unsigned int i = 0; i < args.number_of_pings; i++) { z_clock_t measure_start = z_clock_now(); - z_publisher_put(z_loan(pub), data, args.size, NULL); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, data, args.size); + + z_publisher_put(z_loan(pub), z_move(payload), NULL); z_condvar_wait(&cond, &mutex); results[i] = z_clock_elapsed_us(&measure_start); } diff --git a/examples/windows/z_pong.c b/examples/windows/z_pong.c index c7e56e5d0..dd4c23858 100644 --- a/examples/windows/z_pong.c +++ b/examples/windows/z_pong.c @@ -20,7 +20,12 @@ void callback(const z_loaned_sample_t* sample, void* context) { const z_loaned_publisher_t* pub = z_loan(*(z_owned_publisher_t*)context); z_owned_slice_t value; z_bytes_deserialize_into_slice(z_sample_payload(sample), &value); - z_publisher_put(pub, z_slice_data(z_loan(value)), z_slice_len(z_loan(value)), NULL); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, z_slice_data(z_loan(value)), z_slice_len(z_loan(value))); + + z_publisher_put(pub, z_move(payload), NULL); z_drop(z_move(value)); } void drop(void* context) { diff --git a/examples/windows/z_pub.c b/examples/windows/z_pub.c index 231edeb66..13e748a22 100644 --- a/examples/windows/z_pub.c +++ b/examples/windows/z_pub.c @@ -64,6 +64,10 @@ int main(int argc, char **argv) { snprintf(buf, 256, "[%4d] %s", idx, value); printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, buf); + // Create encoding z_owned_encoding_t encoding; zp_encoding_make(&encoding, Z_ENCODING_ID_TEXT_PLAIN, NULL); @@ -71,7 +75,7 @@ int main(int argc, char **argv) { z_publisher_put_options_default(&options); options.encoding = z_move(encoding); - z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), &options); + z_publisher_put(z_loan(pub), z_move(payload), &options); } // Clean-up diff --git a/examples/windows/z_pub_st.c b/examples/windows/z_pub_st.c index 959fb2b6f..75c19e6bd 100644 --- a/examples/windows/z_pub_st.c +++ b/examples/windows/z_pub_st.c @@ -59,7 +59,12 @@ int main(int argc, char **argv) { if (z_clock_elapsed_ms(&now) > 1000) { snprintf(buf, 256, "[%4d] %s", idx, value); printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); - z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), NULL); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, buf); + + z_publisher_put(z_loan(pub), z_move(payload), NULL); ++idx; now = z_clock_now(); diff --git a/examples/windows/z_put.c b/examples/windows/z_put.c index e3c9b5982..4b3a76dda 100644 --- a/examples/windows/z_put.c +++ b/examples/windows/z_put.c @@ -59,6 +59,11 @@ int main(int argc, char **argv) { z_close(z_move(s)); return -1; } + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, value); + // Create encoding z_owned_encoding_t encoding; zp_encoding_make(&encoding, Z_ENCODING_ID_TEXT_PLAIN, NULL); @@ -67,7 +72,7 @@ int main(int argc, char **argv) { z_put_options_t options; z_put_options_default(&options); options.encoding = z_move(encoding); - if (z_put(z_loan(s), z_loan(ke), (const uint8_t *)value, strlen(value), &options) < 0) { + if (z_put(z_loan(s), z_loan(ke), z_move(payload), &options) < 0) { printf("Oh no! Put has failed...\n"); } diff --git a/examples/zephyr/z_pub.c b/examples/zephyr/z_pub.c index 209f91dc5..441342e7a 100644 --- a/examples/zephyr/z_pub.c +++ b/examples/zephyr/z_pub.c @@ -71,7 +71,12 @@ int main(int argc, char **argv) { sleep(1); sprintf(buf, "[%4d] %s", idx, VALUE); printf("Putting Data ('%s': '%s')...\n", KEYEXPR, buf); - z_publisher_put(z_loan(pub), (const uint8_t *)buf, strlen(buf), NULL); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, buf); + + z_publisher_put(z_loan(pub), z_move(payload), NULL); } printf("Closing Zenoh Session..."); diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index 2c3dcc285..7b5167615 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -1307,15 +1307,14 @@ void z_delete_options_default(z_delete_options_t *options); * Parameters: * zs: Pointer to a :c:type:`z_loaned_session_t` to put the data through. * keyexpr: Pointer to a :c:type:`z_loaned_keyexpr_t` to put the data for. - * payload: Pointer to the data to put. - * payload_len: The length of the ``payload``. + * payload: Pointer to a moved :c:type:`z_owned_bytes_t` containing the data to put. * options: Pointer to a :c:type:`z_put_options_t` to configure the operation. * * Return: * ``0`` if put operation successful, ``negative value`` otherwise. */ -int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, const uint8_t *payload, - z_zint_t payload_len, const z_put_options_t *options); +int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_owned_bytes_t *payload, + const z_put_options_t *options); /** * Deletes data for a given keyexpr. @@ -1389,12 +1388,13 @@ void z_publisher_delete_options_default(z_publisher_delete_options_t *options); * * Parameters: * pub: Pointer to a :c:type:`z_loaned_publisher_t` from where to put the data. + * payload: Pointer to a moved :c:type:`z_owned_bytes_t` containing the data to put. * options: Pointer to a :c:type:`z_publisher_put_options_t` to configure the operation. * * Return: * ``0`` if put operation successful, ``negative value`` otherwise. */ -int8_t z_publisher_put(const z_loaned_publisher_t *pub, const uint8_t *payload, size_t len, +int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload, const z_publisher_put_options_t *options); /** diff --git a/src/api/api.c b/src/api/api.c index c1ad88e1d..acd2da6ba 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -896,8 +896,8 @@ void z_delete_options_default(z_delete_options_t *options) { options->priority = Z_PRIORITY_DEFAULT; } -int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, const uint8_t *payload, - z_zint_t payload_len, const z_put_options_t *options) { +int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_owned_bytes_t *payload, + const z_put_options_t *options) { int8_t ret = 0; z_put_options_t opt; @@ -909,17 +909,18 @@ int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, co opt.attachment = options->attachment; } - ret = _z_write(&_Z_RC_IN_VAL(zs), *keyexpr, (const uint8_t *)payload, payload_len, + ret = _z_write(&_Z_RC_IN_VAL(zs), *keyexpr, payload->_val->_slice.start, payload->_val->_slice.len, _z_encoding_from_owned(opt.encoding), Z_SAMPLE_KIND_PUT, opt.congestion_control, opt.priority, _z_bytes_from_owned_bytes(opt.attachment)); // Trigger local subscriptions - _z_trigger_local_subscriptions(&_Z_RC_IN_VAL(zs), *keyexpr, payload, payload_len, + _z_trigger_local_subscriptions(&_Z_RC_IN_VAL(zs), *keyexpr, payload->_val->_slice.start, payload->_val->_slice.len, _z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority), _z_bytes_from_owned_bytes(opt.attachment)); // Clean-up z_encoding_drop(opt.encoding); z_bytes_drop(opt.attachment); + z_bytes_drop(payload); return ret; } @@ -994,7 +995,7 @@ void z_publisher_put_options_default(z_publisher_put_options_t *options) { void z_publisher_delete_options_default(z_publisher_delete_options_t *options) { options->__dummy = 0; } -int8_t z_publisher_put(const z_loaned_publisher_t *pub, const uint8_t *payload, size_t len, +int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload, const z_publisher_put_options_t *options) { int8_t ret = 0; // Build options @@ -1007,16 +1008,17 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, const uint8_t *payload, // Check if write filter is active before writing if (!_z_write_filter_active(pub)) { // Write value - ret = _z_write(&pub->_zn.in->val, pub->_key, payload, len, _z_encoding_from_owned(opt.encoding), - Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, - _z_bytes_from_owned_bytes(opt.attachment)); + ret = _z_write(&pub->_zn.in->val, pub->_key, payload->_val->_slice.start, payload->_val->_slice.len, + _z_encoding_from_owned(opt.encoding), Z_SAMPLE_KIND_PUT, pub->_congestion_control, + pub->_priority, _z_bytes_from_owned_bytes(opt.attachment)); } // Trigger local subscriptions - _z_trigger_local_subscriptions(&pub->_zn.in->val, pub->_key, payload, len, _Z_N_QOS_DEFAULT, - _z_bytes_from_owned_bytes(opt.attachment)); + _z_trigger_local_subscriptions(&pub->_zn.in->val, pub->_key, payload->_val->_slice.start, payload->_val->_slice.len, + _Z_N_QOS_DEFAULT, _z_bytes_from_owned_bytes(opt.attachment)); // Clean-up z_encoding_drop(opt.encoding); z_bytes_drop(opt.attachment); + z_bytes_drop(payload); return ret; } diff --git a/tests/z_api_alignment_test.c b/tests/z_api_alignment_test.c index 7a1d233f2..be7bc2c36 100644 --- a/tests/z_api_alignment_test.c +++ b/tests/z_api_alignment_test.c @@ -22,7 +22,6 @@ #define URI "demo/example/**/*" #if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) #include -#define sleep(x) Sleep(x * 1000) #else #include #endif @@ -163,7 +162,7 @@ int main(int argc, char **argv) { assert(!_ret_bool); #endif - sleep(SLEEP); + z_sleep_s(SLEEP); size_t keyexpr_len = strlen(URI); char *keyexpr_str = (char *)z_malloc(keyexpr_len + 1); @@ -186,7 +185,7 @@ int main(int argc, char **argv) { #endif printf("Ok\n"); - sleep(SLEEP); + z_sleep_s(SLEEP); printf("Testing Configs..."); z_owned_config_t _ret_config; @@ -216,7 +215,7 @@ int main(int argc, char **argv) { z_drop(z_move(_ret_sconfig)); printf("Ok\n"); - sleep(SLEEP); + z_sleep_s(SLEEP); printf("Testing Scouting..."); z_scouting_config_from(&_ret_sconfig, z_loan(_ret_config)); @@ -245,13 +244,13 @@ int main(int argc, char **argv) { z_closure(&_ret_closure_zid, zid_handler, NULL, NULL); _ret_int8 = z_info_peers_zid(z_loan(s1), z_move(_ret_closure_zid)); assert_eq(_ret_int8, 0); - sleep(SLEEP); + z_sleep_s(SLEEP); assert_eq(zids, 0); _ret_int8 = z_info_routers_zid(z_loan(s1), z_move(_ret_closure_zid)); assert_eq(_ret_int8, 0); - sleep(SLEEP); + z_sleep_s(SLEEP); assert_eq(zids, 1); #ifdef ZENOH_PICO @@ -263,7 +262,7 @@ int main(int argc, char **argv) { zp_start_lease_task(z_loan_mut(s1), &_ret_lease_opt); #endif - sleep(SLEEP); + z_sleep_s(SLEEP); z_config_default(&_ret_config); #ifdef ZENOH_PICO @@ -289,7 +288,7 @@ int main(int argc, char **argv) { zp_start_lease_task(z_loan_mut(s2), NULL); #endif - sleep(SLEEP); + z_sleep_s(SLEEP); const z_loaned_session_t *ls1 = z_loan(s1); printf("Declaring Subscriber..."); @@ -305,7 +304,7 @@ int main(int argc, char **argv) { assert(_ret_int8 == _Z_RES_OK); printf("Ok\n"); - sleep(SLEEP); + z_sleep_s(SLEEP); char s1_res[64]; sprintf(s1_res, "%s/chunk/%d", keyexpr_str, 1); @@ -323,11 +322,16 @@ int main(int argc, char **argv) { z_owned_encoding_t _ret_encoding; zp_encoding_make(&_ret_encoding, Z_ENCODING_ID_TEXT_PLAIN, NULL); _ret_put_opt.encoding = z_move(_ret_encoding); - _ret_int8 = z_put(z_loan(s1), z_loan(_ret_expr), (const uint8_t *)value, strlen(value), &_ret_put_opt); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_string(&payload, value); + + _ret_int8 = z_put(z_loan(s1), z_loan(_ret_expr), z_move(payload), &_ret_put_opt); assert_eq(_ret_int8, 0); printf("Ok\n"); - sleep(SLEEP); + z_sleep_s(SLEEP); assert_eq(datas, 1); printf("Session delete..."); @@ -338,7 +342,7 @@ int main(int argc, char **argv) { assert_eq(_ret_int8, 0); printf("Ok\n"); - sleep(SLEEP); + z_sleep_s(SLEEP); assert_eq(datas, 2); printf("Undeclaring Keyexpr..."); @@ -357,16 +361,19 @@ int main(int argc, char **argv) { assert(_ret_int8 == _Z_RES_OK); printf("Ok\n"); - sleep(SLEEP); + z_sleep_s(SLEEP); printf("Publisher Put..."); + // Create payload + z_bytes_serialize_from_string(&payload, value); + z_publisher_put_options_t _ret_pput_opt; z_publisher_put_options_default(&_ret_pput_opt); - _ret_int8 = z_publisher_put(z_loan(_ret_pub), (const uint8_t *)value, strlen(value), &_ret_pput_opt); + _ret_int8 = z_publisher_put(z_loan(_ret_pub), z_move(payload), &_ret_pput_opt); assert_eq(_ret_int8, 0); printf("Ok\n"); - sleep(SLEEP); + z_sleep_s(SLEEP); assert_eq(datas, 3); printf("Publisher Delete..."); @@ -376,7 +383,7 @@ int main(int argc, char **argv) { assert_eq(_ret_int8, 0); printf("Ok\n"); - sleep(SLEEP); + z_sleep_s(SLEEP); assert_eq(datas, 4); printf("Undeclaring Publisher..."); @@ -385,7 +392,7 @@ int main(int argc, char **argv) { assert(!z_check(_ret_pub)); printf("Ok\n"); - sleep(SLEEP); + z_sleep_s(SLEEP); printf("Undeclaring Subscriber..."); _ret_int8 = z_undeclare_subscriber(z_move(_ret_sub)); @@ -393,7 +400,7 @@ int main(int argc, char **argv) { assert(!z_check(_ret_sub)); printf("Ok\n"); - sleep(SLEEP); + z_sleep_s(SLEEP); printf("Declaring Queryable..."); z_owned_closure_query_t _ret_closure_query; @@ -405,7 +412,7 @@ int main(int argc, char **argv) { _Z_RES_OK); printf("Ok\n"); - sleep(SLEEP); + z_sleep_s(SLEEP); printf("Testing Consolidations..."); const z_loaned_session_t *ls2 = z_loan(s2); @@ -431,7 +438,7 @@ int main(int argc, char **argv) { assert_eq(_ret_int8, 0); printf("Ok\n"); - sleep(SLEEP); + z_sleep_s(SLEEP); assert_eq(queries, 1); assert_eq(replies, 1); @@ -457,9 +464,9 @@ int main(int argc, char **argv) { assert_eq(_ret_int8, 0); printf("Ok\n"); - sleep(SLEEP * 5); + z_sleep_s(SLEEP * 5); - free(keyexpr_str); + z_free(keyexpr_str); return 0; } diff --git a/tests/z_client_test.c b/tests/z_client_test.c index 4a8813e3d..206573196 100644 --- a/tests/z_client_test.c +++ b/tests/z_client_test.c @@ -231,8 +231,8 @@ int main(int argc, char **argv) { // Write data from first session size_t len = MSG_LEN; - uint8_t *payload = (uint8_t *)z_malloc(len); - memset(payload, 1, MSG_LEN); + uint8_t *value = (uint8_t *)z_malloc(len); + memset(value, 1, MSG_LEN); total = MSG * SET; for (unsigned int n = 0; n < MSG; n++) { @@ -240,7 +240,12 @@ int main(int argc, char **argv) { z_put_options_t opt; z_put_options_default(&opt); opt.congestion_control = Z_CONGESTION_CONTROL_BLOCK; - z_put(z_loan(s1), z_loan(rids1[i]), (const uint8_t *)payload, len, &opt); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, value, len); + + z_put(z_loan(s1), z_loan(rids1[i]), z_move(payload), &opt); printf("Wrote data from session 1: %u %zu b\t(%u/%u)\n", z_loan(rids1[i])->_id, len, n * SET + (i + 1), total); } @@ -264,10 +269,10 @@ int main(int argc, char **argv) { // Write fragment data from first session if (is_reliable) { - z_free((uint8_t *)payload); + z_free((uint8_t *)value); len = FRAGMENT_MSG_LEN; - payload = (uint8_t *)z_malloc(len); - memset(payload, 1, FRAGMENT_MSG_LEN); + value = (uint8_t *)z_malloc(len); + memset(value, 1, FRAGMENT_MSG_LEN); total = FRAGMENT_MSG_NB * SET; for (unsigned int n = 0; n < FRAGMENT_MSG_NB; n++) { @@ -275,7 +280,12 @@ int main(int argc, char **argv) { z_put_options_t opt; z_put_options_default(&opt); opt.congestion_control = Z_CONGESTION_CONTROL_BLOCK; - z_put(z_loan(s1), z_loan(rids1[i]), (const uint8_t *)payload, len, &opt); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, value, len); + + z_put(z_loan(s1), z_loan(rids1[i]), z_move(payload), &opt); printf("Wrote fragment data from session 1: %u %zu b\t(%u/%u)\n", z_loan(rids1[i])->_id, len, n * SET + (i + 1), total); } @@ -399,8 +409,8 @@ int main(int argc, char **argv) { printf("Closing session 2\n"); z_close(z_move(s2)); - z_free((uint8_t *)payload); - payload = NULL; + z_free((uint8_t *)value); + value = NULL; free(s1_res); diff --git a/tests/z_peer_multicast_test.c b/tests/z_peer_multicast_test.c index 7413cfb0d..66b7034c1 100644 --- a/tests/z_peer_multicast_test.c +++ b/tests/z_peer_multicast_test.c @@ -128,8 +128,8 @@ int main(int argc, char **argv) { // Write data from first session size_t len = MSG_LEN; - uint8_t *payload = (uint8_t *)z_malloc(len); - memset(payload, 1, MSG_LEN); + uint8_t *value = (uint8_t *)z_malloc(len); + memset(value, 1, MSG_LEN); total = MSG * SET; for (unsigned int n = 0; n < MSG; n++) { @@ -141,7 +141,12 @@ int main(int argc, char **argv) { opt.congestion_control = Z_CONGESTION_CONTROL_BLOCK; z_view_keyexpr_t ke; z_view_keyexpr_from_string(&ke, s1_res); - z_put(z_loan(s1), z_loan(ke), (const uint8_t *)payload, len, &opt); + + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, value, len); + + z_put(z_loan(s1), z_loan(ke), z_move(payload), &opt); printf("Wrote data from session 1: %s %zu b\t(%u/%u)\n", s1_res, len, n * SET + (i + 1), total); } } @@ -191,8 +196,8 @@ int main(int argc, char **argv) { printf("Closing session 2\n"); z_close(z_move(s2)); - z_free((uint8_t *)payload); - payload = NULL; + z_free((uint8_t *)value); + value = NULL; free(s1_res); diff --git a/tests/z_perf_tx.c b/tests/z_perf_tx.c index be324d3b7..de02b06b5 100644 --- a/tests/z_perf_tx.c +++ b/tests/z_perf_tx.c @@ -26,7 +26,11 @@ int send_packets(unsigned long pkt_len, z_owned_publisher_t *pub, uint8_t *value z_clock_t test_start = z_clock_now(); unsigned long elapsed_us = 0; while (elapsed_us < TEST_DURATION_US) { - z_publisher_put(z_loan(*pub), (const uint8_t *)value, pkt_len, NULL); + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, value, pkt_len); + + z_publisher_put(z_loan(*pub), z_move(payload), NULL); elapsed_us = z_clock_elapsed_us(&test_start); } return 0; @@ -97,7 +101,11 @@ int main(int argc, char **argv) { } // Send end packet printf("Sending end pkt\n"); - z_publisher_put(z_loan(pub), (const uint8_t *)value, 1, NULL); + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, value, 1); + + z_publisher_put(z_loan(pub), z_move(payload), NULL); // Clean up z_undeclare_publisher(z_move(pub)); zp_stop_read_task(z_loan_mut(s)); diff --git a/tests/z_test_fragment_tx.c b/tests/z_test_fragment_tx.c index 0e6658565..6e19e0eab 100644 --- a/tests/z_test_fragment_tx.c +++ b/tests/z_test_fragment_tx.c @@ -77,8 +77,12 @@ int main(int argc, char **argv) { z_view_keyexpr_t ke; z_view_keyexpr_from_string(&ke, keyexpr); for (int i = 0; i < 5; i++) { + // Create payload + z_owned_bytes_t payload; + z_bytes_serialize_from_slice(&payload, value, size); + printf("[tx]: Sending packet on %s, len: %d\n", keyexpr, (int)size); - if (z_put(z_loan(s), z_loan(ke), (const uint8_t *)value, size, NULL) < 0) { + if (z_put(z_loan(s), z_loan(ke), z_move(payload), NULL) < 0) { printf("Oh no! Put has failed...\n"); } z_sleep_s(1);