Skip to content

Commit

Permalink
refactor: roll out connect
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed May 29, 2024
1 parent a1282cd commit 8e1633f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 24 deletions.
22 changes: 11 additions & 11 deletions rumqttc/src/v5/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl EventLoop {

requests_in_channel.retain(|request| {
match request {
Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack
Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack
_ => true,
}
});
Expand Down Expand Up @@ -398,20 +398,20 @@ async fn mqtt_connect(
options: &mut MqttOptions,
network: &mut Network,
) -> Result<ConnAck, ConnectionError> {
let keep_alive = options.keep_alive().as_secs() as u16;
let clean_start = options.clean_start();
let client_id = options.client_id();
let properties = options.connect_properties();

let connect = Connect {
keep_alive,
client_id,
clean_start,
properties,
client_id: options.client_id(),
keep_alive: options.keep_alive().as_secs() as u16,
clean_start: options.clean_start(),
properties: options.connect_properties(),
};
let last_will = options.last_will();
let login = options.credentials();

// send mqtt connect packet
network.connect(connect, options).await?;
network
.write(Packet::Connect(connect, last_will, login))
.await?;
network.flush().await?;

// validate connack
match network.read().await? {
Expand Down
13 changes: 0 additions & 13 deletions rumqttc/src/v5/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,6 @@ impl Network {
.map_err(StateError::Deserialization)
}

pub async fn connect(
&mut self,
connect: Connect,
options: &MqttOptions,
) -> Result<(), StateError> {
let last_will = options.last_will();
let login = options.credentials();
self.write(Packet::Connect(connect, last_will, login))
.await?;

self.flush().await
}

pub async fn flush(&mut self) -> Result<(), StateError> {
self.framed
.flush()
Expand Down

0 comments on commit 8e1633f

Please sign in to comment.