Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not process lldp & link state data if topology map is disabled #69

Merged
merged 2 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/cgw_connection_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,12 @@ impl CGWConnectionProcessor {
return Ok(CGWConnectionState::ClosedGracefully);
}
Text(payload) => {
if let Ok(evt) =
cgw_ucentral_event_parse(&device_type, &payload, timestamp.timestamp())
{
if let Ok(evt) = cgw_ucentral_event_parse(
&device_type,
self.feature_topomap_enabled,
&payload,
timestamp.timestamp(),
) {
kafaka_msg = payload.clone();
if let CGWUCentralEventType::State(_) = evt.evt_type {
if let Some(decompressed) = evt.decompressed.clone() {
Expand Down
4 changes: 2 additions & 2 deletions src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1449,7 +1449,7 @@ mod tests {
#[test]
fn can_parse_connect_event() -> Result<()> {
let msg = get_connect_json_msg();
let event: CGWUCentralEvent = cgw_ucentral_ap_parse_message(msg, 0)?;
let event: CGWUCentralEvent = cgw_ucentral_ap_parse_message(false, msg, 0)?;

match event.evt_type {
CGWUCentralEventType::Connect(_) => {
Expand All @@ -1467,7 +1467,7 @@ mod tests {

fn can_parse_log_event() -> Result<()> {
let msg = get_log_json_msg();
let event: CGWUCentralEvent = cgw_ucentral_ap_parse_message(msg, 0)?;
let event: CGWUCentralEvent = cgw_ucentral_ap_parse_message(false, msg, 0)?;

match event.evt_type {
CGWUCentralEventType::Log(_) => {
Expand Down
62 changes: 39 additions & 23 deletions src/cgw_ucentral_ap_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,11 @@ fn parse_link_state_data(
}
}

fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result<CGWUCentralEvent> {
fn parse_state_event_data(
feature_topomap_enabled: bool,
map: CGWUCentralJRPCMessage,
timestamp: i64,
) -> Result<CGWUCentralEvent> {
if !map.contains_key("params") {
return Err(Error::UCentralParser(
"Invalid state event received: params is missing",
Expand Down Expand Up @@ -350,30 +354,32 @@ fn parse_state_event_data(map: CGWUCentralJRPCMessage, timestamp: i64) -> Result
let mut lldp_links: Vec<CGWUCentralEventStateLinks> = Vec::new();
let mut clients_links: Vec<CGWUCentralEventStateClients> = Vec::new();

if state_map.contains_key("lldp-peers") {
if let Value::Object(v) = &state_map["lldp-peers"] {
parse_lldp_data(v, &mut lldp_links)?;
if feature_topomap_enabled {
if state_map.contains_key("lldp-peers") {
if let Value::Object(v) = &state_map["lldp-peers"] {
parse_lldp_data(v, &mut lldp_links)?;
}
}
}

let mut upstream_ifaces: Vec<String> = Vec::new();
let mut downstream_ifaces: Vec<String> = Vec::new();
let mut upstream_ifaces: Vec<String> = Vec::new();
let mut downstream_ifaces: Vec<String> = Vec::new();

if state_map.contains_key("link-state") {
if let Value::Object(obj) = &state_map["link-state"] {
parse_link_state_data(obj, &mut upstream_ifaces, &mut downstream_ifaces);
if state_map.contains_key("link-state") {
if let Value::Object(obj) = &state_map["link-state"] {
parse_link_state_data(obj, &mut upstream_ifaces, &mut downstream_ifaces);
}
}
}

if let Value::Array(arr) = &state_map["interfaces"] {
for interface in arr {
if let Value::Object(iface) = interface {
parse_interface_data(
iface,
&mut clients_links,
&upstream_ifaces,
timestamp,
)?;
if let Value::Array(arr) = &state_map["interfaces"] {
for interface in arr {
if let Value::Object(iface) = interface {
parse_interface_data(
iface,
&mut clients_links,
&upstream_ifaces,
timestamp,
)?;
}
}
}
}
Expand Down Expand Up @@ -770,7 +776,11 @@ fn parse_realtime_event_data(
}
}

pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result<CGWUCentralEvent> {
pub fn cgw_ucentral_ap_parse_message(
feature_topomap_enabled: bool,
message: &str,
timestamp: i64,
) -> Result<CGWUCentralEvent> {
let map: CGWUCentralJRPCMessage = match serde_json::from_str(message) {
Ok(m) => m,
Err(e) => {
Expand Down Expand Up @@ -835,9 +845,15 @@ pub fn cgw_ucentral_ap_parse_message(message: &str, timestamp: i64) -> Result<CG

return Ok(connect_event);
} else if method == "state" {
return parse_state_event_data(map, timestamp);
return parse_state_event_data(feature_topomap_enabled, map, timestamp);
} else if method == "event" {
return parse_realtime_event_data(map, timestamp);
if feature_topomap_enabled {
return parse_realtime_event_data(map, timestamp);
} else {
return Err(Error::UCentralParser(
"Received unexpected event while topo map feature is disabled",
));
}
}
} else if map.contains_key("result") {
if !map.contains_key("id") {
Expand Down
9 changes: 7 additions & 2 deletions src/cgw_ucentral_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,16 @@ pub fn cgw_ucentral_parse_command_message(message: &str) -> Result<CGWUCentralCo

pub fn cgw_ucentral_event_parse(
device_type: &CGWDeviceType,
feature_topomap_enabled: bool,
message: &str,
timestamp: i64,
) -> Result<CGWUCentralEvent> {
match device_type {
CGWDeviceType::CGWDeviceAP => cgw_ucentral_ap_parse_message(message, timestamp),
CGWDeviceType::CGWDeviceSwitch => cgw_ucentral_switch_parse_message(message, timestamp),
CGWDeviceType::CGWDeviceAP => {
cgw_ucentral_ap_parse_message(feature_topomap_enabled, message, timestamp)
}
CGWDeviceType::CGWDeviceSwitch => {
cgw_ucentral_switch_parse_message(feature_topomap_enabled, message, timestamp)
}
}
}
33 changes: 18 additions & 15 deletions src/cgw_ucentral_switch_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ fn parse_fdb_data(
}

pub fn cgw_ucentral_switch_parse_message(
feature_topomap_enabled: bool,
message: &str,
timestamp: i64,
) -> Result<CGWUCentralEvent> {
Expand Down Expand Up @@ -185,26 +186,28 @@ pub fn cgw_ucentral_switch_parse_message(
// (FDBClient) will have all additional met info, like VID
let mut clients_links: Vec<CGWUCentralEventStateClients> = Vec::new();

if state_map.contains_key("default-gateway") {
if let Value::Array(default_gw) = &state_map["default-gateway"] {
if let Some(gw) = default_gw.first() {
if let Value::String(port) = &gw["out-port"] {
upstream_port = Some(port.as_str().to_string());
if feature_topomap_enabled {
if state_map.contains_key("default-gateway") {
if let Value::Array(default_gw) = &state_map["default-gateway"] {
if let Some(gw) = default_gw.first() {
if let Value::String(port) = &gw["out-port"] {
upstream_port = Some(port.as_str().to_string());
}
}
}
}
}

if state_map.contains_key("lldp-peers") {
parse_lldp_data(&state_map["lldp-peers"], &mut lldp_links, &upstream_port)?;
}
if state_map.contains_key("lldp-peers") {
parse_lldp_data(&state_map["lldp-peers"], &mut lldp_links, &upstream_port)?;
}

if state_map.contains_key("mac-forwarding-table") {
parse_fdb_data(
&state_map["mac-forwarding-table"],
&mut clients_links,
&upstream_port,
)?;
if state_map.contains_key("mac-forwarding-table") {
parse_fdb_data(
&state_map["mac-forwarding-table"],
&mut clients_links,
&upstream_port,
)?;
}
}

let state_event = CGWUCentralEvent {
Expand Down
19 changes: 7 additions & 12 deletions utils/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,13 @@ services:
image: "postgres:latest"
ports:
- "5432:5432"
command:
- "postgres"
- "-c"
- "max_connections=400"
- "-c"
- "shared_buffers=20MB"
- "-c"
- "ssl=on"
- "-c"
- "ssl_cert_file=/var/lib/postgresql/certs/server.crt"
- "-c"
- "ssl_key_file=/var/lib/postgresql/certs/server.key"
user: postgres
command: >
bash -c "
chown 999:999 /var/lib/postgresql/certs/server.key &&
chmod 600 /var/lib/postgresql/certs/server.key &&
postgres -c max_connections=400 -c shared_buffers=20MB -c ssl=on -c ssl_cert_file=/var/lib/postgresql/certs/server.crt -c ssl_key_file=/var/lib/postgresql/certs/server.key
"
env_file:
- postgresql.env
restart: always
Expand Down