Pyarrow direct loading (#679)
* poc: direct pyarrow load

* arrow to schema types with precision, test

* Fix naming copy_atomic -> move_atomic

* jsonl/parquet file normalizer classes

* pathlib extension checks

* indent fix

* Write parquet with original schema

* extract refactoring

* Init testing, bugfix

* Fix import filename

* Dep

* Mockup incremental implementation for arrow tables

* Create loadstorage per filetype, import with hardlink

* Fallback for extract item format, detect type of lists

* Error message, load tests with arrow

* Some incremental optimizations

some

* Incremental fixes and run incremental tests on arrow & pandas

* Add/update normalize tests

* Fix load test

* Lint

* Add docs page for arrow loading

* Handle none capability

* Fix extract lists

* Exclude TIME in redshift test

* Fix type errors

* Typo

* Create col from numpy array for >200x speedup, index after filter

* in -> not in

* Format binary as hex for redshift

* enables bool and duckdb test on pyarrow loading

---------

Co-authored-by: Marcin Rudolf <[email protected]>
steinitzu and rudolfix authored Oct 16, 2023
1 parent 87b210b commit d3db284
Showing 27 changed files with 1,774 additions and 483 deletions.
48 changes: 42 additions & 6 deletions dlt/common/data_writers/writers.py
@@ -1,14 +1,14 @@
import abc

from dataclasses import dataclass
from typing import Any, Dict, Sequence, IO, Type, Optional, List, cast
from typing import IO, TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Type, Union

from dlt.common import json
from dlt.common.typing import StrAny
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.destination import TLoaderFileFormat, DestinationCapabilitiesContext
from dlt.common.configuration import with_config, known_sections, configspec
from dlt.common.configuration import configspec, known_sections, with_config
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.destination import DestinationCapabilitiesContext, TLoaderFileFormat
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.typing import StrAny


@dataclass
class TFileFormatSpec:
@@ -70,6 +70,8 @@ def class_factory(file_format: TLoaderFileFormat) -> Type["DataWriter"]:
return InsertValuesWriter
elif file_format == "parquet":
return ParquetDataWriter # type: ignore
elif file_format == "arrow":
return ArrowWriter # type: ignore
else:
raise ValueError(file_format)

@@ -249,3 +251,37 @@ def write_footer(self) -> None:
@classmethod
def data_format(cls) -> TFileFormatSpec:
return TFileFormatSpec("parquet", "parquet", True, False, requires_destination_capabilities=True, supports_compression=False)


class ArrowWriter(ParquetDataWriter):
def write_header(self, columns_schema: TTableSchemaColumns) -> None:
# Schema will be written as-is from the arrow table
pass

def write_data(self, rows: Sequence[Any]) -> None:
from dlt.common.libs.pyarrow import pyarrow
rows = list(rows)
if not rows:
return
first = rows[0]
self.writer = self.writer or pyarrow.parquet.ParquetWriter(
self._f, first.schema, flavor=self.parquet_flavor, version=self.parquet_version, data_page_size=self.parquet_data_page_size
)
for row in rows:
if isinstance(row, pyarrow.Table):
self.writer.write_table(row)
elif isinstance(row, pyarrow.RecordBatch):
self.writer.write_batch(row)
else:
raise ValueError(f"Unsupported type {type(row)}")

@classmethod
def data_format(cls) -> TFileFormatSpec:
return TFileFormatSpec(
"arrow",
file_extension="parquet",
is_binary_format=True,
supports_schema_changes=False,
requires_destination_capabilities=False,
supports_compression=False,
)
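
For illustration (not part of this commit's diff), here is a minimal standalone sketch of the pattern that ArrowWriter wraps: the parquet writer is created lazily from the first item's schema, then each item is dispatched by type. The file name and sample data below are hypothetical.

import pyarrow as pa
import pyarrow.parquet as pq

# hypothetical stream of arrow items as produced by the extract path
items = [
    pa.table({"id": [1, 2], "name": ["a", "b"]}),            # pyarrow.Table
    pa.RecordBatch.from_pydict({"id": [3], "name": ["c"]}),   # pyarrow.RecordBatch
]

writer = None
try:
    for item in items:
        if writer is None:
            # schema comes from the first item, mirroring write_data above
            writer = pq.ParquetWriter("items.parquet", item.schema)
        if isinstance(item, pa.Table):
            writer.write_table(item)
        elif isinstance(item, pa.RecordBatch):
            writer.write_batch(item)
        else:
            raise ValueError(f"Unsupported type {type(item)}")
finally:
    if writer is not None:
        writer.close()
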
4 changes: 2 additions & 2 deletions dlt/common/destination/__init__.py
@@ -1,2 +1,2 @@
from dlt.common.destination.capabilities import DestinationCapabilitiesContext, TLoaderFileFormat
from dlt.common.destination.reference import DestinationReference, TDestinationReferenceArg
from dlt.common.destination.capabilities import DestinationCapabilitiesContext, TLoaderFileFormat, ALL_SUPPORTED_FILE_FORMATS
from dlt.common.destination.reference import DestinationReference, TDestinationReferenceArg
5 changes: 3 additions & 2 deletions dlt/common/destination/capabilities.py
@@ -14,9 +14,10 @@
# puae-jsonl - internal extract -> normalize format based on jsonl
# insert_values - insert SQL statements
# sql - any sql statement
TLoaderFileFormat = Literal["jsonl", "puae-jsonl", "insert_values", "sql", "parquet", "reference"]
TLoaderFileFormat = Literal["jsonl", "puae-jsonl", "insert_values", "sql", "parquet", "reference", "arrow"]
ALL_SUPPORTED_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))
# file formats used internally by dlt
INTERNAL_LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = {"puae-jsonl", "sql", "reference"}
INTERNAL_LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = {"puae-jsonl", "sql", "reference", "arrow"}
# file formats that may be chosen by the user
EXTERNAL_LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat)) - INTERNAL_LOADER_FILE_FORMATS

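As a quick illustration (not from this commit), the new "arrow" format is internal only, so the set of user-selectable formats is unchanged:

from typing import Literal, Set, get_args

TLoaderFileFormat = Literal["jsonl", "puae-jsonl", "insert_values", "sql", "parquet", "reference", "arrow"]

internal: Set[str] = {"puae-jsonl", "sql", "reference", "arrow"}
external = set(get_args(TLoaderFileFormat)) - internal
print(sorted(external))  # ['insert_values', 'jsonl', 'parquet']
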
93 changes: 92 additions & 1 deletion dlt/common/libs/pyarrow.py
@@ -1,9 +1,12 @@
from typing import Any, Tuple, Optional
from typing import Any, Tuple, Optional, Union
from dlt import version
from dlt.common.exceptions import MissingDependencyException
from dlt.common.schema.typing import TTableSchemaColumns

from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.schema.typing import TColumnType
from dlt.common.data_types import TDataType
from dlt.common.typing import TFileOrPath

try:
import pyarrow
@@ -12,6 +15,9 @@
raise MissingDependencyException("DLT parquet Helpers", [f"{version.DLT_PKG_NAME}[parquet]"], "DLT Helpers for parquet.")


TAnyArrowItem = Union[pyarrow.Table, pyarrow.RecordBatch]


def get_py_arrow_datatype(column: TColumnType, caps: DestinationCapabilitiesContext, tz: str) -> Any:
column_type = column["data_type"]
if column_type == "text":
@@ -82,3 +88,88 @@ def get_pyarrow_int(precision: Optional[int]) -> Any:
elif precision <= 32:
return pyarrow.int32()
return pyarrow.int64()


def _get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType:
"""Returns (data_type, precision, scale) tuple from pyarrow.DataType
"""
if pyarrow.types.is_string(dtype) or pyarrow.types.is_large_string(dtype):
return dict(data_type="text")
elif pyarrow.types.is_floating(dtype):
return dict(data_type="double")
elif pyarrow.types.is_boolean(dtype):
return dict(data_type="bool")
elif pyarrow.types.is_timestamp(dtype):
if dtype.unit == "s":
precision = 0
elif dtype.unit == "ms":
precision = 3
elif dtype.unit == "us":
precision = 6
else:
precision = 9
return dict(data_type="timestamp", precision=precision)
elif pyarrow.types.is_date(dtype):
return dict(data_type="date")
elif pyarrow.types.is_time(dtype):
# Time fields in schema are `DataType` instead of `Time64Type` or `Time32Type`
if dtype == pyarrow.time32("s"):
precision = 0
elif dtype == pyarrow.time32("ms"):
precision = 3
elif dtype == pyarrow.time64("us"):
precision = 6
else:
precision = 9
return dict(data_type="time", precision=precision)
elif pyarrow.types.is_integer(dtype):
result: TColumnType = dict(data_type="bigint")
if dtype.bit_width != 64: # 64bit is a default bigint
result["precision"] = dtype.bit_width
return result
elif pyarrow.types.is_fixed_size_binary(dtype):
return dict(data_type="binary", precision=dtype.byte_width)
elif pyarrow.types.is_binary(dtype) or pyarrow.types.is_large_binary(dtype):
return dict(data_type="binary")
elif pyarrow.types.is_decimal(dtype):
return dict(data_type="decimal", precision=dtype.precision, scale=dtype.scale)
elif pyarrow.types.is_nested(dtype):
return dict(data_type="complex")
else:
raise ValueError(dtype)


def py_arrow_to_table_schema_columns(schema: pyarrow.Schema) -> TTableSchemaColumns:
"""Convert a PyArrow schema to a table schema columns dict.
Args:
schema (pyarrow.Schema): pyarrow schema
Returns:
TTableSchemaColumns: table schema columns
"""
result: TTableSchemaColumns = {}
for field in schema:
result[field.name] = {
"name": field.name,
"nullable": field.nullable,
**_get_column_type_from_py_arrow(field.type),
}
return result


def get_row_count(parquet_file: TFileOrPath) -> int:
"""Get the number of rows in a parquet file.
Args:
parquet_file (TFileOrPath): path to the parquet file or an open file object
Returns:
int: number of rows
"""
with pyarrow.parquet.ParquetFile(parquet_file) as reader:
return reader.metadata.num_rows # type: ignore[no-any-return]


def is_arrow_item(item: Any) -> bool:
return isinstance(item, (pyarrow.Table, pyarrow.RecordBatch))
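
A small usage sketch for the new conversion helper (field names and types are hypothetical; the expected output follows from _get_column_type_from_py_arrow above):

import pyarrow as pa
from dlt.common.libs.pyarrow import py_arrow_to_table_schema_columns

schema = pa.schema([
    pa.field("id", pa.int32(), nullable=False),
    pa.field("amount", pa.decimal128(38, 9)),
    pa.field("created_at", pa.timestamp("us")),
])

columns = py_arrow_to_table_schema_columns(schema)
# {
#     "id": {"name": "id", "nullable": False, "data_type": "bigint", "precision": 32},
#     "amount": {"name": "amount", "nullable": True, "data_type": "decimal", "precision": 38, "scale": 9},
#     "created_at": {"name": "created_at", "nullable": True, "data_type": "timestamp", "precision": 6},
# }
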
2 changes: 1 addition & 1 deletion dlt/common/storages/data_item_storage.py
@@ -18,7 +18,7 @@ def get_writer(self, load_id: str, schema_name: str, table_name: str) -> Buffere
writer_id = f"{load_id}.{schema_name}.{table_name}"
writer = self.buffered_writers.get(writer_id, None)
if not writer:
# assign a jsonl writer for each table
# assign a writer for each table
path = self._get_data_item_path_template(load_id, schema_name, table_name)
writer = BufferedDataWriter(self.loader_file_format, path)
self.buffered_writers[writer_id] = writer
54 changes: 47 additions & 7 deletions dlt/common/storages/file_storage.py
@@ -45,14 +45,19 @@ def save_atomic(storage_path: str, relative_path: str, data: Any, file_type: str
raise

@staticmethod
def copy_atomic(source_file_path: str, dest_folder_path: str) -> str:
def move_atomic_to_folder(source_file_path: str, dest_folder_path: str) -> str:
file_name = os.path.basename(source_file_path)
dest_file_path = os.path.join(dest_folder_path, file_name)
return FileStorage.move_atomic_to_file(source_file_path, dest_file_path)

@staticmethod
def move_atomic_to_file(source_file_path: str, dest_file_path: str) -> str:
try:
os.rename(source_file_path, dest_file_path)
except OSError:
# copy to local temp file
dest_temp_file = os.path.join(dest_folder_path, uniq_id())
folder_name = os.path.dirname(dest_file_path)
dest_temp_file = os.path.join(folder_name, uniq_id())
try:
shutil.copyfile(source_file_path, dest_temp_file)
os.rename(dest_temp_file, dest_file_path)
@@ -63,6 +68,19 @@ def copy_atomic(source_file_path: str, dest_folder_path: str) -> str:
raise
return dest_file_path

@staticmethod
def copy_atomic_to_file(source_file_path: str, dest_file_path: str) -> str:
folder_name = os.path.dirname(dest_file_path)
dest_temp_file = os.path.join(folder_name, uniq_id())
try:
shutil.copyfile(source_file_path, dest_temp_file)
os.rename(dest_temp_file, dest_file_path)
except Exception:
if os.path.isfile(dest_temp_file):
os.remove(dest_temp_file)
raise
return dest_file_path

def load(self, relative_path: str) -> Any:
# raises on file not existing
with self.open_file(relative_path) as text_file:
@@ -144,6 +162,19 @@ def link_hard(self, from_relative_path: str, to_relative_path: str) -> None:
self.make_full_path(to_relative_path)
)

@staticmethod
def link_hard_with_fallback(external_file_path: str, to_file_path: str) -> None:
"""Try to create a hardlink and fallback to copying when filesystem doesn't support links
"""
try:
os.link(external_file_path, to_file_path)
except OSError as ex:
# Fall back to copy when the fs doesn't support links or the link would cross devices
if ex.errno in (errno.EPERM, errno.EXDEV, errno.EMLINK):
FileStorage.copy_atomic_to_file(external_file_path, to_file_path)
else:
raise

def atomic_rename(self, from_relative_path: str, to_relative_path: str) -> None:
"""Renames a path using os.rename which is atomic on POSIX, Windows and NFS v4.
@@ -195,11 +226,20 @@ def rename_tree_files(self, from_relative_path: str, to_relative_path: str) -> N
if not os.listdir(root):
os.rmdir(root)

def atomic_import(self, external_file_path: str, to_folder: str) -> str:
"""Moves a file at `external_file_path` into the `to_folder` effectively importing file into storage"""
return self.to_relative_path(FileStorage.copy_atomic(external_file_path, self.make_full_path(to_folder)))
# file_name = FileStorage.get_file_name_from_file_path(external_path)
# os.rename(external_path, os.path.join(self.make_full_path(to_folder), file_name))
def atomic_import(self, external_file_path: str, to_folder: str, new_file_name: Optional[str] = None) -> str:
"""Moves a file at `external_file_path` into the `to_folder` effectively importing file into storage
Args:
external_file_path: Path to file to be imported
to_folder: Path to folder where file should be imported
new_file_name: Optional new file name for the imported file, otherwise the original file name is used
Returns:
Path to imported file relative to storage root
"""
new_file_name = new_file_name or os.path.basename(external_file_path)
dest_file_path = os.path.join(self.make_full_path(to_folder), new_file_name)
return self.to_relative_path(FileStorage.move_atomic_to_file(external_file_path, dest_file_path))

def in_storage(self, path: str) -> bool:
assert path is not None
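For orientation (not part of the diff), the hard-link-with-fallback idea behind link_hard_with_fallback, stripped of dlt's atomic temp-file copy, looks roughly like this:

import errno
import os
import shutil

def link_or_copy(src: str, dst: str) -> None:
    """Hard link src to dst; fall back to copying when the filesystem
    doesn't support links or the link would cross devices."""
    try:
        os.link(src, dst)
    except OSError as ex:
        if ex.errno in (errno.EPERM, errno.EXDEV, errno.EMLINK):
            shutil.copyfile(src, dst)
        else:
            raise
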
10 changes: 8 additions & 2 deletions dlt/common/storages/load_storage.py
@@ -13,7 +13,8 @@
from dlt.common.configuration.inject import with_config
from dlt.common.typing import DictStrAny, StrAny
from dlt.common.storages.file_storage import FileStorage
from dlt.common.data_writers import TLoaderFileFormat, DataWriter
from dlt.common.data_writers import DataWriter
from dlt.common.destination import ALL_SUPPORTED_FILE_FORMATS, TLoaderFileFormat
from dlt.common.configuration.accessors import config
from dlt.common.exceptions import TerminalValueError
from dlt.common.schema import Schema, TSchemaTables, TTableSchemaColumns
@@ -138,7 +139,7 @@ class LoadStorage(DataItemStorage, VersionedStorage):
SCHEMA_FILE_NAME = "schema.json" # package schema
PACKAGE_COMPLETED_FILE_NAME = "package_completed.json" # completed package marker file, currently only to store data with os.stat

ALL_SUPPORTED_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))
ALL_SUPPORTED_FILE_FORMATS = ALL_SUPPORTED_FILE_FORMATS

@with_config(spec=LoadStorageConfiguration, sections=(known_sections.LOAD,))
def __init__(
Expand Down Expand Up @@ -314,6 +315,11 @@ def add_new_job(self, load_id: str, job_file_path: str, job_state: TJobState = "
"""Adds new job by moving the `job_file_path` into `new_jobs` of package `load_id`"""
self.storage.atomic_import(job_file_path, self._get_job_folder_path(load_id, job_state))

def atomic_import(self, external_file_path: str, to_folder: str) -> str:
"""Copies or links a file at `external_file_path` into the `to_folder` effectively importing file into storage"""
# LoadStorage.parse_job_file_name
return self.storage.to_relative_path(FileStorage.move_atomic_to_folder(external_file_path, self.storage.make_full_path(to_folder)))

def start_job(self, load_id: str, file_name: str) -> str:
return self._move_job(load_id, LoadStorage.NEW_JOBS_FOLDER, LoadStorage.STARTED_JOBS_FOLDER, file_name)

22 changes: 14 additions & 8 deletions dlt/common/storages/normalize_storage.py
@@ -1,4 +1,4 @@
from typing import ClassVar, Sequence, NamedTuple
from typing import ClassVar, Sequence, NamedTuple, Union
from itertools import groupby
from pathlib import Path

@@ -7,11 +7,14 @@
from dlt.common.storages.file_storage import FileStorage
from dlt.common.storages.configuration import NormalizeStorageConfiguration
from dlt.common.storages.versioned_storage import VersionedStorage
from dlt.common.destination import TLoaderFileFormat, ALL_SUPPORTED_FILE_FORMATS
from dlt.common.exceptions import TerminalValueError

class TParsedNormalizeFileName(NamedTuple):
schema_name: str
table_name: str
file_id: str
file_format: TLoaderFileFormat


class NormalizeStorage(VersionedStorage):
@@ -47,10 +50,13 @@ def build_extracted_file_stem(schema_name: str, table_name: str, file_id: str) -
@staticmethod
def parse_normalize_file_name(file_name: str) -> TParsedNormalizeFileName:
# parse extracted file name and return (schema_name, table_name, file_id, file_format)
if not file_name.endswith("jsonl"):
raise ValueError(file_name)

parts = Path(file_name).stem.split(".")
if len(parts) != 3:
raise ValueError(file_name)
return TParsedNormalizeFileName(*parts)
file_name_p: Path = Path(file_name)
parts = file_name_p.name.split(".")
ext = parts[-1]
if ext not in ALL_SUPPORTED_FILE_FORMATS:
raise TerminalValueError(f"File format {ext} not supported. Filename: {file_name}")
return TParsedNormalizeFileName(*parts) # type: ignore[arg-type]

def delete_extracted_files(self, files: Sequence[str]) -> None:
for file_name in files:
self.storage.delete(file_name)
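
A hypothetical round trip through the updated parser; the file extension now selects the loader file format instead of being hard-coded to jsonl:

from dlt.common.storages.normalize_storage import NormalizeStorage

parsed = NormalizeStorage.parse_normalize_file_name("my_schema.my_table.1a2b3c.parquet")
# TParsedNormalizeFileName(schema_name='my_schema', table_name='my_table',
#                          file_id='1a2b3c', file_format='parquet')

# an unsupported extension now raises TerminalValueError instead of ValueError
# NormalizeStorage.parse_normalize_file_name("my_schema.my_table.1a2b3c.csv")
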
4 changes: 3 additions & 1 deletion dlt/common/typing.py
@@ -1,8 +1,9 @@
from collections.abc import Mapping as C_Mapping, Sequence as C_Sequence
from datetime import datetime, date # noqa: I251
import inspect
import os
from re import Pattern as _REPattern
from typing import Callable, Dict, Any, Final, Literal, List, Mapping, NewType, Optional, Tuple, Type, TypeVar, Generic, Protocol, TYPE_CHECKING, Union, runtime_checkable, get_args, get_origin
from typing import Callable, Dict, Any, Final, Literal, List, Mapping, NewType, Optional, Tuple, Type, TypeVar, Generic, Protocol, TYPE_CHECKING, Union, runtime_checkable, get_args, get_origin, IO
from typing_extensions import TypeAlias, ParamSpec, Concatenate

from dlt.common.pendulum import timedelta, pendulum
@@ -44,6 +45,7 @@
TVariantBase = TypeVar("TVariantBase", covariant=True)
TVariantRV = Tuple[str, Any]
VARIANT_FIELD_FORMAT = "v_%s"
TFileOrPath = Union[str, os.PathLike, IO[Any]]

@runtime_checkable
class SupportsVariant(Protocol, Generic[TVariantBase]):