Skip to content

oidc: Refactor lookup strategies into single functions #18169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions tests/unit/manage/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -7551,6 +7551,90 @@ def test_add_oidc_publisher_already_registered_with_project(
)
]

def test_add_oidc_publisher_already_registered_after_normalization(
self, monkeypatch, db_request
):
publisher = GitHubPublisher(
repository_name="some-repository",
repository_owner="some-owner",
repository_owner_id="666",
workflow_filename="some-workflow-filename.yml",
environment="some-environment",
)
post_body = MultiDict(
{
"owner": "some-owner",
"repository": "some-repository",
"workflow_filename": "some-workflow-filename.yml",
"environment": "SOME-environment",
}
)
db_request.user = UserFactory.create()
EmailFactory(user=db_request.user, verified=True, primary=True)
db_request.db.add(publisher)
db_request.db.flush() # To get it in the DB

project = pretend.stub(
name="fakeproject",
oidc_publishers=[publisher],
record_event=pretend.call_recorder(lambda *a, **kw: None),
)

db_request.registry = pretend.stub(
settings={
"github.token": "fake-api-token",
}
)
db_request.flags = pretend.stub(
disallow_oidc=pretend.call_recorder(lambda f=None: False)
)
db_request.session = pretend.stub(
flash=pretend.call_recorder(lambda *a, **kw: None)
)
db_request.POST = post_body

view = views.ManageOIDCPublisherViews(project, db_request)
monkeypatch.setattr(
views.GitHubPublisherForm,
"_lookup_owner",
lambda *a: {"login": "some-owner", "id": "some-owner-id"},
)

monkeypatch.setattr(
view, "_hit_ratelimits", pretend.call_recorder(lambda: None)
)
monkeypatch.setattr(
view, "_check_ratelimits", pretend.call_recorder(lambda: None)
)

assert view.add_github_oidc_publisher() == {
"disabled": {
"GitHub": False,
"GitLab": False,
"Google": False,
"ActiveState": False,
},
"project": project,
"github_publisher_form": view.github_publisher_form,
"gitlab_publisher_form": view.gitlab_publisher_form,
"google_publisher_form": view.google_publisher_form,
"activestate_publisher_form": view.activestate_publisher_form,
"prefilled_provider": view.prefilled_provider,
}
assert view.metrics.increment.calls == [
pretend.call(
"warehouse.oidc.add_publisher.attempt",
tags=["publisher:GitHub"],
),
]
assert project.record_event.calls == []
assert db_request.session.flash.calls == [
pretend.call(
f"{str(publisher)} is already registered with fakeproject",
queue="error",
)
]

