
Unify run_sql tasks across DAGs #4883

Open
AetherUnbound opened this issue Sep 6, 2024 · 6 comments
Assignees
soysaucewaso
Labels
💻 aspect: code · 🧰 goal: internal improvement · good first issue · help wanted · 🟩 priority: low · 🧱 stack: catalog · 🔧 tech: airflow

Comments

@AetherUnbound
Collaborator

Description

We have several unique implementations of a common run_sql task across our DAGs. This task was pulled out into common/sql.py in #4836:

@task
def run_sql(
    sql_template: str,
    postgres_conn_id: str = POSTGRES_CONN_ID,
    task: AbstractOperator = None,
    timeout: float = None,
    handler: callable = RETURN_ROW_COUNT,
    **kwargs,
):
    """
    Run an SQL query with the given template and parameters. Any kwargs handed
    into the function outside of those defined will be passed into the template
    `.format` call.
    """
    query = sql_template.format(**kwargs)
    postgres = PostgresHook(
        postgres_conn_id=postgres_conn_id,
        default_statement_timeout=(
            timeout if timeout else PostgresHook.get_execution_timeout(task)
        ),
    )
    return postgres.run(query, handler=handler)
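
For context, calling the shared task from a DAG could look roughly like this (the import path assumes common/sql.py is importable as common.sql; the query template and value are purely illustrative):

from common.sql import run_sql  # the shared task extracted in #4836 (assumed import path)

# Inside a DAG definition: extra kwargs (here `media_type`) are forwarded to
# `sql_template.format()`, and the row count comes back via the default handler.
row_count = run_sql(
    sql_template="SELECT COUNT(*) FROM {media_type}",
    media_type="image",
)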

We have several DAGs which can now use this run_sql function directly, rather than re-implementing their own:

  • delete_records:

def run_sql(
    sql_template: str,
    postgres_conn_id: str = POSTGRES_CONN_ID,
    task: AbstractOperator = None,
    timeout: timedelta = None,
    handler: callable = RETURN_ROW_COUNT,
    **kwargs,
):
    query = sql_template.format(**kwargs)
    postgres = PostgresHook(
        postgres_conn_id=postgres_conn_id,
        default_statement_timeout=(
            timeout if timeout else PostgresHook.get_execution_timeout(task)
        ),
    )
    return postgres.run(query, handler=handler)

  • batched_update (this one may require some additional work on either the base function or the call site to accommodate the dry_run variable; see the sketch after this list):

def run_sql(
    dry_run: bool,
    sql_template: str,
    query_id: str,
    log_sql: bool = True,
    postgres_conn_id: str = POSTGRES_CONN_ID,
    task: AbstractOperator = None,
    timeout: timedelta = None,
    handler: callable = RETURN_ROW_COUNT,
    **kwargs,
):
    query = sql_template.format(
        temp_table_name=constants.TEMP_TABLE_NAME.format(query_id=query_id), **kwargs
    )
    if dry_run:
        logger.info(
            "This is a dry run: no SQL will be executed. To perform the updates,"
            " rerun the DAG with the conf option `'dry_run': false`."
        )
        logger.info(query)
        return 0
    postgres = PostgresHook(
        postgres_conn_id=postgres_conn_id,
        default_statement_timeout=(
            timeout if timeout else PostgresHook.get_execution_timeout(task)
        ),
        log_sql=log_sql,
    )
    return postgres.run(query, handler=handler)

  • add_license_url:

def run_sql(
    sql: str,
    log_sql: bool = True,
    method: str = "get_records",
    handler: callable = None,
    autocommit: bool = False,
    postgres_conn_id: str = POSTGRES_CONN_ID,
    dag_task: AbstractOperator = None,
):
    postgres = PostgresHook(
        postgres_conn_id=postgres_conn_id,
        default_statement_timeout=PostgresHook.get_execution_timeout(dag_task),
        log_sql=log_sql,
    )
    if method == "get_records":
        return postgres.get_records(sql)
    elif method == "get_first":
        return postgres.get_first(sql)
    else:
        return postgres.run(sql, autocommit=autocommit, handler=handler)
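
One possible direction for the divergences above (a sketch only, not a settled design): give the shared run_sql an optional dry_run flag mirroring batched_update, and have batched_update pass its rendered temp_table_name in as an ordinary template kwarg. The get_records/get_first branches in add_license_url could likely be expressed as handler callables instead, since postgres.run hands the cursor to the handler.

# Sketch only: how the shared run_sql might absorb the batched_update
# dry-run behaviour. Parameter names and defaults are illustrative, not settled.
@task
def run_sql(
    sql_template: str,
    postgres_conn_id: str = POSTGRES_CONN_ID,
    task: AbstractOperator = None,
    timeout: float = None,
    handler: callable = RETURN_ROW_COUNT,
    dry_run: bool = False,
    log_sql: bool = True,
    **kwargs,
):
    query = sql_template.format(**kwargs)
    if dry_run:
        # Log the rendered query instead of executing it, as batched_update
        # does today, and report zero affected rows.
        logger.info(
            "This is a dry run: no SQL will be executed. To perform the updates,"
            " rerun the DAG with the conf option `'dry_run': false`."
        )
        logger.info(query)
        return 0
    postgres = PostgresHook(
        postgres_conn_id=postgres_conn_id,
        default_statement_timeout=(
            timeout if timeout else PostgresHook.get_execution_timeout(task)
        ),
        log_sql=log_sql,
    )
    return postgres.run(query, handler=handler)

The batched_update call site would then format the temp table name itself, e.g. passing temp_table_name=constants.TEMP_TABLE_NAME.format(query_id=query_id) as one of the template kwargs.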

Additional context

This also came up in the discussion of #4572

@AetherUnbound AetherUnbound added good first issue New-contributor friendly help wanted Open to participation from the community 💻 aspect: code Concerns the software code in the repository 🔧 tech: airflow Involves Apache Airflow 🟩 priority: low Low priority and doesn't need to be rushed 🧰 goal: internal improvement Improvement that benefits maintainers, not users 🧱 stack: catalog Related to the catalog and Airflow DAGs labels Sep 6, 2024
@soysaucewaso

Can I try this? I've never contributed to this project before.

@AetherUnbound
Collaborator Author

Hi @soysaucewaso, thank you for your interest in contributing to Openverse! I've assigned this issue to you. If you have any questions, you may leave them here.

Please check out our welcome and general setup documentation pages for getting started with setting up your local environment.

@soysaucewaso

soysaucewaso commented Sep 22, 2024

It looks as though Airflow doesn't support tasks calling other tasks. Just to clarify, should I change the DAG logic so it runs the first task and then the second? Having run_sql as a helper method abstracted the run_sql call away from the DAG and into the task, which keeps the DAG's logic less complex.
Maybe we could have a common run_sql helper method in common which implements all the logic, and then a run_sql task implemented like this, so run_sql can also be a task:

def run_sql_helper(*args, **kwargs):
    # ... shared logic ...
    return result


@task
def run_sql(*args, **kwargs):
    return run_sql_helper(*args, **kwargs)

And then other tasks could call run_sql_helper directly.
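
For instance (hypothetical task name and query, just to show the shape):

# Hypothetical example: a task keeps its own query-building logic but delegates
# execution to the plain helper instead of calling the @task-decorated run_sql.
@task
def count_media_rows(
    media_type: str,
    postgres_conn_id: str = POSTGRES_CONN_ID,
    task: AbstractOperator = None,
):
    return run_sql_helper(
        sql_template="SELECT COUNT(*) FROM {media_type}",
        postgres_conn_id=postgres_conn_id,
        task=task,
        media_type=media_type,
    )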

What do you think?

@AetherUnbound
Copy link
Collaborator Author

@soysaucewaso in what cases would the task be called from another task? Ideally we'd want to unify all of the cases listed into a single run_sql task which could be imported and called in lieu of the current run_sql functions.

@soysaucewaso
Copy link

Thanks for the response. Currently the alternative run_sql functions are used by various tasks such as create_deleted_records and delete_records_from_media_table.

Here's the create_deleted_records task:

@task
@setup_deleted_db_columns_for_media_type
@setup_db_columns_for_media_type
def create_deleted_records(
    *,
    select_query: str,
    deleted_reason: str,
    media_type: str,
    db_columns: list[Column] = None,
    deleted_db_columns: list[Column] = None,
    task: AbstractOperator = None,
    postgres_conn_id: str = POSTGRES_CONN_ID,
):
    """
    Select records from the given media table using the select query, and then for each
    record create a corresponding record in the Deleted Media table.
    """

    destination_cols = ", ".join([col.db_name for col in deleted_db_columns])

    # To build the source columns, we first list all columns in the main media table
    source_cols = ", ".join([col.db_name for col in db_columns])

    # Then add the deleted-media specific columns.
    # `deleted_on` is set to its insert value to get the current timestamp:
    source_cols += f", {DELETED_ON.get_insert_value()}"
    # `deleted_reason` is set to the given string
    source_cols += f", '{deleted_reason}'"

    # The provider, foreign_id pair uniquely identifies a record. When trying to
    # add a record to the deleted_media table, if the record's (provider, foreign_id)
    # pair is already present in the table, no additional record will be added and the
    # existing record in the deleted_media table will not be updated. This preserves the
    # record exactly as it was when it was first deleted.
    unique_cols = f"({PROVIDER.db_name}, md5({FOREIGN_ID.db_name}))"

    return run_sql(
        sql_template=constants.CREATE_RECORDS_QUERY,
        postgres_conn_id=postgres_conn_id,
        task=task,
        destination_table=f"deleted_{media_type}",
        destination_cols=destination_cols,
        source_table=media_type,
        source_cols=source_cols,
        select_query=select_query,
        unique_cols=unique_cols,
    )

Are you suggesting we delete tasks like create_deleted_records which call run_sql and instead implement this logic in the DAGs?

Maybe we should keep these tasks, but change them so they return the kwargs for run_sql, and then make the DAGs call both tasks?
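
Something like this is roughly what I have in mind (DAG, task, and query names are hypothetical; it assumes the first task returns the rendered query and the shared run_sql executes it):

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

from common.sql import run_sql  # the shared task (assumed import path)


@task
def build_count_query(media_type: str) -> str:
    # Hypothetical query-builder: does the string assembly that a task like
    # create_deleted_records does today, then hands the result onward.
    return f"SELECT COUNT(*) FROM {media_type}"


with DAG(
    dag_id="example_unified_run_sql",
    start_date=datetime(2024, 1, 1),
    schedule=None,
):
    query = build_count_query("image")
    # The XCom from the builder task is passed straight into the shared run_sql.
    row_count = run_sql(sql_template=query)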

@soysaucewaso

@AetherUnbound I forgot to tag you in my previous question.
