diff --git a/config/csv_mapping.json b/config/csv_mapping.json new file mode 100644 index 0000000..fbeb508 --- /dev/null +++ b/config/csv_mapping.json @@ -0,0 +1,75 @@ +{ + "name": { + "DEFAULT": "name", + "SET": null + }, + "DeviceRoleCriteria": { + "DEFAULT": "OU", + "SET": null, + "MAPPING": true + }, + "objectClass": { + "DEFAULT": "objectClass", + "SET": null + }, + "appl-NAC-DeviceRoleProd": { + "DEFAULT": "appl-NAC-DeviceRoleProd", + "SET": null + }, + "appl-NAC-DeviceRoleInst": { + "DEFAULT": "appl-NAC-DeviceRoleInst", + "SET": null + }, + "appl-NAC-FQDN": { + "DEFAULT": "appl-NAC-FQDN", + "SET": null + }, + "appl-NAC-Hostname": { + "DEFAULT": "appl-NAC-Hostname", + "SET": null + }, + "appl-NAC-Active": { + "DEFAULT": "appl-NAC-Active", + "SET": null + }, + "appl-NAC-ForceDot1X": { + "DEFAULT": "appl-NAC-ForceDot1X", + "SET": null + }, + "appl-NAC-Install": { + "DEFAULT": "appl-NAC-Install", + "SET": null + }, + "appl-NAC-AllowAccessCAB": { + "DEFAULT": "appl-NAC-AllowAccessCAB", + "SET": null + }, + "appl-NAC-AllowAccessAIR": { + "DEFAULT": "appl-NAC-AllowAccessAIR", + "SET": null + }, + "appl-NAC-AllowAccessVPN": { + "DEFAULT": "appl-NAC-AllowAccessVPN", + "SET": null + }, + "appl-NAC-AllowAccessCEL": { + "DEFAULT": "appl-NAC-AllowAccessCEL", + "SET": null + }, + "appl-NAC-macAddressCAB": { + "DEFAULT": "appl-NAC-macAddressCAB", + "SET": null + }, + "appl-NAC-macAddressAIR": { + "DEFAULT": "appl-NAC-macAddressAIR", + "SET": null + }, + "appl-NAC-Certificate": { + "DEFAULT": "appl-NAC-Certificate", + "SET": null + }, + "synchronized": { + "DEFAULT": "synchronized", + "SET": null + } +} \ No newline at end of file diff --git a/config/ou_mapping.json b/config/ou_mapping.json new file mode 100644 index 0000000..a51a4aa --- /dev/null +++ b/config/ou_mapping.json @@ -0,0 +1,14 @@ +{ + "DEFAULT": { + "DeviceRoleProd": "DeviceRoleProdDefault", + "DeviceRoleInst": "DeviceRoleInstDefault" + }, + "OU1": { + "DeviceRoleProd": "DeviceRoleProd1", + "DeviceRoleInst": "DeviceRoleInst1" + }, + "OU2": { + "DeviceRoleProd": "DeviceRoleProd2", + "DeviceRoleInst": "DeviceRoleInst2" + } +} \ No newline at end of file diff --git a/resources/ldapObjects.csv b/resources/ldapObjects.csv index ab648da..983d731 100644 --- a/resources/ldapObjects.csv +++ b/resources/ldapObjects.csv @@ -1,3 +1,4 @@ -appl-NAC-AllowAccessAIR;appl-NAC-AllowAccessVPN;objectClass;appl-NAC-Active;appl-NAC-AllowAccessCAB;appl-NAC-DeviceRoleInst;appl-NAC-DeviceRoleProd;appl-NAC-AllowAccessCEL;appl-NAC-Install;appl-NAC-macAddressAIR;appl-NAC-FQDN;appl-NAC-Hostname;appl-NAC-macAddressCAB;appl-NAC-Certificate;appl-NAC-ForceDot1X -True;True;appl-NAC-Device;True;True;DeviceRoleInst3;DeviceRoleProd2;True;True;001122334455;FQDN2;Hostname2;001122334455,001122334455;Certificate3;True -True;True;appl-NAC-Device;True;True;DeviceRoleInst3;DeviceRoleProd2;True;True;001122334456;FQDN4;Hostname4;001122334456,001122334456;Certificate4;True +appl-NAC-AllowAccessAIR;appl-NAC-AllowAccessVPN;objectClass;appl-NAC-Active;appl-NAC-AllowAccessCAB;appl-NAC-DeviceRoleInst;appl-NAC-DeviceRoleProd;appl-NAC-AllowAccessCEL;appl-NAC-Install;appl-NAC-macAddressAIR;appl-NAC-FQDN;appl-NAC-Hostname;appl-NAC-macAddressCAB;appl-NAC-Certificate;appl-NAC-ForceDot1X;OU +True;True;appl-NAC-Device;True;True;DeviceRoleInst2;DeviceRoleProd2;True;True;001122334455;FQDN2;Hostname1;001122334456,001122334456;Certificate3;True;OU1 +True;True;appl-NAC-Device;True;True;DeviceRoleInstDefault;DeviceRoleProdDefault;True;True;001122334457;FQDN4;Hostname2;001122334458,001122334458;Certificate4;True;OU2 +True;True;appl-NAC-Device;True;True;DeviceRoleInst1;DeviceRoleProd1;True;True;001122334459;FQDN4;Hostname3;001122334451,001122334451;Certificate4;True;OUDEFAULT diff --git a/src/helper/config.py b/src/helper/config.py index 0d86ed2..5587f8d 100644 --- a/src/helper/config.py +++ b/src/helper/config.py @@ -14,6 +14,7 @@ along with this program. If not, see . ''' import logging +import json from pathlib import Path from configparser import ConfigParser @@ -32,3 +33,14 @@ def get_config_from_file(file_path): config = ConfigParser() config.read(file_path) return config + + +def get_config_from_json(file_path): + logging.debug('mapping file: {}'.format(file_path)) + file_path = Path(file_path) + if not file_path.exists(): + raise ConfigFileNotFound(file_path) + + with open(file_path, 'r') as file: + json_config = json.load(file) + return json_config diff --git a/src/nac/management/commands/import_devices_from_csv.py b/src/nac/management/commands/import_devices_from_csv.py index b79571d..8009081 100644 --- a/src/nac/management/commands/import_devices_from_csv.py +++ b/src/nac/management/commands/import_devices_from_csv.py @@ -7,12 +7,16 @@ from nac.models import Device, AuthorizationGroup, DeviceRoleProd, DeviceRoleInst from nac.forms import DeviceForm from helper.logging import setup_console_logger -from helper.filesystem import get_resources_directory, get_existing_path +from helper.filesystem import get_resources_directory, get_existing_path, get_config_directory +from helper.config import get_config_from_json from helper.database import MacList +import traceback DEFAULT_SOURCE_FILE = get_resources_directory() / 'ldapObjects.csv' -SAVE_FILE = get_resources_directory() / "invalid_devices.csv" +DEFAULT_SAVE_FILE = get_resources_directory() / "invalid_devices.csv" +DEFAULT_CSV_MAPPING = get_config_directory() / 'csv_mapping.json' +DEFAULT_OU_MAPPING = get_config_directory() / 'ou_mapping.json' class Command(BaseCommand): @@ -27,7 +31,7 @@ def add_arguments(self, parser): ) parser.add_argument( '-a', '--auth_group', - default='DefaultAG', + default='AuthGroupDefault', help='specify the Device Authorization Group' ) parser.add_argument( @@ -35,11 +39,21 @@ def add_arguments(self, parser): action='store_true', help='specify if existing Devices should be updated' ) + parser.add_argument( + '-c', '--csv_config', + default=DEFAULT_CSV_MAPPING, + help='use a specific config file [src/csv_mapping.cfg]') + parser.add_argument( + '-o', '--ou_config', + default=DEFAULT_OU_MAPPING, + help='use a specific config file [src/ou_mapping.cfg]') def handle(self, *args, **options): setup_console_logger(options['verbosity']) self.source_file = get_existing_path(options['csv_file']) self.update = options['update'] + self.csv_mapping = get_config_from_json(options['csv_config']) + self.ou_mapping = get_config_from_json(options['ou_config']) self.mac_list = MacList() if not self.source_file: logging.error( @@ -56,6 +70,12 @@ def handle(self, *args, **options): self.clear_invalid_devices_file() self.read_csv() + def get_set_or_default(self, json_config_key): + if json_config_key['SET'] is not None: + return json_config_key['SET'] + else: + return json_config_key['DEFAULT'] + def check_valid_auth_group(self, auth_group): exists = AuthorizationGroup.objects.filter(name=auth_group).exists() if not exists: @@ -65,11 +85,11 @@ def check_valid_auth_group(self, auth_group): def clear_invalid_devices_file(self): try: - with open(SAVE_FILE, "w"): - logging.info(f"Removing all entries in {SAVE_FILE}") + with open(DEFAULT_SAVE_FILE, "w"): + logging.info(f"Removing all entries in {DEFAULT_SAVE_FILE}") except Exception as e: logging.error( - f"Removing all entries in {SAVE_FILE} FAILED -> {e}" + f"Removing all entries in {DEFAULT_SAVE_FILE} FAILED -> {e}" ) def read_csv(self): @@ -93,35 +113,65 @@ def handle_deviceObject(self, deviceObject): self.save_invalid_devices(deviceObject) except Exception as e: logging.error(f"Error: Handling device Object failed -> {e}") + traceback.print_exc() + + def get_deviceRole(self, deviceObject): + DeviceRoleCriteria = self.csv_mapping['DeviceRoleCriteria'] + if DeviceRoleCriteria['MAPPING']: + return self.get_deviceRole_from_ou_mapping(deviceObject) + else: + return self.get_deviceRole_from_csv_mapping(deviceObject) + + def get_deviceRole_from_ou_mapping(self, deviceObject): + DeviceRoleCriteria = self.csv_mapping['DeviceRoleCriteria'] + ou = deviceObject.get(self.get_set_or_default(DeviceRoleCriteria)) + + if ou not in self.ou_mapping.keys(): + ou = self.ou_mapping["DEFAULT"] + else: + ou = self.ou_mapping[ou] + + try: + deviceRoleProd = DeviceRoleProd.objects.get(name=ou['DeviceRoleProd']) + except ObjectDoesNotExist: + raise ValidationError(f"DeviceRoleProd: {ou['DeviceRoleProd']} not in Database") + + try: + deviceRoleInst = DeviceRoleInst.objects.get(name=ou['DeviceRoleInst']) + except ObjectDoesNotExist: + raise ValidationError(f"DeviceRoleInst: {ou['DeviceRoleInst']} not in Database") + + return deviceRoleProd, deviceRoleInst + + def get_deviceRole_from_csv_mapping(self, deviceObject): + deviceRoleProd = self.get_set_or_default(self.csv_mapping['appl-NAC-DeviceRoleProd']) + deviceRoleInst = self.get_set_or_default(self.csv_mapping['appl-NAC-DeviceRoleInst']) + + try: + deviceRoleProd = DeviceRoleProd.objects.get(name=deviceObject.get(deviceRoleProd)) + except ObjectDoesNotExist: + raise ValidationError(f"DeviceRoleProd: {deviceObject.get(deviceRoleProd)} not in Database") + + try: + deviceRoleInst = DeviceRoleInst.objects.get(name=deviceObject.get(deviceRoleInst)) + except ObjectDoesNotExist: + raise ValidationError(f"DeviceRoleInst: {deviceObject.get(deviceRoleInst)} not in Database") + + return deviceRoleProd, deviceRoleInst def check_device(self, deviceObject): logging.info(f"Checking validity of device " - f"{deviceObject.get('appl-NAC-Hostname')}") + f"{deviceObject.get(self.get_set_or_default(self.csv_mapping['appl-NAC-Hostname']))}") try: - if deviceObject.get('objectClass') != 'appl-NAC-Device': + if deviceObject.get(self.get_set_or_default(self.csv_mapping['objectClass'])) != 'appl-NAC-Device': raise ValidationError( f"Invalid Object-type! EXPECTED: appl-NAC-Device <->" - f" ACTUAL: {deviceObject.get('objectClass')}") + f" ACTUAL: {deviceObject.get(self.get_set_or_default(self.csv_mapping['objectClass']))}") with transaction.atomic(): auth_group = AuthorizationGroup.objects.get( name=self.auth_group ) - try: - deviceRoleProd = DeviceRoleProd.objects.get( - name=deviceObject.get("appl-NAC-DeviceRoleProd")) - except ObjectDoesNotExist: - raise ValidationError( - f"DeviceRoleProd: " - f"{deviceObject.get('appl-NAC-DeviceRoleProd')} " - f"not in Database") - try: - deviceRoleInst = DeviceRoleInst.objects.get( - name=deviceObject.get("appl-NAC-DeviceRoleInst")) - except ObjectDoesNotExist: - raise ValidationError( - f"DeviceRoleInst: " - f"{deviceObject.get('appl-NAC-DeviceRoleInst')} " - f"not in Database") + deviceRoleProd, deviceRoleInst = self.get_deviceRole(deviceObject) if deviceRoleProd not in auth_group.DeviceRoleProd.all(): raise ValidationError( f"DeviceRoleProd: {deviceRoleProd} " @@ -132,44 +182,44 @@ def check_device(self, deviceObject): f"not in authorization group: {auth_group}") device_data = { - "name": deviceObject.get("appl-NAC-Hostname"), + "name": deviceObject.get(self.get_set_or_default(self.csv_mapping['appl-NAC-Hostname'])), "authorization_group": auth_group, "appl_NAC_DeviceRoleProd": deviceRoleProd, "appl_NAC_DeviceRoleInst": deviceRoleInst, - "appl_NAC_FQDN": deviceObject.get("appl-NAC-FQDN"), - "appl_NAC_Hostname": deviceObject.get("appl-NAC-Hostname"), + "appl_NAC_FQDN": deviceObject.get(self.get_set_or_default(self.csv_mapping['appl-NAC-FQDN'])), + "appl_NAC_Hostname": deviceObject.get(self.get_set_or_default(self.csv_mapping['appl-NAC-Hostname'])), "appl_NAC_Active": self.str_to_bool( - deviceObject.get("appl-NAC-Active") + deviceObject.get(self.get_set_or_default(self.csv_mapping['appl-NAC-Active'])) ), "appl_NAC_ForceDot1X": self.str_to_bool( - deviceObject.get("appl-NAC-ForceDot1X") + deviceObject.get(self.get_set_or_default(self.csv_mapping['appl-NAC-ForceDot1X'])) ), "appl_NAC_Install": self.str_to_bool( - deviceObject.get("appl-NAC-Install") + deviceObject.get(self.get_set_or_default(self.csv_mapping['appl-NAC-Install'])) ), "appl_NAC_AllowAccessCAB": self.str_to_bool( - deviceObject.get("appl-NAC-AllowAccessCAB") + deviceObject.get(self.get_set_or_default(self.csv_mapping['appl-NAC-AllowAccessCAB'])) ), "appl_NAC_AllowAccessAIR": self.str_to_bool( - deviceObject.get("appl-NAC-AllowAccessAIR") + deviceObject.get(self.get_set_or_default(self.csv_mapping['appl-NAC-AllowAccessAIR'])) ), "appl_NAC_AllowAccessVPN": self.str_to_bool( - deviceObject.get("appl-NAC-AllowAccessVPN") + deviceObject.get(self.get_set_or_default(self.csv_mapping['appl-NAC-AllowAccessVPN'])) ), "appl_NAC_AllowAccessCEL": self.str_to_bool( - deviceObject.get("appl-NAC-AllowAccessCEL") + deviceObject.get(self.get_set_or_default(self.csv_mapping['appl-NAC-AllowAccessCEL'])) ), "appl_NAC_macAddressAIR": deviceObject.get( - "appl-NAC-macAddressAIR" + self.get_set_or_default(self.csv_mapping['appl-NAC-macAddressAIR']) ), "appl_NAC_macAddressCAB": deviceObject.get( - "appl-NAC-macAddressCAB" + self.get_set_or_default(self.csv_mapping['appl-NAC-macAddressCAB']) ), "appl_NAC_Certificate": deviceObject.get( - "appl-NAC-Certificate" + self.get_set_or_default(self.csv_mapping['appl-NAC-Certificate']) ), "synchronized": self.str_to_bool( - deviceObject.get("synchronized") + deviceObject.get(self.get_set_or_default(self.csv_mapping['synchronized'])) ) } device_form = DeviceForm(device_data) @@ -177,13 +227,13 @@ def check_device(self, deviceObject): logging.debug(f"Device {device_data.get('name')} is valid") exists, device_id = self.mac_list.check_existing_mac(device_form.cleaned_data) if exists: - logging.debug(f"Device {deviceObject.get('appl-NAC-Hostname')} already exists") + logging.debug(f"Device {device_data.get('appl_NAC_Hostname')} already exists") if self.update: - logging.debug(f"Updating Device {deviceObject.get('appl-NAC-Hostname')}") + logging.debug(f"Updating Device {device_data.get('appl_NAC_Hostname')}") Device.objects.filter(id=device_id).update(**device_form.cleaned_data) return None else: - raise ValidationError(f"Device {deviceObject.get('appl-NAC-Hostname')} exists and will not get updated") + raise ValidationError(f"Device {device_data.get('appl_NAC_Hostname')} exists and will not get updated") return device_form.cleaned_data else: logging.error( @@ -198,7 +248,7 @@ def check_device(self, deviceObject): raise except Exception as e: logging.error( - f"Checking validity of device {deviceObject.get('name')}: " + f"Checking validity of device {deviceObject.get(self.get_set_or_default(self.csv_mapping['appl-NAC-Hostname']))}: " f"FAILED -> {e}" ) raise @@ -229,20 +279,20 @@ def add_device_to_db(self, deviceObject_valid): def save_invalid_devices(self, deviceObject_invalid): try: column_header = deviceObject_invalid.keys() - with open(SAVE_FILE, 'a', newline="") as csvfile: - logging.info(f"Writing invalid device to {SAVE_FILE}") + with open(DEFAULT_SAVE_FILE, 'a', newline="") as csvfile: + logging.info(f"Writing invalid device to {DEFAULT_SAVE_FILE}") writer = DictWriter( csvfile, fieldnames=column_header, delimiter=";" ) - if stat(SAVE_FILE).st_size == 0: + if stat(DEFAULT_SAVE_FILE).st_size == 0: writer.writeheader() writer.writerows([deviceObject_invalid]) logging.debug( - f"Writing invalid device to {SAVE_FILE}: " + f"Writing invalid device to {DEFAULT_SAVE_FILE}: " f"SUCCESSFUL" ) except Exception as e: logging.error( f"Writing invalid device to " - f"{SAVE_FILE}: FAILED -> {e}" + f"{DEFAULT_SAVE_FILE}: FAILED -> {e}" ) diff --git a/src/tests/test_commands/test_import_devices_from_csv.py b/src/tests/test_commands/test_import_devices_from_csv.py index ca13edc..dc61b4f 100644 --- a/src/tests/test_commands/test_import_devices_from_csv.py +++ b/src/tests/test_commands/test_import_devices_from_csv.py @@ -1,11 +1,12 @@ import pytest from nac.management.commands.import_devices_from_csv import Command, \ - DEFAULT_SOURCE_FILE, SAVE_FILE + DEFAULT_SOURCE_FILE, DEFAULT_SAVE_FILE, DEFAULT_OU_MAPPING, DEFAULT_CSV_MAPPING from unittest.mock import patch, mock_open, MagicMock -from django.core.exceptions import ValidationError +from django.core.exceptions import ValidationError, ObjectDoesNotExist from django.core.management.base import CommandError from nac.models import AuthorizationGroup, DeviceRoleProd, DeviceRoleInst, Device from helper.database import MacList +from helper.config import get_config_from_json @pytest.fixture @@ -53,6 +54,9 @@ def test_check_device(mock_check_existing_mac, mock_logging, mock_atomic, appl_N test_authorization_group.DeviceRoleProd.set([test_deviceRoleProd]) command.auth_group = 'test' command.update = False + command.csv_mapping = get_config_from_json(DEFAULT_CSV_MAPPING) + command.csv_mapping['DeviceRoleCriteria']['MAPPING'] = False + command.ou_mapping = {} command.mac_list = MacList() data = { "name": "test", @@ -102,6 +106,9 @@ def test_check_device_exceptions( test_deviceRoleInst = DeviceRoleInst.objects.create(name="test") test_authorization_group = AuthorizationGroup.objects.create(name="test") command.auth_group = 'test' + command.csv_mapping = get_config_from_json(DEFAULT_CSV_MAPPING) + command.csv_mapping['DeviceRoleCriteria']['MAPPING'] = False + command.ou_mapping = {} command.mac_list = MacList() test_authorization_group.DeviceRoleProd.set([test_deviceRoleProd]) test_authorization_group.DeviceRoleInst.set([test_deviceRoleInst]) @@ -110,7 +117,9 @@ def test_check_device_exceptions( "objectClass": "appl-NAC-Device", "authorization_group": test_authorization_group, "appl-NAC-DeviceRoleProd": test_deviceRoleProd, - "appl-NAC-DeviceRoleInst": test_deviceRoleInst} + "appl-NAC-DeviceRoleInst": test_deviceRoleInst, + "appl-NAC-Hostname": "test" + } mock_form = MagicMock() mock_form.is_valid.return_value = False mock_form.errors = {"Type": ["Reason"]} @@ -291,7 +300,7 @@ def test_save_invalid_devices_empty_file(mock_logging, mock_stat, mock_writer_instance = MagicMock() mock_writer.return_value = mock_writer_instance command.save_invalid_devices(device) - mocked_file.assert_called_once_with(SAVE_FILE, + mocked_file.assert_called_once_with(DEFAULT_SAVE_FILE, 'a', newline="") mock_writer.assert_called_once_with(mocked_file(), fieldnames=device.keys(), @@ -299,9 +308,9 @@ def test_save_invalid_devices_empty_file(mock_logging, mock_stat, mock_writer_instance.writeheader.assert_called_once() mock_writer_instance.writerows.assert_called_once_with([device]) mock_logging.info.assert_called_once_with( - f"Writing invalid device to {SAVE_FILE}") + f"Writing invalid device to {DEFAULT_SAVE_FILE}") mock_logging.debug.assert_called_once_with( - f"Writing invalid device to {SAVE_FILE}: SUCCESSFUL") + f"Writing invalid device to {DEFAULT_SAVE_FILE}: SUCCESSFUL") @patch('builtins.open', new_callable=mock_open) @@ -324,7 +333,7 @@ def test_save_invalid_devices_exception(mock_file, mock_logging, command): command.save_invalid_devices(device) mock_logging.error.assert_called_once_with( f"Writing invalid device to " - f"{SAVE_FILE}: FAILED -> Failed") + f"{DEFAULT_SAVE_FILE}: FAILED -> Failed") @patch('builtins.open', new_callable=mock_open, read_data="name\n;test") @@ -363,9 +372,9 @@ def test_read_csv_exception(mock_logging, mock_handler, @patch('nac.management.commands.import_devices_from_csv.logging') def test_clear_invalid_devices_file(mock_logging, mock_file, command): command.clear_invalid_devices_file() - mock_file.assert_called_once_with(SAVE_FILE, "w") + mock_file.assert_called_once_with(DEFAULT_SAVE_FILE, "w") mock_logging.info.assert_called_once_with( - f"Removing all entries in {SAVE_FILE}") + f"Removing all entries in {DEFAULT_SAVE_FILE}") @patch('builtins.open', side_effect=Exception("Failed")) @@ -373,9 +382,9 @@ def test_clear_invalid_devices_file(mock_logging, mock_file, command): def test_clear_invalid_devices_file_exception(mock_logging, mock_file, command): command.clear_invalid_devices_file() - mock_file.assert_called_once_with(SAVE_FILE, "w") + mock_file.assert_called_once_with(DEFAULT_SAVE_FILE, "w") mock_logging.error.assert_called_once_with( - f"Removing all entries in {SAVE_FILE} FAILED -> Failed" + f"Removing all entries in {DEFAULT_SAVE_FILE} FAILED -> Failed" ) @@ -446,7 +455,7 @@ def test_add_arguments(command): ) mock_parser.add_argument.assert_any_call( '-a', '--auth_group', - default='DefaultAG', + default='AuthGroupDefault', help='specify the Device Authorization Group' ) mock_parser.add_argument.assert_any_call( @@ -470,7 +479,9 @@ def test_handle(mock_read_csv, mock_clear_invalid_devices_file, 'verbosity': 0, 'csv_file': 'test.csv', 'auth_group': 'testag', - 'update': False + 'update': False, + 'csv_config': DEFAULT_CSV_MAPPING, + 'ou_config': DEFAULT_OU_MAPPING } mock_get_existing_path.return_value = 'mockpath/to/test.csv' command.handle(**options) @@ -510,3 +521,115 @@ def test_check_valid_auth_group_not_exists(mock_logging, command): mock_logging.error.assert_called_once_with( "Authorization Group-Object: dummy not in Database") assert result is None + + +def test_get_set_or_default(command): + + assert command.get_set_or_default({'SET': 'value', 'DEFAULT': 'default'}) == 'value' + assert command.get_set_or_default({'SET': None, 'DEFAULT': 'default'}) == 'default' + + with pytest.raises(KeyError): + command.get_set_or_default({'DEFAULT': 'default'}) + + +def test_get_deviceRole_ou_mapping(command): + command.csv_mapping = { + "DeviceRoleCriteria": { + "DEFAULT": "OU", + "SET": None, + "MAPPING": True + } + } + command.ou_mapping = { + "DEFAULT": { + "DeviceRoleProd": "DeviceRoleProdDefault", + "DeviceRoleInst": "DeviceRoleInstDefault" + }, + "OU": { + "DeviceRoleProd": "DeviceRoleProd", + "DeviceRoleInst": "DeviceRoleInst" + } + } + device_object = {'OU': 'OU'} + mock_deviceroleprod = MagicMock() + mock_deviceroleinst = MagicMock() + + with patch('nac.models.DeviceRoleProd.objects.get', return_value=mock_deviceroleprod) as mock_prod_get, \ + patch('nac.models.DeviceRoleInst.objects.get', return_value=mock_deviceroleinst) as mock_inst_get: + + result = command.get_deviceRole(device_object) + + mock_prod_get.assert_called_once_with(name='DeviceRoleProd') + mock_inst_get.assert_called_once_with(name='DeviceRoleInst') + assert result == (mock_deviceroleprod, mock_deviceroleinst) + + device_object = {'OU': '404'} + mock_deviceroleprod = MagicMock() + mock_deviceroleinst = MagicMock() + + with patch('nac.models.DeviceRoleProd.objects.get', return_value=mock_deviceroleprod) as mock_prod_get, \ + patch('nac.models.DeviceRoleInst.objects.get', return_value=mock_deviceroleinst) as mock_inst_get: + + result = command.get_deviceRole(device_object) + + mock_prod_get.assert_called_once_with(name='DeviceRoleProdDefault') + mock_inst_get.assert_called_once_with(name='DeviceRoleInstDefault') + assert result == (mock_deviceroleprod, mock_deviceroleinst) + + +def test_get_deviceRole_csv_mapping(command): + command.csv_mapping = { + "DeviceRoleCriteria": { + "MAPPING": False + }, + "appl-NAC-DeviceRoleProd": { + "DEFAULT": "appl-NAC-DeviceRoleProd", + "SET": None + }, + "appl-NAC-DeviceRoleInst": { + "DEFAULT": "appl-NAC-DeviceRoleInst", + "SET": None + } + } + device_object = {'appl-NAC-DeviceRoleProd': 'DeviceRoleProd1', 'appl-NAC-DeviceRoleInst': 'DeviceRoleInst1'} + mock_deviceroleprod = MagicMock() + mock_deviceroleinst = MagicMock() + + with patch('nac.models.DeviceRoleProd.objects.get', return_value=mock_deviceroleprod) as mock_prod_get, \ + patch('nac.models.DeviceRoleInst.objects.get', return_value=mock_deviceroleinst) as mock_inst_get: + + result = command.get_deviceRole(device_object) + + mock_prod_get.assert_called_once_with(name='DeviceRoleProd1') + mock_inst_get.assert_called_once_with(name='DeviceRoleInst1') + assert result == (mock_deviceroleprod, mock_deviceroleinst) + + +def test_get_deviceRole_ou_mapping_validation_errors(command): + command.csv_mapping = { + "DeviceRoleCriteria": { + "DEFAULT": "OU", + "SET": None, + "MAPPING": True + } + } + command.ou_mapping = { + "OU": { + "DeviceRoleProd": "DeviceRoleProd1", + "DeviceRoleInst": "DeviceRoleInst1" + } + } + device_object = {'OU': 'OU'} + + with patch('nac.models.DeviceRoleProd.objects.get', side_effect=ObjectDoesNotExist()), \ + pytest.raises(ValidationError) as excinfo: + command.get_deviceRole(device_object) + + assert "DeviceRoleProd: DeviceRoleProd1 not in Database" in str(excinfo.value) + + with patch('nac.models.DeviceRoleProd.objects.get', return_value=MagicMock()), \ + patch('nac.models.DeviceRoleInst.objects.get', side_effect=ObjectDoesNotExist()), \ + pytest.raises(ValidationError) as excinfo: + command.get_deviceRole(device_object) + + assert "DeviceRoleInst: DeviceRoleInst1 not in Database" in str(excinfo.value)