Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX Add ability to regexp-replace repo.list files #76

Merged
merged 7 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions pleskdistup/actions/distupgrade.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright 2023-2024. WebPros International GmbH. All rights reserved.
import os
import re
import subprocess
import typing
import urllib.request
Expand Down Expand Up @@ -145,6 +146,135 @@ def estimate_revert_time(self) -> int:
return 20


class CheckAptReposBackups(action.CheckAction):
description: str
_name: str
sources_list_path: str
sources_list_d_path: str

@staticmethod
def get_all_repo_list_files(sources_list_path: str, sources_list_d_path: str) -> typing.List[str]:
ret = [sources_list_path]
for root, _, filenames in os.walk(sources_list_d_path):
for f in filenames:
if f.endswith(".list"):
ret.append(os.path.join(root, f))
return ret

def __init__(self,
sources_list_path: str = "/etc/apt/sources.list",
sources_list_d_path: str = "/etc/apt/sources.list.d/",
) -> None:
self._name = "Check repo.list leftovers from previous conversion"
self.description = "These repo.list files are found with backups, please check/restore them, and delete the backups: {}"
SandakovMM marked this conversation as resolved.
Show resolved Hide resolved
self.sources_list_path = sources_list_path
self.sources_list_d_path = sources_list_d_path

@property
def name(self) -> str:
return self._name.format(self)

@name.setter
def name(self, val: str) -> None:
self._name = val

def _do_check(self) -> bool:
log.debug(f"Checking leftover repo.list files")
archived_files = [f for f in
CheckAptReposBackups.get_all_repo_list_files(self.sources_list_path, self.sources_list_d_path)
if files.backup_exists(f)]
if archived_files:
self.description = self.description.format(",".join(archived_files))
return False
return True


class ReplaceAptReposRegexp(action.ActiveAction):
from_regexp: str
to_regexp: str
sources_list_path: str
sources_list_d_path: str
_name: str

def __init__(
self,
from_regexp: str,
to_regexp: str,
sources_list_path: str = "/etc/apt/sources.list",
sources_list_d_path: str = "/etc/apt/sources.list.d/",
name: str = "set up APT repositories to change from {self.from_regexp!r} to {self.to_regexp!r}",
) -> None:
self.from_regexp = from_regexp
self.to_regexp = to_regexp
self.sources_list_path = sources_list_path
self.sources_list_d_path = sources_list_d_path

self._name = name

@property
def name(self) -> str:
return self._name.format(self=self)

@name.setter
def name(self, val: str) -> None:
self._name = val

def _apply_replace_to_file(self, fpath: str, ptrn: re.Pattern, to_regexp: str) -> None:
changed = False
new_lines = []
with open(fpath) as f:
for line in f:
new_lines.append(ptrn.sub(to_regexp, line))
if new_lines[-1] != line:
changed = True
if not changed:
return
files.backup_file(fpath)
with open(fpath, 'w') as f:
f.writelines(new_lines)

def _get_all_repo_list_files(self) -> typing.List[str]:
return CheckAptReposBackups.get_all_repo_list_files(self.sources_list_path,
self.sources_list_d_path)
ukablan-wpc marked this conversation as resolved.
Show resolved Hide resolved

def _rm_backups(self) -> None:
for f in self._get_all_repo_list_files():
files.remove_backup(f, False, log.debug)

def _change_by_regexp(self, from_regexp: str, to_regexp: str) -> None:
p = re.compile(from_regexp)
for f in self._get_all_repo_list_files():
self._apply_replace_to_file(f, p, to_regexp)

def _revert_all(self) -> None:
for f in self._get_all_repo_list_files():
files.restore_file_from_backup(f)

def _prepare_action(self) -> action.ActionResult:
self._change_by_regexp(self.from_regexp, self.to_regexp)
packages.update_package_list()
return action.ActionResult()

def _post_action(self) -> action.ActionResult:
self._rm_backups()
return action.ActionResult()

def _revert_action(self) -> action.ActionResult:
self._revert_all()
packages.update_package_list()
return action.ActionResult()

def estimate_prepare_time(self) -> int:
return 22

def estimate_revert_time(self) -> int:
return 22


ReplaceAptReposRegexpDebian = ReplaceAptReposRegexp
ReplaceAptReposRegexpUbuntu = ReplaceAptReposRegexp


class SetupAptRepositories(action.ActiveAction):
from_codename: str
to_codename: str
Expand Down
2 changes: 1 addition & 1 deletion pleskdistup/actions/emails.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def _prepare_action(self) -> action.ActionResult:
return action.ActionResult()

def _post_action(self) -> action.ActionResult:
path_to_backup = os.path.join(self.temp_directory, "dovecot.conf.bak")
path_to_backup = os.path.join(self.temp_directory, "dovecot.conf" + files.DEFAULT_BACKUP_EXTENSION)
if os.path.exists(self.dovecot_config_path):
shutil.copy(self.dovecot_config_path, path_to_backup)
motd.add_finish_ssh_login_message(f"The dovecot configuration '{self.dovecot_config_path}' has been restored from original distro. Modern configuration was placed in '{path_to_backup}'.\n")
Expand Down
31 changes: 21 additions & 10 deletions pleskdistup/common/src/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

PathType = typing.Union[os.PathLike, str]

