Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial infra message impl #103

Merged
merged 3 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions run_cgw.sh
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ echo "CGW UCENTRAL AP DATAMODEL URI : $CGW_UCENTRAL_AP_DATAMODEL_URI"
echo "CGW UCENTRAL SWITCH DATAMODEL URI : $CGW_UCENTRAL_SWITCH_DATAMODEL_URI"

docker run \
-p 15002:15002 \
-p 50051:50051 \
-p 8080:8080 \
-p $CGW_WSS_PORT:$CGW_WSS_PORT \
-p $CGW_GRPC_PUBLIC_PORT:$CGW_GRPC_PUBLIC_PORT \
-p $CGW_METRICS_PORT:$CGW_METRICS_PORT \
--cap-add=SYS_PTRACE --security-opt seccomp=unconfined \
-v $CGW_CERTS_PATH:$CONTAINTER_CERTS_VOLUME \
-v $CGW_NB_INFRA_CERTS_PATH:$CONTAINTER_NB_INFRA_CERTS_VOLUME \
Expand Down
28 changes: 15 additions & 13 deletions src/cgw_ucentral_ap_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,21 +894,23 @@ pub fn cgw_ucentral_ap_parse_message(
}
}
} else if map.contains_key("result") {
if !map.contains_key("id") {
warn!("Received JRPC <result> without id!");
return Err(Error::UCentralParser("Received JRPC <result> without id"));
}
if let Value::Object(result) = &map["result"] {
if !result.contains_key("id") {
warn!("Received JRPC <result> without id!");
return Err(Error::UCentralParser("Received JRPC <result> without id"));
}

let id = map["id"]
.as_u64()
.ok_or_else(|| Error::UCentralParser("Failed to parse id"))?;
let reply_event = CGWUCentralEvent {
serial: Default::default(),
evt_type: CGWUCentralEventType::Reply(CGWUCentralEventReply { id }),
decompressed: None,
};
let id = result["id"]
.as_u64()
.ok_or_else(|| Error::UCentralParser("Failed to parse id"))?;
let reply_event = CGWUCentralEvent {
serial: Default::default(),
evt_type: CGWUCentralEventType::Reply(CGWUCentralEventReply { id }),
decompressed: None,
};

return Ok(reply_event);
return Ok(reply_event);
}
}

Err(Error::UCentralParser("Failed to parse event/method"))
Expand Down
26 changes: 20 additions & 6 deletions src/cgw_ucentral_switch_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::cgw_ucentral_parser::{
CGWUCentralEvent, CGWUCentralEventLog, CGWUCentralEventState, CGWUCentralEventStateClients,
CGWUCentralEventStateClientsData, CGWUCentralEventStateClientsType,
CGWUCentralEventStateLLDPData, CGWUCentralEventStateLinks, CGWUCentralEventStatePort,
CGWUCentralEventType, CGWUCentralJRPCMessage,
CGWUCentralEventType, CGWUCentralJRPCMessage, CGWUCentralEventReply
};

fn parse_lldp_data(
Expand Down Expand Up @@ -253,11 +253,25 @@ pub fn cgw_ucentral_switch_parse_message(
}
}
} else if map.contains_key("result") {
info!("Processing <result> JSONRPC msg");
info!("{:?}", map);
return Err(Error::UCentralParser(
"Result handling is not yet implemented",
));
// For now, let's mimic AP's basic reply / result
// format.
if let Value::Object(result) = &map["result"] {
if !result.contains_key("id") {
warn!("Received JRPC <result> without id!");
return Err(Error::UCentralParser("Received JRPC <result> without id"));
}

let id = result["id"]
.as_u64()
.ok_or_else(|| Error::UCentralParser("Failed to parse id"))?;
let reply_event = CGWUCentralEvent {
serial: Default::default(),
evt_type: CGWUCentralEventType::Reply(CGWUCentralEventReply { id }),
decompressed: None,
};

return Ok(reply_event);
}
}

