Merge pull request #577 from aiven/kmichel-preserve-backup
Allow the restore operation to mark a backup as preserved
alanfranz authored Jan 9, 2023
2 parents 2588f4b + d219971 commit 33b6e59
Showing 9 changed files with 506 additions and 123 deletions.
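
In brief, the new machinery works like this: a restore invoked with --preserve-until uploads a small preservation-request marker next to the backups, the cleanup pass in pghoard.py reads those markers and refuses to delete a preserved backup (or any newer one), and a successful restore removes the marker again unless --no-cancel-preserve-on-success was given. A minimal lifecycle sketch using the new ObjectStore helpers from this commit; the local rohmu backend and all paths are hypothetical, chosen only to keep the sketch self-contained:

import datetime

from rohmu import get_transfer

from pghoard.object_store import ObjectStore  # added by this commit

# Hypothetical local storage backend and paths.
storage = get_transfer({"storage_type": "local", "directory": "/tmp/backup-store"})
store = ObjectStore(storage=storage, prefix="example-site", site="example-site", pgdata="/tmp/pgdata")

deadline = datetime.datetime(2023, 2, 1, tzinfo=datetime.timezone.utc)
# Returns the request name on success, None on storage errors (logged, not raised).
request = store.try_request_backup_preservation("example-site/basebackup/bb1", deadline)

# ... the actual restore would run here ...

if request is not None:  # mirrors the --cancel-preserve-on-success default
    store.try_cancel_backup_preservation(request)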
110 changes: 110 additions & 0 deletions pghoard/object_store.py
@@ -0,0 +1,110 @@
"""
Copyright (c) 2022 Aiven Ltd
See LICENSE for details
"""
import datetime
import logging
import os
from pathlib import Path
from typing import Optional

from requests import Session
from rohmu import dates


class ObjectStore:
def __init__(self, storage, prefix, site, pgdata):
self.storage = storage
self.prefix = prefix
self.site = site
self.pgdata = pgdata
self.log = logging.getLogger(self.__class__.__name__)

def list_basebackups(self):
return self.storage.list_path(os.path.join(self.prefix, "basebackup"))

def try_request_backup_preservation(self, basebackup: str, preserve_until: datetime.datetime) -> Optional[str]:
try:
return self.request_backup_preservation(basebackup, preserve_until)
except Exception: # pylint: disable=broad-except
# rohmu does not wrap storage implementation errors in high-level errors:
# we can't catch something more specific like "permission denied".
self.log.exception("Could not request backup preservation")
return None

def try_cancel_backup_preservation(self, request_name: str) -> None:
try:
self.cancel_backup_preservation(request_name)
except Exception: # pylint: disable=broad-except
# rohmu does not wrap storage implementation errors in high-level errors:
# we can't catch something more specific like "permission denied".
self.log.exception("Could not cancel backup preservation")

def request_backup_preservation(self, basebackup: str, preserve_until: datetime.datetime) -> str:
backup_name = Path(basebackup).name
request_name = f"{backup_name}_{preserve_until}"
request_path = os.path.join(self.prefix, "preservation_request", request_name)
self.storage.store_file_from_memory(
request_path, b"", {
"preserve-backup": backup_name,
"preserve-until": str(preserve_until)
}
)
return request_name

def cancel_backup_preservation(self, request_name: str) -> None:
request_path = os.path.join(self.prefix, "preservation_request", request_name)
self.storage.delete_key(request_path)

def show_basebackup_list(self, verbose=True):
result = self.list_basebackups()
caption = "Available %r basebackups:" % self.site
print_basebackup_list(result, caption=caption, verbose=verbose)

def get_basebackup_metadata(self, basebackup):
return self.storage.get_metadata_for_key(basebackup)

def get_basebackup_file_to_fileobj(self, basebackup, fileobj, *, progress_callback=None):
return self.storage.get_contents_to_fileobj(basebackup, fileobj, progress_callback=progress_callback)

def get_file_bytes(self, name):
return self.storage.get_contents_to_string(name)[0]


class HTTPRestore(ObjectStore):
def __init__(self, host, port, site, pgdata=None):
super().__init__(storage=None, prefix=None, site=site, pgdata=pgdata)
self.host = host
self.port = port
self.session = Session()

def _url(self, path):
return "http://{host}:{port}/{site}/{path}".format(host=self.host, port=self.port, site=self.site, path=path)

def list_basebackups(self):
response = self.session.get(self._url("basebackup"))
return response.json()["basebackups"]


def print_basebackup_list(basebackups, *, caption="Available basebackups", verbose=True):
print(caption, "\n")
fmt = "{name:40} {size:>11} {orig_size:>11} {time:20}".format
print(fmt(name="Basebackup", size="Backup size", time="Start time", orig_size="Orig size"))
print(fmt(name="-" * 40, size="-" * 11, time="-" * 20, orig_size="-" * 11))
for b in sorted(basebackups, key=lambda b: b["name"]):
meta = b["metadata"].copy()
lm = meta.pop("start-time")
if isinstance(lm, str):
lm = dates.parse_timestamp(lm)
if lm.tzinfo:
lm = lm.astimezone(datetime.timezone.utc).replace(tzinfo=None)
lm_str = lm.isoformat()[:19] + "Z"  # pylint: disable=no-member
size_str = "{} MB".format(int(meta.get("total-size-enc", b["size"])) // (1024 ** 2))
orig_size = int(meta.get("total-size-plain", meta.get("original-file-size")) or 0)
if orig_size:
orig_size_str = "{} MB".format(orig_size // (1024 ** 2))
else:
orig_size_str = "n/a"
print(fmt(name=b["name"], size=size_str, time=lm_str, orig_size=orig_size_str))
if verbose:
print(" metadata:", meta)
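
As request_backup_preservation shows, a preservation request is nothing more than an empty marker object whose key and metadata encode the backup name and the deadline. A small sketch of the resulting key and metadata, with hypothetical values:

import datetime
from pathlib import Path

# Hypothetical inputs.
basebackup = "example-site/basebackup/2023-01-05_11-40_0"
preserve_until = datetime.datetime(2023, 2, 1, tzinfo=datetime.timezone.utc)

backup_name = Path(basebackup).name  # "2023-01-05_11-40_0"
request_name = f"{backup_name}_{preserve_until}"  # the deadline is part of the name
request_path = f"example-site/preservation_request/{request_name}"
metadata = {"preserve-backup": backup_name, "preserve-until": str(preserve_until)}

print(request_path)  # example-site/preservation_request/2023-01-05_11-40_0_2023-02-01 00:00:00+00:00
print(metadata)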
17 changes: 17 additions & 0 deletions pghoard/pghoard.py
@@ -41,6 +41,9 @@
from pghoard.compressor import (
CompressionEvent, CompressionQueue, CompressorThread, WALFileDeleterThread, WalFileDeletionQueue
)
from pghoard.preservation_request import (
is_basebackup_preserved, parse_preservation_requests, patch_basebackup_metadata_with_preservation
)
from pghoard.receivexlog import PGReceiveXLog
from pghoard.transfer import TransferAgent, TransferQueue, UploadEvent
from pghoard.walreceiver import WALReceiver
@@ -444,6 +447,11 @@ def get_remote_basebackups_info(self, site):
for entry in results:
self.patch_basebackup_info(entry=entry, site_config=site_config)

preservation_requests = storage.list_path(os.path.join(site_config["prefix"], "preservation_request"))
backups_to_preserve = parse_preservation_requests(preservation_requests)
for entry in results:
patch_basebackup_metadata_with_preservation(entry, backups_to_preserve)

results.sort(key=lambda entry: entry["metadata"]["start-time"])
return results

@@ -472,8 +480,12 @@ def determine_backups_to_delete(self, *, basebackups, site_config):
if allowed_basebackup_count is None:
allowed_basebackup_count = len(basebackups)

now = dates.now()
basebackups_to_delete = []
while len(basebackups) > allowed_basebackup_count:
if is_basebackup_preserved(basebackups[0], now):
self.log.info("Not deleting more backups because %r still needs to be preserved", basebackups[0]["name"])
break
self.log.warning(
"Too many basebackups: %d > %d, %r, starting to get rid of %r", len(basebackups), allowed_basebackup_count,
basebackups, basebackups[0]["name"]
@@ -487,6 +499,9 @@ def determine_backups_to_delete(self, *, basebackups, site_config):
current_time = datetime.datetime.now(datetime.timezone.utc)
if max_age_days and min_backups > 0:
while basebackups and len(basebackups) > min_backups:
if is_basebackup_preserved(basebackups[0], now):
self.log.info("Not deleting more backups because %r still needs to be preserved", basebackups[0]["name"])
break
# For age checks we treat the age as current_time - (backup_start_time + backup_interval). So when
# backup interval is set to 24 hours a backup started 2.5 days ago would be considered to be 1.5 days old.
completed_at = basebackups[0]["metadata"]["start-time"] + backup_interval
@@ -525,6 +540,8 @@ def refresh_backup_list_and_delete_old(self, site):
last_wal_segment_still_needed = basebackups[0]["metadata"]["start-wal-segment"]

if last_wal_segment_still_needed:
# Deleting this WAL would break concurrent PITR starting from the *previous* backup.
# That is why, once a backup is preserved, we keep that backup and all newer ones.
self.delete_remote_wal_before(last_wal_segment_still_needed, site, pg_version)
self.delete_remote_basebackup(
site, basebackup_to_be_deleted["name"], basebackup_to_be_deleted["metadata"], basebackups=basebackups
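
Both retention paths above walk the backup list oldest-first and stop at the first preserved entry, so a single preservation request also shields every newer backup. A self-contained sketch of that stopping rule, with hypothetical backup entries (is_preserved mirrors is_basebackup_preserved):

import datetime

def is_preserved(entry, now):
    # A backup is preserved while its "preserve-until" metadata lies in the future.
    preserve_until = entry["metadata"].get("preserve-until")
    return preserve_until is not None and preserve_until > now

now = datetime.datetime.now(datetime.timezone.utc)
backups = [  # oldest first, as determine_backups_to_delete sees them
    {"name": "bb1", "metadata": {}},
    {"name": "bb2", "metadata": {"preserve-until": now + datetime.timedelta(days=7)}},
    {"name": "bb3", "metadata": {}},
]

allowed_count = 1
to_delete = []
while len(backups) > allowed_count and not is_preserved(backups[0], now):
    to_delete.append(backups.pop(0))

print([b["name"] for b in to_delete])  # ['bb1'] -- bb2 blocks bb3 as well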
32 changes: 32 additions & 0 deletions pghoard/preservation_request.py
@@ -0,0 +1,32 @@
"""
Copyright (c) 2022 Aiven Ltd
See LICENSE for details
"""
import datetime
from typing import Any, Mapping, Sequence

from rohmu import dates


def patch_basebackup_metadata_with_preservation(
basebackup_entry: Mapping[str, Any],
backups_to_preserve: Mapping[str, datetime.datetime],
) -> None:
basebackup_entry["metadata"]["preserve-until"] = backups_to_preserve.get(basebackup_entry["name"])


def is_basebackup_preserved(basebackup_entry: Mapping[str, Any], now: datetime.datetime) -> bool:
preserve_until = basebackup_entry["metadata"].get("preserve-until")
return preserve_until is not None and preserve_until > now


def parse_preservation_requests(preservation_requests: Sequence[Mapping[str, Any]]) -> Mapping[str, datetime.datetime]:
backups_to_preserve: dict[str, datetime.datetime] = {}
for preservation_request in preservation_requests:
backup_name = preservation_request["metadata"]["preserve-backup"]
preserve_until = dates.parse_timestamp(preservation_request["metadata"]["preserve-until"])
if backup_name in backups_to_preserve:
backups_to_preserve[backup_name] = max(backups_to_preserve[backup_name], preserve_until)
else:
backups_to_preserve[backup_name] = preserve_until
return backups_to_preserve
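
When several requests target the same backup, parse_preservation_requests keeps the latest deadline via max(). A quick usage sketch with two hypothetical requests for one backup:

from pghoard.preservation_request import parse_preservation_requests

requests = [
    {"metadata": {"preserve-backup": "bb1", "preserve-until": "2023-02-01T00:00:00+00:00"}},
    {"metadata": {"preserve-backup": "bb1", "preserve-until": "2023-03-01T00:00:00+00:00"}},
]

backups_to_preserve = parse_preservation_requests(requests)
print(backups_to_preserve["bb1"])  # 2023-03-01 00:00:00+00:00, the later of the two deadlines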
100 changes: 32 additions & 68 deletions pghoard/restore.py
@@ -8,7 +8,6 @@
import argparse
import base64
import contextlib
import datetime
import enum
import errno
import io
@@ -31,13 +30,13 @@
from typing import Any, Dict, List, Optional, Set, Union

from psycopg2.extensions import adapt
from requests import Session
from rohmu import dates, get_transfer, rohmufile
from rohmu.errors import (Error, InvalidConfigurationError, MaybeRecoverableError)

from pghoard.common import (
BaseBackupFormat, FileType, FileTypePrefixes, StrEnum, download_backup_meta_file, extract_pghoard_delta_metadata
)
from pghoard.object_store import (HTTPRestore, ObjectStore, print_basebackup_list)

from . import common, config, logutil, version
from .postgres_command import PGHOARD_HOST, PGHOARD_PORT
@@ -178,30 +177,6 @@ def create_recovery_conf(
return content


def print_basebackup_list(basebackups, *, caption="Available basebackups", verbose=True):
print(caption, "\n")
fmt = "{name:40} {size:>11} {orig_size:>11} {time:20}".format
print(fmt(name="Basebackup", size="Backup size", time="Start time", orig_size="Orig size"))
print(fmt(name="-" * 40, size="-" * 11, time="-" * 20, orig_size="-" * 11))
for b in sorted(basebackups, key=lambda b: b["name"]):
meta = b["metadata"].copy()
lm = meta.pop("start-time")
if isinstance(lm, str):
lm = dates.parse_timestamp(lm)
if lm.tzinfo:
lm = lm.astimezone(datetime.timezone.utc).replace(tzinfo=None)
lm_str = lm.isoformat()[:19] + "Z"  # pylint: disable=no-member
size_str = "{} MB".format(int(meta.get("total-size-enc", b["size"])) // (1024 ** 2))
orig_size = int(meta.get("total-size-plain", meta.get("original-file-size")) or 0)
if orig_size:
orig_size_str = "{} MB".format(orig_size // (1024 ** 2))
else:
orig_size_str = "n/a"
print(fmt(name=b["name"], size=size_str, time=lm_str, orig_size=orig_size_str))
if verbose:
print(" metadata:", meta)


class Restore:
log_tracebacks = False

@@ -268,6 +243,21 @@ def target_args():
help="Restore the database to a PG primary",
action="store_true"
)
cmd.add_argument(
"--preserve-until", help="Request the backup to be preserved until that date", metavar="ISO_TIMESTAMP"
)
cmd.add_argument(
"--cancel-preserve-on-success",
help="Cancel the preservation request if the backup was successfully restored",
action="store_true",
default=True,
)
cmd.add_argument(
"--no-cancel-preserve-on-success",
help="Don't cancel the preservation request if the backup was successfully restored",
dest="cancel_preserve_on_success",
action="store_false",
)

cmd = add_cmd(self.list_basebackups_http)
host_port_args()
@@ -332,6 +322,8 @@ def get_basebackup(self, arg):
overwrite=arg.overwrite,
tablespace_mapping=tablespace_mapping,
tablespace_base_dir=arg.tablespace_base_dir,
preserve_until=arg.preserve_until,
cancel_preserve_on_success=arg.cancel_preserve_on_success,
)
except RestoreError: # pylint: disable=try-except-raise
# Pass RestoreErrors through
@@ -450,7 +442,9 @@ def _get_basebackup(
restore_to_primary=None,
overwrite=False,
tablespace_mapping=None,
tablespace_base_dir=None
tablespace_base_dir=None,
preserve_until: Optional[str] = None,
cancel_preserve_on_success: bool = True,
):
targets = [recovery_target_name, recovery_target_time, recovery_target_xid]
if sum(0 if flag is None else 1 for flag in targets) > 1:
@@ -472,6 +466,12 @@
metadata = self.storage.get_basebackup_metadata(basebackup["name"])
tablespaces = {}

# If requested, mark the backup for preservation
preserve_request: Optional[str] = None
if preserve_until is not None:
preserve_until_datetime = dates.parse_timestamp(preserve_until)
preserve_request = self.storage.try_request_backup_preservation(basebackup["name"], preserve_until_datetime)

# Make sure we have a proper place to write the $PGDATA and possible tablespaces
dirs_to_create = []
dirs_to_recheck = []
@@ -612,6 +612,11 @@ def _get_basebackup(
restore_to_primary=restore_to_primary,
)

if preserve_request is not None and cancel_preserve_on_success:
# This is intentionally skipped if pghoard fails earlier with an exception:
# we only cancel the preservation if the backup was successfully restored.
self.storage.try_cancel_backup_preservation(preserve_request)

print("Basebackup restoration complete.")
print("You can start PostgreSQL by running pg_ctl -D %s start" % pgdata)
print("On systemd based systems you can run systemctl start postgresql")
@@ -1004,47 +1009,6 @@ def _fetch_and_extract_one_backup(self, metadata, file_size, fetch_fn):
self.log.info("Processing of %r completed successfully", file_name)


class ObjectStore:
def __init__(self, storage, prefix, site, pgdata):
self.storage = storage
self.prefix = prefix
self.site = site
self.pgdata = pgdata
self.log = logging.getLogger(self.__class__.__name__)

def list_basebackups(self):
return self.storage.list_path(os.path.join(self.prefix, "basebackup"))

def show_basebackup_list(self, verbose=True):
result = self.list_basebackups()
caption = "Available %r basebackups:" % self.site
print_basebackup_list(result, caption=caption, verbose=verbose)

def get_basebackup_metadata(self, basebackup):
return self.storage.get_metadata_for_key(basebackup)

def get_basebackup_file_to_fileobj(self, basebackup, fileobj, *, progress_callback=None):
return self.storage.get_contents_to_fileobj(basebackup, fileobj, progress_callback=progress_callback)

def get_file_bytes(self, name):
return self.storage.get_contents_to_string(name)[0]


class HTTPRestore(ObjectStore):
def __init__(self, host, port, site, pgdata=None):
super().__init__(storage=None, prefix=None, site=site, pgdata=pgdata)
self.host = host
self.port = port
self.session = Session()

def _url(self, path):
return "http://{host}:{port}/{site}/{path}".format(host=self.host, port=self.port, site=self.site, path=path)

def list_basebackups(self):
response = self.session.get(self._url("basebackup"))
return response.json()["basebackups"]


def main():
try:
restore = Restore()
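
The --cancel-preserve-on-success / --no-cancel-preserve-on-success pair defined in target_args is the common argparse idiom for an on-by-default boolean: both flags share one destination, so passing the first is effectively a no-op and only the second changes anything. A minimal standalone sketch of the pattern:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--preserve-until", metavar="ISO_TIMESTAMP")
# store_true with default=True means this flag only documents the default.
parser.add_argument("--cancel-preserve-on-success", action="store_true", default=True)
# The "no" variant writes False into the same destination.
parser.add_argument("--no-cancel-preserve-on-success", dest="cancel_preserve_on_success", action="store_false")

args = parser.parse_args(["--preserve-until", "2023-02-01", "--no-cancel-preserve-on-success"])
print(args.preserve_until, args.cancel_preserve_on_success)  # 2023-02-01 False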