Skip to content

Commit

Permalink
Add infra to store Device RAM Cache into Redis DB.
Browse files Browse the repository at this point in the history
  • Loading branch information
SviatoslavBoichuk committed Oct 21, 2024
1 parent a68b70b commit ce966bd
Show file tree
Hide file tree
Showing 3 changed files with 486 additions and 52 deletions.
130 changes: 114 additions & 16 deletions src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,11 +394,18 @@ impl CGWConnectionServer {
queue_lock.start_queue_timeout_manager().await;
});

// Sync RAM cache with PostgressDB.
server
if let Err(e) = server.cgw_remote_discovery.sync_devices_cache().await {
error!("Failed to sync infras with Redis devices cache! Error: {e}");
}

// Sync RAM cache with Redis.
if let Err(e) = server
.cgw_remote_discovery
.sync_device_to_gid_cache(server.devices_cache.clone())
.await;
.sync_devices_cache_with_redis(server.devices_cache.clone())
.await
{
error!("Failed to sync Device cache! Error: {e}");
}

tokio::spawn(async move {
CGWMetrics::get_ref()
Expand Down Expand Up @@ -630,6 +637,17 @@ impl CGWConnectionServer {
// This only means that original capacity of all buffers is allocated to <100>,
// it can still increase on demand or need automatically (upon insert, push_back etc)
let cgw_buf_prealloc_size = 100;
let mut last_update_timestamp: i64 = match self
.cgw_remote_discovery
.get_redis_last_update_timestamp()
.await
{
Ok(timestamp) => timestamp,
Err(e) => {
error!("{e}");
0i64
}
};

loop {
if num_of_msg_read < buf_capacity {
Expand Down Expand Up @@ -658,6 +676,40 @@ impl CGWConnectionServer {
};
num_of_msg_read += rd_num;

if rd_num == 0 {
let curretn_timestamp: i64 = match self
.cgw_remote_discovery
.get_redis_last_update_timestamp()
.await
{
Ok(timestamp) => timestamp,
Err(e) => {
error!("{e}");
0i64
}
};

if last_update_timestamp != curretn_timestamp {
if let Err(e) = self.cgw_remote_discovery.sync_gid_to_cgw_map().await {
error!("process_internal_nb_api_mbox: failed to sync GID to CGW map! Error: {e}");
}

if let Err(e) = self.cgw_remote_discovery.sync_devices_cache().await {
error!("process_internal_nb_api_mbox: failed to sync Devices cache! Error: {e}");
}

if let Err(e) = self
.cgw_remote_discovery
.sync_devices_cache_with_redis(self.devices_cache.clone())
.await
{
error!("Failed to sync Device cache! Error: {e}");
}

last_update_timestamp = curretn_timestamp;
}
}

// We read some messages, try to continue and read more
// If none read - break from recv, process all buffers that've
// been filled-up so far (both local and remote).
Expand All @@ -676,9 +728,6 @@ impl CGWConnectionServer {
// This is done to ensure that we don't fallback for redis too much,
// but still somewhat fully rely on it.
//
if let Err(e) = self.cgw_remote_discovery.sync_gid_to_cgw_map().await {
error!("process_internal_nb_api_mbox: failed to sync GID to CGW map Error: {e}");
}

// TODO: rework to avoid re-allocating these buffers on each loop iteration
// (get mut slice of vec / clear when done?)
Expand Down Expand Up @@ -1529,6 +1578,20 @@ impl CGWConnectionServer {
}
}
device.update_device_capabilities(&caps);
match serde_json::to_string(device) {
Ok(device_json) => {
if let Err(e) = self
.cgw_remote_discovery
.add_device_to_redis_cache(&device_mac, &device_json)
.await
{
error!("{e}");
}
}
Err(e) => {
error!("Failed to serialize device to json string! Error: {e}");
}
}
} else {
let default_caps: CGWDeviceCapabilities = Default::default();
let changes = cgw_detect_device_chages(&default_caps, &caps);
Expand All @@ -1552,16 +1615,29 @@ impl CGWConnectionServer {
}
}

devices_cache.add_device(
&device_mac,
&CGWDevice::new(
device_type,
CGWDeviceState::CGWDeviceConnected,
0,
false,
caps,
),
let device: CGWDevice = CGWDevice::new(
device_type,
CGWDeviceState::CGWDeviceConnected,
0,
false,
caps,
);
devices_cache.add_device(&device_mac, &device);

match serde_json::to_string(&device) {
Ok(device_json) => {
if let Err(e) = self
.cgw_remote_discovery
.add_device_to_redis_cache(&device_mac, &device_json)
.await
{
error!("{e}");
}
}
Err(e) => {
error!("Failed to serialize device to json string! Error: {e}");
}
}

if let Ok(resp) = cgw_construct_unassigned_infra_connection_msg(
device_mac,
Expand Down Expand Up @@ -1613,8 +1689,30 @@ impl CGWConnectionServer {
device_group_id = device.get_device_group_id();
if device.get_device_remains_in_db() {
device.set_device_state(CGWDeviceState::CGWDeviceDisconnected);

match serde_json::to_string(device) {
Ok(device_json) => {
if let Err(e) = self
.cgw_remote_discovery
.add_device_to_redis_cache(&device_mac, &device_json)
.await
{
error!("{e}");
}
}
Err(e) => {
error!("Failed to serialize device to json string! Error: {e}");
}
}
} else {
devices_cache.del_device(&device_mac);
if let Err(e) = self
.cgw_remote_discovery
.del_device_from_redis_cache(&device_mac)
.await
{
error!("{e}");
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/cgw_devices_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ impl CGWDevicesCache {
}
}

pub fn flush_all(&mut self) {
self.cache.clear();
}

pub fn dump_devices_cache(&self) {
// Debug print - simply ignore errors if any!
if let Ok(json_output) = serde_json::to_string_pretty(&self) {
Expand Down
Loading

0 comments on commit ce966bd

Please sign in to comment.