From e470efcddef90b7be5ccec28690e06ee229f4344 Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Wed, 15 May 2024 16:52:29 -0400 Subject: [PATCH] Add recommended_file_size cap to limit data writer file size --- dlt/common/data_writers/buffered.py | 3 ++ dlt/common/destination/capabilities.py | 2 + dlt/destinations/impl/bigquery/__init__.py | 2 + tests/common/data_writers/utils.py | 6 +-- .../data_writers/test_buffered_writer.py | 38 ++++++++++++++++++- 5 files changed, 47 insertions(+), 4 deletions(-) diff --git a/dlt/common/data_writers/buffered.py b/dlt/common/data_writers/buffered.py index fdd5b50111..bd32c68c49 100644 --- a/dlt/common/data_writers/buffered.py +++ b/dlt/common/data_writers/buffered.py @@ -55,7 +55,10 @@ def __init__( self.closed_files: List[DataWriterMetrics] = [] # all fully processed files # buffered items must be less than max items in file self.buffer_max_items = min(buffer_max_items, file_max_items or buffer_max_items) + # Explicitly configured max size supersedes destination limit self.file_max_bytes = file_max_bytes + if self.file_max_bytes is None and _caps: + self.file_max_bytes = _caps.recommended_file_size self.file_max_items = file_max_items # the open function is either gzip.open or open self.open = ( diff --git a/dlt/common/destination/capabilities.py b/dlt/common/destination/capabilities.py index e74f5a980d..089b4a1d5e 100644 --- a/dlt/common/destination/capabilities.py +++ b/dlt/common/destination/capabilities.py @@ -29,6 +29,8 @@ class DestinationCapabilitiesContext(ContainerInjectableContext): preferred_loader_file_format: TLoaderFileFormat = None supported_loader_file_formats: Sequence[TLoaderFileFormat] = None + recommended_file_size: Optional[int] = None + """Recommended file size in bytes when writing extract/load files""" preferred_staging_file_format: Optional[TLoaderFileFormat] = None supported_staging_file_formats: Sequence[TLoaderFileFormat] = None escape_identifier: Callable[[str], str] = None diff --git a/dlt/destinations/impl/bigquery/__init__.py b/dlt/destinations/impl/bigquery/__init__.py index d33466ed5e..39322b43a0 100644 --- a/dlt/destinations/impl/bigquery/__init__.py +++ b/dlt/destinations/impl/bigquery/__init__.py @@ -12,6 +12,8 @@ def capabilities() -> DestinationCapabilitiesContext: caps.supported_loader_file_formats = ["jsonl", "parquet"] caps.preferred_staging_file_format = "parquet" caps.supported_staging_file_formats = ["parquet", "jsonl"] + # BQ limit is 4GB but leave a large headroom since buffered writer does not preemptively check size + caps.recommended_file_size = int(1024 * 1024 * 1024) caps.escape_identifier = escape_bigquery_identifier caps.escape_literal = None caps.format_datetime_literal = format_bigquery_datetime_literal diff --git a/tests/common/data_writers/utils.py b/tests/common/data_writers/utils.py index 2cb440bde1..e6e377b7d0 100644 --- a/tests/common/data_writers/utils.py +++ b/tests/common/data_writers/utils.py @@ -1,5 +1,5 @@ import os -from typing import Type +from typing import Type, Optional from dlt.common.data_writers.buffered import BufferedDataWriter from dlt.common.data_writers.writers import TWriter, ALL_WRITERS @@ -18,8 +18,8 @@ def get_writer( writer: Type[TWriter], buffer_max_items: int = 10, - file_max_items: int = 10, - file_max_bytes: int = None, + file_max_items: Optional[int] = 10, + file_max_bytes: Optional[int] = None, disable_compression: bool = False, caps: DestinationCapabilitiesContext = None, ) -> BufferedDataWriter[TWriter]: diff --git a/tests/extract/data_writers/test_buffered_writer.py b/tests/extract/data_writers/test_buffered_writer.py index 82b81a1cd7..b6da132de9 100644 --- a/tests/extract/data_writers/test_buffered_writer.py +++ b/tests/extract/data_writers/test_buffered_writer.py @@ -2,6 +2,7 @@ import pytest import time from typing import Iterator, Type +from uuid import uuid4 from dlt.common.data_writers.exceptions import BufferedDataWriterClosed from dlt.common.data_writers.writers import ( @@ -11,7 +12,7 @@ JsonlWriter, ALL_WRITERS, ) -from dlt.common.destination.capabilities import TLoaderFileFormat +from dlt.common.destination.capabilities import TLoaderFileFormat, DestinationCapabilitiesContext from dlt.common.schema.utils import new_column from dlt.common.storages.file_storage import FileStorage @@ -330,3 +331,38 @@ def test_special_write_rotates(disable_compression: bool, writer_type: Type[Data metrics = writer.import_file( "tests/extract/cases/imported.any", DataWriterMetrics("", 1, 231, 0, 0) ) + + +@pytest.mark.parametrize( + "disable_compression", [True, False], ids=["no_compression", "compression"] +) +@pytest.mark.parametrize("writer_type", ALL_OBJECT_WRITERS) +def test_rotation_on_destination_caps_recommended_file_size( + disable_compression: bool, writer_type: Type[DataWriter] +) -> None: + caps = DestinationCapabilitiesContext.generic_capabilities() + caps.recommended_file_size = int(250 * 1024) + columns = {"id": new_column("id", "text")} + with get_writer( + writer_type, + disable_compression=disable_compression, + buffer_max_items=100, + file_max_items=None, + file_max_bytes=None, + caps=caps, + ) as writer: + for i in range(8): + # Data chunk approximately 40kb serialized + items = [{"id": str(uuid4())} for _ in range(1000)] + writer.write_data_item(items, columns) + if i < 5: + assert not writer.closed_files + + if i > 5: + # We should have written atleast 250kb by now and have rotated the file + assert len(writer.closed_files) == 1 + + # Check the files that were written are all within the recommended size + 1 chunk + assert len(writer.closed_files) == 2 + for file in writer.closed_files: + assert file.file_size < caps.recommended_file_size + 1024 * 50