Skip to content

Commit

Permalink
re-support multicast and datagram transports
Browse files Browse the repository at this point in the history
  • Loading branch information
p-avital committed Aug 11, 2023
1 parent 6c3ae1a commit a0f0113
Show file tree
Hide file tree
Showing 14 changed files with 49 additions and 51 deletions.
16 changes: 11 additions & 5 deletions include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// ZettaScale Zenoh Team, <[email protected]>
//

#ifndef ZENOH_PICO_CONFIG_H
#define ZENOH_PICO_CONFIG_H
#ifndef INCLUDE_ZENOH_PICO_CONFIG_H
#define INCLUDE_ZENOH_PICO_CONFIG_H

/*------------------ Runtime configuration properties ------------------*/
/**
Expand Down Expand Up @@ -225,8 +225,14 @@
/**
* Defaulf maximum batch size possible to be received or sent.
*/
#ifndef Z_BATCH_SIZE
#define Z_BATCH_SIZE 65535
#ifndef Z_BATCH_UNICAST_SIZE
#define Z_BATCH_UNICAST_SIZE 65535
#endif
/**
* Defaulf maximum batch size possible to be received or sent.
*/
#ifndef Z_BATCH_MULTICAST_SIZE
#define Z_BATCH_MULTICAST_SIZE 8192
#endif

/**
Expand Down Expand Up @@ -265,4 +271,4 @@
#endif
#endif

#endif /* ZENOH_PICO_CONFIG_H */
#endif /* INCLUDE_ZENOH_PICO_CONFIG_H */
3 changes: 2 additions & 1 deletion include/zenoh-pico/protocol/definitions/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

#include "zenoh-pico/protocol/core.h"

#define _Z_DEFAULT_BATCH_SIZE 65535
#define _Z_DEFAULT_UNICAST_BATCH_SIZE 65535
#define _Z_DEFAULT_MULTICAST_BATCH_SIZE 8192
#define _Z_DEFAULT_RESOLUTION_SIZE 2

#define _Z_DECLARE_CLEAR(layer, name) void _z_##layer##_msg_clear_##name(_z_##name##_t *m, uint8_t header)
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/protocol/definitions/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
// T Lease period if T==1 then the lease period is in seconds else in milliseconds
// S Size params if S==1 then size parameters are exchanged
// Z Extensions if Z==1 then Zenoh extensions are present
#define _Z_FLAG_T_JOIN_T 0x40 // 1 << 6
#define _Z_FLAG_T_JOIN_S 0x20 // 1 << 5
#define _Z_FLAG_T_JOIN_T 0x20 // 1 << 5
#define _Z_FLAG_T_JOIN_S 0x40 // 1 << 6

// Init message flags:
// A Ack if A==1 then the message is an acknowledgment (aka InitAck), otherwise InitSyn
Expand Down
1 change: 0 additions & 1 deletion src/net/session.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ int8_t __z_open_inner(_z_session_t *zn, char *locator, z_whatami_t mode) {
#else
ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE;
#endif

if (ret == _Z_RES_OK) {
ret = _z_session_init(zn, &local_zid);
}
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/codec/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ int8_t _z_scouting_message_decode_na(_z_scouting_message_t *msg, _z_zbuf_t *zbf)
} break;

default: {
_Z_DEBUG("WARNING: Trying to decode session message with unknown ID(%d)\n", mid);
_Z_DEBUG("WARNING: Trying to decode scouting message with unknown ID(0x%x)\n", mid);
ret |= _Z_ERR_MESSAGE_TRANSPORT_UNKNOWN;
is_last = true;
} break;
Expand Down
23 changes: 5 additions & 18 deletions src/protocol/codec/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ int8_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header) {
} else {
msg->_seq_num_res = _Z_DEFAULT_RESOLUTION_SIZE;
msg->_req_id_res = _Z_DEFAULT_RESOLUTION_SIZE;
msg->_batch_size = _Z_DEFAULT_BATCH_SIZE;
msg->_batch_size = _Z_DEFAULT_MULTICAST_BATCH_SIZE;
}
}
if (ret == _Z_RES_OK) {
Expand Down Expand Up @@ -203,7 +203,7 @@ int8_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header) {
} else {
msg->_seq_num_res = _Z_DEFAULT_RESOLUTION_SIZE;
msg->_req_id_res = _Z_DEFAULT_RESOLUTION_SIZE;
msg->_batch_size = _Z_DEFAULT_BATCH_SIZE;
msg->_batch_size = _Z_DEFAULT_UNICAST_BATCH_SIZE;
}

