Skip to content

Commit

Permalink
Merge pull request #630 from jean-roland/ft_reliability
Browse files Browse the repository at this point in the history
Switch reliability from subscriber to publisher.
  • Loading branch information
milyin authored Sep 9, 2024
2 parents 2784fca + d3e5135 commit 9e926b5
Show file tree
Hide file tree
Showing 30 changed files with 171 additions and 152 deletions.
1 change: 1 addition & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ Primitives
.. autocfunction:: primitives.h::z_sample_timestamp
.. autocfunction:: primitives.h::z_sample_encoding
.. autocfunction:: primitives.h::z_sample_kind
.. autocfunction:: primitives.h::z_sample_reliability
.. autocfunction:: primitives.h::z_sample_attachment
.. autocfunction:: primitives.h::z_put_options_default
.. autocfunction:: primitives.h::z_delete_options_default
Expand Down
13 changes: 13 additions & 0 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1674,6 +1674,19 @@ const z_loaned_encoding_t *z_sample_encoding(const z_loaned_sample_t *sample);
*/
z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample);

/**
* (unstable) Gets the reliability a sample was received with.
*
* Parameters:
* sample: Pointer to a :c:type:`z_loaned_sample_t` to get the reliability from.
*
* Return:
* The reliability wrapped as a :c:type:`z_reliability_t`.
*/
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t z_sample_reliability(const z_loaned_sample_t *sample);
#endif

/**
* Got sample qos congestion control value.
*
Expand Down
20 changes: 13 additions & 7 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,10 @@ _Z_OWNED_TYPE_VALUE(_z_value_t, reply_err)

/**
* Represents the configuration used to configure a subscriber upon declaration :c:func:`z_declare_subscriber`.
*
* Members:
* (unstable) z_reliability_t reliability: The subscription reliability value.
*.
*/
typedef struct {
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t reliability;
#else
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
#endif
} z_subscriber_options_t;

/**
Expand All @@ -221,12 +215,16 @@ typedef struct {
* publisher.
* z_priority_t priority: The priority of messages issued by this publisher.
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* (unstable) z_reliability_t reliability: The reliability that should be used to transmit the data.
*/
typedef struct {
z_moved_encoding_t *encoding;
z_congestion_control_t congestion_control;
z_priority_t priority;
_Bool is_express;
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t reliability;
#endif
} z_publisher_options_t;

/**
Expand Down Expand Up @@ -297,6 +295,7 @@ typedef struct {
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* z_moved_bytes_t* attachment: An optional attachment to the publication.
* (unstable) z_reliability_t reliability: The reliability that should be used to transmit the data.
*/
typedef struct {
z_moved_encoding_t *encoding;
Expand All @@ -305,6 +304,9 @@ typedef struct {
z_timestamp_t *timestamp;
_Bool is_express;
z_moved_bytes_t *attachment;
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t reliability;
#endif
} z_put_options_t;

/**
Expand All @@ -315,12 +317,16 @@ typedef struct {
* z_priority_t priority: The priority of this message when router.
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* (unstable) z_reliability_t reliability: The reliability that should be used to transmit the data.
*/
typedef struct {
z_congestion_control_t congestion_control;
z_priority_t priority;
_Bool is_express;
z_timestamp_t *timestamp;
#if Z_FEATURE_UNSTABLE_API == 1
z_reliability_t reliability;
#endif
} z_delete_options_t;

/**
Expand Down
6 changes: 3 additions & 3 deletions include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
#define INCLUDE_ZENOH_PICO_CONFIG_H

/*--- CMake generated config; pass values to CMake to change the following tokens ---*/
#define Z_FRAG_MAX_SIZE 300000
#define Z_BATCH_UNICAST_SIZE 65535
#define Z_BATCH_MULTICAST_SIZE 8096
#define Z_FRAG_MAX_SIZE 4096
#define Z_BATCH_UNICAST_SIZE 2048
#define Z_BATCH_MULTICAST_SIZE 2048
#define Z_CONFIG_SOCKET_TIMEOUT 100

#define Z_FEATURE_UNSTABLE_API 0
Expand Down
14 changes: 8 additions & 6 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,14 @@ int8_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid);
* keyexpr: The resource key to publish. The callee gets the ownership
* of any allocated value.
* encoding: The optional default encoding to use during put. The callee gets the ownership.
* reliability: The reliability of the publisher messages
*
* Returns:
* The created :c:type:`_z_publisher_t` (in null state if the declaration failed)..
*/
_z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_encoding_t *encoding,
z_congestion_control_t congestion_control, z_priority_t priority, _Bool is_express);
z_congestion_control_t congestion_control, z_priority_t priority, _Bool is_express,
z_reliability_t reliability);

/**
* Undeclare a :c:type:`_z_publisher_t`.
Expand Down Expand Up @@ -118,12 +120,14 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub);
* is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* timestamp: The timestamp of this write. The API level timestamp (e.g. of the data when it was created).
* attachment: An optional attachment to this write.
* reliability: The message reliability.
* Returns:
* ``0`` in case of success, ``-1`` in case of failure.
*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload, const _z_encoding_t *encoding,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment);
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment,
z_reliability_t reliability);
#endif

#if Z_FEATURE_SUBSCRIPTION == 1
Expand All @@ -134,16 +138,14 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload
* zn: The zenoh-net session. The caller keeps its ownership.
* keyexpr: The resource key to subscribe. The callee gets the ownership
* of any allocated value.
* sub_info: The :c:type:`_z_subinfo_t` to configure the :c:type:`_z_subscriber_t`.
* The callee gets the ownership of any allocated value.
* callback: The callback function that will be called each time a data matching the subscribed resource is
* received. arg: A pointer that will be passed to the **callback** on each call.
*
* Returns:
* The created :c:type:`_z_subscriber_t` (in null state if the declaration failed).
*/
_z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_subinfo_t sub_info,
_z_data_handler_t callback, _z_drop_handler_t dropper, void *arg);
_z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_data_handler_t callback,
_z_drop_handler_t dropper, void *arg);

