diff --git a/dags/__init__.py b/dags/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dags/data_retention_delete_dag.py b/dags/data_retention_delete_dag.py new file mode 100644 index 00000000..2342cecb --- /dev/null +++ b/dags/data_retention_delete_dag.py @@ -0,0 +1,50 @@ +""" +Implements a retention policy by dropping expired partitions + +A detailed tutorial is available at https://community.crate.io/t/cratedb-and-apache-airflow-implementation-of-data-retention-policy/913 + +Prerequisites +------------- +In CrateDB, tables for storing retention policies need to be created once manually. +See the file setup/data_retention_schema.sql in this repository. +""" +from pathlib import Path +import pendulum +from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator +from airflow.providers.postgres.hooks.postgres import PostgresHook +from airflow.decorators import dag, task + + +def map_policy(policy): + return { + "table_fqn": policy[0], + "column": policy[1], + "value": policy[2], + } + + +@task +def get_policies(ds=None): + """Retrieve all partitions effected by a policy""" + pg_hook = PostgresHook(postgres_conn_id="cratedb_connection") + sql = Path("include/data_retention_retrieve_delete_policies.sql") + return pg_hook.get_records( + sql=sql.read_text(encoding="utf-8"), + parameters={"day": ds}, + ) + + +@dag( + start_date=pendulum.datetime(2021, 11, 19, tz="UTC"), + schedule="@daily", + catchup=False, +) +def data_retention_delete(): + SQLExecuteQueryOperator.partial( + task_id="delete_partition", + conn_id="cratedb_connection", + sql="DELETE FROM {{params.table_fqn}} WHERE {{params.column}} = {{params.value}};", + ).expand(params=get_policies().map(map_policy)) + + +data_retention_delete() diff --git a/dags/data_retention_reallocate_dag.py b/dags/data_retention_reallocate_dag.py new file mode 100644 index 00000000..476e4511 --- /dev/null +++ b/dags/data_retention_reallocate_dag.py @@ -0,0 +1,60 @@ +""" +Implements a retention policy by reallocating cold partitions + +A detailed tutorial is available at https://community.crate.io/t/cratedb-and-apache-airflow-building-a-hot-cold-storage-data-retention-policy/934 + +Prerequisites +------------- +- CrateDB 5.2.0 or later +- Tables for storing retention policies need to be created once manually in + CrateDB. See the file setup/data_retention_schema.sql in this repository. +""" +from pathlib import Path +import pendulum +from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator +from airflow.providers.postgres.hooks.postgres import PostgresHook +from airflow.decorators import dag, task + + +@task +def get_policies(ds=None): + """Retrieve all partitions effected by a policy""" + pg_hook = PostgresHook(postgres_conn_id="cratedb_connection") + sql = Path("include/data_retention_retrieve_reallocate_policies.sql") + return pg_hook.get_records( + sql=sql.read_text(encoding="utf-8"), + parameters={"day": ds}, + ) + + +def map_policy(policy): + """Map index-based policy to readable dict structure""" + return { + "schema": policy[0], + "table": policy[1], + "table_fqn": policy[2], + "column": policy[3], + "value": policy[4], + "attribute_name": policy[5], + "attribute_value": policy[6], + } + + +@dag( + start_date=pendulum.datetime(2021, 11, 19, tz="UTC"), + schedule="@daily", + catchup=False, + template_searchpath=["include"], +) +def data_retention_reallocate(): + SQLExecuteQueryOperator.partial( + task_id="reallocate_partitions", + conn_id="cratedb_connection", + sql=""" + ALTER TABLE {{params.table_fqn}} PARTITION ({{params.column}} = {{params.value}}) + SET ("routing.allocation.require.{{params.attribute_name}}" = '{{params.attribute_value}}'); + """, + ).expand(params=get_policies().map(map_policy)) + + +data_retention_reallocate() diff --git a/dags/data_retention_snapshot_dag.py b/dags/data_retention_snapshot_dag.py new file mode 100644 index 00000000..b447ced0 --- /dev/null +++ b/dags/data_retention_snapshot_dag.py @@ -0,0 +1,67 @@ +""" +Implements a retention policy by snapshotting expired partitions to a repository + +A detailed tutorial is available at https://community.crate.io/t/cratedb-and-apache-airflow-building-a-data-retention-policy-using-external-snapshot-repositories/1001 + +Prerequisites +------------- +In CrateDB, tables for storing retention policies need to be created once manually. +See the file setup/data_retention_schema.sql in this repository. +""" +from pathlib import Path +import pendulum +from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator +from airflow.providers.postgres.hooks.postgres import PostgresHook +from airflow.decorators import dag, task + + +@task +def get_policies(ds=None): + """Retrieve all partitions effected by a policy""" + pg_hook = PostgresHook(postgres_conn_id="cratedb_connection") + sql = Path("include/data_retention_retrieve_snapshot_policies.sql") + return pg_hook.get_records( + sql=sql.read_text(encoding="utf-8"), parameters={"day": ds} + ) + + +def map_policy(policy): + """Map index-based policy to readable dict structure""" + return { + "schema": policy[0], + "table": policy[1], + "table_fqn": policy[2], + "column": policy[3], + "value": policy[4], + "target_repository_name": policy[5], + } + + +@dag( + start_date=pendulum.datetime(2021, 11, 19, tz="UTC"), + schedule="@daily", + catchup=False, +) +def data_retention_snapshot(): + policies = get_policies().map(map_policy) + + reallocate = SQLExecuteQueryOperator.partial( + task_id="snapshot_partitions", + conn_id="cratedb_connection", + sql=""" + CREATE SNAPSHOT {{params.target_repository_name}}."{{params.schema}}.{{params.table}}-{{params.value}}" + TABLE {{params.table_fqn}} PARTITION ({{params.column}} = {{params.value}}) + WITH ("wait_for_completion" = true); + """, + ).expand(params=policies) + + delete = SQLExecuteQueryOperator.partial( + task_id="delete_partitions", + conn_id="cratedb_connection", + sql="DELETE FROM {{params.table_fqn}} WHERE {{params.column}} = {{params.value}};", + ).expand(params=policies) + + reallocate >> delete + + +data_retention_snapshot() diff --git a/include/data_retention_retrieve_delete_policies.sql b/include/data_retention_retrieve_delete_policies.sql new file mode 100644 index 00000000..d3d73d2b --- /dev/null +++ b/include/data_retention_retrieve_delete_policies.sql @@ -0,0 +1,8 @@ +SELECT QUOTE_IDENT(p.table_schema) || '.' || QUOTE_IDENT(p.table_name), + QUOTE_IDENT(r.partition_column), + TRY_CAST(p.values[r.partition_column] AS BIGINT) +FROM information_schema.table_partitions p +JOIN doc.retention_policies r ON p.table_schema = r.table_schema + AND p.table_name = r.table_name + AND p.values[r.partition_column] < %(day)s::TIMESTAMP - (r.retention_period || ' days')::INTERVAL +WHERE r.strategy = 'delete'; diff --git a/include/data_retention_retrieve_reallocate_policies.sql b/include/data_retention_retrieve_reallocate_policies.sql new file mode 100644 index 00000000..08f61bd9 --- /dev/null +++ b/include/data_retention_retrieve_reallocate_policies.sql @@ -0,0 +1,25 @@ +WITH partition_allocations AS ( + SELECT DISTINCT s.schema_name AS table_schema, + s.table_name, + s.partition_ident, + n.attributes + FROM sys.shards s + JOIN sys.nodes n ON s.node['id'] = n.id +) +SELECT QUOTE_IDENT(p.table_schema), + QUOTE_IDENT(p.table_name), + QUOTE_IDENT(p.table_schema) || '.' || QUOTE_IDENT(p.table_name), + QUOTE_IDENT(r.partition_column), + TRY_CAST(p.values[r.partition_column] AS BIGINT), + reallocation_attribute_name, + reallocation_attribute_value +FROM information_schema.table_partitions p +JOIN doc.retention_policies r ON p.table_schema = r.table_schema + AND p.table_name = r.table_name + AND p.values[r.partition_column] < %(day)s::TIMESTAMP - (r.retention_period || ' days')::INTERVAL +JOIN partition_allocations a ON a.table_schema = p.table_schema + AND a.table_name = p.table_name + AND p.partition_ident = a.partition_ident + AND attributes[r.reallocation_attribute_name] <> r.reallocation_attribute_value +WHERE r.strategy = 'reallocate' +ORDER BY 5 ASC; diff --git a/include/data_retention_retrieve_snapshot_policies.sql b/include/data_retention_retrieve_snapshot_policies.sql new file mode 100644 index 00000000..af289d70 --- /dev/null +++ b/include/data_retention_retrieve_snapshot_policies.sql @@ -0,0 +1,12 @@ +-- intentionally not adding QUOTE_IDENT to the first two columns, as they are used in an already quoted string later on +SELECT p.table_schema, + p.table_name, + QUOTE_IDENT(p.table_schema) || '.' || QUOTE_IDENT(p.table_name), + QUOTE_IDENT(r.partition_column), + TRY_CAST(p.values[r.partition_column] AS BIGINT), + target_repository_name +FROM information_schema.table_partitions p +JOIN doc.retention_policies r ON p.table_schema = r.table_schema + AND p.table_name = r.table_name + AND p.values[r.partition_column] < %(day)s::TIMESTAMP - (r.retention_period || ' days')::INTERVAL +WHERE r.strategy = 'snapshot'; diff --git a/setup/data_retention_schema.sql b/setup/data_retention_schema.sql new file mode 100644 index 00000000..9fab393d --- /dev/null +++ b/setup/data_retention_schema.sql @@ -0,0 +1,12 @@ +CREATE TABLE IF NOT EXISTS doc.retention_policies ( + "table_schema" TEXT, + "table_name" TEXT, + "partition_column" TEXT NOT NULL, + "retention_period" INTEGER NOT NULL, + "reallocation_attribute_name" TEXT, + "reallocation_attribute_value" TEXT, + "target_repository_name" TEXT, + "strategy" TEXT NOT NULL, + PRIMARY KEY ("table_schema", "table_name", "strategy") +) +CLUSTERED INTO 1 SHARDS;