Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PR #1051/6c6a5c22 backport][0.28] Refactor the auth mechanism selection algorithm #1052

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES/pulp-glue/+fix_auth_selection_regressions.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed regressions in the auth selection algorithm of `AuthProviderBase`.
In particular, proposals requiring multiple mechanisms are ignored for now instead of considering each constituent individually,
"HTTP Bearer" and other IANA schemes are no longer interpreted as "HTTP Basic" and the empty proposal rightfully reflects no needed authentication.
112 changes: 58 additions & 54 deletions pulp-glue/pulp_glue/common/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import os
import typing as t
from collections import defaultdict
from contextlib import suppress
from io import BufferedReader
from urllib.parse import urljoin
Expand All @@ -24,7 +25,6 @@
SAFE_METHODS = ["GET", "HEAD", "OPTIONS"]
ISO_DATE_FORMAT = "%Y-%m-%d"
ISO_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
AUTH_PRIORITY = ("oauth2", "basic")


class OpenAPIError(Exception):
Expand All @@ -45,30 +45,6 @@ class UnsafeCallError(OpenAPIError):
pass


class OpenAPISecurityScheme:
def __init__(self, security_scheme: t.Dict[str, t.Any]):
self.security_scheme = security_scheme

self.security_type = self.security_scheme["type"]
self.description = self.security_scheme.get("description", "")

if self.security_type == "oauth2":
try:
self.flows: t.Dict[str, t.Any] = self.security_scheme["flows"]
client_credentials: t.Optional[t.Dict[str, t.Any]] = self.flows.get(
"clientCredentials"
)
if client_credentials:
self.flow_type: str = "clientCredentials"
self.token_url: str = client_credentials["tokenUrl"]
self.scopes: t.List[str] = list(client_credentials["scopes"].keys())
except KeyError:
raise OpenAPIValidationError

if self.security_type == "http":
self.scheme = self.security_scheme["scheme"]


class AuthProviderBase:
"""
Base class for auth providers.
Expand All @@ -78,61 +54,89 @@ class AuthProviderBase:
Returned auth objects need to be compatible with `requests.auth.AuthBase`.
"""

def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]:
def basic_auth(self, scopes: t.List[str]) -> t.Optional[requests.auth.AuthBase]:
"""Implement this to provide means of http basic auth."""
return None

def http_auth(
self, security_scheme: t.Dict[str, t.Any], scopes: t.List[str]
) -> t.Optional[requests.auth.AuthBase]:
"""Select a suitable http auth scheme or return None."""
# https://www.iana.org/assignments/http-authschemes/http-authschemes.xhtml
if security_scheme["scheme"] == "basic":
result = self.basic_auth(scopes)
if result:
return result
return None

def oauth2_client_credentials_auth(
self, flow: t.Any
) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]:
self, flow: t.Any, scopes: t.List[str]
) -> t.Optional[requests.auth.AuthBase]:
"""Implement this to provide other authentication methods."""
return None

def oauth2_auth(
self, security_scheme: t.Dict[str, t.Any], scopes: t.List[str]
) -> t.Optional[requests.auth.AuthBase]:
"""Select a suitable oauth2 flow or return None."""
# Check flows by preference.
if "clientCredentials" in security_scheme["flows"]:
flow = security_scheme["flows"]["clientCredentials"]
# Select this flow only if it claims to provide all the necessary scopes.
# This will allow subsequent auth proposals to be considered.
if set(scopes) - set(flow["scopes"]):
return None

result = self.oauth2_client_credentials_auth(flow, scopes)
if result:
return result
return None

def __call__(
self,
security: t.List[t.Dict[str, t.List[str]]],
security_schemes: t.Dict[str, t.Dict[str, t.Any]],
) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]:

authorized_schemes = []
authorized_schemes_types = set()
) -> t.Optional[requests.auth.AuthBase]:

# Reorder the proposals by their type to prioritize properly.
# Select only single mechanism proposals on the way.
proposed_schemes: t.Dict[str, t.Dict[str, t.List[str]]] = defaultdict(dict)
for proposal in security:
for name in proposal:
authorized_schemes.append(security_schemes[name])
authorized_schemes_types.add(security_schemes[name]["type"])

# Check for oauth2 scheme first
if "oauth2" in authorized_schemes_types:
for flow in authorized_schemes:
if flow["type"] == "oauth2":
oauth2_flow = OpenAPISecurityScheme(flow)

# We "know" we'll have an outh2-flow here
if oauth2_flow.flow_type == "client_credentials":
result = self.oauth2_client_credentials_auth(oauth2_flow)
if len(proposal) == 0:
# Empty proposal: No authentication needed. Shortcut return.
return None
if len(proposal) == 1:
name, scopes = list(proposal.items())[0]
proposed_schemes[security_schemes[name]["type"]][name] = scopes
# Ignore all proposals with more than one required auth mechanism.

# Check for auth schemes by preference.
if "oauth2" in proposed_schemes:
for name, scopes in proposed_schemes["oauth2"].items():
result = self.oauth2_auth(security_schemes[name], scopes)
if result:
return result

# if we get here, either no-oauth2, OR we couldn't find creds
if "http" in authorized_schemes_types:
result = self.basic_auth()
if result:
return result
if "http" in proposed_schemes:
for name, scopes in proposed_schemes["http"].items():
result = self.http_auth(security_schemes[name], scopes)
if result:
return result

raise OpenAPIError(_("No suitable auth scheme found."))


