diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 5bf83cbc9d9..e799c373898 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -24,15 +24,17 @@ pub enum C8yEndPointError { #[derive(Debug, Clone)] pub struct C8yEndPoint { c8y_host: String, + c8y_mqtt_host: String, pub device_id: String, pub token: Option, devices_internal_id: HashMap, } impl C8yEndPoint { - pub fn new(c8y_host: &str, device_id: &str) -> C8yEndPoint { + pub fn new(c8y_host: &str, c8y_mqtt_host: &str, device_id: &str) -> C8yEndPoint { C8yEndPoint { c8y_host: c8y_host.into(), + c8y_mqtt_host: c8y_mqtt_host.into(), device_id: device_id.into(), token: None, devices_internal_id: HashMap::new(), @@ -101,20 +103,29 @@ impl C8yEndPoint { // * . eg: t12345.c8y.io // These URLs may be both equivalent and point to the same tenant. // We are going to remove that and only check if the domain is the same. - let (tenant_host, _port) = self + let (tenant_http_host, _port) = self .c8y_host .split_once(':') .unwrap_or((&self.c8y_host, "")); + let (tenant_mqtt_host, _port) = self + .c8y_mqtt_host + .split_once(':') + .unwrap_or((&self.c8y_mqtt_host, "")); let url = Url::parse(url).ok()?; let url_host = url.domain()?; let (_, host) = url_host.split_once('.').unwrap_or((url_host, "")); - let (_, c8y_host) = tenant_host.split_once('.').unwrap_or((tenant_host, "")); + let (_, c8y_http_host) = tenant_http_host + .split_once('.') + .unwrap_or((tenant_http_host, "")); + let (_, c8y_mqtt_host) = tenant_mqtt_host + .split_once('.') + .unwrap_or((tenant_mqtt_host, "")); // The configured `c8y.http` setting may have a port value specified, // but the incoming URL is less likely to have any port specified. // Hence just matching the host prefix. - (host == c8y_host).then_some(url) + (host == c8y_http_host || host == c8y_mqtt_host).then_some(url) } } @@ -208,7 +219,7 @@ mod tests { #[test] fn get_url_for_get_id_returns_correct_address() { - let c8y = C8yEndPoint::new("test_host", "test_device"); + let c8y = C8yEndPoint::new("test_host", "test_host", "test_device"); let res = c8y.get_url_for_internal_id("test_device"); assert_eq!( @@ -219,7 +230,7 @@ mod tests { #[test] fn get_url_for_sw_list_returns_correct_address() { - let mut c8y = C8yEndPoint::new("test_host", "test_device"); + let mut c8y = C8yEndPoint::new("test_host", "test_host", "test_device"); c8y.devices_internal_id .insert("test_device".to_string(), "12345".to_string()); let internal_id = c8y.get_internal_id("test_device".to_string()).unwrap(); @@ -237,8 +248,39 @@ mod tests { #[test_case("https://t1124124.test.com/path")] #[test_case("https://t1124124.test.com/path/to/file.test")] #[test_case("https://t1124124.test.com/path/to/file")] + #[test_case("https://t1124124.mqtt-url.com/path/to/file")] fn url_is_my_tenant_correct_urls(url: &str) { - let c8y = C8yEndPoint::new("test.test.com", "test_device"); + let c8y = C8yEndPoint::new("test.test.com", "test.mqtt-url.com", "test_device"); + assert_eq!(c8y.maybe_tenant_url(url), Some(url.parse().unwrap())); + } + + #[test_case("http://aaa.test.com")] + #[test_case("https://aaa.test.com")] + #[test_case("ftp://aaa.test.com")] + #[test_case("mqtt://aaa.test.com")] + #[test_case("https://t1124124.test.com")] + #[test_case("https://t1124124.test.com:12345")] + #[test_case("https://t1124124.test.com/path")] + #[test_case("https://t1124124.test.com/path/to/file.test")] + #[test_case("https://t1124124.test.com/path/to/file")] + #[test_case("https://t1124124.mqtt-url.com/path/to/file")] + fn url_is_my_tenant_correct_urls_with_http_port(url: &str) { + let c8y = C8yEndPoint::new("test.test.com:443", "test.mqtt-url.com", "test_device"); + assert_eq!(c8y.maybe_tenant_url(url), Some(url.parse().unwrap())); + } + + #[test_case("http://aaa.test.com")] + #[test_case("https://aaa.test.com")] + #[test_case("ftp://aaa.test.com")] + #[test_case("mqtt://aaa.test.com")] + #[test_case("https://t1124124.test.com")] + #[test_case("https://t1124124.test.com:12345")] + #[test_case("https://t1124124.test.com/path")] + #[test_case("https://t1124124.test.com/path/to/file.test")] + #[test_case("https://t1124124.test.com/path/to/file")] + #[test_case("https://t1124124.mqtt-url.com/path/to/file")] + fn url_is_my_tenant_correct_urls_with_mqtt_port(url: &str) { + let c8y = C8yEndPoint::new("test.test.com", "test.mqtt-url.com:8883", "test_device"); assert_eq!(c8y.maybe_tenant_url(url), Some(url.parse().unwrap())); } @@ -250,13 +292,13 @@ mod tests { #[test_case("http://localhost")] #[test_case("http://abc.com")] fn url_is_my_tenant_incorrect_urls(url: &str) { - let c8y = C8yEndPoint::new("test.test.com", "test_device"); + let c8y = C8yEndPoint::new("test.test.com", "test.mqtt-url.com", "test_device"); assert!(c8y.maybe_tenant_url(url).is_none()); } #[test] fn url_is_my_tenant_with_hostname_without_commas() { - let c8y = C8yEndPoint::new("custom-domain", "test_device"); + let c8y = C8yEndPoint::new("custom-domain", "non-custom-mqtt-domain", "test_device"); let url = "http://custom-domain/path"; assert_eq!(c8y.maybe_tenant_url(url), Some(url.parse().unwrap())); } @@ -264,14 +306,14 @@ mod tests { #[ignore = "Until #2804 is fixed"] #[test] fn url_is_my_tenant_check_not_too_broad() { - let c8y = C8yEndPoint::new("abc.com", "test_device"); + let c8y = C8yEndPoint::new("abc.com", "abc.com", "test_device"); dbg!(c8y.maybe_tenant_url("http://xyz.com")); assert!(c8y.maybe_tenant_url("http://xyz.com").is_none()); } #[test] fn check_non_cached_internal_id_for_a_device() { - let mut c8y = C8yEndPoint::new("test_host", "test_device"); + let mut c8y = C8yEndPoint::new("test_host", "test_host", "test_device"); c8y.devices_internal_id .insert("test_device".to_string(), "12345".to_string()); let end_pt_err = c8y.get_internal_id("test_child".into()).unwrap_err(); diff --git a/crates/extensions/c8y_firmware_manager/src/config.rs b/crates/extensions/c8y_firmware_manager/src/config.rs index 514d6a602e2..12ce81b4b1c 100644 --- a/crates/extensions/c8y_firmware_manager/src/config.rs +++ b/crates/extensions/c8y_firmware_manager/src/config.rs @@ -38,6 +38,7 @@ impl FirmwareManagerConfig { data_dir: DataDir, timeout_sec: Duration, c8y_url: String, + c8y_mqtt: String, c8y_prefix: TopicPrefix, ) -> Self { let local_http_host = format!("{}:{}", local_http_host, local_http_port).into(); @@ -46,7 +47,7 @@ impl FirmwareManagerConfig { let firmware_update_response_topics = TopicFilter::new_unchecked(FIRMWARE_UPDATE_RESPONSE_TOPICS); - let c8y_end_point = C8yEndPoint::new(&c8y_url, &tedge_device_id); + let c8y_end_point = C8yEndPoint::new(&c8y_url, &c8y_mqtt, &tedge_device_id); Self { tedge_device_id, @@ -72,6 +73,7 @@ impl FirmwareManagerConfig { let timeout_sec = tedge_config.firmware.child.update.timeout.duration(); let c8y_url = tedge_config.c8y.http.or_config_not_set()?.to_string(); + let c8y_mqtt = tedge_config.c8y.mqtt.or_config_not_set()?.to_string(); let c8y_prefix = tedge_config.c8y.bridge.topic_prefix.clone(); Ok(Self::new( @@ -82,6 +84,7 @@ impl FirmwareManagerConfig { data_dir, timeout_sec, c8y_url, + c8y_mqtt, c8y_prefix, )) } diff --git a/crates/extensions/c8y_firmware_manager/src/tests.rs b/crates/extensions/c8y_firmware_manager/src/tests.rs index 907760e1d3a..7e8d87eb153 100644 --- a/crates/extensions/c8y_firmware_manager/src/tests.rs +++ b/crates/extensions/c8y_firmware_manager/src/tests.rs @@ -643,6 +643,7 @@ async fn spawn_firmware_manager( tmp_dir.utf8_path_buf().into(), timeout_sec, C8Y_HOST.into(), + C8Y_HOST.into(), "c8y".try_into().unwrap(), ); diff --git a/crates/extensions/c8y_http_proxy/src/actor.rs b/crates/extensions/c8y_http_proxy/src/actor.rs index 5cb07d8245e..2fc156844d6 100644 --- a/crates/extensions/c8y_http_proxy/src/actor.rs +++ b/crates/extensions/c8y_http_proxy/src/actor.rs @@ -131,7 +131,11 @@ impl Actor for C8YHttpProxyActor { impl C8YHttpProxyActor { pub fn new(config: C8YHttpConfig, message_box: C8YHttpProxyMessageBox) -> Self { - let end_point = C8yEndPoint::new(&config.c8y_host, &config.device_id); + let end_point = C8yEndPoint::new( + &config.c8y_http_host, + &config.c8y_mqtt_host, + &config.device_id, + ); C8YHttpProxyActor { config, end_point, diff --git a/crates/extensions/c8y_http_proxy/src/lib.rs b/crates/extensions/c8y_http_proxy/src/lib.rs index b744dc171cd..953541a3d43 100644 --- a/crates/extensions/c8y_http_proxy/src/lib.rs +++ b/crates/extensions/c8y_http_proxy/src/lib.rs @@ -34,7 +34,8 @@ mod tests; /// Configuration of C8Y REST API #[derive(Default, Clone)] pub struct C8YHttpConfig { - pub c8y_host: String, + pub c8y_http_host: String, + pub c8y_mqtt_host: String, pub device_id: String, pub tmp_dir: PathBuf, identity: Option, @@ -45,14 +46,16 @@ impl TryFrom<&TEdgeConfig> for C8YHttpConfig { type Error = C8yHttpConfigBuildError; fn try_from(tedge_config: &TEdgeConfig) -> Result { - let c8y_host = tedge_config.c8y.http.or_config_not_set()?.to_string(); + let c8y_http_host = tedge_config.c8y.http.or_config_not_set()?.to_string(); + let c8y_mqtt_host = tedge_config.c8y.mqtt.or_config_not_set()?.to_string(); let device_id = tedge_config.device.id.try_read(tedge_config)?.to_string(); let tmp_dir = tedge_config.tmp.path.as_std_path().to_path_buf(); let identity = tedge_config.http.client.auth.identity()?; let retry_interval = Duration::from_secs(5); Ok(Self { - c8y_host, + c8y_http_host, + c8y_mqtt_host, device_id, tmp_dir, identity, diff --git a/crates/extensions/c8y_http_proxy/src/tests.rs b/crates/extensions/c8y_http_proxy/src/tests.rs index 40991ca99c3..94077f6ecdb 100644 --- a/crates/extensions/c8y_http_proxy/src/tests.rs +++ b/crates/extensions/c8y_http_proxy/src/tests.rs @@ -354,7 +354,8 @@ async fn retry_internal_id_on_expired_jwt_with_mock() { let mut http_actor = HttpActor::new().builder(); let config = C8YHttpConfig { - c8y_host: target_url.clone(), + c8y_http_host: target_url.clone(), + c8y_mqtt_host: target_url.clone(), device_id: external_id.into(), tmp_dir: tmp_dir.into(), identity: None, @@ -418,7 +419,8 @@ async fn retry_create_event_on_expired_jwt_with_mock() { let mut http_actor = HttpActor::new().builder(); let config = C8YHttpConfig { - c8y_host: target_url.clone(), + c8y_http_host: target_url.clone(), + c8y_mqtt_host: target_url.clone(), device_id: external_id.into(), tmp_dir: tmp_dir.into(), identity: None, @@ -661,7 +663,8 @@ async fn spawn_c8y_http_proxy( let mut http = FakeServerBox::builder(); let config = C8YHttpConfig { - c8y_host, + c8y_http_host: c8y_host.clone(), + c8y_mqtt_host: c8y_host, device_id, tmp_dir, identity: None, diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index 9f6063b6390..14d8644c515 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -39,6 +39,7 @@ pub struct C8yMapperConfig { pub device_type: String, pub service: TEdgeConfigReaderService, pub c8y_host: String, + pub c8y_mqtt: String, pub tedge_http_host: Arc, pub topics: TopicFilter, pub capabilities: Capabilities, @@ -77,6 +78,7 @@ impl C8yMapperConfig { device_type: String, service: TEdgeConfigReaderService, c8y_host: String, + c8y_mqtt: String, tedge_http_host: Arc, topics: TopicFilter, capabilities: Capabilities, @@ -113,6 +115,7 @@ impl C8yMapperConfig { device_type, service, c8y_host, + c8y_mqtt, tedge_http_host, topics, capabilities, @@ -232,6 +235,7 @@ impl C8yMapperConfig { device_topic_id, device_type, service, + c8y_host.clone(), c8y_host, tedge_http_host, topics, diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 678e3411ef5..9424c963eeb 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -260,7 +260,8 @@ impl CumulocityConverter { config.service.ty.clone() }; - let c8y_host = config.c8y_host.clone(); + let c8y_host = &config.c8y_host; + let c8y_mqtt = &config.c8y_mqtt; let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD); @@ -272,7 +273,7 @@ impl CumulocityConverter { let log_dir = config.logs_path.join(TEDGE_AGENT_LOG_DIR); let operation_logs = OperationLogs::try_new(log_dir)?; - let c8y_endpoint = C8yEndPoint::new(&c8y_host, &device_id); + let c8y_endpoint = C8yEndPoint::new(c8y_host, c8y_mqtt, &device_id); let mqtt_schema = config.mqtt_schema.clone(); @@ -2001,37 +2002,37 @@ pub(crate) mod tests { } #[test_case( - "m/env", + "m/env", json!({ "temp": 1}) ;"measurement" )] #[test_case( - "e/click", + "e/click", json!({ "text": "Someone clicked" }) ;"event" )] #[test_case( - "a/temp", + "a/temp", json!({ "text": "Temperature too high" }) ;"alarm" )] #[test_case( - "twin/custom", + "twin/custom", json!({ "foo": "bar" }) ;"twin" )] #[test_case( - "status/health", + "status/health", json!({ "status": "up" }) ;"health status" )] #[test_case( - "cmd/restart", + "cmd/restart", json!({ }) ;"command metadata" )] #[test_case( - "cmd/restart/123", + "cmd/restart/123", json!({ "status": "init" }) ;"command" )] @@ -3408,7 +3409,7 @@ pub(crate) mod tests { let device_topic_id = EntityTopicId::default_main_device(); let device_type = "test-device-type".into(); let tedge_config = TEdgeConfig::load_toml_str("service.ty = \"service\""); - let c8y_host = "test.c8y.io".into(); + let c8y_host = "test.c8y.io".to_owned(); let tedge_http_host = "127.0.0.1".into(); let auth_proxy_addr = "127.0.0.1".into(); let auth_proxy_port = 8001; @@ -3428,6 +3429,7 @@ pub(crate) mod tests { device_topic_id, device_type, tedge_config.service.clone(), + c8y_host.clone(), c8y_host, tedge_http_host, topics, diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index c5ec3588ef8..90709603c2b 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -2686,7 +2686,7 @@ pub(crate) fn test_mapper_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig { let device_topic_id = EntityTopicId::default_main_device(); let device_type = "test-device-type".into(); let config = TEdgeConfig::load_toml_str("service.ty = \"service\""); - let c8y_host = "test.c8y.io".into(); + let c8y_host = "test.c8y.io".to_owned(); let tedge_http_host = "localhost:8888".into(); let mqtt_schema = MqttSchema::default(); let auth_proxy_addr = "127.0.0.1".into(); @@ -2712,6 +2712,7 @@ pub(crate) fn test_mapper_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig { device_topic_id, device_type, config.service.clone(), + c8y_host.clone(), c8y_host, tedge_http_host, topics,