if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_INIT_A) == true)) {
Expand Down Expand Up @@ -390,7 +390,7 @@ int8_t _z_fragment_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_fragmen
int8_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t header) {
int8_t ret = _Z_RES_OK;
*msg = (_z_t_msg_fragment_t){0};

_Z_DEBUG("Decoding _Z_TRANSPORT_FRAGMENT\n");
ret |= _z_zint_decode(&msg->_sn, zbf);

Expand Down Expand Up @@ -441,31 +441,24 @@ int8_t _z_transport_message_encode(_z_wbuf_t *wbf, const _z_transport_message_t
case _Z_MID_T_FRAME: {
ret |= _z_frame_encode(wbf, msg->_header, &msg->_body._frame);
} break;

case _Z_MID_T_FRAGMENT: {
ret |= _z_fragment_encode(wbf, msg->_header, &msg->_body._fragment);
} break;

case _Z_MID_T_KEEP_ALIVE: {
ret |= _z_keep_alive_encode(wbf, msg->_header, &msg->_body._keep_alive);
} break;

case _Z_MID_T_JOIN: {
ret |= _z_join_encode(wbf, msg->_header, &msg->_body._join);
} break;

case _Z_MID_T_INIT: {
ret |= _z_init_encode(wbf, msg->_header, &msg->_body._init);
} break;

case _Z_MID_T_OPEN: {
ret |= _z_open_encode(wbf, msg->_header, &msg->_body._open);
} break;

case _Z_MID_T_CLOSE: {
ret |= _z_close_encode(wbf, msg->_header, &msg->_body._close);
} break;

default: {
_Z_DEBUG("WARNING: Trying to encode session message with unknown ID(%d)\n", _Z_MID(msg->_header));
ret |= _Z_ERR_MESSAGE_TRANSPORT_UNKNOWN;
Expand All @@ -485,33 +478,27 @@ int8_t _z_transport_message_decode(_z_transport_message_t *msg, _z_zbuf_t *zbf)
case _Z_MID_T_FRAME: {
ret |= _z_frame_decode(&msg->_body._frame, zbf, msg->_header);
} break;

case _Z_MID_T_FRAGMENT: {
ret |= _z_fragment_decode(&msg->_body._fragment, zbf, msg->_header);
} break;

case _Z_MID_T_KEEP_ALIVE: {
ret |= _z_keep_alive_decode(&msg->_body._keep_alive, zbf, msg->_header);
} break;

case _Z_MID_T_JOIN: {
ret |= _z_join_decode(&msg->_body._join, zbf, msg->_header);
} break;

case _Z_MID_T_INIT: {
ret |= _z_init_decode(&msg->_body._init, zbf, msg->_header);
} break;

case _Z_MID_T_OPEN: {
ret |= _z_open_decode(&msg->_body._open, zbf, msg->_header);
} break;

case _Z_MID_T_CLOSE: {
ret |= _z_close_decode(&msg->_body._close, zbf, msg->_header);
} break;

default: {
_Z_DEBUG("WARNING: Trying to decode session message with unknown ID(%d)\n", mid);
_Z_DEBUG("WARNING: Trying to decode session message with unknown ID(0x%x) (header=0x%x)\n", mid,
msg->_header);
ret |= _Z_ERR_MESSAGE_TRANSPORT_UNKNOWN;
} break;
}
Expand Down
14 changes: 7 additions & 7 deletions src/protocol/definitions/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease,
msg._body._join._lease = lease;
msg._body._join._seq_num_res = Z_SN_RESOLUTION;
msg._body._join._req_id_res = Z_REQ_RESOLUTION;
msg._body._join._batch_size = Z_BATCH_SIZE;
msg._body._join._batch_size = Z_BATCH_MULTICAST_SIZE;
msg._body._join._next_sn = next_sn;
msg._body._join._zid = zid;

if ((lease % 1000) == 0) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_T);
}

if ((Z_BATCH_SIZE != _Z_DEFAULT_BATCH_SIZE) || (Z_SN_RESOLUTION != _Z_DEFAULT_RESOLUTION_SIZE) ||
(Z_REQ_RESOLUTION != _Z_DEFAULT_RESOLUTION_SIZE)) {
if ((Z_BATCH_MULTICAST_SIZE != _Z_DEFAULT_MULTICAST_BATCH_SIZE) ||
(Z_SN_RESOLUTION != _Z_DEFAULT_RESOLUTION_SIZE) || (Z_REQ_RESOLUTION != _Z_DEFAULT_RESOLUTION_SIZE)) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_S);
}

