From 43600580f9462f915ed1ac172779c4268c3e7fb1 Mon Sep 17 00:00:00 2001 From: Umit Kablan Date: Wed, 2 Oct 2024 17:08:11 +0300 Subject: [PATCH] Fix apt-get lock held by another process error by retry --- pleskdistup/common/src/dpkg.py | 89 ++++++++++++++++++++++++++++------ pleskdistup/common/src/util.py | 82 +++++++++++++++++++++++-------- 2 files changed, 135 insertions(+), 36 deletions(-) diff --git a/pleskdistup/common/src/dpkg.py b/pleskdistup/common/src/dpkg.py index 6ccf416..da43064 100644 --- a/pleskdistup/common/src/dpkg.py +++ b/pleskdistup/common/src/dpkg.py @@ -5,8 +5,11 @@ import re import subprocess import typing +import time -from . import files, util +from . import files, log, util + +DPKG_TEMPFAIL_RETRY: typing.List[int] = [30, 60, 90, 120] APT_CHOOSE_OLD_FILES_OPTIONS = [ "-o", "Dpkg::Options::=--force-confdef", @@ -122,9 +125,59 @@ def safely_install_packages( install_packages(pkgs, repository, force_package_config) +def _exec_retry_when_locked( + apt_get_cmd: typing.List[str], + tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None, + collect_stdout: bool = False, +) -> str: + cant_get_lock = False + stdout = [] + + if tmpfail_retry_intervals is None: + tmpfail_retry_intervals = DPKG_TEMPFAIL_RETRY + + def process_stdout(line: str) -> None: + if collect_stdout: + nonlocal stdout + stdout.append(line) + log.info("stdout: {}".format(line.rstrip('\n')), to_stream=False) + + def process_stderr(line: str) -> None: + log.info("stderr: {}".format(line.rstrip('\n'))) + nonlocal cant_get_lock + if cant_get_lock: + return + if "E: Could not get lock" in line: + cant_get_lock = True + + i = 0 + while True: + cant_get_lock = False + stdout.clear() + log.info(f"Executing: {' '.join(apt_get_cmd)}") + exit_code = util.exec_get_output_streamed( + apt_get_cmd, process_stdout, process_stderr, + env={ + "PATH": os.environ["PATH"], + "DEBIAN_FRONTEND": "noninteractive", + "LC_ALL": "C", + "LANG": "C", + }, + ) + if exit_code == 0: + break + if i >= len(tmpfail_retry_intervals) or not cant_get_lock: + raise subprocess.CalledProcessError(returncode=exit_code, cmd=apt_get_cmd) + log.info(f"{apt_get_cmd[0]} failed because lock is already held, will retry in {tmpfail_retry_intervals[i]} seconds..") + time.sleep(tmpfail_retry_intervals[i]) + i += 1 + return "".join(stdout) + + def remove_packages( pkgs: typing.List[str], simulate: bool = False, + tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None, ) -> typing.Optional[typing.Dict[str, typing.List[PackageEntry]]]: if len(pkgs) == 0: return None @@ -133,7 +186,7 @@ def remove_packages( if simulate: cmd.append("--simulate") cmd += pkgs - cmd_out = util.logged_check_call(cmd) + cmd_out = _exec_retry_when_locked(cmd, tmpfail_retry_intervals, collect_stdout=True) if simulate: return _parse_apt_get_simulation(cmd_out) return None @@ -142,6 +195,7 @@ def remove_packages( def safely_remove_packages( pkgs: typing.List[str], protected_pkgs: typing.Optional[typing.Iterable[str]] = None, + tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None, ) -> None: sim_res = remove_packages(pkgs, simulate=True) if sim_res is not None and protected_pkgs is not None: @@ -149,28 +203,32 @@ def safely_remove_packages( violations = _find_protection_violations(sim_res, protected_set) if violations: raise PackageProtectionError(protected_packages=violations) - remove_packages(pkgs) + remove_packages(pkgs, False, tmpfail_retry_intervals) def find_related_repofiles(repository_file: str) -> typing.List[str]: return files.find_files_case_insensitive("/etc/apt/sources.list.d", repository_file) -def update_package_list() -> None: - util.logged_check_call(["/usr/bin/apt-get", "update", "-y"]) +def update_package_list(tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None) -> None: + cmd = ["/usr/bin/apt-get", "update", "-y"] + _exec_retry_when_locked(cmd, tmpfail_retry_intervals) -def upgrade_packages(pkgs: typing.Optional[typing.List[str]] = None) -> None: +def upgrade_packages( + pkgs: typing.Optional[typing.List[str]] = None, + tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None, +) -> None: if pkgs is None: pkgs = [] cmd = ["/usr/bin/apt-get", "upgrade", "-y"] + APT_CHOOSE_OLD_FILES_OPTIONS + pkgs - util.logged_check_call(cmd, env={"PATH": os.environ["PATH"], "DEBIAN_FRONTEND": "noninteractive"}) + _exec_retry_when_locked(cmd, tmpfail_retry_intervals) -def autoremove_outdated_packages() -> None: - util.logged_check_call(["/usr/bin/apt-get", "autoremove", "-y"], - env={"PATH": os.environ["PATH"], "DEBIAN_FRONTEND": "noninteractive"}) +def autoremove_outdated_packages(tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None) -> None: + cmd = ["/usr/bin/apt-get", "autoremove", "-y"] + _exec_retry_when_locked(cmd, tmpfail_retry_intervals) def depconfig_parameter_set(parameter: str, value: str) -> None: @@ -184,13 +242,14 @@ def depconfig_parameter_get(parameter: str) -> str: return process.stdout.split(" ")[1].strip() -def restore_installation() -> None: - util.logged_check_call(["/usr/bin/apt-get", "-f", "install", "-y"]) +def restore_installation(tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None) -> None: + cmd = ["/usr/bin/apt-get", "-f", "install", "-y"] + _exec_retry_when_locked(cmd, tmpfail_retry_intervals) -def do_distupgrade() -> None: - util.logged_check_call(["apt-get", "dist-upgrade", "-y"] + APT_CHOOSE_OLD_FILES_OPTIONS, - env={"PATH": os.environ["PATH"], "DEBIAN_FRONTEND": "noninteractive"}) +def do_distupgrade(tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None) -> None: + cmd = ["apt-get", "dist-upgrade", "-y"] + APT_CHOOSE_OLD_FILES_OPTIONS + _exec_retry_when_locked(cmd, tmpfail_retry_intervals) def get_installed_packages_list(regex: str) -> typing.List[typing.Tuple[str, str]]: diff --git a/pleskdistup/common/src/util.py b/pleskdistup/common/src/util.py index 4ac8c91..aff70d6 100644 --- a/pleskdistup/common/src/util.py +++ b/pleskdistup/common/src/util.py @@ -6,34 +6,74 @@ from . import log +def log_outputs_check_call( + cmd: typing.Union[typing.Sequence[str], str], + collect_return_stdout: bool = False, + **kwargs, +) -> str: + ''' + Runs cmd and raises on nonzero exit code. Returns stdout when collect_return_stdout + ''' + log.info(f"Running: {cmd!r}. Output:") + stdout = [] + + def proc_stdout(line: str) -> None: + log.info("stdout: {}".format(line.rstrip('\n')), to_stream=False) + if collect_return_stdout: + stdout.append(line) + + def proc_stderr(line: str) -> None: + log.info("stderr: {}".format(line.rstrip('\n'))) + + exit_code = exec_get_output_streamed(cmd, proc_stdout, proc_stderr, **kwargs) + if exit_code != 0: + log.err(f"Command {cmd!r} failed with return code {exit_code}") + raise subprocess.CalledProcessError(returncode=exit_code, cmd=cmd) + + log.info(f"Command {cmd!r} finished successfully") + return "".join(stdout) + + # Returns standard output def logged_check_call(cmd: typing.Union[typing.Sequence[str], str], **kwargs) -> str: - log.info(f"Running: {cmd!r}. Output:") + return log_outputs_check_call(cmd, collect_return_stdout=True, **kwargs) + - # I beleive we should be able pass argument to the subprocess function - # from the caller. So we have to inject stdout/stderr/universal_newlines - kwargs["stdout"] = subprocess.PIPE - kwargs["stderr"] = subprocess.STDOUT +def exec_get_output_streamed( + cmd: typing.Union[typing.Sequence[str], str], + process_stdout_line: typing.Optional[typing.Callable[[str], None]], + process_stderr_line: typing.Optional[typing.Callable[[str], None]], + **kwargs, +) -> int: + ''' + Allows to get stdout/stderr by streaming line by line, by calling callbacks + and returns process exit code + ''' + kwargs["stdout"] = (subprocess.DEVNULL if process_stdout_line is None + else subprocess.PIPE) + kwargs["stderr"] = (subprocess.DEVNULL if process_stderr_line is None + else subprocess.PIPE) kwargs["universal_newlines"] = True - stdout = [] process = subprocess.Popen(cmd, **kwargs) - while None is process.poll(): - if not process.stdout: - log.err(f"Can't get process output from {cmd!r}") - raise RuntimeError(f"Can't get process output from {cmd!r}") - line = process.stdout.readline() - if line: - stdout.append(line) - if line.strip(): - log.info(line.strip(), to_stream=False) + if process_stdout_line is None and process_stderr_line is None: + process.communicate() + return process.returncode - if process.returncode != 0: - log.err(f"Command {cmd!r} failed with return code {process.returncode}") - raise subprocess.CalledProcessError(returncode=process.returncode, cmd=cmd, output="\n".join(stdout)) - - log.info(f"Command {cmd!r} finished successfully") - return "\n".join(stdout) + while process.poll() is None: + if process_stdout_line is not None: + if not process.stdout: + raise RuntimeError(f"Cannot get process stdout of command {cmd!r}") + line = process.stdout.readline() + if line: + process_stdout_line(line) + if process_stderr_line is not None: + if not process.stderr: + raise RuntimeError(f"Cannot get process stderr of command {cmd!r}") + line = process.stderr.readline() + if line: + process_stderr_line(line) + return process.returncode def merge_dicts_of_lists(