Err(Error::UCentralParser("Failed to parse event/method"))
Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(self):
# However, we're making a fixture, hence all values must be the same
# on the initial step.
connect_msg = json.loads(device.messages.connect)
connect_msg['params']['capabilities']['platform'] = "ap"
connect_msg['params']['firmware'] = "Test_FW_A"
connect_msg['params']['uuid'] = 1
device.messages.connect = json.dumps(connect_msg)
Expand Down
9 changes: 9 additions & 0 deletions utils/client_simulator/src/simulation_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ def send_join(self, socket: client.ClientConnection):
def send_leave(self, socket: client.ClientConnection):
socket.send(self.messages.leave)

def get_single_message(self, socket: client.ClientConnection):
try:
msg = socket.recv(self.interval)
return self.messages.from_json(msg)
except TimeoutError:
return None
except:
raise

def handle_messages(self, socket: client.ClientConnection):
try:
msg = socket.recv(self.interval)
Expand Down
26 changes: 24 additions & 2 deletions utils/kafka_producer/src/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,27 @@ def get_msgs(self, timeout_ms: int = 12000):

return res_list

def get_infra_request_result_msg(self, uuid_val: int, timeout_ms: int = 12000):
res_uuid = str(uuid.UUID(int=uuid_val))

assert self.is_connected(),\
f"consumer: Cannot get Kafka result msg, Not connected!"

while True:
# We explicitly use get_single_msg instead of <get_msgs>
# to make sure we return as soon as we find result,
# without waiting for potential T/O
message = self.get_single_msg(timeout_ms=timeout_ms)
if message is None:
break

logger.debug("Flushed kafka msg: %s key=%s value=%s ts=%s" %
(message.topic, message.key, message.value, message.timestamp))
if 'uuid' in message.value.keys():
if res_uuid == message.value['uuid'] and message.value['type'] == 'infra_request_result':
return message
return None

def get_result_msg(self, uuid_val: int, timeout_ms: int = 12000):
res_uuid = str(uuid.UUID(int=uuid_val))

Expand All @@ -102,8 +123,9 @@ def get_result_msg(self, uuid_val: int, timeout_ms: int = 12000):

logger.debug("Flushed kafka msg: %s key=%s value=%s ts=%s" %
(message.topic, message.key, message.value, message.timestamp))
if res_uuid == message.value['uuid']:
return message
if 'uuid' in message.value.keys():
if res_uuid == message.value['uuid']:
return message
return None

def get_single_msg(self, timeout_ms: int = 12000):
Expand Down
115 changes: 114 additions & 1 deletion utils/kafka_producer/src/producer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .utils import Message, MacRange
from .utils import Message, MacRange, UCentralConfigRequest
from .log import logger

from typing import List, Tuple
Expand All @@ -7,14 +7,122 @@
import uuid
import sys
import random
import json


class Producer:
@staticmethod
def device_message_reboot(mac: str, id: int = None):
msg = {}
params = {}

if mac is None:
raise Exception('Cannot format message without MAC specified')

if id is None:
id = 1

params["serial"] = mac
params["when"] = 0

msg["jsonrpc"] = "2.0"
msg["method"] = "reboot"
msg["params"] = params
msg["id"] = id

return msg

@staticmethod
def device_message_factory(mac: str, id: int = None, keep_rediretor: bool = None):
msg = {}
params = {}

if mac is None:
raise Exception('Cannot format message without MAC specified')

if id is None:
id = 1

if keep_rediretor is None:
keep_rediretor = True

params["serial"] = mac
params["when"] = 0
params["keep_rediretor"] = keep_rediretor

msg["jsonrpc"] = "2.0"
msg["method"] = "factory"
msg["params"] = params
msg["id"] = id

return msg

@staticmethod
def device_message_ping(mac: str, id: int = None):
msg = {}
params = {}

if mac is None:
raise Exception('Cannot format message without MAC specified')

if id is None:
id = 1

params["serial"] = mac

msg["jsonrpc"] = "2.0"
msg["method"] = "ping"
msg["params"] = params
msg["id"] = id

return msg

def device_message_config_ap_basic(self, mac: str, id: int = None) -> str:
if mac is None:
raise Exception('Cannot format message without MAC specified')

if id is None:
id = 1

msg = self.ucentral_configs.get_ap_basic_cfg(mac, id);
return json.loads(msg)