Expand All @@ -101,10 +101,10 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid)
msg._body._init._zid = zid;
msg._body._init._seq_num_res = Z_SN_RESOLUTION;
msg._body._init._req_id_res = Z_REQ_RESOLUTION;
msg._body._init._batch_size = Z_BATCH_SIZE;
msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE;
_z_bytes_reset(&msg._body._init._cookie);

if ((msg._body._init._batch_size != _Z_DEFAULT_BATCH_SIZE) ||
if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) ||
(msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) ||
(msg._body._init._req_id_res != _Z_DEFAULT_RESOLUTION_SIZE)) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S);
Expand All @@ -123,10 +123,10 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid,
msg._body._init._zid = zid;
msg._body._init._seq_num_res = Z_SN_RESOLUTION;
msg._body._init._req_id_res = Z_REQ_RESOLUTION;
msg._body._init._batch_size = Z_BATCH_SIZE;
msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE;
msg._body._init._cookie = cookie;

if ((msg._body._init._batch_size != _Z_DEFAULT_BATCH_SIZE) ||
if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) ||
(msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) ||
(msg._body._init._req_id_res != _Z_DEFAULT_RESOLUTION_SIZE)) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S);
Expand Down
6 changes: 5 additions & 1 deletion src/protocol/iobuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ void __z_iosli_init(_z_iosli_t *ios, size_t capacity) {
ios->_capacity = capacity;
ios->_is_alloc = true;
ios->_buf = (uint8_t *)z_malloc(capacity);
if (ios->_buf == NULL) {
ios->_capacity = 0;
ios->_is_alloc = false;
}
}

