Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lumeo rust utilities acked #1

Draft
wants to merge 21 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: NoticeFuture::new
Devdutt Shenoi authored and FSMaxB committed Jun 20, 2024
commit d5698fea27545e3c3f13084f865a0f8b4c344a36
169 changes: 89 additions & 80 deletions rumqttc/src/client.rs
Original file line number Diff line number Diff line change
@@ -83,16 +83,17 @@ impl AsyncClient {
let topic = topic.into();
let mut publish = Publish::new(&topic, qos, payload);
publish.retain = retain;

let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();
let notice_tx = Some(NoticeTx(notice_tx));

let publish = Request::Publish(publish);
let request = Request::Publish(publish);
if !valid_topic(&topic) {
return Err(ClientError::Request(publish));
return Err(ClientError::Request(request));
}
self.request_tx.send_async((notice_tx, publish)).await?;
Ok(NoticeFuture(notice_rx))

let (notice_tx, future) = NoticeTx::new();
// Fulfill instantly for QoS 0
let notice_tx = (qos == QoS::AtMostOnce).then_some(notice_tx);
self.request_tx.send_async((notice_tx, request)).await?;

Ok(future)
}

/// Attempts to send a MQTT Publish to the `EventLoop`.
@@ -110,23 +111,17 @@ impl AsyncClient {
let topic = topic.into();
let mut publish = Publish::new(&topic, qos, payload);
publish.retain = retain;
let request = Request::Publish(publish);
if !valid_topic(&topic) {
return Err(ClientError::TryRequest(request));
}

let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();
let notice_tx = NoticeTx(notice_tx);
let (notice_tx, future) = NoticeTx::new();
// Fulfill instantly for QoS 0
let notice_tx = if qos == QoS::AtMostOnce {
notice_tx.success();
None
} else {
Some(notice_tx)
};
let notice_tx = (qos == QoS::AtMostOnce).then_some(notice_tx);
self.request_tx.try_send((notice_tx, request))?;

let publish = Request::Publish(publish);
if !valid_topic(&topic) {
return Err(ClientError::TryRequest(publish));
}
self.request_tx.try_send((notice_tx, publish))?;
Ok(NoticeFuture(notice_rx))
Ok(future)
}

/// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
@@ -161,13 +156,14 @@ impl AsyncClient {
{
let mut publish = Publish::from_bytes(topic, qos, payload);
publish.retain = retain;
let request = Request::Publish(publish);

let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();
let notice_tx = Some(NoticeTx(notice_tx));
let (notice_tx, future) = NoticeTx::new();
// Fulfill instantly for QoS 0
let notice_tx = (qos == QoS::AtMostOnce).then_some(notice_tx);
self.request_tx.send_async((notice_tx, request)).await?;

let publish = Request::Publish(publish);
self.request_tx.send_async((notice_tx, publish)).await?;
Ok(NoticeFuture(notice_rx))
Ok(future)
}

/// Sends a MQTT Subscribe to the `EventLoop`
@@ -178,16 +174,16 @@ impl AsyncClient {
) -> Result<NoticeFuture, ClientError> {
let topic = topic.into();
let subscribe = Subscribe::new(&topic, qos);
let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();
let notice_tx = NoticeTx(notice_tx);
let request = Request::Subscribe(subscribe);
if !valid_filter(&topic) {
return Err(ClientError::Request(request));
}
self.request_tx
.send_async((Some(notice_tx), request))
.await?;
Ok(NoticeFuture(notice_rx))

let (notice_tx, future) = NoticeTx::new();
let notice_tx = Some(notice_tx);
self.request_tx.send_async((notice_tx, request)).await?;

Ok(future)
}

/// Attempts to send a MQTT Subscribe to the `EventLoop`
@@ -198,14 +194,16 @@ impl AsyncClient {
) -> Result<NoticeFuture, ClientError> {
let topic = topic.into();
let subscribe = Subscribe::new(&topic, qos);
let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();
let notice_tx = NoticeTx(notice_tx);
let request = Request::Subscribe(subscribe);
if !valid_filter(&topic) {
return Err(ClientError::TryRequest(request));
}
self.request_tx.try_send((Some(notice_tx), request))?;
Ok(NoticeFuture(notice_rx))

let (notice_tx, future) = NoticeTx::new();
let notice_tx = Some(notice_tx);
self.request_tx.try_send((notice_tx, request))?;

Ok(future)
}

/// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
@@ -216,16 +214,17 @@ impl AsyncClient {
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter);
let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();
let notice_tx = NoticeTx(notice_tx);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::Request(request));
}
self.request_tx
.send_async((Some(notice_tx), request))
.await?;
Ok(NoticeFuture(notice_rx))

let (notice_tx, future) = NoticeTx::new();
// Fulfill instantly for QoS 0
let notice_tx = Some(notice_tx);
self.request_tx.send_async((notice_tx, request)).await?;

Ok(future)
}

/// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`
@@ -236,14 +235,17 @@ impl AsyncClient {
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter);
let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();
let notice_tx = NoticeTx(notice_tx);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::TryRequest(request));
}
self.request_tx.try_send((Some(notice_tx), request))?;
Ok(NoticeFuture(notice_rx))

