Skip to content

Commit

Permalink
Merge branch 'devel' of https://github.com/dlt-hub/dlt into feat/1651…
Browse files Browse the repository at this point in the history
…-helper-that-returns-delta-tables-created-by-pipeline
  • Loading branch information
jorritsandbrink committed Aug 5, 2024
2 parents d1abeca + 3bb677f commit def563c
Show file tree
Hide file tree
Showing 57 changed files with 2,002 additions and 1,440 deletions.
2 changes: 0 additions & 2 deletions .github/weaviate-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ services:
image: semitechnologies/weaviate:1.21.1
ports:
- 8080:8080
volumes:
- weaviate_data
restart: on-failure:0
environment:
QUERY_DEFAULTS_LIMIT: 25
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/test_destination_clickhouse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ jobs:

# OSS ClickHouse
- run: |
docker-compose -f "tests/load/clickhouse/clickhouse-compose.yml" up -d
docker compose -f "tests/load/clickhouse/clickhouse-compose.yml" up -d
echo "Waiting for ClickHouse to be healthy..."
timeout 30s bash -c 'until docker-compose -f "tests/load/clickhouse/clickhouse-compose.yml" ps | grep -q "healthy"; do sleep 1; done'
timeout 30s bash -c 'until docker compose -f "tests/load/clickhouse/clickhouse-compose.yml" ps | grep -q "healthy"; do sleep 1; done'
echo "ClickHouse is up and running"
name: Start ClickHouse OSS
Expand Down Expand Up @@ -101,7 +101,7 @@ jobs:

- name: Stop ClickHouse OSS
if: always()
run: docker-compose -f "tests/load/clickhouse/clickhouse-compose.yml" down -v
run: docker compose -f "tests/load/clickhouse/clickhouse-compose.yml" down -v

# ClickHouse Cloud
- run: |
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test_destination_dremio.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
uses: actions/checkout@master

- name: Start dremio
run: docker-compose -f "tests/load/dremio/docker-compose.yml" up -d
run: docker compose -f "tests/load/dremio/docker-compose.yml" up -d

- name: Setup Python
uses: actions/setup-python@v4
Expand Down Expand Up @@ -87,4 +87,4 @@ jobs:
- name: Stop dremio
if: always()
run: docker-compose -f "tests/load/dremio/docker-compose.yml" down -v
run: docker compose -f "tests/load/dremio/docker-compose.yml" down -v
2 changes: 1 addition & 1 deletion .github/workflows/test_doc_snippets.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
uses: actions/checkout@master

- name: Start weaviate
run: docker-compose -f ".github/weaviate-compose.yml" up -d
run: docker compose -f ".github/weaviate-compose.yml" up -d

- name: Setup Python
uses: actions/setup-python@v4
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test_local_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:
uses: actions/checkout@master

- name: Start weaviate
run: docker-compose -f ".github/weaviate-compose.yml" up -d
run: docker compose -f ".github/weaviate-compose.yml" up -d

- name: Setup Python
uses: actions/setup-python@v4
Expand Down Expand Up @@ -109,4 +109,4 @@ jobs:

