Skip to content

Commit

Permalink
Added Mist plan
Browse files Browse the repository at this point in the history
Now the application will plan the output and promt the user before
running the mist scripts
  • Loading branch information
jamesgreen-moj committed Dec 14, 2023
1 parent 5304745 commit 46f9448
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 76 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ env/
terraform.tfstate

data_src/**.csv
data_src/**mist_plan**.json
.idea
**/__pycache__/
1 change: 1 addition & 0 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ run-dev: ## Run the python script while mounting the host. This enables using th
tests: ## Run unit tests for the python app
docker run -v $(shell pwd)/src:/app/src \
-v $(shell pwd)/test:/app/test \
-v $(shell pwd)/data_src:/data_src \
-e RUN_UNIT_TESTS=True $(NAME)

.PHONY: shell
Expand Down
204 changes: 131 additions & 73 deletions src/juniper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import getpass
import sys
from datetime import datetime

# Mist CRUD operations

Expand Down Expand Up @@ -117,9 +118,125 @@ def warn_if_using_org_id_production(org_id):
else:
raise ValueError('Invalid input')

# Main function

def build_payload(
d,
rf_template_id,
network_template_id,
site_group_ids
):
site = {'name': d.get('Site Name', ''),
'address': d.get('Site Address', ''),
"latlng": {"lat": d.get('gps', '')[0], "lng": d.get('gps', '')[1]},
"country_code": d.get('country_code', ''),
"rftemplate_id": rf_template_id,
"networktemplate_id": network_template_id,
"timezone": d.get('time_zone', ''),
"sitegroup_ids": check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups(
gov_wifi=d.get('Enable GovWifi', ''),
moj_wifi=d.get('Enable MoJWifi', ''),
site_group_ids=json.loads(site_group_ids)
),
}
# MOJ specific attributes
site_setting = {

"auto_upgrade": {
"enabled": True,
"version": "custom",
"time_of_day": "02:00",
"custom_versions": {
"AP45": "0.12.27066",
"AP32": "0.12.27066"
},
"day_of_week": ""
},

"rogue": {
"min_rssi": -80,
"min_duration": 20,
"enabled": True,
"honeypot_enabled": True,
"whitelisted_bssids": [
""
],
"whitelisted_ssids": [
"GovWifi"
]
},

"persist_config_on_device": True,

"engagement": {
"dwell_tags": {
"passerby": "1-300",
"bounce": "3600-14400",
"engaged": "25200-36000",
"stationed": "50400-86400"
},
"dwell_tag_names": {
"passerby": "Below 5 Min (Passerby)",
"bounce": "1-4 Hours",
"engaged": "7-10 Hours",
"stationed": "14-24 Hours"
}
},
"analytic": {
"enabled": True
},

"vars": {
"site_specific_radius_wired_nacs_secret": d.get('Wired NACS Radius Key', ''),
"site_specific_radius_govwifi_secret": d.get('GovWifi Radius Key', ''),
"address": d.get('Site Address', ''),
"site_name": d.get('Site Name', '')
}

}
return site, site_setting

def plan_of_action(
data,
rf_template_id,
network_template_id,
site_group_ids
):

# Generate a timestamp for the filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
plan_file_name = "../data_src/mist_plan_{time}.json".format(time=timestamp)
json_objects = []

for d in data:
site, site_setting = build_payload(
d,
rf_template_id,
network_template_id,
site_group_ids
)
json_objects.append(site)
json_objects.append(site_setting)

# Load file
with open(plan_file_name, "w") as plan_file:
json.dump(json_objects, plan_file, indent=2)

# Print to terminal
with open(plan_file_name, "r") as plan_file:
print(plan_file.read())

print("A file containing all the changes has been created: {file}".format(file=plan_file_name))
user_input = input("Do you wish to continue? (y/n): ").upper()

if user_input == "Y":
print("Continuing with run")
elif user_input == "N":
sys.exit(0)
else:
raise ValueError('Invalid input')


