From 46f944802c23ee0d0a89ed1cafc3357a02433aca Mon Sep 17 00:00:00 2001 From: James Green Date: Thu, 14 Dec 2023 01:36:50 +0000 Subject: [PATCH] Added Mist plan Now the application will plan the output and promt the user before running the mist scripts --- .gitignore | 1 + makefile | 1 + src/juniper.py | 204 +++++++++++++++++++++++++++---------------- test/test_juniper.py | 41 ++++++++- 4 files changed, 171 insertions(+), 76 deletions(-) diff --git a/.gitignore b/.gitignore index cdd03ae..67b35f5 100644 --- a/.gitignore +++ b/.gitignore @@ -10,5 +10,6 @@ env/ terraform.tfstate data_src/**.csv +data_src/**mist_plan**.json .idea **/__pycache__/ diff --git a/makefile b/makefile index 7662785..88a04cb 100644 --- a/makefile +++ b/makefile @@ -81,6 +81,7 @@ run-dev: ## Run the python script while mounting the host. This enables using th tests: ## Run unit tests for the python app docker run -v $(shell pwd)/src:/app/src \ -v $(shell pwd)/test:/app/test \ + -v $(shell pwd)/data_src:/data_src \ -e RUN_UNIT_TESTS=True $(NAME) .PHONY: shell diff --git a/src/juniper.py b/src/juniper.py index 029317c..85e80cc 100644 --- a/src/juniper.py +++ b/src/juniper.py @@ -2,6 +2,7 @@ import json import getpass import sys +from datetime import datetime # Mist CRUD operations @@ -117,9 +118,125 @@ def warn_if_using_org_id_production(org_id): else: raise ValueError('Invalid input') -# Main function + +def build_payload( + d, + rf_template_id, + network_template_id, + site_group_ids +): + site = {'name': d.get('Site Name', ''), + 'address': d.get('Site Address', ''), + "latlng": {"lat": d.get('gps', '')[0], "lng": d.get('gps', '')[1]}, + "country_code": d.get('country_code', ''), + "rftemplate_id": rf_template_id, + "networktemplate_id": network_template_id, + "timezone": d.get('time_zone', ''), + "sitegroup_ids": check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups( + gov_wifi=d.get('Enable GovWifi', ''), + moj_wifi=d.get('Enable MoJWifi', ''), + site_group_ids=json.loads(site_group_ids) + ), + } + # MOJ specific attributes + site_setting = { + + "auto_upgrade": { + "enabled": True, + "version": "custom", + "time_of_day": "02:00", + "custom_versions": { + "AP45": "0.12.27066", + "AP32": "0.12.27066" + }, + "day_of_week": "" + }, + + "rogue": { + "min_rssi": -80, + "min_duration": 20, + "enabled": True, + "honeypot_enabled": True, + "whitelisted_bssids": [ + "" + ], + "whitelisted_ssids": [ + "GovWifi" + ] + }, + + "persist_config_on_device": True, + + "engagement": { + "dwell_tags": { + "passerby": "1-300", + "bounce": "3600-14400", + "engaged": "25200-36000", + "stationed": "50400-86400" + }, + "dwell_tag_names": { + "passerby": "Below 5 Min (Passerby)", + "bounce": "1-4 Hours", + "engaged": "7-10 Hours", + "stationed": "14-24 Hours" + } + }, + "analytic": { + "enabled": True + }, + + "vars": { + "site_specific_radius_wired_nacs_secret": d.get('Wired NACS Radius Key', ''), + "site_specific_radius_govwifi_secret": d.get('GovWifi Radius Key', ''), + "address": d.get('Site Address', ''), + "site_name": d.get('Site Name', '') + } + + } + return site, site_setting + +def plan_of_action( + data, + rf_template_id, + network_template_id, + site_group_ids +): + + # Generate a timestamp for the filename + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + plan_file_name = "../data_src/mist_plan_{time}.json".format(time=timestamp) + json_objects = [] + + for d in data: + site, site_setting = build_payload( + d, + rf_template_id, + network_template_id, + site_group_ids + ) + json_objects.append(site) + json_objects.append(site_setting) + + # Load file + with open(plan_file_name, "w") as plan_file: + json.dump(json_objects, plan_file, indent=2) + + # Print to terminal + with open(plan_file_name, "r") as plan_file: + print(plan_file.read()) + + print("A file containing all the changes has been created: {file}".format(file=plan_file_name)) + user_input = input("Do you wish to continue? (y/n): ").upper() + + if user_input == "Y": + print("Continuing with run") + elif user_input == "N": + sys.exit(0) + else: + raise ValueError('Invalid input') +# Main function def juniper_script( data, org_id=None, @@ -129,7 +246,6 @@ def juniper_script( rf_template_id=None, network_template_id=None ): - # Configure True/False to enable/disable additional logging of the API response objects show_more_details = True @@ -149,6 +265,13 @@ def juniper_script( # Prompt user if we are using production org_id warn_if_using_org_id_production(org_id) + plan_of_action( + data, + rf_template_id, + network_template_id, + site_group_ids + ) + # Establish Mist session admin = Admin(mist_username, mist_login_method) @@ -156,76 +279,13 @@ def juniper_script( for d in data: # Variables site_id = None - site = {'name': d.get('Site Name', ''), - 'address': d.get('Site Address', ''), - "latlng": {"lat": d.get('gps', '')[0], "lng": d.get('gps', '')[1]}, - "country_code": d.get('country_code', ''), - "rftemplate_id": rf_template_id, - "networktemplate_id": network_template_id, - "timezone": d.get('time_zone', ''), - "sitegroup_ids": check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups( - gov_wifi=d.get('Enable GovWifi', ''), - moj_wifi=d.get('Enable MoJWifi', ''), - site_group_ids=json.loads(site_group_ids) - ), - } - - # MOJ specific attributes - site_setting = { - - "auto_upgrade": { - "enabled": True, - "version": "custom", - "time_of_day": "02:00", - "custom_versions": { - "AP45": "0.12.27066", - "AP32": "0.12.27066" - }, - "day_of_week": "" - }, - - "rogue": { - "min_rssi": -80, - "min_duration": 20, - "enabled": True, - "honeypot_enabled": True, - "whitelisted_bssids": [ - "" - ], - "whitelisted_ssids": [ - "GovWifi" - ] - }, - "persist_config_on_device": True, - - "engagement": { - "dwell_tags": { - "passerby": "1-300", - "bounce": "3600-14400", - "engaged": "25200-36000", - "stationed": "50400-86400" - }, - "dwell_tag_names": { - "passerby": "Below 5 Min (Passerby)", - "bounce": "1-4 Hours", - "engaged": "7-10 Hours", - "stationed": "14-24 Hours" - } - }, - "analytic": { - "enabled": True - }, - - "vars": { - "site_specific_radius_wired_nacs_secret": d.get('Wired NACS Radius Key', ''), - "site_specific_radius_govwifi_secret": d.get('GovWifi Radius Key', ''), - "address": d.get('Site Address', ''), - "site_name": d.get('Site Name', '') - }, - - - } + site, site_setting = build_payload( + d, + rf_template_id, + network_template_id, + site_group_ids + ) print('Calling the Mist Create Site API...') result = admin.post('/api/v1/orgs/' + org_id + '/sites', site) @@ -244,8 +304,6 @@ def juniper_script( print(json.dumps(result, sort_keys=True, indent=4)) print('\nUsing id in the Mist Update Setting API request') - print() - # Update Site Setting print('Calling the Mist Update Setting API...') result = admin.put('/api/v1/sites/' + site_id + '/setting', diff --git a/test/test_juniper.py b/test/test_juniper.py index 3325369..fcf9ab5 100644 --- a/test/test_juniper.py +++ b/test/test_juniper.py @@ -1,16 +1,19 @@ import unittest from unittest.mock import patch, MagicMock -from src.juniper import juniper_script, Admin, check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups, warn_if_using_org_id_production +from src.juniper import juniper_script, Admin, check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups, warn_if_using_org_id_production, plan_of_action from io import StringIO +from datetime import datetime +import os class TestJuniperScript(unittest.TestCase): + @patch('src.juniper.plan_of_action') @patch('getpass.getpass', return_value='token') @patch('src.juniper.requests.Session.get', return_value=MagicMock(status_code=200)) @patch('src.juniper.Admin.post') @patch('src.juniper.Admin.put') - def test_juniper_script(self, mock_put, mock_post, mock_successful_login, api_token): + def test_juniper_script(self, mock_put, mock_post, mock_successful_login, api_token, mock_plan_of_action): # Mock Mist API responses mock_post.return_value = {'id': '123', 'name': 'TestSite'} mock_put.return_value = {'status': 'success'} @@ -154,8 +157,9 @@ def test_juniper_script_missing_network_template_id(self): self.assertEqual(str(cm.exception), 'Must define network_template_id') + @patch('src.juniper.plan_of_action') @patch('src.juniper.Admin') - def test_given_mist_login_method_not_defined_then_default_to_credentials(self, mock_admin): + def test_given_mist_login_method_not_defined_then_default_to_credentials(self, mock_admin, mock_plan_of_action): output_buffer = StringIO() with patch('sys.stdout', new_callable=StringIO) as mock_stdout: juniper_script([], @@ -316,3 +320,34 @@ def test_append_neither_wifi(self): result = check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups( gov_wifi, moj_wifi, self.site_group_ids) self.assertEqual(result, []) + +class TestPlanOfActionFunction(unittest.TestCase): + + def setUp(self): + self.data = {'Site Name': 'TestSite', 'Site Address': '123 Main St', + 'gps': [1.23, 4.56], 'country_code': 'US', 'time_zone': 'UTC', + 'Enable GovWifi': 'true', 'Enable MoJWifi': 'false', + 'Wired NACS Radius Key': 'key1', 'GovWifi Radius Key': 'key2'}, + self.rf_template_id = "rf_template_id", + self.network_template_id = "network_template_id", + self.site_group_ids = '{"moj_wifi": "foo","gov_wifi": "bar"}' + + def test_plan_of_action_creates_file(self): + with patch('builtins.input', return_value='Y'), patch('sys.exit') as mock_exit: + plan_of_action(self.data, self.rf_template_id, self.network_template_id, self.site_group_ids) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + expected_file_name = f"../data_src/mist_plan_{timestamp}.json" + self.assertTrue(os.path.exists(expected_file_name)) + os.remove(expected_file_name) + + @patch("builtins.open") + def test_plan_of_action_exit_on_no(self, mock_open_file): + with patch('builtins.input', return_value='N'), self.assertRaises(SystemExit) as cm: + plan_of_action(self.data, self.rf_template_id, self.network_template_id, self.site_group_ids) + self.assertEqual(cm.exception.code, 0) + + @patch("builtins.open") + def test_plan_of_action_invalid_input(self, mock_open_file): + with patch('builtins.input', return_value='invalid_input'), self.assertRaises(ValueError) as cm: + plan_of_action(self.data, self.rf_template_id, self.network_template_id, self.site_group_ids) + self.assertEqual(str(cm.exception), 'Invalid input')