class BasicAuthProvider(AuthProviderBase):
"""
Reference Implementation for AuthProviderBase providing basic auth with `username`, `password`.
Implementation for AuthProviderBase providing basic auth with fixed `username`, `password`.
"""

def __init__(self, username: str, password: str):
self.username = username
self.password = password
self.auth = requests.auth.HTTPBasicAuth(username, password)

def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]:
return (self.username, self.password)
def basic_auth(self, scopes: t.List[str]) -> t.Optional[requests.auth.AuthBase]:
return self.auth


class OpenAPI:
Expand Down
93 changes: 93 additions & 0 deletions pulp-glue/tests/test_auth_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import pytest

from pulp_glue.common.openapi import AuthProviderBase, OpenAPIError

pytestmark = pytest.mark.glue


SECURITY_SCHEMES = {
"A": {"type": "http", "scheme": "bearer"},
"B": {"type": "http", "scheme": "basic"},
"C": {
"type": "oauth2",
"flows": {
"implicit": {
"authorizationUrl": "https://example.com/api/oauth/dialog",
"scopes": {
"write:pets": "modify pets in your account",
"read:pets": "read your pets",
},
},
"authorizationCode": {
"authorizationUrl": "https://example.com/api/oauth/dialog",
"tokenUrl": "https://example.com/api/oauth/token",
"scopes": {
"write:pets": "modify pets in your account",
"read:pets": "read your pets",
},
},
},
},
"D": {
"type": "oauth2",
"flows": {
"implicit": {
"authorizationUrl": "https://example.com/api/oauth/dialog",
"scopes": {
"write:pets": "modify pets in your account",
"read:pets": "read your pets",
},
},
"clientCredentials": {
"tokenUrl": "https://example.com/api/oauth/token",
"scopes": {
"write:pets": "modify pets in your account",
"read:pets": "read your pets",
},
},
},
},
}


def test_auth_provider_select_mechanism(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(AuthProviderBase, "basic_auth", lambda *args: "BASIC")
monkeypatch.setattr(
AuthProviderBase,
"oauth2_client_credentials_auth",
lambda *args: "OAUTH2_CLIENT_CREDENTIALS",
)

# Error if no auth scheme is available.
with pytest.raises(OpenAPIError):
AuthProviderBase()([], SECURITY_SCHEMES)

# Error if a nonexisting mechanism is proposed.
with pytest.raises(KeyError):
AuthProviderBase()([{"foo": []}], SECURITY_SCHEMES)

# Succeed without mechanism for an empty proposal.
assert AuthProviderBase()([{}], SECURITY_SCHEMES) is None

# Try select a not implemented auth.
with pytest.raises(OpenAPIError):
AuthProviderBase()([{"A": []}], SECURITY_SCHEMES)

# Ignore proposals with multiple mechanisms.
with pytest.raises(OpenAPIError):
AuthProviderBase()([{"B": [], "C": []}], SECURITY_SCHEMES)

# Select Basic auth alone and from multiple.
assert AuthProviderBase()([{"B": []}], SECURITY_SCHEMES) == "BASIC"
assert AuthProviderBase()([{"A": []}, {"B": []}], SECURITY_SCHEMES) == "BASIC"

# Select oauth2 client credentials alone and over basic auth if scopes match.
assert AuthProviderBase()([{"D": []}], SECURITY_SCHEMES) == "OAUTH2_CLIENT_CREDENTIALS"
assert (
AuthProviderBase()([{"B": []}, {"D": []}], SECURITY_SCHEMES) == "OAUTH2_CLIENT_CREDENTIALS"
)
assert (
AuthProviderBase()([{"B": []}, {"D": ["read:pets"]}], SECURITY_SCHEMES)
== "OAUTH2_CLIENT_CREDENTIALS"
)
assert AuthProviderBase()([{"B": []}, {"D": ["read:cattle"]}], SECURITY_SCHEMES) == "BASIC"
11 changes: 6 additions & 5 deletions pulpcore/cli/common/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class PulpCLIAuthProvider(AuthProviderBase):
def __init__(self, pulp_ctx: PulpCLIContext):
self.pulp_ctx = pulp_ctx

def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]]:
def basic_auth(self, scopes: t.List[str]) -> t.Optional[requests.auth.AuthBase]:
if self.pulp_ctx.username is None:
self.pulp_ctx.username = click.prompt("Username")
if self.pulp_ctx.password is None:
Expand All @@ -230,10 +230,10 @@ def basic_auth(self) -> t.Optional[t.Union[t.Tuple[str, str], requests.auth.Auth
return SecretStorageBasicAuth(self.pulp_ctx)
else:
self.pulp_ctx.password = click.prompt("Password", hide_input=True)
return (self.pulp_ctx.username, self.pulp_ctx.password)
return requests.auth.HTTPBasicAuth(self.pulp_ctx.username, self.pulp_ctx.password)

def oauth2_client_credentials_auth(
self, oauth2_flow: t.Any
self, flow: t.Any, scopes: t.List[str]
) -> t.Optional[requests.auth.AuthBase]:
if self.pulp_ctx.username is None:
self.pulp_ctx.username = click.prompt("Username/ClientID")
Expand All @@ -243,8 +243,9 @@ def oauth2_client_credentials_auth(
return OAuth2ClientCredentialsAuth(
client_id=self.pulp_ctx.username,
client_secret=self.pulp_ctx.password,
token_url=oauth2_flow.token_url,
scopes=oauth2_flow.scopes,
token_url=flow["tokenUrl"],
# Try to request all possible scopes.
scopes=flow["scopes"],
)


Expand Down
Loading