From 807e1daf12fb7c5dda926a0df470ada2ded6163d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20=22decko=22=20de=20Brito?= Date: Wed, 14 Aug 2024 15:10:34 -0300 Subject: [PATCH] Add some custom types --- pulp-glue/pulp_glue/common/openapi.py | 26 +++++++++++++------------- pulpcore/cli/common/generic.py | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pulp-glue/pulp_glue/common/openapi.py b/pulp-glue/pulp_glue/common/openapi.py index b7c538d37..75e266586 100644 --- a/pulp-glue/pulp_glue/common/openapi.py +++ b/pulp-glue/pulp_glue/common/openapi.py @@ -20,6 +20,10 @@ _ = translation.gettext UploadType = t.Union[bytes, t.IO[bytes]] +ClientCredentials = t.Dict[str, t.Union[str, t.Dict[str, str]]] +OAuth2Flows = t.Dict[str, t.Union[str, ClientCredentials]] +OAuth2FlowsScopes = t.List[str] +SecurityScheme = t.Dict[str, t.Union[str, OAuth2Flows]] SAFE_METHODS = ["GET", "HEAD", "OPTIONS"] ISO_DATE_FORMAT = "%Y-%m-%d" @@ -46,24 +50,22 @@ class UnsafeCallError(OpenAPIError): class OpenAPISecurityScheme: - def __init__(self, security_scheme: t.Dict[str, str]): + def __init__(self, security_scheme: SecurityScheme): self.security_scheme = security_scheme - self.type: str = "" - self.description: str = "" - self.flows: t.Optional[t.Dict[str, str]] self.parse() - def parse(self): + def parse(self) -> None: self.type = self.security_scheme["type"] self.description = self.security_scheme.get("description", "") if self.type == "oauth2": - self.flows = self.security_scheme.get("flows") - if clientCredentials := self.flows.get("clientCredentials"): - self.flow_type: str = "clientCredentials" - self.token_url: str = clientCredentials.get("tokenUrl") - self.scopes: t.List[str] = list(clientCredentials.get("scopes").keys()) + self.flows: OAuth2Flows = self.security_scheme.get("flows") + client_credentials: ClientCredentials = self.flows["clientCredentials"] + if client_credentials: + self.flow_type: t.Optional[str] = "clientCredentials" + self.token_url: str = client_credentials["tokenUrl"] + self.scopes: OAuth2FlowsScopes = list(client_credentials.get("scopes").keys()) if self.type == "http": self.scheme = self.security_scheme["scheme"] @@ -82,9 +84,7 @@ def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.Auth """Implement this to provide means of http basic auth.""" return None - def auth( - self, flow: t.Dict[t.Any, t.Any] - ) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]: + def auth(self, flow: t.Any) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]: """Implement this to provide other authentication methods.""" return None diff --git a/pulpcore/cli/common/generic.py b/pulpcore/cli/common/generic.py index 96a7208d3..be870f473 100644 --- a/pulpcore/cli/common/generic.py +++ b/pulpcore/cli/common/generic.py @@ -232,7 +232,7 @@ def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.Auth self.pulp_ctx.password = click.prompt("Password", hide_input=True) return (self.pulp_ctx.username, self.pulp_ctx.password) - def auth(self, flow: t.Dict[t.Any, t.Any]) -> t.Optional[requests.auth.AuthBase]: + def auth(self, flow: t.Any) -> t.Optional[requests.auth.AuthBase]: if self.pulp_ctx.username is None: self.pulp_ctx.username = click.prompt("Username/ClientID") if self.pulp_ctx.password is None: