Pyarrow direct loading #679

Merged: 30 commits, Oct 16, 2023
Changes from 27 commits

Commits (30)
d30eb1b - poc: direct pyarrow load (steinitzu, Oct 10, 2023)
730dce2 - arrow to schema types with precision, test (steinitzu, Oct 1, 2023)
41e2805 - Fix naming copy_atomic -> move_atomic (steinitzu, Oct 2, 2023)
9433cc4 - jsonl/parquet file normalizer classes (steinitzu, Oct 14, 2023)
3393110 - pathlib extension checks (steinitzu, Oct 2, 2023)
40de1b5 - indent fix (steinitzu, Oct 2, 2023)
a203822 - Write parquet with original schema (steinitzu, Oct 2, 2023)
c2cd0bd - extract refactoring (steinitzu, Oct 10, 2023)
487987c - Init testing, bugfix (steinitzu, Oct 9, 2023)
2b94489 - Fix import filename (steinitzu, Oct 9, 2023)
817eca5 - Dep (steinitzu, Oct 9, 2023)
9973bb8 - Mockup incremental implementation for arrow tables (steinitzu, Oct 10, 2023)
6203d71 - Create loadstorage per filetype, import with hardlink (steinitzu, Oct 11, 2023)
4fa718d - Fallback for extract item format, detect type of lists (steinitzu, Oct 11, 2023)
1000613 - Error message, load tests with arrow (steinitzu, Oct 12, 2023)
5f452b6 - Some incremental optimizations (steinitzu, Oct 12, 2023)
b414a69 - Incremental fixes and run incremental tests on arrow & pandas (steinitzu, Oct 12, 2023)
274f0ee - Add/update normalize tests (steinitzu, Oct 14, 2023)
6b4d43a - Fix load test (steinitzu, Oct 14, 2023)
919c5ab - Lint (steinitzu, Oct 14, 2023)
d58f77e - Add docs page for arrow loading (steinitzu, Oct 14, 2023)
73b481b - Handle none capability (steinitzu, Oct 14, 2023)
7df1252 - Fix extract lists (steinitzu, Oct 14, 2023)
3662756 - Exclude TIME in redshift test (steinitzu, Oct 15, 2023)
f16a7ba - Fix type errors (steinitzu, Oct 15, 2023)
93811e0 - Typo (steinitzu, Oct 15, 2023)
33ce792 - Create col from numpy array for >200x speedup, index after filter (steinitzu, Oct 15, 2023)
b9f7aaf - in -> not in (steinitzu, Oct 16, 2023)
b78618c - Format binary as hex for redshift (steinitzu, Oct 16, 2023)
2b96904 - enables bool and duckdb test on pyarrow loading (rudolfix, Oct 16, 2023)
48 changes: 42 additions & 6 deletions dlt/common/data_writers/writers.py
@@ -1,14 +1,14 @@
import abc

from dataclasses import dataclass
from typing import Any, Dict, Sequence, IO, Type, Optional, List, cast
from typing import IO, TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Type, Union

from dlt.common import json
from dlt.common.typing import StrAny
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.destination import TLoaderFileFormat, DestinationCapabilitiesContext
from dlt.common.configuration import with_config, known_sections, configspec
from dlt.common.configuration import configspec, known_sections, with_config
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.destination import DestinationCapabilitiesContext, TLoaderFileFormat
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.typing import StrAny


@dataclass
class TFileFormatSpec:
@@ -70,6 +70,8 @@ def class_factory(file_format: TLoaderFileFormat) -> Type["DataWriter"]:
return InsertValuesWriter
elif file_format == "parquet":
return ParquetDataWriter # type: ignore
elif file_format == "arrow":
return ArrowWriter # type: ignore
else:
raise ValueError(file_format)

@@ -249,3 +251,37 @@ def write_footer(self) -> None:
@classmethod
def data_format(cls) -> TFileFormatSpec:
return TFileFormatSpec("parquet", "parquet", True, False, requires_destination_capabilities=True, supports_compression=False)


class ArrowWriter(ParquetDataWriter):
def write_header(self, columns_schema: TTableSchemaColumns) -> None:
# Schema will be written as-is from the arrow table
pass

def write_data(self, rows: Sequence[Any]) -> None:
from dlt.common.libs.pyarrow import pyarrow
rows = list(rows)
if not rows:
return
first = rows[0]
self.writer = self.writer or pyarrow.parquet.ParquetWriter(
self._f, first.schema, flavor=self.parquet_flavor, version=self.parquet_version, data_page_size=self.parquet_data_page_size
)
for row in rows:
if isinstance(row, pyarrow.Table):
self.writer.write_table(row)
elif isinstance(row, pyarrow.RecordBatch):
self.writer.write_batch(row)
else:
raise ValueError(f"Unsupported type {type(row)}")

@classmethod
def data_format(cls) -> TFileFormatSpec:
return TFileFormatSpec(
"arrow",
file_extension="parquet",
is_binary_format=True,
supports_schema_changes=False,
requires_destination_capabilities=False,
supports_compression=False,
)
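As a reading aid (not part of the diff): the new ArrowWriter receives pyarrow Tables or RecordBatches that already carry a schema and streams them to parquet as-is, instead of rebuilding the schema from dlt column hints. A minimal sketch of the equivalent pyarrow calls, with made-up data and file name:

import pyarrow as pa
import pyarrow.parquet as pq

# an arrow table produced upstream, e.g. by pandas or an arrow-native source
table = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})

# ArrowWriter opens a ParquetWriter lazily with the schema of the first item,
# then appends every Table / RecordBatch it receives
with pq.ParquetWriter("items.parquet", table.schema) as writer:
    writer.write_table(table)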
4 changes: 2 additions & 2 deletions dlt/common/destination/__init__.py
@@ -1,2 +1,2 @@
from dlt.common.destination.capabilities import DestinationCapabilitiesContext, TLoaderFileFormat
from dlt.common.destination.reference import DestinationReference, TDestinationReferenceArg
from dlt.common.destination.capabilities import DestinationCapabilitiesContext, TLoaderFileFormat, ALL_SUPPORTED_FILE_FORMATS
from dlt.common.destination.reference import DestinationReference, TDestinationReferenceArg
5 changes: 3 additions & 2 deletions dlt/common/destination/capabilities.py
@@ -14,9 +14,10 @@
# puae-jsonl - internal extract -> normalize format based on jsonl
# insert_values - insert SQL statements
# sql - any sql statement
TLoaderFileFormat = Literal["jsonl", "puae-jsonl", "insert_values", "sql", "parquet", "reference"]
TLoaderFileFormat = Literal["jsonl", "puae-jsonl", "insert_values", "sql", "parquet", "reference", "arrow"]
ALL_SUPPORTED_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))
# file formats used internally by dlt
INTERNAL_LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = {"puae-jsonl", "sql", "reference"}
INTERNAL_LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = {"puae-jsonl", "sql", "reference", "arrow"}
# file formats that may be chosen by the user
EXTERNAL_LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat)) - INTERNAL_LOADER_FILE_FORMATS

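A quick check of what the sets above imply (a sketch using only the names defined in this file): "arrow" is a registered loader file format but stays internal, so users cannot select it directly.

from dlt.common.destination.capabilities import (
    ALL_SUPPORTED_FILE_FORMATS,
    EXTERNAL_LOADER_FILE_FORMATS,
    INTERNAL_LOADER_FILE_FORMATS,
)

assert "arrow" in ALL_SUPPORTED_FILE_FORMATS
assert "arrow" in INTERNAL_LOADER_FILE_FORMATS
# external formats are the ones users may choose explicitly; "arrow" is not among them
assert "arrow" not in EXTERNAL_LOADER_FILE_FORMATS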
93 changes: 92 additions & 1 deletion dlt/common/libs/pyarrow.py
@@ -1,9 +1,12 @@
from typing import Any, Tuple, Optional
from typing import Any, Tuple, Optional, Union
from dlt import version
from dlt.common.exceptions import MissingDependencyException
from dlt.common.schema.typing import TTableSchemaColumns

from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.schema.typing import TColumnType
from dlt.common.data_types import TDataType
from dlt.common.typing import TFileOrPath

try:
import pyarrow
@@ -12,6 +15,9 @@
raise MissingDependencyException("DLT parquet Helpers", [f"{version.DLT_PKG_NAME}[parquet]"], "DLT Helpers for parquet.")


TAnyArrowItem = Union[pyarrow.Table, pyarrow.RecordBatch]


def get_py_arrow_datatype(column: TColumnType, caps: DestinationCapabilitiesContext, tz: str) -> Any:
column_type = column["data_type"]
if column_type == "text":
@@ -82,3 +88,88 @@ def get_pyarrow_int(precision: Optional[int]) -> Any:
elif precision <= 32:
return pyarrow.int32()
return pyarrow.int64()


def _get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType:
"""Returns (data_type, precision, scale) tuple from pyarrow.DataType
"""
if pyarrow.types.is_string(dtype) or pyarrow.types.is_large_string(dtype):
return dict(data_type="text")
elif pyarrow.types.is_floating(dtype):
return dict(data_type="double")
elif pyarrow.types.is_boolean(dtype):
return dict(data_type="bool")
elif pyarrow.types.is_timestamp(dtype):
if dtype.unit == "s":
precision = 0
elif dtype.unit == "ms":
precision = 3
elif dtype.unit == "us":
precision = 6
else:
precision = 9
return dict(data_type="timestamp", precision=precision)
elif pyarrow.types.is_date(dtype):
return dict(data_type="date")
elif pyarrow.types.is_time(dtype):
# Time fields in schema are `DataType` instead of `Time64Type` or `Time32Type`
if dtype == pyarrow.time32("s"):
precision = 0
elif dtype == pyarrow.time32("ms"):
precision = 3
elif dtype == pyarrow.time64("us"):
precision = 6
else:
precision = 9
return dict(data_type="time", precision=precision)
elif pyarrow.types.is_integer(dtype):
result: TColumnType = dict(data_type="bigint")
if dtype.bit_width != 64: # 64bit is a default bigint
result["precision"] = dtype.bit_width
return result
elif pyarrow.types.is_fixed_size_binary(dtype):
return dict(data_type="binary", precision=dtype.byte_width)
elif pyarrow.types.is_binary(dtype) or pyarrow.types.is_large_binary(dtype):
return dict(data_type="binary")
elif pyarrow.types.is_decimal(dtype):
return dict(data_type="decimal", precision=dtype.precision, scale=dtype.scale)
elif pyarrow.types.is_nested(dtype):
return dict(data_type="complex")
else:
raise ValueError(dtype)


def py_arrow_to_table_schema_columns(schema: pyarrow.Schema) -> TTableSchemaColumns:
"""Convert a PyArrow schema to a table schema columns dict.

Args:
schema (pyarrow.Schema): pyarrow schema

Returns:
TTableSchemaColumns: table schema columns
"""
result: TTableSchemaColumns = {}
for field in schema:
result[field.name] = {
"name": field.name,
"nullable": field.nullable,
**_get_column_type_from_py_arrow(field.type),
}
return result


def get_row_count(parquet_file: TFileOrPath) -> int:
"""Get the number of rows in a parquet file.

Args:
parquet_file (TFileOrPath): path to the parquet file or an open file object

Returns:
int: number of rows
"""
with pyarrow.parquet.ParquetFile(parquet_file) as reader:
return reader.metadata.num_rows # type: ignore[no-any-return]


def is_arrow_item(item: Any) -> bool:
return isinstance(item, (pyarrow.Table, pyarrow.RecordBatch))
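To illustrate the mapping implemented by _get_column_type_from_py_arrow and py_arrow_to_table_schema_columns (the schema below is a made-up example):

import pyarrow as pa
from dlt.common.libs.pyarrow import py_arrow_to_table_schema_columns

schema = pa.schema([
    pa.field("id", pa.int32(), nullable=False),
    pa.field("amount", pa.decimal128(38, 9)),
    pa.field("created_at", pa.timestamp("ms")),
])

columns = py_arrow_to_table_schema_columns(schema)
# {
#   "id": {"name": "id", "nullable": False, "data_type": "bigint", "precision": 32},
#   "amount": {"name": "amount", "nullable": True, "data_type": "decimal", "precision": 38, "scale": 9},
#   "created_at": {"name": "created_at", "nullable": True, "data_type": "timestamp", "precision": 3},
# }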
2 changes: 1 addition & 1 deletion dlt/common/storages/data_item_storage.py
@@ -18,7 +18,7 @@ def get_writer(self, load_id: str, schema_name: str, table_name: str) -> Buffere
writer_id = f"{load_id}.{schema_name}.{table_name}"
writer = self.buffered_writers.get(writer_id, None)
if not writer:
# assign a jsonl writer for each table
# assign a writer for each table
path = self._get_data_item_path_template(load_id, schema_name, table_name)
writer = BufferedDataWriter(self.loader_file_format, path)
self.buffered_writers[writer_id] = writer
54 changes: 47 additions & 7 deletions dlt/common/storages/file_storage.py
@@ -45,14 +45,19 @@ def save_atomic(storage_path: str, relative_path: str, data: Any, file_type: str
raise

@staticmethod
def copy_atomic(source_file_path: str, dest_folder_path: str) -> str:
def move_atomic_to_folder(source_file_path: str, dest_folder_path: str) -> str:
file_name = os.path.basename(source_file_path)
dest_file_path = os.path.join(dest_folder_path, file_name)
return FileStorage.move_atomic_to_file(source_file_path, dest_file_path)

@staticmethod
def move_atomic_to_file(source_file_path: str, dest_file_path: str) -> str:
try:
os.rename(source_file_path, dest_file_path)
except OSError:
# copy to local temp file
dest_temp_file = os.path.join(dest_folder_path, uniq_id())
folder_name = os.path.dirname(dest_file_path)
dest_temp_file = os.path.join(folder_name, uniq_id())
try:
shutil.copyfile(source_file_path, dest_temp_file)
os.rename(dest_temp_file, dest_file_path)
@@ -63,6 +68,19 @@ def copy_atomic(source_file_path: str, dest_folder_path: str) -> str:
raise
return dest_file_path

@staticmethod
def copy_atomic_to_file(source_file_path: str, dest_file_path: str) -> str:
folder_name = os.path.dirname(dest_file_path)
dest_temp_file = os.path.join(folder_name, uniq_id())
try:
shutil.copyfile(source_file_path, dest_temp_file)
os.rename(dest_temp_file, dest_file_path)
except Exception:
if os.path.isfile(dest_temp_file):
os.remove(dest_temp_file)
raise
return dest_file_path

def load(self, relative_path: str) -> Any:
# raises on file not existing
with self.open_file(relative_path) as text_file:
@@ -144,6 +162,19 @@ def link_hard(self, from_relative_path: str, to_relative_path: str) -> None:
self.make_full_path(to_relative_path)
)

@staticmethod
def link_hard_with_fallback(external_file_path: str, to_file_path: str) -> None:
"""Try to create a hardlink and fallback to copying when filesystem doesn't support links
"""
try:
os.link(external_file_path, to_file_path)
except OSError as ex:
# Fallback to copy when fs doesn't support links or attempting to make a cross-device link
if ex.errno in (errno.EPERM, errno.EXDEV, errno.EMLINK):
FileStorage.copy_atomic_to_file(external_file_path, to_file_path)
else:
raise

def atomic_rename(self, from_relative_path: str, to_relative_path: str) -> None:
"""Renames a path using os.rename which is atomic on POSIX, Windows and NFS v4.

@@ -195,11 +226,20 @@ def rename_tree_files(self, from_relative_path: str, to_relative_path: str) -> N
if not os.listdir(root):
os.rmdir(root)

def atomic_import(self, external_file_path: str, to_folder: str) -> str:
"""Moves a file at `external_file_path` into the `to_folder` effectively importing file into storage"""
return self.to_relative_path(FileStorage.copy_atomic(external_file_path, self.make_full_path(to_folder)))
# file_name = FileStorage.get_file_name_from_file_path(external_path)
# os.rename(external_path, os.path.join(self.make_full_path(to_folder), file_name))
def atomic_import(self, external_file_path: str, to_folder: str, new_file_name: Optional[str] = None) -> str:
"""Moves a file at `external_file_path` into the `to_folder` effectively importing file into storage

Args:
external_file_path: Path to file to be imported
to_folder: Path to folder where file should be imported
new_file_name: Optional new file name for the imported file, otherwise the original file name is used

Returns:
Path to imported file relative to storage root
"""
new_file_name = new_file_name or os.path.basename(external_file_path)
dest_file_path = os.path.join(self.make_full_path(to_folder), new_file_name)
return self.to_relative_path(FileStorage.move_atomic_to_file(external_file_path, dest_file_path))

def in_storage(self, path: str) -> bool:
assert path is not None
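For context, a hypothetical call showing how the hardlink-with-fallback path is meant to be used when importing an extracted parquet file into a load package (both paths below are illustrative):

from dlt.common.storages.file_storage import FileStorage

# tries os.link first; on EPERM / EXDEV / EMLINK it falls back to an atomic copy
# (copy to a temp file in the destination folder, then rename)
FileStorage.link_hard_with_fallback(
    "/tmp/extract/my_schema.items.abc123.parquet",
    "/var/dlt/load/12345/new_jobs/items.abc123.parquet",
)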
10 changes: 8 additions & 2 deletions dlt/common/storages/load_storage.py
@@ -13,7 +13,8 @@
from dlt.common.configuration.inject import with_config
from dlt.common.typing import DictStrAny, StrAny
from dlt.common.storages.file_storage import FileStorage
from dlt.common.data_writers import TLoaderFileFormat, DataWriter
from dlt.common.data_writers import DataWriter
from dlt.common.destination import ALL_SUPPORTED_FILE_FORMATS, TLoaderFileFormat
from dlt.common.configuration.accessors import config
from dlt.common.exceptions import TerminalValueError
from dlt.common.schema import Schema, TSchemaTables, TTableSchemaColumns
@@ -138,7 +139,7 @@ class LoadStorage(DataItemStorage, VersionedStorage):
SCHEMA_FILE_NAME = "schema.json" # package schema
PACKAGE_COMPLETED_FILE_NAME = "package_completed.json" # completed package marker file, currently only to store data with os.stat

ALL_SUPPORTED_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))
ALL_SUPPORTED_FILE_FORMATS = ALL_SUPPORTED_FILE_FORMATS

@with_config(spec=LoadStorageConfiguration, sections=(known_sections.LOAD,))
def __init__(
@@ -311,6 +312,11 @@ def add_new_job(self, load_id: str, job_file_path: str, job_state: TJobState = "
"""Adds new job by moving the `job_file_path` into `new_jobs` of package `load_id`"""
self.storage.atomic_import(job_file_path, self._get_job_folder_path(load_id, job_state))

def atomic_import(self, external_file_path: str, to_folder: str) -> str:
"""Copies or links a file at `external_file_path` into the `to_folder` effectively importing file into storage"""
# LoadStorage.parse_job_file_name
return self.storage.to_relative_path(FileStorage.move_atomic_to_folder(external_file_path, self.storage.make_full_path(to_folder)))

def start_job(self, load_id: str, file_name: str) -> str:
return self._move_job(load_id, LoadStorage.NEW_JOBS_FOLDER, LoadStorage.STARTED_JOBS_FOLDER, file_name)

22 changes: 14 additions & 8 deletions dlt/common/storages/normalize_storage.py
@@ -1,4 +1,4 @@
from typing import ClassVar, Sequence, NamedTuple
from typing import ClassVar, Sequence, NamedTuple, Union
from itertools import groupby
from pathlib import Path

@@ -7,11 +7,14 @@
from dlt.common.storages.file_storage import FileStorage
from dlt.common.storages.configuration import NormalizeStorageConfiguration
from dlt.common.storages.versioned_storage import VersionedStorage
from dlt.common.destination import TLoaderFileFormat, ALL_SUPPORTED_FILE_FORMATS
from dlt.common.exceptions import TerminalValueError

class TParsedNormalizeFileName(NamedTuple):
schema_name: str
table_name: str
file_id: str
file_format: TLoaderFileFormat


class NormalizeStorage(VersionedStorage):
@@ -47,10 +50,13 @@ def build_extracted_file_stem(schema_name: str, table_name: str, file_id: str) -
@staticmethod
def parse_normalize_file_name(file_name: str) -> TParsedNormalizeFileName:
# parse extracted file name and return (schema_name, table_name, file_id, file_format)
if not file_name.endswith("jsonl"):
raise ValueError(file_name)

parts = Path(file_name).stem.split(".")
if len(parts) != 3:
raise ValueError(file_name)
return TParsedNormalizeFileName(*parts)
file_name_p: Path = Path(file_name)
parts = file_name_p.name.split(".")
ext = parts[-1]
if ext not in ALL_SUPPORTED_FILE_FORMATS:
raise TerminalValueError(f"File format {ext} not supported. Filename: {file_name}")
return TParsedNormalizeFileName(*parts) # type: ignore[arg-type]

def delete_extracted_files(self, files: Sequence[str]) -> None:
for file_name in files:
self.storage.delete(file_name)
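Since extracted file names now carry the loader file format as their extension, parsing returns a four-field tuple. A sketch with a hypothetical file name:

from dlt.common.storages.normalize_storage import NormalizeStorage

parsed = NormalizeStorage.parse_normalize_file_name("my_schema.items.abc123.parquet")
# TParsedNormalizeFileName(schema_name="my_schema", table_name="items",
#                          file_id="abc123", file_format="parquet")

# an unsupported extension raises TerminalValueError instead of the old ValueError
# NormalizeStorage.parse_normalize_file_name("my_schema.items.abc123.csv")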
4 changes: 3 additions & 1 deletion dlt/common/typing.py
@@ -1,8 +1,9 @@
from collections.abc import Mapping as C_Mapping, Sequence as C_Sequence
from datetime import datetime, date # noqa: I251
import inspect
import os
from re import Pattern as _REPattern
from typing import Callable, Dict, Any, Final, Literal, List, Mapping, NewType, Optional, Tuple, Type, TypeVar, Generic, Protocol, TYPE_CHECKING, Union, runtime_checkable, get_args, get_origin
from typing import Callable, Dict, Any, Final, Literal, List, Mapping, NewType, Optional, Tuple, Type, TypeVar, Generic, Protocol, TYPE_CHECKING, Union, runtime_checkable, get_args, get_origin, IO
from typing_extensions import TypeAlias, ParamSpec, Concatenate

from dlt.common.pendulum import timedelta, pendulum
@@ -44,6 +45,7 @@
TVariantBase = TypeVar("TVariantBase", covariant=True)
TVariantRV = Tuple[str, Any]
VARIANT_FIELD_FORMAT = "v_%s"
TFileOrPath = Union[str, os.PathLike, IO[Any]]

@runtime_checkable
class SupportsVariant(Protocol, Generic[TVariantBase]):