Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[utilities,plugins] speedup journal collection (v2) #3879

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions sos/report/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2032,6 +2032,7 @@ def _add_cmd_output(self, **kwargs):
kwargs['changes'] = False
if (not getattr(SoSCommand(**kwargs), "snap_cmd", False) and
(self.get_option('all_logs') or kwargs['sizelimit'] == 0)):
kwargs['sizelimit'] = 0
kwargs['to_file'] = True
if "snap_cmd" in kwargs:
kwargs.pop("snap_cmd")
Expand Down Expand Up @@ -2372,7 +2373,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None,
binary=False, sizelimit=None, subdir=None,
changes=False, foreground=False, tags=[],
priority=10, cmd_as_tag=False, to_file=False,
container_cmd=False, runas=None):
tac=False, container_cmd=False, runas=None):
"""Execute a command and save the output to a file for inclusion in the
report.

Expand Down Expand Up @@ -2400,6 +2401,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None,
:param cmd_as_tag: Format command string to tag
:param to_file: Write output directly to file instead
of saving in memory
:param tac: Reverse log lines order
:param runas: Run the `cmd` as the `runas` user

:returns: dict containing status, output, and filename in the
Expand Down Expand Up @@ -2451,7 +2453,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None,
cmd, timeout=timeout, stderr=stderr, chroot=root,
chdir=runat, env=_env, binary=binary, sizelimit=sizelimit,
poller=self.check_timeout, foreground=foreground,
to_file=out_file, runas=runas
to_file=out_file, tac=tac, runas=runas
)

end = time()
Expand Down Expand Up @@ -2489,7 +2491,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None,
result = sos_get_command_output(
cmd, timeout=timeout, chroot=False, chdir=runat,
env=env, binary=binary, sizelimit=sizelimit,
poller=self.check_timeout, to_file=out_file
poller=self.check_timeout, to_file=out_file, tac=tac,
)
run_time = time() - start
self._log_debug(f"could not run '{cmd}': command not found")
Expand All @@ -2508,6 +2510,9 @@ def _collect_cmd_output(self, cmd, suggest_filename=None,
"truncated")
linkfn = outfn
outfn = outfn.replace('sos_commands', 'sos_strings') + '.tailed'
if out_file:
dest = self.archive.check_path(outfn, P_FILE, force=True)
os.rename(out_file, dest)

if not to_file:
if binary:
Expand Down Expand Up @@ -3083,8 +3088,15 @@ def add_journal(self, units=None, boot=None, since=None, until=None,
if output:
journal_cmd += output_opt % output

fname = journal_cmd
tac = False
if log_size > 0:
journal_cmd = f"{journal_cmd} --reverse"
tac = True

self._log_debug(f"collecting journal: {journal_cmd}")
self._add_cmd_output(cmd=journal_cmd, timeout=timeout,
tac=tac, to_file=True, suggest_filename=fname,
sizelimit=log_size, pred=pred, tags=tags,
priority=priority)

Expand Down
133 changes: 118 additions & 15 deletions sos/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import threading
import time
import io
import mmap
from contextlib import closing
from collections import deque

Expand Down Expand Up @@ -71,6 +72,7 @@
'recursive_dict_values_by_key',
'shell_out',
'sos_get_command_output',
'tac_logs',
'tail',
]

Expand Down Expand Up @@ -213,7 +215,7 @@ def is_executable(command, sysroot=None):
def sos_get_command_output(command, timeout=TIMEOUT_DEFAULT, stderr=False,
chroot=None, chdir=None, env=None, foreground=False,
binary=False, sizelimit=None, poller=None,
to_file=False, runas=None):
to_file=False, tac=False, runas=None):
# pylint: disable=too-many-locals,too-many-branches
"""Execute a command and return a dictionary of status and output,
optionally changing root or current working directory before
Expand Down Expand Up @@ -275,8 +277,15 @@ def _check_poller(proc):
else:
expanded_args.append(arg)
if to_file:
# pylint: disable=consider-using-with
_output = open(to_file, 'w', encoding='utf-8')
if sizelimit:
# going to use HeadReader
_output = PIPE
elif tac:
# no limit but we need an intermediate file
_output = tempfile.TemporaryFile(dir=os.path.dirname(to_file))
else:
# pylint: disable=consider-using-with
_output = open(to_file, 'wb')
Fixed Show fixed Hide fixed
else:
_output = PIPE
try:
Expand All @@ -285,10 +294,20 @@ def _check_poller(proc):
bufsize=-1, env=cmd_env, close_fds=True,
preexec_fn=_child_prep_fn) as p:

if not to_file:
reader = AsyncReader(p.stdout, sizelimit, binary)
if to_file:
if sizelimit:
if tac:
_output = tempfile.TemporaryFile(
dir=os.path.dirname(to_file)
)
else:
# pylint: disable=consider-using-with
_output = open(to_file, 'wb')
Fixed Show fixed Hide fixed
reader = HeadReader(p.stdout, _output, sizelimit, binary)
else:
reader = FakeReader(p, binary)
else:
reader = FakeReader(p, binary)
reader = TailReader(p.stdout, sizelimit, binary)

if poller:
while reader.running:
Expand All @@ -301,19 +320,25 @@ def _check_poller(proc):
except Exception:
p.terminate()
if to_file:
_output.close()
if tac:
with open(to_file, 'wb') as f_dst:
tac_logs(_output, f_dst, True)
# until we separate timeouts from the `timeout` command
# handle per-cmd timeouts via Plugin status checks
reader.running = False
return {'status': 124, 'output': reader.get_contents(),
'truncated': reader.is_full}
if to_file:
_output.close()

