Skip to content

Commit

Permalink
Add "data retention" implementation for CrateDB
Browse files Browse the repository at this point in the history
  • Loading branch information
hammerhead authored and amotl committed Jun 27, 2023
1 parent 871810e commit 7d296b4
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 0 deletions.
Empty file added dags/__init__.py
Empty file.
50 changes: 50 additions & 0 deletions dags/data_retention_delete_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""
Implements a retention policy by dropping expired partitions
A detailed tutorial is available at https://community.crate.io/t/cratedb-and-apache-airflow-implementation-of-data-retention-policy/913
Prerequisites
-------------
In CrateDB, tables for storing retention policies need to be created once manually.
See the file setup/data_retention_schema.sql in this repository.
"""
from pathlib import Path
import pendulum
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import dag, task


def map_policy(policy):
return {
"table_fqn": policy[0],
"column": policy[1],
"value": policy[2],
}


@task
def get_policies(ds=None):
"""Retrieve all partitions effected by a policy"""
pg_hook = PostgresHook(postgres_conn_id="cratedb_connection")
sql = Path("include/data_retention_retrieve_delete_policies.sql")
return pg_hook.get_records(
sql=sql.read_text(encoding="utf-8"),
parameters={"day": ds},
)


@dag(
start_date=pendulum.datetime(2021, 11, 19, tz="UTC"),
schedule="@daily",
catchup=False,
)
def data_retention_delete():
SQLExecuteQueryOperator.partial(
task_id="delete_partition",
conn_id="cratedb_connection",
sql="DELETE FROM {{params.table_fqn}} WHERE {{params.column}} = {{params.value}};",
).expand(params=get_policies().map(map_policy))


data_retention_delete()
60 changes: 60 additions & 0 deletions dags/data_retention_reallocate_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
Implements a retention policy by reallocating cold partitions
A detailed tutorial is available at https://community.crate.io/t/cratedb-and-apache-airflow-building-a-hot-cold-storage-data-retention-policy/934
Prerequisites
-------------
- CrateDB 5.2.0 or later
- Tables for storing retention policies need to be created once manually in
CrateDB. See the file setup/data_retention_schema.sql in this repository.
"""
from pathlib import Path
import pendulum
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import dag, task


@task
def get_policies(ds=None):
"""Retrieve all partitions effected by a policy"""
pg_hook = PostgresHook(postgres_conn_id="cratedb_connection")
sql = Path("include/data_retention_retrieve_reallocate_policies.sql")
return pg_hook.get_records(
sql=sql.read_text(encoding="utf-8"),
parameters={"day": ds},
)


def map_policy(policy):
"""Map index-based policy to readable dict structure"""
return {
"schema": policy[0],
"table": policy[1],
"table_fqn": policy[2],
"column": policy[3],
"value": policy[4],
"attribute_name": policy[5],
"attribute_value": policy[6],
}


@dag(
start_date=pendulum.datetime(2021, 11, 19, tz="UTC"),
schedule="@daily",
catchup=False,
template_searchpath=["include"],
)
def data_retention_reallocate():
SQLExecuteQueryOperator.partial(
task_id="reallocate_partitions",
conn_id="cratedb_connection",
sql="""
ALTER TABLE {{params.table_fqn}} PARTITION ({{params.column}} = {{params.value}})
SET ("routing.allocation.require.{{params.attribute_name}}" = '{{params.attribute_value}}');
""",
).expand(params=get_policies().map(map_policy))


