Skip to content

Commit

Permalink
Testing: initial message queue (infra msg) infrastructure
Browse files Browse the repository at this point in the history
Add initial infrastructure for sending / receiving messages
using websocket bindings for client sim.
Add initial handle-reboot-simulation test, that both
tests CGW's ability to sink down a request, as well as
test it's behavior whenever simulated device's been rebooted.

Signed-off-by: Oleksandr Mazur <[email protected]>
  • Loading branch information
Cahb committed Dec 5, 2024
1 parent f1357a0 commit 10fd2d7
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 3 deletions.
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(self):
# However, we're making a fixture, hence all values must be the same
# on the initial step.
connect_msg = json.loads(device.messages.connect)
connect_msg['params']['capabilities']['platform'] = "ap"
connect_msg['params']['firmware'] = "Test_FW_A"
connect_msg['params']['uuid'] = 1
device.messages.connect = json.dumps(connect_msg)
Expand Down
9 changes: 9 additions & 0 deletions utils/client_simulator/src/simulation_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ def send_join(self, socket: client.ClientConnection):
def send_leave(self, socket: client.ClientConnection):
socket.send(self.messages.leave)

def get_single_message(self, socket: client.ClientConnection):
try:
msg = socket.recv(self.interval)
return self.messages.from_json(msg)
except TimeoutError:
return None
except:
raise

def handle_messages(self, socket: client.ClientConnection):
try:
msg = socket.recv(self.interval)
Expand Down
26 changes: 24 additions & 2 deletions utils/kafka_producer/src/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,27 @@ def get_msgs(self, timeout_ms: int = 12000):

return res_list

def get_infra_request_result_msg(self, uuid_val: int, timeout_ms: int = 12000):
res_uuid = str(uuid.UUID(int=uuid_val))

assert self.is_connected(),\
f"consumer: Cannot get Kafka result msg, Not connected!"

while True:
# We explicitly use get_single_msg instead of <get_msgs>
# to make sure we return as soon as we find result,
# without waiting for potential T/O
message = self.get_single_msg(timeout_ms=timeout_ms)
if message is None:
break

logger.debug("Flushed kafka msg: %s key=%s value=%s ts=%s" %
(message.topic, message.key, message.value, message.timestamp))
if 'uuid' in message.value.keys():
if res_uuid == message.value['uuid'] and message.value['type'] == 'infra_request_result':
return message
return None

def get_result_msg(self, uuid_val: int, timeout_ms: int = 12000):
res_uuid = str(uuid.UUID(int=uuid_val))

Expand All @@ -102,8 +123,9 @@ def get_result_msg(self, uuid_val: int, timeout_ms: int = 12000):

logger.debug("Flushed kafka msg: %s key=%s value=%s ts=%s" %
(message.topic, message.key, message.value, message.timestamp))
if res_uuid == message.value['uuid']:
return message
if 'uuid' in message.value.keys():
if res_uuid == message.value['uuid']:
return message
return None

def get_single_msg(self, timeout_ms: int = 12000):
Expand Down
115 changes: 114 additions & 1 deletion utils/kafka_producer/src/producer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .utils import Message, MacRange
from .utils import Message, MacRange, UCentralConfigRequest
from .log import logger

from typing import List, Tuple
Expand All @@ -7,14 +7,122 @@
import uuid
import sys
import random
import json


class Producer:
@staticmethod
def device_message_reboot(mac: str, id: int = None):
msg = {}
params = {}

if mac is None:
raise Exception('Cannot format message without MAC specified')

if id is None:
id = 1

params["serial"] = mac
params["when"] = 0

msg["jsonrpc"] = "2.0"
msg["method"] = "reboot"
msg["params"] = params
msg["id"] = id

return msg

@staticmethod
def device_message_factory(mac: str, id: int = None, keep_rediretor: bool = None):
msg = {}
params = {}

if mac is None:
raise Exception('Cannot format message without MAC specified')

if id is None:
id = 1

if keep_rediretor is None:
keep_rediretor = True

params["serial"] = mac
params["when"] = 0
params["keep_rediretor"] = keep_rediretor

msg["jsonrpc"] = "2.0"
msg["method"] = "factory"
msg["params"] = params
msg["id"] = id

return msg

@staticmethod
def device_message_ping(mac: str, id: int = None):
msg = {}
params = {}

if mac is None:
raise Exception('Cannot format message without MAC specified')

if id is None:
id = 1

params["serial"] = mac

msg["jsonrpc"] = "2.0"
msg["method"] = "ping"
msg["params"] = params
msg["id"] = id

