diff --git a/dlt/sources/helpers/rest_client/auth.py b/dlt/sources/helpers/rest_client/auth.py index 6e156a1c68..cd5f616f3a 100644 --- a/dlt/sources/helpers/rest_client/auth.py +++ b/dlt/sources/helpers/rest_client/auth.py @@ -1,6 +1,17 @@ from base64 import b64encode import math -from typing import List, Dict, Final, Literal, Optional, Union, Any, cast, Iterable +from typing import ( + List, + Dict, + Final, + Literal, + Optional, + Union, + Any, + cast, + Iterable, + TYPE_CHECKING, +) from dlt.sources.helpers import requests from requests.auth import AuthBase from requests import PreparedRequest # noqa: I251 @@ -8,25 +19,16 @@ from dlt.common.exceptions import MissingDependencyException -try: - import jwt -except ModuleNotFoundError: - raise MissingDependencyException("dlt OAuth helpers", ["PyJWT"]) - -try: - from cryptography.hazmat.backends import default_backend - from cryptography.hazmat.primitives import serialization - from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes -except ModuleNotFoundError: - raise MissingDependencyException("dlt OAuth helpers", ["cryptography"]) - -from dlt import config, secrets from dlt.common import logger from dlt.common.configuration.specs.base_configuration import configspec from dlt.common.configuration.specs import CredentialsConfiguration from dlt.common.configuration.specs.exceptions import NativeValueError from dlt.common.typing import TSecretStrValue +if TYPE_CHECKING: + from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes +else: + PrivateKeyTypes = Any TApiKeyLocation = Literal[ "header", "cookie", "query", "param" @@ -161,6 +163,11 @@ def is_token_expired(self) -> bool: return not self.token_expiry or pendulum.now() >= self.token_expiry def obtain_token(self) -> None: + try: + import jwt + except ModuleNotFoundError: + raise MissingDependencyException("dlt OAuth helpers", ["PyJWT"]) + payload = self.create_jwt_payload() data = { "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", @@ -191,7 +198,13 @@ def create_jwt_payload(self) -> Dict[str, Union[str, int]]: "scope": cast(str, self.scopes), } - def load_private_key(self) -> PrivateKeyTypes: + def load_private_key(self) -> "PrivateKeyTypes": + try: + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives import serialization + except ModuleNotFoundError: + raise MissingDependencyException("dlt OAuth helpers", ["cryptography"]) + private_key_bytes = self.private_key.encode("utf-8") return serialization.load_pem_private_key( private_key_bytes,