From eebbb65161e643468117b51e6838595f1daaff0a Mon Sep 17 00:00:00 2001
From: Dave
Date: Tue, 16 Apr 2024 18:22:36 +0200
Subject: [PATCH 1/8] add sanity check to prevent missing config setup

---
 tests/load/utils.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/tests/load/utils.py b/tests/load/utils.py
index 110c2b433d..2f20e91e69 100644
--- a/tests/load/utils.py
+++ b/tests/load/utils.py
@@ -226,6 +226,10 @@ def destinations_configs(
             DestinationTestConfiguration(destination="synapse", supports_dbt=False),
         ]
 
+    # sanity check that when selecting default destinations, one of each sql destination is actually
+    # provided
+    assert set(SQL_DESTINATIONS) == {d.destination for d in destination_configs}
+
     if default_vector_configs:
         # for now only weaviate
         destination_configs += [DestinationTestConfiguration(destination="weaviate")]

From 054b57718d7fa4e9fc6ac9cd2c2c65bf0dce68e7 Mon Sep 17 00:00:00 2001
From: Dave
Date: Tue, 16 Apr 2024 19:06:53 +0200
Subject: [PATCH 2/8] fall back to append for merge without merge keys

---
 dlt/destinations/sql_jobs.py                  | 109 ++++++++++--------
 .../load/pipeline/test_filesystem_pipeline.py |  81 ++++---------
 tests/load/pipeline/test_merge_disposition.py |  13 ++-
 tests/pipeline/utils.py                       |  57 ++++-----
 4 files changed, 114 insertions(+), 146 deletions(-)

diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py
index eadedb742e..6b683ccaf9 100644
--- a/dlt/destinations/sql_jobs.py
+++ b/dlt/destinations/sql_jobs.py
@@ -146,7 +146,10 @@ def generate_sql(
 
 
 class SqlMergeJob(SqlBaseJob):
-    """Generates a list of sql statements that merge the data from staging dataset into destination dataset."""
+    """
+    Generates a list of sql statements that merge the data from staging dataset into destination dataset.
+    If no merge keys are discovered, falls back to append.
+    """
 
     failed_text: str = "Tried to generate a merge sql job for the following tables:"
 
@@ -383,68 +386,74 @@ def gen_merge_sql(
                 get_columns_names_with_prop(root_table, "merge_key"),
             )
         )
-        key_clauses = cls._gen_key_table_clauses(primary_keys, merge_keys)
 
-        unique_column: str = None
-        root_key_column: str = None
+        # if we do not have any merge keys to select from, we will fall back to a staged append, i.e.
+        # just skip the delete part
+        append_fallback = (len(primary_keys) + len(merge_keys)) == 0
 
-        if len(table_chain) == 1:
-            key_table_clauses = cls.gen_key_table_clauses(
-                root_table_name, staging_root_table_name, key_clauses, for_delete=True
-            )
-            # if no child tables, just delete data from top table
-            for clause in key_table_clauses:
-                sql.append(f"DELETE {clause};")
-        else:
-            key_table_clauses = cls.gen_key_table_clauses(
-                root_table_name, staging_root_table_name, key_clauses, for_delete=False
-            )
-            # use unique hint to create temp table with all identifiers to delete
-            unique_columns = get_columns_names_with_prop(root_table, "unique")
-            if not unique_columns:
-                raise MergeDispositionException(
-                    sql_client.fully_qualified_dataset_name(),
-                    staging_root_table_name,
-                    [t["name"] for t in table_chain],
-                    f"There is no unique column (ie _dlt_id) in top table {root_table['name']} so"
-                    " it is not possible to link child tables to it.",
-                )
-            # get first unique column
-            unique_column = escape_id(unique_columns[0])
-            # create temp table with unique identifier
-            create_delete_temp_table_sql, delete_temp_table_name = cls.gen_delete_temp_table_sql(
-                unique_column, key_table_clauses, sql_client
-            )
-            sql.extend(create_delete_temp_table_sql)
+        if not append_fallback:
+            key_clauses = cls._gen_key_table_clauses(primary_keys, merge_keys)
 
