diff --git a/src/cgw_connection_processor.rs b/src/cgw_connection_processor.rs index 8a0d62c..9d7abef 100644 --- a/src/cgw_connection_processor.rs +++ b/src/cgw_connection_processor.rs @@ -277,9 +277,12 @@ impl CGWConnectionProcessor { return Ok(CGWConnectionState::ClosedGracefully); } Text(payload) => { - if let Ok(evt) = - cgw_ucentral_event_parse(&device_type, &payload, timestamp.timestamp()) - { + if let Ok(evt) = cgw_ucentral_event_parse( + &device_type, + self.feature_topomap_enabled, + &payload, + timestamp.timestamp(), + ) { kafaka_msg = payload.clone(); if let CGWUCentralEventType::State(_) = evt.evt_type { if let Some(decompressed) = evt.decompressed.clone() { diff --git a/src/cgw_connection_server.rs b/src/cgw_connection_server.rs index 18a6aa2..4216b76 100644 --- a/src/cgw_connection_server.rs +++ b/src/cgw_connection_server.rs @@ -1509,7 +1509,7 @@ mod tests { #[test] fn can_parse_connect_event() -> Result<()> { let msg = get_connect_json_msg(); - let event: CGWUCentralEvent = cgw_ucentral_ap_parse_message(msg, 0)?; + let event: CGWUCentralEvent = cgw_ucentral_ap_parse_message(false, msg, 0)?; match event.evt_type { CGWUCentralEventType::Connect(_) => { @@ -1527,7 +1527,7 @@ mod tests { fn can_parse_log_event() -> Result<()> { let msg = get_log_json_msg(); - let event: CGWUCentralEvent = cgw_ucentral_ap_parse_message(msg, 0)?; + let event: CGWUCentralEvent = cgw_ucentral_ap_parse_message(false, msg, 0)?; match event.evt_type { CGWUCentralEventType::Log(_) => { diff --git a/src/cgw_ucentral_ap_parser.rs b/src/cgw_ucentral_ap_parser.rs index 7bd966b..61b9464 100644 --- a/src/cgw_ucentral_ap_parser.rs +++ b/src/cgw_ucentral_ap_parser.rs @@ -304,7 +304,11 @@ fn parse_link_state_data( } } -fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result { +fn parse_state_event_data( + feature_topomap_enabled: bool, + map: CGWUCentralJRPCMessage, + timestamp: i64, +) -> Result { if !map.contains_key("params") { return Err(Error::UCentralParser( "Invalid state event received: params is missing", @@ -350,30 +354,32 @@ fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result let mut lldp_links: Vec = Vec::new(); let mut clients_links: Vec = Vec::new(); - if state_map.contains_key("lldp-peers") { - if let Value::Object(v) = &state_map["lldp-peers"] { - parse_lldp_data(v, &mut lldp_links)?; + if feature_topomap_enabled { + if state_map.contains_key("lldp-peers") { + if let Value::Object(v) = &state_map["lldp-peers"] { + parse_lldp_data(v, &mut lldp_links)?; + } } - } - let mut upstream_ifaces: Vec = Vec::new(); - let mut downstream_ifaces: Vec = Vec::new(); + let mut upstream_ifaces: Vec = Vec::new(); + let mut downstream_ifaces: Vec = Vec::new(); - if state_map.contains_key("link-state") { - if let Value::Object(obj) = &state_map["link-state"] { - parse_link_state_data(obj, &mut upstream_ifaces, &mut downstream_ifaces); + if state_map.contains_key("link-state") { + if let Value::Object(obj) = &state_map["link-state"] { + parse_link_state_data(obj, &mut upstream_ifaces, &mut downstream_ifaces); + } } - } - if let Value::Array(arr) = &state_map["interfaces"] { - for interface in arr { - if let Value::Object(iface) = interface { - parse_interface_data( - iface, - &mut clients_links, - &upstream_ifaces, - timestamp, - )?; + if let Value::Array(arr) = &state_map["interfaces"] { + for interface in arr { + if let Value::Object(iface) = interface { + parse_interface_data( + iface, + &mut clients_links, + &upstream_ifaces, + timestamp, + )?; + } } } } @@ -770,7 +776,11 @@ fn parse_realtime_event_data( } } -pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result { +pub fn cgw_ucentral_ap_parse_message( + feature_topomap_enabled: bool, + message: &str, + timestamp: i64, +) -> Result { let map: CGWUCentralJRPCMessage = match serde_json::from_str(message) { Ok(m) => m, Err(e) => { @@ -835,9 +845,15 @@ pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result Result Result { match device_type { - CGWDeviceType::CGWDeviceAP => cgw_ucentral_ap_parse_message(message, timestamp), - CGWDeviceType::CGWDeviceSwitch => cgw_ucentral_switch_parse_message(message, timestamp), + CGWDeviceType::CGWDeviceAP => { + cgw_ucentral_ap_parse_message(feature_topomap_enabled, message, timestamp) + } + CGWDeviceType::CGWDeviceSwitch => { + cgw_ucentral_switch_parse_message(feature_topomap_enabled, message, timestamp) + } } } diff --git a/src/cgw_ucentral_switch_parser.rs b/src/cgw_ucentral_switch_parser.rs index c015a5b..311a0e7 100644 --- a/src/cgw_ucentral_switch_parser.rs +++ b/src/cgw_ucentral_switch_parser.rs @@ -124,6 +124,7 @@ fn parse_fdb_data( } pub fn cgw_ucentral_switch_parse_message( + feature_topomap_enabled: bool, message: &str, timestamp: i64, ) -> Result { @@ -185,26 +186,28 @@ pub fn cgw_ucentral_switch_parse_message( // (FDBClient) will have all additional met info, like VID let mut clients_links: Vec = Vec::new(); - if state_map.contains_key("default-gateway") { - if let Value::Array(default_gw) = &state_map["default-gateway"] { - if let Some(gw) = default_gw.first() { - if let Value::String(port) = &gw["out-port"] { - upstream_port = Some(port.as_str().to_string()); + if feature_topomap_enabled { + if state_map.contains_key("default-gateway") { + if let Value::Array(default_gw) = &state_map["default-gateway"] { + if let Some(gw) = default_gw.first() { + if let Value::String(port) = &gw["out-port"] { + upstream_port = Some(port.as_str().to_string()); + } } } } - } - if state_map.contains_key("lldp-peers") { - parse_lldp_data(&state_map["lldp-peers"], &mut lldp_links, &upstream_port)?; - } + if state_map.contains_key("lldp-peers") { + parse_lldp_data(&state_map["lldp-peers"], &mut lldp_links, &upstream_port)?; + } - if state_map.contains_key("mac-forwarding-table") { - parse_fdb_data( - &state_map["mac-forwarding-table"], - &mut clients_links, - &upstream_port, - )?; + if state_map.contains_key("mac-forwarding-table") { + parse_fdb_data( + &state_map["mac-forwarding-table"], + &mut clients_links, + &upstream_port, + )?; + } } let state_event = CGWUCentralEvent { diff --git a/utils/docker/docker-compose.yml b/utils/docker/docker-compose.yml index ad6183b..654b513 100644 --- a/utils/docker/docker-compose.yml +++ b/utils/docker/docker-compose.yml @@ -32,18 +32,13 @@ services: image: "postgres:latest" ports: - "5432:5432" - command: - - "postgres" - - "-c" - - "max_connections=400" - - "-c" - - "shared_buffers=20MB" - - "-c" - - "ssl=on" - - "-c" - - "ssl_cert_file=/var/lib/postgresql/certs/server.crt" - - "-c" - - "ssl_key_file=/var/lib/postgresql/certs/server.key" + user: postgres + command: > + bash -c " + chown 999:999 /var/lib/postgresql/certs/server.key && + chmod 600 /var/lib/postgresql/certs/server.key && + postgres -c max_connections=400 -c shared_buffers=20MB -c ssl=on -c ssl_cert_file=/var/lib/postgresql/certs/server.crt -c ssl_key_file=/var/lib/postgresql/certs/server.key + " env_file: - postgresql.env restart: always