Skip to content

Commit

Permalink
implement fragmentation for multicast
Browse files Browse the repository at this point in the history
  • Loading branch information
p-avital committed Aug 18, 2023
1 parent 0be6ad0 commit 2b87df4
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/protocol/codec/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ int8_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t head

__auto_type bytes = _z_bytes_wrap((uint8_t *)_z_zbuf_start(zbf), _z_zbuf_len(zbf));
_z_bytes_copy(&msg->_payload, &bytes);
zbf->_ios._r_pos = zbf->_ios._w_pos;

return ret;
}
Expand Down
3 changes: 2 additions & 1 deletion src/transport/multicast/link/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "zenoh-pico/protocol/codec/transport.h"
#include "zenoh-pico/protocol/definitions/network.h"
#include "zenoh-pico/protocol/definitions/transport.h"
#include "zenoh-pico/protocol/iobuf.h"
#include "zenoh-pico/session/utils.h"
#include "zenoh-pico/transport/utils.h"
#include "zenoh-pico/utils/logging.h"
Expand Down Expand Up @@ -90,7 +91,7 @@ int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_me
} while (false); // The 1-iteration loop to use continue to break the entire loop on error

if (ret == _Z_RES_OK) {
_Z_DEBUG(">> \t transport_message_decode\n");
_Z_DEBUG(">> \t transport_message_decode: %ld\n", _z_zbuf_len(&ztm->_zbuf));
ret = _z_transport_message_decode(t_msg, &ztm->_zbuf);
}

Expand Down
1 change: 1 addition & 0 deletions src/transport/multicast/link/task/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/codec/transport.h"
#include "zenoh-pico/protocol/iobuf.h"
#include "zenoh-pico/transport/link/rx.h"
#include "zenoh-pico/utils/logging.h"

Expand Down
30 changes: 29 additions & 1 deletion src/transport/multicast/link/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,35 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m
ztm->_transmitted = true; // Mark the session that we have transmitted data
}
} else {
// TODO[protocol]: Fragmentation goes here
// The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - 12, true);

ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf
if (ret == _Z_RES_OK) {
_Bool is_first = true; // Fragment and send the message
while (_z_wbuf_len(&fbf) > 0) {
if (is_first == false) { // Get the fragment sequence number
sn = __unsafe_z_multicast_get_sn(ztm, reliability);
}
is_first = false;

// Clear the buffer for serialization
__unsafe_z_prepare_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities));

// Serialize one fragment
ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn);
if (ret == _Z_RES_OK) {
// Write the message length in the reserved space if needed
__unsafe_z_finalize_wbuf(&ztm->_wbuf, _Z_LINK_IS_STREAMED(ztm->_link._capabilities));

ret = _z_link_send_wbuf(&ztm->_link, &ztm->_wbuf); // Send the wbuf on the socket
if (ret == _Z_RES_OK) {
ztm->_transmitted = true; // Mark the session that we have transmitted data
}
}
}
}
}
}

Expand Down

0 comments on commit 2b87df4

Please sign in to comment.