Skip to content

Commit

Permalink
streamlit bugfixes (#645)
Browse files Browse the repository at this point in the history
* fixes base python bin path

* fixes json logger reinit

* replaces depends_on with data_from in resource decorator

* displays location name in athena using stage

* removes json-logger

* installs all deps together into venv so pip figures our the right versions

* forces local duckdb version when installing dbt venv

* detects when motherduck does not support local duckdb version
  • Loading branch information
rudolfix authored Sep 27, 2023
1 parent 8f00513 commit 2c5043c
Show file tree
Hide file tree
Showing 18 changed files with 3,870 additions and 3,311 deletions.
8 changes: 4 additions & 4 deletions dlt/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ def __init__(self, method: str) -> None:
super().__init__(f"Process pool supports only fork start method, {method} not supported. Switch the pool type to threading")


class CannotInstallDependency(DltException):
def __init__(self, dependency: str, interpreter: str, output: AnyStr) -> None:
self.dependency = dependency
class CannotInstallDependencies(DltException):
def __init__(self, dependencies: Sequence[str], interpreter: str, output: AnyStr) -> None:
self.dependencies = dependencies
self.interpreter = interpreter
if isinstance(output, bytes):
str_output = output.decode("utf-8")
else:
str_output = output
super().__init__(f"Cannot install dependency {dependency} with {interpreter} and pip:\n{str_output}\n")
super().__init__(f"Cannot install dependencies {', '.join(dependencies)} with {interpreter} and pip:\n{str_output}\n")


class VenvNotFound(DltException):
Expand Down
18 changes: 9 additions & 9 deletions dlt/common/runners/venv.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import subprocess
from typing import Any, List, Type

from dlt.common.exceptions import CannotInstallDependency, VenvNotFound
from dlt.common.exceptions import CannotInstallDependencies, VenvNotFound


class DLTEnvBuilder(venv.EnvBuilder):
Expand Down Expand Up @@ -59,8 +59,8 @@ def restore_current(cls) -> "Venv":
venv = cls.restore(os.environ["VIRTUAL_ENV"], current=True)
except KeyError:
import sys
bin_path, _ = os.path.split(sys.executable)
context = types.SimpleNamespace(bin_path=bin_path, env_exe=sys.executable)
# do not set bin path because it is not known
context = types.SimpleNamespace(bin_path="", env_exe=sys.executable)
venv = cls(context, current=True)
return venv

Expand Down Expand Up @@ -114,12 +114,12 @@ def add_dependencies(self, dependencies: List[str] = None) -> None:

@staticmethod
def _install_deps(context: types.SimpleNamespace, dependencies: List[str]) -> None:
for dep in dependencies:
cmd = [context.env_exe, "-Im", "pip", "install", dep]
try:
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as exc:
raise CannotInstallDependency(dep, context.env_exe, exc.output)
cmd = [context.env_exe, "-Im", "pip", "install"]
try:
subprocess.check_output(cmd + dependencies, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as exc:
raise CannotInstallDependencies(dependencies, context.env_exe, exc.output)


@staticmethod
def is_virtual_env() -> bool:
Expand Down
179 changes: 179 additions & 0 deletions dlt/common/runtime/json_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@

import logging
from datetime import datetime # noqa: I251
import traceback
from logging import Logger
from typing import Any, List, Type

from dlt.common.json import json
from dlt.common.typing import DictStrAny, StrAny

EMPTY_VALUE = '-'
JSON_SERIALIZER = lambda log: json.dumps(log)
COMPONENT_ID = EMPTY_VALUE
COMPONENT_NAME = EMPTY_VALUE
COMPONENT_INSTANCE_INDEX = 0

# The list contains all the attributes listed in
# http://docs.python.org/library/logging.html#logrecord-attributes
RECORD_ATTR_SKIP_LIST = [
'asctime', 'created', 'exc_info', 'exc_text', 'filename', 'args',
'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module', 'msg',
'msecs', 'msecs', 'message', 'name', 'pathname', 'process',
'processName', 'relativeCreated', 'thread', 'threadName', 'extra',
# Also exclude legacy 'props'
'props',
]

RECORD_ATTR_SKIP_LIST.append('stack_info')
EASY_TYPES = (str, bool, dict, float, int, list, type(None))

_default_formatter: Type[logging.Formatter] = None
_epoch = datetime(1970, 1, 1)


def config_root_logger() -> None:
"""
You must call this if you are using root logger.
Make all root logger' handlers produce JSON format
& remove duplicate handlers for request instrumentation logging.
Please made sure that you call this after you called "logging.basicConfig() or logging.getLogger()
"""
global _default_formatter
update_formatter_for_loggers([logging.root], _default_formatter)


def init(custom_formatter: Type[logging.Formatter] = None) -> None:
"""
This is supposed to be called only one time.
If **custom_formatter** is passed, it will (in non-web context) use this formatter over the default.
"""

global _default_formatter

if custom_formatter:
if not issubclass(custom_formatter, logging.Formatter):
raise ValueError('custom_formatter is not subclass of logging.Formatter', custom_formatter)

_default_formatter = custom_formatter if custom_formatter else JSONLogFormatter
logging._defaultFormatter = _default_formatter() # type: ignore

# go to all the initialized logger and update it to use JSON formatter
existing_loggers = list(map(logging.getLogger, logging.Logger.manager.loggerDict))
update_formatter_for_loggers(existing_loggers, _default_formatter)


class BaseJSONFormatter(logging.Formatter):
"""
Base class for JSON formatters
"""
base_object_common: DictStrAny = {}

def __init__(self, *args: Any, **kw: Any) -> None:
super(BaseJSONFormatter, self).__init__(*args, **kw)
if COMPONENT_ID and COMPONENT_ID != EMPTY_VALUE:
self.base_object_common["component_id"] = COMPONENT_ID
if COMPONENT_NAME and COMPONENT_NAME != EMPTY_VALUE:
self.base_object_common["component_name"] = COMPONENT_NAME
if COMPONENT_INSTANCE_INDEX and COMPONENT_INSTANCE_INDEX != EMPTY_VALUE:
self.base_object_common["component_instance_idx"] = COMPONENT_INSTANCE_INDEX

def format(self, record: logging.LogRecord) -> str: # noqa
log_object = self._format_log_object(record)
return JSON_SERIALIZER(log_object)

def _format_log_object(self, record: logging.LogRecord) -> DictStrAny:
utcnow = datetime.utcnow()
base_obj = {
"written_at": iso_time_format(utcnow),
"written_ts": epoch_nano_second(utcnow),
}
base_obj.update(self.base_object_common)
# Add extra fields
base_obj.update(self._get_extra_fields(record))
return base_obj

def _get_extra_fields(self, record: logging.LogRecord) -> StrAny:
fields: DictStrAny = {}

if record.args:
fields['msg'] = record.msg

for key, value in record.__dict__.items():
if key not in RECORD_ATTR_SKIP_LIST:
if isinstance(value, EASY_TYPES):
fields[key] = value
else:
fields[key] = repr(value)

# Always add 'props' to the root of the log, assumes props is a dict
if hasattr(record, 'props') and isinstance(record.props, dict):
fields.update(record.props)

return fields



def _sanitize_log_msg(record: logging.LogRecord) -> str:
return record.getMessage().replace('\n', '_').replace('\r', '_').replace('\t', '_')


class JSONLogFormatter(BaseJSONFormatter):
"""
Formatter for non-web application log
"""

def get_exc_fields(self, record: logging.LogRecord) -> StrAny:
if record.exc_info:
exc_info = self.format_exception(record.exc_info)
else:
exc_info = record.exc_text
return {
'exc_info': exc_info,
'filename': record.filename,
}

@classmethod
def format_exception(cls, exc_info: Any) -> str:
return ''.join(traceback.format_exception(*exc_info)) if exc_info else ''

def _format_log_object(self, record: logging.LogRecord) -> DictStrAny:
json_log_object = super(JSONLogFormatter, self)._format_log_object(record)
json_log_object.update({
"msg": _sanitize_log_msg(record),
"type": "log",
"logger": record.name,
"thread": record.threadName,
"level": record.levelname,
"module": record.module,
"line_no": record.lineno,
})

if record.exc_info or record.exc_text:
json_log_object.update(self.get_exc_fields(record))

return json_log_object


def update_formatter_for_loggers(loggers_iter: List[Logger], formatter: Type[logging.Formatter]) -> None:
"""
:param formatter:
:param loggers_iter:
"""
for logger in loggers_iter:
if not isinstance(logger, Logger):
raise RuntimeError("%s is not a logging.Logger instance", logger)
for handler in logger.handlers:
if not isinstance(handler.formatter, formatter):
handler.formatter = formatter()


def epoch_nano_second(datetime_: datetime) -> int:
return int((datetime_ - _epoch).total_seconds()) * 1000000000 + datetime_.microsecond * 1000


def iso_time_format(datetime_: datetime) -> str:
return '%04d-%02d-%02dT%02d:%02d:%02d.%03dZ' % (
datetime_.year, datetime_.month, datetime_.day, datetime_.hour, datetime_.minute, datetime_.second,
int(datetime_.microsecond / 1000))
29 changes: 14 additions & 15 deletions dlt/common/runtime/logger.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import contextlib
import logging
import json_logging
import traceback
from logging import LogRecord, Logger
from typing import Any, Iterator, Protocol
Expand Down Expand Up @@ -85,17 +84,6 @@ def format(self, record: LogRecord) -> str: # noqa: A003
return s


class _CustomJsonFormatter(json_logging.JSONLogFormatter):

version: StrStr = None

def _format_log_object(self, record: LogRecord, request_util: Any) -> Any:
json_log_object = super(_CustomJsonFormatter, self)._format_log_object(record, request_util)
if self.version:
json_log_object.update({"version": self.version})
return json_log_object


def _init_logging(logger_name: str, level: str, fmt: str, component: str, version: StrStr) -> Logger:
if logger_name == "root":
logging.basicConfig(level=level)
Expand All @@ -111,13 +99,24 @@ def _init_logging(logger_name: str, level: str, fmt: str, component: str, versio

# set right formatter
if is_json_logging(fmt):
from dlt.common.runtime import json_logging

class _CustomJsonFormatter(json_logging.JSONLogFormatter):
version: StrStr = None

def _format_log_object(self, record: LogRecord) -> Any:
json_log_object = super(_CustomJsonFormatter, self)._format_log_object(record)
if self.version:
json_log_object.update({"version": self.version})
return json_log_object

json_logging.COMPONENT_NAME = component
json_logging.JSON_SERIALIZER = json.dumps
json_logging.RECORD_ATTR_SKIP_LIST.remove("process")
if "process" in json_logging.RECORD_ATTR_SKIP_LIST:
json_logging.RECORD_ATTR_SKIP_LIST.remove("process")
# set version as class variable as we cannot pass custom constructor parameters
_CustomJsonFormatter.version = version
# the only thing method above effectively does is to replace the formatter
json_logging.init_non_web(enable_json=True, custom_formatter=_CustomJsonFormatter)
json_logging.init(custom_formatter=_CustomJsonFormatter)
if logger_name == "root":
json_logging.config_root_logger()
else:
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ def diff_tables(tab_a: TTableSchema, tab_b: TPartialTableSchema) -> TPartialTabl
continue
existing_v = tab_a.get(k)
if existing_v != v:
print(f"{k} ==? {v} ==? {existing_v}")
# print(f"{k} ==? {v} ==? {existing_v}")
partial_table[k] = v # type: ignore

# this should not really happen
Expand Down
7 changes: 6 additions & 1 deletion dlt/common/storages/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ class FileItem(TypedDict):
def filesystem(protocol: str, credentials: FileSystemCredentials = None) -> Tuple[AbstractFileSystem, str]:
"""Instantiates an authenticated fsspec `FileSystem` for a given `protocol` and credentials.
Please supply credentials instance corresponding to the protocol
Please supply credentials instance corresponding to the protocol. The `protocol` is just the code name of the filesystem ie:
* s3
* az, abfs
* gcs, gs
also see filesystem_from_config
"""
return filesystem_from_config(FilesystemConfiguration(protocol, credentials))

Expand Down
9 changes: 8 additions & 1 deletion dlt/destinations/athena/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,11 @@ class AthenaClientConfiguration(DestinationClientDwhWithStagingConfiguration):
athena_work_group: Optional[str] = None
aws_data_catalog: Optional[str] = "awsdatacatalog"

__config_gen_annotations__: ClassVar[List[str]] = ["athena_work_group"]
__config_gen_annotations__: ClassVar[List[str]] = ["athena_work_group"]

def __str__(self) -> str:
"""Return displayable destination location"""
if self.staging_config:
return str(self.staging_config.credentials)
else:
return "[no staging set]"
17 changes: 17 additions & 0 deletions dlt/destinations/motherduck/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from dlt.common.configuration import configspec
from dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration
from dlt.common.exceptions import DestinationTerminalException
from dlt.common.typing import TSecretValue
from dlt.common.utils import digest128
from dlt.common.configuration.exceptions import ConfigurationValueError
Expand All @@ -28,6 +29,16 @@ def _token_to_password(self) -> None:
if self.query and "token" in self.query:
self.password = TSecretValue(self.query.pop("token"))

def borrow_conn(self, read_only: bool) -> Any:
from duckdb import HTTPException
try:
return super().borrow_conn(read_only)
except HTTPException as http_ex:
if http_ex.status_code == 403 and 'Failed to download extension "motherduck"' in str(http_ex):
from importlib.metadata import version as pkg_version
raise MotherduckLocalVersionNotSupported(pkg_version("duckdb")) from http_ex
raise

def parse_native_representation(self, native_value: Any) -> None:
super().parse_native_representation(native_value)
self._token_to_password()
Expand All @@ -50,3 +61,9 @@ def fingerprint(self) -> str:
if self.credentials and self.credentials.password:
return digest128(self.credentials.password)
return ""


class MotherduckLocalVersionNotSupported(DestinationTerminalException):
def __init__(self, duckdb_version: str) -> None:
self.duckdb_version = duckdb_version
super().__init__(f"Looks like your local duckdb version ({duckdb_version}) is not supported by Motherduck")
Loading

0 comments on commit 2c5043c

Please sign in to comment.