-            # delete from child tables first. This is important for databricks which does not support temporary tables,
-            # but uses temporary views instead
-            for table in table_chain[1:]:
-                table_name = sql_client.make_qualified_table_name(table["name"])
-                root_key_columns = get_columns_names_with_prop(table, "root_key")
-                if not root_key_columns:
+            unique_column: str = None
+            root_key_column: str = None
+
+            if len(table_chain) == 1:
+                key_table_clauses = cls.gen_key_table_clauses(
+                    root_table_name, staging_root_table_name, key_clauses, for_delete=True
+                )
+                # if no child tables, just delete data from top table
+                for clause in key_table_clauses:
+                    sql.append(f"DELETE {clause};")
+            else:
+                key_table_clauses = cls.gen_key_table_clauses(
+                    root_table_name, staging_root_table_name, key_clauses, for_delete=False
+                )
+                # use unique hint to create temp table with all identifiers to delete
+                unique_columns = get_columns_names_with_prop(root_table, "unique")
+                if not unique_columns:
                     raise MergeDispositionException(
                         sql_client.fully_qualified_dataset_name(),
                         staging_root_table_name,
                         [t["name"] for t in table_chain],
-                        "There is no root foreign key (ie _dlt_root_id) in child table"
-                        f" {table['name']} so it is not possible to refer to top level table"
-                        f" {root_table['name']} unique column {unique_column}",
+                        "There is no unique column (ie _dlt_id) in top table"
+                        f" {root_table['name']} so it is not possible to link child tables to it.",
                     )
-                root_key_column = escape_id(root_key_columns[0])
+                # get first unique column
+                unique_column = escape_id(unique_columns[0])
+                # create temp table with unique identifier
+                create_delete_temp_table_sql, delete_temp_table_name = (
+                    cls.gen_delete_temp_table_sql(unique_column, key_table_clauses, sql_client)
+                )
+                sql.extend(create_delete_temp_table_sql)
+
+                # delete from child tables first. This is important for databricks which does not support temporary tables,
+                # but uses temporary views instead
+                for table in table_chain[1:]:
+                    table_name = sql_client.make_qualified_table_name(table["name"])
+                    root_key_columns = get_columns_names_with_prop(table, "root_key")
+                    if not root_key_columns:
+                        raise MergeDispositionException(
+                            sql_client.fully_qualified_dataset_name(),
+                            staging_root_table_name,
+                            [t["name"] for t in table_chain],
+                            "There is no root foreign key (ie _dlt_root_id) in child table"
+                            f" {table['name']} so it is not possible to refer to top level table"
+                            f" {root_table['name']} unique column {unique_column}",
+                        )
+                    root_key_column = escape_id(root_key_columns[0])
+                    sql.append(
+                        cls.gen_delete_from_sql(
+                            table_name, root_key_column, delete_temp_table_name, unique_column
+                        )
+                    )
+
+            # delete from top table now that child tables have been processed
             sql.append(
                 cls.gen_delete_from_sql(
-                    table_name, root_key_column, delete_temp_table_name, unique_column
+                    root_table_name, unique_column, delete_temp_table_name, unique_column
                 )
             )
 
-            # delete from top table now that child tables have been prcessed
-            sql.append(
-                cls.gen_delete_from_sql(
-                    root_table_name, unique_column, delete_temp_table_name, unique_column
-                )
-            )
-
         # get name of column with hard_delete hint, if specified
         not_deleted_cond: str = None
         hard_delete_col = get_first_column_name_with_prop(root_table, "hard_delete")
diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py
index d24b799349..0105eb64f2 100644
--- a/tests/load/pipeline/test_filesystem_pipeline.py
+++ b/tests/load/pipeline/test_filesystem_pipeline.py
@@ -19,35 +19,19 @@
 from tests.common.utils import load_json_case
 from tests.utils import ALL_TEST_DATA_ITEM_FORMATS, TestDataItemFormat, skip_if_not_active
 from dlt.destinations.path_utils import create_path