def device_message_config_ap_basic_invalid(self, mac: str, id: int = None) -> str:
if mac is None:
raise Exception('Cannot format message without MAC specified')

if id is None:
id = 1

msg = self.ucentral_configs.get_ap_basic_invalid_cfg(mac, id);
return json.loads(msg)

def device_message_config_switch_basic(self, mac: str, id: int = None) -> str:
if mac is None:
raise Exception('Cannot format message without MAC specified')

if id is None:
id = 1

msg = self.ucentral_configs.get_switch_basic_cfg(mac, id);
return json.loads(msg)

def device_message_config_switch_basic_invalid(self, mac: str, id: int = None) -> str:
if mac is None:
raise Exception('Cannot format message without MAC specified')

if id is None:
id = 1

msg = self.ucentral_configs.get_switch_basic_invalid_cfg(mac, id);
return json.loads(msg)

def __init__(self, db: str, topic: str) -> None:
self.db = db
self.conn = None
self.topic = topic
self.message = Message()
self.ucentral_configs = UCentralConfigRequest()

def __enter__(self) -> kafka.KafkaProducer:
return self.connect()
Expand Down Expand Up @@ -134,6 +242,11 @@ def handle_device_assignment(self, add: List[Tuple[str, MacRange]], remove: List
bytes(group, encoding="utf-8"))
conn.flush()

def handle_single_device_message(self, message: dict, group: str, mac: str, uuid_val: int) -> None:
self.conn.send(self.topic, self.message.to_device(group, mac, message, 0, uuid_val),
bytes(group, encoding="utf-8"))
self.conn.flush()

def handle_device_messages(self, message: dict, group: str, mac_range: MacRange,
count: int, time_s: int, interval_s: int) -> None:
if not time_s:
Expand Down
46 changes: 46 additions & 0 deletions utils/kafka_producer/src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,52 @@ def __parse_input(self, input: str) -> Tuple[int, int]:
return self.mac2num(base), int(count)
return self.mac2num(input), 1

class UCentralConfigRequest:
TEMPLATE_FILE_AP_BASIC = "./kafka_data/cfg_ap_basic.json"
TEMPLATE_FILE_AP_BASIC_INVALID = "./kafka_data/cfg_ap_basic_invalid.json"
TEMPLATE_FILE_SWITCH_BASIC = "./kafka_data/cfg_switch_basic.json"
TEMPLATE_FILE_SWITCH_BASIC_INVALID = "./kafka_data/cfg_switch_basic_invalid.json"

@staticmethod
def parse_uuid(uuid_val = None) -> str:
if uuid_val is None:
return str(1)

return str(uuid_val)

def __init__(self) -> None:
with open(self.TEMPLATE_FILE_AP_BASIC) as f:
self.ap_basic = f.read()
with open(self.TEMPLATE_FILE_AP_BASIC_INVALID) as f:
self.ap_basic_invalid = f.read()
with open(self.TEMPLATE_FILE_SWITCH_BASIC) as f:
self.switch_basic = f.read()
with open(self.TEMPLATE_FILE_SWITCH_BASIC_INVALID) as f:
self.switch_basic_invalid = f.read()

def get_ap_basic_cfg(self, mac: str, uuid_val = None):
req = copy.deepcopy(self.ap_basic);
req = req.replace("MAC_PLACEHOLDER", mac)
req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val))
return req

def get_ap_basic_invalid_cfg(self, mac: str, uuid_val = None):
req = copy.deepcopy(self.ap_basic_invalid);
req = req.replace("MAC_PLACEHOLDER", mac)
req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val))
return req

def get_switch_basic_cfg(self, mac: str, uuid_val = None):
req = copy.deepcopy(self.switch_basic);
req = req.replace("MAC_PLACEHOLDER", mac)
req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val))
return req

def get_switch_basic_invalid_cfg(self, mac: str, uuid_val = None):
req = copy.deepcopy(self.switch_basic_invalid);
req = req.replace("MAC_PLACEHOLDER", mac)
req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val))
return req

class Message:
TEMPLATE_FILE = "./kafka_data/message_template.json"
Expand Down
Loading