diff --git a/pleskdistup/actions/distupgrade.py b/pleskdistup/actions/distupgrade.py index b7b01bc..47fbf9c 100644 --- a/pleskdistup/actions/distupgrade.py +++ b/pleskdistup/actions/distupgrade.py @@ -1,5 +1,6 @@ # Copyright 2023-2024. WebPros International GmbH. All rights reserved. import os +import re import subprocess import typing import urllib.request @@ -145,6 +146,143 @@ def estimate_revert_time(self) -> int: return 20 +class CheckAptReposBackups(action.CheckAction): + description: str + _name: str + sources_list_path: str + sources_list_d_path: str + + @staticmethod + def get_all_repo_list_files(sources_list_path: str, sources_list_d_path: str) -> typing.List[str]: + ret = [sources_list_path] + for root, _, filenames in os.walk(sources_list_d_path): + for f in filenames: + if f.endswith(".list"): + ret.append(os.path.join(root, f)) + return ret + + def __init__( + self, + sources_list_path: str = "/etc/apt/sources.list", + sources_list_d_path: str = "/etc/apt/sources.list.d/", + ) -> None: + self._name = "Check repo.list leftovers from previous conversion" + self.description = """These repo.list backups are found: +\t{} +Please use `--revert` flag to restore and remove those backups. +""" + self.sources_list_path = sources_list_path + self.sources_list_d_path = sources_list_d_path + + @property + def name(self) -> str: + return self._name.format(self) + + @name.setter + def name(self, val: str) -> None: + self._name = val + + def _do_check(self) -> bool: + log.debug("Checking leftover repo.list files") + archived_files = [ + f for f in + CheckAptReposBackups.get_all_repo_list_files(self.sources_list_path, self.sources_list_d_path) + if files.backup_exists(f) + ] + if archived_files: + self.description = self.description.format(",".join(archived_files)) + return False + return True + + +class ReplaceAptReposRegexp(action.ActiveAction): + from_regexp: str + to_regexp: str + sources_list_path: str + sources_list_d_path: str + _name: str + + def __init__( + self, + from_regexp: str, + to_regexp: str, + sources_list_path: str = "/etc/apt/sources.list", + sources_list_d_path: str = "/etc/apt/sources.list.d/", + name: str = "set up APT repositories to change from {self.from_regexp!r} to {self.to_regexp!r}", + ) -> None: + self.from_regexp = from_regexp + self.to_regexp = to_regexp + self.sources_list_path = sources_list_path + self.sources_list_d_path = sources_list_d_path + + self._name = name + + @property + def name(self) -> str: + return self._name.format(self=self) + + @name.setter + def name(self, val: str) -> None: + self._name = val + + def _apply_replace_to_file(self, fpath: str, ptrn: re.Pattern, to_regexp: str) -> None: + changed = False + new_lines = [] + with open(fpath) as f: + for line in f: + new_lines.append(ptrn.sub(to_regexp, line)) + if new_lines[-1] != line: + changed = True + if not changed: + return + files.backup_file(fpath) + with open(fpath, 'w') as f: + f.writelines(new_lines) + + def _get_all_repo_list_files(self) -> typing.List[str]: + return CheckAptReposBackups.get_all_repo_list_files( + self.sources_list_path, + self.sources_list_d_path, + ) + + def _rm_backups(self) -> None: + for f in self._get_all_repo_list_files(): + files.remove_backup(f, False, log.debug) + + def _change_by_regexp(self, from_regexp: str, to_regexp: str) -> None: + p = re.compile(from_regexp) + for f in self._get_all_repo_list_files(): + self._apply_replace_to_file(f, p, to_regexp) + + def _revert_all(self) -> None: + for f in self._get_all_repo_list_files(): + files.restore_file_from_backup(f) + + def _prepare_action(self) -> action.ActionResult: + self._change_by_regexp(self.from_regexp, self.to_regexp) + packages.update_package_list() + return action.ActionResult() + + def _post_action(self) -> action.ActionResult: + self._rm_backups() + return action.ActionResult() + + def _revert_action(self) -> action.ActionResult: + self._revert_all() + packages.update_package_list() + return action.ActionResult() + + def estimate_prepare_time(self) -> int: + return 22 + + def estimate_revert_time(self) -> int: + return 22 + + +ReplaceAptReposRegexpDebian = ReplaceAptReposRegexp +ReplaceAptReposRegexpUbuntu = ReplaceAptReposRegexp + + class SetupAptRepositories(action.ActiveAction): from_codename: str to_codename: str diff --git a/pleskdistup/actions/emails.py b/pleskdistup/actions/emails.py index 83bcab4..a37f8d3 100644 --- a/pleskdistup/actions/emails.py +++ b/pleskdistup/actions/emails.py @@ -80,7 +80,7 @@ def _prepare_action(self) -> action.ActionResult: return action.ActionResult() def _post_action(self) -> action.ActionResult: - path_to_backup = os.path.join(self.temp_directory, "dovecot.conf.bak") + path_to_backup = os.path.join(self.temp_directory, "dovecot.conf" + files.DEFAULT_BACKUP_EXTENSION) if os.path.exists(self.dovecot_config_path): shutil.copy(self.dovecot_config_path, path_to_backup) motd.add_finish_ssh_login_message(f"The dovecot configuration '{self.dovecot_config_path}' has been restored from original distro. Modern configuration was placed in '{path_to_backup}'.\n") diff --git a/pleskdistup/common/src/files.py b/pleskdistup/common/src/files.py index eb8f4a9..2384ecb 100644 --- a/pleskdistup/common/src/files.py +++ b/pleskdistup/common/src/files.py @@ -11,6 +11,8 @@ PathType = typing.Union[os.PathLike, str] +DEFAULT_BACKUP_EXTENSION = ".conversion.bak" + def replace_string(filename: str, original_substring: str, new_substring: str) -> None: with open(filename, "r") as original, open(filename + ".next", "w") as dst: @@ -59,25 +61,41 @@ def get_last_lines(filename: PathType, n: int) -> typing.List[str]: return f.readlines()[-n:] -def backup_file(filename: str) -> None: +def backup_file(filename: str, ext: str = DEFAULT_BACKUP_EXTENSION) -> None: if os.path.exists(filename): - shutil.copy(filename, filename + ".bak") + shutil.copy(filename, filename + ext) -def backup_exists(filename: str) -> bool: - return os.path.exists(filename + ".bak") +def backup_exists(filename: str, ext: str = DEFAULT_BACKUP_EXTENSION) -> bool: + return os.path.exists(filename + ext) -def restore_file_from_backup(filename: str, remove_if_no_backup: bool = False) -> None: - if os.path.exists(filename + ".bak"): - shutil.move(filename + ".bak", filename) +def restore_file_from_backup( + filename: str, + remove_if_no_backup: bool = False, + ext: str = DEFAULT_BACKUP_EXTENSION, +) -> None: + if os.path.exists(filename + ext): + shutil.move(filename + ext, filename) elif remove_if_no_backup and os.path.exists(filename): os.remove(filename) -def remove_backup(filename: str) -> None: - if os.path.exists(filename + ".bak"): - os.remove(filename + ".bak") +def remove_backup( + filename: str, + raise_exception: bool = True, + logf: typing.Optional[typing.Callable[[str], typing.Any]] = None, + ext: str = DEFAULT_BACKUP_EXTENSION, +) -> None: + backup_name = filename + ext + try: + if os.path.exists(backup_name): + os.remove(backup_name) + except Exception as ex: + if logf is not None: + logf(f"failed to remove backup ({backup_name}): {ex}") + if raise_exception: + raise def __get_files_recursive(path: str) -> typing.Iterator[str]: diff --git a/pleskdistup/common/src/motd.py b/pleskdistup/common/src/motd.py index af2b15c..dc9f666 100644 --- a/pleskdistup/common/src/motd.py +++ b/pleskdistup/common/src/motd.py @@ -13,11 +13,11 @@ def restore_ssh_login_message(motd_path: str = MOTD_PATH) -> None: def add_inprogress_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -> None: try: - if not os.path.exists(motd_path + ".bak"): + if not files.backup_exists(motd_path): if os.path.exists(motd_path): files.backup_file(motd_path) else: - with open(motd_path + ".bak", "a") as motd: + with open(motd_path + files.DEFAULT_BACKUP_EXTENSION, "a") as motd: pass with open(motd_path, "a") as motd: @@ -39,8 +39,8 @@ def add_inprogress_ssh_login_message(message: str, motd_path: str = MOTD_PATH) - def add_finish_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -> None: try: if not os.path.exists(motd_path + ".next"): - if os.path.exists(motd_path + ".bak"): - shutil.copy(motd_path + ".bak", motd_path + ".next") + if os.path.exists(motd_path + files.DEFAULT_BACKUP_EXTENSION): + shutil.copy(motd_path + files.DEFAULT_BACKUP_EXTENSION, motd_path + ".next") with open(motd_path + ".next", "a") as motd: motd.write(FINISH_INTRODUCE_MESSAGE) diff --git a/pleskdistup/common/tests/motdtests.py b/pleskdistup/common/tests/motdtests.py index 5ec4125..f43abfb 100644 --- a/pleskdistup/common/tests/motdtests.py +++ b/pleskdistup/common/tests/motdtests.py @@ -4,6 +4,7 @@ import tempfile import src.motd as motd +import src.files as files class InprogressSshLoginMessageTests(unittest.TestCase): @@ -11,7 +12,7 @@ def setUp(self): self.motd_path = tempfile.mktemp() def tearDown(self): - for path in [self.motd_path, self.motd_path + ".bak"]: + for path in [self.motd_path, self.motd_path + files.DEFAULT_BACKUP_EXTENSION]: if os.path.exists(path): os.remove(path) @@ -37,7 +38,7 @@ def test_old_backed_up(self): with open(self.motd_path) as motd_file: self.assertEqual(motd_file.read(), "old\nnew\n") - with open(self.motd_path + ".bak") as motd_file: + with open(self.motd_path + files.DEFAULT_BACKUP_EXTENSION) as motd_file: self.assertEqual(motd_file.read(), "old\n") def test_restore(self): @@ -57,7 +58,7 @@ def setUp(self): self.motd_path = tempfile.mktemp() def tearDown(self): - for path in [self.motd_path, self.motd_path + ".bak", self.motd_path + ".next"]: + for path in [self.motd_path, self.motd_path + files.DEFAULT_BACKUP_EXTENSION, self.motd_path + ".next"]: if os.path.exists(path): os.remove(path) @@ -106,7 +107,7 @@ def test_backed_up_message_saved(self): =============================================================================== """.format(motd.MOTD_PATH) - with open(self.motd_path + ".bak", "w") as motd_file: + with open(self.motd_path + files.DEFAULT_BACKUP_EXTENSION, "w") as motd_file: motd_file.write("old\n") motd.add_inprogress_ssh_login_message("new\n", self.motd_path)