Skip to content

Commit

Permalink
Merge pull request #69 from Telecominfraproject/dev-fix-processing-to…
Browse files Browse the repository at this point in the history
…pomap

Do not process lldp & link state data if topology map is disabled
  • Loading branch information
Cahb authored Jul 31, 2024
2 parents 274a8c8 + 94187e5 commit c0237bc
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 57 deletions.
9 changes: 6 additions & 3 deletions src/cgw_connection_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,12 @@ impl CGWConnectionProcessor {
return Ok(CGWConnectionState::ClosedGracefully);
}
Text(payload) => {
if let Ok(evt) =
cgw_ucentral_event_parse(&device_type, &payload, timestamp.timestamp())
{
if let Ok(evt) = cgw_ucentral_event_parse(
&device_type,
self.feature_topomap_enabled,
&payload,
timestamp.timestamp(),
) {
kafaka_msg = payload.clone();
if let CGWUCentralEventType::State(_) = evt.evt_type {
if let Some(decompressed) = evt.decompressed.clone() {
Expand Down
4 changes: 2 additions & 2 deletions src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1509,7 +1509,7 @@ mod tests {
#[test]
fn can_parse_connect_event() -> Result<()> {
let msg = get_connect_json_msg();
let event: CGWUCentralEvent = cgw_ucentral_ap_parse_message(msg, 0)?;
let event: CGWUCentralEvent = cgw_ucentral_ap_parse_message(false, msg, 0)?;

match event.evt_type {
CGWUCentralEventType::Connect(_) => {
Expand All @@ -1527,7 +1527,7 @@ mod tests {

fn can_parse_log_event() -> Result<()> {
let msg = get_log_json_msg();
let event: CGWUCentralEvent = cgw_ucentral_ap_parse_message(msg, 0)?;
let event: CGWUCentralEvent = cgw_ucentral_ap_parse_message(false, msg, 0)?;

match event.evt_type {
CGWUCentralEventType::Log(_) => {
Expand Down
62 changes: 39 additions & 23 deletions src/cgw_ucentral_ap_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,11 @@ fn parse_link_state_data(
}
}

fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result<CGWUCentralEvent> {
fn parse_state_event_data(
feature_topomap_enabled: bool,
map: CGWUCentralJRPCMessage,
timestamp: i64,
) -> Result<CGWUCentralEvent> {
if !map.contains_key("params") {
return Err(Error::UCentralParser(
"Invalid state event received: params is missing",
Expand Down Expand Up @@ -350,30 +354,32 @@ fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result
let mut lldp_links: Vec<CGWUCentralEventStateLinks> = Vec::new();
let mut clients_links: Vec<CGWUCentralEventStateClients> = Vec::new();

if state_map.contains_key("lldp-peers") {
if let Value::Object(v) = &state_map["lldp-peers"] {
parse_lldp_data(v, &mut lldp_links)?;
if feature_topomap_enabled {
if state_map.contains_key("lldp-peers") {
if let Value::Object(v) = &state_map["lldp-peers"] {
parse_lldp_data(v, &mut lldp_links)?;
}
}
}

let mut upstream_ifaces: Vec<String> = Vec::new();
let mut downstream_ifaces: Vec<String> = Vec::new();
let mut upstream_ifaces: Vec<String> = Vec::new();
let mut downstream_ifaces: Vec<String> = Vec::new();

if state_map.contains_key("link-state") {
if let Value::Object(obj) = &state_map["link-state"] {
parse_link_state_data(obj, &mut upstream_ifaces, &mut downstream_ifaces);
if state_map.contains_key("link-state") {
if let Value::Object(obj) = &state_map["link-state"] {
parse_link_state_data(obj, &mut upstream_ifaces, &mut downstream_ifaces);
}
}
}

if let Value::Array(arr) = &state_map["interfaces"] {
for interface in arr {
if let Value::Object(iface) = interface {
parse_interface_data(
iface,
&mut clients_links,
&upstream_ifaces,
timestamp,
)?;
if let Value::Array(arr) = &state_map["interfaces"] {
for interface in arr {
if let Value::Object(iface) = interface {
parse_interface_data(
iface,
&mut clients_links,
&upstream_ifaces,
timestamp,
)?;
}
}
}
}
Expand Down Expand Up @@ -770,7 +776,11 @@ fn parse_realtime_event_data(
}
}

pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result<CGWUCentralEvent> {
pub fn cgw_ucentral_ap_parse_message(
feature_topomap_enabled: bool,
message: &str,
timestamp: i64,
) -> Result<CGWUCentralEvent> {
let map: CGWUCentralJRPCMessage = match serde_json::from_str(message) {
Ok(m) => m,
Err(e) => {
Expand Down Expand Up @@ -835,9 +845,15 @@ pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result<CG

return Ok(connect_event);
} else if method == "state" {
return parse_state_event_data(map, timestamp);
return parse_state_event_data(feature_topomap_enabled, map, timestamp);
} else if method == "event" {
return parse_realtime_event_data(map, timestamp);
if feature_topomap_enabled {
return parse_realtime_event_data(map, timestamp);
} else {
return Err(Error::UCentralParser(
"Received unexpected event while topo map feature is disabled",
));
}
}
} else if map.contains_key("result") {
if !map.contains_key("id") {
Expand Down
9 changes: 7 additions & 2 deletions src/cgw_ucentral_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,16 @@ pub fn cgw_ucentral_parse_command_message(message: &str) -> Result<CGWUCentralCo

pub fn cgw_ucentral_event_parse(
device_type: &CGWDeviceType,
feature_topomap_enabled: bool,
message: &str,
timestamp: i64,
) -> Result<CGWUCentralEvent> {
match device_type {
CGWDeviceType::CGWDeviceAP => cgw_ucentral_ap_parse_message(message, timestamp),
CGWDeviceType::CGWDeviceSwitch => cgw_ucentral_switch_parse_message(message, timestamp),
CGWDeviceType::CGWDeviceAP => {
cgw_ucentral_ap_parse_message(feature_topomap_enabled, message, timestamp)
}
CGWDeviceType::CGWDeviceSwitch => {
cgw_ucentral_switch_parse_message(feature_topomap_enabled, message, timestamp)
}
}
}
33 changes: 18 additions & 15 deletions src/cgw_ucentral_switch_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ fn parse_fdb_data(
}

pub fn cgw_ucentral_switch_parse_message(
feature_topomap_enabled: bool,
message: &str,
timestamp: i64,
) -> Result<CGWUCentralEvent> {
Expand Down Expand Up @@ -185,26 +186,28 @@ pub fn cgw_ucentral_switch_parse_message(
// (FDBClient) will have all additional met info, like VID
let mut clients_links: Vec<CGWUCentralEventStateClients> = Vec::new();

if state_map.contains_key("default-gateway") {
if let Value::Array(default_gw) = &state_map["default-gateway"] {
if let Some(gw) = default_gw.first() {
if let Value::String(port) = &gw["out-port"] {
upstream_port = Some(port.as_str().to_string());
if feature_topomap_enabled {
if state_map.contains_key("default-gateway") {
if let Value::Array(default_gw) = &state_map["default-gateway"] {
if let Some(gw) = default_gw.first() {
if let Value::String(port) = &gw["out-port"] {
upstream_port = Some(port.as_str().to_string());
}
}
}
}
}

if state_map.contains_key("lldp-peers") {
parse_lldp_data(&state_map["lldp-peers"], &mut lldp_links, &upstream_port)?;
}
if state_map.contains_key("lldp-peers") {
parse_lldp_data(&state_map["lldp-peers"], &mut lldp_links, &upstream_port)?;
}

if state_map.contains_key("mac-forwarding-table") {
parse_fdb_data(
&state_map["mac-forwarding-table"],
&mut clients_links,
&upstream_port,
)?;
if state_map.contains_key("mac-forwarding-table") {
parse_fdb_data(
&state_map["mac-forwarding-table"],
&mut clients_links,
&upstream_port,
)?;
}
}

let state_event = CGWUCentralEvent {
Expand Down
19 changes: 7 additions & 12 deletions utils/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,13 @@ services:
image: "postgres:latest"
ports:
- "5432:5432"
command:
- "postgres"
- "-c"
- "max_connections=400"
- "-c"
- "shared_buffers=20MB"
- "-c"
- "ssl=on"
- "-c"
- "ssl_cert_file=/var/lib/postgresql/certs/server.crt"
- "-c"
- "ssl_key_file=/var/lib/postgresql/certs/server.key"
user: postgres
command: >
bash -c "
chown 999:999 /var/lib/postgresql/certs/server.key &&
chmod 600 /var/lib/postgresql/certs/server.key &&
postgres -c max_connections=400 -c shared_buffers=20MB -c ssl=on -c ssl_cert_file=/var/lib/postgresql/certs/server.crt -c ssl_key_file=/var/lib/postgresql/certs/server.key
"
env_file:
- postgresql.env
restart: always
Expand Down

0 comments on commit c0237bc

Please sign in to comment.