From 232ea16606201bce842ac656db43be14aa1edc45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20=22decko=22=20de=20Brito?= Date: Fri, 16 Aug 2024 08:12:57 -0300 Subject: [PATCH] Remove token persistence and local retrieve. --- pulp-glue/pulp_glue/common/authentication.py | 48 +++++++------------- pulp-glue/pulp_glue/common/openapi.py | 1 - 2 files changed, 17 insertions(+), 32 deletions(-) diff --git a/pulp-glue/pulp_glue/common/authentication.py b/pulp-glue/pulp_glue/common/authentication.py index c6c9b9d55..8fd3a9396 100644 --- a/pulp-glue/pulp_glue/common/authentication.py +++ b/pulp-glue/pulp_glue/common/authentication.py @@ -1,21 +1,21 @@ -import json import typing as t from datetime import datetime, timedelta -from pathlib import Path -import click import requests class OAuth2ClientCredentialsAuth(requests.auth.AuthBase): + """ + This implements the OAuth2 ClientCredentials Grant authentication flow. + https://datatracker.ietf.org/doc/html/rfc6749#section-4.4 + """ + def __init__( self, client_id: str = "", client_secret: str = "", token_url: str = "", scopes: t.List[str] = [], - *args: t.List[t.Any], - **kwargs: t.Dict[t.Any, t.Any], ): self.client_id = client_id self.client_secret = client_secret @@ -24,7 +24,8 @@ def __init__( self.token: t.Dict[t.Any, t.Any] = {} def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest: - self.retrieve_local_token() + if self.is_token_expired(): + self.retrieve_token() access_token = self.token.get("access_token") request.headers["Authorization"] = f"Bearer {access_token}" @@ -34,12 +35,13 @@ def __call__(self, request: requests.PreparedRequest) -> requests.PreparedReques return request def handle401( - self, response: requests.Response, **kwargs: t.Dict[t.Any, t.Any] + self, + response: requests.Response, + **kwargs: t.Any, ) -> requests.Response: if response.status_code != 401: return response - self.retrieve_local_token() if self.is_token_expired(): self.retrieve_token() @@ -49,6 +51,8 @@ def handle401( access_token = self.token.get("access_token") prep.headers["Authorization"] = f"Bearer {access_token}" + prep.deregister_hook("response", self.handle401) + _response: requests.Response = response.connection.send(prep, **kwargs) # type: ignore _response.history.append(response) _response.request = prep @@ -57,28 +61,9 @@ def handle401( def is_token_expired(self) -> bool: if self.token: - issued_at = datetime.fromisoformat(self.token["issued_at"]) - expires_in = timedelta(seconds=self.token["expires_in"]) - token_timedelta = issued_at + expires_in - timedelta(seconds=5) - - if token_timedelta >= datetime.now(): - return False - + return self.token["expires_at"] < datetime.now() return True - def store_local_token(self) -> None: - TOKEN_LOCATION = Path(click.utils.get_app_dir("pulp"), "token.json") - with Path(TOKEN_LOCATION).open("w") as token_file: - token = json.dumps(self.token) - token_file.write(token) - - def retrieve_local_token(self) -> None: - token_file = Path(click.utils.get_app_dir("pulp"), "token.json") - if token_file.exists(): - with token_file.open("r") as tf: - token_json = tf.read() - self.token = json.loads(token_json) - def retrieve_token(self) -> None: data = { "client_id": self.client_id, @@ -89,6 +74,7 @@ def retrieve_token(self) -> None: response: requests.Response = requests.post(self.token_url, data=data) - self.token = response.json() if response.status_code == 200 else None - self.token["issued_at"] = datetime.now().isoformat() - self.store_local_token() + response.raise_for_status() + + self.token = response.json() + self.token["expires_at"] = datetime.now() + timedelta(seconds=self.token["expires_in"]) diff --git a/pulp-glue/pulp_glue/common/openapi.py b/pulp-glue/pulp_glue/common/openapi.py index 75e266586..44a8aa771 100644 --- a/pulp-glue/pulp_glue/common/openapi.py +++ b/pulp-glue/pulp_glue/common/openapi.py @@ -658,7 +658,6 @@ def render_request( security: t.List[t.Dict[str, t.List[str]]] = method_spec.get( "security", self.api_spec.get("security", {}) ) - auth: t.Optional[t.Union[tuple[str, str], requests.auth.AuthBase]] = None if security and self.auth_provider: if "Authorization" in self._session.headers: # Bad idea, but you wanted it that way.