adds csv writer (#1185)
* bumps for prerelease 0.4.8a1

* requires password and database in motherduck credentials

* identifies data writer by both file format and source item format, adds csv writer for arrow and object (WIP)

* adds postgres csv writer via COPY (a hedged sketch of the COPY approach follows the file summary below)

* improves arrow and parquet tests, adds arrow normalization edge cases

* refactors extractors in extract, disables schema caches when processing multiple arrows

* refactors item normalizers, adds arrow normalization, improves logging

* removes internal file formats from loader file formats, renames, test improvements

* adds simple csv and postgres docs

* closes writers on exceptions, passes metrics on exceptions, fixes some edge cases with empty arrow files

* fixes empty tables writer tests and bugs

* fixes closing writers when an exception occurs during flush, handles missing tzdata on windows

* installs tzdata on windows ci

* adds csv to docs index

* fixes athena sql job client tests setup

* adjusts for timezone at the preferred precision; all other precisions use timestamp w/o tz

* generates create table statements for synapse outside of a job

* fixes athena table undefined detection

* generates all timestamps with timezones in parquet, tests workarounds in duckdb

* fixes quoting in the regular csv writer and forces nulls in the postgres COPY job

* finalizes the docs

* renames jobs in tests so it is possible to select them as required
rudolfix authored Apr 7, 2024
1 parent e0b4731 commit 18665f1
Showing 105 changed files with 2,508 additions and 955 deletions.
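The "adds postgres csv writer via COPY" item above amounts to streaming a finished csv file into Postgres through COPY ... FROM STDIN. Below is a minimal sketch of that approach using psycopg2; the helper name, table handling and COPY options are assumptions for illustration, not the code added by this commit.

import psycopg2

def copy_csv_to_postgres(dsn: str, table: str, csv_path: str) -> None:
    # COPY options here are illustrative; the actual job builds its own option
    # list (the commit also mentions forcing nulls, i.e. COPY's FORCE_NULL behavior)
    copy_sql = (
        f"COPY {table} FROM STDIN WITH (FORMAT csv, HEADER true, DELIMITER ',', NULL '')"
    )
    conn = psycopg2.connect(dsn)
    try:
        with conn.cursor() as cur, open(csv_path, "r", encoding="utf-8") as f:
            # copy_expert streams the open file through COPY ... FROM STDIN
            cur.copy_expert(copy_sql, f)
        conn.commit()
    finally:
        conn.close()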
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
@@ -67,7 +67,7 @@ jobs:
make lint
matrix_job_required_check:
name: Lint results
name: lint | code & tests
needs: run_lint
runs-on: ubuntu-latest
if: always()
2 changes: 1 addition & 1 deletion .github/workflows/test_airflow.yml
@@ -17,7 +17,7 @@ jobs:
uses: ./.github/workflows/get_docs_changes.yml

run_airflow:
name: test
name: tools | airflow tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
runs-on: ubuntu-latest
2 changes: 1 addition & 1 deletion .github/workflows/test_build_images.yml
@@ -17,7 +17,7 @@ jobs:
uses: ./.github/workflows/get_docs_changes.yml

run_airflow:
name: build
name: tools | docker images build
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
runs-on: ubuntu-latest
14 changes: 13 additions & 1 deletion .github/workflows/test_common.yml
@@ -53,6 +53,18 @@ jobs:
with:
python-version: ${{ matrix.python-version }}

- name: Install tzdata on windows
run: |
cd %USERPROFILE%
curl https://data.iana.org/time-zones/releases/tzdata2021e.tar.gz --output tzdata.tar.gz
mkdir tzdata
tar --extract --file tzdata.tar.gz --directory tzdata
mkdir %USERPROFILE%\Downloads\tzdata
copy tzdata %USERPROFILE%\Downloads\tzdata
curl https://raw.githubusercontent.com/unicode-org/cldr/master/common/supplemental/windowsZones.xml --output %USERPROFILE%\Downloads\tzdata\windowsZones.xml
if: runner.os == 'Windows'
shell: cmd

- name: Install Poetry
# https://github.com/snok/install-poetry#running-on-windows
uses: snok/[email protected]
@@ -136,7 +148,7 @@ jobs:
# shell: cmd

matrix_job_required_check:
name: Common tests
name: common | common tests
needs: run_common
runs-on: ubuntu-latest
if: always()
2 changes: 1 addition & 1 deletion .github/workflows/test_dbt_cloud.yml
@@ -27,7 +27,7 @@ jobs:
if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

run_dbt_cloud:
name: test
name: tools | dbt cloud tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
defaults:
2 changes: 1 addition & 1 deletion .github/workflows/test_dbt_runner.yml
@@ -24,7 +24,7 @@ jobs:
if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

run_dbt:
name: test
name: tools | dbt runner tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
defaults:
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_athena.yml
@@ -32,7 +32,7 @@ jobs:
if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

run_loader:
name: test
name: dest | athena tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
defaults:
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_athena_iceberg.yml
@@ -32,7 +32,7 @@ jobs:
if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

run_loader:
name: test
name: dest | athena iceberg tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
defaults:
4 changes: 2 additions & 2 deletions .github/workflows/test_destination_bigquery.yml
@@ -30,7 +30,7 @@ jobs:
if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

run_loader:
name: test
name: dest | bigquery tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
defaults:
@@ -69,7 +69,7 @@ jobs:

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- run: |
poetry run pytest tests/load -m "essential"
name: Run essential tests Linux
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_databricks.yml
@@ -30,7 +30,7 @@ jobs:
if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

run_loader:
name: test
name: dest | databricks tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
defaults:
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_mssql.yml
@@ -31,7 +31,7 @@ jobs:
if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

run_loader:
name: test
name: dest | mssql tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
defaults:
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_qdrant.yml
@@ -29,7 +29,7 @@ jobs:
if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

run_loader:
name: test
name: dest | qdrant tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
defaults:
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_snowflake.yml
@@ -30,7 +30,7 @@ jobs:
if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

run_loader:
name: test
name: dest | snowflake tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
defaults:
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_synapse.yml
@@ -29,7 +29,7 @@ jobs:
if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

run_loader:
name: test
name: dest | synapse tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
defaults:
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
@@ -38,7 +38,7 @@ jobs:
if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

run_loader:
name: test
name: dest | redshift, postgres and fs tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
strategy:
2 changes: 1 addition & 1 deletion .github/workflows/test_doc_snippets.yml
@@ -27,7 +27,7 @@ env:
jobs:

run_lint:
name: lint and test
name: docs | snippets & examples lint and test
runs-on: ubuntu-latest
# Do not run on forks, unless allowed, secrets are used here
if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}
4 changes: 2 additions & 2 deletions .github/workflows/test_local_destinations.yml
@@ -29,7 +29,7 @@ jobs:
uses: ./.github/workflows/get_docs_changes.yml

run_loader:
name: test
name: dest | postgres, duckdb and fs local tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
strategy:
@@ -87,7 +87,7 @@ jobs:

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

10 changes: 8 additions & 2 deletions dlt/common/data_writers/__init__.py
@@ -1,4 +1,9 @@
from dlt.common.data_writers.writers import DataWriter, DataWriterMetrics, TLoaderFileFormat
from dlt.common.data_writers.writers import (
DataWriter,
DataWriterMetrics,
TDataItemFormat,
FileWriterSpec,
)
from dlt.common.data_writers.buffered import BufferedDataWriter, new_file_id
from dlt.common.data_writers.escape import (
escape_redshift_literal,
@@ -8,8 +13,9 @@

__all__ = [
"DataWriter",
"FileWriterSpec",
"DataWriterMetrics",
"TLoaderFileFormat",
"TDataItemFormat",
"BufferedDataWriter",
"new_file_id",
"escape_redshift_literal",
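The new exports above (FileWriterSpec, TDataItemFormat) back the commit-message item about identifying a data writer by both the loader file format and the source item format. A rough sketch of that kind of two-key dispatch follows; the registry and writer class names are invented for illustration, and only DataWriter.class_factory(file_format, data_item_format) is visible in this commit's diff.

from typing import Dict, Tuple, Type

class ExampleWriter:
    """Stand-in base class for a data writer."""

class CsvObjectWriter(ExampleWriter):
    """Would write Python dict ("object") items as csv rows."""

class CsvArrowWriter(ExampleWriter):
    """Would write pyarrow tables / record batches as csv."""

# one writer class per (file format, data item format) pair
_WRITERS: Dict[Tuple[str, str], Type[ExampleWriter]] = {
    ("csv", "object"): CsvObjectWriter,
    ("csv", "arrow"): CsvArrowWriter,
}

def class_factory(file_format: str, data_item_format: str) -> Type[ExampleWriter]:
    try:
        return _WRITERS[(file_format, data_item_format)]
    except KeyError:
        raise ValueError(f"no writer registered for {file_format}/{data_item_format}")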
84 changes: 51 additions & 33 deletions dlt/common/data_writers/buffered.py
@@ -1,23 +1,20 @@
import gzip
import time
from typing import ClassVar, List, IO, Any, Optional, Type, TypeVar, Generic
from typing import ClassVar, List, IO, Any, Optional, Type, Generic

from dlt.common.typing import TDataItem, TDataItems
from dlt.common.data_writers import TLoaderFileFormat
from dlt.common.data_writers.exceptions import (
BufferedDataWriterClosed,
DestinationCapabilitiesRequired,
InvalidFileNameTemplateException,
)
from dlt.common.data_writers.writers import DataWriter, DataWriterMetrics
from dlt.common.data_writers.writers import TWriter, DataWriter, DataWriterMetrics, FileWriterSpec
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.configuration import with_config, known_sections, configspec
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.utils import uniq_id

TWriter = TypeVar("TWriter", bound=DataWriter)


def new_file_id() -> str:
"""Creates new file id which is globally unique within table_name scope"""
Expand All @@ -38,7 +35,7 @@ class BufferedDataWriterConfiguration(BaseConfiguration):
@with_config(spec=BufferedDataWriterConfiguration)
def __init__(
self,
file_format: TLoaderFileFormat,
writer_spec: FileWriterSpec,
file_name_template: str,
*,
buffer_max_items: int = 5000,
@@ -47,10 +44,13 @@ def __init__(
disable_compression: bool = False,
_caps: DestinationCapabilitiesContext = None
):
self.file_format = file_format
self._file_format_spec = DataWriter.data_format_from_file_format(self.file_format)
if self._file_format_spec.requires_destination_capabilities and not _caps:
raise DestinationCapabilitiesRequired(file_format)
self.writer_spec = writer_spec
if self.writer_spec.requires_destination_capabilities and not _caps:
raise DestinationCapabilitiesRequired(self.writer_spec.file_format)
self.writer_cls = DataWriter.class_factory(
writer_spec.file_format, writer_spec.data_item_format
)
self._supports_schema_changes = self.writer_spec.supports_schema_changes
self._caps = _caps
# validate if template has correct placeholders
self.file_name_template = file_name_template
@@ -61,9 +61,7 @@ def __init__(
self.file_max_items = file_max_items
# the open function is either gzip.open or open
self.open = (
gzip.open
if self._file_format_spec.supports_compression and not disable_compression
else open
gzip.open if self.writer_spec.supports_compression and not disable_compression else open
)

self._current_columns: TTableSchemaColumns = None
@@ -87,8 +85,9 @@ def write_data_item(self, item: TDataItems, columns: TTableSchemaColumns) -> int
# rotate file if columns changed and writer does not allow for that
# as the only allowed change is to add new column (no updates/deletes), we detect the change by comparing lengths
if (
self._writer
and not self._writer.data_format().supports_schema_changes
self._current_columns is not None
and (self._writer or self._supports_schema_changes == "False")
and self._supports_schema_changes != "True"
and len(columns) != len(self._current_columns)
):
assert len(columns) > len(self._current_columns)
@@ -165,10 +164,12 @@ def import_file(self, file_path: str, metrics: DataWriterMetrics) -> DataWriterMetrics
self._rotate_file()
return metrics

def close(self) -> None:
self._ensure_open()
self._flush_and_close_file()
self._closed = True
def close(self, skip_flush: bool = False) -> None:
"""Flushes the data, writes footer (skip_flush is True), collects metrics and closes the underlying file."""
# like regular files, we do not except on double close
if not self._closed:
self._flush_and_close_file(skip_flush=skip_flush)
self._closed = True

@property
def closed(self) -> bool:
@@ -178,26 +179,36 @@ def __enter__(self) -> "BufferedDataWriter[TWriter]":
return self

def __exit__(self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: Any) -> None:
self.close()
# skip flush if we had exception
in_exception = exc_val is not None
try:
self.close(skip_flush=in_exception)
except Exception:
if not in_exception:
# close again but without flush
self.close(skip_flush=True)
# raise the on close exception if we are not handling another exception
if not in_exception:
raise

def _rotate_file(self, allow_empty_file: bool = False) -> DataWriterMetrics:
metrics = self._flush_and_close_file(allow_empty_file)
self._file_name = (
self.file_name_template % new_file_id() + "." + self._file_format_spec.file_extension
self.file_name_template % new_file_id() + "." + self.writer_spec.file_extension
)
self._created = time.time()
return metrics

def _flush_items(self, allow_empty_file: bool = False) -> None:
if self._buffered_items_count > 0 or allow_empty_file:
if self._buffered_items or allow_empty_file:
# we only open a writer when there are any items in the buffer and first flush is requested
if not self._writer:
# create new writer and write header
if self._file_format_spec.is_binary_format:
if self.writer_spec.is_binary_format:
self._file = self.open(self._file_name, "wb") # type: ignore
else:
self._file = self.open(self._file_name, "wt", encoding="utf-8") # type: ignore
self._writer = DataWriter.from_file_format(self.file_format, self._file, caps=self._caps) # type: ignore[assignment]
self._writer = self.writer_cls(self._file, caps=self._caps) # type: ignore[assignment]
self._writer.write_header(self._current_columns)
# write buffer
if self._buffered_items:
@@ -206,15 +217,22 @@ def _flush_items(self, allow_empty_file: bool = False) -> None:
self._buffered_items.clear()
self._buffered_items_count = 0

def _flush_and_close_file(self, allow_empty_file: bool = False) -> DataWriterMetrics:
# if any buffered items exist, flush them
self._flush_items(allow_empty_file)
# if writer exists then close it
if not self._writer:
return None
# write the footer of a file
self._writer.write_footer()
self._file.flush()
def _flush_and_close_file(
self, allow_empty_file: bool = False, skip_flush: bool = False
) -> DataWriterMetrics:
if not skip_flush:
# if any buffered items exist, flush them
self._flush_items(allow_empty_file)
# if writer exists then close it
if not self._writer:
return None
# write the footer of a file
self._writer.write_footer()
self._file.flush()
else:
if not self._writer:
return None
self._writer.close()
# add file written to the list so we can commit all the files later
metrics = DataWriterMetrics(
self._file_name,
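The close/__exit__ changes above give the buffered writer predictable behavior when the write loop raises: buffered items are not flushed and the file is closed without a footer, while a clean exit flushes, writes the footer and collects metrics. A hedged usage sketch, assuming a prepared FileWriterSpec and column schema (both taken as given here):

from dlt.common.data_writers.buffered import BufferedDataWriter

def write_rows(writer_spec, rows, columns):
    # "%s" in the file name template is replaced by new_file_id() on rotation
    with BufferedDataWriter(writer_spec, "table.%s") as writer:
        for row in rows:
            writer.write_data_item(row, columns)
    # clean exit: __exit__ calls close(), which flushes and writes the footer;
    # if the loop raised, close(skip_flush=True) is used instead and the
    # original exception propagates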