prototype platform connection #727
dlt/common/storages/load_storage.py
Outdated
@@ -87,6 +87,7 @@ class LoadPackageInfo(NamedTuple):
    package_path: str
    state: TLoadPackageState
    schema_name: str
    schema: Schema
fyi i am adding the full schema to the load info here
IMO way better if you add TypedDict with schema content, not the object itself. this is being pickled and dumped into trace so obviously dicts work better
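To illustrate the suggestion above, here is a minimal sketch of carrying schema content as a plain TypedDict inside the package info, so that the whole tuple can be dumped to JSON without a custom encoder. The names `StoredSchemaSketch` and `LoadPackageInfoSketch` and their fields are hypothetical stand-ins, not the actual dlt types.

```python
import json
from typing import Any, Dict, NamedTuple, TypedDict


class StoredSchemaSketch(TypedDict):
    # Hypothetical, reduced stand-in for the stored schema content.
    name: str
    version: int
    tables: Dict[str, Any]


class LoadPackageInfoSketch(NamedTuple):
    # Hypothetical stand-in for LoadPackageInfo; state is simplified to str.
    package_path: str
    state: str
    schema_name: str
    schema: StoredSchemaSketch  # plain dict content, not a Schema object


info = LoadPackageInfoSketch(
    package_path="/tmp/load/1699",
    state="loaded",
    schema_name="events",
    schema={"name": "events", "version": 3, "tables": {"clicks": {"columns": {}}}},
)

# A TypedDict is a regular dict at runtime, so the standard encoder handles it.
trace_json = json.dumps(info._asdict())
restored = json.loads(trace_json)
```

A field holding a full schema object would need a custom encoder at this point, which is presumably the concern behind the comment.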
dlt/pipeline/platform.py
Outdated
    pass

def on_end_trace(trace: PipelineTrace, pipeline: SupportsPipeline) -> None:
    _send_to_beacon(trace, None, pipeline, None)
for now only send on end trace, but it would be nice to have the full progress available
one thing that is a bit strange in the trace is that there is this additional run step which duplicates the result of the loadinfo as stepinfo. It's not really a problem, but the run step is really just a representation of the full trace, and we are sending too much data around, especially given that the loadinfo will always be the most verbose part.
two suggestions :) also do you want to merge it or keep a branch for testing?
dlt/common/storages/load_storage.py
Outdated
@@ -87,6 +87,7 @@ class LoadPackageInfo(NamedTuple):
    package_path: str
    state: TLoadPackageState
    schema_name: str
    schema: Schema
IMO way better if you add TypedDict with schema content, not the object itself. this is being pickled and dumped into trace so obviously dicts work better
dlt/pipeline/platform.py
Outdated
    if pipeline.runtime_config.beacon_token and pipeline.runtime_config.beacon_url:
        trace_dump = json.dumps(trace.asdict())
        url = f"{pipeline.runtime_config.beacon_url}/pipeline/{pipeline.runtime_config.beacon_token}/traces"
        requests.put(url, json=trace_dump)
maybe you could reuse our telemetry thread executor to send the messages in the thread, without blocking the pipeline execution? the code is already there and I think could be repurposed.
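A rough sketch of the non-blocking approach suggested above, using only the standard library. It does not reproduce the existing telemetry executor; `send_in_background` and the injected `send` callable are hypothetical names, and the real sender would be whatever function performs the HTTP request.

```python
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)

# One worker keeps messages in submission order.
_POOL = ThreadPoolExecutor(max_workers=1)


def send_in_background(
    send: Callable[[Dict[str, Any]], None], payload: Dict[str, Any]
) -> Future:
    """Queue `send(payload)` on a worker thread and return immediately."""

    def _safe_send() -> None:
        try:
            send(payload)
        except Exception:
            # A failed delivery should never break the pipeline run.
            logger.warning("could not deliver trace payload", exc_info=True)

    return _POOL.submit(_safe_send)


# Usage: a list stands in for the network call.
sent: list = []
future = send_in_background(sent.append, {"pipeline_name": "demo"})
future.result(timeout=5)  # only needed here to observe the result
```

The caller gets a `Future` back right away, so the pipeline step does not wait on the network unless it explicitly asks for the result.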
cffa87d to 022d3f0
For now this is only for testing, I would say.
dlt/pipeline/trace.py
Outdated
    return trace


def start_trace_step(trace: PipelineTrace, step: TPipelineStep, pipeline: SupportsPipeline) -> PipelineStepTrace:
    trace_step = PipelineStepTrace(uniq_id(), step, pendulum.now())
    with suppress_and_warn():
        TRACKING_MODULE.on_start_trace_step(trace, step, pipeline)
    trace.steps.append(trace_step)
is there any downside to attaching the trace step on trace start? it would be nicer to have it like this when sent to the platform
022d3f0 to b65bb63
b65bb63 to 11c8cc7
11c8cc7 to f08b795
change config attribute to platform_dsn
add execution context info to pipeline trace
add pipeline name to pipeline trace
run step is used to correlate several pipeline steps into one transaction. it makes sense IMO to send some summary information. yeah I think we repeat the
good! before we merge:
- your thread pool must register in at_exit. best if you could refactor segment pool to have many pools registered there
- as usual some naming suggestions
dlt/common/runtime/typing.py
Outdated
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Set, Type, TypedDict, NewType, Union, get_args


TExecInfoNames = Literal["kubernetes", "docker", "codespaces", "github_actions", "airflow", "notebook", "colab","aws_lambda","gcp_cloud_function"]
could you format it? or maybe we should go back to black formatter initially in the same mode we have in verified sources?
i am also for black formatter..
    name: str
    version: str

class TExecutionContext(TypedDict):
Looking good! for open telemetry we can collect way more information that is not anonymous. but in other PR
dlt/common/storages/load_storage.py
Outdated
@@ -87,6 +88,7 @@ class LoadPackageInfo(NamedTuple):
    package_path: str
    state: TLoadPackageState
    schema_name: str
    schema: TStoredSchema
use the schema hash here. send a separate pipeline state message with all pipeline schemas
dlt/pipeline/platform.py
Outdated
from dlt.common import json
from dlt.common.runtime import logger

_THREAD_POOL: ThreadPoolExecutor = None
is this temporary code? we should reuse the segment pool or at least somehow add this pool to at_exit handler otherwise messages will not go out at the end.
maybe extract this sending pool to common module that creates a pool and registers it in exit handler?
also implementation below does not have a method to stop the pool. won't that be a problem when testing?
I have extracted a managedexecutor class. it does not seem a missing stop method is a problem when testing...
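For readers following the thread, a minimal sketch of what a pool that registers itself in an exit handler and exposes a stop method could look like. `ManagedPoolSketch` is a hypothetical name and is not the class extracted in this PR; it only shows the lazy creation, the `atexit` registration, and an idempotent `stop` that tests can call.

```python
import atexit
from concurrent.futures import ThreadPoolExecutor
from typing import Optional


class ManagedPoolSketch:
    """Lazily creates a thread pool and flushes it at interpreter exit."""

    def __init__(self, max_workers: int = 1) -> None:
        self._max_workers = max_workers
        self._pool: Optional[ThreadPoolExecutor] = None

    @property
    def pool(self) -> ThreadPoolExecutor:
        if self._pool is None:
            self._pool = ThreadPoolExecutor(max_workers=self._max_workers)
            # Wait for queued messages before the process exits.
            atexit.register(self.stop)
        return self._pool

    def stop(self) -> None:
        # Safe to call more than once, e.g. from tests and from atexit.
        if self._pool is not None:
            self._pool.shutdown(wait=True)
            self._pool = None


# Usage: several independent pools can be created the same way.
managed = ManagedPoolSketch()
result = managed.pool.submit(lambda: 42).result(timeout=5)
managed.stop()
```

Because `stop` resets the pool to `None`, a later access to `pool` simply creates a fresh executor, which keeps test setup and teardown simple.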
@@ -27,6 +27,8 @@ class RunConfiguration(BaseConfiguration):
    request_max_retry_delay: float = 300
    """Maximum delay between http request retries"""
    config_files_storage_path: str = "/run/config/"
    """Platform connection"""
    platform_dsn: Optional[str] = None
is workspace cookie part of platform_dsn? also this is a secret value so use TSecretStrValue here. maybe we could rename it to dlthub_dsn?
ecb1e22 to 4b9adae
# Conflicts:
#   dlt/pipeline/pipeline.py
Questions:
I am not quite sure i have implemented reporting the current state data to the platform the way you had in mind, let me know.
LGTM. I think we will modify it a lot when we actually use it for something :) also see my comments.
I should probably check the load packages and not sync schemas for failed loadpackages? I am not 100 % sure on that atm.
heh good question and another nuance
- you should not bump schema revision in the dataset to which we were loading if the package failed. soon load info will have information on whether the schema was upgraded in the dataset, even if the package failed (refactor NormalizeInfo and LoadInfo #757)
- you should bump pipeline schema revision to the one that is in state.
pipeline may be ahead of the dataset in terms of schema revision. it may produce load packages that are loaded somewhere else. so we have revisions materialized in dataset and current revision in the pipeline. they may be different
Maybe the whole state/schema synching should not be in the trace decorator at all. It should work the way I implemented it, but somehow it does not feel quite right.
depends if we think it has anything useful for open telemetry collector.
the traces are open telemetry friendly. the state sync MAY BE something more specific. but for now - LGTM
and we can merge that branch soon
TExecInfoNames = Literal[
    "kubernetes",
look also at this. maybe we can copy code from there to have even more CI envs?
https://www.npmjs.com/package/ci-info
also there's CI env flag which says that code runs in CI so maybe we should add "generic_ci"
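As an illustration of the "generic_ci" idea, a small sketch that reports a known CI vendor when its marker variable is present and falls back to "generic_ci" when only the `CI` variable is set. `detect_ci` and `_KNOWN_CI` are hypothetical names, and the fallback rule is an assumption about how the two could be combined, not the detection logic in this PR.

```python
import os
from typing import List, Mapping, Optional

# Vendor name -> environment variable that the vendor sets in its runners.
_KNOWN_CI = {"github_actions": "GITHUB_ACTIONS"}


def detect_ci(environ: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return detected CI names, using "generic_ci" as a fallback."""
    env = os.environ if environ is None else environ
    names = [name for name, var in _KNOWN_CI.items() if env.get(var)]
    if not names and env.get("CI"):
        # Many CI systems set CI; no specific vendor was recognized.
        names.append("generic_ci")
    return names


# Usage with explicit mappings, so the result does not depend on the host.
on_unknown_ci = detect_ci({"CI": "true"})
on_github = detect_ci({"CI": "true", "GITHUB_ACTIONS": "true"})
on_laptop = detect_ci({})
```

Passing the environment as a mapping keeps the function easy to test; more vendors could be added to the table without touching the logic.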
@@ -245,8 +245,8 @@ def run(
    )

# plug default tracking module
from dlt.pipeline import trace, track
trace.TRACKING_MODULE = track
from dlt.pipeline import trace, track, platform
ok for platform but maybe we should just say opentelemetry?
at the moment it is not opentelemetry format at all. i can rename it, but i would just say we switch to opentelemetry when the prototype is out and then also rename this file.
dlt/pipeline/pipeline.py
Outdated
@@ -329,7 +333,7 @@ def normalize(self, workers: int = 1, loader_file_format: TLoaderFileFormat = No
        except Exception as n_ex:
            raise PipelineStepFailed(self, "normalize", n_ex, normalize.get_normalize_info()) from n_ex

    @with_runtime_trace
    @with_runtime_trace()
I'd send the state here
dlt/pipeline/pipeline.py
Outdated
@@ -382,7 +386,7 @@ def load(
        except Exception as l_ex:
            raise PipelineStepFailed(self, "load", l_ex, self._get_load_info(load)) from l_ex

    @with_runtime_trace
    @with_runtime_trace(send_state=True)
not here
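The two hunks above change a bare decorator into one that is called with arguments. A minimal sketch of that pattern follows; `with_trace_sketch`, `EVENTS`, and the step functions are hypothetical, and the sketch makes no claim about which pipeline step should actually send the state.

```python
from functools import wraps
from typing import Any, Callable, List, TypeVar, cast

TFun = TypeVar("TFun", bound=Callable[..., Any])

# Stand-in for messages that would be sent to the platform.
EVENTS: List[str] = []


def with_trace_sketch(send_state: bool = False) -> Callable[[TFun], TFun]:
    """Decorator factory: the outer call takes options, the inner wraps."""

    def decorator(f: TFun) -> TFun:
        @wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return f(*args, **kwargs)
            finally:
                # The trace is recorded whether the step succeeded or not.
                EVENTS.append(f"trace:{f.__name__}")
                if send_state:
                    EVENTS.append(f"state:{f.__name__}")

        return cast(TFun, wrapper)

    return decorator


@with_trace_sketch()  # parentheses are required once the factory takes args
def step_a() -> str:
    return "a done"


@with_trace_sketch(send_state=True)
def step_b() -> str:
    return "b done"


step_a()
step_b()
```

Moving the `send_state=True` flag from one step to another is then a one-line change at the decoration site, which is what the review comments are discussing.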
dlt/pipeline/platform.py
Outdated
    if not load_info:
        return

    payload = TSchemaSyncPayload(
looks like PipelineStateSync. LGTM for now. we can add more data to it when we need it. we could also add the pipeline state. it will help with debugging. there's a method to retrieve it in the pipeline and methods to serialize it
renamed it
# Conflicts:
#   dlt/common/pipeline.py
#   dlt/common/runtime/exec_info.py
#   dlt/common/runtime/segment.py
#   dlt/common/storages/load_storage.py
#   dlt/pipeline/__init__.py
#   dlt/pipeline/pipeline.py
#   dlt/pipeline/trace.py
#   dlt/pipeline/track.py
#   tests/common/configuration/test_configuration.py
#   tests/helpers/streamlit_tests/test_streamlit_show_resources.py
5800e72 to 2d280e1
LGTM!
Adds support for sending traces to the dlthub platform. Implements: