Skip to content

Commit

Permalink
Merge pull request #116 from Telecominfraproject/dev-multi-kafka-topics
Browse files Browse the repository at this point in the history
Send foreign connection to device with shard owner host and port
  • Loading branch information
Cahb authored Jan 27, 2025
2 parents cef13c0 + 198cfc6 commit 5da8334
Show file tree
Hide file tree
Showing 5 changed files with 507 additions and 32 deletions.
37 changes: 37 additions & 0 deletions src/cgw_connection_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use futures_util::{
stream::{SplitSink, SplitStream},
FutureExt, SinkExt, StreamExt,
};
use serde::Serialize;
use uuid::Uuid;

use std::{net::SocketAddr, str::FromStr, sync::Arc};
Expand All @@ -39,6 +40,29 @@ use tungstenite::Message::{Close, Ping, Text};
type SStream = SplitStream<WebSocketStream<TlsStream<TcpStream>>>;
type SSink = SplitSink<WebSocketStream<TlsStream<TcpStream>>, Message>;

#[derive(Debug, Serialize)]
pub struct ForeignConnection {
pub r#type: &'static str,
pub infra_group_infra: MacAddress,
pub destination_shard_host: String,
pub destination_wss_port: u16,
}

fn cgw_construct_foreign_connection_msg(
infra_group_infra: MacAddress,
destination_shard_host: String,
destination_wss_port: u16,
) -> Result<String> {
let unassigned_infra_msg = ForeignConnection {
r#type: "foreign_connection",
infra_group_infra,
destination_shard_host,
destination_wss_port,
};

Ok(serde_json::to_string(&unassigned_infra_msg)?)
}

