fix: Added null check for partitions batches #392

Merged · 7 commits · Sep 6, 2023

Conversation

@lukas-gust (Contributor) commented Sep 1, 2023

Description

This is my first contribution, thank you in advance.

There is an issue when the batch partition values take on a null value and are of certain types. When filtering source rows by partition, if an integer partition value is null, the current behavior converts it to a string in the Jinja macro, rendering partition_key=None, whereas normal values render as partition_key=1234. The former is incorrect syntax and behavior.

The aim of this PR is to fix that by checking whether the partition value is none and, in that case, rendering the filter comparison as partition_key is null.
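Conceptually the fix builds the comparison like this (a minimal Jinja sketch of the idea; batch_partition_values and partition_key are hypothetical names, not the adapter's actual macro code):

{%- set comparisons = [] -%}
{%- for value in batch_partition_values -%}
    {%- if value is none -%}
        {# a null partition value must render as "is null", never "= None" #}
        {%- do comparisons.append(partition_key ~ ' is null') -%}
    {%- else -%}
        {# non-null values keep the usual equality comparison (type-aware quoting omitted here) #}
        {%- do comparisons.append(partition_key ~ ' = ' ~ value) -%}
    {%- endif -%}
{%- endfor -%}
where {{ comparisons | join(' and ') }}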

Models used to test - Optional

{# This table is merged on load (falls back to append currently). Needs deduplication. #}
{# This is an iceberg table type from the project config #}

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    partitioned_by = ['status', 'organization_id', 'hour(created_at)'],
    unique_key = ['id'],
) }}

with using_tickets as (

    select
        id,
        "url",
        tags,
        via__channel as via_channel,
        json_parse("via__source__from") as via_source_from,
        via__source__to as via_source_to,
        via__source__rel as via_source_rel,
        cast(created_at as timestamp(6)) as created_at,
        cast(updated_at as timestamp(6)) as updated_at,
        "type",
        "subject",
        raw_subject,
        "description",
        priority,
        "status",
        requester_id,
        submitter_id,
        assignee_id,
        group_id,
        has_incidents,
        is_public,
        custom_status_id,
        ticket_form_id,
        brand_id,
        allow_channelback,
        allow_attachments,
        from_messaging_channel,
        cast(generated_timestamp as timestamp(6)) as generated_timestamp,
        channel,
        property,
        ticket_type,
        organization_id,
        opt_out_reason,
        internal_escalation,
        rsa_issue_resolution,
        escalation_type,
        origin,
        "_dlt_id" as dlt_id,
        "_dlt_load_id" as dlt_load_id
    from
        {{ source(
            'bronze_zendesk_support',
            'tickets'
        ) }}

    {% if is_incremental() %}
    where updated_at >= coalesce((select max(updated_at) from {{ this }}), date('1900-01-01'))
    {% endif %}
),

using_extracted_json as (
    select
        {# Some json extraction logic #}
    from
        using_tickets
),

updates as (
    select
        *,
        row_number() over (partition by id order by updated_at desc) as row_number
    from
        using_extracted_json
    
    {% if is_incremental() %}
    where
        id in (select id from {{ this }})
    {% endif %}
),

inserts as (
    select
        *,
        row_number() over (partition by id order by updated_at desc) as row_number
    from
        using_extracted_json
    
    {% if is_incremental() %}
    where
        id not in (select id from {{ this }})
    {% endif %}
)

select 
    *
from
(
    select * from updates 
    union 
    select * from inserts
) as results
where row_number = 1

Checklist

  • You followed the contributing section
  • You kept your Pull Request small and focused on a single feature or bug fix.
  • You added unit testing when necessary
  • You added functional testing when necessary

@nicor88 changed the title from "Added null check for partitions batches" to "fix: Added null check for partitions batches" on Sep 1, 2023
@nicor88 added the enable-functional-tests label (Label to trigger functional testing) on Sep 1, 2023
@nicor88 (Contributor) commented Sep 1, 2023

@lukas-gust looks good - thanks for the fix.
We should consider covering this edge case with some functional testing: https://github.com/dbt-athena/dbt-athena/blob/main/tests/functional/adapter/test_partitions.py - we could add something there. WDYT? We could, for example, create a dataset with more than 100 dates and then append some null values.

@lukas-gust (Contributor, Author)

> @lukas-gust looks good - thanks for the fix.
> We should consider covering this edge case with some functional testing: https://github.com/dbt-athena/dbt-athena/blob/main/tests/functional/adapter/test_partitions.py - we could add something there. WDYT? We could, for example, create a dataset with more than 100 dates and then append some null values.

@nicor88 I agree. Seems easy enough. Let me get familiar with the tests in there.

I think it's worth running the edge case test for all the data types - integer, string, date, timestamp - to ensure the filter is always rendered as is null in the where clause.
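For illustration, regardless of the partition column's type, the rendered batch filters should come out roughly like this (table and column names are hypothetical):

-- integer partition value: present vs. missing
insert into target_table select * from tmp_table where organization_id = 1234;
insert into target_table select * from tmp_table where organization_id is null;

-- string partition value
insert into target_table select * from tmp_table where status = 'open';
insert into target_table select * from tmp_table where status is null;

-- date partition value
insert into target_table select * from tmp_table where created_day = date '2023-01-01';
insert into target_table select * from tmp_table where created_day is null;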

@lukas-gust (Contributor, Author)

The new test I added fails on the record count. I need to step away for the time being.

@nicor88 (Contributor) commented Sep 2, 2023

@lukas-gust it's quite weird - the record count is 3 instead of 212... I'll take a deeper look into it. Maybe we're hitting another edge case, but I can't figure it out at the moment.

@nicor88 (Contributor) commented Sep 2, 2023

@lukas-gust I modified the test case a bit to cover a situation where we have only one partition column (int) that contains NULL values - that one should pass.

Anyhow, I created another test case:

with data as (
    select
        cast(random() * 100 as int) col_1,
        row_number() over() as id
    from
        unnest(sequence(1, 200))
)

select
    'test case sequence' as record_type,
    id,
    case when col_1 < 50 then null else col_1 end as col_1
from data
union all
select
    'test case set 1' as record_type,
    NULL as id,
    NULL as col_1
union all
select
    'test case set 2' as record_type,
    NULL as id,
    23 as col_1
union all
select
    'test case set 3' as record_type,
    50 as id,
    NULL as col_1

Somehow that doesn't work - I expect 203 records (200 from the sequence plus 3 union-all rows) but I get only a part of those.

The issue seems to be that, with NULLs, the generated where statements don't cover all cases.
That would mean your proposed change doesn't work in all scenarios.

EDIT: I found the issue! That is also the reason why your original test case didn't work.
If we use random(), the function gets called each time, so we get non-deterministic behavior: random() is re-evaluated each time we pick the partition values.

@nicor88 (Contributor) commented Sep 2, 2023

I just tested locally - let's add another test case:

{{ config(
    materialized='table',
    format='parquet',
    s3_data_naming='table',
    partitioned_by=['id', 'date_column']
) }}

with data as (
    select
        random() as rnd,
        row_number() over() as id,
        cast(date_column as date) as date_column
    from (
        values (
            sequence(from_iso8601_date('2023-01-01'), from_iso8601_date('2023-07-31'), interval '1' day)
        )
    ) as t1(date_array)
    cross join unnest(date_array) as t2(date_column)
)

select
    rnd,
    case when id <= 50 then null else id end as id,
    date_column
from data
union all
select
    random() as rnd,
    NULL as id,
    NULL as date_column
union all
select
    random() as rnd,
    NULL as id,
    cast('2023-09-02' as date) as date_column
union all
select
    random() as rnd,
    40 as id,
    NULL as date_column

We partition by id and date_column - this should work.

We expect 215 records (212 days in the date sequence plus 3 union-all rows).

Also, since the count of nulls is deterministic, we can for example assert that we expect 52 records with a NULL id (ids 1-50 are nulled in the sequence, plus the two union-all rows with NULL id).
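Once the model is built, a quick way to verify the counts (model name hypothetical):

select count(*) from my_partitions_test_model;                  -- expect 215
select count(*) from my_partitions_test_model where id is null; -- expect 52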

After you add this we should merge and release.

@lukas-gust (Contributor, Author)

Thank you for the help! That's interesting - I mean, it makes sense, but I'm trying to think about whether it's an issue. Each time we iterate we generate a new set of random data, so we can't know how many records there were. Is this a side effect of the tests, or would it occur naturally if my query were structured the way I had it?

@nicor88 (Contributor) commented Sep 2, 2023

The issue is that random() gets evaluated in each query where it's called.

Imagine your dataset contains id=345 (generated by random()) - but when we do select distinct id from (...sql query with the dataset), random() returns another value that might not be 345. That creates non-deterministic behavior, which differs from real-world datasets.

It's an issue with how random() works - nothing related to the partition handling implementation - and it shouldn't occur in real datasets, unless you use a random value as a partition.

To understand a bit more about how random() works, try this:

with data as (
    select random() as id
)

select id from data
union all
select id from data

You will get two different values of id - even though id is defined only once in the data CTE, each reference re-evaluates random().

@lukas-gust (Contributor, Author)

That makes sense. Wouldn't it be the same situation for something like current_timestamp? Basically, I'm wondering if we ought to stage a non-partitioned temp table to read from instead, so this isn't possible. Or is it desired behavior? Thoughts? Glad we were able to work through the test.

@nicor88 (Contributor) commented Sep 2, 2023

I believe that persisting the SQL of the model to a non-partitioned table is indeed better - not sure why @svdimchenko didn't go for that - but we can certainly add it. It would make the SQL queries that run when we hit partition limitations easier to debug - but that can be done in another PR.

@lukas-gust would you mind adding this #392 (comment) as another test case, to make the changes even more complete?

@lukas-gust (Contributor, Author)

100% agree. @nicor88 Yes, I will work on it later tonight.

@svdimchenko (Contributor) commented Sep 2, 2023

> I believe that persisting the SQL of the model to a non-partitioned table is indeed better - not sure why @svdimchenko didn't go for that

Hi there! @nicor88 I thought that once we create a non-partitioned tmp table, we'd need to full-scan it on every insert iteration into the target relation, and that approach might lead to cost growth. But we can discuss that, of course.
However, now I see that we run the model's SQL repeatedly, which may be even more costly and consume more resources. My bad - it would be a great enhancement to rework the tmp relation creation.

@nicor88 (Contributor) commented Sep 4, 2023

@svdimchenko

> I thought that once we create a non-partitioned tmp table, we'd need to full-scan it on every insert iteration into the target relation, and that approach might lead to cost growth. But we can discuss that, of course.
> However, now I see that we run the model's SQL repeatedly, which may be even more costly and consume more resources. My bad - it would be a great enhancement to rework the tmp relation creation.

A few reasons to use a tmp table (using parquet):

  • we force the usage of a columnar format (imagine cases where the user uses JSON) and avoid running many insert queries against a non-optimal source table
  • the distinct queries to get partitions will be easier to read - select my_partition_column from my_tmp_table
  • the insert queries will also be easier to read - insert into my_final_table(x, y, z) select x, y, z from my_tmp_table where z in ('a', 'c')
  • in case something goes wrong we have my_tmp_table to inspect

Notes

  • even using a parquet tmp table we cannot avoid full scans on it - for that we would need partitions
  • I don't think that using a tmp table adds much cost - there is only one extra query where we persist the dataset, and the subsequent queries on the tmp table could scan less data, since we force parquet and scan only the columns we select
  • persisting the SQL query in a tmp table will also improve scenarios where random() or current_timestamp are used, creating more deterministic behavior (see the sketch below)
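A rough sketch of that flow (table and column names are hypothetical, not the adapter's actual SQL):

-- 1. persist the model's SQL once, forcing a columnar format
create table my_tmp_table with (format = 'PARQUET') as
select id, my_partition_column from my_source;  -- stand-in for the model's SQL

-- 2. enumerate partition values from the materialized data (deterministic now)
select distinct my_partition_column from my_tmp_table;

-- 3. one insert statement per partition batch, reading from the tmp table
insert into my_final_table
select id, my_partition_column from my_tmp_table where my_partition_column = 1;
insert into my_final_table
select id, my_partition_column from my_tmp_table where my_partition_column is null;

-- 4. clean up
drop table my_tmp_table;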

Happy to discuss more about it :)

@nicor88 nicor88 self-requested a review September 5, 2023 19:38
@nicor88 (Contributor) commented Sep 5, 2023

@svdimchenko do you want to look at this as well?

@nicor88 (Contributor) commented Sep 6, 2023

great work @lukas-gust, and nice first contribution 💯

@nicor88 nicor88 merged commit a9a8696 into dbt-labs:main Sep 6, 2023
8 checks passed