Skip to content

Commit

Permalink
Envelop state|realtime events with wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
SviatoslavBoichuk committed Jan 10, 2025
1 parent 3f351df commit 2de9632
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 24 deletions.
85 changes: 61 additions & 24 deletions src/cgw_connection_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use crate::{
cgw_connection_server::{CGWConnectionServer, CGWConnectionServerReqMsg},
cgw_device::{CGWDeviceCapabilities, CGWDeviceType},
cgw_errors::{Error, Result},
cgw_nb_api_listener::{cgw_construct_infra_request_result_msg, CGWKafkaProducerTopic},
cgw_nb_api_listener::{
cgw_construct_infra_realtime_event_message, cgw_construct_infra_request_result_msg,
cgw_construct_infra_state_event_message, CGWKafkaProducerTopic,
},
cgw_ucentral_messages_queue_manager::{
CGWUCentralMessagesQueueItem, CGWUCentralMessagesQueueState, CGW_MESSAGES_QUEUE,
MESSAGE_TIMEOUT_DURATION,
Expand Down Expand Up @@ -330,6 +333,7 @@ impl CGWConnectionProcessor {
timestamp.timestamp(),
) {
kafka_msg.clone_from(&payload);
let event_type_str: String = evt.evt_type.to_string();
match evt.evt_type {
CGWUCentralEventType::State(_) => {
if let Some(decompressed) = evt.decompressed.clone() {
Expand All @@ -351,20 +355,36 @@ impl CGWConnectionProcessor {
self.cgw_server.clone(),
);
}
self.cgw_server
.enqueue_mbox_message_from_device_to_nb_api_c(
self.group_id,
kafka_msg,
CGWKafkaProducerTopic::State,
)?;
if let Ok(resp) = cgw_construct_infra_state_event_message(
event_type_str,
kafka_msg,
self.cgw_server.get_local_id(),
) {
self.cgw_server
.enqueue_mbox_message_from_device_to_nb_api_c(
self.group_id,
resp,
CGWKafkaProducerTopic::InfraRealtime,
)?;
} else {
error!("Failed to construct rebalance_group message!");
}
}
CGWUCentralEventType::Healthcheck => {
self.cgw_server
.enqueue_mbox_message_from_device_to_nb_api_c(
self.group_id,
kafka_msg,
CGWKafkaProducerTopic::State,
)?;
if let Ok(resp) = cgw_construct_infra_state_event_message(
event_type_str,
kafka_msg,
self.cgw_server.get_local_id(),
) {
self.cgw_server
.enqueue_mbox_message_from_device_to_nb_api_c(
self.group_id,
resp,
CGWKafkaProducerTopic::InfraRealtime,
)?;
} else {
error!("Failed to construct rebalance_group message!");
}
}
CGWUCentralEventType::Reply(content) => {
if *fsm_state != CGWUCentralMessageProcessorState::ResultPending {
Expand Down Expand Up @@ -415,14 +435,23 @@ impl CGWConnectionProcessor {
self.cgw_server.clone(),
);
}
self.cgw_server
.enqueue_mbox_message_from_device_to_nb_api_c(
self.group_id,
kafka_msg,
CGWKafkaProducerTopic::InfraRealtime,
)?;

if let Ok(resp) = cgw_construct_infra_realtime_event_message(
event_type_str,
kafka_msg,
self.cgw_server.get_local_id(),
) {
self.cgw_server
.enqueue_mbox_message_from_device_to_nb_api_c(
self.group_id,
resp,
CGWKafkaProducerTopic::InfraRealtime,
)?;
} else {
error!("Failed to construct rebalance_group message!");
}
}
CGWUCentralEventType::Connect(_cgwucentral_event_connect) => {
CGWUCentralEventType::Connect(_) => {
error!("Expected to receive Connect event as one of the first message from infra during connection procedure!");
}
CGWUCentralEventType::Log
Expand All @@ -436,11 +465,19 @@ impl CGWConnectionProcessor {
| CGWUCentralEventType::CfgPending
| CGWUCentralEventType::DeviceUpdate
| CGWUCentralEventType::Recovery => {
self.cgw_server.enqueue_mbox_message_from_cgw_to_nb_api(
self.group_id,
if let Ok(resp) = cgw_construct_infra_realtime_event_message(
event_type_str,
kafka_msg,
CGWKafkaProducerTopic::InfraRealtime,
)
self.cgw_server.get_local_id(),
) {
self.cgw_server.enqueue_mbox_message_from_cgw_to_nb_api(
self.group_id,
resp,
CGWKafkaProducerTopic::InfraRealtime,
)
} else {
error!("Failed to construct rebalance_group message!");
}
}
CGWUCentralEventType::Unknown => {
error!("Received unknown event type! Message payload: {kafka_msg}");
Expand Down
46 changes: 46 additions & 0 deletions src/cgw_nb_api_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,52 @@ pub struct InfraLeaveMessage {
pub reporter_shard_id: i32,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct InfraStateEventMessage {
pub r#type: &'static str,
pub event_type: String,
pub payload: String,
pub reporter_shard_id: i32,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct InfraRealtimeEventMessage {
pub r#type: &'static str,
pub event_type: String,
pub payload: String,
pub reporter_shard_id: i32,
}

pub fn cgw_construct_infra_state_event_message(
event_type: String,
payload: String,
reporter_shard_id: i32,
) -> Result<String> {
let state_message = InfraStateEventMessage {
r#type: "infrastructure_state_event_message",
event_type,
payload,
reporter_shard_id,
};

Ok(serde_json::to_string(&state_message)?)
}

pub fn cgw_construct_infra_realtime_event_message(
event_type: String,
payload: String,
reporter_shard_id: i32,
) -> Result<String> {
let realtime_message = InfraRealtimeEventMessage {
r#type: "infrastructure_realtime_event_message",
event_type,
payload,
reporter_shard_id,
};

Ok(serde_json::to_string(&realtime_message)?)
}

pub fn cgw_construct_infra_group_create_response(
infra_group_id: i32,
reporter_shard_id: i32,
Expand Down
26 changes: 26 additions & 0 deletions src/cgw_ucentral_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,32 @@ pub enum CGWUCentralEventType {
Unknown,
}

impl std::fmt::Display for CGWUCentralEventType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
CGWUCentralEventType::Connect(_) => write!(f, "connect"),
CGWUCentralEventType::State(_) => write!(f, "state"),
CGWUCentralEventType::Healthcheck => write!(f, "healthcheck"),
CGWUCentralEventType::Log => write!(f, "log"),
CGWUCentralEventType::Event => write!(f, "event"),
CGWUCentralEventType::Alarm => write!(f, "alarm"),
CGWUCentralEventType::WifiScan => write!(f, "wifiScan"),
CGWUCentralEventType::CrashLog => write!(f, "crashLog"),
CGWUCentralEventType::RebootLog => write!(f, "rebootLog"),
CGWUCentralEventType::CfgPending => write!(f, "cfgPending"),
CGWUCentralEventType::DeviceUpdate => write!(f, "deviceupdate"),
CGWUCentralEventType::Ping => write!(f, "ping"),
CGWUCentralEventType::Recovery => write!(f, "recovery"),
CGWUCentralEventType::VenueBroadcast => write!(f, "venue_broadcast"),
CGWUCentralEventType::RealtimeEvent(_) => {
write!(f, "realtime_event")
}
CGWUCentralEventType::Reply(_) => write!(f, "reply"),
CGWUCentralEventType::Unknown => write!(f, "unknown"),
}
}
}

#[derive(Debug, Deserialize, Serialize)]
pub struct CGWUCentralEvent {
pub serial: MacAddress,
Expand Down

0 comments on commit 2de9632

Please sign in to comment.