- name: Stop weaviate
if: always()
run: docker-compose -f ".github/weaviate-compose.yml" down -v
run: docker compose -f ".github/weaviate-compose.yml" down -v
164 changes: 112 additions & 52 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from dlt.common import logger
from dlt.common.configuration.specs.base_configuration import extract_inner_hint
from dlt.common.destination.utils import verify_schema_capabilities
from dlt.common.exceptions import TerminalValueError
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.utils import (
Expand All @@ -42,6 +43,8 @@
InvalidDestinationReference,
UnknownDestinationModule,
DestinationSchemaTampered,
DestinationTransientException,
DestinationTerminalException,
)
from dlt.common.schema.exceptions import UnknownTableException
from dlt.common.storages import FileStorage
Expand Down Expand Up @@ -258,11 +261,45 @@ class DestinationClientDwhWithStagingConfiguration(DestinationClientDwhConfigura
"""configuration of the staging, if present, injected at runtime"""


# Lifecycle states of a load job: "ready" precedes execution, "running" while
# executing, and "completed"/"failed"/"retry" are the terminal outcomes.
TLoadJobState = Literal["ready", "running", "failed", "retry", "completed"]


class LoadJob:
"""Represents a job that loads a single file
class LoadJob(ABC):
    """Stateful load job bound to exactly one job file.

    The job identity is derived from the file name, which must parse as a
    valid load-job file name.
    """

    def __init__(self, file_path: str) -> None:
        self._file_path = file_path
        self._file_name = FileStorage.get_file_name_from_file_path(file_path)
        # a bare file name is rejected: the constructor requires a full file path
        assert self._file_name != self._file_path
        self._parsed_file_name = ParsedLoadJobFileName.parse(self._file_name)

    def file_name(self) -> str:
        """Name of the file backing this job."""
        return self._file_name

    def job_id(self) -> str:
        """Job id derived from the file name; it does not change during the job lifecycle."""
        return self._parsed_file_name.job_id()

    def job_file_info(self) -> ParsedLoadJobFileName:
        """Parsed representation of the job file name."""
        return self._parsed_file_name

    @abstractmethod
    def state(self) -> TLoadJobState:
        """Current job state; should poll the external resource if necessary."""

    @abstractmethod
    def exception(self) -> str:
        """Exception message associated with the failed or retry state."""


class RunnableLoadJob(LoadJob, ABC):
"""Represents a runnable job that loads a single file
Each job starts in "running" state and ends in one of terminal states: "retry", "failed" or "completed".
Each job is uniquely identified by a file name. The file is guaranteed to exist in "running" state. In terminal state, the file may not be present.
Expand All @@ -273,75 +310,95 @@ class LoadJob:
immediately transition job into "failed" or "retry" state respectively.
"""

def __init__(self, file_name: str) -> None:
def __init__(self, file_path: str) -> None:
"""
File name is also a job id (or job id is deterministically derived) so it must be globally unique
"""
# ensure file name
assert file_name == FileStorage.get_file_name_from_file_path(file_name)
self._file_name = file_name
self._parsed_file_name = ParsedLoadJobFileName.parse(file_name)
super().__init__(file_path)
self._state: TLoadJobState = "ready"
self._exception: Exception = None

@abstractmethod
def state(self) -> TLoadJobState:
"""Returns current state. Should poll external resource if necessary."""
pass
# variables needed by most jobs, set by the loader in set_run_vars
self._schema: Schema = None
self._load_table: TTableSchema = None
self._load_id: str = None
self._job_client: "JobClientBase" = None

def file_name(self) -> str:
"""A name of the job file"""
return self._file_name
def set_run_vars(self, load_id: str, schema: Schema, load_table: TTableSchema) -> None:
"""
called by the loader right before the job is run
"""
self._load_id = load_id
self._schema = schema
self._load_table = load_table

def job_id(self) -> str:
"""The job id that is derived from the file name and does not changes during job lifecycle"""
return self._parsed_file_name.job_id()
@property
def load_table_name(self) -> str:
return self._load_table["name"]

def job_file_info(self) -> ParsedLoadJobFileName:
return self._parsed_file_name
def run_managed(
self,
job_client: "JobClientBase",
) -> None:
"""
wrapper around the user implemented run method
"""
# only jobs that are not running or have not reached a final state
# may be started
assert self._state in ("ready", "retry")
self._job_client = job_client

# filepath is now moved to running
try:
self._state = "running"
self._job_client.prepare_load_job_execution(self)
self.run()
self._state = "completed"
except (DestinationTerminalException, TerminalValueError) as e:
self._state = "failed"
self._exception = e
except (DestinationTransientException, Exception) as e:
self._state = "retry"
self._exception = e
finally:
# sanity check
assert self._state in ("completed", "retry", "failed")

@abstractmethod
def run(self) -> None:
"""
run the actual job, this will be executed on a thread and should be implemented by the user
exception will be handled outside of this function
"""
raise NotImplementedError()

def state(self) -> TLoadJobState:
"""Returns current state. Should poll external resource if necessary."""
return self._state

def exception(self) -> str:
"""The exception associated with failed or retry states"""
pass
return str(self._exception)


class NewLoadJob(LoadJob):
"""Adds a trait that allows to save new job file"""
class FollowupJob:
"""Base class for follow up jobs that should be created"""

@abstractmethod
def new_file_path(self) -> str:
"""Path to a newly created temporary job file. If empty, no followup job should be created"""
pass


class FollowupJob:
"""Adds a trait that allows to create a followup job"""
class HasFollowupJobs:
"""Adds a trait that allows to create single or table chain followup jobs"""

def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJob]:
"""Return list of new jobs. `final_state` is state to which this job transits"""
return []


class DoNothingJob(LoadJob):
    """No-op load job: reports itself completed without doing any work."""

    def __init__(self, file_path: str) -> None:
        # base class expects the bare file name, so strip the directory part
        super().__init__(FileStorage.get_file_name_from_file_path(file_path))

    def state(self) -> TLoadJobState:
        # there is nothing to run, so the job is finished from the start
        return "completed"

    def exception(self) -> str:
        # a job that is always "completed" can never be asked for a failure reason
        raise NotImplementedError()


class DoNothingFollowupJob(DoNothingJob, FollowupJob):
    """No-op load job that additionally carries the followup-job trait."""


class JobClientBase(ABC):
def __init__(
self,
Expand Down Expand Up @@ -394,13 +451,16 @@ def update_stored_schema(
return expected_update

@abstractmethod
def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
"""Creates and starts a load job for a particular `table` with content in `file_path`"""
def create_load_job(
self, table: TTableSchema, file_path: str, load_id: str, restore: bool = False
) -> LoadJob:
"""Creates a load job for a particular `table` with content in `file_path`"""
pass

@abstractmethod
def restore_file_load(self, file_path: str) -> LoadJob:
"""Finds and restores already started loading job identified by `file_path` if destination supports it."""
def prepare_load_job_execution( # noqa: B027, optional override
self, job: RunnableLoadJob
) -> None:
"""Prepare the connected job client for the execution of a load job (used for query tags in sql clients)"""
pass

def should_truncate_table_before_load(self, table: TTableSchema) -> bool:
Expand All @@ -410,7 +470,7 @@ def create_table_chain_completed_followup_jobs(
self,
table_chain: Sequence[TTableSchema],
completed_table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None,
) -> List[NewLoadJob]:
) -> List[FollowupJob]:
"""Creates a list of followup jobs that should be executed after a table chain is completed"""
return []

Expand Down
5 changes: 5 additions & 0 deletions dlt/common/runtime/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ def raise_if_signalled() -> None:
raise SignalReceivedException(_received_signal)


def signal_received() -> bool:
    """Check whether a signal has been recorded in module state.

    Returns:
        True when ``_received_signal`` is truthy (presumably a non-zero
        signal number — confirm against the module's signal handler).
    """
    # bool(...) expresses the truthiness check directly; the original
    # `True if _received_signal else False` was a redundant conditional
    return bool(_received_signal)


def sleep(sleep_seconds: float) -> None:
"""A signal-aware version of sleep function. Will raise SignalReceivedException if signal was received during sleep period."""
# do not allow sleeping if signal was received
Expand Down
19 changes: 6 additions & 13 deletions dlt/common/storages/load_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,19 +724,12 @@ def build_job_file_name(

@staticmethod
def is_package_partially_loaded(package_info: LoadPackageInfo) -> bool:
    """Checks if a package is partially loaded.

    A package is partially loaded when at least one job is completed and at
    least one job is in any other working folder (new, started, failed...).

    Args:
        package_info: info object whose ``jobs`` mapping is inspected per
            working folder.

    Returns:
        True when completed jobs coexist with not-yet-completed jobs.
    """
    all_jobs_count = sum(len(package_info.jobs[job_state]) for job_state in WORKING_FOLDERS)
    completed_jobs_count = len(package_info.jobs["completed_jobs"])
    # partially loaded: some jobs finished, but not all of them
    return bool(completed_jobs_count) and all_jobs_count > completed_jobs_count

@staticmethod
def _job_elapsed_time_seconds(file_path: str, now_ts: float = None) -> float:
Expand Down
4 changes: 3 additions & 1 deletion dlt/common/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@
VARIANT_FIELD_FORMAT = "v_%s"
TFileOrPath = Union[str, PathLike, IO[Any]]
TSortOrder = Literal["asc", "desc"]
TLoaderFileFormat = Literal[
    "jsonl", "typed-jsonl", "insert_values", "parquet", "csv", "reference", "reference_delta"
]
"""known loader file formats"""


Expand Down
Loading

0 comments on commit def563c

Please sign in to comment.