Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add requester prefix with timestamp to command operations #2381

Merged
merged 5 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 6 additions & 10 deletions crates/core/c8y_api/src/smartrest/smartrest_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,9 @@ impl SmartRestUpdateSoftware {
pub fn into_software_update_command(
&self,
target: &EntityTopicId,
cmd_id: Option<&str>,
cmd_id: String,
) -> Result<SoftwareUpdateCommand, SmartRestDeserializerError> {
let mut request = match cmd_id {
None => SoftwareUpdateCommand::new(target),
Some(cmd_id) => SoftwareUpdateCommand::new_with_id(target, cmd_id.to_string()),
};
let mut request = SoftwareUpdateCommand::new(target, cmd_id);
for module in self.modules() {
match module.action.clone().try_into()? {
CumulocitySoftwareUpdateActions::Install => {
Expand Down Expand Up @@ -478,7 +475,7 @@ mod tests {
String::from("528,external_id,software1,version1,url1,action,software2,,,remove");
assert!(SmartRestUpdateSoftware::from_smartrest(&smartrest)
.unwrap()
.into_software_update_command(&device, Some("123"))
.into_software_update_command(&device, "123".to_string())
.is_err());
}

Expand All @@ -504,11 +501,10 @@ mod tests {
};
let device = EntityTopicId::default_main_device();
let thin_edge_json = smartrest_obj
.into_software_update_command(&device, Some("123"))
.into_software_update_command(&device, "123".to_string())
.unwrap();

let mut expected_thin_edge_json =
SoftwareUpdateCommand::new_with_id(&device, "123".to_string());
let mut expected_thin_edge_json = SoftwareUpdateCommand::new(&device, "123".to_string());
expected_thin_edge_json.add_update(SoftwareModuleUpdate::install(SoftwareModule {
module_type: Some("debian".to_string()),
name: "software1".to_string(),
Expand All @@ -535,7 +531,7 @@ mod tests {
nginx,1.21.0::docker,,install,mongodb,4.4.6::docker,,delete");
let software_update_request = SmartRestUpdateSoftware::from_smartrest(&smartrest)
.unwrap()
.into_software_update_command(&EntityTopicId::default_main_device(), Some("123"))
.into_software_update_command(&EntityTopicId::default_main_device(), "123".to_string())
.unwrap();

let output_json = software_update_request.payload.to_json();
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_agent/src/software_manager/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,14 @@ impl SoftwareManagerActor {
{
match operation {
StateStatus::Software(SoftwareOperationVariants::Update) => {
let response = SoftwareUpdateCommand::new_with_id(&self.config.device, cmd_id)
let response = SoftwareUpdateCommand::new(&self.config.device, cmd_id)
.with_error(
"Software Update command cancelled on agent restart".to_string(),
);
self.output_sender.send(response.into()).await?;
}
StateStatus::Software(SoftwareOperationVariants::List) => {
let response = SoftwareListCommand::new_with_id(&self.config.device, cmd_id)
let response = SoftwareListCommand::new(&self.config.device, cmd_id)
.with_error("Software List request cancelled on agent restart".to_string());
self.output_sender.send(response.into()).await?;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_agent/src/software_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn test_pending_software_list_operation() -> Result<(), DynError> {
let mut converter_box = spawn_software_manager(&temp_dir).await?;

let software_request_response =
SoftwareListCommand::new_with_id(&EntityTopicId::default_main_device(), "1234".to_string())
SoftwareListCommand::new(&EntityTopicId::default_main_device(), "1234".to_string())
.with_error("Software List request cancelled on agent restart".to_string());
converter_box
.assert_received([software_request_response])
Expand All @@ -122,7 +122,7 @@ async fn test_new_software_list_operation() -> Result<(), DynError> {
let mut converter_box = spawn_software_manager(&temp_dir).await?;

let command =
SoftwareListCommand::new_with_id(&EntityTopicId::default_main_device(), "1234".to_string());
SoftwareListCommand::new(&EntityTopicId::default_main_device(), "1234".to_string());
converter_box.send(command.clone().into()).await?;

let executing_response = command.clone().with_status(CommandStatus::Executing);
Expand Down
10 changes: 4 additions & 6 deletions crates/core/tedge_agent/src/tedge_operation_converter/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn convert_incoming_software_list_request() -> Result<(), DynError> {

// Assert SoftwareListCommand
software_box
.assert_received([SoftwareListCommand::new_with_id(
.assert_received([SoftwareListCommand::new(
&EntityTopicId::default_main_device(),
"some-cmd-id".to_string(),
)])
Expand Down Expand Up @@ -130,7 +130,7 @@ async fn convert_outgoing_software_list_response() -> Result<(), DynError> {

// Simulate SoftwareList response message received.
let software_list_request =
SoftwareListCommand::new_with_id(&EntityTopicId::default_main_device(), "1234".to_string());
SoftwareListCommand::new(&EntityTopicId::default_main_device(), "1234".to_string());
let software_list_response = software_list_request
.clone()
.with_status(CommandStatus::Executing);
Expand Down Expand Up @@ -189,10 +189,8 @@ async fn convert_outgoing_software_update_response() -> Result<(), DynError> {
skip_capability_messages(&mut mqtt_box, "device/main//").await;

// Simulate SoftwareUpdate response message received.
let software_update_request = SoftwareUpdateCommand::new_with_id(
&EntityTopicId::default_main_device(),
"1234".to_string(),
);
let software_update_request =
SoftwareUpdateCommand::new(&EntityTopicId::default_main_device(), "1234".to_string());
let software_update_response = software_update_request.with_status(CommandStatus::Executing);
software_box.send(software_update_response.into()).await?;

Expand Down
1 change: 0 additions & 1 deletion crates/core/tedge_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ download = { workspace = true }
json-writer = { workspace = true }
log = { workspace = true }
mqtt_channel = { workspace = true }
nanoid = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
Expand Down
43 changes: 10 additions & 33 deletions crates/core/tedge_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ mod tests {
use mqtt_channel::Message;
use mqtt_channel::QoS;
use mqtt_channel::Topic;
use regex::Regex;

#[test]
fn topic_names() {
Expand All @@ -52,7 +51,7 @@ mod tests {
Topic::new_unchecked("te/device/main///cmd/software_list")
);
assert_eq!(
SoftwareListCommand::new_with_id(&device, cmd_id.clone())
SoftwareListCommand::new(&device, cmd_id.clone())
.command_message(&mqtt_schema)
.topic,
Topic::new_unchecked("te/device/main///cmd/software_list/abc")
Expand All @@ -63,7 +62,7 @@ mod tests {
Topic::new_unchecked("te/device/main///cmd/software_update")
);
assert_eq!(
SoftwareUpdateCommand::new_with_id(&device, cmd_id.clone())
SoftwareUpdateCommand::new(&device, cmd_id.clone())
.command_message(&mqtt_schema)
.topic,
Topic::new_unchecked("te/device/main///cmd/software_update/abc")
Expand All @@ -74,7 +73,7 @@ mod tests {
Topic::new_unchecked("te/device/main///cmd/restart")
);
assert_eq!(
RestartCommand::new_with_id(&device, cmd_id.clone())
RestartCommand::new(&device, cmd_id.clone())
.command_message(&mqtt_schema)
.topic,
Topic::new_unchecked("te/device/main///cmd/restart/abc")
Expand All @@ -85,7 +84,7 @@ mod tests {
fn creating_a_software_list_request() {
let mqtt_schema = MqttSchema::default();
let device = EntityTopicId::default_child_device("abc").unwrap();
let request = SoftwareListCommand::new_with_id(&device, "1".to_string());
let request = SoftwareListCommand::new(&device, "1".to_string());

let expected_msg = Message {
topic: Topic::new_unchecked("te/device/abc///cmd/software_list/1"),
Expand All @@ -97,17 +96,6 @@ mod tests {
assert_eq!(actual_msg, expected_msg);
}

#[test]
fn creating_a_software_list_request_with_generated_id() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let request = SoftwareListCommand::new(&device);
let generated_id = request.cmd_id;

// The generated id is a nanoid of 21 characters from A-Za-z0-9_~
let re = Regex::new(r"[A-Za-z0-9_~-]{21,21}").unwrap();
assert!(re.is_match(&generated_id));
}

#[test]
fn using_a_software_list_request() {
let device = EntityTopicId::default_child_device("abc").unwrap();
Expand Down Expand Up @@ -137,7 +125,7 @@ mod tests {
#[test]
fn creating_a_software_list_response() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let mut response = SoftwareListCommand::new_with_id(&device, "1".to_string())
let mut response = SoftwareListCommand::new(&device, "1".to_string())
.with_status(CommandStatus::Successful);

response.add_modules(
Expand Down Expand Up @@ -276,7 +264,7 @@ mod tests {
#[test]
fn creating_a_software_list_error() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let response = SoftwareListCommand::new_with_id(&device, "123".to_string());
let response = SoftwareListCommand::new(&device, "123".to_string());
let response = response.with_error("Request_timed-out".to_string());
let message = response.command_message(&MqttSchema::default());

Expand Down Expand Up @@ -316,7 +304,7 @@ mod tests {
fn creating_a_software_update_request() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let cmd_id = "123".to_string();
let mut request = SoftwareUpdateCommand::new_with_id(&device, cmd_id);
let mut request = SoftwareUpdateCommand::new(&device, cmd_id);

request.add_updates(
"debian",
Expand Down Expand Up @@ -405,7 +393,7 @@ mod tests {
fn creating_a_software_update_request_grouping_updates_per_plugin() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let cmd_id = "123".to_string();
let mut request = SoftwareUpdateCommand::new_with_id(&device, cmd_id);
let mut request = SoftwareUpdateCommand::new(&device, cmd_id);

request.add_update(SoftwareModuleUpdate::install(SoftwareModule {
module_type: Some("debian".to_string()),
Expand Down Expand Up @@ -482,7 +470,7 @@ mod tests {
fn creating_a_software_update_request_grouping_updates_per_plugin_using_default() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let cmd_id = "123".to_string();
let mut request = SoftwareUpdateCommand::new_with_id(&device, cmd_id);
let mut request = SoftwareUpdateCommand::new(&device, cmd_id);

request.add_update(SoftwareModuleUpdate::install(SoftwareModule {
module_type: None, // I.e. default
Expand Down Expand Up @@ -552,17 +540,6 @@ mod tests {
assert_eq!(actual_json, remove_whitespace(expected_json));
}

#[test]
fn creating_a_software_update_request_with_generated_id() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let request = SoftwareUpdateCommand::new(&device);
let generated_id = request.cmd_id;

// The generated id is a nanoid of 21 characters from A-Za-z0-9_~
let re = Regex::new(r"[A-Za-z0-9_~-]{21,21}").unwrap();
assert!(re.is_match(&generated_id));
}

#[test]
fn using_a_software_update_request() {
let mqtt_schema = MqttSchema::default();
Expand Down Expand Up @@ -669,7 +646,7 @@ mod tests {
fn creating_a_software_update_response() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let cmd_id = "123".to_string();
let request = SoftwareUpdateCommand::new_with_id(&device, cmd_id);
let request = SoftwareUpdateCommand::new(&device, cmd_id);

let response = request.with_status(CommandStatus::Executing);

Expand Down
12 changes: 1 addition & 11 deletions crates/core/tedge_api/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use download::DownloadInfo;
use mqtt_channel::Message;
use mqtt_channel::QoS;
use mqtt_channel::Topic;
use nanoid::nanoid;
use serde::Deserialize;
use serde::Serialize;
use time::OffsetDateTime;
Expand All @@ -25,17 +24,8 @@ impl<Payload> Command<Payload>
where
Payload: Default,
{
/// Build a new command with a random id
pub fn new(target: &EntityTopicId) -> Self {
Command {
target: target.clone(),
cmd_id: nanoid!(),
payload: Default::default(),
}
}

/// Build a new command with a given id
pub fn new_with_id(target: &EntityTopicId, cmd_id: String) -> Self {
pub fn new(target: &EntityTopicId, cmd_id: String) -> Self {
Command {
target: target.clone(),
cmd_id,
Expand Down
28 changes: 28 additions & 0 deletions crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use std::convert::Infallible;
use std::fmt::Display;
use std::fmt::Formatter;
use std::str::FromStr;
use time::format_description;
use time::OffsetDateTime;

const ENTITY_ID_SEGMENTS: usize = 4;

Expand Down Expand Up @@ -644,6 +646,32 @@ pub enum ChannelFilter {
CommandMetadata(OperationType),
}

pub struct IdGenerator {
prefix: String,
}

impl IdGenerator {
pub fn new(prefix: &str) -> Self {
IdGenerator {
prefix: prefix.into(),
}
}

pub fn new_id(&self) -> String {
format!(
"{}-{}",
self.prefix,
OffsetDateTime::now_utc()
.format(&format_description::well_known::Rfc3339)
.unwrap(),
)
}

pub fn is_generator_of(&self, cmd_id: &str) -> bool {
cmd_id.contains(&self.prefix)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 0 additions & 1 deletion crates/core/tedge_watchdog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ clap = { workspace = true }
freedesktop_entry_parser = { workspace = true }
futures = { workspace = true }
mqtt_channel = { workspace = true }
nanoid = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tedge_api = { workspace = true }
Expand Down
1 change: 0 additions & 1 deletion crates/extensions/c8y_mapper_ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ camino = { workspace = true }
clock = { workspace = true }
json-writer = { workspace = true }
logged_command = { workspace = true }
nanoid = { workspace = true }
plugin_sm = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ fn convert_from_old_agent_response(
) -> Result<Option<MqttMessage>, String> {
if let Ok(Value::Object(response)) = serde_json::from_slice(payload) {
if let Some(Value::String(cmd_id)) = response.get("id") {
if let Ok(topic) = Topic::new(&format!("te/device/main///cmd/{cmd_type}/{cmd_id}")) {
// The new mapper expects command ids with a specific prefix
if let Ok(topic) = Topic::new(&format!(
"te/device/main///cmd/{cmd_type}/c8y-mapper-{cmd_id}"
)) {
return Ok(Some(
MqttMessage::new(&topic, payload)
.with_retain()
Expand Down
Loading