@pytest.mark.parametrize(
("view_name", "publisher_name"),
[
Expand Down
15 changes: 7 additions & 8 deletions tests/unit/oidc/models/test_activestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,6 @@
SUBJECT = f"org:{ORG_URL_NAME}:project:{PROJECT_NAME}"


def test_lookup_strategies():
assert (
len(ActiveStatePublisher.__lookup_strategies__)
== len(PendingActiveStatePublisher.__lookup_strategies__)
== 1
)


def new_signed_claims(
sub: str = SUBJECT,
actor: str = ACTOR,
Expand Down Expand Up @@ -381,6 +373,13 @@ def test_exists(self, db_request, exists_in_db):

assert publisher.exists(db_request.db) == exists_in_db

def test_lookup_no_matching_publishers(self, db_request):
signed_claims = new_signed_claims(actor_id="my_id")

with pytest.raises(InvalidPublisherError) as e:
ActiveStatePublisher.lookup_by_claims(db_request.db, signed_claims)
assert str(e.value) == "Publisher with matching claims was not found"


class TestPendingActiveStatePublisher:
def test_reify_does_not_exist_yet(self, db_request):
Expand Down
3 changes: 1 addition & 2 deletions tests/unit/oidc/models/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ def test_check_claim_invariant():

class TestOIDCPublisher:
def test_lookup_by_claims_raises(self):
with pytest.raises(errors.InvalidPublisherError) as e:
with pytest.raises(NotImplementedError):
_core.OIDCPublisher.lookup_by_claims(pretend.stub(), pretend.stub())
assert str(e.value) == "All lookup strategies exhausted"

def test_oidc_publisher_not_default_verifiable(self):
publisher = _core.OIDCPublisher(projects=[])
Expand Down
32 changes: 24 additions & 8 deletions tests/unit/oidc/models/test_github.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,6 @@ def test_check_sub(claim):


class TestGitHubPublisher:
def test_lookup_strategies(self):
assert (
len(github.GitHubPublisher.__lookup_strategies__)
== len(github.PendingGitHubPublisher.__lookup_strategies__)
== 2
)

@pytest.mark.parametrize("environment", [None, "some_environment"])
def test_lookup_fails_invalid_workflow_ref(self, environment):
signed_claims = {
Expand All @@ -116,7 +109,8 @@ def test_lookup_fails_invalid_workflow_ref(self, environment):

# The `job_workflow_ref` is malformed, so no queries are performed.
with pytest.raises(
errors.InvalidPublisherError, match="All lookup strategies exhausted"
errors.InvalidPublisherError,
match="Could not job extract workflow filename from OIDC claims",
):
github.GitHubPublisher.lookup_by_claims(pretend.stub(), signed_claims)

Expand Down Expand Up @@ -165,6 +159,28 @@ def test_lookup_escapes(self, db_request, environment, workflow_a, workflow_b):
== workflow
)

def test_lookup_no_matching_publishers(self, db_request):
GitHubPublisherFactory(
id="aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
repository_owner="foo",
repository_name="bar",
repository_owner_id="1234",
workflow_filename="release.yml",
environment="environment",
)
signed_claims = {
"repository": "foo/bar",
"job_workflow_ref": (
"foo/bar/.github/workflows/release.yml@refs/heads/main"
),
"repository_owner_id": "1234",
"environment": "another_environment",
}

with pytest.raises(errors.InvalidPublisherError) as e:
github.GitHubPublisher.lookup_by_claims(db_request.db, signed_claims)
assert str(e.value) == "Publisher with matching claims was not found"

def test_github_publisher_all_known_claims(self):
assert github.GitHubPublisher.all_known_claims() == {
# required verifiable claims
Expand Down
19 changes: 11 additions & 8 deletions tests/unit/oidc/models/test_gitlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ def test_check_sub(claim):


class TestGitLabPublisher:
def test_lookup_strategies(self):
assert (
len(gitlab.GitLabPublisher.__lookup_strategies__)
== len(gitlab.PendingGitLabPublisher.__lookup_strategies__)
== 2
)

@pytest.mark.parametrize("environment", [None, "some_environment"])
def test_lookup_fails_invalid_ci_config_ref_uri(self, environment):
signed_claims = {
Expand All @@ -83,7 +76,8 @@ def test_lookup_fails_invalid_ci_config_ref_uri(self, environment):

# The `ci_config_ref_uri` is malformed, so no queries are performed.
with pytest.raises(
errors.InvalidPublisherError, match="All lookup strategies exhausted"
errors.InvalidPublisherError,
match="Could not extract workflow filename from OIDC claims",
):
gitlab.GitLabPublisher.lookup_by_claims(pretend.stub(), signed_claims)

Expand Down Expand Up @@ -131,6 +125,15 @@ def test_lookup_escapes(
== workflow_filepath
)

def test_lookup_no_matching_publisher(self, db_request):
signed_claims = {
"project_path": "foo/bar",
"ci_config_ref_uri": ("gitlab.com/foo/bar//.gitlab-ci.yml@refs/heads/main"),
}
with pytest.raises(errors.InvalidPublisherError) as e:
gitlab.GitLabPublisher.lookup_by_claims(db_request.db, signed_claims)
assert str(e.value) == "Publisher with matching claims was not found"

def test_gitlab_publisher_all_known_claims(self):
assert gitlab.GitLabPublisher.all_known_claims() == {
# required verifiable claims
Expand Down
18 changes: 10 additions & 8 deletions tests/unit/oidc/models/test_google.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@
from warehouse.oidc.models import _core, google


def test_lookup_strategies():
assert (
len(google.GooglePublisher.__lookup_strategies__)
== len(google.PendingGooglePublisher.__lookup_strategies__)
== 2
)


class TestGooglePublisher:
def test_publisher_name(self):
publisher = google.GooglePublisher(email="[email protected]")
Expand Down Expand Up @@ -196,6 +188,16 @@ def test_google_publisher_sub_is_optional(self, expected_sub, actual_sub, valid)
)
assert str(e.value) == "Check failed for optional claim 'sub'"

def test_lookup_no_matching_publishers(self, db_request):
signed_claims = {
"email": "[email protected]",
"email_verified": True,
}

with pytest.raises(errors.InvalidPublisherError) as e:
google.GooglePublisher.lookup_by_claims(db_request.db, signed_claims)
assert str(e.value) == "Publisher with matching claims was not found"

@pytest.mark.parametrize("exists_in_db", [True, False])
def test_exists(self, db_request, exists_in_db):
publisher = google.GooglePublisher(
Expand Down
16 changes: 3 additions & 13 deletions warehouse/oidc/models/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from __future__ import annotations

from collections.abc import Callable
from typing import TYPE_CHECKING, Any, TypedDict, TypeVar, Unpack
from typing import TYPE_CHECKING, Any, Self, TypedDict, TypeVar, Unpack

import rfc3986
import sentry_sdk
Expand Down Expand Up @@ -174,20 +174,10 @@ class OIDCPublisherMixin:
# but there are a few problems: those claim sets don't map to their
# "equivalent" column (only to an instantiated property), and may not
# even have an "equivalent" column.
__lookup_strategies__: list = []

@classmethod
def lookup_by_claims(cls, session, signed_claims: SignedClaims):
for lookup in cls.__lookup_strategies__:
query = lookup(cls, signed_claims)
if not query:
# We might not build a query if we know the claim set can't
# satisfy it. If that's the case, then we skip.
continue

if publisher := query.with_session(session).one_or_none():
return publisher
raise InvalidPublisherError("All lookup strategies exhausted")
def lookup_by_claims(cls, session, signed_claims: SignedClaims) -> Self:
raise NotImplementedError

@classmethod
def all_known_claims(cls) -> set[str]:
Expand Down
25 changes: 12 additions & 13 deletions warehouse/oidc/models/activestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import urllib

from typing import Any
from typing import Any, Self

from sqlalchemy import ForeignKey, String, UniqueConstraint, and_, exists
from sqlalchemy.dialects.postgresql import UUID
Expand Down Expand Up @@ -92,18 +92,6 @@ class ActiveStatePublisherMixin:
"project_visibility",
}

@staticmethod
def __lookup_all__(klass, signed_claims: SignedClaims):
return Query(klass).filter_by(
organization=signed_claims["organization"],
activestate_project_name=signed_claims["project"],
actor_id=signed_claims["actor_id"],
)

__lookup_strategies__ = [
__lookup_all__,
]

@property
def sub(self) -> str:
return f"org:{self.organization}:project:{self.activestate_project_name}"
Expand Down Expand Up @@ -147,6 +135,17 @@ def exists(self, session) -> bool:
)
).scalar()

@classmethod
def lookup_by_claims(cls, session, signed_claims: SignedClaims) -> Self:
query: Query = Query(cls).filter_by(
organization=signed_claims["organization"],
activestate_project_name=signed_claims["project"],
actor_id=signed_claims["actor_id"],
)
if publisher := query.with_session(session).one_or_none():
return publisher
raise InvalidPublisherError("Publisher with matching claims was not found")


class ActiveStatePublisher(ActiveStatePublisherMixin, OIDCPublisher):
__tablename__ = "activestate_oidc_publishers"
Expand Down
Loading