-
+from tests.load.pipeline.utils import load_table_counts
 skip_if_not_active("filesystem")
 
 
-def assert_file_matches(
-    layout: str, job: LoadJobInfo, load_id: str, client: FilesystemClient
-) -> None:
-    """Verify file contents of load job are identical to the corresponding file in destination"""
-    local_path = Path(job.file_path)
-    filename = local_path.name
-    destination_fn = create_path(
-        layout,
-        filename,
-        client.schema.name,
-        load_id,
-        extra_placeholders=client.config.extra_placeholders,
-    )
-    destination_path = posixpath.join(client.dataset_path, destination_fn)
-
-    assert local_path.read_bytes() == client.fs_client.read_bytes(destination_path)
-
-
 def test_pipeline_merge_write_disposition(default_buckets_env: str) -> None:
     """Run pipeline twice with merge write disposition
-    Resource with primary key falls back to append. Resource without keys falls back to replace.
+    Regardless of whether a primary key is set, the filesystem destination appends
     """
     import pyarrow.parquet as pq  # Module is evaluated by other tests
 
+    os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "True"
+
     pipeline = dlt.pipeline(
         pipeline_name="test_" + uniq_id(),
         destination="filesystem",
@@ -66,50 +50,25 @@ def other_data():
     def some_source():
         return [some_data(), other_data()]
 
-    info1 = pipeline.run(some_source(), write_disposition="merge")
-    info2 = pipeline.run(some_source(), write_disposition="merge")
-
-    client: FilesystemClient = pipeline.destination_client()  # type: ignore[assignment]
-    layout = client.config.layout
-
-    append_glob = list(client._get_table_dirs(["some_data"]))[0]
-    replace_glob = list(client._get_table_dirs(["other_data"]))[0]
-
-    append_files = client.fs_client.ls(append_glob, detail=False, refresh=True)
-    replace_files = client.fs_client.ls(replace_glob, detail=False, refresh=True)
-
-    load_id1 = info1.loads_ids[0]
-    load_id2 = info2.loads_ids[0]
-
-    # resource with pk is loaded with append and has 1 copy for each load
-    assert len(append_files) == 2
-    assert any(load_id1 in fn for fn in append_files)
-    assert any(load_id2 in fn for fn in append_files)
-
-    # resource without pk is treated as append disposition
-    assert len(replace_files) == 2
-    assert any(load_id1 in fn for fn in replace_files)
-    assert any(load_id2 in fn for fn in replace_files)
-
-    # Verify file contents
-    assert info2.load_packages
-    for pkg in info2.load_packages:
-        assert pkg.jobs["completed_jobs"]
-        for job in pkg.jobs["completed_jobs"]:
-            assert_file_matches(layout, job, pkg.load_id, client)
-
-    complete_fn = f"{client.schema.name}.{LOADS_TABLE_NAME}.%s"
+    pipeline.run(some_source(), write_disposition="merge")
+    assert load_table_counts(pipeline, "some_data", "other_data") == {
+        "some_data": 3,
+        "other_data": 5,
+    }
 
-    # Test complete_load markers are saved
-    assert client.fs_client.isfile(posixpath.join(client.dataset_path, complete_fn % load_id1))
-    assert client.fs_client.isfile(posixpath.join(client.dataset_path, complete_fn % load_id2))
+    # second load shows that merge always appends on filesystem
+    pipeline.run(some_source(), write_disposition="merge")
+    assert load_table_counts(pipeline, "some_data", "other_data") == {
+        "some_data": 6,
+        "other_data": 10,
+    }
 
-    # Force replace
+    # Force replace, back to initial values
     pipeline.run(some_source(), write_disposition="replace")
-    append_files = client.fs_client.ls(append_glob, detail=False, refresh=True)
-    replace_files = client.fs_client.ls(replace_glob, detail=False, refresh=True)
-    assert len(append_files) == 1
-    assert len(replace_files) == 1
+    assert load_table_counts(pipeline, "some_data", "other_data") == {
+        "some_data": 3,
+        "other_data": 5,
+    }
 
 
 @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
diff --git a/tests/load/pipeline/test_merge_disposition.py b/tests/load/pipeline/test_merge_disposition.py
index bfcdccfba4..8c9042ffff 100644
--- a/tests/load/pipeline/test_merge_disposition.py
+++ b/tests/load/pipeline/test_merge_disposition.py
@@ -240,10 +240,17 @@ def test_merge_no_child_tables(destination_config: DestinationTestConfiguration)
     assert github_2_counts["issues"] == 100 if destination_config.supports_merge else 115
 
 
+# mark as essential for now
+@pytest.mark.essential
 @pytest.mark.parametrize(
-    "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
+    "destination_config",
+    destinations_configs(default_sql_configs=True, local_filesystem_configs=True),
+    ids=lambda x: x.name,
 )
 def test_merge_no_merge_keys(destination_config: DestinationTestConfiguration) -> None:
+    # NOTE: we can test filesystem destination merge behavior here too, it will also fall back!
+    if destination_config.file_format == "insert_values":
+        pytest.skip("Insert values row count checking is buggy, skipping")
     p = destination_config.setup_pipeline("github_3", full_refresh=True)
     github_data = github()
     # remove all keys
@@ -264,8 +271,8 @@ def test_merge_no_merge_keys(destination_config: DestinationTestConfiguration) -
     info = p.run(github_data, loader_file_format=destination_config.file_format)
     assert_load_info(info)
     github_1_counts = load_table_counts(p, *[t["name"] for t in p.default_schema.data_tables()])
-    # only ten rows remains. merge falls back to replace when no keys are specified
-    assert github_1_counts["issues"] == 10 if destination_config.supports_merge else 100 - 45
+    # we have 10 more rows; merge falls back to append if no keys are present
+    assert github_1_counts["issues"] == 100 - 45 + 10
 
 
 @pytest.mark.parametrize(
diff --git a/tests/pipeline/utils.py b/tests/pipeline/utils.py
index c4e1f5314b..036154b582 100644
--- a/tests/pipeline/utils.py
+++ b/tests/pipeline/utils.py
@@ -3,6 +3,7 @@
 import pytest
 import random
 from os import environ
+import io
 
 import dlt
 from dlt.common import json, sleep
@@ -80,7 +81,7 @@ def assert_data_table_counts(p: dlt.Pipeline, expected_counts: DictStrAny) -> No
         ), f"Table counts do not match, expected {expected_counts}, got {table_counts}"
 
 
-def load_file(path: str, file: str) -> Tuple[str, List[Dict[str, Any]]]:
+def load_file(fs_client, path: str, file: str) -> Tuple[str, List[Dict[str, Any]]]:
     """
     util function to load a filesystem destination file and return parsed content
     values may not be cast to the right type, especially for insert_values, please
@@ -96,26 +97,22 @@ def load_file(path: str, file: str) -> Tuple[str, List[Dict[str, Any]]]:
 
     # table name will be last element of path
     table_name = path.split("/")[-1]
-
-    # skip loads table
-    if table_name == "_dlt_loads":
-        return table_name, []
-
     full_path = posixpath.join(path, file)
 
     # load jsonl
     if ext == "jsonl":
-        with open(full_path, "rU", encoding="utf-8") as f:
-            for line in f:
+        file_text = fs_client.read_text(full_path)
+        for line in file_text.split("\n"):
+            if line:
                 result.append(json.loads(line))
 
     # load insert_values (this is a bit volatile if the exact format of the source file changes)
     elif ext == "insert_values":
-        with open(full_path, "rU", encoding="utf-8") as f:
-            lines = f.readlines()
-            # extract col names
-            cols = lines[0][15:-2].split(",")
-            for line in lines[2:]:
+        file_text = fs_client.read_text(full_path)
+        lines = file_text.split("\n")
+        cols = lines[0][15:-2].split(",")
+        for line in lines[2:]:
+            if line:
                 values = line[1:-3].split(",")
                 result.append(dict(zip(cols, values)))
 
@@ -123,20 +120,20 @@ def load_file(path: str, file: str) -> Tuple[str, List[Dict[str, Any]]]:
     elif ext == "parquet":
         import pyarrow.parquet as pq
 
-        with open(full_path, "rb") as f:
-            table = pq.read_table(f)
-            cols = table.column_names
-            count = 0
-            for column in table:
-                column_name = cols[count]
-                item_count = 0
-                for item in column.to_pylist():
-                    if len(result) <= item_count:
-                        result.append({column_name: item})
-                    else:
-                        result[item_count][column_name] = item
-                    item_count += 1
-                count += 1
+        file_bytes = fs_client.read_bytes(full_path)
+        table = pq.read_table(io.BytesIO(file_bytes))
+        cols = table.column_names
+        count = 0
+        for column in table:
+            column_name = cols[count]
+            item_count = 0
+            for item in column.to_pylist():
+                if len(result) <= item_count:
+                    result.append({column_name: item})
+                else:
+                    result[item_count][column_name] = item
+                item_count += 1
+            count += 1
 
     return table_name, result
 
@@ -149,7 +146,7 @@ def load_files(p: dlt.Pipeline, *table_names: str) -> Dict[str, List[Dict[str, A
             client.dataset_path, detail=False, refresh=True
         ):
             for file in files:
-                table_name, items = load_file(basedir, file)
+                table_name, items = load_file(client.fs_client, basedir, file)
                 if table_name not in table_names:
                     continue
                 if table_name in result:
@@ -157,10 +154,6 @@ def load_files(p: dlt.Pipeline, *table_names: str) -> Dict[str, List[Dict[str, A
             else:
                 result[table_name] = items
 
-    # loads file is special case
-    if LOADS_TABLE_NAME in table_names and file.find(".{LOADS_TABLE_NAME}."):
-        result[LOADS_TABLE_NAME] = []
-
     return result
 
 

From 6ef720a342c03d83f0c5b4b0d10dc274793ee24b Mon Sep 17 00:00:00 2001
From: Dave
Date: Tue, 16 Apr 2024 19:33:20 +0200
Subject: [PATCH 3/8] add test for checking behavior of hard_delete without key

---
 tests/load/pipeline/test_merge_disposition.py | 32 +++++++++++++------
 1 file changed, 22 insertions(+), 10 deletions(-)

diff --git a/tests/load/pipeline/test_merge_disposition.py b/tests/load/pipeline/test_merge_disposition.py
index 8c9042ffff..5c1d60d0db 100644
--- a/tests/load/pipeline/test_merge_disposition.py
+++ b/tests/load/pipeline/test_merge_disposition.py
@@ -596,8 +596,10 @@ def r(data):
     destinations_configs(default_sql_configs=True, supports_merge=True),
     ids=lambda x: x.name,
 )
-@pytest.mark.parametrize("key_type", ["primary_key", "merge_key"])
+@pytest.mark.parametrize("key_type", ["primary_key", "merge_key", "no_key"])
 def test_hard_delete_hint(destination_config: DestinationTestConfiguration, key_type: str) -> None:
+    # the no_key setting means hard deletes have no effect, since hard delete records
+    # cannot be matched
     table_name = "test_hard_delete_hint"
 
     @dlt.resource(
@@ -612,6 +614,9 @@ def data_resource(data):
         data_resource.apply_hints(primary_key="id", merge_key="")
     elif key_type == "merge_key":
         data_resource.apply_hints(primary_key="", merge_key="id")
+    elif key_type == "no_key":
+        # we test what happens if there are no merge keys
+        pass
 
     p = destination_config.setup_pipeline(f"abstract_{key_type}", full_refresh=True)
 
@@ -630,7 +635,7 @@ def data_resource(data):
     ]
     info = p.run(data_resource(data), loader_file_format=destination_config.file_format)
     assert_load_info(info)
-    assert load_table_counts(p, table_name)[table_name] == 1
+    assert load_table_counts(p, table_name)[table_name] == (1 if key_type != "no_key" else 2)
 
     # update one record (None for hard_delete column is treated as "not True")
     data = [
@@ -638,16 +643,17 @@ def data_resource(data):
     ]
     info = p.run(data_resource(data), loader_file_format=destination_config.file_format)
     assert_load_info(info)
-    assert load_table_counts(p, table_name)[table_name] == 1
+    assert load_table_counts(p, table_name)[table_name] == (1 if key_type != "no_key" else 3)
 
     # compare observed records with expected records
-    qual_name = p.sql_client().make_qualified_table_name(table_name)
-    observed = [
-        {"id": row[0], "val": row[1], "deleted": row[2]}
-        for row in select_data(p, f"SELECT id, val, deleted FROM {qual_name}")
-    ]
-    expected = [{"id": 2, "val": "baz", "deleted": None}]
-    assert sorted(observed, key=lambda d: d["id"]) == expected
+    if key_type != "no_key":
+        qual_name = p.sql_client().make_qualified_table_name(table_name)
+        observed = [
+            {"id": row[0], "val": row[1], "deleted": row[2]}
+            for row in select_data(p, f"SELECT id, val, deleted FROM {qual_name}")
+        ]
+        expected = [{"id": 2, "val": "baz", "deleted": None}]
+        assert sorted(observed, key=lambda d: d["id"]) == expected
 
     # insert two records with same key
     data = [
@@ -661,6 +667,12 @@ def data_resource(data):
         assert counts == 2
     elif key_type == "merge_key":
         assert counts == 3
+    elif key_type == "no_key":
+        assert counts == 5
+
+    # we do not need to test "no_key" further
+    if key_type == "no_key":
+        return
 
     # delete one key, resulting in one (primary key) or two (merge key) deleted records
     data = [

From 46d1fea015ce145d39fec74b890301e06a91e1ae Mon Sep 17 00:00:00 2001
From: Dave
Date: Tue, 16 Apr 2024 19:40:33 +0200
Subject: [PATCH 4/8] add schema warning

---
 dlt/common/destination/reference.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py
index 9318dca535..38238c2fa6 100644
--- a/dlt/common/destination/reference.py
+++ b/dlt/common/destination/reference.py
@@ -351,6 +351,14 @@ def _verify_schema(self) -> None:
                     f'"{table["x-merge-strategy"]}" is not a valid merge strategy. '  # type: ignore[typeddict-item]
                     f"""Allowed values: {', '.join(['"' + s + '"' for s in MERGE_STRATEGIES])}."""
                 )
+            if not has_column_with_prop(table, "primary_key") and not has_column_with_prop(
+                table, "merge_key"
+            ):
+                logger.warning(
+                    f"Table {table_name} has write_disposition set to merge, but no primary or"
+                    " merge keys defined. "
+                    + "dlt will fall back to append for this table."
+                )
             if has_column_with_prop(table, "hard_delete"):
                 if len(get_columns_names_with_prop(table, "hard_delete")) > 1:
                     raise SchemaException(

From dfbba6034cf2854f5fcdb255a91967a790c4a02f Mon Sep 17 00:00:00 2001
From: Dave
Date: Wed, 17 Apr 2024 14:02:01 +0200
Subject: [PATCH 5/8] fix athena iceberg locations

---
 dlt/destinations/impl/athena/athena.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py
index 1beb249386..25152ec06f 100644
--- a/dlt/destinations/impl/athena/athena.py
+++ b/dlt/destinations/impl/athena/athena.py
@@ -367,7 +367,7 @@ def _get_table_update_sql(
         if is_iceberg:
             sql.append(f"""CREATE TABLE {qualified_table_name}
                     ({columns})
-                    LOCATION '{location}'
+                    LOCATION '{location.rstrip('/')}'
                     TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');""")
         elif table_format == "jsonl":
             sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name}

