Skip to content

Commit

Permalink
Added unit tests for RMUTL new methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Samarinnayak committed Oct 28, 2024
1 parent 693b4a1 commit a64ca70
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 2 deletions.
1 change: 1 addition & 0 deletions plugins/module_utils/_global_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def _validate_line_length(line, name):
"""
if len(line) > MAX_LINE_LENGTH:
raise ValueError(f"{name} line exceeds {MAX_LINE_LENGTH} characters: {len(line)}")
return True

def _validate_name_params(param):
if any(len(part) > MAX_NAME_LENGTH for part in param.split('.')):
Expand Down
33 changes: 33 additions & 0 deletions tests/unit/helpers/data_set_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from textwrap import dedent
from ansible.module_utils.common.text.converters import to_bytes
from ansible.module_utils import basic
from ansible_collections.ibm.ibm_zos_cics.plugins.module_utils._response import _execution

PYTHON_LANGUAGE_FEATURES_MESSAGE = "Requires python 3 language features"

Expand Down Expand Up @@ -418,6 +419,32 @@ def CSDUP_add_group_stdout(data_set_name):
DFH5109 I END OF DFHCSDUP UTILITY JOB. HIGHEST RETURN CODE WAS: 0
""".format(data_set_name)

def get_job_output_execution(rc=0):
job = get_sample_job_output()
return _execution(
name="Get job output for {0}".format(job["job_id"]),
rc=rc,
stdout=job.get("ret_code").get("msg"),
stderr=job.get("ret_code").get("msg_txt")
)

def _get_job_dd_output(ddname, job_id):
return (
_get_job_dd_output_execution(rc=0, ddname=ddname, job_id=job_id),
RMUTL_stdout("AUTOINIT", job_id)
)

def _get_job_dd_output_execution(rc = 0, ddname="", job_id=""):
return _execution(
name=JOB_DD_return_name(ddname, job_id),
rc=rc,
stdout=RMUTL_stdout("AUTOINIT", job_id),
stderr="CC"
)

def JOB_DD_return_name(ddname, job_id):
return "Get job dd {0} output for {1}".format(ddname, job_id)

def get_sample_job_output(content="", rc=0, err="CC"):
return {
"class": "",
Expand Down Expand Up @@ -483,6 +510,12 @@ def get_sample_job_output(content="", rc=0, err="CC"):
"system": ""
}

def _get_sample_job_output_with_content():
job = get_sample_job_output()
for dd in job.get("ddnames"):
dd["content"] = RMUTL_stdout("AUTOINIT", job.get("job_id"))
return job


def read_data_set_content_run_name(data_set_name):
return "Read data set {0}".format(data_set_name)
Expand Down
154 changes: 153 additions & 1 deletion tests/unit/module_utils/test_dataset_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@
LISTDS_data_set,
LISTDS_data_set_doesnt_exist,
LISTDS_member_doesnt_exist,
LISTDS_run_name
LISTDS_run_name,
get_sample_job_output as JOB_OUTPUT,
get_job_output_execution as JOB_EXECUTION,
JOB_DD_return_name,
RMUTL_stdout,
_get_job_dd_output,
_get_job_dd_output_execution,
_get_sample_job_output_with_content

)
from ansible_collections.ibm.ibm_zos_core.plugins.module_utils.dd_statement import DatasetDefinition
from ansible_collections.ibm.ibm_zos_core.plugins.module_utils.zos_mvs_raw import MVSCmdResponse
Expand Down Expand Up @@ -700,3 +708,147 @@ def test__write_jcl_to_data_set_fail():

assert e.value.message == "Failed to copy JCL content to data set"
assert e.value.executions == expected_executions

def test_data_set_utils_submit_jcl():
jcl_uss_path = "jcl/uss/path"
job_name = "Job_Name"
rc = 0
stdout = ""
stderr = ""
data_set_utils._execute_command = MagicMock(return_value=(rc, stdout, stderr))

expected_executions = [{
"name": "Submit jcl job {0} for {1}".format(jcl_uss_path, job_name),
"rc": 0,
"stdout": "",
"stderr": ""
}]

executions= data_set_utils._submit_jcl(jcl_uss_path, job_name)
assert expected_executions == executions

def test_data_set_utils_submit_jcl_failed():
jcl_uss_path = "jcl/uss/path"
job_name = "Job_Name"
rc = 16
stdout = "CC"
stderr = "CC"
data_set_utils._execute_command = MagicMock(return_value=(rc, stdout, stderr))

expected_executions = [{
"name": "Submit jcl job {0} for {1}".format(jcl_uss_path, job_name),
"rc": 16,
"stdout": "CC",
"stderr": "CC"
}]
with pytest.raises(MVSExecutionException) as e:
data_set_utils._submit_jcl(jcl_uss_path, job_name)
assert e.value.message == "RC {0} when submitting jcl from {1}".format(rc, jcl_uss_path)
assert e.value.executions == expected_executions