let (notice_tx, future) = NoticeTx::new();
// Fulfill instantly for QoS 0
let notice_tx = Some(notice_tx);
self.request_tx.try_send((notice_tx, request))?;

Ok(future)
}

/// Sends a MQTT Unsubscribe to the `EventLoop`
@@ -252,25 +254,26 @@ impl AsyncClient {
topic: S,
) -> Result<NoticeFuture, ClientError> {
let unsubscribe = Unsubscribe::new(topic.into());

let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();
let notice_tx = NoticeTx(notice_tx);
let request = Request::Unsubscribe(unsubscribe);
self.request_tx
.send_async((Some(notice_tx), request))
.await?;
Ok(NoticeFuture(notice_rx))

let (notice_tx, future) = NoticeTx::new();
// Fulfill instantly for QoS 0
let notice_tx = Some(notice_tx);
self.request_tx.try_send((notice_tx, request))?;

Ok(future)
}

/// Attempts to send a MQTT Unsubscribe to the `EventLoop`
pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<NoticeFuture, ClientError> {
let unsubscribe = Unsubscribe::new(topic.into());

let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();
let notice_tx = NoticeTx(notice_tx);
let request = Request::Unsubscribe(unsubscribe);
self.request_tx.try_send((Some(notice_tx), request))?;
Ok(NoticeFuture(notice_rx))

let (notice_tx, future) = NoticeTx::new();
let notice_tx = Some(notice_tx);
self.request_tx.try_send((notice_tx, request))?;

Ok(future)
}

/// Sends a MQTT disconnect to the `EventLoop`
@@ -353,16 +356,17 @@ impl Client {
let topic = topic.into();
let mut publish = Publish::new(&topic, qos, payload);
publish.retain = retain;

let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();
let notice_tx = Some(NoticeTx(notice_tx));

let publish = Request::Publish(publish);
let request = Request::Publish(publish);
if !valid_topic(&topic) {
return Err(ClientError::Request(publish));
return Err(ClientError::Request(request));
}
self.client.request_tx.send((notice_tx, publish))?;
Ok(NoticeFuture(notice_rx))

let (notice_tx, future) = NoticeTx::new();
// Fulfill instantly for QoS 0
let notice_tx = (qos == QoS::AtMostOnce).then_some(notice_tx);
self.client.request_tx.send((notice_tx, request))?;

Ok(future)
}

pub fn try_publish<S, V>(
@@ -403,14 +407,16 @@ impl Client {
) -> Result<NoticeFuture, ClientError> {
let topic = topic.into();
let subscribe = Subscribe::new(&topic, qos);
let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();
let notice_tx = NoticeTx(notice_tx);
let request = Request::Subscribe(subscribe);
if !valid_filter(&topic) {
return Err(ClientError::Request(request));
}
self.client.request_tx.send((Some(notice_tx), request))?;
Ok(NoticeFuture(notice_rx))

let (notice_tx, future) = NoticeTx::new();
let notice_tx = Some(notice_tx);
self.client.request_tx.send((notice_tx, request))?;

Ok(future)
}

/// Sends a MQTT Subscribe to the `EventLoop`
@@ -430,14 +436,16 @@ impl Client {
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter);
let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();
let notice_tx = NoticeTx(notice_tx);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::Request(request));
}
self.client.request_tx.send((Some(notice_tx), request))?;
Ok(NoticeFuture(notice_rx))

let (notice_tx, future) = NoticeTx::new();
let notice_tx = Some(notice_tx);
self.client.request_tx.send((notice_tx, request))?;

Ok(future)
}

pub fn try_subscribe_many<T>(&self, topics: T) -> Result<NoticeFuture, ClientError>
@@ -450,12 +458,13 @@ impl Client {
/// Sends a MQTT Unsubscribe to the `EventLoop`
pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<NoticeFuture, ClientError> {
let unsubscribe = Unsubscribe::new(topic.into());

let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();
let notice_tx = NoticeTx(notice_tx);
let request = Request::Unsubscribe(unsubscribe);
self.client.request_tx.send((Some(notice_tx), request))?;
Ok(NoticeFuture(notice_rx))

let (notice_tx, future) = NoticeTx::new();
let notice_tx = Some(notice_tx);
self.client.request_tx.send((notice_tx, request))?;

Ok(future)
}

/// Sends a MQTT Unsubscribe to the `EventLoop`
6 changes: 6 additions & 0 deletions rumqttc/src/notice.rs
Original file line number Diff line number Diff line change
@@ -48,6 +48,12 @@ impl NoticeFuture {
pub struct NoticeTx(pub(crate) oneshot::Sender<NoticeResult>);

impl NoticeTx {
pub fn new() -> (Self, NoticeFuture) {
let (notice_tx, notice_rx) = tokio::sync::oneshot::channel();

(NoticeTx(notice_tx), NoticeFuture(notice_rx))
}

pub(crate) fn success(self) {
_ = self.0.send(Ok(()));
}
Loading