Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report success and failures better in seqerakit #157

Merged
merged 5 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 12 additions & 17 deletions seqerakit/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,17 @@ def handle_block(self, block, args, destroy=False, dryrun=False):

def main(args=None):
options = parse_args(args if args is not None else sys.argv[1:])
logging.basicConfig(level=options.log_level)
logging.basicConfig(level=getattr(logging, options.log_level.upper()))

# Parse CLI arguments into a list and create a Seqera Platform instance
cli_args_list = options.cli_args.split() if options.cli_args else []
sp = seqeraplatform.SeqeraPlatform(cli_args=cli_args_list, dryrun=options.dryrun)

# If the info flag is set, run 'tw info'
if options.info:
sp = seqeraplatform.SeqeraPlatform()
print(sp.info())
result = sp.info()
if not options.dryrun:
print(result)
return

if not options.yaml:
Expand All @@ -161,11 +166,6 @@ def main(args=None):
else:
options.yaml = [sys.stdin]

# Parse CLI arguments into a list
cli_args_list = options.cli_args.split() if options.cli_args else []

sp = seqeraplatform.SeqeraPlatform(cli_args=cli_args_list, dryrun=options.dryrun)

block_manager = BlockParser(
sp,
[
Expand All @@ -186,15 +186,10 @@ def main(args=None):
cmd_args_dict = helper.parse_all_yaml(options.yaml, destroy=options.delete)
for block, args_list in cmd_args_dict.items():
for args in args_list:
try:
# Run the 'tw' methods for each block
block_manager.handle_block(
block, args, destroy=options.delete, dryrun=options.dryrun
)
except (ResourceExistsError, ResourceCreationError) as e:
logging.error(e)
sys.exit(1)
except ValueError as e:
block_manager.handle_block(
block, args, destroy=options.delete, dryrun=options.dryrun
)
except (ResourceExistsError, ResourceCreationError, ValueError) as e:
logging.error(e)
sys.exit(1)

Expand Down
34 changes: 19 additions & 15 deletions seqerakit/overwrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,13 @@ def handle_overwrite(self, block, args, overwrite=False, destroy=False):
if self.check_resource_exists(operation["name_key"], sp_args):
# if resource exists and overwrite is true, delete
if overwrite:
logging.debug(
logging.info(
f" The attempted {block} resource already exists."
" Overwriting.\n"
)
self.delete_resource(block, operation, sp_args)
elif destroy:
logging.debug(f" Deleting the {block} resource.")
logging.info(f" Deleting the {block} resource.")
self.delete_resource(block, operation, sp_args)
else: # return an error if resource exists, overwrite=False
raise ResourceExistsError(
Expand All @@ -147,7 +147,8 @@ def _get_team_args(self, args):

if not jsondata:
json_method = getattr(self.sp, "-o json")
json_out = json_method("teams", "list", "-o", args["organization"])
with self.sp.suppress_output():
json_out = json_method("teams", "list", "-o", args["organization"])
self.block_jsondata["teams"] = json_out
else:
json_out = jsondata
Expand Down Expand Up @@ -244,27 +245,30 @@ def _get_json_data(self, block, args, keys_to_get):
# Fetch the data if it does not exist
if block == "teams":
sp_args = self._get_values_from_cmd_args(args[0], keys_to_get)
self.cached_jsondata = json_method(
block, "list", "-o", sp_args["organization"]
)
with self.sp.suppress_output():
self.cached_jsondata = json_method(
block, "list", "-o", sp_args["organization"]
)
elif block in Overwrite.generic_deletion or block in {
"participants",
"labels",
}:
sp_args = self._get_values_from_cmd_args(args, keys_to_get)
self.cached_jsondata = json_method(
block, "list", "-w", sp_args["workspace"]
)
elif block == "members" or block == "workspaces": # TODO
with self.sp.suppress_output():
self.cached_jsondata = json_method(
block, "list", "-w", sp_args["workspace"]
)
elif block == "members" or block == "workspaces":
sp_args = self._get_values_from_cmd_args(args, keys_to_get)
self.cached_jsondata = json_method(
block, "list", "-o", sp_args["organization"]
)
with self.sp.suppress_output():
self.cached_jsondata = json_method(
block, "list", "-o", sp_args["organization"]
)
else:
sp_args = self._get_values_from_cmd_args(args, keys_to_get)
self.cached_jsondata = json_method(block, "list")
with self.sp.suppress_output():
self.cached_jsondata = json_method(block, "list")

# Store this data in the block_jsondata dict for later use
self.block_jsondata[block] = self.cached_jsondata
return self.cached_jsondata, sp_args

Expand Down
52 changes: 34 additions & 18 deletions seqerakit/seqeraplatform.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from contextlib import contextmanager
import os
import shlex
import logging
import subprocess
import re
import json

logging.basicConfig(level=logging.DEBUG)


class SeqeraPlatform:
"""
Expand All @@ -44,13 +43,15 @@ def __call__(self, *args, **kwargs):
return self.tw_instance._tw_run(command, **kwargs)

# Constructs a new SeqeraPlatform instance
def __init__(self, cli_args=None, dryrun=False):
def __init__(self, cli_args=None, dryrun=False, print_stdout=True):
if cli_args and "--verbose" in cli_args:
raise ValueError(
"--verbose is not supported as a CLI argument to seqerakit."
)
self.cli_args = cli_args or []
self.dryrun = dryrun
self.print_stdout = print_stdout
self._suppress_output = False

def _construct_command(self, cmd, *args, **kwargs):
command = ["tw"] + self.cli_args
Expand Down Expand Up @@ -99,27 +100,27 @@ def _check_env_vars(self, command):
return " ".join(full_cmd_parts)

# Executes a 'tw' command in a subprocess and returns the output.
def _execute_command(self, full_cmd, to_json=False):
logging.debug(f" Running command: {full_cmd}")
def _execute_command(self, full_cmd, to_json=False, print_stdout=True):
logging.info(f" Running command: {full_cmd}")
process = subprocess.Popen(
full_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True
)
stdout, _ = process.communicate()
stdout = stdout.decode("utf-8").strip()

should_print = (
print_stdout if print_stdout is not None else self.print_stdout
) and not self._suppress_output

if should_print:
logging.info(f" Command output: {stdout}")

if "ERROR: " in stdout or process.returncode != 0:
self._handle_command_errors(str(stdout))
self._handle_command_errors(stdout)

return json.loads(stdout) if to_json else stdout

def _execute_info_command(self):
# Directly execute 'tw info' command
command = "tw info"
return self._execute_command(command)

def _handle_command_errors(self, stdout):
logging.error(stdout)

# Check for specific tw cli error patterns and raise custom exceptions
if re.search(
r"ERROR: .*already (exists|a participant)", stdout, flags=re.IGNORECASE
Expand All @@ -134,18 +135,33 @@ def _handle_command_errors(self, stdout):
)

def _tw_run(self, cmd, *args, **kwargs):
print_stdout = kwargs.pop("print_stdout", None)
full_cmd = self._construct_command(cmd, *args, **kwargs)
if not full_cmd or self.dryrun:
logging.debug(f"DRYRUN: Running command {full_cmd}")
logging.info(f"DRYRUN: Running command {full_cmd}")
return
return self._execute_command(full_cmd, kwargs.get("to_json"))
return self._execute_command(full_cmd, kwargs.get("to_json"), print_stdout)

@contextmanager
def suppress_output(self):
original_suppress = self._suppress_output
self._suppress_output = True
try:
yield
finally:
self._suppress_output = original_suppress

# Allow any 'tw' subcommand to be called as a method.
def __getattr__(self, cmd):
if cmd == "info":
return self._execute_info_command
else:
return self.TwCommand(self, cmd.replace("_", "-"))
return lambda *args, **kwargs: self._tw_run(
["info"], *args, **kwargs, print_stdout=False
)
if cmd == "-o json":
return lambda *args, **kwargs: self._tw_run(
["-o", "json"] + list(args), **kwargs
)
return self.TwCommand(self, cmd.replace("_", "-"))


class ResourceExistsError(Exception):
Expand Down
130 changes: 130 additions & 0 deletions tests/unit/test_seqeraplatform.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import json
import subprocess
import os
import logging
from io import StringIO


class TestSeqeraPlatform(unittest.TestCase):
Expand Down Expand Up @@ -186,6 +188,32 @@ def test_cli_args_exclusion_of_verbose(self): # TODO: remove this test once fix
"--verbose is not supported as a CLI argument to seqerakit.",
)

@patch("subprocess.Popen")
def test_info_command_construction(self, mock_subprocess):
# Mock the subprocess call to prevent actual execution
mock_subprocess.return_value = MagicMock(returncode=0)
mock_subprocess.return_value.communicate.return_value = (b"", b"")

self.sp.info()
called_command = mock_subprocess.call_args[0][0]

# Check if the constructed command is correct
expected_command_part = "tw --url http://tower-api.com --insecure info"
self.assertIn(expected_command_part, called_command)

# Check if the cli_args are included in the called command
for arg in self.cli_args:
self.assertIn(arg, called_command)

@patch("subprocess.Popen")
def test_info_command_dryrun(self, mock_subprocess):
# Initialize SeqeraPlatform with dryrun enabled
self.sp.dryrun = True
self.sp.info()

# Check that subprocess.Popen is not called
mock_subprocess.assert_not_called()


class TestKitOptions(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -242,5 +270,107 @@ def test_error_raised_for_unset_env_vars(self):
)


class TestSeqeraPlatformOutputHandling(unittest.TestCase):
def setUp(self):
self.sp = seqeraplatform.SeqeraPlatform()
# Set up logging to capture output
self.log_capture = StringIO()
self.log_handler = logging.StreamHandler(self.log_capture)
logging.getLogger().addHandler(self.log_handler)
logging.getLogger().setLevel(logging.INFO)

def tearDown(self):
logging.getLogger().removeHandler(self.log_handler)
logging.getLogger().setLevel(logging.NOTSET)

@patch("subprocess.Popen")
def test_suppress_output(self, mock_subprocess):
mock_subprocess.return_value = MagicMock(returncode=0)
mock_subprocess.return_value.communicate.return_value = (
b'{"key": "value"}',
b"",
)

log_capture = StringIO()
logging.getLogger().addHandler(logging.StreamHandler(log_capture))

with self.sp.suppress_output():
self.sp.pipelines("list")

log_contents = log_capture.getvalue()
self.assertIn("Running command:", log_contents)
self.assertNotIn("Command output:", log_contents)

@patch("subprocess.Popen")
def test_suppress_output_context(self, mock_subprocess):
mock_subprocess.return_value = MagicMock(returncode=0)
mock_subprocess.return_value.communicate.return_value = (
b'{"key": "value"}',
b"",
)

# Test that stdout is suppressed within the context manager
with self.sp.suppress_output():
result = self.sp._execute_command("tw pipelines list", to_json=True)
self.assertEqual(result, {"key": "value"})

# Test that stdout is not suppressed outside the context manager
result = self.sp._execute_command("tw pipelines list", to_json=True)
self.assertEqual(result, {"key": "value"})

@patch("subprocess.Popen")
def test_json_output_handling(self, mock_subprocess):
mock_subprocess.return_value = MagicMock(returncode=0)
mock_subprocess.return_value.communicate.return_value = (
b'{"key": "value"}',
b"",
)

result = self.sp._execute_command("tw pipelines list", to_json=True)
self.assertEqual(result, {"key": "value"})

result = self.sp._execute_command("tw pipelines list", to_json=False)
self.assertEqual(result, '{"key": "value"}')

@patch("subprocess.Popen")
def test_print_stdout_override(self, mock_subprocess):
mock_subprocess.return_value = MagicMock(returncode=0)
mock_subprocess.return_value.communicate.return_value = (b"output", b"")

# Test with print_stdout=True
self.sp._execute_command("tw pipelines list", print_stdout=True)
log_output = self.log_capture.getvalue()
self.assertIn("Command output: output", log_output)

# Clear the log capture
self.log_capture.truncate(0)
self.log_capture.seek(0)

# Test with print_stdout=False
self.sp._execute_command("tw pipelines list", print_stdout=False)
log_output = self.log_capture.getvalue()
self.assertNotIn("Command output: output", log_output)

@patch("subprocess.Popen")
def test_error_handling_with_suppressed_output(self, mock_subprocess):
mock_subprocess.return_value = MagicMock(returncode=1)
mock_subprocess.return_value.communicate.return_value = (
b"ERROR: Something went wrong",
b"",
)

with self.assertRaises(seqeraplatform.ResourceCreationError):
with self.sp.suppress_output():
self.sp._execute_command("tw pipelines list")

@patch("subprocess.Popen")
def test_json_parsing_error(self, mock_subprocess):
mock_subprocess.return_value = MagicMock(returncode=0)
mock_subprocess.return_value.communicate.return_value = (b"Invalid JSON", b"")

with self.assertRaises(json.JSONDecodeError):
self.sp._execute_command("tw pipelines list", to_json=True)


if __name__ == "__main__":
unittest.main()
Loading