Skip to content

Commit

Permalink
allow clients to prepare for job execution on thread and move query t…
Browse files Browse the repository at this point in the history
…ag execution there.
  • Loading branch information
sh-rp committed Jul 30, 2024
1 parent 9fc995e commit bf9f912
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
5 changes: 5 additions & 0 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ def run_managed(
# filepath is now moved to running
try:
self._state = "running"
self._job_client.prepare_load_job_execution(self)
self.run()
self._state = "completed"
except (DestinationTerminalException, TerminalValueError) as e:
Expand Down Expand Up @@ -456,6 +457,10 @@ def create_load_job(
"""Creates a load job for a particular `table` with content in `file_path`"""
pass

def prepare_load_job_execution(self, job: RunnableLoadJob) -> None:
"""Prepare the connected job client for the execution of a load job (used for query tags in sql clients)"""
pass

def should_truncate_table_before_load(self, table: TTableSchema) -> bool:
return table["write_disposition"] == "replace"

Expand Down
4 changes: 3 additions & 1 deletion dlt/destinations/job_client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ def create_load_job(
self, table: TTableSchema, file_path: str, load_id: str, restore: bool = False
) -> LoadJob:
"""Starts SqlLoadJob for files ending with .sql or returns None to let derived classes to handle their specific jobs"""
self._set_query_tags_for_job(load_id, table)
if SqlLoadJob.is_sql_job(file_path):
# create sql load job
return SqlLoadJob(file_path)
Expand Down Expand Up @@ -655,6 +654,9 @@ def _verify_schema(self) -> None:
logger.error(str(exception))
raise exceptions[0]

def prepare_load_job_execution(self, job: RunnableLoadJob) -> None:
self._set_query_tags_for_job(load_id=job._load_id, table=job._load_table)

def _set_query_tags_for_job(self, load_id: str, table: TTableSchema) -> None:
"""Sets query tags in sql_client for a job in package `load_id`, starting for a particular `table`"""
from dlt.common.pipeline import current_pipeline
Expand Down

0 comments on commit bf9f912

Please sign in to comment.