feat: add incremental backups and logic to the backend #46

Merged · 3 commits · Sep 13, 2023
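This change adds an incremental_schedule cron field next to the existing full-backup schedule and threads it through the API, the Celery scheduler, and the ClickHouse backup helpers. At the ClickHouse level, an incremental backup is an ordinary BACKUP query whose base_backup setting points at the S3 location of an earlier backup, so only parts created since that backup are uploaded. A minimal sketch of the two-step flow the backend automates below, with illustrative host, bucket, paths, and credentials:

from clickhouse_driver import Client

client = Client(host="localhost")

# Step 1: full (base) backup to a timestamped S3 path.
client.execute(
    "BACKUP TABLE analytics.events "
    "TO S3('https://my-bucket.s3.amazonaws.com/backups/2023-09-13', "
    "'AKIAIOSFODNN7EXAMPLE', 'secret') ASYNC"
)

# Step 2: incremental backup; base_backup points at step 1, so only
# parts created since then are uploaded to the new path.
client.execute(
    "BACKUP TABLE analytics.events "
    "TO S3('https://my-bucket.s3.amazonaws.com/backups/2023-09-14', "
    "'AKIAIOSFODNN7EXAMPLE', 'secret') ASYNC",
    settings={
        "base_backup": "S3('https://my-bucket.s3.amazonaws.com/backups/2023-09-13', "
        "'AKIAIOSFODNN7EXAMPLE', 'secret')"
    },
)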
18 changes: 16 additions & 2 deletions frontend/src/pages/Backups/ScheduledBackups.tsx
@@ -12,6 +12,7 @@ interface ScheduleRow {
last_run_time: string
cluster: string
schedule: string
incremental_schedule: string
table: string
database: string
bucket: string
@@ -26,6 +27,7 @@ interface Backups {
type FieldType = {
cluster?: string
schedule?: string
incremental_schedule?: string
database?: string
table?: string
bucket?: string
@@ -130,6 +132,7 @@ export default function ScheduledBackups() {
},
{ title: 'Cluster', dataIndex: 'cluster' },
{ title: 'Schedule', dataIndex: 'schedule' },
{ title: 'Incremental Schedule', dataIndex: 'incremental_schedule' },
{ title: 'Last Run Time', dataIndex: 'last_run_time' },
{ title: 'Database', dataIndex: 'database' },
{ title: 'Table', dataIndex: 'table' },
@@ -207,6 +210,17 @@ export default function ScheduledBackups() {
<Input />
</Form.Item>

<Form.Item<FieldType>
label="Incremental Schedule"
name="incremental_schedule"
initialValue="0 0 * * *"
rules={[
{ required: true, message: 'Please provide a cron schedule for the incremental backup' },
]}
>
<Input />
</Form.Item>

<Form.Item<FieldType>
label="Database"
name="database"
@@ -247,7 +261,7 @@ export default function ScheduledBackups() {
label="AWS Access Key ID"
name="aws_access_key_id"
initialValue="AKIAIOSFODNN7EXAMPLE"
rules={[{ required: false, message: 'AWS Access Key ID to use for access to the S3 bucket' }]}
rules={[{ required: true, message: 'AWS Access Key ID to use for access to the S3 bucket' }]}
>
<Input />
</Form.Item>
@@ -256,7 +270,7 @@
label="AWS Secret Access Key"
name="aws_secret_access_key"
initialValue="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
rules={[{ required: false, message: 'AWS Secret Access Key used to access S3 bucket' }]}
rules={[{ required: true, message: 'AWS Secret Access Key used to access S3 bucket' }]}
>
<Input />
</Form.Item>
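The default "0 0 * * *" above is a standard five-field cron expression (minute, hour, day-of-month, month, day-of-week), i.e. daily at midnight. A quick sketch of how the backend's croniter dependency resolves it, with illustrative dates:

from datetime import datetime
from croniter import croniter

it = croniter("0 0 * * *", datetime(2023, 9, 13, 10, 30))
print(it.get_next(datetime))  # 2023-09-14 00:00:00
print(it.get_next(datetime))  # 2023-09-15 00:00:00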
2 changes: 2 additions & 0 deletions housewatch/api/backups.py
@@ -46,6 +46,8 @@ class Meta:
def validate(self, data):
if data.get("schedule") and not croniter.is_valid(data["schedule"]):
raise serializers.ValidationError(f"Invalid cron expression: {data['schedule']}")
if data.get("incremental_schedule") and not croniter.is_valid(data["incremental_schedule"]):
raise serializers.ValidationError(f"Invalid cron expression: {data['incremental_schedule']}")
return data


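croniter.is_valid is a plain boolean check, so malformed expressions are rejected at the API boundary before a backup row is ever saved. For example:

from croniter import croniter

croniter.is_valid("0 0 * * *")  # True  -> passes validation
croniter.is_valid("every day")  # False -> serializer raises ValidationError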
22 changes: 19 additions & 3 deletions housewatch/celery.py
@@ -32,10 +32,12 @@ def setup_periodic_tasks(sender: Celery, **kwargs):


@app.task(track_started=True, ignore_result=False, max_retries=0)
def run_backup(backup_id: str):
def run_backup(backup_id: str, incremental: bool = False):
from housewatch.clickhouse import backups

backups.run_backup(backup_id)
logger.info("Running backup", backup_id=backup_id, incremental=incremental)

backups.run_backup(backup_id, incremental=incremental)


@app.task(track_started=True, ignore_result=False, max_retries=0)
@@ -52,11 +54,25 @@ def schedule_backups():
nr = croniter(backup.schedule, lrt).get_next(datetime)
if nr.tzinfo is None:
nr = timezone.make_aware(nr)
logger.info("Checking backup", backup_id=backup.id, next_run=nr, now=now)

nir = None
if backup.incremental_schedule is not None:
lirt = backup.last_incremental_run_time
if lirt is None:
lirt = backup.created_at
nir = croniter(backup.incremental_schedule, lirt).get_next(datetime)
if nir.tzinfo is None:
nir = timezone.make_aware(nir)

logger.info("Checking backup", backup_id=backup.id, next_run=nr, next_incremental_run=nir, now=now)
if nr < now:
run_backup.delay(backup.id)
backup.last_run_time = now
backup.save()
elif backup.incremental_schedule is not None and nir < now:
run_backup.delay(backup.id, incremental=True)
backup.last_incremental_run_time = now
backup.save()


@app.task(track_started=True, ignore_result=False, max_retries=0)
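The due-check above computes the next cron fire time strictly after the last run (falling back to created_at for a backup that has never run) and triggers once the current time has passed it; the elif gives a due full backup precedence over a due incremental in the same scheduler tick. A standalone sketch of that decision, with illustrative schedules and timestamps:

from datetime import datetime, timezone
from croniter import croniter

def next_fire(schedule: str, after: datetime) -> datetime:
    # Next cron fire time strictly after `after`.
    return croniter(schedule, after).get_next(datetime)

now = datetime(2023, 9, 13, 0, 5, tzinfo=timezone.utc)
last_full = datetime(2023, 9, 3, 0, 0, tzinfo=timezone.utc)   # weekly full
last_incr = datetime(2023, 9, 12, 0, 0, tzinfo=timezone.utc)  # daily incremental

if next_fire("0 0 * * 0", last_full) < now:    # Sundays at midnight
    print("run full backup")
elif next_fire("0 0 * * *", last_incr) < now:  # every midnight
    print("run incremental backup")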
113 changes: 98 additions & 15 deletions housewatch/clickhouse/backups.py
@@ -1,15 +1,65 @@
import structlog
from collections import defaultdict
from datetime import datetime
from housewatch.clickhouse.client import run_query, run_query_on_shards
from typing import Dict, Optional
from uuid import uuid4
from housewatch.clickhouse.client import run_query
from housewatch.models.backup import ScheduledBackup, ScheduledBackupRun
from housewatch.clickhouse.clusters import get_node_per_shard

from django.conf import settings
from django.utils import timezone

from clickhouse_driver import Client

logger = structlog.get_logger(__name__)


def execute_backup_on_shards(
query: str,
params: Dict[str, str | int] = {},
query_settings: Dict[str, str | int] = {},
query_id: Optional[str] = None,
substitute_params: bool = True,
cluster: Optional[str] = None,
aws_key: Optional[str] = None,
aws_secret: Optional[str] = None,
base_backup: Optional[str] = None,
):
"""
Execute a backup query on one node per shard in a cluster.
Very similar to run_query_on_shards, but with backup-specific handling:
the base_backup setting is suffixed with /{shard} so each shard builds
on its own backup chain.
"""
nodes = get_node_per_shard(cluster)
responses = []
for shard, node in nodes:
params["shard"] = shard
if base_backup:
query_settings["base_backup"] = f"S3('{base_backup}/{shard}', '{aws_key}', '{aws_secret}')"
final_query = query % (params or {}) if substitute_params else query
client = Client(
host=node["host_address"],
database=settings.CLICKHOUSE_DATABASE,
user=settings.CLICKHOUSE_USER,
secure=settings.CLICKHOUSE_SECURE,
ca_certs=settings.CLICKHOUSE_CA,
verify=settings.CLICKHOUSE_VERIFY,
settings={"max_result_rows": "2000"},
send_receive_timeout=30,
password=settings.CLICKHOUSE_PASSWORD,
)
result = client.execute(final_query, settings=query_settings, with_column_types=True, query_id=query_id)
response = []
for res in result[0]:
item = {}
for index, key in enumerate(result[1]):
item[key[0]] = res[index]
response.append(item)
responses.append((shard, response))
return responses
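A hedged usage sketch of the helper above, mirroring how create_table_backup calls it: the %(shard)s placeholder in the destination path and the /{shard} suffix on base_backup keep each shard's backup chain separate. Cluster, bucket, and path are illustrative; the AWS keys reuse the example placeholders from the frontend form:

execute_backup_on_shards(
    """BACKUP TABLE %(database)s.%(table)s
    TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s/%(shard)s', '%(aws_key)s', '%(aws_secret)s')
    ASYNC""",
    {
        "database": "analytics",
        "table": "events",
        "bucket": "my-backups",
        "path": "backups/2023-09-14",
        "aws_key": "AKIAIOSFODNN7EXAMPLE",
        "aws_secret": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    },
    cluster="my_cluster",
    aws_key="AKIAIOSFODNN7EXAMPLE",
    aws_secret="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    # Previous backup location without the shard suffix; the helper appends
    # /{shard} when it builds each node's base_backup setting.
    base_backup="https://my-backups.s3.amazonaws.com/backups/2023-09-13",
)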


def get_backups(cluster=None):
if cluster:
QUERY = """SELECT id, name, status, error, start_time, end_time, num_files, formatReadableSize(total_size) total_size, num_entries, uncompressed_size, compressed_size, files_read, bytes_read FROM clusterAllReplicas(%(cluster)s, system.backups) ORDER BY start_time DESC"""
@@ -28,15 +78,16 @@ def get_backup(backup, cluster=None):
return run_query(QUERY, {"uuid": backup}, use_cache=False)


def create_table_backup(database, table, bucket, path, cluster=None, aws_key=None, aws_secret=None):
def create_table_backup(database, table, bucket, path, cluster=None, aws_key=None, aws_secret=None, base_backup=None):
if aws_key is None or aws_secret is None:
aws_key = settings.AWS_ACCESS_KEY_ID
aws_secret = settings.AWS_SECRET_ACCESS_KEY
query_settings = {}
if cluster:
QUERY = """BACKUP TABLE %(database)s.%(table)s
TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s/%(shard)s', '%(aws_key)s', '%(aws_secret)s')
ASYNC"""
return run_query_on_shards(
return execute_backup_on_shards(
QUERY,
{
"database": database,
@@ -46,11 +97,17 @@ def create_table_backup(database, table, bucket, path, cluster=None, aws_key=Non
"aws_key": aws_key,
"aws_secret": aws_secret,
},
query_settings=query_settings,
cluster=cluster,
aws_key=aws_key,
aws_secret=aws_secret,
base_backup=base_backup,
)
QUERY = """BACKUP TABLE %(database)s.%(table)s
TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s', '%(aws_key)s', '%(aws_secret)s')
ASYNC"""
if base_backup:
query_settings["base_backup"] = f"S3('{base_backup}', '{aws_key}', '{aws_secret}')"
return run_query(
QUERY,
{
@@ -61,19 +118,22 @@ def create_table_backup(database, table, bucket, path, cluster=None, aws_key=Non
"aws_key": aws_key,
"aws_secret": aws_secret,
},
query_settings=query_settings,
use_cache=False,
)


def create_database_backup(database, bucket, path, cluster=None, aws_key=None, aws_secret=None):
def create_database_backup(database, bucket, path, cluster=None, aws_key=None, aws_secret=None, base_backup=None):
if aws_key is None or aws_secret is None:
aws_key = settings.AWS_ACCESS_KEY_ID
aws_secret = settings.AWS_SECRET_ACCESS_KEY
query_settings = {}
if cluster:
QUERY = """BACKUP DATABASE %(database)s
TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s/%(shard)s', '%(aws_key)s', '%(aws_secret)s')
ASYNC"""
return run_query_on_shards(

return execute_backup_on_shards(
QUERY,
{
"database": database,
@@ -82,11 +142,17 @@ def create_database_backup(database, bucket, path, cluster=None, aws_key=None, a
"aws_key": aws_key,
"aws_secret": aws_secret,
},
query_settings=query_settings,
cluster=cluster,
aws_key=aws_key,
aws_secret=aws_secret,
base_backup=base_backup,
)
QUERY = """BACKUP DATABASE %(database)s
QUERY = """BACKUP DATABASE %(database)s
TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s', '%(aws_key)s', '%(aws_secret)s')
ASYNC"""
if base_backup:
query_settings["base_backup"] = f"S3('{base_backup}', '{aws_key}', '{aws_secret}')"
return run_query(
QUERY,
{
@@ -96,39 +162,56 @@ def create_database_backup(database, bucket, path, cluster=None, aws_key=None, a
"aws_key": aws_key,
"aws_secret": aws_secret,
},
query_settings=query_settings,
use_cache=False,
)


def run_backup(backup_id):
def run_backup(backup_id, incremental=False):
backup = ScheduledBackup.objects.get(id=backup_id)
now = timezone.now()
path = backup.path + "/" + now.isoformat()
base_backup = None
S3_LOCATION = f"https://{backup.bucket}.s3.amazonaws.com/{path}"
if incremental:
if not backup.last_run:
logger.info("Cannot run incremental backup without a base backup")
base_backup = backup.last_base_backup
if backup.is_database_backup():
uuid = create_database_backup(
create_database_backup(
backup.database,
backup.bucket,
path,
backup.cluster,
backup.aws_access_key_id,
backup.aws_secret_access_key,
)[0]["id"]
base_backup=base_backup,
)
elif backup.is_table_backup():
uuid = create_table_backup(
create_table_backup(
backup.database,
backup.table,
backup.bucket,
path,
backup.cluster,
backup.aws_access_key_id,
backup.aws_secret_access_key,
)[0]["id"]
br = ScheduledBackupRun.objects.create(scheduled_backup=backup, id=uuid, started_at=now)
base_backup=base_backup,
)
uuid = str(uuid4())
br = ScheduledBackupRun.objects.create(
scheduled_backup=backup, id=uuid, started_at=now, is_incremental=incremental, base_backup=base_backup
)
br.save()
backup.last_run = br
backup.last_run_time = now
if incremental:
backup.last_incremental_run = br
backup.last_incremental_run_time = now
else:
backup.last_run = br
backup.last_run_time = now
backup.last_base_backup = S3_LOCATION
backup.save()
return uuid
return


def restore_backup(backup):
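Taken together, run_backup now works as follows: a full run uploads everything and records its S3 location in last_base_backup; an incremental run passes that location through to the BACKUP query's base_backup setting, so incrementals always build on the most recent full backup. A sketch of the sequence with an illustrative backup ID (the scheduler invokes the same function via run_backup.delay):

# Scheduled full run: uploads everything, then stores this backup's
# S3 location in backup.last_base_backup.
run_backup("0f2c9f3a-illustrative-id")

# Later incremental run: reuses backup.last_base_backup as base_backup,
# so only parts created since the full backup are uploaded.
run_backup("0f2c9f3a-illustrative-id", incremental=True)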
38 changes: 38 additions & 0 deletions housewatch/migrations (new migration file)
@@ -0,0 +1,38 @@
# Generated by Django 4.1.1 on 2023-09-12 02:19

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('housewatch', '0009_scheduledbackup_cluster_alter_scheduledbackup_id'),
]

operations = [
migrations.AddField(
model_name='scheduledbackup',
name='incremental_schedule',
field=models.CharField(max_length=255, null=True),
),
migrations.AddField(
model_name='scheduledbackup',
name='last_base_backup',
field=models.CharField(max_length=255, null=True),
),
migrations.AddField(
model_name='scheduledbackup',
name='last_incremental_run_time',
field=models.DateTimeField(null=True),
),
migrations.AddField(
model_name='scheduledbackuprun',
name='base_backup',
field=models.CharField(max_length=255, null=True),
),
migrations.AddField(
model_name='scheduledbackuprun',
name='is_incremental',
field=models.BooleanField(default=False),
),
]
7 changes: 7 additions & 0 deletions housewatch/models/backup.py
@@ -10,10 +10,13 @@ class ScheduledBackup(models.Model):
created_at: models.DateTimeField = models.DateTimeField(auto_now_add=True)
enabled: models.BooleanField = models.BooleanField(default=False)
last_run_time: models.DateTimeField = models.DateTimeField(null=True)
last_incremental_run_time: models.DateTimeField = models.DateTimeField(null=True)
last_base_backup: models.CharField = models.CharField(max_length=255, null=True)
last_run: models.ForeignKey = models.ForeignKey("ScheduledBackupRun", on_delete=models.SET_NULL, null=True)

# This will be a CRON expression for the job
schedule: models.CharField = models.CharField(max_length=255)
incremental_schedule: models.CharField = models.CharField(max_length=255, null=True)
table: models.CharField = models.CharField(max_length=255, null=True)
database: models.CharField = models.CharField(max_length=255)
cluster: models.CharField = models.CharField(max_length=255, null=True)
@@ -51,12 +54,16 @@ def is_table_backup(self):
def save(self, *args, **kwargs):
if not croniter.is_valid(self.schedule):
raise ValueError("Invalid CRON expression")
if self.incremental_schedule and not croniter.is_valid(self.incremental_schedule):
raise ValueError("Invalid CRON expression")
super().save(*args, **kwargs)


class ScheduledBackupRun(models.Model):
id: models.UUIDField = models.UUIDField(primary_key=True)
created_at: models.DateTimeField = models.DateTimeField(auto_now_add=True)
base_backup: models.CharField = models.CharField(max_length=255, null=True)
is_incremental: models.BooleanField = models.BooleanField(default=False)
scheduled_backup: models.ForeignKey = models.ForeignKey(ScheduledBackup, on_delete=models.CASCADE)
started_at: models.DateTimeField = models.DateTimeField(auto_now_add=True)
finished_at: models.DateTimeField = models.DateTimeField(null=True)
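A hedged usage sketch of the extended model: a weekly full backup with daily incrementals, where save() now validates both cron fields. Field values are illustrative, and any required fields not shown in this diff (such as the primary key) are elided:

from housewatch.models.backup import ScheduledBackup

backup = ScheduledBackup(
    database="analytics",
    table="events",
    cluster="my_cluster",
    bucket="my-backups",
    path="backups",
    schedule="0 0 * * 0",              # full backup: Sundays at midnight
    incremental_schedule="0 0 * * *",  # incremental backup: every midnight
    enabled=True,
)
backup.save()  # raises ValueError if either cron expression is invalid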