return msg

def device_message_config_ap_basic(self, mac: str, id: int = None) -> str:
if mac is None:
raise Exception('Cannot format message without MAC specified')

if id is None:
id = 1

msg = self.ucentral_configs.get_ap_basic_cfg(mac, id);
return json.loads(msg)

def device_message_config_ap_basic_invalid(self, mac: str, id: int = None) -> str:
if mac is None:
raise Exception('Cannot format message without MAC specified')

if id is None:
id = 1

msg = self.ucentral_configs.get_ap_basic_invalid_cfg(mac, id);
return json.loads(msg)

def device_message_config_switch_basic(self, mac: str, id: int = None) -> str:
if mac is None:
raise Exception('Cannot format message without MAC specified')

if id is None:
id = 1

msg = self.ucentral_configs.get_switch_basic_cfg(mac, id);
return json.loads(msg)

def device_message_config_switch_basic_invalid(self, mac: str, id: int = None) -> str:
if mac is None:
raise Exception('Cannot format message without MAC specified')

if id is None:
id = 1

msg = self.ucentral_configs.get_switch_basic_invalid_cfg(mac, id);
return json.loads(msg)

def __init__(self, db: str, topic: str) -> None:
self.db = db
self.conn = None
self.topic = topic
self.message = Message()
self.ucentral_configs = UCentralConfigRequest()

def __enter__(self) -> kafka.KafkaProducer:
return self.connect()
Expand Down Expand Up @@ -134,6 +242,11 @@ def handle_device_assignment(self, add: List[Tuple[str, MacRange]], remove: List
bytes(group, encoding="utf-8"))
conn.flush()

def handle_single_device_message(self, message: dict, group: str, mac: str, uuid_val: int) -> None:
self.conn.send(self.topic, self.message.to_device(group, mac, message, 0, uuid_val),
bytes(group, encoding="utf-8"))
self.conn.flush()

def handle_device_messages(self, message: dict, group: str, mac_range: MacRange,
count: int, time_s: int, interval_s: int) -> None:
if not time_s:
Expand Down
46 changes: 46 additions & 0 deletions utils/kafka_producer/src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,52 @@ def __parse_input(self, input: str) -> Tuple[int, int]:
return self.mac2num(base), int(count)
return self.mac2num(input), 1

class UCentralConfigRequest:
TEMPLATE_FILE_AP_BASIC = "./kafka_data/cfg_ap_basic.json"
TEMPLATE_FILE_AP_BASIC_INVALID = "./kafka_data/cfg_ap_basic_invalid.json"
TEMPLATE_FILE_SWITCH_BASIC = "./kafka_data/cfg_switch_basic.json"
TEMPLATE_FILE_SWITCH_BASIC_INVALID = "./kafka_data/cfg_switch_basic_invalid.json"

@staticmethod
def parse_uuid(uuid_val = None) -> str:
if uuid_val is None:
return str(1)

return str(uuid_val)

def __init__(self) -> None:
with open(self.TEMPLATE_FILE_AP_BASIC) as f:
self.ap_basic = f.read()
with open(self.TEMPLATE_FILE_AP_BASIC_INVALID) as f:
self.ap_basic_invalid = f.read()
with open(self.TEMPLATE_FILE_SWITCH_BASIC) as f:
self.switch_basic = f.read()
with open(self.TEMPLATE_FILE_SWITCH_BASIC_INVALID) as f:
self.switch_basic_invalid = f.read()

def get_ap_basic_cfg(self, mac: str, uuid_val = None):
req = copy.deepcopy(self.ap_basic);
req = req.replace("MAC_PLACEHOLDER", mac)
req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val))
return req

def get_ap_basic_invalid_cfg(self, mac: str, uuid_val = None):
req = copy.deepcopy(self.ap_basic_invalid);
req = req.replace("MAC_PLACEHOLDER", mac)
req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val))
return req

def get_switch_basic_cfg(self, mac: str, uuid_val = None):
req = copy.deepcopy(self.switch_basic);
req = req.replace("MAC_PLACEHOLDER", mac)
req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val))
return req

def get_switch_basic_invalid_cfg(self, mac: str, uuid_val = None):
req = copy.deepcopy(self.switch_basic_invalid);
req = req.replace("MAC_PLACEHOLDER", mac)
req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val))
return req

class Message:
TEMPLATE_FILE = "./kafka_data/message_template.json"
Expand Down

0 comments on commit 10fd2d7

Please sign in to comment.