Skip to content

Commit

Permalink
Merge pull request #2381 from Ruadhri17/requester-in-topic-id
Browse files Browse the repository at this point in the history
Add requester prefix with timestamp to command operations
  • Loading branch information
reubenmiller authored Nov 1, 2023
2 parents e11b64f + 673135c commit 25efa8f
Show file tree
Hide file tree
Showing 20 changed files with 242 additions and 133 deletions.
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 6 additions & 10 deletions crates/core/c8y_api/src/smartrest/smartrest_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,9 @@ impl SmartRestUpdateSoftware {
pub fn into_software_update_command(
&self,
target: &EntityTopicId,
cmd_id: Option<&str>,
cmd_id: String,
) -> Result<SoftwareUpdateCommand, SmartRestDeserializerError> {
let mut request = match cmd_id {
None => SoftwareUpdateCommand::new(target),
Some(cmd_id) => SoftwareUpdateCommand::new_with_id(target, cmd_id.to_string()),
};
let mut request = SoftwareUpdateCommand::new(target, cmd_id);
for module in self.modules() {
match module.action.clone().try_into()? {
CumulocitySoftwareUpdateActions::Install => {
Expand Down Expand Up @@ -478,7 +475,7 @@ mod tests {
String::from("528,external_id,software1,version1,url1,action,software2,,,remove");
assert!(SmartRestUpdateSoftware::from_smartrest(&smartrest)
.unwrap()
.into_software_update_command(&device, Some("123"))
.into_software_update_command(&device, "123".to_string())
.is_err());
}

Expand All @@ -504,11 +501,10 @@ mod tests {
};
let device = EntityTopicId::default_main_device();
let thin_edge_json = smartrest_obj
.into_software_update_command(&device, Some("123"))
.into_software_update_command(&device, "123".to_string())
.unwrap();

let mut expected_thin_edge_json =
SoftwareUpdateCommand::new_with_id(&device, "123".to_string());
let mut expected_thin_edge_json = SoftwareUpdateCommand::new(&device, "123".to_string());
expected_thin_edge_json.add_update(SoftwareModuleUpdate::install(SoftwareModule {
module_type: Some("debian".to_string()),
name: "software1".to_string(),
Expand All @@ -535,7 +531,7 @@ mod tests {
nginx,1.21.0::docker,,install,mongodb,4.4.6::docker,,delete");
let software_update_request = SmartRestUpdateSoftware::from_smartrest(&smartrest)
.unwrap()
.into_software_update_command(&EntityTopicId::default_main_device(), Some("123"))
.into_software_update_command(&EntityTopicId::default_main_device(), "123".to_string())
.unwrap();

let output_json = software_update_request.payload.to_json();
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_agent/src/software_manager/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,14 @@ impl SoftwareManagerActor {
{
match operation {
StateStatus::Software(SoftwareOperationVariants::Update) => {
let response = SoftwareUpdateCommand::new_with_id(&self.config.device, cmd_id)
let response = SoftwareUpdateCommand::new(&self.config.device, cmd_id)
.with_error(
"Software Update command cancelled on agent restart".to_string(),
);
self.output_sender.send(response.into()).await?;
}
StateStatus::Software(SoftwareOperationVariants::List) => {
let response = SoftwareListCommand::new_with_id(&self.config.device, cmd_id)
let response = SoftwareListCommand::new(&self.config.device, cmd_id)
.with_error("Software List request cancelled on agent restart".to_string());
self.output_sender.send(response.into()).await?;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_agent/src/software_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn test_pending_software_list_operation() -> Result<(), DynError> {
let mut converter_box = spawn_software_manager(&temp_dir).await?;

let software_request_response =
SoftwareListCommand::new_with_id(&EntityTopicId::default_main_device(), "1234".to_string())
SoftwareListCommand::new(&EntityTopicId::default_main_device(), "1234".to_string())
.with_error("Software List request cancelled on agent restart".to_string());
converter_box
.assert_received([software_request_response])
Expand All @@ -122,7 +122,7 @@ async fn test_new_software_list_operation() -> Result<(), DynError> {
let mut converter_box = spawn_software_manager(&temp_dir).await?;

let command =
SoftwareListCommand::new_with_id(&EntityTopicId::default_main_device(), "1234".to_string());
SoftwareListCommand::new(&EntityTopicId::default_main_device(), "1234".to_string());
converter_box.send(command.clone().into()).await?;

let executing_response = command.clone().with_status(CommandStatus::Executing);
Expand Down
10 changes: 4 additions & 6 deletions crates/core/tedge_agent/src/tedge_operation_converter/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn convert_incoming_software_list_request() -> Result<(), DynError> {

// Assert SoftwareListCommand
software_box
.assert_received([SoftwareListCommand::new_with_id(
.assert_received([SoftwareListCommand::new(
&EntityTopicId::default_main_device(),
"some-cmd-id".to_string(),
)])
Expand Down Expand Up @@ -130,7 +130,7 @@ async fn convert_outgoing_software_list_response() -> Result<(), DynError> {

// Simulate SoftwareList response message received.
let software_list_request =
SoftwareListCommand::new_with_id(&EntityTopicId::default_main_device(), "1234".to_string());
SoftwareListCommand::new(&EntityTopicId::default_main_device(), "1234".to_string());
let software_list_response = software_list_request
.clone()
.with_status(CommandStatus::Executing);
Expand Down Expand Up @@ -189,10 +189,8 @@ async fn convert_outgoing_software_update_response() -> Result<(), DynError> {
skip_capability_messages(&mut mqtt_box, "device/main//").await;

// Simulate SoftwareUpdate response message received.
let software_update_request = SoftwareUpdateCommand::new_with_id(
&EntityTopicId::default_main_device(),
"1234".to_string(),
);
let software_update_request =
SoftwareUpdateCommand::new(&EntityTopicId::default_main_device(), "1234".to_string());
let software_update_response = software_update_request.with_status(CommandStatus::Executing);
software_box.send(software_update_response.into()).await?;

Expand Down
1 change: 0 additions & 1 deletion crates/core/tedge_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ download = { workspace = true }
json-writer = { workspace = true }
log = { workspace = true }
mqtt_channel = { workspace = true }
nanoid = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
Expand Down
43 changes: 10 additions & 33 deletions crates/core/tedge_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ mod tests {
use mqtt_channel::Message;
use mqtt_channel::QoS;
use mqtt_channel::Topic;
use regex::Regex;

#[test]
fn topic_names() {
Expand All @@ -52,7 +51,7 @@ mod tests {
Topic::new_unchecked("te/device/main///cmd/software_list")
);
assert_eq!(
SoftwareListCommand::new_with_id(&device, cmd_id.clone())
SoftwareListCommand::new(&device, cmd_id.clone())
.command_message(&mqtt_schema)
.topic,
Topic::new_unchecked("te/device/main///cmd/software_list/abc")
Expand All @@ -63,7 +62,7 @@ mod tests {
Topic::new_unchecked("te/device/main///cmd/software_update")
);
assert_eq!(
SoftwareUpdateCommand::new_with_id(&device, cmd_id.clone())
SoftwareUpdateCommand::new(&device, cmd_id.clone())
.command_message(&mqtt_schema)
.topic,
Topic::new_unchecked("te/device/main///cmd/software_update/abc")
Expand All @@ -74,7 +73,7 @@ mod tests {
Topic::new_unchecked("te/device/main///cmd/restart")
);
assert_eq!(
RestartCommand::new_with_id(&device, cmd_id.clone())
RestartCommand::new(&device, cmd_id.clone())
.command_message(&mqtt_schema)
.topic,
Topic::new_unchecked("te/device/main///cmd/restart/abc")
Expand All @@ -85,7 +84,7 @@ mod tests {
fn creating_a_software_list_request() {
let mqtt_schema = MqttSchema::default();
let device = EntityTopicId::default_child_device("abc").unwrap();
let request = SoftwareListCommand::new_with_id(&device, "1".to_string());
let request = SoftwareListCommand::new(&device, "1".to_string());

let expected_msg = Message {
topic: Topic::new_unchecked("te/device/abc///cmd/software_list/1"),
Expand All @@ -97,17 +96,6 @@ mod tests {
assert_eq!(actual_msg, expected_msg);
}

#[test]
fn creating_a_software_list_request_with_generated_id() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let request = SoftwareListCommand::new(&device);
let generated_id = request.cmd_id;

// The generated id is a nanoid of 21 characters from A-Za-z0-9_~
let re = Regex::new(r"[A-Za-z0-9_~-]{21,21}").unwrap();
assert!(re.is_match(&generated_id));
}

#[test]
fn using_a_software_list_request() {
let device = EntityTopicId::default_child_device("abc").unwrap();
Expand Down Expand Up @@ -137,7 +125,7 @@ mod tests {
#[test]
fn creating_a_software_list_response() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let mut response = SoftwareListCommand::new_with_id(&device, "1".to_string())
let mut response = SoftwareListCommand::new(&device, "1".to_string())
.with_status(CommandStatus::Successful);

response.add_modules(
Expand Down Expand Up @@ -276,7 +264,7 @@ mod tests {
#[test]
fn creating_a_software_list_error() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let response = SoftwareListCommand::new_with_id(&device, "123".to_string());
let response = SoftwareListCommand::new(&device, "123".to_string());
let response = response.with_error("Request_timed-out".to_string());
let message = response.command_message(&MqttSchema::default());

Expand Down Expand Up @@ -316,7 +304,7 @@ mod tests {
fn creating_a_software_update_request() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let cmd_id = "123".to_string();
let mut request = SoftwareUpdateCommand::new_with_id(&device, cmd_id);
let mut request = SoftwareUpdateCommand::new(&device, cmd_id);

request.add_updates(
"debian",
Expand Down Expand Up @@ -405,7 +393,7 @@ mod tests {
fn creating_a_software_update_request_grouping_updates_per_plugin() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let cmd_id = "123".to_string();
let mut request = SoftwareUpdateCommand::new_with_id(&device, cmd_id);
let mut request = SoftwareUpdateCommand::new(&device, cmd_id);

request.add_update(SoftwareModuleUpdate::install(SoftwareModule {
module_type: Some("debian".to_string()),
Expand Down Expand Up @@ -482,7 +470,7 @@ mod tests {
fn creating_a_software_update_request_grouping_updates_per_plugin_using_default() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let cmd_id = "123".to_string();
let mut request = SoftwareUpdateCommand::new_with_id(&device, cmd_id);
let mut request = SoftwareUpdateCommand::new(&device, cmd_id);

request.add_update(SoftwareModuleUpdate::install(SoftwareModule {
module_type: None, // I.e. default
Expand Down Expand Up @@ -552,17 +540,6 @@ mod tests {
assert_eq!(actual_json, remove_whitespace(expected_json));
}

#[test]
fn creating_a_software_update_request_with_generated_id() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let request = SoftwareUpdateCommand::new(&device);
let generated_id = request.cmd_id;

// The generated id is a nanoid of 21 characters from A-Za-z0-9_~
let re = Regex::new(r"[A-Za-z0-9_~-]{21,21}").unwrap();
assert!(re.is_match(&generated_id));
}

#[test]
fn using_a_software_update_request() {
let mqtt_schema = MqttSchema::default();
Expand Down Expand Up @@ -669,7 +646,7 @@ mod tests {
fn creating_a_software_update_response() {
let device = EntityTopicId::default_child_device("abc").unwrap();
let cmd_id = "123".to_string();
let request = SoftwareUpdateCommand::new_with_id(&device, cmd_id);
let request = SoftwareUpdateCommand::new(&device, cmd_id);

let response = request.with_status(CommandStatus::Executing);

Expand Down
12 changes: 1 addition & 11 deletions crates/core/tedge_api/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use download::DownloadInfo;
use mqtt_channel::Message;
use mqtt_channel::QoS;
use mqtt_channel::Topic;
use nanoid::nanoid;
use serde::Deserialize;
use serde::Serialize;
use time::OffsetDateTime;
Expand All @@ -25,17 +24,8 @@ impl<Payload> Command<Payload>
where
Payload: Default,
{
/// Build a new command with a random id
pub fn new(target: &EntityTopicId) -> Self {
Command {
target: target.clone(),
cmd_id: nanoid!(),
payload: Default::default(),
}
}

/// Build a new command with a given id
pub fn new_with_id(target: &EntityTopicId, cmd_id: String) -> Self {
pub fn new(target: &EntityTopicId, cmd_id: String) -> Self {
Command {
target: target.clone(),
cmd_id,
Expand Down
28 changes: 28 additions & 0 deletions crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use std::convert::Infallible;
use std::fmt::Display;
use std::fmt::Formatter;
use std::str::FromStr;
use time::format_description;
use time::OffsetDateTime;

const ENTITY_ID_SEGMENTS: usize = 4;

Expand Down Expand Up @@ -644,6 +646,32 @@ pub enum ChannelFilter {
CommandMetadata(OperationType),
}

pub struct IdGenerator {
prefix: String,
}

impl IdGenerator {
pub fn new(prefix: &str) -> Self {
IdGenerator {
prefix: prefix.into(),
}
}

pub fn new_id(&self) -> String {
format!(
"{}-{}",
self.prefix,
OffsetDateTime::now_utc()
.format(&format_description::well_known::Rfc3339)
.unwrap(),
)
}

pub fn is_generator_of(&self, cmd_id: &str) -> bool {
cmd_id.contains(&self.prefix)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 0 additions & 1 deletion crates/core/tedge_watchdog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ clap = { workspace = true }
freedesktop_entry_parser = { workspace = true }
futures = { workspace = true }
mqtt_channel = { workspace = true }
nanoid = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tedge_api = { workspace = true }
Expand Down
1 change: 0 additions & 1 deletion crates/extensions/c8y_mapper_ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ camino = { workspace = true }
clock = { workspace = true }
json-writer = { workspace = true }
logged_command = { workspace = true }
nanoid = { workspace = true }
plugin_sm = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ fn convert_from_old_agent_response(
) -> Result<Option<MqttMessage>, String> {
if let Ok(Value::Object(response)) = serde_json::from_slice(payload) {
if let Some(Value::String(cmd_id)) = response.get("id") {
if let Ok(topic) = Topic::new(&format!("te/device/main///cmd/{cmd_type}/{cmd_id}")) {
// The new mapper expects command ids with a specific prefix
if let Ok(topic) = Topic::new(&format!(
"te/device/main///cmd/{cmd_type}/c8y-mapper-{cmd_id}"
)) {
return Ok(Some(
MqttMessage::new(&topic, payload)
.with_retain()
Expand Down
Loading

1 comment on commit 25efa8f

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
355 0 3 355 100 57m39.900999999s

Please sign in to comment.