Skip to content

Commit

Permalink
add roam ota twog and sixg wpa3 enterprise test (#965)
Browse files Browse the repository at this point in the history
* add roam ota twog and sixg wpa3 enterprise test

Signed-off-by: anil-tegala <[email protected]>

---------

Signed-off-by: anil-tegala <[email protected]>
  • Loading branch information
anil-tegala authored Jul 3, 2024
1 parent 4f905f7 commit d541408
Showing 1 changed file with 142 additions and 0 deletions.
142 changes: 142 additions & 0 deletions tests/e2e/advanced/roam_test/hard_roam/OTA/test_roam_ota.py
Original file line number Diff line number Diff line change
Expand Up @@ -2488,3 +2488,145 @@ def test_roam_2g_and_6g_wpa3psk(self, get_target_object, get_test_library, get_l
pytest.fail(f"Test failed with the following reasons: \n{message}")
else:
assert True

@pytest.mark.roam
@pytest.mark.twog
@pytest.mark.fiveg
@pytest.mark.wpa3_enterprise
def test_roam_2g_and_6g_wpa3eap(self, get_target_object, get_test_library, get_lab_info, selected_testbed,
radius_info):
"""
Test Roaming between two APs, 2G & 6G, WPA3 Enterprise
pytest -m "roam and twog and sixg and wpa3_enterprise and ota"
"""
ap_data = dict()
dut_list = [str(selected_testbed)]
dut_names = list()
bssid_list = list()
freqs_ = ""
testbed_info = get_lab_info.CONFIGURATION
config = copy.deepcopy(config_data)
temp_list = list()
for key, val in testbed_info.items():
tb_type, tb_name = selected_testbed.split("-")
if tb_type in key and tb_name[0] in key:
temp_list.append(key)
temp_list.sort()
dut_list = [temp_list[idx] for idx in range(len(temp_list)) if idx <= 1]
config['interfaces'][0]["ssids"][0]["radius"] = {
"accounting": {
"host": radius_info["ip"],
"port": radius_info["port"],
"secret": radius_info["secret"]
},
"authentication": {
"host": radius_info["ip"],
"port": radius_info["port"],
"secret": radius_info["secret"]
}
}
config['radios'] = [
{"band": "2G", "channel": 11, "channel-mode": "HE", "channel-width": 20, "country": "CA"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["2G"]
if "proto" in config['interfaces'][0]["ssids"][0]["encryption"]:
config['interfaces'][0]["ssids"][0]["encryption"]["proto"] = "wpa3"
if "key" in config['interfaces'][0]["ssids"][0]["encryption"]:
config['interfaces'][0]["ssids"][0]["encryption"].pop("key")
if len(dut_list) < 2:
logging.error(
f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}")
assert False, f"This test need two AP's but number of DUT's available in the selected testbed is {dut_list}"
for ap in range(len(dut_list)):
serial_number = testbed_info[dut_list[ap]]["device_under_tests"][0]['identifier']
dut_names.append(testbed_info[dut_list[ap]]["device_under_tests"][0]['model'])
if ap == 1:
config['radios'] = [
{"band": "6G", "channel": 161, "channel-mode": "HE", "channel-width": 80, "country": "CA"}]
config['interfaces'][0]["ssids"][0]["wifi-bands"] = ["6G"]
logging.info(config)
payload = {"configuration": json.dumps(config), "serialNumber": serial_number, "UUID": 2}
uri = get_target_object.controller_library_object.build_uri(
"device/" + serial_number + "/configure")
logging.info("Sending Command: " + "\n" + str(uri) + "\n" +
"TimeStamp: " + str(datetime.datetime.utcnow()) + "\n" +
"Data: " + str(json.dumps(payload, indent=2)) + "\n" +
"Headers: " + str(get_target_object.controller_library_object.make_headers()))
allure.attach(name=f"Push roam config on {serial_number}: ", body="Sending Command: " + str(uri) + "\n" +
"TimeStamp: " + str(
datetime.datetime.utcnow()) + "\n" +
"Data: " + str(payload) + "\n" +
"Headers: " + str(
get_target_object.controller_library_object.make_headers()))
resp = requests.post(uri, data=json.dumps(payload, indent=2),
headers=get_target_object.controller_library_object.make_headers(),
verify=False, timeout=120)
time.sleep(10)
logging.info(resp.json())
allure.attach(name=f"Response - {resp.status_code} {resp.reason}", body=str(resp.json()))
if resp.status_code != 200:
if resp.status_code == 400 and "Device is already executing a command. Please try later." in \
resp.json()["ErrorDescription"]:
time.sleep(30)
resp = requests.post(uri, data=json.dumps(payload, indent=2),
headers=get_target_object.controller_library_object.make_headers(),
verify=False, timeout=120)
time.sleep(10)
logging.info(resp.json())
else:
assert False, f"push configuration to {serial_number} got failed"
get_target_object.dut_library_object.device_under_tests_data = testbed_info[dut_list[ap]][
"device_under_tests"]
ap_iwinfo = get_target_object.dut_library_object.get_iwinfo(attach_allure=True)
if str(ap_iwinfo) != "Error: pop from empty list":
include_essid = config['interfaces'][0]["ssids"][0]["name"]
re_obj = re.compile(
rf'(wlan\d(?:-\d)?)\s+ESSID: "{re.escape(include_essid)}".*?\s+Access Point:\s+([0-9A-Fa-f:]+).*?Channel:\s+('
r'\d+)\s+\(([\d.]+) GHz\)',
re.DOTALL
)
# find all matches
interface_matches = re_obj.finditer(ap_iwinfo)
if interface_matches:
for match in interface_matches:
interface_name = match.group(1)
access_point = match.group(2)
channel = match.group(3)
frequency = match.group(4).replace('.', '')
ap_data.update(
{serial_number: {'Access Point': access_point, 'Channel': channel, 'frequency': frequency}})
logging.info(f"AP Data from iwinfo: {ap_data}")
if ap_data == {}:
logging.error("Failed to get required iwinfo from minicom")
pytest.fail("Failed to get required iwinfo from minicom")
else:
pytest.fail("Failed to get iwinfo from minicom")
for serial in ap_data:
bssid_list.append(ap_data[serial]['Access Point'])
if not ap_data[serial]['frequency'].endswith(","):
freqs_ = freqs_ + ap_data[serial]['frequency'] + ","
else:
freqs_ = freqs_ + ap_data[serial]['frequency']
ssid = config['interfaces'][0]["ssids"][0]["name"]
pass_fail, message = True, "Test Passed"
try:
pass_fail, message = get_test_library.roam_test(ap1_bssid=bssid_list[0], ap2_bssid=bssid_list[1],
band="both", num_sta=1, security="wpa3", ssid=ssid,
upstream="1.1.eth1", eap_method="TLS",
pairwise_cipher="DEFAULT ",
groupwise_cipher="DEFAULT ",
eap_identity=radius_info["user"],
eap_password=radius_info["password"],
private_key="/home/lanforge/client.p12",
pk_passwd=radius_info["pk_password"],
ca_cert='/home/lanforge/ca.pem', sta_type="11r-eap",
iteration=1, channel="11", option="ota", dut_name=dut_names,
traffic_type="lf_udp")
except Exception as e:
logging.error(f"Exception in roam test : {e}")
pass_fail, message = False, e
finally:
get_target_object.dut_library_object.get_dut_logs(print_log=False)
if not pass_fail:
pytest.fail(f"Test failed with the following reasons: \n{message}")
else:
assert True

0 comments on commit d541408

Please sign in to comment.