#[derive(Debug, Clone)]
pub enum CGWConnectionProcessorReqMsg {
// We got green light from server to process this connection on
Expand All @@ -47,6 +71,7 @@ pub enum CGWConnectionProcessorReqMsg {
// GID (used as kafka key).
AddNewConnectionAck(i32),
AddNewConnectionShouldClose,
ForeignConnection((String, u16)),
SinkRequestToDevice(CGWUCentralMessagesQueueItem),
// Conn Server can request this specific Processor to change
// it's internal GID value (infra list created - new gid,
Expand Down Expand Up @@ -520,6 +545,18 @@ impl CGWConnectionProcessor {
);
sink.send(Message::text(payload.message)).await.ok();
}
CGWConnectionProcessorReqMsg::ForeignConnection((destination_shard_host, destination_wss_port)) => {
if let Ok(resp) =
cgw_construct_foreign_connection_msg(processor_mac, destination_shard_host, destination_wss_port)
{
debug!("process_sink_mbox_rx_msg: ForeignConnection, processor (mac: {processor_mac}) payload: {}",
resp.clone()
);
sink.send(Message::text(resp)).await.ok();
} else {
error!("Failed to construct foreign_connection message!");
}
}
CGWConnectionProcessorReqMsg::GroupIdChanged(new_group_id) => {
debug!(
"Received GroupID change message: mac {} - old gid {} : new gid {}",
Expand Down
96 changes: 73 additions & 23 deletions src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,35 @@ impl CGWConnectionServer {
None
}

fn notify_device_on_foreign_connection(self: Arc<Self>, mac: MacAddress, shard_id_owner: i32) {
tokio::spawn(async move {
let (destination_shard_host, destination_wss_port) = match self
.cgw_remote_discovery
.get_shard_host_and_wss_port(shard_id_owner)
.await
{
Ok((host, port)) => (host, port),
Err(e) => {
error!("Failed to get shard {shard_id_owner} info! Error: {e}");
return;
}
};

let connmap_r_lock = self.connmap.map.read().await;
let msg: CGWConnectionProcessorReqMsg =
CGWConnectionProcessorReqMsg::ForeignConnection((destination_shard_host.clone(), destination_wss_port));

if let Some(c) = connmap_r_lock.get(&mac) {
match c.send(msg.clone()) {
Ok(_) => {
debug!("Notified {mac} about foreign connection. Shard hostname: {destination_shard_host}, wss port: {destination_wss_port}")
}
Err(e) => warn!("Failed to send GID change notification! Error: {e}"),
}
}
});
}

fn notify_devices_on_gid_change(self: Arc<Self>, infras_list: Vec<MacAddress>, new_gid: i32) {
tokio::spawn(async move {
// If we receive NB API add/del infra req,
Expand Down Expand Up @@ -1816,28 +1845,41 @@ impl CGWConnectionServer {
// Detect Capabilities changes as device exists in Cache
capability_changes = cgw_detect_device_changes(&current_device_caps, &caps);
} else {
let device: CGWDevice = CGWDevice::new(
device_type,
CGWDeviceState::CGWDeviceConnected,
device_group_id,
false,
caps,
);
devices_cache.add_device(&device_mac, &device);
// Infra is not found in RAM Cache - try to get it frim PostgreSQL DB
if let Some(infra) = self.cgw_remote_discovery.get_infra_from_db(device_mac).await {
device_group_id = infra.infra_group_id;
if let Some(group_owner_id) = self
.cgw_remote_discovery
.get_infra_group_owner_id(device_group_id)
.await
{
foreign_infra_join = self.local_cgw_id != group_owner_id;
group_owner_shard_id = group_owner_id;
}
} else {
let device: CGWDevice = CGWDevice::new(
device_type,
CGWDeviceState::CGWDeviceConnected,
device_group_id,
false,
caps,
);
devices_cache.add_device(&device_mac, &device);

// Update Redis Cache
match serde_json::to_string(&device) {
Ok(device_json) => {
if let Err(e) = self
.cgw_remote_discovery
.add_device_to_redis_cache(&device_mac, &device_json)
.await
{
error!("{e}");
// Update Redis Cache
match serde_json::to_string(&device) {
Ok(device_json) => {
if let Err(e) = self
.cgw_remote_discovery
.add_device_to_redis_cache(&device_mac, &device_json)
.await
{
error!("{e}");
}
}
Err(e) => {
error!("Failed to serialize device to json string! Error: {e}");
}
}
Err(e) => {
error!("Failed to serialize device to json string! Error: {e}");
}
}
}
Expand All @@ -1861,6 +1903,9 @@ impl CGWConnectionServer {
} else {
error!("Failed to construct foreign_infra_connection message!");
}

self.clone()
.notify_device_on_foreign_connection(device_mac, group_owner_shard_id);
}

// Send [un]assigned infra join message
Expand Down Expand Up @@ -1977,6 +2022,11 @@ impl CGWConnectionServer {
error!("{e}");
}
}
} else {
// Infra is not found in RAM Cache - try to get it frim PostgreSQL DB
if let Some(infra) = self.cgw_remote_discovery.get_infra_from_db(device_mac).await {
device_group_id = infra.infra_group_id;
}
}

// Insert device to disconnected device list
Expand All @@ -1994,9 +2044,9 @@ impl CGWConnectionServer {
.await;
}

// Send [un]assigned infra join message
let unassigned_infra_join: bool = device_group_id == 0;
let leave_message = match unassigned_infra_join {
// Send [un]assigned infra leave message
let unassigned_infra_leave: bool = device_group_id == 0;
let leave_message = match unassigned_infra_leave {
true => {
cgw_construct_unassigned_infra_leave_msg(device_mac, self.local_cgw_id)
}
Expand Down
26 changes: 26 additions & 0 deletions src/cgw_db_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,4 +328,30 @@ impl CGWDBAccessor {
}
}
}

pub async fn get_infra(&self, mac: MacAddress) -> Option<CGWDBInfra> {
match self
.cl
.prepare("SELECT * from infras WHERE mac = $1")
.await
{
Ok(q) => {
let row = self.cl.query_one(&q, &[&mac]).await;

match row {
Ok(r) => {
return Some(CGWDBInfra::from(r));
},
Err(e) => {
error!("Query infra {mac} failed! Error: {e}");
return None;
},
};
}
Err(e) => {
error!("Failed to prepare statement! Error: {e}");
return None;
},
}
}
}
52 changes: 43 additions & 9 deletions src/cgw_remote_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub struct CGWREDISDBShard {
id: i32,
server_host: String,
server_port: u16,
wss_port: u16,
assigned_groups_num: i32,
capacity: i32,
threshold: i32,
Expand All @@ -72,28 +73,33 @@ impl From<Vec<String>> for CGWREDISDBShard {
} else if values[4] != "server_port" {
error!("redis.res[4] != server_port, unexpected.");
return CGWREDISDBShard::default();
} else if values[6] != "assigned_groups_num" {
error!("redis.res[6] != assigned_groups_num, unexpected.");
} else if values[6] != "wss_port" {
error!("redis.res[6] != wss_port, unexpected.");
return CGWREDISDBShard::default();
} else if values[8] != "capacity" {
error!("redis.res[8] != capacity, unexpected.");
} else if values[8] != "assigned_groups_num" {
error!("redis.res[8] != assigned_groups_num, unexpected.");
return CGWREDISDBShard::default();
} else if values[10] != "threshold" {
error!("redis.res[10] != threshold, unexpected.");
} else if values[10] != "capacity" {
error!("redis.res[10] != capacity, unexpected.");
return CGWREDISDBShard::default();
} else if values[12] != "threshold" {
error!("redis.res[12] != threshold, unexpected.");
return CGWREDISDBShard::default();
}

let id = values[1].parse::<i32>().unwrap_or_default();
let server_host = values[3].clone();
let server_port = values[5].parse::<u16>().unwrap_or_default();
let assigned_groups_num = values[7].parse::<i32>().unwrap_or_default();
let capacity = values[9].parse::<i32>().unwrap_or_default();
let threshold = values[11].parse::<i32>().unwrap_or_default();
let wss_port = values[7].parse::<u16>().unwrap_or_default();
let assigned_groups_num = values[9].parse::<i32>().unwrap_or_default();
let capacity = values[11].parse::<i32>().unwrap_or_default();
let threshold = values[13].parse::<i32>().unwrap_or_default();

CGWREDISDBShard {
id,
server_host,
server_port,
wss_port,
assigned_groups_num,
capacity,
threshold,
Expand All @@ -110,6 +116,8 @@ impl From<CGWREDISDBShard> for Vec<String> {
val.server_host,
"server_port".to_string(),
val.server_port.to_string(),
"wss_port".to_string(),
val.wss_port.to_string(),
"assigned_groups_num".to_string(),
val.assigned_groups_num.to_string(),
"capacity".to_string(),
Expand Down Expand Up @@ -338,6 +346,7 @@ impl CGWRemoteDiscovery {
id: app_args.cgw_id,
server_host: app_args.grpc_args.grpc_public_host.clone(),
server_port: app_args.grpc_args.grpc_public_port,
wss_port: app_args.wss_args.wss_port,
assigned_groups_num,
capacity: app_args.cgw_groups_capacity,
threshold: app_args.cgw_groups_threshold,
Expand Down Expand Up @@ -747,6 +756,26 @@ impl CGWRemoteDiscovery {
))
}

pub async fn get_shard_host_and_wss_port(&self, shard_id: i32) -> Result<(String, u16)> {
if let Err(_e) = self.sync_remote_cgw_map().await {
return Err(Error::RemoteDiscovery(
"Failed to sync (sync_remote_cgw_map) remote CGW info from REDIS",
));
}

let lock = self.remote_cgws_map.read().await;

match lock.get(&shard_id) {
Some(instance) => Ok((
instance.shard.server_host.clone(),
instance.shard.wss_port,
)),
None => Err(Error::RemoteDiscovery(
"Unexpected: Failed to find CGW shard",
)),
}
}

async fn validate_infra_group_cgw_assignee(&self, shard_id: i32) -> Result<i32> {
let lock = self.remote_cgws_map.read().await;

Expand Down Expand Up @@ -1598,4 +1627,9 @@ impl CGWRemoteDiscovery {
.await;
});
}

pub async fn get_infra_from_db(&self, mac: MacAddress)-> Option<CGWDBInfra>
{
self.db_accessor.clone().get_infra(mac).await
}
}
Loading

0 comments on commit 5da8334

Please sign in to comment.