_z_iosli_t _z_iosli_make(size_t capacity) {
Expand Down Expand Up @@ -257,7 +261,7 @@ _z_wbuf_t _z_wbuf_make(size_t capacity, _Bool is_expandable) {
// Preallocate 4 slots, this is usually what we expect
// when fragmenting a zenoh data message with attachment
wbf._ioss = _z_iosli_vec_make(4);
_z_wbuf_add_iosli(&wbf, __z_wbuf_new_iosli(Z_IOSLICE_SIZE));
_z_wbuf_add_iosli(&wbf, __z_wbuf_new_iosli(capacity));
} else {
wbf._ioss = _z_iosli_vec_make(1);
_z_wbuf_add_iosli(&wbf, __z_wbuf_new_iosli(capacity));
Expand Down
4 changes: 2 additions & 2 deletions src/session/scout.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ _z_hello_list_t *__z_scout_loop(const _z_wbuf_t *wbf, const char *locator, unsig
err = _z_link_send_wbuf(&zl, wbf);
if (err == _Z_RES_OK) {
// The receiving buffer
_z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_SIZE);
_z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE);

z_time_t start = z_time_now();
while (z_time_elapsed_ms(&start) < period) {
Expand Down Expand Up @@ -132,7 +132,7 @@ _z_hello_list_t *_z_scout_inner(const z_what_t what, _z_id_t zid, const char *lo
_z_hello_list_t *ret = NULL;

// Create the buffer to serialize the scout message on
_z_wbuf_t wbf = _z_wbuf_make(Z_BATCH_SIZE, false);
_z_wbuf_t wbf = _z_wbuf_make(Z_BATCH_UNICAST_SIZE, false);

// Create and encode the scout message
_z_scouting_message_t scout = _z_s_msg_make_scout(what, zid);
Expand Down
2 changes: 1 addition & 1 deletion src/transport/common/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl) {
int8_t ret = _Z_RES_OK;

// Create and prepare the buffer
_z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_SIZE);
_z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE);
_z_zbuf_reset(&zbf);

if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) {
Expand Down
2 changes: 1 addition & 1 deletion src/transport/common/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m
int8_t ret = _Z_RES_OK;

// Create and prepare the buffer to serialize the message on
uint16_t mtu = (zl->_mtu < Z_BATCH_SIZE) ? zl->_mtu : Z_BATCH_SIZE;
uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE;
_z_wbuf_t wbf = _z_wbuf_make(mtu, false);
if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) {
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
Expand Down
4 changes: 2 additions & 2 deletions src/transport/multicast/link/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t
// If the new node has less representing capabilities then it is incompatible to communication
if ((t_msg->_body._join._seq_num_res != Z_SN_RESOLUTION) ||
(t_msg->_body._join._req_id_res != Z_REQ_RESOLUTION) ||
(t_msg->_body._join._batch_size != Z_BATCH_SIZE)) {
(t_msg->_body._join._batch_size != Z_BATCH_MULTICAST_SIZE)) {
ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION;
}

Expand Down Expand Up @@ -278,7 +278,7 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t
// Check if the representing capabilities are still the same
if ((t_msg->_body._join._seq_num_res != Z_SN_RESOLUTION) ||
(t_msg->_body._join._req_id_res != Z_REQ_RESOLUTION) ||
(t_msg->_body._join._batch_size != Z_BATCH_SIZE)) {
(t_msg->_body._join._batch_size != Z_BATCH_MULTICAST_SIZE)) {
_z_transport_peer_entry_list_drop_filter(ztm->_peers, _z_transport_peer_entry_eq, entry);
// TODO: cleanup here should also be done on mappings/subs/etc...
break;
Expand Down
17 changes: 9 additions & 8 deletions src/transport/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <stdlib.h>

#include "zenoh-pico/config.h"
#include "zenoh-pico/link/link.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/transport/link/rx.h"
#include "zenoh-pico/transport/link/tx.h"
Expand Down Expand Up @@ -90,24 +91,24 @@ int8_t _z_transport_unicast(_z_transport_t *zt, _z_link_t *zl, _z_transport_unic

// Initialize the read and write buffers
if (ret == _Z_RES_OK) {
uint16_t mtu = (zl->_mtu < Z_BATCH_SIZE) ? zl->_mtu : Z_BATCH_SIZE;
_Bool expandable = true;
uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE;
_Bool expandable = _Z_LINK_IS_STREAMED(zl->_capabilities);
size_t dbuf_size = 0;

#if Z_DYNAMIC_MEMORY_ALLOCATION == 0
expandable = false;
dbuf_size = Z_FRAG_MAX_SIZE;
#endif
zt->_transport._unicast._wbuf = _z_wbuf_make(mtu, expandable);
zt->_transport._unicast._zbuf = _z_zbuf_make(Z_BATCH_SIZE);
zt->_transport._unicast._zbuf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE);

// Initialize the defragmentation buffers
zt->_transport._unicast._dbuf_reliable = _z_wbuf_make(dbuf_size, expandable);
zt->_transport._unicast._dbuf_best_effort = _z_wbuf_make(dbuf_size, expandable);

// Clean up the buffers if one of them failed to be allocated
if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != mtu) ||
(_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != Z_BATCH_SIZE) ||
(_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != Z_BATCH_UNICAST_SIZE) ||
#if Z_DYNAMIC_MEMORY_ALLOCATION == 0
(_z_wbuf_capacity(&zt->_transport._unicast._dbuf_reliable) != dbuf_size) ||
(_z_wbuf_capacity(&zt->_transport._unicast._dbuf_best_effort) != dbuf_size)) {
Expand Down Expand Up @@ -195,13 +196,13 @@ int8_t _z_transport_multicast(_z_transport_t *zt, _z_link_t *zl, _z_transport_mu

// Initialize the read and write buffers
if (ret == _Z_RES_OK) {
uint16_t mtu = (zl->_mtu < Z_BATCH_SIZE) ? zl->_mtu : Z_BATCH_SIZE;
zt->_transport._multicast._wbuf = _z_wbuf_make(mtu, true);
zt->_transport._multicast._zbuf = _z_zbuf_make(Z_BATCH_SIZE);
uint16_t mtu = (zl->_mtu < Z_BATCH_MULTICAST_SIZE) ? zl->_mtu : Z_BATCH_MULTICAST_SIZE;
zt->_transport._multicast._wbuf = _z_wbuf_make(mtu, _Z_LINK_IS_STREAMED(zl->_capabilities));
zt->_transport._multicast._zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE);

// Clean up the buffers if one of them failed to be allocated
if ((_z_wbuf_capacity(&zt->_transport._multicast._wbuf) != mtu) ||
(_z_zbuf_capacity(&zt->_transport._multicast._zbuf) != Z_BATCH_SIZE)) {
(_z_zbuf_capacity(&zt->_transport._multicast._zbuf) != Z_BATCH_MULTICAST_SIZE)) {
ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY;

#if Z_MULTI_THREAD == 1
Expand Down
2 changes: 1 addition & 1 deletion zenohpico.pc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ prefix=/usr/local
Name: zenohpico
Description:
URL:
Version: 0.10.20230809dev
Version: 0.10.20230811dev
Cflags: -I${prefix}/
Libs: -L${prefix}/ -lzenohpico

0 comments on commit a0f0113

Please sign in to comment.