add support for starting load jobs as slots free up
sh-rp committed Jun 19, 2024
1 parent d4b0bd0 commit 78a5989
Showing 5 changed files with 152 additions and 117 deletions.
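In outline, the commit replaces the old "spool once, then drain" flow with a loop that keeps topping up free worker slots until the package drains. A minimal sketch of the new loop shape (illustrative only: `loader` and its method signatures are stand-ins that mirror the diff below, not dlt's public API):

from dlt.common.runtime import signals
from dlt.common.runtime.signals import sleep

def load_until_drained(loader, load_id, schema):
    # resume jobs that were started in a previous run
    running_jobs = loader.retrieve_jobs(load_id)
    while True:
        # collect finished jobs; a terminal failure becomes a deferred exception
        running_jobs, pending_exception = loader.complete_jobs(load_id, running_jobs, schema)
        # top up free slots unless shutting down or about to raise
        if not signals.signal_received() and not pending_exception:
            running_jobs += loader.spool_new_jobs(load_id, schema, len(running_jobs))
        if len(running_jobs) == 0:
            if pending_exception:
                raise pending_exception
            break
        sleep(0.5)  # signal-aware sleep; raises if a signal arrives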
5 changes: 5 additions & 0 deletions dlt/common/runtime/signals.py
@@ -32,6 +32,11 @@ def raise_if_signalled() -> None:
raise SignalReceivedException(_received_signal)


def signal_received() -> bool:
"""Check whether a signal was received."""
return bool(_received_signal)


def sleep(sleep_seconds: float) -> None:
"""A signal-aware version of sleep function. Will raise SignalReceivedException if signal was received during sleep period."""
# do not allow sleeping if signal was received
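For context, `signal_received()` complements the raising `sleep()` above: the load loop can stop scheduling new work without aborting jobs already in flight. A standalone sketch of that contract (hypothetical handler and module state, not dlt's actual module):

import signal

_received_signal = 0  # set by the handler; dlt keeps similar module state

def _on_signal(sig: int, frame: object) -> None:
    global _received_signal
    _received_signal = sig

signal.signal(signal.SIGINT, _on_signal)

def signal_received() -> bool:
    # non-raising check: callers drain running work instead of aborting
    return bool(_received_signal)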
143 changes: 79 additions & 64 deletions dlt/load/load.py
@@ -43,6 +43,7 @@
DestinationTerminalException,
DestinationTransientException,
)
from dlt.common.runtime import signals

from dlt.destinations.job_impl import EmptyLoadJob

@@ -194,34 +195,37 @@ def w_spool_job(
self.load_storage.normalized_packages.start_job(load_id, job.file_name())
return job

def spool_new_jobs(self, load_id: str, schema: Schema) -> Tuple[int, List[LoadJob]]:
def spool_new_jobs(
self, load_id: str, schema: Schema, running_jobs_count: int
) -> List[LoadJob]:
# use a thread-based pool as job processing is mostly I/O and we do not want to pickle jobs
load_files = filter_new_jobs(
self.load_storage.list_new_jobs(load_id), self.capabilities, self.config
)
load_files = self.load_storage.list_new_jobs(load_id)
file_count = len(load_files)
if file_count == 0:
logger.info(f"No new jobs found in {load_id}")
return 0, []
logger.info(f"Will load {file_count}, creating jobs")
return []

load_files = filter_new_jobs(load_files, self.capabilities, self.config, running_jobs_count)
file_count = len(load_files)
logger.info(f"Will load additional {file_count}, creating jobs")
param_chunk = [(id(self), file, load_id, schema) for file in load_files]
# exceptions should not be raised; a None job indicates a temporary failure
# other jobs should not be affected
jobs = self.pool.map(Load.w_spool_job, *zip(*param_chunk))
# remove None jobs and check the rest
return file_count, [job for job in jobs if job is not None]
return [job for job in jobs if job is not None]

def retrieve_jobs(
self, client: JobClientBase, load_id: str, staging_client: JobClientBase = None
) -> Tuple[int, List[LoadJob]]:
) -> List[LoadJob]:
jobs: List[LoadJob] = []

# list all files that were started but not yet completed
started_jobs = self.load_storage.normalized_packages.list_started_jobs(load_id)

logger.info(f"Found {len(started_jobs)} that are already started and should be continued")
if len(started_jobs) == 0:
return 0, jobs
return jobs

for file_path in started_jobs:
try:
@@ -237,7 +241,7 @@ def retrieve_jobs(
raise
jobs.append(job)

return len(jobs), jobs
return jobs

def get_new_jobs_info(self, load_id: str) -> List[ParsedLoadJobFileName]:
return [
@@ -274,14 +278,19 @@ def create_followup_jobs(
jobs = jobs + starting_job.create_followup_jobs(state)
return jobs

def complete_jobs(self, load_id: str, jobs: List[LoadJob], schema: Schema) -> List[LoadJob]:
def complete_jobs(
self, load_id: str, jobs: List[LoadJob], schema: Schema
) -> Tuple[List[LoadJob], Exception]:
"""Run periodically in the main thread to collect job execution statuses.
After detecting a change of status, it commits the job state by moving it to the right folder.
May create one or more followup jobs that get scheduled as new jobs. New jobs are created
only in terminal states (completed / failed).
"""
# list of jobs still running
remaining_jobs: List[LoadJob] = []
# if an exception condition was met, return it to the main runner
pending_exception: Exception = None

def _schedule_followup_jobs(followup_jobs: Iterable[NewLoadJob]) -> None:
for followup_job in followup_jobs:
@@ -323,6 +332,13 @@ def _schedule_followup_jobs(followup_jobs: Iterable[NewLoadJob]) -> None:
f"Job for {job.job_id()} failed terminally in load {load_id} with message"
f" {failed_message}"
)
# schedule exception on job failure
if self.config.raise_on_failed_jobs:
pending_exception = LoadClientJobFailed(
load_id,
job.job_file_info().job_id(),
failed_message,
)
elif state == "retry":
# try to get exception message from job
retry_message = job.exception()
@@ -331,6 +347,16 @@ def _schedule_followup_jobs(followup_jobs: Iterable[NewLoadJob]) -> None:
logger.warning(
f"Job for {job.job_id()} retried in load {load_id} with message {retry_message}"
)
# possibly schedule exception on too many retries
if self.config.raise_on_max_retries:
r_c = job.job_file_info().retry_count + 1
if r_c > 0 and r_c % self.config.raise_on_max_retries == 0:
pending_exception = LoadClientJobRetry(
load_id,
job.job_file_info().job_id(),
r_c,
self.config.raise_on_max_retries,
)
elif state == "completed":
# create followup jobs
_schedule_followup_jobs(self.create_followup_jobs(load_id, state, job, schema))
@@ -346,7 +372,7 @@ def _schedule_followup_jobs(followup_jobs: Iterable[NewLoadJob]) -> None:
"Jobs", 1, message="WARNING: Some of the jobs failed!", label="Failed"
)

return remaining_jobs
return remaining_jobs, pending_exception

def complete_package(self, load_id: str, schema: Schema, aborted: bool = False) -> None:
# do not commit load id for aborted packages
@@ -371,6 +397,18 @@ def complete_package(self, load_id: str, schema: Schema, aborted: bool = False)
f"All jobs completed, archiving package {load_id} with aborted set to {aborted}"
)

def update_loadpackage_info(self, load_id: str) -> None:
# update counters; we only care about the jobs that are scheduled to be loaded
package_info = self.load_storage.normalized_packages.get_load_package_info(load_id)
total_jobs = reduce(lambda p, c: p + len(c), package_info.jobs.values(), 0)
no_failed_jobs = len(package_info.jobs["failed_jobs"])
no_completed_jobs = len(package_info.jobs["completed_jobs"]) + no_failed_jobs
self.collector.update("Jobs", no_completed_jobs, total_jobs)
if no_failed_jobs > 0:
self.collector.update(
"Jobs", no_failed_jobs, message="WARNING: Some of the jobs failed!", label="Failed"
)

def load_single_package(self, load_id: str, schema: Schema) -> None:
new_jobs = self.get_new_jobs_info(load_id)

@@ -414,72 +452,49 @@ def load_single_package(self, load_id: str, schema: Schema) -> None:
drop_tables=dropped_tables,
truncate_tables=truncated_tables,
)

self.load_storage.commit_schema_update(load_id, applied_update)

# initialize staging destination and spool or retrieve unfinished jobs
# collect all unfinished jobs
running_jobs: List[LoadJob] = []
if self.staging_destination:
with self.get_staging_destination_client(schema) as staging_client:
jobs_count, jobs = self.retrieve_jobs(job_client, load_id, staging_client)
else:
jobs_count, jobs = self.retrieve_jobs(job_client, load_id)

if not jobs:
# jobs count is a total number of jobs including those that could not be initialized
jobs_count, jobs = self.spool_new_jobs(load_id, schema)
# if there are no existing or new jobs we complete the package
if jobs_count == 0:
self.complete_package(load_id, schema, False)
return
# update counters; we only care about the jobs that are scheduled to be loaded
package_info = self.load_storage.normalized_packages.get_load_package_info(load_id)
total_jobs = reduce(lambda p, c: p + len(c), package_info.jobs.values(), 0)
no_failed_jobs = len(package_info.jobs["failed_jobs"])
no_completed_jobs = len(package_info.jobs["completed_jobs"]) + no_failed_jobs
self.collector.update("Jobs", no_completed_jobs, total_jobs)
if no_failed_jobs > 0:
self.collector.update(
"Jobs", no_failed_jobs, message="WARNING: Some of the jobs failed!", label="Failed"
)
running_jobs += self.retrieve_jobs(job_client, load_id, staging_client)
running_jobs += self.retrieve_jobs(job_client, load_id)

# loop until all jobs are processed
while True:
try:
remaining_jobs = self.complete_jobs(load_id, jobs, schema)
if len(remaining_jobs) == 0:
# get package status
package_info = self.load_storage.normalized_packages.get_load_package_info(
load_id
)
# possibly raise on failed jobs
if self.config.raise_on_failed_jobs:
if package_info.jobs["failed_jobs"]:
failed_job = package_info.jobs["failed_jobs"][0]
raise LoadClientJobFailed(
load_id,
failed_job.job_file_info.job_id(),
failed_job.failed_message,
)
# possibly raise on too many retries
if self.config.raise_on_max_retries:
for new_job in package_info.jobs["new_jobs"]:
r_c = new_job.job_file_info.retry_count
if r_c > 0 and r_c % self.config.raise_on_max_retries == 0:
raise LoadClientJobRetry(
load_id,
new_job.job_file_info.job_id(),
r_c,
self.config.raise_on_max_retries,
)
# we continuously spool new jobs and complete finished ones
running_jobs, pending_exception = self.complete_jobs(load_id, running_jobs, schema)
# do not spool new jobs if there was a signal
if not signals.signal_received() and not pending_exception:
running_jobs += self.spool_new_jobs(load_id, schema, len(running_jobs))
self.update_loadpackage_info(load_id)

if len(running_jobs) == 0:
# if a pending exception was discovered during completion of jobs
# we can raise it now
if pending_exception:
raise pending_exception
break
# process remaining jobs again
jobs = remaining_jobs
# this will raise on signal
sleep(1)
sleep(0.5)
except LoadClientJobFailed:
# the package is completed and skipped
self.update_loadpackage_info(load_id)
self.complete_package(load_id, schema, True)
raise

# always update load package info
self.update_loadpackage_info(load_id)

# complete the package if no new or started jobs present after loop exit
if (
len(self.load_storage.list_new_jobs(load_id)) == 0
and len(self.load_storage.normalized_packages.list_started_jobs(load_id)) == 0
):
self.complete_package(load_id, schema, False)

def run(self, pool: Optional[Executor]) -> TRunMetrics:
# store pool
self.pool = pool or NullExecutor()
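The key shift above is that a failed job no longer raises inside the loop: `complete_jobs` records a pending exception, sibling jobs keep running, and the error surfaces only once nothing remains in flight. A self-contained sketch of that deferred-exception pattern (the `Job` protocol and state strings are stand-ins, not dlt's types):

from typing import List, Optional, Protocol, Tuple

class Job(Protocol):
    def state(self) -> str: ...
    def job_id(self) -> str: ...

def complete_jobs(jobs: List[Job]) -> Tuple[List[Job], Optional[Exception]]:
    remaining: List[Job] = []
    pending_exception: Optional[Exception] = None
    for job in jobs:
        if job.state() == "running":
            # still in flight, check again on the next pass
            remaining.append(job)
        elif job.state() == "failed" and pending_exception is None:
            # remember the first failure; let sibling jobs finish first
            pending_exception = RuntimeError(f"job {job.job_id()} failed")
    return remaining, pending_exception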
8 changes: 7 additions & 1 deletion dlt/load/utils.py
@@ -225,6 +225,7 @@ def filter_new_jobs(
file_names: Sequence[str],
capabilities: DestinationCapabilitiesContext,
config: LoaderConfiguration,
running_jobs_count: int,
) -> Sequence[str]:
"""Filters the list of new jobs to adhere to max_workers and parallellism strategy"""
"""NOTE: in the current setup we only filter based on settings for the final destination"""
@@ -242,6 +243,11 @@
if mp := capabilities.max_parallel_load_jobs:
max_workers = min(max_workers, mp)

# if all slots are full, do not create new jobs
if running_jobs_count >= max_workers:
return []
max_jobs = max_workers - running_jobs_count

# regular sequential works on all jobs
eligible_jobs = file_names

@@ -257,4 +263,4 @@
)
]

return eligible_jobs[:max_workers]
return eligible_jobs[:max_jobs]
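A worked example of the new slot accounting in `filter_new_jobs` (all numbers hypothetical):

eligible_jobs = [f"job_{i}" for i in range(10)]

max_workers = 20  # pool size from the loader config
max_workers = min(max_workers, 8)  # capped by max_parallel_load_jobs

running_jobs_count = 6
if running_jobs_count >= max_workers:
    new_jobs = []  # every slot is busy, spool nothing
else:
    max_jobs = max_workers - running_jobs_count  # 2 free slots
    new_jobs = eligible_jobs[:max_jobs]  # -> ["job_0", "job_1"]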
