Skip to content

Commit

Permalink
Merge pull request #3025 from reubenmiller/fix-clean-mqtt-disconnects
Browse files Browse the repository at this point in the history
fix: disconnect cleanly from local MQTT broker in tedge connect
  • Loading branch information
reubenmiller authored Jul 31, 2024
2 parents 796e3e3 + 19e320d commit e18db25
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 5 deletions.
51 changes: 48 additions & 3 deletions crates/core/tedge/src/cli/connect/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
.network_options
.set_connection_timeout(CONNECTION_TIMEOUT.as_secs());
let mut acknowledged = false;
let mut exists = false;

client.subscribe(&c8y_topic_builtin_jwt_token_downstream, AtLeastOnce)?;

Expand All @@ -312,7 +313,8 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
let token = String::from_utf8(response.payload.to_vec()).unwrap();
// FIXME: what does this magic number mean?
if token.contains("71") {
return Ok(DeviceStatus::AlreadyExists);
exists = true;
break;
}
}
Ok(Event::Outgoing(Outgoing::PingReq)) => {
Expand All @@ -332,6 +334,19 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
}
}

// Cleanly disconnect client
client.disconnect()?;
for event in connection.iter() {
match event {
Ok(Event::Outgoing(Outgoing::Disconnect)) | Err(_) => break,
_ => {}
}
}

if exists {
return Ok(DeviceStatus::AlreadyExists);
}

if acknowledged {
// The request has been sent but without a response
Ok(DeviceStatus::Unknown)
Expand Down Expand Up @@ -363,6 +378,7 @@ fn check_device_status_azure(tedge_config: &TEdgeConfig) -> Result<DeviceStatus,

let (mut client, mut connection) = rumqttc::Client::new(mqtt_options, 10);
let mut acknowledged = false;
let mut exists = false;

client.subscribe(AZURE_TOPIC_DEVICE_TWIN_DOWNSTREAM, AtLeastOnce)?;

Expand All @@ -385,7 +401,8 @@ fn check_device_status_azure(tedge_config: &TEdgeConfig) -> Result<DeviceStatus,
// We got a response
if response.topic.contains(REGISTRATION_OK) {
println!("Received expected response message.");
return Ok(DeviceStatus::AlreadyExists);
exists = true;
break;
} else {
break;
}
Expand All @@ -407,6 +424,19 @@ fn check_device_status_azure(tedge_config: &TEdgeConfig) -> Result<DeviceStatus,
}
}

// Cleanly disconnect client
client.disconnect()?;
for event in connection.iter() {
match event {
Ok(Event::Outgoing(Outgoing::Disconnect)) | Err(_) => break,
_ => {}
}
}

if exists {
return Ok(DeviceStatus::AlreadyExists);
}

if acknowledged {
// The request has been sent but without a response
Ok(DeviceStatus::Unknown)
Expand All @@ -430,6 +460,7 @@ fn check_device_status_aws(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C

let (mut client, mut connection) = rumqttc::Client::new(mqtt_options, 10);
let mut acknowledged = false;
let mut exists = false;

client.subscribe(AWS_TOPIC_SUB_CHECK_CONNECTION, AtLeastOnce)?;

Expand All @@ -451,7 +482,8 @@ fn check_device_status_aws(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
Ok(Event::Incoming(Packet::Publish(response))) => {
// We got a response
println!("Received expected response on topic {}.", response.topic);
return Ok(DeviceStatus::AlreadyExists);
exists = true;
break;
}
Ok(Event::Outgoing(Outgoing::PingReq)) => {
// No messages have been received for a while
Expand All @@ -470,6 +502,19 @@ fn check_device_status_aws(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
}
}

// Cleanly disconnect client
client.disconnect()?;
for event in connection.iter() {
match event {
Ok(Event::Outgoing(Outgoing::Disconnect)) | Err(_) => break,
_ => {}
}
}

if exists {
return Ok(DeviceStatus::AlreadyExists);
}

if acknowledged {
// The request has been sent but without a response
Ok(DeviceStatus::Unknown)
Expand Down
17 changes: 16 additions & 1 deletion crates/core/tedge/src/cli/connect/jwt_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub(crate) fn get_connected_c8y_url(tedge_config: &TEdgeConfig) -> Result<String
.network_options
.set_connection_timeout(CONNECTION_TIMEOUT.as_secs());
let mut acknowledged = false;
let mut c8y_url: Option<String> = None;

client.subscribe(&c8y_topic_builtin_jwt_token_downstream, AtLeastOnce)?;

Expand All @@ -49,7 +50,8 @@ pub(crate) fn get_connected_c8y_url(tedge_config: &TEdgeConfig) -> Result<String
// We got a response
let token = String::from_utf8(response.payload.to_vec()).unwrap();
let connected_url = decode_jwt_token(token.as_str())?;
return Ok(connected_url);
c8y_url = Some(connected_url);
break;
}
Ok(Event::Outgoing(Outgoing::PingReq)) => {
// No messages have been received for a while
Expand All @@ -68,6 +70,19 @@ pub(crate) fn get_connected_c8y_url(tedge_config: &TEdgeConfig) -> Result<String
}
}

// Cleanly disconnect client
client.disconnect()?;
for event in connection.iter() {
match event {
Ok(Event::Outgoing(Outgoing::Disconnect)) | Err(_) => break,
_ => {}
}
}

if let Some(c8y_url) = c8y_url {
return Ok(c8y_url);
}

if acknowledged {
// The request has been sent but without a response
println!("\nThe request has been sent, however, no response.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Resource ../../resources/common.resource
Library ThinEdgeIO

Test Tags theme:cli theme:mqtt theme:c8y
Suite Setup Setup
Suite Setup Suite Setup
Suite Teardown Get Logs

*** Test Cases ***
Expand Down Expand Up @@ -58,8 +58,19 @@ tedge reconnect restarts mapper
${pid_after}= Execute Command sudo systemctl show --property MainPID tedge-mapper-c8y
Should Not Be Equal ${pid_before} ${pid_after}

Check absence of OpenSSL Error messages #3024
${SuiteStart}= Get Suite Start Time
# Only checkout output if mosquitto is being used
${output}= Execute Command systemctl is-active mosquitto && journalctl -u mosquitto -n 5000 --since "@${SuiteStartSeconds}" || true
Should Not Contain ${output} OpenSSL Error

*** Keywords ***

Suite Setup
Setup
${SuiteStartSeconds}= Get Unix Timestamp
Set Suite Variable $SuiteStartSeconds

Should Have File Permissions
[Arguments] ${file} ${expected_permissions}
${FILE_MODE_OWNERSHIP}= Execute Command stat -c '%a %U:%G' ${file} strip=${True}
Expand Down

0 comments on commit e18db25

Please sign in to comment.