From cffa87dded74af9ace5d53abbe72863c56591086 Mon Sep 17 00:00:00 2001 From: Dave Date: Wed, 1 Nov 2023 09:51:41 +0100 Subject: [PATCH] small changes --- .../configuration/specs/run_configuration.py | 3 +-- dlt/common/pipeline.py | 2 +- dlt/pipeline/platform.py | 15 ++++++++++++--- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/dlt/common/configuration/specs/run_configuration.py b/dlt/common/configuration/specs/run_configuration.py index 73e05a03c4..c8396b78b6 100644 --- a/dlt/common/configuration/specs/run_configuration.py +++ b/dlt/common/configuration/specs/run_configuration.py @@ -28,8 +28,7 @@ class RunConfiguration(BaseConfiguration): """Maximum delay between http request retries""" config_files_storage_path: str = "/run/config/" """Platform connection""" - beacon_url: Optional[str] = None - beacon_token: Optional[str] = None + beacon_dsn: Optional[str] = None __section__ = "runtime" diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index aeb0bdc68a..a6692bc265 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -54,7 +54,7 @@ def asdict(self) -> DictStrAny: """A dictionary representation of NormalizeInfo that can be loaded with `dlt`""" d = self._asdict() # list representation creates a nice table - d["row_counts"] = [(k, v) for k, v in self.row_counts.items()] + d["row_counts"] = [{"table_name": k, "count": v} for k, v in self.row_counts.items()] return d def asstr(self, verbosity: int = 0) -> str: diff --git a/dlt/pipeline/platform.py b/dlt/pipeline/platform.py index 869e37e69d..55526dd9dc 100644 --- a/dlt/pipeline/platform.py +++ b/dlt/pipeline/platform.py @@ -8,10 +8,19 @@ count = 0 def _send_to_beacon(trace: PipelineTrace, step: PipelineStepTrace, pipeline: SupportsPipeline, some): - if pipeline.runtime_config.beacon_token and pipeline.runtime_config.beacon_url: + if pipeline.runtime_config.beacon_dsn: trace_dump = json.dumps(trace.asdict()) - url = f"{pipeline.runtime_config.beacon_url}/pipeline/{pipeline.runtime_config.beacon_token}/traces" - requests.put(url, json=trace_dump) + requests.put(pipeline.runtime_config.beacon_dsn, data=trace_dump) + + print("runtime_config") + print(pipeline.runtime_config) + print(pipeline.runtime_config.beacon_token) + print(pipeline.runtime_config.beacon_url) + + trace_dump = json.dumps(trace.asdict(), pretty=True) + with open(f"trace-{count}.json", "w") as f: + f.write(trace_dump) + def on_start_trace(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> None: # _send_to_beacon(trace, step, pipeline, None)