Skip to content

Commit

Permalink
FIX Add ability to regexp-replace repo.list files
Browse files Browse the repository at this point in the history
Fixes: PAUX-6369 Invalid APT sources produced by source switch procedure
  • Loading branch information
ukablan-wpc committed Sep 26, 2024
1 parent 8a71a4b commit f2d1739
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 19 deletions.
87 changes: 87 additions & 0 deletions pleskdistup/actions/distupgrade.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright 2023-2024. WebPros International GmbH. All rights reserved.
import os
import re
import subprocess
import typing
import urllib.request
Expand Down Expand Up @@ -145,6 +146,92 @@ def estimate_revert_time(self) -> int:
return 20


class ReplaceAptReposRegexp(action.ActiveAction):
from_regexp: str
to_regexp: str
sources_list_path: str
sources_list_d_path: str
_name: str

def __init__(
self,
from_regexp: str,
to_regexp: str,
sources_list_path: str = "/etc/apt/sources.list",
sources_list_d_path: str = "/etc/apt/sources.list.d/",
name: str = "set up APT repositories to change from {self.from_regexp!r} to {self.to_regexp!r}",
) -> None:
self.from_regexp = from_regexp
self.to_regexp = to_regexp
self.sources_list_path = sources_list_path
self.sources_list_d_path = sources_list_d_path

self._name = name

@property
def name(self):
return self._name.format(self=self)

def _apply_replace_to_file(self, fpath: str, ptrn: re.Pattern, to_regexp: str) -> None:
changed = False
new_lines = []
with open(fpath) as f:
for line in f:
new_lines.append(ptrn.sub(to_regexp, line))
if new_lines[-1] != line:
changed = True
if not changed:
return
files.backup_file(fpath)
with open(fpath, 'w') as f:
f.writelines(new_lines)

def _get_all_repo_list_files(self) -> typing.List[str]:
ret = [self.sources_list_path]
for root, _, filenames in os.walk(self.sources_list_d_path):
for f in filenames:
if f.endswith(".list"):
ret.append(os.path.join(root, f))
return ret

def _rm_backups(self) -> None:
for f in self._get_all_repo_list_files():
files.remove_backup(f, log.debug)

def _change_by_regexp(self, from_regexp: str, to_regexp: str) -> None:
p = re.compile(from_regexp)
for f in self._get_all_repo_list_files():
self._apply_replace_to_file(f, p, to_regexp)

def _revert_all(self) -> None:
for f in self._get_all_repo_list_files():
files.restore_file_from_backup(f)

def _prepare_action(self) -> action.ActionResult:
self._change_by_regexp(self.from_regexp, self.to_regexp)
packages.update_package_list()
return action.ActionResult()

def _post_action(self) -> action.ActionResult:
self._rm_backups()
return action.ActionResult()

def _revert_action(self) -> action.ActionResult:
self._revert_all()
packages.update_package_list()
return action.ActionResult()

def estimate_prepare_time(self) -> int:
return 22

def estimate_revert_time(self) -> int:
return 22


ReplaceAptReposRegexpDebian = ReplaceAptReposRegexp
ReplaceAptReposRegexpUbuntu = ReplaceAptReposRegexp


class SetupAptRepositories(action.ActiveAction):
from_codename: str
to_codename: str
Expand Down
2 changes: 1 addition & 1 deletion pleskdistup/actions/emails.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def _prepare_action(self) -> action.ActionResult:
return action.ActionResult()

def _post_action(self) -> action.ActionResult:
path_to_backup = os.path.join(self.temp_directory, "dovecot.conf.bak")
path_to_backup = os.path.join(self.temp_directory, "dovecot.conf" + files.DEFAULT_BACKUP_EXTENSION)
if os.path.exists(self.dovecot_config_path):
shutil.copy(self.dovecot_config_path, path_to_backup)
motd.add_finish_ssh_login_message(f"The dovecot configuration '{self.dovecot_config_path}' has been restored from original distro. Modern configuration was placed in '{path_to_backup}'.\n")
Expand Down
28 changes: 18 additions & 10 deletions pleskdistup/common/src/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

PathType = typing.Union[os.PathLike, str]

DEFAULT_BACKUP_EXTENSION = ".conversion.bak"

def replace_string(filename: str, original_substring: str, new_substring: str) -> None:
with open(filename, "r") as original, open(filename + ".next", "w") as dst:
Expand Down Expand Up @@ -59,25 +60,32 @@ def get_last_lines(filename: PathType, n: int) -> typing.List[str]:
return f.readlines()[-n:]


