Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

synapse and mssql bugfixes and improvements #1174

Merged
merged 10 commits into from
Apr 7, 2024
5 changes: 4 additions & 1 deletion dlt/common/data_writers/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,10 @@ def write_row(row: StrAny, last_row: bool = False) -> None:

# if next chunk add separator
if self._chunks_written > 0:
self._f.write(",\n")
if self._caps.insert_values_writer_type == "default":
self._f.write(",\n")
elif self._caps.insert_values_writer_type == "select_union":
self._f.write("\nUNION ALL\n")

# write rows
for row in rows[:-1]:
Expand Down
33 changes: 33 additions & 0 deletions tests/load/pipeline/test_insert_values_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import os
import pytest

import dlt

from tests.pipeline.utils import assert_load_info, load_table_counts
from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration


@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_sql_configs=True, subset=["duckdb", "synapse"]),
ids=lambda x: x.name,
)
def test_buffering(destination_config: DestinationTestConfiguration) -> None:
@dlt.resource(write_disposition="replace")
def items():
yield [{"id": i} for i in range(10)]

# set buffer size less than number of data items
os.environ["DATA_WRITER__BUFFER_MAX_ITEMS"] = "5"

# ensure both writer types are tested
p = destination_config.setup_pipeline("abstract", full_refresh=True)
if destination_config.destination == "duckdb":
assert p.destination.capabilities().insert_values_writer_type == "default"
elif destination_config.destination == "synapse":
assert p.destination.capabilities().insert_values_writer_type == "select_union"

# run pipeline and assert expectations
info = p.run(items())
assert_load_info(info)
assert load_table_counts(p, "items")["items"] == 10
Loading