def test_data_set_utils_get_job_output():
job_id = "JOB12345"
expected_execution=[
JOB_EXECUTION(),
_execution(
name=JOB_DD_return_name("JESMSGLG", job_id),
rc=0,
stdout=RMUTL_stdout("AUTOINIT", job_id),
stderr="CC",
),
_execution(
name=JOB_DD_return_name("JESJCL", job_id),
rc=0,
stdout=RMUTL_stdout("AUTOINIT", job_id),
stderr="CC",
),
_execution(
name=JOB_DD_return_name("JESSYSMSG", job_id),
rc=0,
stdout=RMUTL_stdout("AUTOINIT", job_id),
stderr="CC",
),
_execution(
name=JOB_DD_return_name("SYSPRINT", job_id),
rc=0,
stdout=RMUTL_stdout("AUTOINIT", job_id),
stderr="CC",
)
]
expected_result = _get_sample_job_output_with_content()
data_set_utils.job_output = MagicMock(return_value=[JOB_OUTPUT()])
job_id= JOB_OUTPUT().get("job_id")
data_set_utils._get_job_dd = MagicMock(side_effect=[
_get_job_dd_output("JESMSGLG", job_id),
_get_job_dd_output("JESJCL", job_id),
_get_job_dd_output("JESSYSMSG", job_id),
_get_job_dd_output("SYSPRINT", job_id)
])
result, executions = data_set_utils._get_job_output(job_id=job_id, job_name="DFHRMUTL")

assert result == expected_result
assert executions == expected_execution

def test_data_set_utils_get_job_output_multiple_jobs_failure():
job_id = JOB_OUTPUT()["job_id"]
job_name = JOB_OUTPUT()["job_name"]

mock_jobs = [
JOB_OUTPUT(),
{"ret_code": {"code": 1, "msg": "Error", "msg_txt": "Job failed"}}
]
expected_executions = []
data_set_utils.job_output = MagicMock(return_value = mock_jobs)

with pytest.raises(MVSExecutionException) as e:
data_set_utils._get_job_output(job_id, job_name)

assert e.value.executions == expected_executions
assert f"Query for {job_name} job submitted under Job ID {job_id} failed" in e.value.message

def test_data_set_utils_get_job_output_get_dd_failure():
job_id = JOB_OUTPUT()["job_id"]
job_name = JOB_OUTPUT()["job_name"]

expected_executions = [JOB_EXECUTION()]
expected_exception = TypeError("Failed to get DD content")
data_set_utils.job_output = MagicMock(return_value=[JOB_OUTPUT()])
data_set_utils._get_job_dd = MagicMock(return_value=expected_exception)

with pytest.raises(MVSExecutionException) as e:
data_set_utils._get_job_output(job_id, job_name)

assert e.value.executions == expected_executions
assert f"Could not get all job DDs for {job_name}. An exception occured: " in str(e.value.message)

def test_data_set_utils_get_job_dd():
job_id = "JOB12345"
dd_name = "DD Name"
rc = 0
stdout = RMUTL_stdout("AUTOINIT", job_id)
stderr = "CC"
mock_dd_execution = [_get_job_dd_output_execution(rc, dd_name, job_id)]
data_set_utils._execute_command = MagicMock(return_value=(rc, stdout, stderr))
result_execution, result = data_set_utils._get_job_dd(job_id, dd_name)

assert result == stdout
assert result_execution == mock_dd_execution

def test_data_set_utils_get_job_dd_failure():
job_id = "JOB12345"
dd_name = "DD Name"
rc = 8
stdout = RMUTL_stdout("AUTOINIT", job_id)
stderr = "CC"

mock_dd_execution = [_get_job_dd_output_execution(rc, dd_name, job_id)]
data_set_utils._execute_command = MagicMock(return_value=(rc, stdout, stderr))

with pytest.raises(Exception) as e:
data_set_utils._get_job_dd(job_id, dd_name)

assert e.type == MVSExecutionException
assert f"RC 8 when getting job output for DD Name from {job_id}" in str(e.value)
assert e.value.executions == mock_dd_execution
38 changes: 37 additions & 1 deletion tests/unit/module_utils/test_global_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
)
import pytest
import sys
import tempfile

try:
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch
except ImportError:
from mock import MagicMock

Expand Down Expand Up @@ -202,6 +203,41 @@ def test_global_catalog_get_records_autocold_emergency():
resp = global_catalog._get_catalog_records(stdout=stdout)
assert resp == ("AUTOCOLD", "EMERGENCY")

def test_global_catalog_create_dfhrmutl_jcl():
location = "DATA.SET"
sdfhload = "SDFH.LOAD"
cmd = "HI"

mock_file = MagicMock()
mock_file.name = '/mocked/path/to/tempfile'
mock_file.__enter__.return_value = mock_file
with patch('tempfile.NamedTemporaryFile', return_value=mock_file) as mock_tempfile:
result = global_catalog._create_dfhrmutl_jcl(location, sdfhload, cmd)

mock_tempfile.assert_called_once_with(mode='w+', delete=False)
assert result == '/mocked/path/to/tempfile'

def test_global_catalog_validate_line_length():
line = "//STEPLIB DD DSNAME=CTS560.CICS730.SDFHLOAD,DISP=SHR"
name = "SDFHLOAD"
result = global_catalog._validate_line_length(line, name)
assert result is True

def test_global_catalog_validate_line_length_failed():
line = "//STEPLIB DD DSNAME=CTS560.CICS730.SDFHLOAD,DISP=SHR this is inavlid line with more the 72 chars"
name = "SDFHLOAD"
with pytest.raises(ValueError):
global_catalog._validate_line_length(line, name)

def test_global_catalog_validate_name_params():
name = "CTS560.CICS730.SDFHLOAD"
result = global_catalog._validate_name_params(name)
assert result is True

def test_global_catalog_validate_name_params_failed():
name = "CTS560.CICS730.SDFHLOAD1"
with pytest.raises(ValueError):
global_catalog._validate_name_params(name)

def test_global_catalog_run_rmutl_with_cmd():
executions = [
Expand Down

0 comments on commit a64ca70

Please sign in to comment.