Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Added null check for partitions batches #392

Merged
merged 7 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@
{%- set single_partition = [] -%}
{%- for col in row -%}


{%- set column_type = adapter.convert_type(table, loop.index0) -%}
{%- if column_type == 'integer' -%}
{%- set comp_func = '=' -%}
{%- if col is none -%}
{%- set value = 'null' -%}
{%- set comp_func = ' is ' -%}
{%- elif column_type == 'integer' -%}
{%- set value = col | string -%}
{%- elif column_type == 'string' -%}
{%- set value = "'" + col + "'" -%}
Expand All @@ -31,7 +36,7 @@
{%- do exceptions.raise_compiler_error('Need to add support for column type ' + column_type) -%}
{%- endif -%}
{%- set partition_key = adapter.format_one_partition_key(partitioned_by[loop.index0]) -%}
{%- do single_partition.append(partition_key + '=' + value) -%}
{%- do single_partition.append(partition_key + comp_func + value) -%}
{%- endfor -%}

{%- set single_partition_expression = single_partition | join(' and ') -%}
Expand Down
139 changes: 139 additions & 0 deletions tests/functional/adapter/test_partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,67 @@
cross join unnest(date_array) as t2(date_column)
"""

test_single_nullable_partition_model_sql = """
with data as (
select
random() as col_1,
row_number() over() as id
from
unnest(sequence(1, 200))
)

select
col_1, id
from data
union all
select random() as col_1, NULL as id
union all
select random() as col_1, NULL as id
"""

test_nullable_partitions_model_sql = """
{{ config(
materialized='table',
format='parquet',
s3_data_naming='table',
partitioned_by=['id', 'date_column']
) }}

with data as (
select
random() as rnd,
row_number() over() as id,
cast(date_column as date) as date_column
from (
values (
sequence(from_iso8601_date('2023-01-01'), from_iso8601_date('2023-07-31'), interval '1' day)
)
) as t1(date_array)
cross join unnest(date_array) as t2(date_column)
)

select
rnd,
case when id <= 50 then null else id end as id,
date_column
from data
union all
select
random() as rnd,
NULL as id,
NULL as date_column
union all
select
random() as rnd,
NULL as id,
cast('2023-09-02' as date) as date_column
union all
select
random() as rnd,
40 as id,
NULL as date_column
"""


class TestHiveTablePartitions:
@pytest.fixture(scope="class")
Expand Down Expand Up @@ -125,3 +186,81 @@ def test__check_incremental_run_with_partitions(self, project):
incremental_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]

assert incremental_records_count == 212


class TestHiveNullValuedPartitions:
    """Build a hive table partitioned on two nullable columns and verify
    that rows with NULL partition keys survive the run intact."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # Partition on both the nullable integer and the nullable date column.
        return {
            "models": {
                "+table_type": "hive",
                "+materialized": "table",
                "+partitioned_by": ["id", "date_column"],
            }
        }

    @pytest.fixture(scope="class")
    def models(self):
        return {
            "test_nullable_partitions_model.sql": test_nullable_partitions_model_sql,
        }

    def test__check_run_with_partitions(self, project):
        relation_name = "test_nullable_partitions_model"
        total_query = f"select count(*) as records from {project.test_schema}.{relation_name}"
        null_id_query = (
            f"select count(*) as records from {project.test_schema}.{relation_name} where id is null"
        )
        null_date_query = (
            f"select count(*) as records from {project.test_schema}.{relation_name} where date_column is null"
        )

        run_results = run_dbt(["run", "--select", relation_name])

        # The model must build successfully despite NULL partition values.
        assert run_results.results[0].status == RunStatus.Success

        # 212 base rows + 3 union'd rows = 215 total.
        assert project.run_sql(total_query, fetch="all")[0][0] == 215

        # 50 ids nulled by the case expression + 2 explicit NULL ids.
        assert project.run_sql(null_id_query, fetch="all")[0][0] == 52

        # Exactly two rows carry a NULL date_column.
        assert project.run_sql(null_date_query, fetch="all")[0][0] == 2


class TestHiveSingleNullValuedPartition:
    """Build a hive table partitioned on a single nullable column and
    verify the run succeeds and keeps all rows, including NULL-keyed ones."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # Single nullable partition key exercises the `is null` comparison path.
        return {
            "models": {
                "+table_type": "hive",
                "+materialized": "table",
                "+partitioned_by": ["id"],
            }
        }

    @pytest.fixture(scope="class")
    def models(self):
        return {
            "test_single_nullable_partition_model.sql": test_single_nullable_partition_model_sql,
        }

    def test__check_run_with_partitions(self, project):
        relation_name = "test_single_nullable_partition_model"
        total_query = f"select count(*) as records from {project.test_schema}.{relation_name}"

        run_results = run_dbt(["run", "--select", relation_name])

        # The model must build successfully despite NULL partition values.
        assert run_results.results[0].status == RunStatus.Success

        # 200 base rows + 2 NULL-id rows = 202 total.
        assert project.run_sql(total_query, fetch="all")[0][0] == 202