Skip to content

Commit

Permalink
disconnect from local mqtt broker cleanly
Browse files Browse the repository at this point in the history
Signed-off-by: Reuben Miller <[email protected]>
  • Loading branch information
reubenmiller committed Jul 30, 2024
1 parent e0c1024 commit 19e320d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 4 deletions.
51 changes: 48 additions & 3 deletions crates/core/tedge/src/cli/connect/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
.network_options
.set_connection_timeout(CONNECTION_TIMEOUT.as_secs());
let mut acknowledged = false;
let mut exists = false;

client.subscribe(&c8y_topic_builtin_jwt_token_downstream, AtLeastOnce)?;

Expand All @@ -312,7 +313,8 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
let token = String::from_utf8(response.payload.to_vec()).unwrap();
// FIXME: what does this magic number mean?
if token.contains("71") {
return Ok(DeviceStatus::AlreadyExists);
exists = true;
break;
}
}
Ok(Event::Outgoing(Outgoing::PingReq)) => {
Expand All @@ -332,6 +334,19 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
}
}

// Cleanly disconnect client
client.disconnect()?;
for event in connection.iter() {
match event {
Ok(Event::Outgoing(Outgoing::Disconnect)) | Err(_) => break,
_ => {}
}
}

if exists {
return Ok(DeviceStatus::AlreadyExists);
}

if acknowledged {
// The request has been sent but without a response
Ok(DeviceStatus::Unknown)
Expand Down Expand Up @@ -363,6 +378,7 @@ fn check_device_status_azure(tedge_config: &TEdgeConfig) -> Result<DeviceStatus,

let (mut client, mut connection) = rumqttc::Client::new(mqtt_options, 10);
let mut acknowledged = false;
let mut exists = false;

client.subscribe(AZURE_TOPIC_DEVICE_TWIN_DOWNSTREAM, AtLeastOnce)?;

Expand All @@ -385,7 +401,8 @@ fn check_device_status_azure(tedge_config: &TEdgeConfig) -> Result<DeviceStatus,
// We got a response
if response.topic.contains(REGISTRATION_OK) {
println!("Received expected response message.");
return Ok(DeviceStatus::AlreadyExists);
exists = true;
break;
} else {
break;
}
Expand All @@ -407,6 +424,19 @@ fn check_device_status_azure(tedge_config: &TEdgeConfig) -> Result<DeviceStatus,
}
}

// Cleanly disconnect client
client.disconnect()?;
for event in connection.iter() {
match event {
Ok(Event::Outgoing(Outgoing::Disconnect)) | Err(_) => break,
_ => {}
}
}

if exists {
return Ok(DeviceStatus::AlreadyExists);
}

if acknowledged {
// The request has been sent but without a response
Ok(DeviceStatus::Unknown)
Expand All @@ -430,6 +460,7 @@ fn check_device_status_aws(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C

let (mut client, mut connection) = rumqttc::Client::new(mqtt_options, 10);
let mut acknowledged = false;
let mut exists = false;

client.subscribe(AWS_TOPIC_SUB_CHECK_CONNECTION, AtLeastOnce)?;

Expand All @@ -451,7 +482,8 @@ fn check_device_status_aws(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
Ok(Event::Incoming(Packet::Publish(response))) => {
// We got a response
println!("Received expected response on topic {}.", response.topic);
return Ok(DeviceStatus::AlreadyExists);
exists = true;
break;
}
Ok(Event::Outgoing(Outgoing::PingReq)) => {
// No messages have been received for a while
Expand All @@ -470,6 +502,19 @@ fn check_device_status_aws(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
}
}

// Cleanly disconnect client
client.disconnect()?;
for event in connection.iter() {
match event {
Ok(Event::Outgoing(Outgoing::Disconnect)) | Err(_) => break,
_ => {}
}
}

if exists {
return Ok(DeviceStatus::AlreadyExists);
}

if acknowledged {
// The request has been sent but without a response
Ok(DeviceStatus::Unknown)
Expand Down
17 changes: 16 additions & 1 deletion crates/core/tedge/src/cli/connect/jwt_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub(crate) fn get_connected_c8y_url(tedge_config: &TEdgeConfig) -> Result<String
.network_options
.set_connection_timeout(CONNECTION_TIMEOUT.as_secs());
let mut acknowledged = false;
let mut c8y_url: Option<String> = None;

client.subscribe(&c8y_topic_builtin_jwt_token_downstream, AtLeastOnce)?;

Expand All @@ -49,7 +50,8 @@ pub(crate) fn get_connected_c8y_url(tedge_config: &TEdgeConfig) -> Result<String
// We got a response
let token = String::from_utf8(response.payload.to_vec()).unwrap();
let connected_url = decode_jwt_token(token.as_str())?;
return Ok(connected_url);
c8y_url = Some(connected_url);
break;
}
Ok(Event::Outgoing(Outgoing::PingReq)) => {
// No messages have been received for a while
Expand All @@ -68,6 +70,19 @@ pub(crate) fn get_connected_c8y_url(tedge_config: &TEdgeConfig) -> Result<String
}
}

// Cleanly disconnect client
client.disconnect()?;
for event in connection.iter() {
match event {
Ok(Event::Outgoing(Outgoing::Disconnect)) | Err(_) => break,
_ => {}
}
}

if let Some(c8y_url) = c8y_url {
return Ok(c8y_url);
}

if acknowledged {
// The request has been sent but without a response
println!("\nThe request has been sent, however, no response.");
Expand Down

0 comments on commit 19e320d

Please sign in to comment.