From 1d4995ddec7284bd4e612e9ddca6058d94584830 Mon Sep 17 00:00:00 2001
From: Dave
Date: Mon, 30 Oct 2023 12:57:54 +0100
Subject: [PATCH 01/12] start implementation of platform trace support

---
 .../configuration/specs/run_configuration.py |  3 ++
 dlt/pipeline/__init__.py                     |  4 +--
 dlt/pipeline/platform.py                     | 22 ++++++++++++++
 dlt/pipeline/trace.py                        | 29 ++++++++++++-------
 4 files changed, 45 insertions(+), 13 deletions(-)
 create mode 100644 dlt/pipeline/platform.py

diff --git a/dlt/common/configuration/specs/run_configuration.py b/dlt/common/configuration/specs/run_configuration.py
index 2ec3648dbe..73e05a03c4 100644
--- a/dlt/common/configuration/specs/run_configuration.py
+++ b/dlt/common/configuration/specs/run_configuration.py
@@ -27,6 +27,9 @@ class RunConfiguration(BaseConfiguration):
     request_max_retry_delay: float = 300
     """Maximum delay between http request retries"""
     config_files_storage_path: str = "/run/config/"
+    """Platform connection"""
+    beacon_url: Optional[str] = None
+    beacon_token: Optional[str] = None
 
     __section__ = "runtime"
 
diff --git a/dlt/pipeline/__init__.py b/dlt/pipeline/__init__.py
index af7dd12294..012b03162d 100644
--- a/dlt/pipeline/__init__.py
+++ b/dlt/pipeline/__init__.py
@@ -241,8 +241,8 @@ def run(
     )
 
 # plug default tracking module
-from dlt.pipeline import trace, track
-trace.TRACKING_MODULE = track
+from dlt.pipeline import trace, track, platform
+trace.TRACKING_MODULES = [track, platform]
 
 # setup default pipeline in the container
 Container()[PipelineContext] = PipelineContext(pipeline)
diff --git a/dlt/pipeline/platform.py b/dlt/pipeline/platform.py
new file mode 100644
index 0000000000..1af1769a12
--- /dev/null
+++ b/dlt/pipeline/platform.py
@@ -0,0 +1,22 @@
+"""Implements SupportsTracking"""
+from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace, TPipelineStep, SupportsPipeline
+from typing import Any
+
+
+def _send_to_beacon(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline):
+    if not pipeline.runtime_config.beacon_token or not pipeline.runtime_config.beacon_url:
+        return
+
+def on_start_trace(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None:
+    _send_to_beacon(trace, step, pipeline, None)
+
+def on_start_trace_step(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None:
+    _send_to_beacon(trace, step, pipeline, None)
+
+def on_end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, step_info: Any) -> None:
+    _send_to_beacon(trace, step, pipeline, step_info)
+
+def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline) -> None:
+    _send_to_beacon(trace, None, pipeline, None)
+
+
diff --git a/dlt/pipeline/trace.py b/dlt/pipeline/trace.py
index 2ba71396f6..b833790813 100644
--- a/dlt/pipeline/trace.py
+++ b/dlt/pipeline/trace.py
@@ -122,6 +122,10 @@ def last_pipeline_step_trace(self, step_name: TPipelineStep) -> PipelineStepTrac
             if step.step == step_name:
                 return step
         return None
+
+    def asdict(self) -> DictStrAny:
+        """A dictionary representation of PipelineTrace that can be loaded with `dlt`"""
+        return dataclasses.asdict(self)
 
     @property
     def last_extract_info(self) -> ExtractInfo:
@@ -162,22 +166,23 @@ def on_end_trace(self, trace: PipelineTrace, pipeline: SupportsPipeline) -> None:
         ...
 
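The new `dlt/pipeline/platform.py` above is the first stub of a tracking module: `trace.py` only requires the four `SupportsTracking` hook functions, and the hunk that continues below turns the single `TRACKING_MODULE` into a `TRACKING_MODULES` list. A minimal custom module would look roughly like the following sketch; the hook signatures match this patch (later commits in the series add a `send_state` argument), while the module body and print statements are purely illustrative and not part of the change.

```python
# Illustrative sketch only: a module-level implementation of the
# SupportsTracking hooks introduced in this patch.
from typing import Any

from dlt.pipeline.trace import (
    PipelineTrace,
    PipelineStepTrace,
    TPipelineStep,
    SupportsPipeline,
)


def on_start_trace(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None:
    print(f"trace {trace.transaction_id} started, first step: {step}")


def on_start_trace_step(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None:
    print(f"step {step} started for pipeline {pipeline.pipeline_name}")


def on_end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, step_info: Any) -> None:
    print(f"step {step.step} finished in trace {trace.transaction_id}")


def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline) -> None:
    print(f"trace {trace.transaction_id} finished with {len(trace.steps)} steps")
```

Because every hook call is wrapped in `suppress_and_warn()`, a failure inside such a module is logged rather than failing the pipeline run; registering it would amount to appending it to `trace.TRACKING_MODULES`.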
-# plug in your own tracking module here -# TODO: that probably should be a list of modules / classes with all of them called -TRACKING_MODULE: SupportsTracking = None +# plug in your own tracking modules here +TRACKING_MODULES: SupportsTracking = None def start_trace(step: TPipelineStep, pipeline: SupportsPipeline) -> PipelineTrace: trace = PipelineTrace(uniq_id(), pendulum.now(), steps=[]) - with suppress_and_warn(): - TRACKING_MODULE.on_start_trace(trace, step, pipeline) + for module in TRACKING_MODULES: + with suppress_and_warn(): + module.on_start_trace(trace, step, pipeline) return trace def start_trace_step(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> PipelineStepTrace: trace_step = PipelineStepTrace(uniq_id(), step, pendulum.now()) - with suppress_and_warn(): - TRACKING_MODULE.on_start_trace_step(trace, step, pipeline) + for module in TRACKING_MODULES: + with suppress_and_warn(): + module.on_start_trace_step(trace, step, pipeline) return trace_step @@ -211,16 +216,18 @@ def end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: Supp trace.resolved_config_values = list(resolved_values) trace.steps.append(step) - with suppress_and_warn(): - TRACKING_MODULE.on_end_trace_step(trace, step, pipeline, step_info) + for module in TRACKING_MODULES: + with suppress_and_warn(): + module.on_end_trace_step(trace, step, pipeline, step_info) def end_trace(trace: PipelineTrace, pipeline: SupportsPipeline, trace_path: str) -> None: trace.finished_at = pendulum.now() if trace_path: save_trace(trace_path, trace) - with suppress_and_warn(): - TRACKING_MODULE.on_end_trace(trace, pipeline) + for module in TRACKING_MODULES: + with suppress_and_warn(): + module.on_end_trace(trace, pipeline) def merge_traces(last_trace: PipelineTrace, new_trace: PipelineTrace) -> PipelineTrace: From 2a6bcb17e751b776ab81805f365bd52e4ea3f7c9 Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 30 Oct 2023 15:44:11 +0100 Subject: [PATCH 02/12] add beacon integration --- dlt/common/storages/load_storage.py | 4 +++- dlt/pipeline/platform.py | 25 +++++++++++++++++-------- dlt/pipeline/trace.py | 2 +- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/dlt/common/storages/load_storage.py b/dlt/common/storages/load_storage.py index d8eee9b8d6..f4e698b18a 100644 --- a/dlt/common/storages/load_storage.py +++ b/dlt/common/storages/load_storage.py @@ -87,6 +87,7 @@ class LoadPackageInfo(NamedTuple): package_path: str state: TLoadPackageState schema_name: str + schema: Schema schema_update: TSchemaTables completed_at: datetime.datetime jobs: Dict[TJobState, List[LoadJobInfo]] @@ -110,6 +111,7 @@ def asdict(self) -> DictStrAny: table["columns"] = columns d.pop("schema_update") d["tables"] = tables + d["schema"] = self.schema.to_dict() return d def asstr(self, verbosity: int = 0) -> str: @@ -290,7 +292,7 @@ def get_load_package_info(self, load_id: str) -> LoadPackageInfo: jobs.append(self._read_job_file_info(state, file, package_created_at)) all_jobs[state] = jobs - return LoadPackageInfo(load_id, self.storage.make_full_path(package_path), package_state, schema.name, applied_update, package_created_at, all_jobs) + return LoadPackageInfo(load_id, self.storage.make_full_path(package_path), package_state, schema.name, schema, applied_update, package_created_at, all_jobs) def begin_schema_update(self, load_id: str) -> Optional[TSchemaTables]: package_path = self.get_normalized_package_path(load_id) diff --git a/dlt/pipeline/platform.py b/dlt/pipeline/platform.py index 1af1769a12..869e37e69d 
100644 --- a/dlt/pipeline/platform.py +++ b/dlt/pipeline/platform.py @@ -1,22 +1,31 @@ """Implements SupportsTracking""" -from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace, TPipelineStep, SupportsPipeline from typing import Any +from dlt.sources.helpers import requests +from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace, TPipelineStep, SupportsPipeline +from dlt.common import json + +count = 0 -def _send_to_beacon(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline): - if not pipeline.runtime_config.beacon_token or not pipeline.runtime_config.beacon_url: - return +def _send_to_beacon(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, some): + if pipeline.runtime_config.beacon_token and pipeline.runtime_config.beacon_url: + trace_dump = json.dumps(trace.asdict()) + url = f"{pipeline.runtime_config.beacon_url}/pipeline/{pipeline.runtime_config.beacon_token}/traces" + requests.put(url, json=trace_dump) def on_start_trace(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None: - _send_to_beacon(trace, step, pipeline, None) + # _send_to_beacon(trace, step, pipeline, None) + pass def on_start_trace_step(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None: - _send_to_beacon(trace, step, pipeline, None) + # _send_to_beacon(trace, step, pipeline, None) + pass def on_end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, step_info: Any) -> None: - _send_to_beacon(trace, step, pipeline, step_info) + # _send_to_beacon(trace, step, pipeline, step_info) + pass def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline) -> None: _send_to_beacon(trace, None, pipeline, None) - + pass diff --git a/dlt/pipeline/trace.py b/dlt/pipeline/trace.py index b833790813..e3e11e3b39 100644 --- a/dlt/pipeline/trace.py +++ b/dlt/pipeline/trace.py @@ -180,6 +180,7 @@ def start_trace(step: TPipelineStep, pipeline: SupportsPipeline) -> PipelineTrac def start_trace_step(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> PipelineStepTrace: trace_step = PipelineStepTrace(uniq_id(), step, pendulum.now()) + trace.steps.append(trace_step) for module in TRACKING_MODULES: with suppress_and_warn(): module.on_start_trace_step(trace, step, pipeline) @@ -215,7 +216,6 @@ def end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: Supp ) , _RESOLVED_TRACES.values()) trace.resolved_config_values = list(resolved_values) - trace.steps.append(step) for module in TRACKING_MODULES: with suppress_and_warn(): module.on_end_trace_step(trace, step, pipeline, step_info) From fea80fa4d9677bf5d56c06eec50fbd9e71b91dea Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 1 Nov 2023 09:51:41 +0100 Subject: [PATCH 03/12] small changes --- dlt/common/configuration/specs/run_configuration.py | 3 +-- dlt/common/pipeline.py | 2 +- dlt/pipeline/platform.py | 10 +++++++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/dlt/common/configuration/specs/run_configuration.py b/dlt/common/configuration/specs/run_configuration.py index 73e05a03c4..c8396b78b6 100644 --- a/dlt/common/configuration/specs/run_configuration.py +++ b/dlt/common/configuration/specs/run_configuration.py @@ -28,8 +28,7 @@ class RunConfiguration(BaseConfiguration): """Maximum delay between http request retries""" config_files_storage_path: str = "/run/config/" """Platform connection""" - beacon_url: Optional[str] = None - beacon_token: Optional[str] = None + beacon_dsn: Optional[str] 
= None
 
     __section__ = "runtime"
 
diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py
index ddd9003799..d316763a56 100644
--- a/dlt/common/pipeline.py
+++ b/dlt/common/pipeline.py
@@ -54,7 +54,7 @@ def asdict(self) -> DictStrAny:
         """A dictionary representation of NormalizeInfo that can be loaded with `dlt`"""
         d = self._asdict()
         # list representation creates a nice table
-        d["row_counts"] = [(k, v) for k, v in self.row_counts.items()]
+        d["row_counts"] = [{"table_name": k, "count": v} for k, v in self.row_counts.items()]
         return d
 
     def asstr(self, verbosity: int = 0) -> str:
diff --git a/dlt/pipeline/platform.py b/dlt/pipeline/platform.py
index 869e37e69d..ec476a82cd 100644
--- a/dlt/pipeline/platform.py
+++ b/dlt/pipeline/platform.py
@@ -8,10 +8,14 @@ count = 0
 
 def _send_to_beacon(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, some):
-    if pipeline.runtime_config.beacon_token and pipeline.runtime_config.beacon_url:
+    if pipeline.runtime_config.beacon_dsn:
         trace_dump = json.dumps(trace.asdict())
-        url = f"{pipeline.runtime_config.beacon_url}/pipeline/{pipeline.runtime_config.beacon_token}/traces"
-        requests.put(url, json=trace_dump)
+        requests.put(pipeline.runtime_config.beacon_dsn, data=trace_dump)
+
+    trace_dump = json.dumps(trace.asdict(), pretty=True)
+    with open(f"trace-{count}.json", "w") as f:
+        f.write(trace_dump)
+
 
 def on_start_trace(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None:
     # _send_to_beacon(trace, step, pipeline, None)

From f08b7958f2c60a0cdbe1972ca13b06baa69c69f4 Mon Sep 17 00:00:00 2001
From: Dave
Date: Wed, 1 Nov 2023 12:43:01 +0100
Subject: [PATCH 04/12] fix tests

---
 dlt/pipeline/platform.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dlt/pipeline/platform.py b/dlt/pipeline/platform.py
index ec476a82cd..5a2a98041d 100644
--- a/dlt/pipeline/platform.py
+++ b/dlt/pipeline/platform.py
@@ -1,7 +1,7 @@
 """Implements SupportsTracking"""
 from typing import Any
+import requests
 
-from dlt.sources.helpers import requests
 from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace, TPipelineStep, SupportsPipeline
 from dlt.common import json
 

From dc81f0576441fd7371eea093f4de5662eeea4aad Mon Sep 17 00:00:00 2001
From: Dave
Date: Mon, 20 Nov 2023 14:01:59 +0100
Subject: [PATCH 05/12] add sending on threads to platform connection

change config attribute to platform_dsn
add execution context info to pipeline trace
add pipeline name to pipeline trace
---
 .../configuration/specs/run_configuration.py |  2 +-
 dlt/common/runtime/exec_info.py              | 21 ++++++++++--
 dlt/common/runtime/segment.py                | 21 ++++--------
 dlt/common/runtime/typing.py                 | 18 +++++++++++
 dlt/common/storages/load_storage.py          |  7 ++--
 dlt/pipeline/platform.py                     | 32 +++++++++++++------
 dlt/pipeline/trace.py                        |  9 ++++--
 7 files changed, 75 insertions(+), 35 deletions(-)
 create mode 100644 dlt/common/runtime/typing.py

diff --git a/dlt/common/configuration/specs/run_configuration.py b/dlt/common/configuration/specs/run_configuration.py
index c8396b78b6..badc2335d7 100644
--- a/dlt/common/configuration/specs/run_configuration.py
+++ b/dlt/common/configuration/specs/run_configuration.py
@@ -28,7 +28,7 @@ class RunConfiguration(BaseConfiguration):
     """Maximum delay between http request retries"""
     config_files_storage_path: str = "/run/config/"
     """Platform connection"""
-    beacon_dsn: Optional[str] = None
+    platform_dsn: Optional[str] = None
 
     __section__ = "runtime"
 
diff --git a/dlt/common/runtime/exec_info.py b/dlt/common/runtime/exec_info.py
index
ecb8376aa7..fb8a7def7d 100644 --- a/dlt/common/runtime/exec_info.py +++ b/dlt/common/runtime/exec_info.py @@ -1,13 +1,16 @@ import io import os import contextlib +import sys +import multiprocessing +import platform +from dlt.common.runtime.typing import TExecutionContext, TVersion, TExecInfoNames from dlt.common.typing import StrStr, StrAny, Literal, List from dlt.common.utils import filter_env_vars -from dlt.version import __version__ +from dlt.version import __version__, DLT_PKG_NAME -TExecInfoNames = Literal["kubernetes", "docker", "codespaces", "github_actions", "airflow", "notebook", "colab","aws_lambda","gcp_cloud_function"] # if one of these environment variables is set, we assume to be running in CI env CI_ENVIRONMENT_TELL = [ "bamboo.buildKey", @@ -163,4 +166,16 @@ def is_aws_lambda() -> bool: def is_gcp_cloud_function() -> bool: "Return True if the process is running in the serverless platform GCP Cloud Functions" - return os.environ.get("FUNCTION_NAME") is not None \ No newline at end of file + return os.environ.get("FUNCTION_NAME") is not None + + +def get_execution_context() -> TExecutionContext: + "Get execution context information" + return TExecutionContext( + ci_run=in_continuous_integration(), + python=sys.version.split(" ")[0], + cpu=multiprocessing.cpu_count(), + exec_info=exec_info_names(), + os=TVersion(name=platform.system(), version=platform.release()), + library=TVersion(name=DLT_PKG_NAME, version=__version__) + ) diff --git a/dlt/common/runtime/segment.py b/dlt/common/runtime/segment.py index b8d533cccb..af2edbcac0 100644 --- a/dlt/common/runtime/segment.py +++ b/dlt/common/runtime/segment.py @@ -2,12 +2,10 @@ # several code fragments come from https://github.com/RasaHQ/rasa/blob/main/rasa/telemetry.py import os -import sys -import multiprocessing + import atexit import base64 import requests -import platform from concurrent.futures import ThreadPoolExecutor from typing import Literal, Optional from dlt.common.configuration.paths import get_dlt_data_dir @@ -15,10 +13,10 @@ from dlt.common.runtime import logger from dlt.common.configuration.specs import RunConfiguration -from dlt.common.runtime.exec_info import exec_info_names, in_continuous_integration +from dlt.common.runtime.exec_info import get_execution_context, TExecutionContext from dlt.common.typing import DictStrAny, StrAny from dlt.common.utils import uniq_id -from dlt.version import __version__, DLT_PKG_NAME +from dlt.version import __version__ TEventCategory = Literal["pipeline", "command", "helper"] @@ -27,7 +25,7 @@ _WRITE_KEY: str = None _SEGMENT_REQUEST_TIMEOUT = (1.0, 1.0) # short connect & send timeouts _SEGMENT_ENDPOINT = "https://api.segment.io/v1/track" -_SEGMENT_CONTEXT: DictStrAny = None +_SEGMENT_CONTEXT: TExecutionContext = None def init_segment(config: RunConfiguration) -> None: @@ -150,7 +148,7 @@ def _segment_request_payload( } -def _default_context_fields() -> DictStrAny: +def _default_context_fields() -> TExecutionContext: """Return a dictionary that contains the default context values. 
Return: @@ -161,14 +159,7 @@ def _default_context_fields() -> DictStrAny: if not _SEGMENT_CONTEXT: # Make sure to update the example in docs/docs/telemetry/telemetry.mdx # if you change / add context - _SEGMENT_CONTEXT = { - "os": {"name": platform.system(), "version": platform.release()}, - "ci_run": in_continuous_integration(), - "python": sys.version.split(" ")[0], - "library": {"name": DLT_PKG_NAME, "version": __version__}, - "cpu": multiprocessing.cpu_count(), - "exec_info": exec_info_names() - } + _SEGMENT_CONTEXT = get_execution_context() # avoid returning the cached dict --> caller could modify the dictionary... # usually we would use `lru_cache`, but that doesn't return a dict copy and diff --git a/dlt/common/runtime/typing.py b/dlt/common/runtime/typing.py new file mode 100644 index 0000000000..d308782527 --- /dev/null +++ b/dlt/common/runtime/typing.py @@ -0,0 +1,18 @@ +from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Set, Type, TypedDict, NewType, Union, get_args + + +TExecInfoNames = Literal["kubernetes", "docker", "codespaces", "github_actions", "airflow", "notebook", "colab","aws_lambda","gcp_cloud_function"] + +class TVersion(TypedDict): + """TypeDict representing a library version""" + name: str + version: str + +class TExecutionContext(TypedDict): + """TypeDict representing the runtime context info""" + ci_run: bool + python: str + cpu: int + exec_info: List[TExecInfoNames] + library: TVersion + os: TVersion \ No newline at end of file diff --git a/dlt/common/storages/load_storage.py b/dlt/common/storages/load_storage.py index f4e698b18a..83e773579d 100644 --- a/dlt/common/storages/load_storage.py +++ b/dlt/common/storages/load_storage.py @@ -18,6 +18,7 @@ from dlt.common.configuration.accessors import config from dlt.common.exceptions import TerminalValueError from dlt.common.schema import Schema, TSchemaTables, TTableSchemaColumns +from dlt.common.schema.typing import TStoredSchema from dlt.common.storages.configuration import LoadStorageConfiguration from dlt.common.storages.versioned_storage import VersionedStorage from dlt.common.storages.data_item_storage import DataItemStorage @@ -87,7 +88,7 @@ class LoadPackageInfo(NamedTuple): package_path: str state: TLoadPackageState schema_name: str - schema: Schema + schema: TStoredSchema schema_update: TSchemaTables completed_at: datetime.datetime jobs: Dict[TJobState, List[LoadJobInfo]] @@ -111,7 +112,7 @@ def asdict(self) -> DictStrAny: table["columns"] = columns d.pop("schema_update") d["tables"] = tables - d["schema"] = self.schema.to_dict() + d["schema"] = self.schema return d def asstr(self, verbosity: int = 0) -> str: @@ -292,7 +293,7 @@ def get_load_package_info(self, load_id: str) -> LoadPackageInfo: jobs.append(self._read_job_file_info(state, file, package_created_at)) all_jobs[state] = jobs - return LoadPackageInfo(load_id, self.storage.make_full_path(package_path), package_state, schema.name, schema, applied_update, package_created_at, all_jobs) + return LoadPackageInfo(load_id, self.storage.make_full_path(package_path), package_state, schema.name, schema.to_dict(), applied_update, package_created_at, all_jobs) def begin_schema_update(self, load_id: str) -> Optional[TSchemaTables]: package_path = self.get_normalized_package_path(load_id) diff --git a/dlt/pipeline/platform.py b/dlt/pipeline/platform.py index 5a2a98041d..461305dcc0 100644 --- a/dlt/pipeline/platform.py +++ b/dlt/pipeline/platform.py @@ -1,21 +1,34 @@ """Implements SupportsTracking""" from typing import Any import requests 
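The `TExecutionContext` TypedDict and `get_execution_context()` introduced above replace the ad-hoc dictionary that `segment.py` used to assemble by hand. For orientation, the payload attached to every trace has roughly the shape sketched below; all concrete values are illustrative examples and depend on the machine and runtime that executes the pipeline.

```python
# Hand-written illustration of a TExecutionContext value as returned by
# get_execution_context(); the literal values are examples, not fixtures.
example_context = {
    "ci_run": False,                                  # in_continuous_integration()
    "python": "3.10.12",                              # prefix of sys.version
    "cpu": 8,                                         # multiprocessing.cpu_count()
    "exec_info": ["docker", "kubernetes"],            # detected TExecInfoNames, may be empty
    "os": {"name": "Linux", "version": "6.1.0"},      # platform.system() / platform.release()
    "library": {"name": "dlt", "version": "0.3.x"},   # DLT_PKG_NAME / __version__
}
```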
+from concurrent.futures import ThreadPoolExecutor from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace, TPipelineStep, SupportsPipeline from dlt.common import json +from dlt.common.runtime import logger -count = 0 +_THREAD_POOL: ThreadPoolExecutor = None -def _send_to_beacon(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, some): - if pipeline.runtime_config.beacon_dsn: - trace_dump = json.dumps(trace.asdict()) - requests.put(pipeline.runtime_config.beacon_dsn, data=trace_dump) +def _init_thread_pool_if_needed() -> None: + global _THREAD_POOL + if not _THREAD_POOL: + _THREAD_POOL = ThreadPoolExecutor(1) - trace_dump = json.dumps(trace.asdict(), pretty=True) - with open(f"trace-{count}.json", "w") as f: - f.write(trace_dump) +def _send_to_platform(trace: PipelineTrace, pipeline: SupportsPipeline) -> None: + if pipeline.runtime_config.platform_dsn: + def _future_send() -> None: + trace_dump = json.dumps(trace.asdict()) + response = requests.put(pipeline.runtime_config.platform_dsn, data=trace_dump) + if response.status_code != 200: + logger.debug("Failed to send trace to platform.") + + _init_thread_pool_if_needed() + _THREAD_POOL.submit(_future_send) + + # trace_dump = json.dumps(trace.asdict(), pretty=True) + # with open(f"trace.json", "w") as f: + # f.write(trace_dump) def on_start_trace(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None: # _send_to_beacon(trace, step, pipeline, None) @@ -30,6 +43,5 @@ def on_end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: S pass def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline) -> None: - _send_to_beacon(trace, None, pipeline, None) - pass + _send_to_platform(trace, pipeline) diff --git a/dlt/pipeline/trace.py b/dlt/pipeline/trace.py index e3e11e3b39..fc64f4ec37 100644 --- a/dlt/pipeline/trace.py +++ b/dlt/pipeline/trace.py @@ -8,6 +8,7 @@ from dlt.common import pendulum from dlt.common.runtime.logger import suppress_and_warn +from dlt.common.runtime.exec_info import TExecutionContext, get_execution_context from dlt.common.configuration import is_secret_hint from dlt.common.configuration.utils import _RESOLVED_TRACES from dlt.common.pipeline import ExtractDataInfo, ExtractInfo, LoadInfo, NormalizeInfo, SupportsPipeline @@ -92,6 +93,8 @@ def asdict(self) -> DictStrAny: class PipelineTrace: """Pipeline runtime trace containing data on "extract", "normalize" and "load" steps and resolved config and secret values.""" transaction_id: str + pipeline_name: str + execution_context: TExecutionContext started_at: datetime.datetime steps: List[PipelineStepTrace] """A list of steps in the trace""" @@ -122,7 +125,7 @@ def last_pipeline_step_trace(self, step_name: TPipelineStep) -> PipelineStepTrac if step.step == step_name: return step return None - + def asdict(self) -> DictStrAny: """A dictionary representation of PipelineTrace that can be loaded with `dlt`""" return dataclasses.asdict(self) @@ -167,11 +170,11 @@ def on_end_trace(self, trace: PipelineTrace, pipeline: SupportsPipeline) -> None # plug in your own tracking modules here -TRACKING_MODULES: SupportsTracking = None +TRACKING_MODULES: List[SupportsTracking] = None def start_trace(step: TPipelineStep, pipeline: SupportsPipeline) -> PipelineTrace: - trace = PipelineTrace(uniq_id(), pendulum.now(), steps=[]) + trace = PipelineTrace(uniq_id(), pipeline.pipeline_name, get_execution_context(), pendulum.now(), steps=[]) for module in TRACKING_MODULES: with suppress_and_warn(): module.on_start_trace(trace, 
step, pipeline) From d3109494d75b0b26dad27f4c76b8d944c4373a8a Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 20 Nov 2023 14:18:18 +0100 Subject: [PATCH 06/12] fix config test --- tests/common/configuration/test_configuration.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/common/configuration/test_configuration.py b/tests/common/configuration/test_configuration.py index fc009d8444..cb974f554f 100644 --- a/tests/common/configuration/test_configuration.py +++ b/tests/common/configuration/test_configuration.py @@ -461,6 +461,7 @@ class _SecretCredentials(RunConfiguration): 'request_backoff_factor': 1, 'request_max_retry_delay': 300, 'config_files_storage_path': 'storage', + 'platform_dsn': None, "secret_value": None } assert dict(_SecretCredentials()) == expected_dict From 5758e0517a7734560ea279a137b3260e63af162b Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 20 Nov 2023 14:18:33 +0100 Subject: [PATCH 07/12] revert to adding tracestep on end tracestep --- dlt/pipeline/trace.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/pipeline/trace.py b/dlt/pipeline/trace.py index fc64f4ec37..c702fa2655 100644 --- a/dlt/pipeline/trace.py +++ b/dlt/pipeline/trace.py @@ -183,7 +183,6 @@ def start_trace(step: TPipelineStep, pipeline: SupportsPipeline) -> PipelineTrac def start_trace_step(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> PipelineStepTrace: trace_step = PipelineStepTrace(uniq_id(), step, pendulum.now()) - trace.steps.append(trace_step) for module in TRACKING_MODULES: with suppress_and_warn(): module.on_start_trace_step(trace, step, pipeline) @@ -219,6 +218,7 @@ def end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: Supp ) , _RESOLVED_TRACES.values()) trace.resolved_config_values = list(resolved_values) + trace.steps.append(step) for module in TRACKING_MODULES: with suppress_and_warn(): module.on_end_trace_step(trace, step, pipeline, step_info) From 03fcc21c476b4deaaeb2eced03c2efc293da321b Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 20 Nov 2023 14:58:20 +0100 Subject: [PATCH 08/12] add test for platform connection --- tests/pipeline/test_platform_connection.py | 41 ++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 tests/pipeline/test_platform_connection.py diff --git a/tests/pipeline/test_platform_connection.py b/tests/pipeline/test_platform_connection.py new file mode 100644 index 0000000000..39caa63872 --- /dev/null +++ b/tests/pipeline/test_platform_connection.py @@ -0,0 +1,41 @@ +import dlt +import os +import time +import requests_mock + +def test_platform_connection() -> None: + + mock_platform_url = "http://platform.com/endpoint" + + os.environ["RUNTIME__PLATFORM_DSN"] = mock_platform_url + + # simple pipeline + @dlt.source(name="platform_test") + def my_source(): + + @dlt.resource(name="test_resource") + def data(): + yield [1, 2, 3] + + return data() + + p = dlt.pipeline(destination="duckdb", pipeline_name="platform_test_pipeline") + + with requests_mock.mock() as m: + m.put(mock_platform_url, json={}, status_code=200) + p.run(my_source()) + + # sleep a bit and find trace in mock requests + time.sleep(1) + + trace_result = None + for call in m.request_history: + if call.url == mock_platform_url: + assert not trace_result, "Multiple calls to platform dsn endpoint" + trace_result = call.json() + + # basic check of trace result + assert trace_result + assert trace_result["pipeline_name"] == "platform_test_pipeline" + assert len(trace_result["steps"]) == 4 + assert 
trace_result["execution_context"]["library"]["name"] == "dlt" From 4b9adaec7a0c23a65a11c2db9a174a1ce39a7190 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 21 Nov 2023 17:16:16 +0100 Subject: [PATCH 09/12] pr fixes --- .../configuration/specs/run_configuration.py | 2 +- dlt/common/managed_thread_pool.py | 27 ++++++ dlt/common/pipeline.py | 6 +- dlt/common/runtime/segment.py | 17 ++-- dlt/common/runtime/typing.py | 12 ++- dlt/common/storages/load_storage.py | 6 +- dlt/pipeline/pipeline.py | 84 +++++++++-------- dlt/pipeline/platform.py | 94 +++++++++++++++---- dlt/pipeline/trace.py | 12 +-- dlt/pipeline/track.py | 4 +- .../configuration/test_configuration.py | 2 +- tests/pipeline/test_platform_connection.py | 44 +++++++-- 12 files changed, 218 insertions(+), 92 deletions(-) create mode 100644 dlt/common/managed_thread_pool.py diff --git a/dlt/common/configuration/specs/run_configuration.py b/dlt/common/configuration/specs/run_configuration.py index badc2335d7..badf56bde2 100644 --- a/dlt/common/configuration/specs/run_configuration.py +++ b/dlt/common/configuration/specs/run_configuration.py @@ -28,7 +28,7 @@ class RunConfiguration(BaseConfiguration): """Maximum delay between http request retries""" config_files_storage_path: str = "/run/config/" """Platform connection""" - platform_dsn: Optional[str] = None + dlthub_dsn: Optional[TSecretStrValue] = None __section__ = "runtime" diff --git a/dlt/common/managed_thread_pool.py b/dlt/common/managed_thread_pool.py new file mode 100644 index 0000000000..a484294f3c --- /dev/null +++ b/dlt/common/managed_thread_pool.py @@ -0,0 +1,27 @@ +from typing import Optional + +import atexit +from concurrent.futures import ThreadPoolExecutor + +class ManagedThreadPool: + + def __init__(self, max_workers: int = 1) -> None: + self._max_workers = max_workers + self._thread_pool: Optional[ThreadPoolExecutor] = None + + def _create_thread_pool(self) -> None: + assert not self._thread_pool, "Thread pool already created" + self._thread_pool = ThreadPoolExecutor(self._max_workers) + # flush pool on exit + atexit.register(self.stop) + + @property + def thread_pool(self) -> ThreadPoolExecutor: + if not self._thread_pool: + self._create_thread_pool() + return self._thread_pool + + def stop(self, wait: bool = True) -> None: + if self._thread_pool: + self._thread_pool.shutdown(wait=wait) + self._thread_pool = None \ No newline at end of file diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index d316763a56..ebf6a5731c 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -2,7 +2,7 @@ import datetime # noqa: 251 import humanize import contextlib -from typing import Any, Callable, ClassVar, Dict, List, NamedTuple, Optional, Protocol, Sequence, TYPE_CHECKING, Tuple, TypedDict +from typing import Any, Callable, ClassVar, Dict, List, NamedTuple, Optional, Protocol, Sequence, TYPE_CHECKING, Tuple, TypedDict, Mapping from typing_extensions import NotRequired from dlt.common import pendulum, logger @@ -194,6 +194,10 @@ class SupportsPipeline(Protocol): def state(self) -> TPipelineState: """Returns dictionary with pipeline state""" + @property + def schemas(self) -> Mapping[str, Schema]: + """Mapping of all pipeline schemas""" + def set_local_state_val(self, key: str, value: Any) -> None: """Sets value in local state. 
Local state is not synchronized with destination.""" diff --git a/dlt/common/runtime/segment.py b/dlt/common/runtime/segment.py index af2edbcac0..b05fc902cf 100644 --- a/dlt/common/runtime/segment.py +++ b/dlt/common/runtime/segment.py @@ -11,6 +11,7 @@ from dlt.common.configuration.paths import get_dlt_data_dir from dlt.common.runtime import logger +from dlt.common.managed_thread_pool import ManagedThreadPool from dlt.common.configuration.specs import RunConfiguration from dlt.common.runtime.exec_info import get_execution_context, TExecutionContext @@ -20,7 +21,7 @@ TEventCategory = Literal["pipeline", "command", "helper"] -_THREAD_POOL: ThreadPoolExecutor = None +_THREAD_POOL: ManagedThreadPool = ManagedThreadPool(1) _SESSION: requests.Session = None _WRITE_KEY: str = None _SEGMENT_REQUEST_TIMEOUT = (1.0, 1.0) # short connect & send timeouts @@ -32,9 +33,8 @@ def init_segment(config: RunConfiguration) -> None: assert config.dlthub_telemetry_segment_write_key, "dlthub_telemetry_segment_write_key not present in RunConfiguration" # create thread pool to send telemetry to segment - global _THREAD_POOL, _WRITE_KEY, _SESSION - if not _THREAD_POOL: - _THREAD_POOL = ThreadPoolExecutor(1) + global _WRITE_KEY, _SESSION + if not _SESSION: _SESSION = requests.Session() # flush pool on exit atexit.register(_at_exit_cleanup) @@ -84,10 +84,9 @@ def before_send(event: DictStrAny) -> Optional[DictStrAny]: def _at_exit_cleanup() -> None: - global _THREAD_POOL, _SESSION, _WRITE_KEY, _SEGMENT_CONTEXT - if _THREAD_POOL: - _THREAD_POOL.shutdown(wait=True) - _THREAD_POOL = None + global _SESSION, _WRITE_KEY, _SEGMENT_CONTEXT + if _SESSION: + _THREAD_POOL.stop(True) _SESSION.close() _SESSION = None _WRITE_KEY = None @@ -211,4 +210,4 @@ def _future_send() -> None: f"Segment telemetry request returned a failure. 
Response: {data}" ) - _THREAD_POOL.submit(_future_send) + _THREAD_POOL.thread_pool.submit(_future_send) diff --git a/dlt/common/runtime/typing.py b/dlt/common/runtime/typing.py index d308782527..39def406f8 100644 --- a/dlt/common/runtime/typing.py +++ b/dlt/common/runtime/typing.py @@ -1,7 +1,17 @@ from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Set, Type, TypedDict, NewType, Union, get_args -TExecInfoNames = Literal["kubernetes", "docker", "codespaces", "github_actions", "airflow", "notebook", "colab","aws_lambda","gcp_cloud_function"] +TExecInfoNames = Literal[ + "kubernetes", + "docker", + "codespaces", + "github_actions", + "airflow", + "notebook", + "colab", + "aws_lambda", + "gcp_cloud_function" + ] class TVersion(TypedDict): """TypeDict representing a library version""" diff --git a/dlt/common/storages/load_storage.py b/dlt/common/storages/load_storage.py index 83e773579d..854b4ba5a6 100644 --- a/dlt/common/storages/load_storage.py +++ b/dlt/common/storages/load_storage.py @@ -88,7 +88,7 @@ class LoadPackageInfo(NamedTuple): package_path: str state: TLoadPackageState schema_name: str - schema: TStoredSchema + schema_hash: str schema_update: TSchemaTables completed_at: datetime.datetime jobs: Dict[TJobState, List[LoadJobInfo]] @@ -112,7 +112,7 @@ def asdict(self) -> DictStrAny: table["columns"] = columns d.pop("schema_update") d["tables"] = tables - d["schema"] = self.schema + d["schema_hash"] = self.schema_hash return d def asstr(self, verbosity: int = 0) -> str: @@ -293,7 +293,7 @@ def get_load_package_info(self, load_id: str) -> LoadPackageInfo: jobs.append(self._read_job_file_info(state, file, package_created_at)) all_jobs[state] = jobs - return LoadPackageInfo(load_id, self.storage.make_full_path(package_path), package_state, schema.name, schema.to_dict(), applied_update, package_created_at, all_jobs) + return LoadPackageInfo(load_id, self.storage.make_full_path(package_path), package_state, schema.name, schema.version_hash, applied_update, package_created_at, all_jobs) def begin_schema_update(self, load_id: str) -> Optional[TSchemaTables]: package_path = self.get_normalized_package_path(load_id) diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 465eccfdb6..0e8169051a 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -4,7 +4,7 @@ from contextlib import contextmanager from functools import wraps from collections.abc import Sequence as C_Sequence -from typing import Any, Callable, ClassVar, List, Iterator, Optional, Sequence, Tuple, cast, get_type_hints, ContextManager +from typing import Any, Callable, ClassVar, List, Iterator, Optional, Sequence, Tuple, cast, get_type_hints, ContextManager, Mapping from concurrent.futures import Executor from dlt import version @@ -89,48 +89,52 @@ def _wrap(self: "Pipeline", *args: Any, **kwargs: Any) -> Any: return _wrap # type: ignore -def with_runtime_trace(f: TFun) -> TFun: +def with_runtime_trace(send_state: bool = False) -> Callable[[TFun], TFun]: - @wraps(f) - def _wrap(self: "Pipeline", *args: Any, **kwargs: Any) -> Any: - trace: PipelineTrace = self._trace - trace_step: PipelineStepTrace = None - step_info: Any = None - is_new_trace = self._trace is None and self.config.enable_runtime_trace + def decorator(f: TFun) -> TFun: - # create a new trace if we enter a traced function and there's no current trace - if is_new_trace: - self._trace = trace = start_trace(cast(TPipelineStep, f.__name__), self) + @wraps(f) + def _wrap(self: "Pipeline", *args: Any, **kwargs: Any) -> 
Any: + trace: PipelineTrace = self._trace + trace_step: PipelineStepTrace = None + step_info: Any = None + is_new_trace = self._trace is None and self.config.enable_runtime_trace - try: - # start a trace step for wrapped function - if trace: - trace_step = start_trace_step(trace, cast(TPipelineStep, f.__name__), self) + # create a new trace if we enter a traced function and there's no current trace + if is_new_trace: + self._trace = trace = start_trace(cast(TPipelineStep, f.__name__), self) - step_info = f(self, *args, **kwargs) - return step_info - except Exception as ex: - step_info = ex # step info is an exception - raise - finally: try: - if trace_step: - # if there was a step, finish it - end_trace_step(self._trace, trace_step, self, step_info) - if is_new_trace: - assert trace is self._trace, f"Messed up trace reference {id(self._trace)} vs {id(trace)}" - end_trace(trace, self, self._pipeline_storage.storage_path) + # start a trace step for wrapped function + if trace: + trace_step = start_trace_step(trace, cast(TPipelineStep, f.__name__), self) + + step_info = f(self, *args, **kwargs) + return step_info + except Exception as ex: + step_info = ex # step info is an exception + raise finally: - # always end trace - if is_new_trace: - assert self._trace == trace, f"Messed up trace reference {id(self._trace)} vs {id(trace)}" - # if we end new trace that had only 1 step, add it to previous trace - # this way we combine several separate calls to extract, normalize, load as single trace - # the trace of "run" has many steps and will not be merged - self._last_trace = merge_traces(self._last_trace, trace) - self._trace = None + try: + if trace_step: + # if there was a step, finish it + end_trace_step(self._trace, trace_step, self, step_info, send_state) + if is_new_trace: + assert trace is self._trace, f"Messed up trace reference {id(self._trace)} vs {id(trace)}" + end_trace(trace, self, self._pipeline_storage.storage_path, send_state) + finally: + # always end trace + if is_new_trace: + assert self._trace == trace, f"Messed up trace reference {id(self._trace)} vs {id(trace)}" + # if we end new trace that had only 1 step, add it to previous trace + # this way we combine several separate calls to extract, normalize, load as single trace + # the trace of "run" has many steps and will not be merged + self._last_trace = merge_traces(self._last_trace, trace) + self._trace = None - return _wrap # type: ignore + return _wrap # type: ignore + + return decorator def with_config_section(sections: Tuple[str, ...]) -> Callable[[TFun], TFun]: @@ -253,7 +257,7 @@ def drop(self) -> "Pipeline": self.runtime_config ) - @with_runtime_trace + @with_runtime_trace() @with_schemas_sync # this must precede with_state_sync @with_state_sync(may_extract_state=True) @with_config_section((known_sections.EXTRACT,)) @@ -293,7 +297,7 @@ def extract( # TODO: provide metrics from extractor raise PipelineStepFailed(self, "extract", exc, ExtractInfo(describe_extract_data(data))) from exc - @with_runtime_trace + @with_runtime_trace() @with_schemas_sync @with_config_section((known_sections.NORMALIZE,)) def normalize(self, workers: int = 1, loader_file_format: TLoaderFileFormat = None) -> NormalizeInfo: @@ -326,7 +330,7 @@ def normalize(self, workers: int = 1, loader_file_format: TLoaderFileFormat = No except Exception as n_ex: raise PipelineStepFailed(self, "normalize", n_ex, normalize.get_normalize_info()) from n_ex - @with_runtime_trace + @with_runtime_trace() @with_schemas_sync @with_state_sync() 
@with_config_section((known_sections.LOAD,)) @@ -379,7 +383,7 @@ def load( except Exception as l_ex: raise PipelineStepFailed(self, "load", l_ex, self._get_load_info(load)) from l_ex - @with_runtime_trace + @with_runtime_trace(send_state=True) @with_config_section(("run",)) def run( self, diff --git a/dlt/pipeline/platform.py b/dlt/pipeline/platform.py index 461305dcc0..136239a0a8 100644 --- a/dlt/pipeline/platform.py +++ b/dlt/pipeline/platform.py @@ -1,47 +1,101 @@ """Implements SupportsTracking""" -from typing import Any +from typing import Any, cast, TypedDict, List import requests -from concurrent.futures import ThreadPoolExecutor +from dlt.common.managed_thread_pool import ManagedThreadPool +from urllib.parse import urljoin from dlt.pipeline.trace import PipelineTrace, PipelineStepTrace, TPipelineStep, SupportsPipeline from dlt.common import json from dlt.common.runtime import logger +from dlt.common.pipeline import LoadInfo +from dlt.common.schema.typing import TStoredSchema -_THREAD_POOL: ThreadPoolExecutor = None +_THREAD_POOL: ManagedThreadPool = ManagedThreadPool(1) +TRACE_URL_SUFFIX = "/trace" +STATE_URL_SUFFIX = "/state" -def _init_thread_pool_if_needed() -> None: - global _THREAD_POOL - if not _THREAD_POOL: - _THREAD_POOL = ThreadPoolExecutor(1) +class TSchemaSyncPayload(TypedDict): + pipeline_name: str + destination_name: str + destination_displayable_credentials: str + destination_fingerprint: str + dataset_name: str + schemas: List[TStoredSchema] -def _send_to_platform(trace: PipelineTrace, pipeline: SupportsPipeline) -> None: - if pipeline.runtime_config.platform_dsn: +def _send_trace_to_platform(trace: PipelineTrace, pipeline: SupportsPipeline) -> None: + """ + Send the full trace after a run operation to the platform + TODO: Migrate this to open telemetry in the next iteration + """ + if not pipeline.runtime_config.dlthub_dsn: + return - def _future_send() -> None: + def _future_send() -> None: + try: trace_dump = json.dumps(trace.asdict()) - response = requests.put(pipeline.runtime_config.platform_dsn, data=trace_dump) + url = pipeline.runtime_config.dlthub_dsn + TRACE_URL_SUFFIX + response = requests.put(url, data=trace_dump) if response.status_code != 200: - logger.debug("Failed to send trace to platform.") + logger.debug(f"Failed to send trace to platform, response code: {response.status_code}") + except Exception as e: + logger.debug(f"Exception while sending trace to platform: {e}") - _init_thread_pool_if_needed() - _THREAD_POOL.submit(_future_send) + _THREAD_POOL.thread_pool.submit(_future_send) # trace_dump = json.dumps(trace.asdict(), pretty=True) # with open(f"trace.json", "w") as f: # f.write(trace_dump) +def _sync_schemas_to_platform(trace: PipelineTrace, pipeline: SupportsPipeline) -> None: + if not pipeline.runtime_config.dlthub_dsn: + return + + # sync only if load step was processed + load_info: LoadInfo = None + for step in trace.steps: + if step.step == "load": + load_info = cast(LoadInfo, step.step_info) + + if not load_info: + return + + payload = TSchemaSyncPayload( + pipeline_name=pipeline.pipeline_name, + destination_name=load_info.destination_name, + destination_displayable_credentials=load_info.destination_displayable_credentials, + destination_fingerprint=load_info.destination_fingerprint, + dataset_name=load_info.dataset_name, + schemas=[] + ) + + # attach all schemas + for schema_name in pipeline.schemas: + schema = pipeline.schemas[schema_name] + payload["schemas"].append(schema.to_dict()) + + def _future_send() -> None: + try: + url = 
pipeline.runtime_config.dlthub_dsn + STATE_URL_SUFFIX + response = requests.put(url, data=json.dumps(payload)) + if response.status_code != 200: + logger.debug(f"Failed to send state to platform, response code: {response.status_code}") + except Exception as e: + logger.debug(f"Exception while sending state to platform: {e}") + + _THREAD_POOL.thread_pool.submit(_future_send) + def on_start_trace(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None: - # _send_to_beacon(trace, step, pipeline, None) pass def on_start_trace_step(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None: - # _send_to_beacon(trace, step, pipeline, None) pass -def on_end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, step_info: Any) -> None: - # _send_to_beacon(trace, step, pipeline, step_info) +def on_end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, step_info: Any, send_state: bool) -> None: pass -def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline) -> None: - _send_to_platform(trace, pipeline) +def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline, send_state: bool) -> None: + _send_trace_to_platform(trace, pipeline) + if send_state: + # also sync schemas to dlthub + _sync_schemas_to_platform(trace, pipeline) diff --git a/dlt/pipeline/trace.py b/dlt/pipeline/trace.py index c702fa2655..1fb04cda2e 100644 --- a/dlt/pipeline/trace.py +++ b/dlt/pipeline/trace.py @@ -162,10 +162,10 @@ def on_start_trace(self, trace: PipelineTrace, step: TPipelineStep, pipeline: Su def on_start_trace_step(self, trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None: ... - def on_end_trace_step(self, trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, step_info: Any) -> None: + def on_end_trace_step(self, trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, step_info: Any, send_state: bool) -> None: ... - def on_end_trace(self, trace: PipelineTrace, pipeline: SupportsPipeline) -> None: + def on_end_trace(self, trace: PipelineTrace, pipeline: SupportsPipeline, send_state: bool) -> None: ... 
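Both platform calls above, `_send_trace_to_platform` and `_sync_schemas_to_platform`, share the same fire-and-forget shape: serialize the payload, hand the HTTP call to a single-worker `ManagedThreadPool`, and downgrade any failure to a debug log so a pipeline run is never blocked by platform connectivity. A condensed sketch of that pattern follows; the function name, URL, and payload are placeholders, not part of the patch.

```python
# Condensed sketch of the send pattern used by the platform module:
# serialize, submit to a one-worker pool, and only log failures.
from typing import Any, Dict

import requests

from dlt.common import json
from dlt.common.managed_thread_pool import ManagedThreadPool
from dlt.common.runtime import logger

_pool = ManagedThreadPool(1)


def send_in_background(url: str, payload: Dict[str, Any]) -> None:
    def _send() -> None:
        try:
            response = requests.put(url, data=json.dumps(payload))
            if response.status_code != 200:
                logger.debug(f"platform endpoint returned {response.status_code}")
        except Exception as ex:
            # never propagate connectivity problems into the pipeline run
            logger.debug(f"exception while sending payload to platform: {ex}")

    _pool.thread_pool.submit(_send)
```

Using a single worker keeps the uploads ordered within a process, and the `atexit` hook registered by `ManagedThreadPool` flushes anything still queued when the interpreter shuts down.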
@@ -189,7 +189,7 @@ def start_trace_step(trace: PipelineTrace, step: TPipelineStep, pipeline: Suppor return trace_step -def end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, step_info: Any) -> None: +def end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, step_info: Any, send_state: bool) -> None: # saves runtime trace of the pipeline if isinstance(step_info, PipelineStepFailed): step_exception = str(step_info) @@ -221,16 +221,16 @@ def end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: Supp trace.steps.append(step) for module in TRACKING_MODULES: with suppress_and_warn(): - module.on_end_trace_step(trace, step, pipeline, step_info) + module.on_end_trace_step(trace, step, pipeline, step_info, send_state) -def end_trace(trace: PipelineTrace, pipeline: SupportsPipeline, trace_path: str) -> None: +def end_trace(trace: PipelineTrace, pipeline: SupportsPipeline, trace_path: str, send_state: bool) -> None: trace.finished_at = pendulum.now() if trace_path: save_trace(trace_path, trace) for module in TRACKING_MODULES: with suppress_and_warn(): - module.on_end_trace(trace, pipeline) + module.on_end_trace(trace, pipeline, send_state) def merge_traces(last_trace: PipelineTrace, new_trace: PipelineTrace) -> PipelineTrace: diff --git a/dlt/pipeline/track.py b/dlt/pipeline/track.py index 07e9a2d137..490d98e794 100644 --- a/dlt/pipeline/track.py +++ b/dlt/pipeline/track.py @@ -75,7 +75,7 @@ def on_start_trace_step(trace: PipelineTrace, step: TPipelineStep, pipeline: Sup _add_sentry_tags(span, pipeline) -def on_end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, step_info: Any) -> None: +def on_end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, step_info: Any, send_state: bool) -> None: if pipeline.runtime_config.sentry_dsn: # print(f"---END SENTRY SPAN {trace.transaction_id}:{step.span_id}: {step} SCOPE: {Hub.current.scope}") with contextlib.suppress(Exception): @@ -103,7 +103,7 @@ def on_end_trace_step(trace: PipelineTrace, step: PipelineStepTrace, pipeline: S dlthub_telemetry_track("pipeline", step.step, props) -def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline) -> None: +def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline, send_state: bool) -> None: if pipeline.runtime_config.sentry_dsn: # print(f"---END SENTRY TX: {trace.transaction_id} SCOPE: {Hub.current.scope}") with contextlib.suppress(Exception): diff --git a/tests/common/configuration/test_configuration.py b/tests/common/configuration/test_configuration.py index cb974f554f..f902f2ac50 100644 --- a/tests/common/configuration/test_configuration.py +++ b/tests/common/configuration/test_configuration.py @@ -461,7 +461,7 @@ class _SecretCredentials(RunConfiguration): 'request_backoff_factor': 1, 'request_max_retry_delay': 300, 'config_files_storage_path': 'storage', - 'platform_dsn': None, + 'dlthub_dsn': None, "secret_value": None } assert dict(_SecretCredentials()) == expected_dict diff --git a/tests/pipeline/test_platform_connection.py b/tests/pipeline/test_platform_connection.py index 39caa63872..4826e5a09b 100644 --- a/tests/pipeline/test_platform_connection.py +++ b/tests/pipeline/test_platform_connection.py @@ -3,14 +3,20 @@ import time import requests_mock +TRACE_URL_SUFFIX = "/trace" +STATE_URL_SUFFIX = "/state" + def test_platform_connection() -> None: mock_platform_url = "http://platform.com/endpoint" - 
os.environ["RUNTIME__PLATFORM_DSN"] = mock_platform_url + os.environ["RUNTIME__DLTHUB_DSN"] = mock_platform_url + + trace_url = mock_platform_url + TRACE_URL_SUFFIX + state_url = mock_platform_url + STATE_URL_SUFFIX # simple pipeline - @dlt.source(name="platform_test") + @dlt.source(name="first_source") def my_source(): @dlt.resource(name="test_resource") @@ -19,23 +25,45 @@ def data(): return data() - p = dlt.pipeline(destination="duckdb", pipeline_name="platform_test_pipeline") + @dlt.source(name="second_source") + def my_source_2(): + + @dlt.resource(name="test_resource") + def data(): + yield [1, 2, 3] + + return data() + + p = dlt.pipeline(destination="duckdb", pipeline_name="platform_test_pipeline", dataset_name="platform_test_dataset") with requests_mock.mock() as m: m.put(mock_platform_url, json={}, status_code=200) - p.run(my_source()) + p.run([my_source(), my_source_2()]) # sleep a bit and find trace in mock requests - time.sleep(1) + time.sleep(2) trace_result = None + state_result = None for call in m.request_history: - if call.url == mock_platform_url: - assert not trace_result, "Multiple calls to platform dsn endpoint" + if call.url == trace_url: + assert not trace_result, "Multiple calls to trace endpoint" trace_result = call.json() + if call.url == state_url: + assert not state_result, "Multiple calls to state endpoint" + state_result = call.json() + # basic check of trace result - assert trace_result + assert trace_result, "no trace" assert trace_result["pipeline_name"] == "platform_test_pipeline" assert len(trace_result["steps"]) == 4 assert trace_result["execution_context"]["library"]["name"] == "dlt" + + # basic check of state result + assert state_result, "no state update" + assert state_result["pipeline_name"] == "platform_test_pipeline" + assert state_result["dataset_name"] == "platform_test_dataset" + assert len(state_result["schemas"]) == 2 + assert state_result["schemas"][0]["name"] == "first_source" + assert state_result["schemas"][1]["name"] == "second_source" From 41942840dca7b1d2f0c55eddb873e16f8c3563b0 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 21 Nov 2023 17:20:51 +0100 Subject: [PATCH 10/12] fix tests --- tests/pipeline/test_platform_connection.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/pipeline/test_platform_connection.py b/tests/pipeline/test_platform_connection.py index 4826e5a09b..86f0975d1d 100644 --- a/tests/pipeline/test_platform_connection.py +++ b/tests/pipeline/test_platform_connection.py @@ -65,5 +65,4 @@ def data(): assert state_result["pipeline_name"] == "platform_test_pipeline" assert state_result["dataset_name"] == "platform_test_dataset" assert len(state_result["schemas"]) == 2 - assert state_result["schemas"][0]["name"] == "first_source" - assert state_result["schemas"][1]["name"] == "second_source" + assert {state_result["schemas"][0]["name"], state_result["schemas"][1]["name"]} == {"first_source", "second_source"} From 1770d0d5553fc27bb5cbd0f820428dde5aa09e70 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 21 Nov 2023 17:25:41 +0100 Subject: [PATCH 11/12] fix linting --- tests/helpers/streamlit_tests/test_streamlit_show_resources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/helpers/streamlit_tests/test_streamlit_show_resources.py b/tests/helpers/streamlit_tests/test_streamlit_show_resources.py index fcf232ea76..e31a5d5cbb 100644 --- a/tests/helpers/streamlit_tests/test_streamlit_show_resources.py +++ b/tests/helpers/streamlit_tests/test_streamlit_show_resources.py @@ -57,7 
+57,7 @@ def test_multiple_resources_pipeline(): ) load_info = pipeline.run([source1(10), source2(20)]) - source1_schema = load_info.pipeline.schemas.get("source1") # type: ignore[attr-defined] + source1_schema = load_info.pipeline.schemas.get("source1") assert load_info.pipeline.schema_names == ["source2", "source1"] # type: ignore[attr-defined] From d453db3d83e1dfdbfce4c64ffec490810a0380fc Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 23 Nov 2023 13:39:23 +0100 Subject: [PATCH 12/12] small pr changes --- Makefile | 4 ++-- dlt/pipeline/pipeline.py | 4 ++-- dlt/pipeline/platform.py | 11 +++++------ 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/Makefile b/Makefile index 1059cfdf0a..d9d92ec799 100644 --- a/Makefile +++ b/Makefile @@ -48,7 +48,7 @@ dev: has-poetry lint: ./check-package.sh - poetry run black ./ --diff --exclude=".*syntax_error.py|\.venv.*" + poetry run black ./ --diff --exclude=".*syntax_error.py|\.venv.*|_storage/.*" # poetry run isort ./ --diff poetry run mypy --config-file mypy.ini dlt tests poetry run flake8 --max-line-length=200 dlt @@ -56,7 +56,7 @@ lint: # $(MAKE) lint-security format: - poetry run black ./ --exclude=".*syntax_error.py|\.venv.*" + poetry run black ./ --exclude=".*syntax_error.py|\.venv.*|_storage/.*" # poetry run isort ./ test-and-lint-snippets: diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index d4ccdafcec..836442f5bb 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -433,7 +433,7 @@ def normalize( self, "normalize", n_ex, normalize.get_normalize_info() ) from n_ex - @with_runtime_trace() + @with_runtime_trace(send_state=True) @with_schemas_sync @with_state_sync() @with_config_section((known_sections.LOAD,)) @@ -486,7 +486,7 @@ def load( except Exception as l_ex: raise PipelineStepFailed(self, "load", l_ex, self._get_load_info(load)) from l_ex - @with_runtime_trace(send_state=True) + @with_runtime_trace() @with_config_section(("run",)) def run( self, diff --git a/dlt/pipeline/platform.py b/dlt/pipeline/platform.py index 5ebaaf662e..c8014d5ae7 100644 --- a/dlt/pipeline/platform.py +++ b/dlt/pipeline/platform.py @@ -15,7 +15,7 @@ STATE_URL_SUFFIX = "/state" -class TSchemaSyncPayload(TypedDict): +class TPipelineSyncPayload(TypedDict): pipeline_name: str destination_name: str destination_displayable_credentials: str @@ -64,7 +64,7 @@ def _sync_schemas_to_platform(trace: PipelineTrace, pipeline: SupportsPipeline) if not load_info: return - payload = TSchemaSyncPayload( + payload = TPipelineSyncPayload( pipeline_name=pipeline.pipeline_name, destination_name=load_info.destination_name, destination_displayable_credentials=load_info.destination_displayable_credentials, @@ -109,11 +109,10 @@ def on_end_trace_step( step_info: Any, send_state: bool, ) -> None: - pass + if send_state: + # also sync schemas to dlthub + _sync_schemas_to_platform(trace, pipeline) def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline, send_state: bool) -> None: _send_trace_to_platform(trace, pipeline) - if send_state: - # also sync schemas to dlthub - _sync_schemas_to_platform(trace, pipeline)
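Taken together, the series reduces the platform wiring to a single `runtime.dlthub_dsn` setting. The following is a hedged end-to-end sketch of how a user would exercise it, modeled on `tests/pipeline/test_platform_connection.py` above; the DSN value is a placeholder and the resource is a toy example.

```python
# End-to-end sketch, assuming a reachable platform endpoint at the DSN below.
# The run trace is PUT to <dsn>/trace and the schema/state payload to <dsn>/state
# on a background ManagedThreadPool worker; failures are only logged at debug level.
import os

import dlt

os.environ["RUNTIME__DLTHUB_DSN"] = "http://platform.example.com/endpoint"  # placeholder DSN


@dlt.resource(name="numbers")
def numbers():
    yield [1, 2, 3]


pipeline = dlt.pipeline(
    pipeline_name="platform_demo",
    destination="duckdb",
    dataset_name="platform_demo_data",
)
load_info = pipeline.run(numbers())
print(load_info)
```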