Merge branch 'devel' into fingerprint_1
IlyaFaer committed Jul 2, 2024
2 parents 2feb807 + b6d08aa commit 5fe3e67
Showing 319 changed files with 10,807 additions and 4,248 deletions.
81 changes: 81 additions & 0 deletions .github/workflows/test_destination_lancedb.yml
@@ -0,0 +1,81 @@
name: dest | lancedb

on:
  pull_request:
    branches:
      - master
      - devel
  workflow_dispatch:
  schedule:
    - cron: '0 2 * * *'

concurrency:
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

env:
  DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

  RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
  RUNTIME__LOG_LEVEL: ERROR
  RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

  ACTIVE_DESTINATIONS: "[\"lancedb\"]"
  ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"

jobs:
  get_docs_changes:
    name: docs changes
    uses: ./.github/workflows/get_docs_changes.yml
    if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork') }}

  run_loader:
    name: dest | lancedb tests
    needs: get_docs_changes
    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
    defaults:
      run:
        shell: bash
    runs-on: "ubuntu-latest"

    steps:
      - name: Check out
        uses: actions/checkout@master

      - name: Setup Python
        id: setup-python
        uses: actions/setup-python@v4
        with:
          python-version: "3.11.x"

      - name: Install Poetry
        uses: snok/[email protected]
        with:
          virtualenvs-create: true
          virtualenvs-in-project: true
          installer-parallel: true

      - name: Load cached venv
        id: cached-poetry-dependencies
        uses: actions/cache@v3
        with:
          path: .venv
          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

      - name: create secrets.toml
        run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

      - name: Install dependencies
        run: poetry install --no-interaction -E lancedb -E parquet --with sentry-sdk --with pipeline

      - name: Install embedding provider dependencies
        run: poetry run pip install openai

      - run: |
          poetry run pytest tests/load -m "essential"
        name: Run essential tests Linux
        if: ${{ !(contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule') }}

      - run: |
          poetry run pytest tests/load
        name: Run all tests Linux
        if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule' }}
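
For reference, the kind of pipeline these `tests/load` runs exercise looks roughly like the sketch below. This is an illustration only: the `"lancedb"` destination name is inferred from the workflow's `ACTIVE_DESTINATIONS` setting, and the sample data and names are made up.

```python
import dlt

# Minimal sketch: load two documents into the lancedb destination.
# Credentials are read from secrets.toml, which the workflow above creates.
pipeline = dlt.pipeline(
    pipeline_name="lancedb_smoke_test",
    destination="lancedb",
    dataset_name="ci_demo",
)
info = pipeline.run(
    [{"id": 1, "text": "hello"}, {"id": 2, "text": "world"}],
    table_name="docs",
)
print(info)
```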
22 changes: 21 additions & 1 deletion .github/workflows/test_doc_snippets.yml
@@ -32,6 +32,26 @@ jobs:
    # Do not run on forks unless explicitly allowed; secrets are used here
    if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork') }}

    # Service containers to run alongside the job
    services:
      # Label used to access the service container
      postgres:
        # Docker Hub image
        image: postgres
        # Provide the database name and credentials for postgres
        env:
          POSTGRES_DB: dlt_data
          POSTGRES_USER: loader
          POSTGRES_PASSWORD: loader
        ports:
          - 5432:5432
        # Set health checks to wait until postgres has started
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:

      - name: Check out
@@ -61,7 +81,7 @@ jobs:

      - name: Install dependencies
        # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
        run: poetry install --no-interaction -E duckdb -E weaviate -E parquet -E qdrant -E bigquery -E postgres --with docs,sentry-sdk --without airflow
        run: poetry install --no-interaction -E duckdb -E weaviate -E parquet -E qdrant -E bigquery -E postgres -E lancedb --with docs,sentry-sdk --without airflow

      - name: create secrets.toml for examples
        run: pwd && echo "$DLT_SECRETS_TOML" > docs/examples/.dlt/secrets.toml
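
The service container above exposes Postgres on localhost:5432 for the snippet tests. A minimal connectivity check against it, using exactly the credentials from the job definition (psycopg2 is assumed to be installed), would look like:

```python
import psycopg2  # assumed available; any Postgres client works the same way

# Connect with the credentials defined in the services block above.
conn = psycopg2.connect(
    dbname="dlt_data",
    user="loader",
    password="loader",
    host="localhost",
    port=5432,
)
with conn.cursor() as cur:
    # The health check only guarantees the server is up; this proves queries work.
    cur.execute("SELECT 1")
    assert cur.fetchone() == (1,)
conn.close()
```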
4 changes: 2 additions & 2 deletions Makefile
@@ -67,9 +67,9 @@ lint-and-test-snippets:
	cd docs/website/docs && poetry run pytest --ignore=node_modules

lint-and-test-examples:
	poetry run mypy --config-file mypy.ini docs/examples
	poetry run flake8 --max-line-length=200 docs/examples
	cd docs/tools && poetry run python prepare_examples_tests.py
	poetry run flake8 --max-line-length=200 docs/examples
	poetry run mypy --config-file mypy.ini docs/examples
	cd docs/examples && poetry run pytest


4 changes: 3 additions & 1 deletion dlt/common/configuration/specs/run_configuration.py
@@ -17,7 +17,9 @@ class RunConfiguration(BaseConfiguration):
    dlthub_telemetry: bool = True  # enable or disable dlthub telemetry
    dlthub_telemetry_endpoint: Optional[str] = "https://telemetry.scalevector.ai"
    dlthub_telemetry_segment_write_key: Optional[str] = None
    log_format: str = "{asctime}|[{levelname:<21}]|{process}|{thread}|{name}|{filename}|{funcName}:{lineno}|{message}"
    log_format: str = (
        "{asctime}|[{levelname}]|{process}|{thread}|{name}|{filename}|{funcName}:{lineno}|{message}"
    )
    log_level: str = "WARNING"
    request_timeout: float = 60
    """Timeout for http requests"""
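
The new `log_format` drops the 21-character padding on `{levelname}`. Since the string uses `str.format`-style fields, it plugs straight into stdlib logging with `style="{"`; a minimal sketch, not dlt's actual logger setup:

```python
import logging

fmt = (
    "{asctime}|[{levelname}]|{process}|{thread}|{name}"
    "|{filename}|{funcName}:{lineno}|{message}"
)
handler = logging.StreamHandler()
# style="{" tells the stdlib Formatter to interpret str.format-style fields
handler.setFormatter(logging.Formatter(fmt, style="{"))
logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.warning("hello")
```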
6 changes: 4 additions & 2 deletions dlt/common/data_writers/__init__.py
@@ -3,6 +3,7 @@
    DataWriterMetrics,
    TDataItemFormat,
    FileWriterSpec,
    create_import_spec,
    resolve_best_writer_spec,
    get_best_writer_spec,
    is_native_writer,
@@ -11,12 +12,13 @@
from dlt.common.data_writers.escape import (
    escape_redshift_literal,
    escape_redshift_identifier,
    escape_bigquery_identifier,
    escape_hive_identifier,
)

__all__ = [
    "DataWriter",
    "FileWriterSpec",
    "create_import_spec",
    "resolve_best_writer_spec",
    "get_best_writer_spec",
    "is_native_writer",
@@ -26,5 +28,5 @@
    "new_file_id",
    "escape_redshift_literal",
    "escape_redshift_identifier",
    "escape_bigquery_identifier",
    "escape_hive_identifier",
]
33 changes: 29 additions & 4 deletions dlt/common/data_writers/buffered.py
@@ -1,11 +1,13 @@
import gzip
import time
from typing import ClassVar, List, IO, Any, Optional, Type, Generic
import contextlib
from typing import ClassVar, Iterator, List, IO, Any, Optional, Type, Generic

from dlt.common.typing import TDataItem, TDataItems
from dlt.common.data_writers.exceptions import (
    BufferedDataWriterClosed,
    DestinationCapabilitiesRequired,
    FileImportNotFound,
    InvalidFileNameTemplateException,
)
from dlt.common.data_writers.writers import TWriter, DataWriter, DataWriterMetrics, FileWriterSpec
@@ -138,18 +140,31 @@ def write_empty_file(self, columns: TTableSchemaColumns) -> DataWriterMetrics:
        self._last_modified = time.time()
        return self._rotate_file(allow_empty_file=True)

    def import_file(self, file_path: str, metrics: DataWriterMetrics) -> DataWriterMetrics:
    def import_file(
        self, file_path: str, metrics: DataWriterMetrics, with_extension: str = None
    ) -> DataWriterMetrics:
        """Import a file from `file_path` into the items storage under a new file name. Does not check
        the imported file format. Uses counts from `metrics` as a base. Logically closes the imported file.
        The preferred import method is a hard link, to avoid copying the data. If the current filesystem does not
        support it, a regular copy is used.
        An alternative extension may be provided via `with_extension` so that various file formats may be imported into the same folder.
        """
        # TODO: we should separate file storage from other storages. this creates circular deps
        from dlt.common.storages import FileStorage

        self._rotate_file()
        FileStorage.link_hard_with_fallback(file_path, self._file_name)
        # import file with alternative extension
        spec = self.writer_spec
        if with_extension:
            spec = self.writer_spec._replace(file_extension=with_extension)
        with self.alternative_spec(spec):
            self._rotate_file()
        try:
            FileStorage.link_hard_with_fallback(file_path, self._file_name)
        except FileNotFoundError as f_ex:
            raise FileImportNotFound(file_path, self._file_name) from f_ex

        self._last_modified = time.time()
        metrics = metrics._replace(
            file_path=self._file_name,
@@ -176,6 +191,16 @@ def close(self, skip_flush: bool = False) -> None:
    def closed(self) -> bool:
        return self._closed

    @contextlib.contextmanager
    def alternative_spec(self, spec: FileWriterSpec) -> Iterator[FileWriterSpec]:
        """Temporarily changes the writer spec, e.g. while the file is rotated"""
        old_spec = self.writer_spec
        try:
            self.writer_spec = spec
            yield spec
        finally:
            self.writer_spec = old_spec

    def __enter__(self) -> "BufferedDataWriter[TWriter]":
        return self

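
Judging from the docstrings above, `import_file` and `alternative_spec` compose as in the sketch below. `writer` stands for an open `BufferedDataWriter`, and the zeroed `DataWriterMetrics` values are placeholders; the field order (file path, item count, file size, created, last modified) is an assumption based on how `metrics._replace(file_path=...)` is used above.

```python
from dlt.common.data_writers.writers import DataWriterMetrics

def import_csv(writer, csv_path: str) -> DataWriterMetrics:
    """Hard-link (or copy) `csv_path` into the writer's storage as a .csv file."""
    # Zeroed base metrics; import_file uses these counts as a base.
    base = DataWriterMetrics(csv_path, 0, 0, 0.0, 0.0)
    # with_extension lets a .csv land in a folder of e.g. .jsonl files;
    # a missing source file now surfaces as FileImportNotFound.
    return writer.import_file(csv_path, base, with_extension="csv")
```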
31 changes: 31 additions & 0 deletions dlt/common/data_writers/configuration.py
@@ -0,0 +1,31 @@
from typing import ClassVar, Literal, Optional
from dlt.common.configuration import configspec, known_sections
from dlt.common.configuration.specs import BaseConfiguration

CsvQuoting = Literal["quote_all", "quote_needed"]


@configspec
class CsvFormatConfiguration(BaseConfiguration):
    delimiter: str = ","
    include_header: bool = True
    quoting: CsvQuoting = "quote_needed"

    # read options
    on_error_continue: bool = False
    encoding: str = "utf-8"

    __section__: ClassVar[str] = known_sections.DATA_WRITER


@configspec
class ParquetFormatConfiguration(BaseConfiguration):
    flavor: Optional[str] = None  # could be e.g. "spark"
    version: Optional[str] = "2.4"
    data_page_size: Optional[int] = None
    timestamp_timezone: str = "UTC"
    row_group_size: Optional[int] = None
    coerce_timestamps: Optional[Literal["s", "ms", "us", "ns"]] = None
    allow_truncated_timestamps: bool = False

    __section__: ClassVar[str] = known_sections.DATA_WRITER
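
These `@configspec` classes are normally resolved from configuration under the `data_writer` section (per `__section__`), but they can also be constructed directly. A sketch with made-up values; direct keyword construction is an assumption about `configspec` semantics:

```python
# Direct construction with explicit values; in a pipeline these would be
# injected from config/secrets under the data_writer section.
csv_cfg = CsvFormatConfiguration(delimiter="|", include_header=False, quoting="quote_all")
parquet_cfg = ParquetFormatConfiguration(flavor="spark", row_group_size=100_000)
```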
6 changes: 3 additions & 3 deletions dlt/common/data_writers/escape.py
@@ -124,18 +124,18 @@ def escape_redshift_identifier(v: str) -> str:
escape_dremio_identifier = escape_postgres_identifier


def escape_bigquery_identifier(v: str) -> str:
def escape_hive_identifier(v: str) -> str:
    # https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical
    return "`" + v.replace("\\", "\\\\").replace("`", "\\`") + "`"


def escape_snowflake_identifier(v: str) -> str:
    # Snowflake uppercases all identifiers unless quoted. Match this here so queries on the information schema work without issue.
    # See also https://docs.snowflake.com/en/sql-reference/identifiers-syntax#double-quoted-identifiers
    return escape_postgres_identifier(v.upper())
    return escape_postgres_identifier(v)


escape_databricks_identifier = escape_bigquery_identifier
escape_databricks_identifier = escape_hive_identifier


DATABRICKS_ESCAPE_DICT = {"'": "\\'", "\\": "\\\\", "\n": "\\n", "\r": "\\r"}
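
The renamed helper keeps the BigQuery/Hive backtick rules: escape backslashes first, then backticks, then wrap in backticks. For illustration (values made up):

```python
from dlt.common.data_writers.escape import escape_hive_identifier

assert escape_hive_identifier("my`col") == "`my\\`col`"
assert escape_hive_identifier("a\\b") == "`a\\\\b`"
```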
10 changes: 10 additions & 0 deletions dlt/common/data_writers/exceptions.py
@@ -22,6 +22,16 @@ def __init__(self, file_name: str):
        super().__init__(f"Writer with recent file name {file_name} is already closed")


class FileImportNotFound(DataWriterException, FileNotFoundError):
    def __init__(self, import_file_path: str, local_file_path: str) -> None:
        self.import_file_path = import_file_path
        self.local_file_path = local_file_path
        super().__init__(
            f"Attempt to import non-existing file {import_file_path} into extract storage file"
            f" {local_file_path}"
        )


class DestinationCapabilitiesRequired(DataWriterException, ValueError):
    def __init__(self, file_format: TLoaderFileFormat):
        self.file_format = file_format
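
Because `FileImportNotFound` also derives from `FileNotFoundError`, callers can catch it with either type; the dlt-specific class additionally carries both paths. A hedged usage sketch (`writer` and `metrics` are placeholders):

```python
from dlt.common.data_writers.exceptions import FileImportNotFound

def safe_import(writer, path, metrics):
    try:
        return writer.import_file(path, metrics)
    except FileImportNotFound as ex:
        # Subclasses FileNotFoundError, so `except FileNotFoundError` works too.
        print(f"missing import file: {ex.import_file_path} -> {ex.local_file_path}")
        return None
```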
