Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: disconnect cleanly from local MQTT broker in tedge connect #3025

Merged
merged 2 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 48 additions & 3 deletions crates/core/tedge/src/cli/connect/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
.network_options
.set_connection_timeout(CONNECTION_TIMEOUT.as_secs());
let mut acknowledged = false;
let mut exists = false;

client.subscribe(&c8y_topic_builtin_jwt_token_downstream, AtLeastOnce)?;

Expand All @@ -312,7 +313,8 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
let token = String::from_utf8(response.payload.to_vec()).unwrap();
// FIXME: what does this magic number mean?
if token.contains("71") {
return Ok(DeviceStatus::AlreadyExists);
exists = true;
break;
}
}
Ok(Event::Outgoing(Outgoing::PingReq)) => {
Expand All @@ -332,6 +334,19 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
}
}

// Cleanly disconnect client
client.disconnect()?;
for event in connection.iter() {
match event {
Ok(Event::Outgoing(Outgoing::Disconnect)) | Err(_) => break,
_ => {}
}
}

if exists {
return Ok(DeviceStatus::AlreadyExists);
}

if acknowledged {
// The request has been sent but without a response
Ok(DeviceStatus::Unknown)
Expand Down Expand Up @@ -363,6 +378,7 @@ fn check_device_status_azure(tedge_config: &TEdgeConfig) -> Result<DeviceStatus,

let (mut client, mut connection) = rumqttc::Client::new(mqtt_options, 10);
let mut acknowledged = false;
let mut exists = false;

client.subscribe(AZURE_TOPIC_DEVICE_TWIN_DOWNSTREAM, AtLeastOnce)?;

Expand All @@ -385,7 +401,8 @@ fn check_device_status_azure(tedge_config: &TEdgeConfig) -> Result<DeviceStatus,
// We got a response
if response.topic.contains(REGISTRATION_OK) {
println!("Received expected response message.");
return Ok(DeviceStatus::AlreadyExists);
exists = true;
break;
} else {
break;
}
Expand All @@ -407,6 +424,19 @@ fn check_device_status_azure(tedge_config: &TEdgeConfig) -> Result<DeviceStatus,
}
}

// Cleanly disconnect client
client.disconnect()?;
for event in connection.iter() {
match event {
Ok(Event::Outgoing(Outgoing::Disconnect)) | Err(_) => break,
_ => {}
}
}

if exists {
return Ok(DeviceStatus::AlreadyExists);
}

if acknowledged {
// The request has been sent but without a response
Ok(DeviceStatus::Unknown)
Expand All @@ -430,6 +460,7 @@ fn check_device_status_aws(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C

let (mut client, mut connection) = rumqttc::Client::new(mqtt_options, 10);
let mut acknowledged = false;
let mut exists = false;

client.subscribe(AWS_TOPIC_SUB_CHECK_CONNECTION, AtLeastOnce)?;

Expand All @@ -451,7 +482,8 @@ fn check_device_status_aws(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
Ok(Event::Incoming(Packet::Publish(response))) => {
// We got a response
println!("Received expected response on topic {}.", response.topic);
return Ok(DeviceStatus::AlreadyExists);
exists = true;
break;
}
Ok(Event::Outgoing(Outgoing::PingReq)) => {
// No messages have been received for a while
Expand All @@ -470,6 +502,19 @@ fn check_device_status_aws(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C
}
}

// Cleanly disconnect client
client.disconnect()?;
for event in connection.iter() {
match event {
Ok(Event::Outgoing(Outgoing::Disconnect)) | Err(_) => break,
_ => {}
}
}

if exists {
return Ok(DeviceStatus::AlreadyExists);
}

if acknowledged {
// The request has been sent but without a response
Ok(DeviceStatus::Unknown)
Expand Down
17 changes: 16 additions & 1 deletion crates/core/tedge/src/cli/connect/jwt_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub(crate) fn get_connected_c8y_url(tedge_config: &TEdgeConfig) -> Result<String
.network_options
.set_connection_timeout(CONNECTION_TIMEOUT.as_secs());
let mut acknowledged = false;
let mut c8y_url: Option<String> = None;

client.subscribe(&c8y_topic_builtin_jwt_token_downstream, AtLeastOnce)?;

Expand All @@ -49,7 +50,8 @@ pub(crate) fn get_connected_c8y_url(tedge_config: &TEdgeConfig) -> Result<String
// We got a response
let token = String::from_utf8(response.payload.to_vec()).unwrap();
let connected_url = decode_jwt_token(token.as_str())?;
return Ok(connected_url);
c8y_url = Some(connected_url);
break;
}
Ok(Event::Outgoing(Outgoing::PingReq)) => {
// No messages have been received for a while
Expand All @@ -68,6 +70,19 @@ pub(crate) fn get_connected_c8y_url(tedge_config: &TEdgeConfig) -> Result<String
}
}

// Cleanly disconnect client
client.disconnect()?;
for event in connection.iter() {
match event {
Ok(Event::Outgoing(Outgoing::Disconnect)) | Err(_) => break,
_ => {}
}
}

if let Some(c8y_url) = c8y_url {
return Ok(c8y_url);
}

if acknowledged {
// The request has been sent but without a response
println!("\nThe request has been sent, however, no response.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Resource ../../resources/common.resource
Library ThinEdgeIO

Test Tags theme:cli theme:mqtt theme:c8y
Suite Setup Setup
Suite Setup Suite Setup
Suite Teardown Get Logs

*** Test Cases ***
Expand Down Expand Up @@ -58,8 +58,19 @@ tedge reconnect restarts mapper
${pid_after}= Execute Command sudo systemctl show --property MainPID tedge-mapper-c8y
Should Not Be Equal ${pid_before} ${pid_after}

Check absence of OpenSSL Error messages #3024
${SuiteStart}= Get Suite Start Time
# Only checkout output if mosquitto is being used
${output}= Execute Command systemctl is-active mosquitto && journalctl -u mosquitto -n 5000 --since "@${SuiteStartSeconds}" || true
Should Not Contain ${output} OpenSSL Error

*** Keywords ***

Suite Setup
Setup
${SuiteStartSeconds}= Get Unix Timestamp
Set Suite Variable $SuiteStartSeconds

Should Have File Permissions
[Arguments] ${file} ${expected_permissions}
${FILE_MODE_OWNERSHIP}= Execute Command stat -c '%a %U:%G' ${file} strip=${True}
Expand Down