def backup_file(filename: str) -> None:
def backup_file(filename: str, ext: str = DEFAULT_BACKUP_EXTENSION) -> None:
if os.path.exists(filename):
shutil.copy(filename, filename + ".bak")
shutil.copy(filename, filename + ext)


def backup_exists(filename: str) -> bool:
return os.path.exists(filename + ".bak")
def backup_exists(filename: str, ext: str = DEFAULT_BACKUP_EXTENSION) -> bool:
return os.path.exists(filename + ext)


def restore_file_from_backup(filename: str, remove_if_no_backup: bool = False) -> None:
if os.path.exists(filename + ".bak"):
shutil.move(filename + ".bak", filename)
def restore_file_from_backup(filename: str, remove_if_no_backup: bool = False,
ext: str = DEFAULT_BACKUP_EXTENSION) -> None:
if os.path.exists(filename + ext):
shutil.move(filename + ext, filename)
elif remove_if_no_backup and os.path.exists(filename):
os.remove(filename)


def remove_backup(filename: str) -> None:
if os.path.exists(filename + ".bak"):
os.remove(filename + ".bak")
def remove_backup(filename: str, logf : typing.Optional[typing.Callable] = None,
ext: str = DEFAULT_BACKUP_EXTENSION) -> None:
try:
if os.path.exists(filename + ext):
os.remove(filename + ext)
except Exception as ex:
if logf is None:
raise
logf(f"failed to remove backup ({filename}): {ex}")


def __get_files_recursive(path: str) -> typing.Iterator[str]:
Expand Down
8 changes: 4 additions & 4 deletions pleskdistup/common/src/motd.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ def restore_ssh_login_message(motd_path: str = MOTD_PATH) -> None:

def add_inprogress_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -> None:
try:
if not os.path.exists(motd_path + ".bak"):
if not files.backup_exists(motd_path):
if os.path.exists(motd_path):
files.backup_file(motd_path)
else:
with open(motd_path + ".bak", "a") as motd:
with open(motd_path + files.DEFAULT_BACKUP_EXTENSION, "a") as motd:
pass

with open(motd_path, "a") as motd:
Expand All @@ -39,8 +39,8 @@ def add_inprogress_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -
def add_finish_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -> None:
try:
if not os.path.exists(motd_path + ".next"):
if os.path.exists(motd_path + ".bak"):
shutil.copy(motd_path + ".bak", motd_path + ".next")
if os.path.exists(motd_path + files.DEFAULT_BACKUP_EXTENSION):
shutil.copy(motd_path + files.DEFAULT_BACKUP_EXTENSION, motd_path + ".next")

with open(motd_path + ".next", "a") as motd:
motd.write(FINISH_INTRODUCE_MESSAGE)
Expand Down
9 changes: 5 additions & 4 deletions pleskdistup/common/tests/motdtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import tempfile

import src.motd as motd
import src.files as files


class InprogressSshLoginMessageTests(unittest.TestCase):
def setUp(self):
self.motd_path = tempfile.mktemp()

def tearDown(self):
for path in [self.motd_path, self.motd_path + ".bak"]:
for path in [self.motd_path, self.motd_path + files.DEFAULT_BACKUP_EXTENSION]:
if os.path.exists(path):
os.remove(path)

Expand All @@ -37,7 +38,7 @@ def test_old_backed_up(self):
with open(self.motd_path) as motd_file:
self.assertEqual(motd_file.read(), "old\nnew\n")

with open(self.motd_path + ".bak") as motd_file:
with open(self.motd_path + files.DEFAULT_BACKUP_EXTENSION) as motd_file:
self.assertEqual(motd_file.read(), "old\n")

def test_restore(self):
Expand All @@ -57,7 +58,7 @@ def setUp(self):
self.motd_path = tempfile.mktemp()

def tearDown(self):
for path in [self.motd_path, self.motd_path + ".bak", self.motd_path + ".next"]:
for path in [self.motd_path, self.motd_path + files.DEFAULT_BACKUP_EXTENSION, self.motd_path + ".next"]:
if os.path.exists(path):
os.remove(path)

Expand Down Expand Up @@ -106,7 +107,7 @@ def test_backed_up_message_saved(self):
===============================================================================
""".format(motd.MOTD_PATH)

with open(self.motd_path + ".bak", "w") as motd_file:
with open(self.motd_path + files.DEFAULT_BACKUP_EXTENSION, "w") as motd_file:
motd_file.write("old\n")

motd.add_inprogress_ssh_login_message("new\n", self.motd_path)
Expand Down

0 comments on commit f2d1739

Please sign in to comment.