Skip to content

Commit

Permalink
Merge pull request #76 from ukablan-wpc/main
Browse files Browse the repository at this point in the history
FIX Add ability to regexp-replace repo.list files
  • Loading branch information
ukablan-wpc authored Sep 30, 2024
2 parents e9e9500 + b8f9846 commit db08062
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 19 deletions.
138 changes: 138 additions & 0 deletions pleskdistup/actions/distupgrade.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright 2023-2024. WebPros International GmbH. All rights reserved.
import os
import re
import subprocess
import typing
import urllib.request
Expand Down Expand Up @@ -145,6 +146,143 @@ def estimate_revert_time(self) -> int:
return 20


class CheckAptReposBackups(action.CheckAction):
description: str
_name: str
sources_list_path: str
sources_list_d_path: str

@staticmethod
def get_all_repo_list_files(sources_list_path: str, sources_list_d_path: str) -> typing.List[str]:
ret = [sources_list_path]
for root, _, filenames in os.walk(sources_list_d_path):
for f in filenames:
if f.endswith(".list"):
ret.append(os.path.join(root, f))
return ret

def __init__(
self,
sources_list_path: str = "/etc/apt/sources.list",
sources_list_d_path: str = "/etc/apt/sources.list.d/",
) -> None:
self._name = "Check repo.list leftovers from previous conversion"
self.description = """These repo.list backups are found:
\t{}
Please use `--revert` flag to restore and remove those backups.
"""
self.sources_list_path = sources_list_path
self.sources_list_d_path = sources_list_d_path

@property
def name(self) -> str:
return self._name.format(self)

@name.setter
def name(self, val: str) -> None:
self._name = val

def _do_check(self) -> bool:
log.debug("Checking leftover repo.list files")
archived_files = [
f for f in
CheckAptReposBackups.get_all_repo_list_files(self.sources_list_path, self.sources_list_d_path)
if files.backup_exists(f)
]
if archived_files:
self.description = self.description.format(",".join(archived_files))
return False
return True


class ReplaceAptReposRegexp(action.ActiveAction):
from_regexp: str
to_regexp: str
sources_list_path: str
sources_list_d_path: str
_name: str

def __init__(
self,
from_regexp: str,
to_regexp: str,
sources_list_path: str = "/etc/apt/sources.list",
sources_list_d_path: str = "/etc/apt/sources.list.d/",
name: str = "set up APT repositories to change from {self.from_regexp!r} to {self.to_regexp!r}",
) -> None:
self.from_regexp = from_regexp
self.to_regexp = to_regexp
self.sources_list_path = sources_list_path
self.sources_list_d_path = sources_list_d_path

self._name = name

@property
def name(self) -> str:
return self._name.format(self=self)

@name.setter
def name(self, val: str) -> None:
self._name = val

def _apply_replace_to_file(self, fpath: str, ptrn: re.Pattern, to_regexp: str) -> None:
changed = False
new_lines = []
with open(fpath) as f:
for line in f:
new_lines.append(ptrn.sub(to_regexp, line))
if new_lines[-1] != line:
changed = True
if not changed:
return
files.backup_file(fpath)
with open(fpath, 'w') as f:
f.writelines(new_lines)

def _get_all_repo_list_files(self) -> typing.List[str]:
return CheckAptReposBackups.get_all_repo_list_files(
self.sources_list_path,
self.sources_list_d_path,
)

def _rm_backups(self) -> None:
for f in self._get_all_repo_list_files():
files.remove_backup(f, False, log.debug)

def _change_by_regexp(self, from_regexp: str, to_regexp: str) -> None:
p = re.compile(from_regexp)
for f in self._get_all_repo_list_files():
self._apply_replace_to_file(f, p, to_regexp)

def _revert_all(self) -> None:
for f in self._get_all_repo_list_files():
files.restore_file_from_backup(f)

def _prepare_action(self) -> action.ActionResult:
self._change_by_regexp(self.from_regexp, self.to_regexp)
packages.update_package_list()
return action.ActionResult()

def _post_action(self) -> action.ActionResult:
self._rm_backups()
return action.ActionResult()

def _revert_action(self) -> action.ActionResult:
self._revert_all()
packages.update_package_list()
return action.ActionResult()

def estimate_prepare_time(self) -> int:
return 22

def estimate_revert_time(self) -> int:
return 22


ReplaceAptReposRegexpDebian = ReplaceAptReposRegexp
ReplaceAptReposRegexpUbuntu = ReplaceAptReposRegexp


class SetupAptRepositories(action.ActiveAction):
from_codename: str
to_codename: str
Expand Down
2 changes: 1 addition & 1 deletion pleskdistup/actions/emails.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def _prepare_action(self) -> action.ActionResult:
return action.ActionResult()

def _post_action(self) -> action.ActionResult:
path_to_backup = os.path.join(self.temp_directory, "dovecot.conf.bak")
path_to_backup = os.path.join(self.temp_directory, "dovecot.conf" + files.DEFAULT_BACKUP_EXTENSION)
if os.path.exists(self.dovecot_config_path):
shutil.copy(self.dovecot_config_path, path_to_backup)
motd.add_finish_ssh_login_message(f"The dovecot configuration '{self.dovecot_config_path}' has been restored from original distro. Modern configuration was placed in '{path_to_backup}'.\n")
Expand Down
38 changes: 28 additions & 10 deletions pleskdistup/common/src/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

