[CT-3375] [Feature] Allow Users to Specify Merge Clauses for Incremental Models #9060

Closed
3 tasks done
dbernett-amplify opened this issue Nov 12, 2023 · 7 comments
Labels
enhancement (New feature or request), incremental (Incremental modeling with dbt), wontfix (Not a bug or out of scope for dbt-core)

Comments

@dbernett-amplify

dbernett-amplify commented Nov 12, 2023

Is this your first time submitting a feature request?

  • I have read the expectations for open source contributors
  • I have searched the existing issues, and I could not find an existing issue for this feature
  • I am requesting a straightforward extension of existing dbt functionality, rather than a Big Idea better suited to a discussion

Describe the feature

Overview

dbt's existing "merge" strategy is relatively inflexible: it always updates matched rows and inserts unmatched rows. Snowflake's merge statement is considerably more flexible than this. Specifically, Snowflake allows you to:

  1. Include multiple when matched / when not matched clauses
  2. Specify "delete" as an option when matched
  3. Include additional "and" conditions in your clauses (case predicates)

This update would let the user set the merge clauses in the config, giving them access to the full flexibility of the Snowflake merge statement.

Implementation

I currently have this working in our dbt project. I have it as a separate incremental strategy, but it could also be incorporated into the existing merge strategy with some conditional logic that falls back to the existing default merge clauses if the user doesn't specify any.

Here's an example of what it looks like in my config block:

    {{
        config(
            materialized='incremental'
            , unique_key = 'result_sid'
            , incremental_strategy = 'specify_merge_clauses'
            , merge_clauses = [
                {
                    "when":"matched"
                    , "and": "not DBT_INTERNAL_SOURCE.is_valid"
                    , "then":"delete"
                }
                , {
                    "when":"matched"
                    , "and": "DBT_INTERNAL_SOURCE.update_sequence > DBT_INTERNAL_DEST.update_sequence"
                    , "then":"update"
                }
                , {
                    "when":"not matched"
                    , "then":"insert"
                }
            ]
        )
    }}

A few things to note:

  1. merge_clauses is a list of dictionaries. So the format is [{dict 1}, {dict 2}, ...].
  2. The user can specify as many or as few merge clauses as they like (minimum of 1).
  3. Each merge clause in merge_clauses is specified as a dictionary with three keys as follows:
    {
    "when": required, one of "matched" or "not matched"
    "and": optional, an additional condition that must be satisfied in order for the "then" behavior to occur
    "then": required, what the user wants to happen when the above are satisfied, one of "update", "insert", or "delete".
    }
  4. The macro automatically handles what comes after "update" or "insert" in the merge statement. The user does not need to specify anything other than the single word "update" or "insert" (or "delete" but delete never has anything after it in the merge statement anyway).
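The rules in the list above can be sketched as a small check. This is only an illustration in plain Python, not dbt code: `validate_merge_clauses` is a hypothetical helper, and the last two checks (no insert when matched, no update or delete when not matched) are an addition based on how Snowflake's MERGE pairs actions with clauses, not something the macro in this post enforces.

```python
# Hypothetical validator for the proposed merge_clauses config (not part of dbt).
VALID_WHEN = {"matched", "not matched"}
VALID_THEN = {"update", "insert", "delete"}
ALLOWED_KEYS = {"when", "and", "then"}


def validate_merge_clauses(merge_clauses):
    """Return a list of error messages; an empty list means the config looks valid."""
    if not isinstance(merge_clauses, list) or not merge_clauses:
        return ["merge_clauses must be a list with at least one dictionary"]
    errors = []
    for i, clause in enumerate(merge_clauses):
        if not isinstance(clause, dict):
            errors.append(f"clause {i}: expected a dictionary")
            continue
        unknown = sorted(set(clause) - ALLOWED_KEYS)
        if unknown:
            errors.append(f"clause {i}: unknown keys {unknown}")
        when = str(clause.get("when", "")).lower()
        then = str(clause.get("then", "")).lower()
        if when not in VALID_WHEN:
            errors.append(f"clause {i}: 'when' must be 'matched' or 'not matched'")
        if then not in VALID_THEN:
            errors.append(f"clause {i}: 'then' must be 'update', 'insert', or 'delete'")
        # Extra check (an assumption beyond the post): MERGE only allows
        # update/delete when matched and insert when not matched.
        if when == "matched" and then == "insert":
            errors.append(f"clause {i}: 'insert' requires when='not matched'")
        if when == "not matched" and then in {"update", "delete"}:
            errors.append(f"clause {i}: '{then}' requires when='matched'")
    return errors
```

Running it on the config block above should return an empty list; a clause such as `{"when": "not matched", "then": "delete"}` would be reported.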

And then my get_specify_merge_clauses_sql macro is identical to the existing get_merge_sql macro, except that I have replaced this code:

    {% if unique_key %}
    when matched then update set
        {% for column_name in update_columns -%}
            {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
            {%- if not loop.last %}, {%- endif %}
        {%- endfor %}
    {% endif %}

    when not matched then insert
        ({{ dest_cols_csv }})
    values
        ({{ dest_cols_csv }})

with this code:


    {% for merge_clause in merge_clauses %}
        when {{merge_clause.when}} {% if 'and' in merge_clause -%} and {%- endif %} {{merge_clause.and}} then {{merge_clause.then}}
        {% if merge_clause.then|lower == 'update' -%} 
            set
            {% for column_name in update_columns -%}
                {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
                {%- if not loop.last %}, {%- endif %}
            {%- endfor %}
        {%- endif -%}
        {%- if merge_clause.then|lower == 'insert' -%}
            ({{ dest_cols_csv }})
            values
            ({{ dest_cols_csv }})
        {%- endif %}
    {% endfor %}

and I have also added {%- set merge_clauses = config.get('merge_clauses')%} at the top along with the other set statements.
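To make the loop easier to follow, here is a rough Python model of what it emits for each clause. It is a sketch only: `render_merge_clauses` is a hypothetical name, column names are left unquoted (the real macro builds `dest_cols_csv` with `get_quoted_csv`), and whitespace will differ from the Jinja output.

```python
def render_merge_clauses(merge_clauses, update_columns, dest_columns):
    """Mimic the Jinja loop: one 'when ... then ...' line per configured clause."""
    dest_cols_csv = ", ".join(dest_columns)
    lines = []
    for clause in merge_clauses:
        line = f"when {clause['when']}"
        # The optional "and" key adds an extra predicate to the clause.
        if "and" in clause:
            line += f" and {clause['and']}"
        action = clause["then"]
        line += f" then {action}"
        if action.lower() == "update":
            assignments = ", ".join(
                f"{col} = DBT_INTERNAL_SOURCE.{col}" for col in update_columns
            )
            line += f" set {assignments}"
        elif action.lower() == "insert":
            line += f" ({dest_cols_csv}) values ({dest_cols_csv})"
        # "delete" takes nothing after the keyword.
        lines.append(line)
    return "\n".join(lines)


if __name__ == "__main__":
    example = [
        {"when": "matched", "and": "not DBT_INTERNAL_SOURCE.is_valid", "then": "delete"},
        {"when": "not matched", "then": "insert"},
    ]
    print(render_merge_clauses(example, ["is_valid"], ["result_sid", "is_valid"]))
```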

Describe alternatives you've considered

No response

Who will this benefit?

The use case that I needed this for was that I wanted to:

  1. Delete the existing record if the new record matched on id and the new record had is_valid = false
  2. Update the existing record if the new record matched on id and had a later "update_sequence"
  3. Do nothing if the new record matched on id but did not have a later "update_sequence"
  4. Insert if the new record did not match
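As a rough illustration of those four cases, the sketch below applies the same rules to in-memory rows in plain Python. It is not how the warehouse executes a merge; `apply_merge` is a hypothetical helper, rows are dictionaries keyed by `result_sid`, and it assumes each key appears at most once in the source batch.

```python
def apply_merge(dest, source):
    """Apply the delete / update / no-op / insert rules to in-memory rows.

    dest:   dict mapping result_sid -> row dict (the existing table)
    source: list of row dicts (the new batch), at most one row per result_sid
    """
    result = {key: dict(row) for key, row in dest.items()}
    for row in source:
        key = row["result_sid"]
        existing = dest.get(key)
        if existing is None:
            # Case 4: no match, so insert.
            result[key] = dict(row)
        elif not row["is_valid"]:
            # Case 1: matched and the new record is invalid, so delete.
            result.pop(key, None)
        elif row["update_sequence"] > existing["update_sequence"]:
            # Case 2: matched with a later update_sequence, so update.
            result[key] = dict(row)
        # Case 3: matched but not newer; no clause applies, row is untouched.
    return result
```

The order of the checks mirrors the order of the clauses in the config: the delete condition is tested before the update condition, so an invalid record is removed even if it also has a later update_sequence.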

This is just one particular use case, but I imagine that there are lots of use cases where a user might want to have more flexibility than what the existing merge strategy provides. For example, it is also possible to use this to insert if the new record does not match and do nothing otherwise, as I outlined in #9056. However, I think that one is so common (and really should be the default when your only source of duplicates is your lookback period) that I think it still deserves a separate option that allows the end user to specify that behavior without having to type out the full merge clause dictionary as is done here.

Are you interested in contributing this feature?

Sure

Anything else?

I work in Snowflake. Not sure how this works on other platforms.

There's one small limitation: if the user has multiple "when matched ... then update" clauses (with different "and" conditions), this doesn't let them specify different columns to update for each clause. I feel like that would be a relatively rare use case, though.

@dbernett-amplify added the enhancement and triage labels Nov 12, 2023
@github-actions bot changed the title from "[Feature] Allow Users to Specify Merge Clauses for Incremental Models" to "[CT-3375] [Feature] Allow Users to Specify Merge Clauses for Incremental Models" Nov 12, 2023
@dbernett-amplify
Author

dbernett-amplify commented Nov 12, 2023

Here's some code that would integrate this feature request, my feature request in #9056, and the traditional merge behavior into a single macro. (Currently calling it custom_merge, but it could just become the merge strategy).

{% macro get_custom_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%}
    {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
    {%- set merge_update_columns = config.get('merge_update_columns') -%}
    {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
    {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
    {%- set sql_header = config.get('sql_header', none) -%}
    {%- set merge_clauses = config.get('merge_clauses') -%}
    {%- set update_matches = true if config.get('update_matches') is none else config.get('update_matches') -%}

    {% if unique_key %}
        {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
            {% for key in unique_key %}
                {% set this_key_match %}
                    DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
                {% endset %}
                {% do predicates.append(this_key_match) %}
            {% endfor %}
        {% else %}
            {% set unique_key_match %}
                DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
            {% endset %}
            {% do predicates.append(unique_key_match) %}
        {% endif %}
    {% else %}
        {% do predicates.append('FALSE') %}
    {% endif %}

    {{ sql_header if sql_header is not none }}

    merge into {{ target }} as DBT_INTERNAL_DEST
        using {{ source }} as DBT_INTERNAL_SOURCE
        on {{"(" ~ predicates | join(") and (") ~ ")"}}

    {% if merge_clauses is not none %}

        {% for merge_clause in merge_clauses %}
            when {{merge_clause.when}} {% if 'and' in merge_clause -%} and {%- endif %} {{merge_clause.and}} then {{merge_clause.then}}
            {% if merge_clause.then|lower == 'update' -%} 
                set
                {% for column_name in update_columns -%}
                    {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
                    {%- if not loop.last %}, {%- endif %}
                {%- endfor %}
            {%- endif -%}
            {%- if merge_clause.then|lower == 'insert' -%}
                ({{ dest_cols_csv }})
                values
                ({{ dest_cols_csv }})
            {%- endif %}
        {% endfor %}

    {% else %}

        {% if unique_key and update_matches %}
        when matched then update set
            {% for column_name in update_columns -%}
                {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
                {%- if not loop.last %}, {%- endif %}
            {%- endfor %}
        {% endif %}

        when not matched then insert
            ({{ dest_cols_csv }})
        values
            ({{ dest_cols_csv }})

    {% endif %}    

{% endmacro %}
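The branching at the end of the macro can be summarized as "use merge_clauses if given, otherwise build the traditional clauses". A minimal Python sketch of that decision, assuming a plain dictionary stands in for dbt's `config` and using the hypothetical name `resolve_merge_clauses`:

```python
def resolve_merge_clauses(config, unique_key):
    """Pick the clauses the combined macro would render.

    config is a plain dict standing in for dbt's config object.
    """
    merge_clauses = config.get("merge_clauses")
    if merge_clauses is not None:
        # Explicit clauses win over everything else.
        return merge_clauses

    # update_matches defaults to True when it is not set.
    update_matches = config.get("update_matches")
    if update_matches is None:
        update_matches = True

    clauses = []
    if unique_key and update_matches:
        clauses.append({"when": "matched", "then": "update"})
    # The insert clause is always present in the fallback path.
    clauses.append({"when": "not matched", "then": "insert"})
    return clauses
```

With no extra config this reproduces the traditional merge; with `update_matches` set to false it becomes insert-only, and any explicit `merge_clauses` list is returned unchanged.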

@dbernett-amplify
Author

@dbeatty10 Thanks for the prompt reply and the links! I've read through them and my takeaways are:

  1. Lots of folks want to be able to customize incremental strategies :-)
  2. Y'all don't want to support 57 different niche incremental strategies (makes lots of sense!)
  3. You want a solution that is transparent so that anyone looking at the model can understand exactly what it's doing.

FWIW, I think the strategy I've proposed here could be a great solution to all of the above. It handles every specific case that I've seen people asking for in all of the other linked issues. It also handles some pretty complicated merge logic that's impossible to handle with the existing merge strategy, no matter how cleverly you write the model SQL itself. (The limitation is that the existing merge strategy considers at most two cases, when matched and when not matched, and sometimes you need to consider 3 or more cases.) It's also very straightforward and easy to tell from the config block exactly what is going to happen (see my example config block above). Finally, adding merge_clauses seems to be in line with some of the other customization options you've added recently, like incremental predicates.

Anyway, doesn't matter much to me either way, since I've already implemented this as a working custom strategy in our dbt project (see my comment above for the code). But I think it'd be cool to bring this functionality to the broader dbt user base. And then maybe y'all would stop having to respond to so many feature requests about custom incremental strategies. 😆😉😄

@dbeatty10
Contributor

Two other related comments:

@dbernett-amplify
Author

Thanks!

@tmastny
Contributor

tmastny commented Nov 15, 2023

I like this suggestion! I was about to suggest something similar, since I maintain a custom incremental strategy, and I think it's a little clunky.

Right now I have to copy and rename this entire macro and slightly adjust the when matched clauses like the OP.

It would be nice if I could just write that part as a macro (with maybe some new helpers to make it easier):

{% macro custom_incremental_strategy(target, source, unique_key, dest_columns, incremental_predicates) -%}
    when matched and <condition> = TRUE then delete

    when matched then update set {{ set_columns(...) }}
    when not matched then insert
        ({{ dest_cols_csv }})
    values
        ({{ dest_cols_csv }})

{% endmacro %}

and call that in the config(..., incremental_strategy='custom_incremental_strategy').

To do this, we could hook in the custom merge statements after all the arguments and data are processed (rather than before, as it is now).

{% macro default__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
    {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
    {%- set merge_update_columns = config.get('merge_update_columns') -%}
    {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
    {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
    {%- set sql_header = config.get('sql_header', none) -%}

    {% if unique_key %}
        {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
            {% for key in unique_key %}
                {% set this_key_match %}
                    DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
                {% endset %}
                {% do predicates.append(this_key_match) %}
            {% endfor %}
        {% else %}
            {% set unique_key_match %}
                DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
            {% endset %}
            {% do predicates.append(unique_key_match) %}
        {% endif %}
    {% else %}
        {% do predicates.append('FALSE') %}
    {% endif %}

    {{ sql_header if sql_header is not none }}

    {% if config.get('incremental_strategy') %}
    {{ call_custom_incremental_strategy(config.get('incremental_strategy'), args....) }}
    {% else %}
    {{ default_strategy() }}
    {% endif %}

{% endmacro %}

I slightly prefer this approach because I can write a normal jinja-based SQL macro, with nice syntax highlighting, formatting, and autocomplete in dbt Cloud and VSCode. I don't have those same features when writing lists of dictionaries of strings.

That's my personal preference, but I'd also be curious to hear if dbt labs have a general philosophy on what should be SQL with jinja and what should be stringified SQL.

Also, this could be done in addition to OP's suggestion. If the call to a custom incremental strategy happens after the arguments are parsed, you could add in the merged_clauses macro and read them from the list of dictionaries.
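The hook described in this comment amounts to doing the shared work once and then dispatching to a named function that only renders the when-clauses. A rough Python sketch of that shape follows; every name in it (`STRATEGIES`, `register`, `build_merge_sql`, `delete_invalid`) is hypothetical, the `is_valid` condition is only a stand-in for the elided `<condition>`, and nothing here is an existing dbt API.

```python
# Registry of functions that render only the when-clauses of a merge.
STRATEGIES = {}


def register(name):
    """Decorator that stores a clause-rendering function under a strategy name."""
    def decorator(fn):
        STRATEGIES[name] = fn
        return fn
    return decorator


def default_clauses(ctx):
    """Traditional behavior: update matches (if there is a key), insert the rest."""
    lines = []
    if ctx["has_key"]:
        lines.append(f"when matched then update set {ctx['update_set']}")
    cols = ctx["dest_cols_csv"]
    lines.append(f"when not matched then insert ({cols}) values ({cols})")
    return "\n".join(lines)


@register("delete_invalid")
def delete_invalid(ctx):
    # Hypothetical condition standing in for the elided <condition>.
    return (
        "when matched and not DBT_INTERNAL_SOURCE.is_valid then delete\n"
        + default_clauses(ctx)
    )


def build_merge_sql(target, source, unique_key, dest_columns, strategy=None):
    """Shared processing first, then dispatch to the chosen clause renderer."""
    keys = [unique_key] if isinstance(unique_key, str) else list(unique_key or [])
    predicates = [
        f"DBT_INTERNAL_SOURCE.{k} = DBT_INTERNAL_DEST.{k}" for k in keys
    ] or ["FALSE"]
    ctx = {
        "has_key": bool(keys),
        "dest_cols_csv": ", ".join(dest_columns),
        "update_set": ", ".join(
            f"{c} = DBT_INTERNAL_SOURCE.{c}" for c in dest_columns
        ),
    }
    header = (
        f"merge into {target} as DBT_INTERNAL_DEST\n"
        f"using {source} as DBT_INTERNAL_SOURCE\n"
        "on (" + ") and (".join(predicates) + ")"
    )
    render = default_clauses if strategy is None else STRATEGIES[strategy]
    return header + "\n" + render(ctx)
```

The point of the design is that a custom strategy only has to write the clauses it cares about; the key predicates and column lists arrive already computed.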

@dbeatty10
Contributor

Thanks for opening this @dbernett-amplify ! 🏆

  1. Lots of folks want to be able to customize incremental strategies :-)
  2. Y'all don't want to support 57 different niche incremental strategies (makes lots of sense!)
  3. You want a solution that is transparent so that anyone looking at the model can understand exactly what it's doing.

Well said 👍

Anyway, doesn't matter much to me either way, since I've already implemented this as a working custom strategy in our dbt project (see my comment above for the code). But I think it'd be cool to bring this functionality to the broader dbt user base.

Similar to #9312 (comment), we think that custom incremental strategies are the way to go here (like you've already done) rather than making changes to default__get_merge_sql.

Then folks can share their custom strategies via the dbt package ecosystem per these instructions.

Per @tmastny's comments, we've created #9223 to consider ways to make it easier to customize incremental materializations that utilize a MERGE statement. In particular, the sketch in #9223 (comment) is one implementation idea.

@dbeatty10 closed this as not planned Jan 10, 2024
@dbeatty10 added the wontfix label and removed the triage label Jan 10, 2024