Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/cgw kafka key in replies #102

Merged
merged 4 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ reqwest = { version = "0.12.5", features = ["json"] }
jsonschema = { version = "0.18.0" }
url = { version = "2.5.2" }
nix = { version = "0.29.0", features = ["net"] }
murmur2 = { version = "0.1.0" }

[build-dependencies]
tonic-build = "0.11.0"
Expand Down
95 changes: 66 additions & 29 deletions src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ enum CGWNBApiParsedMsgType {
InfrastructureGroupCreateToShard(i32),
InfrastructureGroupDelete,
InfrastructureGroupInfrasAdd(Vec<MacAddress>),
InfrastructureGroupInfraDel(Vec<MacAddress>),
InfrastructureGroupInfrasDel(Vec<MacAddress>),
InfrastructureGroupInfraMsg(MacAddress, String, Option<u64>),
RebalanceGroups,
}
Expand Down Expand Up @@ -573,7 +573,9 @@ impl CGWConnectionServer {
return Some(CGWNBApiParsedMsg::new(
json_msg.uuid,
group_id,
CGWNBApiParsedMsgType::InfrastructureGroupInfraDel(json_msg.infra_group_infras),
CGWNBApiParsedMsgType::InfrastructureGroupInfrasDel(
json_msg.infra_group_infras,
),
));
}
"infrastructure_group_infra_message_enqueue" => {
Expand Down Expand Up @@ -655,6 +657,9 @@ impl CGWConnectionServer {
}
};

let mut partition_array_idx: usize = 0;
let mut local_shard_partition_key: Option<String>;

