From a64ca70b393f93954bbf59caeccec1b0a4cb893c Mon Sep 17 00:00:00 2001 From: Samreen-Nayak Date: Mon, 28 Oct 2024 15:26:43 +0000 Subject: [PATCH] Added unit tests for RMUTL new methods --- plugins/module_utils/_global_catalog.py | 1 + tests/unit/helpers/data_set_helper.py | 33 ++++ tests/unit/module_utils/test_dataset_utils.py | 154 +++++++++++++++++- .../unit/module_utils/test_global_catalog.py | 38 ++++- 4 files changed, 224 insertions(+), 2 deletions(-) diff --git a/plugins/module_utils/_global_catalog.py b/plugins/module_utils/_global_catalog.py index 75466fc3..83eb81c6 100644 --- a/plugins/module_utils/_global_catalog.py +++ b/plugins/module_utils/_global_catalog.py @@ -100,6 +100,7 @@ def _validate_line_length(line, name): """ if len(line) > MAX_LINE_LENGTH: raise ValueError(f"{name} line exceeds {MAX_LINE_LENGTH} characters: {len(line)}") + return True def _validate_name_params(param): if any(len(part) > MAX_NAME_LENGTH for part in param.split('.')): diff --git a/tests/unit/helpers/data_set_helper.py b/tests/unit/helpers/data_set_helper.py index 2b310e40..6c14864c 100644 --- a/tests/unit/helpers/data_set_helper.py +++ b/tests/unit/helpers/data_set_helper.py @@ -6,6 +6,7 @@ from textwrap import dedent from ansible.module_utils.common.text.converters import to_bytes from ansible.module_utils import basic +from ansible_collections.ibm.ibm_zos_cics.plugins.module_utils._response import _execution PYTHON_LANGUAGE_FEATURES_MESSAGE = "Requires python 3 language features" @@ -418,6 +419,32 @@ def CSDUP_add_group_stdout(data_set_name): DFH5109 I END OF DFHCSDUP UTILITY JOB. HIGHEST RETURN CODE WAS: 0 """.format(data_set_name) +def get_job_output_execution(rc=0): + job = get_sample_job_output() + return _execution( + name="Get job output for {0}".format(job["job_id"]), + rc=rc, + stdout=job.get("ret_code").get("msg"), + stderr=job.get("ret_code").get("msg_txt") + ) + +def _get_job_dd_output(ddname, job_id): + return ( + _get_job_dd_output_execution(rc=0, ddname=ddname, job_id=job_id), + RMUTL_stdout("AUTOINIT", job_id) + ) + +def _get_job_dd_output_execution(rc = 0, ddname="", job_id=""): + return _execution( + name=JOB_DD_return_name(ddname, job_id), + rc=rc, + stdout=RMUTL_stdout("AUTOINIT", job_id), + stderr="CC" + ) + +def JOB_DD_return_name(ddname, job_id): + return "Get job dd {0} output for {1}".format(ddname, job_id) + def get_sample_job_output(content="", rc=0, err="CC"): return { "class": "", @@ -483,6 +510,12 @@ def get_sample_job_output(content="", rc=0, err="CC"): "system": "" } +def _get_sample_job_output_with_content(): + job = get_sample_job_output() + for dd in job.get("ddnames"): + dd["content"] = RMUTL_stdout("AUTOINIT", job.get("job_id")) + return job + def read_data_set_content_run_name(data_set_name): return "Read data set {0}".format(data_set_name) diff --git a/tests/unit/module_utils/test_dataset_utils.py b/tests/unit/module_utils/test_dataset_utils.py index a612a8b2..bfbc42c4 100644 --- a/tests/unit/module_utils/test_dataset_utils.py +++ b/tests/unit/module_utils/test_dataset_utils.py @@ -15,7 +15,15 @@ LISTDS_data_set, LISTDS_data_set_doesnt_exist, LISTDS_member_doesnt_exist, - LISTDS_run_name + LISTDS_run_name, + get_sample_job_output as JOB_OUTPUT, + get_job_output_execution as JOB_EXECUTION, + JOB_DD_return_name, + RMUTL_stdout, + _get_job_dd_output, + _get_job_dd_output_execution, + _get_sample_job_output_with_content + ) from ansible_collections.ibm.ibm_zos_core.plugins.module_utils.dd_statement import DatasetDefinition from ansible_collections.ibm.ibm_zos_core.plugins.module_utils.zos_mvs_raw import MVSCmdResponse @@ -700,3 +708,147 @@ def test__write_jcl_to_data_set_fail(): assert e.value.message == "Failed to copy JCL content to data set" assert e.value.executions == expected_executions + +def test_data_set_utils_submit_jcl(): + jcl_uss_path = "jcl/uss/path" + job_name = "Job_Name" + rc = 0 + stdout = "" + stderr = "" + data_set_utils._execute_command = MagicMock(return_value=(rc, stdout, stderr)) + + expected_executions = [{ + "name": "Submit jcl job {0} for {1}".format(jcl_uss_path, job_name), + "rc": 0, + "stdout": "", + "stderr": "" + }] + + executions= data_set_utils._submit_jcl(jcl_uss_path, job_name) + assert expected_executions == executions + +def test_data_set_utils_submit_jcl_failed(): + jcl_uss_path = "jcl/uss/path" + job_name = "Job_Name" + rc = 16 + stdout = "CC" + stderr = "CC" + data_set_utils._execute_command = MagicMock(return_value=(rc, stdout, stderr)) + + expected_executions = [{ + "name": "Submit jcl job {0} for {1}".format(jcl_uss_path, job_name), + "rc": 16, + "stdout": "CC", + "stderr": "CC" + }] + with pytest.raises(MVSExecutionException) as e: + data_set_utils._submit_jcl(jcl_uss_path, job_name) + assert e.value.message == "RC {0} when submitting jcl from {1}".format(rc, jcl_uss_path) + assert e.value.executions == expected_executions + + + +def test_data_set_utils_get_job_output(): + job_id = "JOB12345" + expected_execution=[ + JOB_EXECUTION(), + _execution( + name=JOB_DD_return_name("JESMSGLG", job_id), + rc=0, + stdout=RMUTL_stdout("AUTOINIT", job_id), + stderr="CC", + ), + _execution( + name=JOB_DD_return_name("JESJCL", job_id), + rc=0, + stdout=RMUTL_stdout("AUTOINIT", job_id), + stderr="CC", + ), + _execution( + name=JOB_DD_return_name("JESSYSMSG", job_id), + rc=0, + stdout=RMUTL_stdout("AUTOINIT", job_id), + stderr="CC", + ), + _execution( + name=JOB_DD_return_name("SYSPRINT", job_id), + rc=0, + stdout=RMUTL_stdout("AUTOINIT", job_id), + stderr="CC", + ) + ] + expected_result = _get_sample_job_output_with_content() + data_set_utils.job_output = MagicMock(return_value=[JOB_OUTPUT()]) + job_id= JOB_OUTPUT().get("job_id") + data_set_utils._get_job_dd = MagicMock(side_effect=[ + _get_job_dd_output("JESMSGLG", job_id), + _get_job_dd_output("JESJCL", job_id), + _get_job_dd_output("JESSYSMSG", job_id), + _get_job_dd_output("SYSPRINT", job_id) + ]) + result, executions = data_set_utils._get_job_output(job_id=job_id, job_name="DFHRMUTL") + + assert result == expected_result + assert executions == expected_execution + +def test_data_set_utils_get_job_output_multiple_jobs_failure(): + job_id = JOB_OUTPUT()["job_id"] + job_name = JOB_OUTPUT()["job_name"] + + mock_jobs = [ + JOB_OUTPUT(), + {"ret_code": {"code": 1, "msg": "Error", "msg_txt": "Job failed"}} + ] + expected_executions = [] + data_set_utils.job_output = MagicMock(return_value = mock_jobs) + + with pytest.raises(MVSExecutionException) as e: + data_set_utils._get_job_output(job_id, job_name) + + assert e.value.executions == expected_executions + assert f"Query for {job_name} job submitted under Job ID {job_id} failed" in e.value.message + +def test_data_set_utils_get_job_output_get_dd_failure(): + job_id = JOB_OUTPUT()["job_id"] + job_name = JOB_OUTPUT()["job_name"] + + expected_executions = [JOB_EXECUTION()] + expected_exception = TypeError("Failed to get DD content") + data_set_utils.job_output = MagicMock(return_value=[JOB_OUTPUT()]) + data_set_utils._get_job_dd = MagicMock(return_value=expected_exception) + + with pytest.raises(MVSExecutionException) as e: + data_set_utils._get_job_output(job_id, job_name) + + assert e.value.executions == expected_executions + assert f"Could not get all job DDs for {job_name}. An exception occured: " in str(e.value.message) + +def test_data_set_utils_get_job_dd(): + job_id = "JOB12345" + dd_name = "DD Name" + rc = 0 + stdout = RMUTL_stdout("AUTOINIT", job_id) + stderr = "CC" + mock_dd_execution = [_get_job_dd_output_execution(rc, dd_name, job_id)] + data_set_utils._execute_command = MagicMock(return_value=(rc, stdout, stderr)) + result_execution, result = data_set_utils._get_job_dd(job_id, dd_name) + + assert result == stdout + assert result_execution == mock_dd_execution + +def test_data_set_utils_get_job_dd_failure(): + job_id = "JOB12345" + dd_name = "DD Name" + rc = 8 + stdout = RMUTL_stdout("AUTOINIT", job_id) + stderr = "CC" + + mock_dd_execution = [_get_job_dd_output_execution(rc, dd_name, job_id)] + data_set_utils._execute_command = MagicMock(return_value=(rc, stdout, stderr)) + + with pytest.raises(Exception) as e: + data_set_utils._get_job_dd(job_id, dd_name) + + assert e.type == MVSExecutionException + assert f"RC 8 when getting job output for DD Name from {job_id}" in str(e.value) + assert e.value.executions == mock_dd_execution diff --git a/tests/unit/module_utils/test_global_catalog.py b/tests/unit/module_utils/test_global_catalog.py index d482d3d1..30a40817 100644 --- a/tests/unit/module_utils/test_global_catalog.py +++ b/tests/unit/module_utils/test_global_catalog.py @@ -26,9 +26,10 @@ ) import pytest import sys +import tempfile try: - from unittest.mock import MagicMock + from unittest.mock import MagicMock, patch except ImportError: from mock import MagicMock @@ -202,6 +203,41 @@ def test_global_catalog_get_records_autocold_emergency(): resp = global_catalog._get_catalog_records(stdout=stdout) assert resp == ("AUTOCOLD", "EMERGENCY") +def test_global_catalog_create_dfhrmutl_jcl(): + location = "DATA.SET" + sdfhload = "SDFH.LOAD" + cmd = "HI" + + mock_file = MagicMock() + mock_file.name = '/mocked/path/to/tempfile' + mock_file.__enter__.return_value = mock_file + with patch('tempfile.NamedTemporaryFile', return_value=mock_file) as mock_tempfile: + result = global_catalog._create_dfhrmutl_jcl(location, sdfhload, cmd) + + mock_tempfile.assert_called_once_with(mode='w+', delete=False) + assert result == '/mocked/path/to/tempfile' + +def test_global_catalog_validate_line_length(): + line = "//STEPLIB DD DSNAME=CTS560.CICS730.SDFHLOAD,DISP=SHR" + name = "SDFHLOAD" + result = global_catalog._validate_line_length(line, name) + assert result is True + +def test_global_catalog_validate_line_length_failed(): + line = "//STEPLIB DD DSNAME=CTS560.CICS730.SDFHLOAD,DISP=SHR this is inavlid line with more the 72 chars" + name = "SDFHLOAD" + with pytest.raises(ValueError): + global_catalog._validate_line_length(line, name) + +def test_global_catalog_validate_name_params(): + name = "CTS560.CICS730.SDFHLOAD" + result = global_catalog._validate_name_params(name) + assert result is True + +def test_global_catalog_validate_name_params_failed(): + name = "CTS560.CICS730.SDFHLOAD1" + with pytest.raises(ValueError): + global_catalog._validate_name_params(name) def test_global_catalog_run_rmutl_with_cmd(): executions = [