From f2d1739ea0a50630b886b5ef38144b6ae7ead4f6 Mon Sep 17 00:00:00 2001 From: Umit Kablan Date: Tue, 24 Sep 2024 15:33:04 +0300 Subject: [PATCH] FIX Add ability to regexp-replace repo.list files Fixes: PAUX-6369 Invalid APT sources produced by source switch procedure --- pleskdistup/actions/distupgrade.py | 87 +++++++++++++++++++++++++++ pleskdistup/actions/emails.py | 2 +- pleskdistup/common/src/files.py | 28 ++++++--- pleskdistup/common/src/motd.py | 8 +-- pleskdistup/common/tests/motdtests.py | 9 +-- 5 files changed, 115 insertions(+), 19 deletions(-) diff --git a/pleskdistup/actions/distupgrade.py b/pleskdistup/actions/distupgrade.py index b7b01bc..c239bd2 100644 --- a/pleskdistup/actions/distupgrade.py +++ b/pleskdistup/actions/distupgrade.py @@ -1,5 +1,6 @@ # Copyright 2023-2024. WebPros International GmbH. All rights reserved. import os +import re import subprocess import typing import urllib.request @@ -145,6 +146,92 @@ def estimate_revert_time(self) -> int: return 20 +class ReplaceAptReposRegexp(action.ActiveAction): + from_regexp: str + to_regexp: str + sources_list_path: str + sources_list_d_path: str + _name: str + + def __init__( + self, + from_regexp: str, + to_regexp: str, + sources_list_path: str = "/etc/apt/sources.list", + sources_list_d_path: str = "/etc/apt/sources.list.d/", + name: str = "set up APT repositories to change from {self.from_regexp!r} to {self.to_regexp!r}", + ) -> None: + self.from_regexp = from_regexp + self.to_regexp = to_regexp + self.sources_list_path = sources_list_path + self.sources_list_d_path = sources_list_d_path + + self._name = name + + @property + def name(self): + return self._name.format(self=self) + + def _apply_replace_to_file(self, fpath: str, ptrn: re.Pattern, to_regexp: str) -> None: + changed = False + new_lines = [] + with open(fpath) as f: + for line in f: + new_lines.append(ptrn.sub(to_regexp, line)) + if new_lines[-1] != line: + changed = True + if not changed: + return + files.backup_file(fpath) + with open(fpath, 'w') as f: + f.writelines(new_lines) + + def _get_all_repo_list_files(self) -> typing.List[str]: + ret = [self.sources_list_path] + for root, _, filenames in os.walk(self.sources_list_d_path): + for f in filenames: + if f.endswith(".list"): + ret.append(os.path.join(root, f)) + return ret + + def _rm_backups(self) -> None: + for f in self._get_all_repo_list_files(): + files.remove_backup(f, log.debug) + + def _change_by_regexp(self, from_regexp: str, to_regexp: str) -> None: + p = re.compile(from_regexp) + for f in self._get_all_repo_list_files(): + self._apply_replace_to_file(f, p, to_regexp) + + def _revert_all(self) -> None: + for f in self._get_all_repo_list_files(): + files.restore_file_from_backup(f) + + def _prepare_action(self) -> action.ActionResult: + self._change_by_regexp(self.from_regexp, self.to_regexp) + packages.update_package_list() + return action.ActionResult() + + def _post_action(self) -> action.ActionResult: + self._rm_backups() + return action.ActionResult() + + def _revert_action(self) -> action.ActionResult: + self._revert_all() + packages.update_package_list() + return action.ActionResult() + + def estimate_prepare_time(self) -> int: + return 22 + + def estimate_revert_time(self) -> int: + return 22 + + +ReplaceAptReposRegexpDebian = ReplaceAptReposRegexp +ReplaceAptReposRegexpUbuntu = ReplaceAptReposRegexp + + class SetupAptRepositories(action.ActiveAction): from_codename: str to_codename: str diff --git a/pleskdistup/actions/emails.py b/pleskdistup/actions/emails.py index 83bcab4..a37f8d3 100644 --- a/pleskdistup/actions/emails.py +++ b/pleskdistup/actions/emails.py @@ -80,7 +80,7 @@ def _prepare_action(self) -> action.ActionResult: return action.ActionResult() def _post_action(self) -> action.ActionResult: - path_to_backup = os.path.join(self.temp_directory, "dovecot.conf.bak") + path_to_backup = os.path.join(self.temp_directory, "dovecot.conf" + files.DEFAULT_BACKUP_EXTENSION) if os.path.exists(self.dovecot_config_path): shutil.copy(self.dovecot_config_path, path_to_backup) motd.add_finish_ssh_login_message(f"The dovecot configuration '{self.dovecot_config_path}' has been restored from original distro. Modern configuration was placed in '{path_to_backup}'.\n") diff --git a/pleskdistup/common/src/files.py b/pleskdistup/common/src/files.py index eb8f4a9..8ae1868 100644 --- a/pleskdistup/common/src/files.py +++ b/pleskdistup/common/src/files.py @@ -11,6 +11,7 @@ PathType = typing.Union[os.PathLike, str] +DEFAULT_BACKUP_EXTENSION = ".conversion.bak" def replace_string(filename: str, original_substring: str, new_substring: str) -> None: with open(filename, "r") as original, open(filename + ".next", "w") as dst: @@ -59,25 +60,32 @@ def get_last_lines(filename: PathType, n: int) -> typing.List[str]: return f.readlines()[-n:] -def backup_file(filename: str) -> None: +def backup_file(filename: str, ext: str = DEFAULT_BACKUP_EXTENSION) -> None: if os.path.exists(filename): - shutil.copy(filename, filename + ".bak") + shutil.copy(filename, filename + ext) -def backup_exists(filename: str) -> bool: - return os.path.exists(filename + ".bak") +def backup_exists(filename: str, ext: str = DEFAULT_BACKUP_EXTENSION) -> bool: + return os.path.exists(filename + ext) -def restore_file_from_backup(filename: str, remove_if_no_backup: bool = False) -> None: - if os.path.exists(filename + ".bak"): - shutil.move(filename + ".bak", filename) +def restore_file_from_backup(filename: str, remove_if_no_backup: bool = False, + ext: str = DEFAULT_BACKUP_EXTENSION) -> None: + if os.path.exists(filename + ext): + shutil.move(filename + ext, filename) elif remove_if_no_backup and os.path.exists(filename): os.remove(filename) -def remove_backup(filename: str) -> None: - if os.path.exists(filename + ".bak"): - os.remove(filename + ".bak") +def remove_backup(filename: str, logf : typing.Optional[typing.Callable] = None, + ext: str = DEFAULT_BACKUP_EXTENSION) -> None: + try: + if os.path.exists(filename + ext): + os.remove(filename + ext) + except Exception as ex: + if logf is None: + raise + logf(f"failed to remove backup ({filename}): {ex}") def __get_files_recursive(path: str) -> typing.Iterator[str]: diff --git a/pleskdistup/common/src/motd.py b/pleskdistup/common/src/motd.py index af2b15c..dc9f666 100644 --- a/pleskdistup/common/src/motd.py +++ b/pleskdistup/common/src/motd.py @@ -13,11 +13,11 @@ def restore_ssh_login_message(motd_path: str = MOTD_PATH) -> None: def add_inprogress_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -> None: try: - if not os.path.exists(motd_path + ".bak"): + if not files.backup_exists(motd_path): if os.path.exists(motd_path): files.backup_file(motd_path) else: - with open(motd_path + ".bak", "a") as motd: + with open(motd_path + files.DEFAULT_BACKUP_EXTENSION, "a") as motd: pass with open(motd_path, "a") as motd: @@ -39,8 +39,8 @@ def add_inprogress_ssh_login_message(message: str, motd_path: str = MOTD_PATH) - def add_finish_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -> None: try: if not os.path.exists(motd_path + ".next"): - if os.path.exists(motd_path + ".bak"): - shutil.copy(motd_path + ".bak", motd_path + ".next") + if os.path.exists(motd_path + files.DEFAULT_BACKUP_EXTENSION): + shutil.copy(motd_path + files.DEFAULT_BACKUP_EXTENSION, motd_path + ".next") with open(motd_path + ".next", "a") as motd: motd.write(FINISH_INTRODUCE_MESSAGE) diff --git a/pleskdistup/common/tests/motdtests.py b/pleskdistup/common/tests/motdtests.py index 5ec4125..f43abfb 100644 --- a/pleskdistup/common/tests/motdtests.py +++ b/pleskdistup/common/tests/motdtests.py @@ -4,6 +4,7 @@ import tempfile import src.motd as motd +import src.files as files class InprogressSshLoginMessageTests(unittest.TestCase): @@ -11,7 +12,7 @@ def setUp(self): self.motd_path = tempfile.mktemp() def tearDown(self): - for path in [self.motd_path, self.motd_path + ".bak"]: + for path in [self.motd_path, self.motd_path + files.DEFAULT_BACKUP_EXTENSION]: if os.path.exists(path): os.remove(path) @@ -37,7 +38,7 @@ def test_old_backed_up(self): with open(self.motd_path) as motd_file: self.assertEqual(motd_file.read(), "old\nnew\n") - with open(self.motd_path + ".bak") as motd_file: + with open(self.motd_path + files.DEFAULT_BACKUP_EXTENSION) as motd_file: self.assertEqual(motd_file.read(), "old\n") def test_restore(self): @@ -57,7 +58,7 @@ def setUp(self): self.motd_path = tempfile.mktemp() def tearDown(self): - for path in [self.motd_path, self.motd_path + ".bak", self.motd_path + ".next"]: + for path in [self.motd_path, self.motd_path + files.DEFAULT_BACKUP_EXTENSION, self.motd_path + ".next"]: if os.path.exists(path): os.remove(path) @@ -106,7 +107,7 @@ def test_backed_up_message_saved(self): =============================================================================== """.format(motd.MOTD_PATH) - with open(self.motd_path + ".bak", "w") as motd_file: + with open(self.motd_path + files.DEFAULT_BACKUP_EXTENSION, "w") as motd_file: motd_file.write("old\n") motd.add_inprogress_ssh_login_message("new\n", self.motd_path)