/**
* Undeclare a :c:type:`_z_subscriber_t`.
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/net/publish.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ typedef struct _z_publisher_t {
_z_encoding_t _encoding;
z_congestion_control_t _congestion_control;
z_priority_t _priority;
z_reliability_t reliability;
_Bool _is_express;
#if Z_FEATURE_INTEREST == 1
_z_write_filter_t _filter;
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/net/sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ typedef struct _z_sample_t {
z_sample_kind_t kind;
_z_qos_t qos;
_z_bytes_t attachment;
z_reliability_t reliability;
} _z_sample_t;
void _z_sample_clear(_z_sample_t *sample);

Expand All @@ -56,6 +57,6 @@ _z_sample_t _z_sample_duplicate(const _z_sample_t *src);

_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t *timestamp,
_z_encoding_t *encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment);
const _z_bytes_t attachment, z_reliability_t reliability);

#endif /* ZENOH_PICO_SAMPLE_NETAPI_H */
7 changes: 0 additions & 7 deletions include/zenoh-pico/net/subscribe.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ typedef struct {
} _z_subscriber_t;

#if Z_FEATURE_SUBSCRIPTION == 1
/**
* Create a default subscription info for a push subscriber.
*
* Returns:
* A :c:type:`_z_subinfo_t` containing the created subscription info.
*/
_z_subinfo_t _z_subinfo_default(void);

void _z_subscriber_clear(_z_subscriber_t *sub);
void _z_subscriber_free(_z_subscriber_t **sub);
Expand Down
11 changes: 0 additions & 11 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,17 +197,6 @@ typedef struct {
_z_zint_t n;
} _z_target_complete_body_t;

/**
* Informations to be passed to :c:func:`_z_declare_subscriber` to configure the created
* :c:type:`_z_subscription_rc_t`.
*
* Members:
* z_reliability_t reliability: The subscription reliability.
*/
typedef struct {
z_reliability_t reliability;
} _z_subinfo_t;

typedef struct {
_z_id_t _id;
uint32_t _entity_id;
Expand Down
5 changes: 1 addition & 4 deletions include/zenoh-pico/protocol/definitions/declarations.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ _z_undecl_kexpr_t _z_undecl_kexpr_null(void);
typedef struct {
_z_keyexpr_t _keyexpr;
uint32_t _id;
struct {
_Bool _reliable;
} _ext_subinfo;
} _z_decl_subscriber_t;
_z_decl_subscriber_t _z_decl_subscriber_null(void);
typedef struct {
Expand Down Expand Up @@ -105,7 +102,7 @@ void _z_decl_fix_mapping(_z_declaration_t* msg, uint16_t mapping);
_z_declaration_t _z_make_decl_keyexpr(uint16_t id, _Z_MOVE(_z_keyexpr_t) key);
_z_declaration_t _z_make_undecl_keyexpr(uint16_t id);

_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, _Bool reliable);
_z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id);
_z_declaration_t _z_make_undecl_subscriber(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key);

_z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint16_t distance, _Bool complete);
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ typedef union {
typedef struct {
enum { _Z_N_DECLARE, _Z_N_PUSH, _Z_N_REQUEST, _Z_N_RESPONSE, _Z_N_RESPONSE_FINAL, _Z_N_INTEREST } _tag;
_z_network_body_t _body;
z_reliability_t _reliability;
} _z_network_message_t;
typedef _z_network_message_t _z_zenoh_message_t;
void _z_n_msg_clear(_z_network_message_t *m);
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,8 @@ typedef struct {
} _z_transport_message_t;
void _z_t_msg_clear(_z_transport_message_t *msg);

z_reliability_t _z_t_msg_get_reliability(_z_transport_message_t *msg);

/*------------------ Builders ------------------*/
_z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, _z_id_t zid,
_z_conduit_sn_list_t next_sn);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/push.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@
#ifndef ZENOH_PICO_SESSION_PUSH_H
#define ZENOH_PICO_SESSION_PUSH_H

int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push);
int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push, z_reliability_t reliability);

#endif /* ZENOH_PICO_SESSION_PUSH_H */
1 change: 0 additions & 1 deletion include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ typedef struct {
_z_data_handler_t _callback;
_z_drop_handler_t _dropper;
void *_arg;
_z_subinfo_t _info;
} _z_subscription_t;

_Bool _z_subscription_eq(const _z_subscription_t *one, const _z_subscription_t *two);
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/*------------------ Subscription ------------------*/
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
_z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp,
const _z_bytes_t attachment);
const _z_bytes_t attachment, z_reliability_t reliability);

#if Z_FEATURE_SUBSCRIPTION == 1
_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id);
Expand All @@ -30,7 +30,7 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t
_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub);
int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
_z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp,
const _z_n_qos_t qos, const _z_bytes_t attachment);
const _z_n_qos_t qos, const _z_bytes_t attachment, z_reliability_t reliability);
void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub);
void _z_flush_subscriptions(_z_session_t *zn);
#endif
Expand Down
Loading

0 comments on commit 9e926b5

Please sign in to comment.