diff --git a/sos/report/plugins/__init__.py b/sos/report/plugins/__init__.py index 69b692bf24..925939d8f9 100644 --- a/sos/report/plugins/__init__.py +++ b/sos/report/plugins/__init__.py @@ -2032,6 +2032,7 @@ def _add_cmd_output(self, **kwargs): kwargs['changes'] = False if (not getattr(SoSCommand(**kwargs), "snap_cmd", False) and (self.get_option('all_logs') or kwargs['sizelimit'] == 0)): + kwargs['sizelimit'] = 0 kwargs['to_file'] = True if "snap_cmd" in kwargs: kwargs.pop("snap_cmd") @@ -2372,7 +2373,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None, binary=False, sizelimit=None, subdir=None, changes=False, foreground=False, tags=[], priority=10, cmd_as_tag=False, to_file=False, - container_cmd=False, runas=None): + tac=False, container_cmd=False, runas=None): """Execute a command and save the output to a file for inclusion in the report. @@ -2400,6 +2401,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None, :param cmd_as_tag: Format command string to tag :param to_file: Write output directly to file instead of saving in memory + :param tac: Reverse log lines order :param runas: Run the `cmd` as the `runas` user :returns: dict containing status, output, and filename in the @@ -2451,7 +2453,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None, cmd, timeout=timeout, stderr=stderr, chroot=root, chdir=runat, env=_env, binary=binary, sizelimit=sizelimit, poller=self.check_timeout, foreground=foreground, - to_file=out_file, runas=runas + to_file=out_file, tac=tac, runas=runas ) end = time() @@ -2489,7 +2491,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None, result = sos_get_command_output( cmd, timeout=timeout, chroot=False, chdir=runat, env=env, binary=binary, sizelimit=sizelimit, - poller=self.check_timeout, to_file=out_file + poller=self.check_timeout, to_file=out_file, tac=tac, ) run_time = time() - start self._log_debug(f"could not run '{cmd}': command not found") @@ -2508,6 +2510,9 @@ def _collect_cmd_output(self, cmd, suggest_filename=None, "truncated") linkfn = outfn outfn = outfn.replace('sos_commands', 'sos_strings') + '.tailed' + if out_file: + dest = self.archive.check_path(outfn, P_FILE, force=True) + os.rename(out_file, dest) if not to_file: if binary: @@ -3083,8 +3088,15 @@ def add_journal(self, units=None, boot=None, since=None, until=None, if output: journal_cmd += output_opt % output + fname = journal_cmd + tac = False + if log_size > 0: + journal_cmd = f"{journal_cmd} --reverse" + tac = True + self._log_debug(f"collecting journal: {journal_cmd}") self._add_cmd_output(cmd=journal_cmd, timeout=timeout, + tac=tac, to_file=True, suggest_filename=fname, sizelimit=log_size, pred=pred, tags=tags, priority=priority) diff --git a/sos/utilities.py b/sos/utilities.py index e666c64972..d27c24a91c 100644 --- a/sos/utilities.py +++ b/sos/utilities.py @@ -20,6 +20,7 @@ import threading import time import io +import mmap from contextlib import closing from collections import deque @@ -71,6 +72,7 @@ 'recursive_dict_values_by_key', 'shell_out', 'sos_get_command_output', + 'tac_logs', 'tail', ] @@ -213,7 +215,7 @@ def is_executable(command, sysroot=None): def sos_get_command_output(command, timeout=TIMEOUT_DEFAULT, stderr=False, chroot=None, chdir=None, env=None, foreground=False, binary=False, sizelimit=None, poller=None, - to_file=False, runas=None): + to_file=False, tac=False, runas=None): # pylint: disable=too-many-locals,too-many-branches """Execute a command and return a dictionary of status and output, optionally changing root or current working directory before @@ -275,8 +277,15 @@ def _check_poller(proc): else: expanded_args.append(arg) if to_file: - # pylint: disable=consider-using-with - _output = open(to_file, 'w', encoding='utf-8') + if sizelimit: + # going to use HeadReader + _output = PIPE + elif tac: + # no limit but we need an intermediate file + _output = tempfile.TemporaryFile(dir=os.path.dirname(to_file)) + else: + # pylint: disable=consider-using-with + _output = open(to_file, 'wb') else: _output = PIPE try: @@ -285,10 +294,20 @@ def _check_poller(proc): bufsize=-1, env=cmd_env, close_fds=True, preexec_fn=_child_prep_fn) as p: - if not to_file: - reader = AsyncReader(p.stdout, sizelimit, binary) + if to_file: + if sizelimit: + if tac: + _output = tempfile.TemporaryFile( + dir=os.path.dirname(to_file) + ) + else: + # pylint: disable=consider-using-with + _output = open(to_file, 'wb') + reader = HeadReader(p.stdout, _output, sizelimit, binary) + else: + reader = FakeReader(p, binary) else: - reader = FakeReader(p, binary) + reader = TailReader(p.stdout, sizelimit, binary) if poller: while reader.running: @@ -301,19 +320,25 @@ def _check_poller(proc): except Exception: p.terminate() if to_file: - _output.close() + if tac: + with open(to_file, 'wb') as f_dst: + tac_logs(_output, f_dst, True) # until we separate timeouts from the `timeout` command # handle per-cmd timeouts via Plugin status checks reader.running = False return {'status': 124, 'output': reader.get_contents(), 'truncated': reader.is_full} - if to_file: - _output.close() # wait for Popen to set the returncode while p.poll() is None: pass + if to_file: + if tac: + with open(to_file, 'wb') as f_dst: + tac_logs(_output, f_dst, + reader.is_full or p.returncode != 0) + if p.returncode in (126, 127): stdout = b"" else: @@ -325,11 +350,52 @@ def _check_poller(proc): 'truncated': reader.is_full } except OSError as e: - if to_file: - _output.close() if e.errno == errno.ENOENT: return {'status': 127, 'output': "", 'truncated': ''} raise e + finally: + if hasattr(_output, 'close'): + _output.close() + + +def tac_logs(f_src, f_dst, drop_last_log=False): + """Python implementation of the tac utility with support + for multiline logs (starting with space). It is intended + to reverse the output of 'journalctl --reverse'. + """ + NEWLINE_B = b'\n' + NEWLINE_I = 10 + SPACE_I = 32 + f_src.flush() + if os.fstat(f_src.fileno()).st_size == 0: + return + with mmap.mmap(f_src.fileno(), 0, access=mmap.ACCESS_READ) as mm: + sep1 = sep2 = mm.size()-1 + if mm[sep2] != NEWLINE_I: + drop_last_log = True + while sep2 >= 0: + sep1 = mm.rfind(NEWLINE_B, 0, sep1) + # multiline logs have a first line not starting with space + # followed by lines starting with spaces + # line 5 + # line 4 + # multiline 4 + # line 3 + if mm[sep1+1] == SPACE_I: + # first line starts with a space + # (this should not happen) + if sep1 == -1: + break + # go find the previous NEWLINE + continue + # When we truncate or timeout, the last log + # might be a partial multiline log + if drop_last_log: + drop_last_log = False + else: + # write the (multi)line log ending with the NEWLINE + f_dst.write(mm[sep1+1:sep2+1]) + sep2 = sep1 def import_module(module_fqname, superclasses=None): @@ -475,8 +541,8 @@ def recursive_dict_values_by_key(dobj, keys=[]): class FakeReader(): - """Used as a replacement AsyncReader for when we are writing directly to - disk, and allows us to keep more simplified flows for executing, + """Used when we are writing directly to disk without sizelimits, + this allows us to keep more simplified flows for executing, monitoring, and collecting command output. """ @@ -496,8 +562,45 @@ def running(self): return self.process.poll() is None -class AsyncReader(threading.Thread): - """Used to limit command output to a given size without deadlocking +class HeadReader(threading.Thread): + """Used to 'head' the command output (f_src) to a given size + without deadlocking sos. Takes a sizelimit value in MB. + """ + + COPY_BUFSIZE = 1024*1024 + + def __init__(self, f_src, f_dst, sizelimit, binary): + super().__init__() + self.f_src = f_src + self.f_dst = f_dst + self.remaining = sizelimit * 1048576 # convert to bytes + self.binary = binary + self.running = True + self.start() + + def run(self): + """Reads from the f_src (Popen stdout pipe) until we reach sizelimit. + once done, close f_src to signal the program that we are done. + """ + while self.remaining > 0: + buf = self.f_src.read(min(self.remaining, self.COPY_BUFSIZE)) + if not buf: + break + self.f_dst.write(buf) + self.remaining -= len(buf) + self.f_src.close() + self.running = False + + def get_contents(self): + return '' if not self.binary else b'' + + @property + def is_full(self): + return self.remaining <= 0 + + +class TailReader(threading.Thread): + """Used to tail the command output to a given size without deadlocking sos. Takes a sizelimit value in MB, and will compile stdout from Popen into a diff --git a/tests/report_tests/plugin_tests/defaults.py b/tests/report_tests/plugin_tests/defaults.py index 4d790114d1..26efdf7f3b 100644 --- a/tests/report_tests/plugin_tests/defaults.py +++ b/tests/report_tests/plugin_tests/defaults.py @@ -36,7 +36,7 @@ def test_journal_collected(self): _m = self.get_plugin_manifest('cups') ent = None for cmd in _m['commands']: - if cmd['exec'] == 'journalctl --no-pager --unit cups': + if cmd['exec'] == 'journalctl --no-pager --unit cups --reverse': ent = cmd assert ent, "No manifest entry for journalctl cups" diff --git a/tests/unittests/utilities_tests.py b/tests/unittests/utilities_tests.py index d8d5bed5f4..6e48dfb581 100644 --- a/tests/unittests/utilities_tests.py +++ b/tests/unittests/utilities_tests.py @@ -6,13 +6,14 @@ # # See the LICENSE file in the source distribution for further information. import os.path +import tempfile import unittest # PYCOMPAT from io import StringIO from sos.utilities import (grep, is_executable, sos_get_command_output, - find, tail, shell_out) + find, tail, shell_out, tac_logs) TEST_DIR = os.path.dirname(__file__) @@ -101,4 +102,101 @@ def test_not_in_pattern(self): leaves = find("leaf", TEST_DIR, path_pattern="tests/path") self.assertFalse(any(name.endswith("leaf") for name in leaves)) + +class TacTest(unittest.TestCase): + + @staticmethod + def tac_logs_str(str_src, drop_last_log): + """Helper to use tac_logs with strings instead of file descriptors""" + with tempfile.TemporaryFile() as f_src, \ + tempfile.TemporaryFile() as f_dst: + f_src.write(str_src) + tac_logs(f_src, f_dst, drop_last_log) + f_dst.seek(0) + return f_dst.read() + + def test_tac_limits(self): + self.assertEqual(self.tac_logs_str(b"", True), b"") + self.assertEqual(self.tac_logs_str(b"", False), b"") + + self.assertEqual(self.tac_logs_str(b"\n", True), b"") + self.assertEqual(self.tac_logs_str(b"\n", False), b"\n") + + self.assertEqual(self.tac_logs_str(b"\n\n\n", True), b"\n\n") + self.assertEqual(self.tac_logs_str(b"\n\n\n", False), b"\n\n\n") + + self.assertEqual(self.tac_logs_str(b" ", True), b"") + self.assertEqual(self.tac_logs_str(b" ", False), b"") + + def test_tac_partialline(self): + tac = (b"line 3\n" + b"line 2\n" + b"line 1 no new line") + + cat = (b"line 2\n" + b"line 3\n") + + # partial log line are always dropped + self.assertEqual(self.tac_logs_str(tac, True), cat) + self.assertEqual(self.tac_logs_str(tac, False), cat) + + def test_tac_multiline1(self): + tac = (b"line 5\n" + b"line 4\n" + b"multiline 3.0\n" + b" multiline 3.1\n" + b" multiline 3.2\n" + b"line 2\n" + b"maybe multiline 1.0\n") + + cat1 = (b"line 2\n" + b"multiline 3.0\n" + b" multiline 3.1\n" + b" multiline 3.2\n" + b"line 4\n" + b"line 5\n") + + cat2 = (b"maybe multiline 1.0\n" + b"line 2\n" + b"multiline 3.0\n" + b" multiline 3.1\n" + b" multiline 3.2\n" + b"line 4\n" + b"line 5\n") + + self.assertEqual(self.tac_logs_str(tac, True), cat1) + self.assertEqual(self.tac_logs_str(tac, False), cat2) + + def test_tac_multiline2(self): + tac = (b"line 3\n" + b"line 2\n" + b"multiline 1.0\n" + b" multiline 1.1\n" + b" multiline 1.2\n") + + cat1 = (b"line 2\n" + b"line 3\n") + + cat2 = (b"multiline 1.0\n" + b" multiline 1.1\n" + b" multiline 1.2\n" + b"line 2\n" + b"line 3\n") + + self.assertEqual(self.tac_logs_str(tac, True), cat1) + self.assertEqual(self.tac_logs_str(tac, False), cat2) + + def test_tac_multiline_partial(self): + tac = (b"line 3\n" + b"line 2\n" + b"multiline 1.0\n" + b" multiline 1.1\n" + b" multiline 1.2") + + cat = (b"line 2\n" + b"line 3\n") + + self.assertEqual(self.tac_logs_str(tac, True), cat) + self.assertEqual(self.tac_logs_str(tac, False), cat) + # vim: set et ts=4 sw=4 :