loop {
if num_of_msg_read < buf_capacity {
// Try to recv_many, but don't sleep too much
Expand Down Expand Up @@ -727,6 +732,26 @@ impl CGWConnectionServer {

debug!("Received {num_of_msg_read} messages from NB API, processing...");

let partition_mapping = self.nb_api_client.get_partition_to_local_shard_mapping();
debug!("Kafka partitions idx:key mapping info: {:?}", partition_mapping);
if !partition_mapping.is_empty() {
partition_array_idx += 1;
if partition_array_idx >= partition_mapping.len() {
partition_array_idx = 0;
}
local_shard_partition_key = Some(partition_mapping[partition_array_idx].1.clone());

debug!(
"Using kafka key '{}' for kafka partition idx '{}'",
partition_mapping[partition_array_idx].1,
partition_mapping[partition_array_idx].0
);
} else {
warn!("Cannot get partition to local shard mapping, won't be able to return kafka routing key in NB request replies!");
// Clear previously used partition key
local_shard_partition_key = None;
}

// We rely on this map only for a single iteration of received messages:
// say, we receive 10 messages but 20 in queue, this means that gid->cgw_id
// cache is clear at first, the filled up when processing first 10 messages,
Expand All @@ -751,16 +776,6 @@ impl CGWConnectionServer {
origin,
) = msg;

let gid_numeric = match key.parse::<i32>() {
Err(e) => {
warn!(
"Invalid KEY received from KAFKA bus message, ignoring it. Error: {e}"
);
continue;
}
Ok(v) => v,
};

let parsed_msg = match self.parse_nbapi_msg(&payload) {
Some(val) => val,
None => {
Expand Down Expand Up @@ -973,9 +988,17 @@ impl CGWConnectionServer {
continue;
}

// We need to know which dst GID this request should be
// forwarded to.
// In order to get it, match to <any> parsed msg, and
// get only gid field.
let gid: i32 = match parsed_msg {
CGWNBApiParsedMsg { gid, .. } => gid,
};

match self
.cgw_remote_discovery
.get_infra_group_owner_id(gid_numeric)
.get_infra_group_owner_id(gid)
.await
{
Some(dst_cgw_id) => {
Expand Down Expand Up @@ -1010,6 +1033,7 @@ impl CGWConnectionServer {

let discovery_clone = self.cgw_remote_discovery.clone();
let self_clone = self.clone();
let local_shard_partition_key_clone = local_shard_partition_key.clone();

// Future to Handle (relay) messages for remote CGW
let relay_task_hdl = self.mbox_relay_msg_runtime_handle.spawn(async move {
Expand Down Expand Up @@ -1045,6 +1069,7 @@ impl CGWConnectionServer {
let cgw_id = value.0;
let msg_stream = value.1;
let self_clone = self_clone.clone();
let local_shard_partition_key_clone = local_shard_partition_key_clone.clone();
tokio::spawn(async move {
if (discovery_clone
.relay_request_stream_to_remote_cgw(cgw_id, msg_stream)
Expand All @@ -1056,6 +1081,7 @@ impl CGWConnectionServer {
Uuid::default(),
false,
Some(format!("Failed to relay MSG stream to remote CGW{cgw_id}")),
local_shard_partition_key_clone,
) {
self_clone.enqueue_mbox_message_from_cgw_to_nb_api(-1, resp);
} else {
Expand All @@ -1079,16 +1105,8 @@ impl CGWConnectionServer {
_origin,
) = msg;

let gid_numeric = match key.parse::<i32>() {
Err(e) => {
warn!("Invalid KEY received from KAFKA bus message, ignoring! Error: {e}");
continue;
}
Ok(v) => v,
};

debug!(
"Received message for local CGW key: {key}, local id {}",
"Received message for local CGW: key '{key}', local shard id '{}'",
self.local_cgw_id
);

Expand All @@ -1101,7 +1119,7 @@ impl CGWConnectionServer {
} => {
if (self
.cgw_remote_discovery
.get_infra_group_owner_id(gid_numeric)
.get_infra_group_owner_id(gid)
.await)
.is_none()
{
Expand All @@ -1112,6 +1130,7 @@ impl CGWConnectionServer {
uuid,
false,
Some(format!("Failed to add infra list to nonexisting group, gid {gid}, uuid {uuid}")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
Expand Down Expand Up @@ -1140,6 +1159,7 @@ impl CGWConnectionServer {
uuid,
true,
None,
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
Expand Down Expand Up @@ -1233,6 +1253,7 @@ impl CGWConnectionServer {
uuid,
false,
Some(format!("Failed to create few MACs from infras list (partial create), gid {gid}, uuid {uuid}")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
Expand All @@ -1250,11 +1271,11 @@ impl CGWConnectionServer {
CGWNBApiParsedMsg {
uuid,
gid,
msg_type: CGWNBApiParsedMsgType::InfrastructureGroupInfraDel(mac_list),
msg_type: CGWNBApiParsedMsgType::InfrastructureGroupInfrasDel(mac_list),
} => {
if (self
.cgw_remote_discovery
.get_infra_group_owner_id(gid_numeric)
.get_infra_group_owner_id(gid)
.await)
.is_none()
{
Expand All @@ -1265,6 +1286,7 @@ impl CGWConnectionServer {
uuid,
false,
Some(format!("Failed to delete MACs from infra list, gid {gid}, uuid {uuid}: group does not exist")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
Expand Down Expand Up @@ -1297,6 +1319,7 @@ impl CGWConnectionServer {
uuid,
true,
None,
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
Expand Down Expand Up @@ -1331,6 +1354,7 @@ impl CGWConnectionServer {
uuid,
false,
Some(format!("Failed to destroy few MACs from infras list (partial delete), gid {gid}, uuid {uuid}")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
Expand All @@ -1357,7 +1381,7 @@ impl CGWConnectionServer {
} => {
if (self
.cgw_remote_discovery
.get_infra_group_owner_id(gid_numeric)
.get_infra_group_owner_id(gid)
.await)
.is_none()
{
Expand All @@ -1366,6 +1390,7 @@ impl CGWConnectionServer {
uuid,
false,
Some(format!("Failed to sink down msg to device of nonexisting group, gid {gid}, uuid {uuid}: group does not exist")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
Expand Down Expand Up @@ -1397,6 +1422,7 @@ impl CGWConnectionServer {
msg,
uuid,
timeout,
local_shard_partition_key.clone(),
)
.await;
}
Expand All @@ -1407,6 +1433,7 @@ impl CGWConnectionServer {
uuid,
false,
Some(format!("Failed to validate config message! Invalid configure message for device: {device_mac}, uuid {uuid}\nError: {e}")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
Expand All @@ -1424,6 +1451,7 @@ impl CGWConnectionServer {
msg,
uuid,
timeout,
local_shard_partition_key.clone(),
)
.await;
}
Expand All @@ -1439,6 +1467,7 @@ impl CGWConnectionServer {
uuid,
false,
Some(format!("Failed to parse command message to device: {device_mac}, uuid {uuid}")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
Expand Down Expand Up @@ -1891,6 +1920,7 @@ impl CGWConnectionServer {
message: String,
uuid: Uuid,
timeout: Option<u64>,
local_shard_partition_key: Option<String>,
) {
if (infra_state == CGWDeviceState::CGWDeviceConnected)
|| (infra_state == CGWDeviceState::CGWDeviceDisconnected
Expand All @@ -1908,16 +1938,22 @@ impl CGWConnectionServer {
req.uuid,
false,
Some("Request replaced with new!".to_string()),
local_shard_partition_key,
),
None => cgw_construct_infra_enqueue_response(
self.local_cgw_id,
uuid,
true,
None,
local_shard_partition_key,
),
None => {
cgw_construct_infra_enqueue_response(self.local_cgw_id, uuid, true, None)
}
},
Err(e) => cgw_construct_infra_enqueue_response(
self.local_cgw_id,
uuid,
false,
Some(e.to_string()),
local_shard_partition_key,
),
};

Expand All @@ -1935,6 +1971,7 @@ impl CGWConnectionServer {
Some(format!(
"Device {mac} is disconnected! Accepting only Configure and Upgrade requests!"
)),
local_shard_partition_key,
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(infra_gid, resp);
} else {
Expand Down
Loading
Loading