From 489ee0f8961afe00100f161107c3d40a8e769611 Mon Sep 17 00:00:00 2001 From: Sviatoslav Boichuk Date: Wed, 24 Jul 2024 16:44:59 +0300 Subject: [PATCH] Send unzipped AP State Event message to Kafka --- src/cgw_connection_processor.rs | 33 +++++++++++++++++++++++++++++-- src/cgw_ucentral_ap_parser.rs | 7 +++++++ src/cgw_ucentral_parser.rs | 2 ++ src/cgw_ucentral_switch_parser.rs | 2 ++ 4 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/cgw_connection_processor.rs b/src/cgw_connection_processor.rs index efc4837..6c5fb85 100644 --- a/src/cgw_connection_processor.rs +++ b/src/cgw_connection_processor.rs @@ -8,7 +8,7 @@ use crate::{ }, cgw_ucentral_parser::{ cgw_ucentral_event_parse, cgw_ucentral_parse_connect_event, CGWUCentralCommandType, - CGWUCentralEventType, + CGWUCentralEventType, CGWUCentralJRPCMessage, }, cgw_ucentral_topology_map::CGWUCentralTopologyMap, }; @@ -19,6 +19,7 @@ use futures_util::{ stream::{SplitSink, SplitStream}, FutureExt, SinkExt, StreamExt, }; +use serde_json::Value; use std::{net::SocketAddr, str::FromStr, sync::Arc}; use tokio::{ net::TcpStream, @@ -268,6 +269,7 @@ impl CGWConnectionProcessor { // Make sure we always track the as accurate as possible the time // of receiving of the event (where needed). let timestamp = Local::now(); + let mut kafaka_msg: String = String::new(); match msg { Ok(msg) => match msg { @@ -278,7 +280,34 @@ impl CGWConnectionProcessor { if let Ok(evt) = cgw_ucentral_event_parse(&device_type, &payload, timestamp.timestamp()) { + kafaka_msg = payload.clone(); if let CGWUCentralEventType::State(_) = evt.evt_type { + if let Ok(mut origin_msg) = + serde_json::from_str::(&payload) + { + let params_value = match Value::from_str( + evt.decompressed.clone().unwrap().as_str(), + ) { + Ok(val) => val, + Err(_e) => { + return Err(Error::ConnectionProcessor( + "Failed to cast decompressed message to JSON Value", + )); + } + }; + if let Some(value) = origin_msg.get_mut("params") { + *value = params_value; + kafaka_msg = match serde_json::to_string(&origin_msg) { + Ok(msg) => msg, + Err(_e) => { + return Err(Error::ConnectionProcessor( + "Failed to create decompressed Event message", + )); + } + }; + } + } + if self.feature_topomap_enabled { let topo_map = CGWUCentralTopologyMap::get_ref(); topo_map.process_state_message(&device_type, evt).await; @@ -313,7 +342,7 @@ impl CGWConnectionProcessor { } self.cgw_server - .enqueue_mbox_message_from_device_to_nb_api_c(self.group_id, payload)?; + .enqueue_mbox_message_from_device_to_nb_api_c(self.group_id, kafaka_msg)?; return Ok(CGWConnectionState::IsActive); } Ping(_t) => { diff --git a/src/cgw_ucentral_ap_parser.rs b/src/cgw_ucentral_ap_parser.rs index 7b91a98..fd33132 100644 --- a/src/cgw_ucentral_ap_parser.rs +++ b/src/cgw_ucentral_ap_parser.rs @@ -388,6 +388,7 @@ fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result links: clients_links, }, }), + decompressed: Some(unzipped_data), }; return Ok(state_event); @@ -438,6 +439,7 @@ fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result links: clients_links, }, }), + decompressed: None, }; return Ok(state_event); @@ -655,6 +657,7 @@ fn parse_realtime_event_data( }, ), }), + decompressed: None, }) } "client.leave" => { @@ -734,6 +737,7 @@ fn parse_realtime_event_data( }, ), }), + decompressed: None, }) } _ => { @@ -775,6 +779,7 @@ pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result Result Result, } #[derive(Deserialize, Debug, Serialize)] @@ -262,6 +263,7 @@ pub fn cgw_ucentral_parse_connect_event(message: Message) -> Result