# Main function
def juniper_script(
data,
org_id=None,
Expand All @@ -129,7 +246,6 @@ def juniper_script(
rf_template_id=None,
network_template_id=None
):

# Configure True/False to enable/disable additional logging of the API response objects
show_more_details = True

Expand All @@ -149,83 +265,27 @@ def juniper_script(
# Prompt user if we are using production org_id
warn_if_using_org_id_production(org_id)

plan_of_action(
data,
rf_template_id,
network_template_id,
site_group_ids
)

# Establish Mist session
admin = Admin(mist_username, mist_login_method)

# Create each site from the CSV file
for d in data:
# Variables
site_id = None
site = {'name': d.get('Site Name', ''),
'address': d.get('Site Address', ''),
"latlng": {"lat": d.get('gps', '')[0], "lng": d.get('gps', '')[1]},
"country_code": d.get('country_code', ''),
"rftemplate_id": rf_template_id,
"networktemplate_id": network_template_id,
"timezone": d.get('time_zone', ''),
"sitegroup_ids": check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups(
gov_wifi=d.get('Enable GovWifi', ''),
moj_wifi=d.get('Enable MoJWifi', ''),
site_group_ids=json.loads(site_group_ids)
),
}

# MOJ specific attributes
site_setting = {

"auto_upgrade": {
"enabled": True,
"version": "custom",
"time_of_day": "02:00",
"custom_versions": {
"AP45": "0.12.27066",
"AP32": "0.12.27066"
},
"day_of_week": ""
},

"rogue": {
"min_rssi": -80,
"min_duration": 20,
"enabled": True,
"honeypot_enabled": True,
"whitelisted_bssids": [
""
],
"whitelisted_ssids": [
"GovWifi"
]
},

"persist_config_on_device": True,

"engagement": {
"dwell_tags": {
"passerby": "1-300",
"bounce": "3600-14400",
"engaged": "25200-36000",
"stationed": "50400-86400"
},
"dwell_tag_names": {
"passerby": "Below 5 Min (Passerby)",
"bounce": "1-4 Hours",
"engaged": "7-10 Hours",
"stationed": "14-24 Hours"
}
},
"analytic": {
"enabled": True
},

"vars": {
"site_specific_radius_wired_nacs_secret": d.get('Wired NACS Radius Key', ''),
"site_specific_radius_govwifi_secret": d.get('GovWifi Radius Key', ''),
"address": d.get('Site Address', ''),
"site_name": d.get('Site Name', '')
},


}
site, site_setting = build_payload(
d,
rf_template_id,
network_template_id,
site_group_ids
)

print('Calling the Mist Create Site API...')
result = admin.post('/api/v1/orgs/' + org_id + '/sites', site)
Expand All @@ -244,8 +304,6 @@ def juniper_script(
print(json.dumps(result, sort_keys=True, indent=4))
print('\nUsing id in the Mist Update Setting API request')

print()

# Update Site Setting
print('Calling the Mist Update Setting API...')
result = admin.put('/api/v1/sites/' + site_id + '/setting',
Expand Down
41 changes: 38 additions & 3 deletions test/test_juniper.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import unittest
from unittest.mock import patch, MagicMock
from src.juniper import juniper_script, Admin, check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups, warn_if_using_org_id_production
from src.juniper import juniper_script, Admin, check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups, warn_if_using_org_id_production, plan_of_action
from io import StringIO
from datetime import datetime
import os


class TestJuniperScript(unittest.TestCase):

@patch('src.juniper.plan_of_action')
@patch('getpass.getpass', return_value='token')
@patch('src.juniper.requests.Session.get', return_value=MagicMock(status_code=200))
@patch('src.juniper.Admin.post')
@patch('src.juniper.Admin.put')
def test_juniper_script(self, mock_put, mock_post, mock_successful_login, api_token):
def test_juniper_script(self, mock_put, mock_post, mock_successful_login, api_token, mock_plan_of_action):
# Mock Mist API responses
mock_post.return_value = {'id': '123', 'name': 'TestSite'}
mock_put.return_value = {'status': 'success'}
Expand Down Expand Up @@ -154,8 +157,9 @@ def test_juniper_script_missing_network_template_id(self):

self.assertEqual(str(cm.exception), 'Must define network_template_id')

@patch('src.juniper.plan_of_action')
@patch('src.juniper.Admin')
def test_given_mist_login_method_not_defined_then_default_to_credentials(self, mock_admin):
def test_given_mist_login_method_not_defined_then_default_to_credentials(self, mock_admin, mock_plan_of_action):
output_buffer = StringIO()
with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
juniper_script([],
Expand Down Expand Up @@ -316,3 +320,34 @@ def test_append_neither_wifi(self):
result = check_if_we_need_to_append_gov_wifi_or_moj_wifi_site_groups(
gov_wifi, moj_wifi, self.site_group_ids)
self.assertEqual(result, [])

class TestPlanOfActionFunction(unittest.TestCase):

def setUp(self):
self.data = {'Site Name': 'TestSite', 'Site Address': '123 Main St',
'gps': [1.23, 4.56], 'country_code': 'US', 'time_zone': 'UTC',
'Enable GovWifi': 'true', 'Enable MoJWifi': 'false',
'Wired NACS Radius Key': 'key1', 'GovWifi Radius Key': 'key2'},
self.rf_template_id = "rf_template_id",
self.network_template_id = "network_template_id",
self.site_group_ids = '{"moj_wifi": "foo","gov_wifi": "bar"}'

def test_plan_of_action_creates_file(self):
with patch('builtins.input', return_value='Y'), patch('sys.exit') as mock_exit:
plan_of_action(self.data, self.rf_template_id, self.network_template_id, self.site_group_ids)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
expected_file_name = f"../data_src/mist_plan_{timestamp}.json"
self.assertTrue(os.path.exists(expected_file_name))
os.remove(expected_file_name)

@patch("builtins.open")
def test_plan_of_action_exit_on_no(self, mock_open_file):
with patch('builtins.input', return_value='N'), self.assertRaises(SystemExit) as cm:
plan_of_action(self.data, self.rf_template_id, self.network_template_id, self.site_group_ids)
self.assertEqual(cm.exception.code, 0)

@patch("builtins.open")
def test_plan_of_action_invalid_input(self, mock_open_file):
with patch('builtins.input', return_value='invalid_input'), self.assertRaises(ValueError) as cm:
plan_of_action(self.data, self.rf_template_id, self.network_template_id, self.site_group_ids)
self.assertEqual(str(cm.exception), 'Invalid input')

0 comments on commit 46f9448

Please sign in to comment.