DEFAULT_BACKUP_EXTENSION = ".conversion.bak"

def replace_string(filename: str, original_substring: str, new_substring: str) -> None:
with open(filename, "r") as original, open(filename + ".next", "w") as dst:
Expand Down Expand Up @@ -59,25 +60,35 @@ def get_last_lines(filename: PathType, n: int) -> typing.List[str]:
return f.readlines()[-n:]


def backup_file(filename: str) -> None:
def backup_file(filename: str, ext: str = DEFAULT_BACKUP_EXTENSION) -> None:
if os.path.exists(filename):
ukablan-wpc marked this conversation as resolved.
Show resolved Hide resolved
shutil.copy(filename, filename + ".bak")
shutil.copy(filename, filename + ext)


def backup_exists(filename: str) -> bool:
return os.path.exists(filename + ".bak")
def backup_exists(filename: str, ext: str = DEFAULT_BACKUP_EXTENSION) -> bool:
return os.path.exists(filename + ext)


def restore_file_from_backup(filename: str, remove_if_no_backup: bool = False) -> None:
if os.path.exists(filename + ".bak"):
shutil.move(filename + ".bak", filename)
def restore_file_from_backup(filename: str, remove_if_no_backup: bool = False,
ext: str = DEFAULT_BACKUP_EXTENSION) -> None:
if os.path.exists(filename + ext):
shutil.move(filename + ext, filename)
elif remove_if_no_backup and os.path.exists(filename):
os.remove(filename)


def remove_backup(filename: str) -> None:
if os.path.exists(filename + ".bak"):
os.remove(filename + ".bak")
def remove_backup(filename: str,
raise_exception: bool = True,
ukablan-wpc marked this conversation as resolved.
Show resolved Hide resolved
logf : typing.Optional[typing.Callable] = None,
ukablan-wpc marked this conversation as resolved.
Show resolved Hide resolved
ext: str = DEFAULT_BACKUP_EXTENSION) -> None:
try:
if os.path.exists(filename + ext):
os.remove(filename + ext)
except Exception as ex:
if logf is not None:
logf(f"failed to remove backup ({filename}): {ex}")
if raise_exception:
raise


def __get_files_recursive(path: str) -> typing.Iterator[str]:
Expand Down
8 changes: 4 additions & 4 deletions pleskdistup/common/src/motd.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ def restore_ssh_login_message(motd_path: str = MOTD_PATH) -> None:

def add_inprogress_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -> None:
try:
if not os.path.exists(motd_path + ".bak"):
if not files.backup_exists(motd_path):
if os.path.exists(motd_path):
files.backup_file(motd_path)
else:
with open(motd_path + ".bak", "a") as motd:
with open(motd_path + files.DEFAULT_BACKUP_EXTENSION, "a") as motd:
pass

with open(motd_path, "a") as motd:
Expand All @@ -39,8 +39,8 @@ def add_inprogress_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -
def add_finish_ssh_login_message(message: str, motd_path: str = MOTD_PATH) -> None:
try:
if not os.path.exists(motd_path + ".next"):
if os.path.exists(motd_path + ".bak"):
shutil.copy(motd_path + ".bak", motd_path + ".next")
if os.path.exists(motd_path + files.DEFAULT_BACKUP_EXTENSION):
shutil.copy(motd_path + files.DEFAULT_BACKUP_EXTENSION, motd_path + ".next")

with open(motd_path + ".next", "a") as motd:
motd.write(FINISH_INTRODUCE_MESSAGE)
Expand Down
9 changes: 5 additions & 4 deletions pleskdistup/common/tests/motdtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import tempfile

import src.motd as motd
import src.files as files


class InprogressSshLoginMessageTests(unittest.TestCase):
def setUp(self):
self.motd_path = tempfile.mktemp()

def tearDown(self):
for path in [self.motd_path, self.motd_path + ".bak"]:
for path in [self.motd_path, self.motd_path + files.DEFAULT_BACKUP_EXTENSION]:
if os.path.exists(path):
os.remove(path)

Expand All @@ -37,7 +38,7 @@ def test_old_backed_up(self):
with open(self.motd_path) as motd_file:
self.assertEqual(motd_file.read(), "old\nnew\n")

with open(self.motd_path + ".bak") as motd_file:
with open(self.motd_path + files.DEFAULT_BACKUP_EXTENSION) as motd_file:
self.assertEqual(motd_file.read(), "old\n")

def test_restore(self):
Expand All @@ -57,7 +58,7 @@ def setUp(self):
self.motd_path = tempfile.mktemp()

def tearDown(self):
for path in [self.motd_path, self.motd_path + ".bak", self.motd_path + ".next"]:
for path in [self.motd_path, self.motd_path + files.DEFAULT_BACKUP_EXTENSION, self.motd_path + ".next"]:
if os.path.exists(path):
os.remove(path)

Expand Down Expand Up @@ -106,7 +107,7 @@ def test_backed_up_message_saved(self):
===============================================================================
""".format(motd.MOTD_PATH)

with open(self.motd_path + ".bak", "w") as motd_file:
with open(self.motd_path + files.DEFAULT_BACKUP_EXTENSION, "w") as motd_file:
motd_file.write("old\n")

motd.add_inprogress_ssh_login_message("new\n", self.motd_path)
Expand Down
Loading