# wait for Popen to set the returncode
while p.poll() is None:
pass

if to_file:
if tac:
with open(to_file, 'wb') as f_dst:
tac_logs(_output, f_dst,
reader.is_full or p.returncode != 0)

if p.returncode in (126, 127):
stdout = b""
else:
Expand All @@ -325,11 +350,52 @@ def _check_poller(proc):
'truncated': reader.is_full
}
except OSError as e:
if to_file:
_output.close()
if e.errno == errno.ENOENT:
return {'status': 127, 'output': "", 'truncated': ''}
raise e
finally:
if hasattr(_output, 'close'):
_output.close()


def tac_logs(f_src, f_dst, drop_last_log=False):
    """Write the log entries of f_src to f_dst in reverse order.

    This is a `tac`-like helper meant to undo 'journalctl --reverse'.
    An entry is one line that does not start with a space, optionally
    followed by continuation lines that do start with a space; the
    lines of a single entry keep their relative order.

    :param f_src: Source file object; must expose fileno() and flush()
    :param f_dst: Binary destination file object
    :param drop_last_log: Discard the final entry of f_src, which may be
                          incomplete after a truncation or a timeout.
                          Forced on when f_src does not end with a newline.
    """
    newline = b'\n'
    newline_ord = 10
    space_ord = 32

    f_src.flush()
    src_fd = f_src.fileno()
    if not os.fstat(src_fd).st_size:
        # mmap refuses to map an empty file, and there is nothing to copy
        return

    with mmap.mmap(src_fd, 0, access=mmap.ACCESS_READ) as view:
        # `tail` is the index of the last byte of the entry being built,
        # `cut` is the newline currently considered as its lower bound
        tail = view.size() - 1
        cut = tail
        if view[tail] != newline_ord:
            # no trailing newline: the last entry is necessarily partial
            drop_last_log = True

        while tail >= 0:
            cut = view.rfind(newline, 0, cut)
            head = cut + 1
            if view[head] != space_ord:
                # `head` starts a real entry spanning head..tail, e.g.
                #   line 5
                #   line 4
                #    multiline 4
                #   line 3
                if drop_last_log:
                    # skip only the very last (possibly partial) entry
                    drop_last_log = False
                else:
                    f_dst.write(view[head:tail + 1])
                tail = cut
            elif cut == -1:
                # the very first line starts with a space
                # (this should not happen): give up on what is left
                break
            # otherwise this is a continuation line: keep searching
            # backwards for the line that opens the entry


def import_module(module_fqname, superclasses=None):
Expand Down Expand Up @@ -475,8 +541,8 @@ def recursive_dict_values_by_key(dobj, keys=[]):


class FakeReader():
"""Used as a replacement AsyncReader for when we are writing directly to
disk, and allows us to keep more simplified flows for executing,
"""Used when we are writing directly to disk without sizelimits,
this allows us to keep more simplified flows for executing,
monitoring, and collecting command output.
"""

Expand All @@ -496,8 +562,45 @@ def running(self):
return self.process.poll() is None


class AsyncReader(threading.Thread):
"""Used to limit command output to a given size without deadlocking
class HeadReader(threading.Thread):
    """Used to 'head' the command output (f_src) to a given size
    without deadlocking sos. Takes a sizelimit value in MB.

    The copy runs in its own thread, started from the constructor.
    Callers poll `running` to know when the copy has finished and
    `is_full` to know whether the size limit was reached.

    :param f_src: Binary file object to read from (Popen stdout pipe)
    :param f_dst: Binary file object receiving the copied bytes
    :param sizelimit: Maximum amount of data to copy, in MB
    :param binary: Selects the type returned by get_contents()
    """

    COPY_BUFSIZE = 1024*1024

    def __init__(self, f_src, f_dst, sizelimit, binary):
        super().__init__()
        self.f_src = f_src
        self.f_dst = f_dst
        # convert to bytes; int() keeps the counter usable as a read()
        # size even when sizelimit is fractional (read() rejects floats)
        self.remaining = int(sizelimit * 1048576)
        self.binary = binary
        self.running = True
        self.start()

    def run(self):
        """Reads from the f_src (Popen stdout pipe) until we reach sizelimit.
        Once done, close f_src to signal the program that we are done.

        f_src is closed and `running` is cleared even if a read or a
        write fails, so that a caller polling `running` never waits on
        a reader thread that has already died.
        """
        try:
            while self.remaining > 0:
                buf = self.f_src.read(min(self.remaining, self.COPY_BUFSIZE))
                if not buf:
                    # end of stream before reaching the limit
                    break
                self.f_dst.write(buf)
                self.remaining -= len(buf)
        finally:
            try:
                self.f_src.close()
            finally:
                self.running = False

    def get_contents(self):
        """Return an empty value: the output went to f_dst, not memory."""
        return '' if not self.binary else b''

    @property
    def is_full(self):
        """True once the whole size budget has been consumed."""
        return self.remaining <= 0


class TailReader(threading.Thread):
"""Used to tail the command output to a given size without deadlocking
sos.

Takes a sizelimit value in MB, and will compile stdout from Popen into a
Expand Down
2 changes: 1 addition & 1 deletion tests/report_tests/plugin_tests/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_journal_collected(self):
_m = self.get_plugin_manifest('cups')
ent = None
for cmd in _m['commands']:
if cmd['exec'] == 'journalctl --no-pager --unit cups':
if cmd['exec'] == 'journalctl --no-pager --unit cups --reverse':
ent = cmd
assert ent, "No manifest entry for journalctl cups"

Expand Down
Loading
Loading