-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Reorganize authentification methods See merge request cdos-pub/dinamis-sdk!35
- Loading branch information
Showing
20 changed files
with
444 additions
and
365 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,8 @@ | ||
*.egg-info | ||
*/**/__pycache__ | ||
.idea | ||
*.egg-info/ | ||
__pycache__/ | ||
.idea/ | ||
dinamis_sdk/test | ||
build | ||
dist | ||
build/ | ||
dist/ | ||
*venv/ | ||
.vscode/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
"""HTTP connections with various methods.""" | ||
|
||
from typing import Dict | ||
from ast import literal_eval | ||
from pydantic import BaseModel, ConfigDict | ||
from .utils import get_logger_for, create_session | ||
from .oauth2 import OAuth2Session | ||
from .model import ApiKey | ||
from .settings import ENV, SIGNING_ENDPOINT | ||
|
||
|
||
log = get_logger_for(__name__) | ||
|
||
|
||
class BareConnectionMethod(BaseModel): | ||
"""Bare connection method, no extra headers.""" | ||
|
||
model_config = ConfigDict(arbitrary_types_allowed=True) | ||
endpoint: str = SIGNING_ENDPOINT | ||
|
||
def get_headers(self) -> Dict[str, str]: | ||
"""Get the headers.""" | ||
return {} | ||
|
||
def model_post_init(self, __context): | ||
"""Post initialization.""" | ||
if not self.endpoint.lower().startswith(("http://", "https://")): | ||
raise ValueError(f"{self.endpoint} must start with http[s]://") | ||
if not self.endpoint.endswith("/"): | ||
self.endpoint += "/" | ||
return self.endpoint | ||
|
||
|
||
class OAuth2ConnectionMethod(BareConnectionMethod): | ||
"""OAuth2 connection method.""" | ||
|
||
oauth2_session: OAuth2Session = OAuth2Session() | ||
|
||
def get_headers(self): | ||
"""Return the headers.""" | ||
return {"authorization": f"bearer {self.oauth2_session.get_access_token()}"} | ||
|
||
|
||
class ApiKeyConnectionMethod(BareConnectionMethod): | ||
"""API key connection method.""" | ||
|
||
api_key: ApiKey | ||
|
||
def get_headers(self): | ||
"""Return the headers.""" | ||
return self.api_key.to_dict() | ||
|
||
|
||
class HTTPSession: | ||
"""HTTP session class.""" | ||
|
||
def __init__(self, timeout=10): | ||
"""Initialize the HTTP session.""" | ||
self.session = create_session( | ||
retry_total=ENV.dinamis_sdk_retry_total, | ||
retry_backoff_factor=ENV.dinamis_sdk_retry_backoff_factor, | ||
) | ||
self.timeout = timeout | ||
self.headers = { | ||
"Content-Type": "application/json", | ||
"Accept": "application/json", | ||
} | ||
self._method = None | ||
|
||
def get_method(self): | ||
"""Get method.""" | ||
log.debug("Get method") | ||
if not self._method: | ||
# Lazy instantiation | ||
self.prepare_connection_method() | ||
return self._method | ||
|
||
def prepare_connection_method(self): | ||
"""Set the connection method.""" | ||
# Custom server without authentication method | ||
if ENV.dinamis_sdk_bypass_auth_api: | ||
self._method = BareConnectionMethod( | ||
endpoint=ENV.dinamis_sdk_bypass_auth_api | ||
) | ||
|
||
# API key method | ||
elif api_key := ApiKey.grab(): | ||
self._method = ApiKeyConnectionMethod(api_key=api_key) | ||
|
||
# OAuth2 method | ||
else: | ||
self._method = OAuth2ConnectionMethod() | ||
|
||
def post(self, route: str, params: Dict): | ||
"""Perform a POST request.""" | ||
method = self.get_method() | ||
url = f"{method.endpoint}{route}" | ||
headers = {**self.headers, **method.get_headers()} | ||
log.debug("POST to %s", url) | ||
response = self.session.post(url, params=params, headers=headers, timeout=10) | ||
try: | ||
response.raise_for_status() | ||
except Exception as e: | ||
log.error(literal_eval(response.text)) | ||
raise e | ||
|
||
return response | ||
|
||
|
||
session = HTTPSession() | ||
|
||
|
||
def get_headers(): | ||
"""Return the headers.""" | ||
return session.get_method().get_headers() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
"""Models.""" | ||
|
||
import os | ||
import json | ||
from typing import Dict | ||
from pydantic import BaseModel, Field, ConfigDict # pylint: disable = no-name-in-module | ||
from .utils import get_logger_for | ||
from .settings import ENV, get_config_path | ||
|
||
log = get_logger_for(__name__) | ||
|
||
|
||
class Serializable(BaseModel): # pylint: disable = R0903 | ||
"""Base class for serializable pyantic models.""" | ||
|
||
model_config = ConfigDict( | ||
populate_by_name=True, | ||
) | ||
|
||
@classmethod | ||
def get_cfg_file_name(cls) -> str | None: | ||
"""Get the config file name (without full path).""" | ||
name = f".{cls.__name__.lower()}" | ||
log.debug("Looking for config file for %s", name) | ||
cfg_pth = get_config_path() | ||
cfg_file = os.path.join(cfg_pth, name) if cfg_pth else None | ||
log.debug("Config file %sfound %s", "" if cfg_file else "not ", cfg_file or "") | ||
return cfg_file | ||
|
||
@classmethod | ||
def from_config_dir(cls): | ||
"""Try to load from config directory.""" | ||
cfg_file = cls.get_cfg_file_name() | ||
return cls.from_file(cfg_file) if cfg_file else None | ||
|
||
def to_config_dir(self): | ||
"""Try to save to config files.""" | ||
cfg_file = self.get_cfg_file_name() | ||
if cfg_file: | ||
self.to_file(cfg_file) | ||
|
||
@classmethod | ||
def from_dict(cls, dict: Dict): | ||
"""Get the object from dict.""" | ||
return cls(**dict) | ||
|
||
def to_dict(self) -> Dict[str, str]: | ||
"""To dict.""" | ||
return self.model_dump(by_alias=True) | ||
|
||
@classmethod | ||
def from_file(cls, file_path: str): | ||
"""Load object from a file.""" | ||
try: | ||
log.debug("Reading JSON file %s", file_path) | ||
with open(file_path, "r", encoding="utf-8") as file_handler: | ||
return cls(**json.load(file_handler)) | ||
except (FileNotFoundError, IOError, json.decoder.JSONDecodeError) as err: | ||
log.debug("Cannot read object from config directory (%s).", err) | ||
|
||
return None | ||
|
||
def to_file(self, file_path: str): | ||
"""Save the object to file.""" | ||
try: | ||
log.debug("Writing JSON file %s", file_path) | ||
with open(file_path, "w", encoding="utf-8") as file_handler: | ||
json.dump(self.to_dict(), file_handler) | ||
except IOError as io_err: | ||
log.warning("Unable to save file %s (%s)", file_path, io_err) | ||
|
||
@classmethod | ||
def delete_from_config_dir(cls): | ||
"""Delete the config file, if there.""" | ||
cfg_file = cls.get_cfg_file_name() | ||
if cfg_file: | ||
os.remove(cfg_file) | ||
|
||
|
||
class JWT(Serializable): | ||
"""JWT model.""" | ||
|
||
access_token: str | ||
expires_in: int | ||
refresh_token: str | ||
refresh_expires_in: int | ||
token_type: str | ||
|
||
|
||
class DeviceGrantResponse(BaseModel): # pylint: disable = R0903 | ||
"""Device grant login response model.""" | ||
|
||
verification_uri_complete: str | ||
device_code: str | ||
expires_in: int | ||
interval: int | ||
|
||
|
||
class ApiKey(Serializable): | ||
"""API key class.""" | ||
|
||
access_key: str = Field(alias="access-key") | ||
secret_key: str = Field(alias="secret-key") | ||
|
||
@classmethod | ||
def from_env(cls): | ||
"""Try to load from env.""" | ||
if ENV.dinamis_sdk_access_key and ENV.dinamis_sdk_secret_key: | ||
return cls( | ||
access_key=ENV.dinamis_sdk_access_key, | ||
secret_key=ENV.dinamis_sdk_secret_key, | ||
) | ||
return None | ||
|
||
@classmethod | ||
def grab(cls): | ||
"""Try to load an API key from env. or file.""" | ||
return cls.from_env() or cls.from_config_dir() |
Oops, something went wrong.