Skip to content

Commit

Permalink
Segment Migration: Making Endpoint Configurable. (#1236)
Browse files Browse the repository at this point in the history
* Adding dlthub_telemetry_endpoint to RunConfiguration.

* Adding dlthub_telemetry_endpoint to test_configuration.

* Segment Changes:

1. In init_segment() adding checks for env RUNTIME__TELEMETRY_ENDPOINT.
2. Update _SEGMENT_ENDPOINT based on env variable. Set default value if None provided with default write key.
3. Adjusting header based on endpoint.

* Accessing values through config.

* fix minor things and add new endpoint to common tests

* add new endpoint url to local destinations

* Adding new endpoint url to all destinations.

* Adding test for init_segment.

* formating tests.

---------

Co-authored-by: Dave <[email protected]>
  • Loading branch information
zem360 and sh-rp authored Apr 23, 2024
1 parent a225a98 commit 9f04a1b
Show file tree
Hide file tree
Showing 19 changed files with 100 additions and 23 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test_common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ concurrency:

env:
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

jobs:
get_docs_changes:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test_dbt_cloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ env:
DBT_CLOUD__API_TOKEN: ${{ secrets.DBT_CLOUD__API_TOKEN }}

RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

jobs:
get_docs_changes:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test_dbt_runner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ env:

DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

jobs:
get_docs_changes:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_athena.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ env:

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
ACTIVE_DESTINATIONS: "[\"athena\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-parquet-staging-iceberg\", \"athena-parquet-no-staging-iceberg\"]"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_destination_athena_iceberg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ env:

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
ACTIVE_DESTINATIONS: "[\"athena\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
EXCLUDED_DESTINATION_CONFIGURATIONS: "[\"athena-no-staging\", \"athena-parquet-no-staging\"]"
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test_destination_bigquery.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ env:

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

ACTIVE_DESTINATIONS: "[\"bigquery\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test_destination_databricks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ env:

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

ACTIVE_DESTINATIONS: "[\"databricks\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test_destination_dremio.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ concurrency:
env:
RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

ACTIVE_DESTINATIONS: "[\"dremio\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test_destination_mssql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ env:

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

ACTIVE_DESTINATIONS: "[\"mssql\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test_destination_qdrant.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ env:

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

ACTIVE_DESTINATIONS: "[\"qdrant\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test_destination_snowflake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ env:

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

ACTIVE_DESTINATIONS: "[\"snowflake\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test_destination_synapse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ env:

RUNTIME__SENTRY_DSN: https://cf6086f7d263462088b9fb9f9947caee@o4505514867163136.ingest.sentry.io/4505516212682752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

ACTIVE_DESTINATIONS: "[\"synapse\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ env:

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
# Test redshift and filesystem with all buckets
# postgres runs again here so we can test on mac/windows
ACTIVE_DESTINATIONS: "[\"redshift\", \"postgres\", \"duckdb\", \"filesystem\", \"dummy\"]"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_doc_snippets.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ env:

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

# Slack hook for chess in production example
RUNTIME__SLACK_INCOMING_HOOK: ${{ secrets.RUNTIME__SLACK_INCOMING_HOOK }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_local_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ env:

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_SEGMENT_WRITE_KEY: TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
ACTIVE_DESTINATIONS: "[\"duckdb\", \"postgres\", \"filesystem\", \"weaviate\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]"

Expand Down
1 change: 1 addition & 0 deletions dlt/common/configuration/specs/run_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class RunConfiguration(BaseConfiguration):
slack_incoming_hook: Optional[TSecretStrValue] = None
dlthub_telemetry: bool = True # enable or disable dlthub telemetry
dlthub_telemetry_segment_write_key: str = "a1F2gc6cNYw2plyAt02sZouZcsRjG7TD"
dlthub_telemetry_endpoint: str = "https://api.segment.io/v1/track"
log_format: str = "{asctime}|[{levelname:<21}]|{process}|{thread}|{name}|{filename}|{funcName}:{lineno}|{message}"
log_level: str = "WARNING"
request_timeout: float = 60
Expand Down
45 changes: 27 additions & 18 deletions dlt/common/runtime/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,31 @@
_SESSION: requests.Session = None
_WRITE_KEY: str = None
_SEGMENT_REQUEST_TIMEOUT = (1.0, 1.0) # short connect & send timeouts
_SEGMENT_ENDPOINT = "https://api.segment.io/v1/track"
_SEGMENT_ENDPOINT: str = None
_SEGMENT_CONTEXT: TExecutionContext = None


def init_segment(config: RunConfiguration) -> None:
assert (
config.dlthub_telemetry_segment_write_key
), "dlthub_telemetry_segment_write_key not present in RunConfiguration"
if config.dlthub_telemetry_endpoint is None:
raise ValueError("dlthub_telemetry_endpoint not specified in RunConfiguration")

if config.dlthub_telemetry_endpoint == "https://api.segment.io/v1/track":
assert (
config.dlthub_telemetry_segment_write_key
), "dlthub_telemetry_segment_write_key not present in RunConfiguration"

global _WRITE_KEY, _SESSION, _SEGMENT_ENDPOINT
# create thread pool to send telemetry to segment
global _WRITE_KEY, _SESSION
if not _SESSION:
_SESSION = requests.Session()
# flush pool on exit
atexit.register(_at_exit_cleanup)
# store write key
key_bytes = (config.dlthub_telemetry_segment_write_key + ":").encode("ascii")
_WRITE_KEY = base64.b64encode(key_bytes).decode("utf-8")
# store write key if present
if config.dlthub_telemetry_segment_write_key:
key_bytes = (config.dlthub_telemetry_segment_write_key + ":").encode("ascii")
_WRITE_KEY = base64.b64encode(key_bytes).decode("utf-8")
# store endpoint
_SEGMENT_ENDPOINT = config.dlthub_telemetry_endpoint
# cache the segment context
_default_context_fields()

Expand Down Expand Up @@ -95,10 +102,10 @@ def _segment_request_header(write_key: str) -> StrAny:
Returns:
Authentication headers for segment.
"""
return {
"Authorization": "Basic {}".format(write_key),
"Content-Type": "application/json",
}
headers = {"Content-Type": "application/json"}
if write_key:
headers["Authorization"] = "Basic {}".format(write_key)
return headers


def get_anonymous_id() -> str:
Expand Down Expand Up @@ -170,22 +177,24 @@ def _send_event(event_name: str, properties: StrAny, context: StrAny) -> None:
logger.debug("Skipping request to external service: payload was filtered out.")
return

if not _WRITE_KEY:
# If _WRITE_KEY is empty or `None`, telemetry has not been enabled
logger.debug("Skipping request to external service: telemetry key not set.")
if _SEGMENT_ENDPOINT is None:
# If _SEGMENT_ENDPOINT is `None`, telemetry has not been enabled
logger.debug("Skipping request to external service: telemetry endpoint not set.")
return

headers = _segment_request_header(_WRITE_KEY)

def _future_send() -> None:
# import time
# start_ts = time.time()
# start_ts = time.time_ns()
resp = _SESSION.post(
_SEGMENT_ENDPOINT, headers=headers, json=payload, timeout=_SEGMENT_REQUEST_TIMEOUT
)
# print(f"SENDING TO Segment done {resp.status_code} {time.time() - start_ts} {base64.b64decode(_WRITE_KEY)}")
# end_ts = time.time_ns()
# elapsed_time = (end_ts - start_ts) / 10e6
# print(f"SENDING TO Segment done: {elapsed_time}ms Status: {resp.status_code}")
# handle different failure cases
if resp.status_code != 200:
if resp.status_code not in [200, 204]:
logger.debug(
f"Segment telemetry request returned a {resp.status_code} response. "
f"Body: {resp.text}"
Expand Down
1 change: 1 addition & 0 deletions tests/common/configuration/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ class _SecretCredentials(RunConfiguration):
"slack_incoming_hook": None,
"dlthub_telemetry": True,
"dlthub_telemetry_segment_write_key": "TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB",
"dlthub_telemetry_endpoint": "https://api.segment.io/v1/track",
"log_format": "{asctime}|[{levelname:<21}]|{process}|{thread}|{name}|{filename}|{funcName}:{lineno}|{message}",
"log_level": "WARNING",
"request_timeout": 60,
Expand Down
56 changes: 56 additions & 0 deletions tests/common/runtime/test_telemetry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Any, TYPE_CHECKING
from contextlib import nullcontext as does_not_raise
import os
import pytest
import logging
Expand Down Expand Up @@ -49,6 +50,61 @@ def test_sentry_log_level() -> None:
assert sll._handler.level == logging._nameToLevel["WARNING"]


@pytest.mark.parametrize(
"endpoint, write_key, expectation",
[
(
"https://api.segment.io/v1/track",
"TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB",
does_not_raise(),
),
(
"https://telemetry-tracker.services4758.workers.dev/",
None,
does_not_raise(),
),
],
)
def test_telemetry_endpoint(endpoint, write_key, expectation) -> None:
from dlt.common.runtime import segment

with expectation:
segment.init_segment(
RunConfiguration(
dlthub_telemetry_endpoint=endpoint, dlthub_telemetry_segment_write_key=write_key
)
)

assert segment._SEGMENT_ENDPOINT == endpoint
assert segment._WRITE_KEY is not None


@pytest.mark.parametrize(
"endpoint, write_key, expectation",
[
(
"https://api.segment.io/v1/track",
None,
pytest.raises(AssertionError),
),
(
None,
"TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB",
pytest.raises(ValueError),
),
],
)
def test_telemetry_endpoint_exceptions(endpoint, write_key, expectation) -> None:
from dlt.common.runtime import segment

with expectation:
segment.init_segment(
RunConfiguration(
dlthub_telemetry_endpoint=endpoint, dlthub_telemetry_segment_write_key=write_key
)
)


@pytest.mark.forked
def test_sentry_init(environment: DictStrStr) -> None:
with patch("dlt.common.runtime.sentry.before_send", _mock_before_send):
Expand Down

0 comments on commit 9f04a1b

Please sign in to comment.