From d2738a3150c1809535475835b3b13a68890469d9 Mon Sep 17 00:00:00 2001
From: Dave
Date: Wed, 17 Apr 2024 14:46:52 +0200
Subject: [PATCH 6/8] add note in docs about merge fallback behavior

---
 docs/website/docs/general-usage/incremental-loading.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md
index 28d2f862b2..e7a7faddb0 100644
--- a/docs/website/docs/general-usage/incremental-loading.md
+++ b/docs/website/docs/general-usage/incremental-loading.md
@@ -132,6 +132,12 @@ def github_repo_events(last_created_at = dlt.sources.incremental("created_at", "
     yield from _get_rest_pages("events")
 ```
 
+:::note
+If you use the `merge` write disposition, but do not specify merge or primary keys, merge will fall back to `append`.
+The appended data will be inserted from a staging table in one transaction for most destinations in this case.
+:::
+
+
 #### Delete records
 The `hard_delete` column hint can be used to delete records from the destination dataset. The behavior of the delete mechanism depends on the data type of the column marked with the hint:
 1) `bool` type: only `True` leads to a delete—`None` and `False` values are disregarded

From 8fc29fa74523d0b77185a159dcae5b8ee1e3a314 Mon Sep 17 00:00:00 2001
From: Dave
Date: Wed, 17 Apr 2024 14:56:41 +0200
Subject: [PATCH 7/8] fix merge switching tests

---
 tests/load/pipeline/test_write_disposition_changes.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/tests/load/pipeline/test_write_disposition_changes.py b/tests/load/pipeline/test_write_disposition_changes.py
index 50986727ed..2a7a94ef6b 100644
--- a/tests/load/pipeline/test_write_disposition_changes.py
+++ b/tests/load/pipeline/test_write_disposition_changes.py
@@ -10,6 +10,7 @@
 from dlt.pipeline.exceptions import PipelineStepFailed
 
 
+@dlt.resource(primary_key="id")
 def data_with_subtables(offset: int) -> Any:
     for _, index in enumerate(range(offset, offset + 100), 1):
         yield {
@@ -96,13 +97,10 @@ def test_switch_to_merge(destination_config: DestinationTestConfiguration, with_
         pipeline_name="test_switch_to_merge", full_refresh=True
     )
 
-    @dlt.resource()
-    def resource():
-        yield data_with_subtables(10)
 
     @dlt.source()
     def source():
-        return resource()
+        return data_with_subtables(10)
 
     s = source()
     s.root_key = with_root_key

From 796f9cee1c30509e937a72ba5fc860d8b76b3e21 Mon Sep 17 00:00:00 2001
From: Dave
Date: Wed, 17 Apr 2024 16:01:45 +0200
Subject: [PATCH 8/8] fix one additional test with fallback

---
 tests/load/pipeline/test_merge_disposition.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/load/pipeline/test_merge_disposition.py b/tests/load/pipeline/test_merge_disposition.py
index 5c1d60d0db..2924aeb6df 100644
--- a/tests/load/pipeline/test_merge_disposition.py
+++ b/tests/load/pipeline/test_merge_disposition.py
@@ -298,14 +298,14 @@ def test_merge_keys_non_existing_columns(destination_config: DestinationTestConf
     if not destination_config.supports_merge:
         return
 
-    # all the keys are invalid so the merge falls back to replace
+    # all the keys are invalid so the merge falls back to append
    github_data = github()
     github_data.load_issues.apply_hints(merge_key=("mA1", "Ma2"), primary_key=("123-x",))
     github_data.load_issues.add_filter(take_first(1))
     info = p.run(github_data, loader_file_format=destination_config.file_format)
     assert_load_info(info)
     github_2_counts = load_table_counts(p, *[t["name"] for t in p.default_schema.data_tables()])
-    assert github_2_counts["issues"] == 1
+    assert github_2_counts["issues"] == 100 - 45 + 1
     with p._sql_job_client(p.default_schema) as job_c:
         _, table_schema = job_c.get_storage_table("issues")
         assert "url" in table_schema