PathType = typing.Union[os.PathLike, str]

DEFAULT_BACKUP_EXTENSION = ".conversion.bak"


def replace_string(filename: str, original_substring: str, new_substring: str) -> None:
with open(filename, "r") as original, open(filename + ".next", "w") as dst:
Expand Down Expand Up @@ -59,25 +61,41 @@ def get_last_lines(filename: PathType, n: int) -> typing.List[str]:
return f.readlines()[-n:]


def backup_file(filename: str) -> None:
def backup_file(filename: str, ext: str = DEFAULT_BACKUP_EXTENSION) -> None:
if os.path.exists(filename):
shutil.copy(filename, filename + ".bak")
shutil.copy(filename, filename + ext)


def backup_exists(filename: str) -> bool:
return os.path.exists(filename + ".bak")
def backup_exists(filename: str, ext: str = DEFAULT_BACKUP_EXTENSION) -> bool:
return os.path.exists(filename + ext)


def restore_file_from_backup(filename: str, remove_if_no_backup: bool = False) -> None:
if os.path.exists(filename + ".bak"):
shutil.move(filename + ".bak", filename)
def restore_file_from_backup(
filename: str,
remove_if_no_backup: bool = False,
ext: str = DEFAULT_BACKUP_EXTENSION,
) -> None:
if os.path.exists(filename + ext):
shutil.move(filename + ext, filename)
elif remove_if_no_backup and os.path.exists(filename):
os.remove(filename)


def remove_backup(filename: str) -> None:
if os.path.exists(filename + ".bak"):
os.remove(filename + ".bak")
def remove_backup(
filename: str,
raise_exception: bool = True,
logf: typing.Optional[typing.Callable[[str], typing.Any]] = None,
ext: str = DEFAULT_BACKUP_EXTENSION,
) -> None:
backup_name = filename + ext
try:
if os.path.exists(backup_name):
os.remove(backup_name)
except Exception as ex:
if logf is not None:
logf(f"failed to remove backup ({backup_name}): {ex}")
if raise_exception:
raise


def __get_files_recursive(path: str) -> typing.Iterator[str]:
Expand Down
8 changes: 4 additions & 4 deletions pleskdistup/common/src/motd.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ def restore_ssh_login_message(motd_path: str = MOTD_PATH) -> None:

def add_inprogress_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -> None:
try:
if not os.path.exists(motd_path + ".bak"):
if not files.backup_exists(motd_path):
if os.path.exists(motd_path):
files.backup_file(motd_path)
else:
with open(motd_path + ".bak", "a") as motd:
with open(motd_path + files.DEFAULT_BACKUP_EXTENSION, "a") as motd:
pass

with open(motd_path, "a") as motd:
Expand All @@ -39,8 +39,8 @@ def add_inprogress_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -
def add_finish_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -> None:
try:
if not os.path.exists(motd_path + ".next"):
if os.path.exists(motd_path + ".bak"):
shutil.copy(motd_path + ".bak", motd_path + ".next")
if os.path.exists(motd_path + files.DEFAULT_BACKUP_EXTENSION):
shutil.copy(motd_path + files.DEFAULT_BACKUP_EXTENSION, motd_path + ".next")

with open(motd_path + ".next", "a") as motd:
motd.write(FINISH_INTRODUCE_MESSAGE)
Expand Down
9 changes: 5 additions & 4 deletions pleskdistup/common/tests/motdtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import tempfile

import src.motd as motd
import src.files as files


class InprogressSshLoginMessageTests(unittest.TestCase):
def setUp(self):
self.motd_path = tempfile.mktemp()

def tearDown(self):
for path in [self.motd_path, self.motd_path + ".bak"]:
for path in [self.motd_path, self.motd_path + files.DEFAULT_BACKUP_EXTENSION]:
if os.path.exists(path):
os.remove(path)

Expand All @@ -37,7 +38,7 @@ def test_old_backed_up(self):
with open(self.motd_path) as motd_file:
self.assertEqual(motd_file.read(), "old\nnew\n")

with open(self.motd_path + ".bak") as motd_file:
with open(self.motd_path + files.DEFAULT_BACKUP_EXTENSION) as motd_file:
self.assertEqual(motd_file.read(), "old\n")

def test_restore(self):
Expand All @@ -57,7 +58,7 @@ def setUp(self):
self.motd_path = tempfile.mktemp()

def tearDown(self):
for path in [self.motd_path, self.motd_path + ".bak", self.motd_path + ".next"]:
for path in [self.motd_path, self.motd_path + files.DEFAULT_BACKUP_EXTENSION, self.motd_path + ".next"]:
if os.path.exists(path):
os.remove(path)

Expand Down Expand Up @@ -106,7 +107,7 @@ def test_backed_up_message_saved(self):
===============================================================================
""".format(motd.MOTD_PATH)

with open(self.motd_path + ".bak", "w") as motd_file:
with open(self.motd_path + files.DEFAULT_BACKUP_EXTENSION, "w") as motd_file:
motd_file.write("old\n")

motd.add_inprogress_ssh_login_message("new\n", self.motd_path)
Expand Down

0 comments on commit db08062

Please sign in to comment.