Add recommended_file_size cap to limit data writer file size
steinitzu committed May 15, 2024
1 parent 53dde6c commit e470efc
Showing 5 changed files with 47 additions and 4 deletions.
3 changes: 3 additions & 0 deletions dlt/common/data_writers/buffered.py
@@ -55,7 +55,10 @@ def __init__(
self.closed_files: List[DataWriterMetrics] = [] # all fully processed files
# buffered items must be less than max items in file
self.buffer_max_items = min(buffer_max_items, file_max_items or buffer_max_items)
# Explicitly configured max size supersedes destination limit
self.file_max_bytes = file_max_bytes
if self.file_max_bytes is None and _caps:
self.file_max_bytes = _caps.recommended_file_size
self.file_max_items = file_max_items
# the open function is either gzip.open or open
self.open = (
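In this hunk the destination's recommended_file_size only fills in when no explicit file_max_bytes was configured. A minimal standalone sketch of that precedence rule, with an illustrative function name (not dlt's internals):

from typing import Optional

def resolve_file_max_bytes(
    file_max_bytes: Optional[int],
    recommended_file_size: Optional[int],
) -> Optional[int]:
    # An explicitly configured limit supersedes the destination recommendation;
    # the recommendation is used only as a fallback.
    if file_max_bytes is not None:
        return file_max_bytes
    return recommended_file_size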
2 changes: 2 additions & 0 deletions dlt/common/destination/capabilities.py
@@ -29,6 +29,8 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):

preferred_loader_file_format: TLoaderFileFormat = None
supported_loader_file_formats: Sequence[TLoaderFileFormat] = None
recommended_file_size: Optional[int] = None
"""Recommended file size in bytes when writing extract/load files"""
preferred_staging_file_format: Optional[TLoaderFileFormat] = None
supported_staging_file_formats: Sequence[TLoaderFileFormat] = None
escape_identifier: Callable[[str], str] = None
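The new capability can be read off any capabilities instance. A small usage sketch (the 250 KB value is illustrative), assuming generic_capabilities() leaves the field at its declared None default:

from dlt.common.destination.capabilities import DestinationCapabilitiesContext

caps = DestinationCapabilitiesContext.generic_capabilities()
# the new field defaults to None; destinations opt in by setting it
print(caps.recommended_file_size)
caps.recommended_file_size = 250 * 1024  # e.g. rotate files at roughly 250 KB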
2 changes: 2 additions & 0 deletions dlt/destinations/impl/bigquery/__init__.py
@@ -12,6 +12,8 @@ def capabilities() -> DestinationCapabilitiesContext:
caps.supported_loader_file_formats = ["jsonl", "parquet"]
caps.preferred_staging_file_format = "parquet"
caps.supported_staging_file_formats = ["parquet", "jsonl"]
# BQ's limit is 4 GB, but leave large headroom since the buffered writer does not preemptively check size
caps.recommended_file_size = int(1024 * 1024 * 1024)
caps.escape_identifier = escape_bigquery_identifier
caps.escape_literal = None
caps.format_datetime_literal = format_bigquery_datetime_literal
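Any other destination could advertise its own cap the same way inside its capabilities() factory. A hedged sketch; the value and the use of generic_capabilities() as a base are illustrative, not part of this commit:

from dlt.common.destination.capabilities import DestinationCapabilitiesContext

def capabilities() -> DestinationCapabilitiesContext:
    caps = DestinationCapabilitiesContext.generic_capabilities()
    # ask the buffered writer to rotate files at roughly 1 GiB
    caps.recommended_file_size = 1024 * 1024 * 1024
    return caps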
6 changes: 3 additions & 3 deletions tests/common/data_writers/utils.py
@@ -1,5 +1,5 @@
import os
from typing import Type
from typing import Type, Optional

from dlt.common.data_writers.buffered import BufferedDataWriter
from dlt.common.data_writers.writers import TWriter, ALL_WRITERS
@@ -18,8 +18,8 @@
def get_writer(
writer: Type[TWriter],
buffer_max_items: int = 10,
file_max_items: int = 10,
file_max_bytes: int = None,
file_max_items: Optional[int] = 10,
file_max_bytes: Optional[int] = None,
disable_compression: bool = False,
caps: DestinationCapabilitiesContext = None,
) -> BufferedDataWriter[TWriter]:
38 changes: 37 additions & 1 deletion tests/extract/data_writers/test_buffered_writer.py
@@ -2,6 +2,7 @@
import pytest
import time
from typing import Iterator, Type
from uuid import uuid4

from dlt.common.data_writers.exceptions import BufferedDataWriterClosed
from dlt.common.data_writers.writers import (
Expand All @@ -11,7 +12,7 @@
JsonlWriter,
ALL_WRITERS,
)
from dlt.common.destination.capabilities import TLoaderFileFormat
from dlt.common.destination.capabilities import TLoaderFileFormat, DestinationCapabilitiesContext
from dlt.common.schema.utils import new_column
from dlt.common.storages.file_storage import FileStorage

@@ -330,3 +331,38 @@ def test_special_write_rotates(disable_compression: bool, writer_type: Type[DataWriter]
metrics = writer.import_file(
"tests/extract/cases/imported.any", DataWriterMetrics("", 1, 231, 0, 0)
)


@pytest.mark.parametrize(
"disable_compression", [True, False], ids=["no_compression", "compression"]
)
@pytest.mark.parametrize("writer_type", ALL_OBJECT_WRITERS)
def test_rotation_on_destination_caps_recommended_file_size(
disable_compression: bool, writer_type: Type[DataWriter]
) -> None:
caps = DestinationCapabilitiesContext.generic_capabilities()
caps.recommended_file_size = int(250 * 1024)
columns = {"id": new_column("id", "text")}
with get_writer(
writer_type,
disable_compression=disable_compression,
buffer_max_items=100,
file_max_items=None,
file_max_bytes=None,
caps=caps,
) as writer:
for i in range(8):
# Each chunk is approximately 40 KB when serialized
items = [{"id": str(uuid4())} for _ in range(1000)]
writer.write_data_item(items, columns)
if i < 5:
assert not writer.closed_files

if i > 5:
# We should have written at least 250 KB by now and have rotated the file
assert len(writer.closed_files) == 1

# Check that the files written are all within the recommended size plus one chunk
assert len(writer.closed_files) == 2
for file in writer.closed_files:
assert file.file_size < caps.recommended_file_size + 1024 * 50
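
The 50 KB headroom in the final assertion exists because the writer checks the file size only after a buffer flush (the BigQuery comment above notes it does not check preemptively), so a file can overshoot the cap by up to one serialized chunk. A rough sketch of that post-flush check, with illustrative names rather than dlt's internals:

from typing import Optional

def should_rotate_after_flush(file_size: int, file_max_bytes: Optional[int]) -> bool:
    # runs after the chunk is already on disk, so the file may end up slightly
    # above file_max_bytes before rotation kicks in
    return file_max_bytes is not None and file_size >= file_max_bytes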