data_retention_reallocate()
67 changes: 67 additions & 0 deletions dags/data_retention_snapshot_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
Implements a retention policy by snapshotting expired partitions to a repository
A detailed tutorial is available at https://community.crate.io/t/cratedb-and-apache-airflow-building-a-data-retention-policy-using-external-snapshot-repositories/1001
Prerequisites
-------------
In CrateDB, tables for storing retention policies need to be created once manually.
See the file setup/data_retention_schema.sql in this repository.
"""
from pathlib import Path
import pendulum
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import dag, task


@task
def get_policies(ds=None):
"""Retrieve all partitions effected by a policy"""
pg_hook = PostgresHook(postgres_conn_id="cratedb_connection")
sql = Path("include/data_retention_retrieve_snapshot_policies.sql")
return pg_hook.get_records(
sql=sql.read_text(encoding="utf-8"), parameters={"day": ds}
)


def map_policy(policy):
"""Map index-based policy to readable dict structure"""
return {
"schema": policy[0],
"table": policy[1],
"table_fqn": policy[2],
"column": policy[3],
"value": policy[4],
"target_repository_name": policy[5],
}


@dag(
start_date=pendulum.datetime(2021, 11, 19, tz="UTC"),
schedule="@daily",
catchup=False,
)
def data_retention_snapshot():
policies = get_policies().map(map_policy)

reallocate = SQLExecuteQueryOperator.partial(
task_id="snapshot_partitions",
conn_id="cratedb_connection",
sql="""
CREATE SNAPSHOT {{params.target_repository_name}}."{{params.schema}}.{{params.table}}-{{params.value}}"
TABLE {{params.table_fqn}} PARTITION ({{params.column}} = {{params.value}})
WITH ("wait_for_completion" = true);
""",
).expand(params=policies)

delete = SQLExecuteQueryOperator.partial(
task_id="delete_partitions",
conn_id="cratedb_connection",
sql="DELETE FROM {{params.table_fqn}} WHERE {{params.column}} = {{params.value}};",
).expand(params=policies)

reallocate >> delete


data_retention_snapshot()
8 changes: 8 additions & 0 deletions include/data_retention_retrieve_delete_policies.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
SELECT QUOTE_IDENT(p.table_schema) || '.' || QUOTE_IDENT(p.table_name),
QUOTE_IDENT(r.partition_column),
TRY_CAST(p.values[r.partition_column] AS BIGINT)
FROM information_schema.table_partitions p
JOIN doc.retention_policies r ON p.table_schema = r.table_schema
AND p.table_name = r.table_name
AND p.values[r.partition_column] < %(day)s::TIMESTAMP - (r.retention_period || ' days')::INTERVAL
WHERE r.strategy = 'delete';
25 changes: 25 additions & 0 deletions include/data_retention_retrieve_reallocate_policies.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
WITH partition_allocations AS (
SELECT DISTINCT s.schema_name AS table_schema,
s.table_name,
s.partition_ident,
n.attributes
FROM sys.shards s
JOIN sys.nodes n ON s.node['id'] = n.id
)
SELECT QUOTE_IDENT(p.table_schema),
QUOTE_IDENT(p.table_name),
QUOTE_IDENT(p.table_schema) || '.' || QUOTE_IDENT(p.table_name),
QUOTE_IDENT(r.partition_column),
TRY_CAST(p.values[r.partition_column] AS BIGINT),
reallocation_attribute_name,
reallocation_attribute_value
FROM information_schema.table_partitions p
JOIN doc.retention_policies r ON p.table_schema = r.table_schema
AND p.table_name = r.table_name
AND p.values[r.partition_column] < %(day)s::TIMESTAMP - (r.retention_period || ' days')::INTERVAL
JOIN partition_allocations a ON a.table_schema = p.table_schema
AND a.table_name = p.table_name
AND p.partition_ident = a.partition_ident
AND attributes[r.reallocation_attribute_name] <> r.reallocation_attribute_value
WHERE r.strategy = 'reallocate'
ORDER BY 5 ASC;
12 changes: 12 additions & 0 deletions include/data_retention_retrieve_snapshot_policies.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- intentionally not adding QUOTE_IDENT to the first two columns, as they are used in an already quoted string later on
SELECT p.table_schema,
p.table_name,
QUOTE_IDENT(p.table_schema) || '.' || QUOTE_IDENT(p.table_name),
QUOTE_IDENT(r.partition_column),
TRY_CAST(p.values[r.partition_column] AS BIGINT),
target_repository_name
FROM information_schema.table_partitions p
JOIN doc.retention_policies r ON p.table_schema = r.table_schema
AND p.table_name = r.table_name
AND p.values[r.partition_column] < %(day)s::TIMESTAMP - (r.retention_period || ' days')::INTERVAL
WHERE r.strategy = 'snapshot';
12 changes: 12 additions & 0 deletions setup/data_retention_schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS doc.retention_policies (
"table_schema" TEXT,
"table_name" TEXT,
"partition_column" TEXT NOT NULL,
"retention_period" INTEGER NOT NULL,
"reallocation_attribute_name" TEXT,
"reallocation_attribute_value" TEXT,
"target_repository_name" TEXT,
"strategy" TEXT NOT NULL,
PRIMARY KEY ("table_schema", "table_name", "strategy")
)
CLUSTERED INTO 1 SHARDS;

0 comments on commit 7d296b4

Please sign in to comment.