From a0f01131188257ca9ea031c754f242f70823ee70 Mon Sep 17 00:00:00 2001 From: Pierre Avital Date: Fri, 11 Aug 2023 12:01:35 +0200 Subject: [PATCH] re-support multicast and datagram transports --- include/zenoh-pico/config.h | 16 +++++++++---- .../zenoh-pico/protocol/definitions/core.h | 3 ++- .../protocol/definitions/transport.h | 4 ++-- src/net/session.c | 1 - src/protocol/codec/message.c | 2 +- src/protocol/codec/transport.c | 23 ++++--------------- src/protocol/definitions/transport.c | 14 +++++------ src/protocol/iobuf.c | 6 ++++- src/session/scout.c | 4 ++-- src/transport/common/rx.c | 2 +- src/transport/common/tx.c | 2 +- src/transport/multicast/link/rx.c | 4 ++-- src/transport/transport.c | 17 +++++++------- zenohpico.pc | 2 +- 14 files changed, 49 insertions(+), 51 deletions(-) diff --git a/include/zenoh-pico/config.h b/include/zenoh-pico/config.h index 5ce74f664..23a929993 100644 --- a/include/zenoh-pico/config.h +++ b/include/zenoh-pico/config.h @@ -12,8 +12,8 @@ // ZettaScale Zenoh Team, // -#ifndef ZENOH_PICO_CONFIG_H -#define ZENOH_PICO_CONFIG_H +#ifndef INCLUDE_ZENOH_PICO_CONFIG_H +#define INCLUDE_ZENOH_PICO_CONFIG_H /*------------------ Runtime configuration properties ------------------*/ /** @@ -225,8 +225,14 @@ /** * Defaulf maximum batch size possible to be received or sent. */ -#ifndef Z_BATCH_SIZE -#define Z_BATCH_SIZE 65535 +#ifndef Z_BATCH_UNICAST_SIZE +#define Z_BATCH_UNICAST_SIZE 65535 +#endif +/** + * Defaulf maximum batch size possible to be received or sent. + */ +#ifndef Z_BATCH_MULTICAST_SIZE +#define Z_BATCH_MULTICAST_SIZE 8192 #endif /** @@ -265,4 +271,4 @@ #endif #endif -#endif /* ZENOH_PICO_CONFIG_H */ +#endif /* INCLUDE_ZENOH_PICO_CONFIG_H */ diff --git a/include/zenoh-pico/protocol/definitions/core.h b/include/zenoh-pico/protocol/definitions/core.h index 5c97d1427..58d16a430 100644 --- a/include/zenoh-pico/protocol/definitions/core.h +++ b/include/zenoh-pico/protocol/definitions/core.h @@ -17,7 +17,8 @@ #include "zenoh-pico/protocol/core.h" -#define _Z_DEFAULT_BATCH_SIZE 65535 +#define _Z_DEFAULT_UNICAST_BATCH_SIZE 65535 +#define _Z_DEFAULT_MULTICAST_BATCH_SIZE 8192 #define _Z_DEFAULT_RESOLUTION_SIZE 2 #define _Z_DECLARE_CLEAR(layer, name) void _z_##layer##_msg_clear_##name(_z_##name##_t *m, uint8_t header) diff --git a/include/zenoh-pico/protocol/definitions/transport.h b/include/zenoh-pico/protocol/definitions/transport.h index 521efd821..c3f1b3270 100644 --- a/include/zenoh-pico/protocol/definitions/transport.h +++ b/include/zenoh-pico/protocol/definitions/transport.h @@ -53,8 +53,8 @@ // T Lease period if T==1 then the lease period is in seconds else in milliseconds // S Size params if S==1 then size parameters are exchanged // Z Extensions if Z==1 then Zenoh extensions are present -#define _Z_FLAG_T_JOIN_T 0x40 // 1 << 6 -#define _Z_FLAG_T_JOIN_S 0x20 // 1 << 5 +#define _Z_FLAG_T_JOIN_T 0x20 // 1 << 5 +#define _Z_FLAG_T_JOIN_S 0x40 // 1 << 6 // Init message flags: // A Ack if A==1 then the message is an acknowledgment (aka InitAck), otherwise InitSyn diff --git a/src/net/session.c b/src/net/session.c index 7f60ac90c..cf1f73407 100644 --- a/src/net/session.c +++ b/src/net/session.c @@ -43,7 +43,6 @@ int8_t __z_open_inner(_z_session_t *zn, char *locator, z_whatami_t mode) { #else ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; #endif - if (ret == _Z_RES_OK) { ret = _z_session_init(zn, &local_zid); } diff --git a/src/protocol/codec/message.c b/src/protocol/codec/message.c index 52658cf98..33de0a169 100644 --- a/src/protocol/codec/message.c +++ b/src/protocol/codec/message.c @@ -869,7 +869,7 @@ int8_t _z_scouting_message_decode_na(_z_scouting_message_t *msg, _z_zbuf_t *zbf) } break; default: { - _Z_DEBUG("WARNING: Trying to decode session message with unknown ID(%d)\n", mid); + _Z_DEBUG("WARNING: Trying to decode scouting message with unknown ID(0x%x)\n", mid); ret |= _Z_ERR_MESSAGE_TRANSPORT_UNKNOWN; is_last = true; } break; diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 9c7281b2f..062277814 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -123,7 +123,7 @@ int8_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header) { } else { msg->_seq_num_res = _Z_DEFAULT_RESOLUTION_SIZE; msg->_req_id_res = _Z_DEFAULT_RESOLUTION_SIZE; - msg->_batch_size = _Z_DEFAULT_BATCH_SIZE; + msg->_batch_size = _Z_DEFAULT_MULTICAST_BATCH_SIZE; } } if (ret == _Z_RES_OK) { @@ -203,7 +203,7 @@ int8_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header) { } else { msg->_seq_num_res = _Z_DEFAULT_RESOLUTION_SIZE; msg->_req_id_res = _Z_DEFAULT_RESOLUTION_SIZE; - msg->_batch_size = _Z_DEFAULT_BATCH_SIZE; + msg->_batch_size = _Z_DEFAULT_UNICAST_BATCH_SIZE; } if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_INIT_A) == true)) { @@ -390,7 +390,7 @@ int8_t _z_fragment_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_fragmen int8_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t header) { int8_t ret = _Z_RES_OK; *msg = (_z_t_msg_fragment_t){0}; - + _Z_DEBUG("Decoding _Z_TRANSPORT_FRAGMENT\n"); ret |= _z_zint_decode(&msg->_sn, zbf); @@ -441,31 +441,24 @@ int8_t _z_transport_message_encode(_z_wbuf_t *wbf, const _z_transport_message_t case _Z_MID_T_FRAME: { ret |= _z_frame_encode(wbf, msg->_header, &msg->_body._frame); } break; - case _Z_MID_T_FRAGMENT: { ret |= _z_fragment_encode(wbf, msg->_header, &msg->_body._fragment); } break; - case _Z_MID_T_KEEP_ALIVE: { ret |= _z_keep_alive_encode(wbf, msg->_header, &msg->_body._keep_alive); } break; - case _Z_MID_T_JOIN: { ret |= _z_join_encode(wbf, msg->_header, &msg->_body._join); } break; - case _Z_MID_T_INIT: { ret |= _z_init_encode(wbf, msg->_header, &msg->_body._init); } break; - case _Z_MID_T_OPEN: { ret |= _z_open_encode(wbf, msg->_header, &msg->_body._open); } break; - case _Z_MID_T_CLOSE: { ret |= _z_close_encode(wbf, msg->_header, &msg->_body._close); } break; - default: { _Z_DEBUG("WARNING: Trying to encode session message with unknown ID(%d)\n", _Z_MID(msg->_header)); ret |= _Z_ERR_MESSAGE_TRANSPORT_UNKNOWN; @@ -485,33 +478,27 @@ int8_t _z_transport_message_decode(_z_transport_message_t *msg, _z_zbuf_t *zbf) case _Z_MID_T_FRAME: { ret |= _z_frame_decode(&msg->_body._frame, zbf, msg->_header); } break; - case _Z_MID_T_FRAGMENT: { ret |= _z_fragment_decode(&msg->_body._fragment, zbf, msg->_header); } break; - case _Z_MID_T_KEEP_ALIVE: { ret |= _z_keep_alive_decode(&msg->_body._keep_alive, zbf, msg->_header); } break; - case _Z_MID_T_JOIN: { ret |= _z_join_decode(&msg->_body._join, zbf, msg->_header); } break; - case _Z_MID_T_INIT: { ret |= _z_init_decode(&msg->_body._init, zbf, msg->_header); } break; - case _Z_MID_T_OPEN: { ret |= _z_open_decode(&msg->_body._open, zbf, msg->_header); } break; - case _Z_MID_T_CLOSE: { ret |= _z_close_decode(&msg->_body._close, zbf, msg->_header); } break; - default: { - _Z_DEBUG("WARNING: Trying to decode session message with unknown ID(%d)\n", mid); + _Z_DEBUG("WARNING: Trying to decode session message with unknown ID(0x%x) (header=0x%x)\n", mid, + msg->_header); ret |= _Z_ERR_MESSAGE_TRANSPORT_UNKNOWN; } break; } diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index 2af2f11e0..f8c684c1a 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -71,7 +71,7 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, msg._body._join._lease = lease; msg._body._join._seq_num_res = Z_SN_RESOLUTION; msg._body._join._req_id_res = Z_REQ_RESOLUTION; - msg._body._join._batch_size = Z_BATCH_SIZE; + msg._body._join._batch_size = Z_BATCH_MULTICAST_SIZE; msg._body._join._next_sn = next_sn; msg._body._join._zid = zid; @@ -79,8 +79,8 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, _Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_T); } - if ((Z_BATCH_SIZE != _Z_DEFAULT_BATCH_SIZE) || (Z_SN_RESOLUTION != _Z_DEFAULT_RESOLUTION_SIZE) || - (Z_REQ_RESOLUTION != _Z_DEFAULT_RESOLUTION_SIZE)) { + if ((Z_BATCH_MULTICAST_SIZE != _Z_DEFAULT_MULTICAST_BATCH_SIZE) || + (Z_SN_RESOLUTION != _Z_DEFAULT_RESOLUTION_SIZE) || (Z_REQ_RESOLUTION != _Z_DEFAULT_RESOLUTION_SIZE)) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_S); } @@ -101,10 +101,10 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid) msg._body._init._zid = zid; msg._body._init._seq_num_res = Z_SN_RESOLUTION; msg._body._init._req_id_res = Z_REQ_RESOLUTION; - msg._body._init._batch_size = Z_BATCH_SIZE; + msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE; _z_bytes_reset(&msg._body._init._cookie); - if ((msg._body._init._batch_size != _Z_DEFAULT_BATCH_SIZE) || + if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) || (msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) || (msg._body._init._req_id_res != _Z_DEFAULT_RESOLUTION_SIZE)) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S); @@ -123,10 +123,10 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid, msg._body._init._zid = zid; msg._body._init._seq_num_res = Z_SN_RESOLUTION; msg._body._init._req_id_res = Z_REQ_RESOLUTION; - msg._body._init._batch_size = Z_BATCH_SIZE; + msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE; msg._body._init._cookie = cookie; - if ((msg._body._init._batch_size != _Z_DEFAULT_BATCH_SIZE) || + if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) || (msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) || (msg._body._init._req_id_res != _Z_DEFAULT_RESOLUTION_SIZE)) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S); diff --git a/src/protocol/iobuf.c b/src/protocol/iobuf.c index 3af776758..3d44e7b69 100644 --- a/src/protocol/iobuf.c +++ b/src/protocol/iobuf.c @@ -41,6 +41,10 @@ void __z_iosli_init(_z_iosli_t *ios, size_t capacity) { ios->_capacity = capacity; ios->_is_alloc = true; ios->_buf = (uint8_t *)z_malloc(capacity); + if (ios->_buf == NULL) { + ios->_capacity = 0; + ios->_is_alloc = false; + } } _z_iosli_t _z_iosli_make(size_t capacity) { @@ -257,7 +261,7 @@ _z_wbuf_t _z_wbuf_make(size_t capacity, _Bool is_expandable) { // Preallocate 4 slots, this is usually what we expect // when fragmenting a zenoh data message with attachment wbf._ioss = _z_iosli_vec_make(4); - _z_wbuf_add_iosli(&wbf, __z_wbuf_new_iosli(Z_IOSLICE_SIZE)); + _z_wbuf_add_iosli(&wbf, __z_wbuf_new_iosli(capacity)); } else { wbf._ioss = _z_iosli_vec_make(1); _z_wbuf_add_iosli(&wbf, __z_wbuf_new_iosli(capacity)); diff --git a/src/session/scout.c b/src/session/scout.c index d7e34eb7e..7a45a3ecc 100644 --- a/src/session/scout.c +++ b/src/session/scout.c @@ -52,7 +52,7 @@ _z_hello_list_t *__z_scout_loop(const _z_wbuf_t *wbf, const char *locator, unsig err = _z_link_send_wbuf(&zl, wbf); if (err == _Z_RES_OK) { // The receiving buffer - _z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_SIZE); + _z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE); z_time_t start = z_time_now(); while (z_time_elapsed_ms(&start) < period) { @@ -132,7 +132,7 @@ _z_hello_list_t *_z_scout_inner(const z_what_t what, _z_id_t zid, const char *lo _z_hello_list_t *ret = NULL; // Create the buffer to serialize the scout message on - _z_wbuf_t wbf = _z_wbuf_make(Z_BATCH_SIZE, false); + _z_wbuf_t wbf = _z_wbuf_make(Z_BATCH_UNICAST_SIZE, false); // Create and encode the scout message _z_scouting_message_t scout = _z_s_msg_make_scout(what, zid); diff --git a/src/transport/common/rx.c b/src/transport/common/rx.c index 32f09902b..5a7d2e45a 100644 --- a/src/transport/common/rx.c +++ b/src/transport/common/rx.c @@ -24,7 +24,7 @@ int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl) { int8_t ret = _Z_RES_OK; // Create and prepare the buffer - _z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_SIZE); + _z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE); _z_zbuf_reset(&zbf); if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) { diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index d0340bdb5..0ac199e80 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -73,7 +73,7 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m int8_t ret = _Z_RES_OK; // Create and prepare the buffer to serialize the message on - uint16_t mtu = (zl->_mtu < Z_BATCH_SIZE) ? zl->_mtu : Z_BATCH_SIZE; + uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; _z_wbuf_t wbf = _z_wbuf_make(mtu, false); if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) { for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) { diff --git a/src/transport/multicast/link/rx.c b/src/transport/multicast/link/rx.c index 6c90b0b92..28dab94e0 100644 --- a/src/transport/multicast/link/rx.c +++ b/src/transport/multicast/link/rx.c @@ -241,7 +241,7 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t // If the new node has less representing capabilities then it is incompatible to communication if ((t_msg->_body._join._seq_num_res != Z_SN_RESOLUTION) || (t_msg->_body._join._req_id_res != Z_REQ_RESOLUTION) || - (t_msg->_body._join._batch_size != Z_BATCH_SIZE)) { + (t_msg->_body._join._batch_size != Z_BATCH_MULTICAST_SIZE)) { ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION; } @@ -278,7 +278,7 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t // Check if the representing capabilities are still the same if ((t_msg->_body._join._seq_num_res != Z_SN_RESOLUTION) || (t_msg->_body._join._req_id_res != Z_REQ_RESOLUTION) || - (t_msg->_body._join._batch_size != Z_BATCH_SIZE)) { + (t_msg->_body._join._batch_size != Z_BATCH_MULTICAST_SIZE)) { _z_transport_peer_entry_list_drop_filter(ztm->_peers, _z_transport_peer_entry_eq, entry); // TODO: cleanup here should also be done on mappings/subs/etc... break; diff --git a/src/transport/transport.c b/src/transport/transport.c index 7ee258063..897eb1b18 100644 --- a/src/transport/transport.c +++ b/src/transport/transport.c @@ -19,6 +19,7 @@ #include #include "zenoh-pico/config.h" +#include "zenoh-pico/link/link.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/transport/link/rx.h" #include "zenoh-pico/transport/link/tx.h" @@ -90,8 +91,8 @@ int8_t _z_transport_unicast(_z_transport_t *zt, _z_link_t *zl, _z_transport_unic // Initialize the read and write buffers if (ret == _Z_RES_OK) { - uint16_t mtu = (zl->_mtu < Z_BATCH_SIZE) ? zl->_mtu : Z_BATCH_SIZE; - _Bool expandable = true; + uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; + _Bool expandable = _Z_LINK_IS_STREAMED(zl->_capabilities); size_t dbuf_size = 0; #if Z_DYNAMIC_MEMORY_ALLOCATION == 0 @@ -99,7 +100,7 @@ int8_t _z_transport_unicast(_z_transport_t *zt, _z_link_t *zl, _z_transport_unic dbuf_size = Z_FRAG_MAX_SIZE; #endif zt->_transport._unicast._wbuf = _z_wbuf_make(mtu, expandable); - zt->_transport._unicast._zbuf = _z_zbuf_make(Z_BATCH_SIZE); + zt->_transport._unicast._zbuf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE); // Initialize the defragmentation buffers zt->_transport._unicast._dbuf_reliable = _z_wbuf_make(dbuf_size, expandable); @@ -107,7 +108,7 @@ int8_t _z_transport_unicast(_z_transport_t *zt, _z_link_t *zl, _z_transport_unic // Clean up the buffers if one of them failed to be allocated if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != mtu) || - (_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != Z_BATCH_SIZE) || + (_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != Z_BATCH_UNICAST_SIZE) || #if Z_DYNAMIC_MEMORY_ALLOCATION == 0 (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_reliable) != dbuf_size) || (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_best_effort) != dbuf_size)) { @@ -195,13 +196,13 @@ int8_t _z_transport_multicast(_z_transport_t *zt, _z_link_t *zl, _z_transport_mu // Initialize the read and write buffers if (ret == _Z_RES_OK) { - uint16_t mtu = (zl->_mtu < Z_BATCH_SIZE) ? zl->_mtu : Z_BATCH_SIZE; - zt->_transport._multicast._wbuf = _z_wbuf_make(mtu, true); - zt->_transport._multicast._zbuf = _z_zbuf_make(Z_BATCH_SIZE); + uint16_t mtu = (zl->_mtu < Z_BATCH_MULTICAST_SIZE) ? zl->_mtu : Z_BATCH_MULTICAST_SIZE; + zt->_transport._multicast._wbuf = _z_wbuf_make(mtu, _Z_LINK_IS_STREAMED(zl->_capabilities)); + zt->_transport._multicast._zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); // Clean up the buffers if one of them failed to be allocated if ((_z_wbuf_capacity(&zt->_transport._multicast._wbuf) != mtu) || - (_z_zbuf_capacity(&zt->_transport._multicast._zbuf) != Z_BATCH_SIZE)) { + (_z_zbuf_capacity(&zt->_transport._multicast._zbuf) != Z_BATCH_MULTICAST_SIZE)) { ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; #if Z_MULTI_THREAD == 1 diff --git a/zenohpico.pc b/zenohpico.pc index 60d1fd439..9be6ba388 100644 --- a/zenohpico.pc +++ b/zenohpico.pc @@ -3,6 +3,6 @@ prefix=/usr/local Name: zenohpico Description: URL: -Version: 0.10.20230809dev +Version: 0.10.20230811dev Cflags: -I${prefix}/ Libs: -L${prefix}/ -lzenohpico