Skip to content

Commit

Permalink
Memoize auth objects
Browse files Browse the repository at this point in the history
Auth objects provided by the pulpcli auth provider are memoized. This
way, no password needs to be written back to the pulp_ctx variable and
the oauth token can be cached in memory for the lifetime of the context.
  • Loading branch information
mdellweg committed Sep 17, 2024
1 parent 7bc94ad commit dd858b6
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGES/+memoize_auth.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added memoization to CLI auth provider. This helps to reuse a retrieved oauth token for the lifetime of the process.
106 changes: 63 additions & 43 deletions pulpcore/cli/common/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class SecretStorageBasicAuth(requests.auth.AuthBase):
def __init__(self, pulp_ctx: PulpCLIContext):
self.pulp_ctx = pulp_ctx
assert self.pulp_ctx.username is not None
self.password: t.Optional[str] = None

self.attr: t.Dict[str, str] = {
"service": "pulp-cli",
Expand All @@ -186,78 +187,97 @@ def __init__(self, pulp_ctx: PulpCLIContext):
}

def response_hook(self, response: requests.Response, **kwargs: t.Any) -> requests.Response:
# Example adapted from:
# https://docs.python-requests.org/en/latest/_modules/requests/auth/#HTTPDigestAuth
if 200 <= response.status_code < 300 and not self.password_in_manager:
if click.confirm(_("Add password to password manager?")):
assert isinstance(self.pulp_ctx.password, str)
assert isinstance(self.password, str)

with closing(secretstorage.dbus_init()) as connection:
collection = secretstorage.get_default_collection(connection)
collection.create_item(
"Pulp CLI", self.attr, self.pulp_ctx.password.encode(), replace=True
"Pulp CLI", self.attr, self.password.encode(), replace=True
)
elif response.status_code == 401 and self.password_in_manager:
with closing(secretstorage.dbus_init()) as connection:
collection = secretstorage.get_default_collection(connection)
item = next(collection.search_items(self.attr), None)
if item:
if click.confirm(_("Remove failed password from password manager?")):
if click.confirm(_("Remove failed password from password manager?")):
with closing(secretstorage.dbus_init()) as connection:
collection = secretstorage.get_default_collection(connection)
item = next(collection.search_items(self.attr), None)
if item is not None:
item.delete()
self.pulp_ctx.password = None
self.password = None
return response

def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest:
assert self.pulp_ctx.username is not None
with closing(secretstorage.dbus_init()) as connection:
collection = secretstorage.get_default_collection(connection)
item = next(collection.search_items(self.attr), None)
if item:
self.pulp_ctx.password = item.get_secret().decode()
self.password_in_manager = True
else:
self.pulp_ctx.password = str(click.prompt("Password", hide_input=True))
self.password_in_manager = False
if self.password is None:
with closing(secretstorage.dbus_init()) as connection:
collection = secretstorage.get_default_collection(connection)
item = next(collection.search_items(self.attr), None)
if item:
self.password = item.get_secret().decode()
self.password_in_manager = True
else:
self.password = str(click.prompt("Password", hide_input=True))
self.password_in_manager = False
request.register_hook("response", self.response_hook) # type: ignore [no-untyped-call]
return requests.auth.HTTPBasicAuth( # type: ignore [no-any-return]
self.pulp_ctx.username,
self.pulp_ctx.password,
self.pulp_ctx.username, self.password
)(request)


class PulpCLIAuthProvider(AuthProviderBase):
def __init__(self, pulp_ctx: PulpCLIContext):
self.pulp_ctx = pulp_ctx
self._memoized: t.Dict[str, t.Optional[requests.auth.AuthBase]] = {}

def basic_auth(self, scopes: t.List[str]) -> t.Optional[requests.auth.AuthBase]:
if self.pulp_ctx.username is None:
# No username -> No basic auth.
return None
if self.pulp_ctx.password is None:
# TODO give the user a chance to opt out.
if SECRET_STORAGE:
# We could just try to fetch the password here,
# but we want to get a grip on the response_hook.
return SecretStorageBasicAuth(self.pulp_ctx)
if "BASIC_AUTH" not in self._memoized:
if self.pulp_ctx.username is None:
# No username -> No basic auth.
self._memoized["BASIC_AUTH"] = None
elif self.pulp_ctx.password is None:
# TODO give the user a chance to opt out.
if SECRET_STORAGE:
# We could just try to fetch the password here,
# but we want to get a grip on the response_hook.
self._memoized["BASIC_AUTH"] = SecretStorageBasicAuth(self.pulp_ctx)
else:
self._memoized["BASIC_AUTH"] = requests.auth.HTTPBasicAuth(
self.pulp_ctx.username, click.prompt("Password", hide_input=True)
)
else:
self.pulp_ctx.password = click.prompt("Password", hide_input=True)
return requests.auth.HTTPBasicAuth(self.pulp_ctx.username, self.pulp_ctx.password)
self._memoized["BASIC_AUTH"] = requests.auth.HTTPBasicAuth(
self.pulp_ctx.username, self.pulp_ctx.password
)
return self._memoized["BASIC_AUTH"]

def oauth2_client_credentials_auth(
self, flow: t.Any, scopes: t.List[str]
) -> t.Optional[requests.auth.AuthBase]:
if self.pulp_ctx.oauth2_client_id is None:
# No client_id -> No oauth2 client credentials.
return None
if self.pulp_ctx.oauth2_client_secret is None:
self.pulp_ctx.oauth2_client_secret = click.prompt("Client Secret")

return OAuth2ClientCredentialsAuth(
client_id=self.pulp_ctx.oauth2_client_id,
client_secret=self.pulp_ctx.oauth2_client_secret,
token_url=flow["tokenUrl"],
# Try to request all possible scopes.
scopes=flow["scopes"],
)
token_url = flow["tokenUrl"]
key = "OAUTH2_CLIENT_CREDENTIALS;" + token_url + ";" + ":".join(scopes)
if key not in self._memoized:
if self.pulp_ctx.oauth2_client_id is None:
# No client_id -> No oauth2 client credentials.
self._memoized[key] = None
elif self.pulp_ctx.oauth2_client_secret is None:
self._memoized[key] = OAuth2ClientCredentialsAuth(
client_id=self.pulp_ctx.oauth2_client_id,
client_secret=click.prompt("Client Secret"),
token_url=flow["tokenUrl"],
# Try to request all possible scopes.
scopes=flow["scopes"],
)
else:
self._memoized[key] = OAuth2ClientCredentialsAuth(
client_id=self.pulp_ctx.oauth2_client_id,
client_secret=self.pulp_ctx.oauth2_client_secret,
token_url=flow["tokenUrl"],
# Try to request all possible scopes.
scopes=flow["scopes"],
)
return self._memoized[key]


##############################################################################
Expand Down

0 comments on commit dd858b6

Please sign in to comment.