Skip to content

Commit

Permalink
Tokenless V3 (#533)
Browse files Browse the repository at this point in the history
Signed-off-by: joseph-sentry <[email protected]>
  • Loading branch information
joseph-sentry authored Jun 11, 2024
1 parent bccac24 commit 7bed118
Show file tree
Hide file tree
Showing 7 changed files with 334 additions and 572 deletions.
108 changes: 46 additions & 62 deletions codecov_auth/authentication/repo_auth.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import json
import logging
import re
from datetime import datetime
from typing import List
from typing import Any, List, Tuple
from uuid import UUID

from asgiref.sync import async_to_sync
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import QuerySet
from django.http.request import HttpRequest
from django.utils import timezone
from jwt import PyJWTError
from rest_framework import authentication, exceptions
from rest_framework.exceptions import NotAuthenticated
from rest_framework.exceptions import AuthenticationFailed, NotAuthenticated
from rest_framework.response import Response
from rest_framework.views import exception_handler
from sentry_sdk import metrics as sentry_metrics
from shared.metrics import metrics
Expand All @@ -23,7 +26,7 @@
Service,
TokenTypeChoices,
)
from core.models import Repository
from core.models import Commit, Repository
from services.repo_providers import RepoProviderService
from upload.helpers import get_global_tokens, get_repo_with_github_actions_oidc_token
from upload.views.helpers import get_repository_from_string
Expand Down Expand Up @@ -253,86 +256,67 @@ def authenticate_credentials(self, token):


class TokenlessAuthentication(authentication.TokenAuthentication):
"""This is named ironically, but provides authentication for tokenless uploads.
Tokenless only works in FORKs of PUBLIC repos.
It allows PRs from external contributors (forks) to upload coverage
for the upstream repo when running in a PR.
While it uses the same "shell" (authentication.TokenAuthentication)
it doesn't really rely on tokens to authenticate.
"""

# TODO: replace this with the message from repo_auth_custom_exception_handler
auth_failed_message = "Not valid tokenless upload"
rate_limit_failed_message = "Tokenless has reached GitHub rate limit. Please upload using a token: https://docs.codecov.com/docs/adding-the-codecov-token."

def _get_repo_info_from_request_path(self, request) -> Repository:
def _get_info_from_request_path(
self, request
) -> tuple[Repository, str | None] | None:
path_info = request.get_full_path_info()
# The repo part comes from https://stackoverflow.com/a/22312124
upload_views_prefix_regex = r"\/upload\/(\w+)\/([\w\.@:_/\-~]+)\/commits"
upload_views_prefix_regex = (
r"\/upload\/(\w+)\/([\w\.@:_/\-~]+)\/commits(?:\/([a-f0-9]{40}))?"
)
match = re.search(upload_views_prefix_regex, path_info)

if match is None:
raise exceptions.AuthenticationFailed(self.auth_failed_message)
# Validate provider

service = match.group(1)
encoded_slug = match.group(2)
commitid = match.group(3)

# Validate provider
try:
service_enum = Service(service)
# Currently only Github is supported
# TODO [codecov/engineering-team#914]: Extend tokenless support to other providers
if service_enum != Service.GITHUB:
raise exceptions.AuthenticationFailed(self.auth_failed_message)
except ValueError:
raise exceptions.AuthenticationFailed(self.auth_failed_message)

# Validate that next group exists and decode slug
encoded_slug = match.group(2)
repo = get_repository_from_string(service_enum, encoded_slug)
if repo is None:
# Purposefully using the generic message so that we don't tell that
# we don't have a certain repo
raise exceptions.AuthenticationFailed(self.auth_failed_message)
return repo

@async_to_sync
async def get_pull_request_info(self, repository_service, fork_pr: str):
try:
return await repository_service.get_pull_request(fork_pr)
except TorngitObjectNotFoundError:
raise exceptions.AuthenticationFailed(self.auth_failed_message)
except TorngitRateLimitError as e:
metrics.incr("auth.get_pr_info.rate_limit_hit")
sentry_metrics.incr("auth.get_pr_info.rate_limit_hit")
if e.reset:
now_timestamp = datetime.now().timestamp()
retry_after = int(e.reset) - int(now_timestamp)
elif e.retry_after:
retry_after = int(e.retry_after)
raise exceptions.Throttled(
wait=retry_after, detail=self.rate_limit_failed_message
)
return repo, commitid

def get_branch(self, request, commitid=None):
if commitid:
commit = Commit.objects.filter(commitid=commitid).first()
if not commit:
raise exceptions.AuthenticationFailed(self.auth_failed_message)
return commit.branch
else:
try:
body = json.loads(str(request.body, "utf8"))
except json.JSONDecodeError:
return None
else:
return body.get("branch")

def authenticate(self, request):
fork_slug = request.headers.get("X-Tokenless", None)
fork_pr = request.headers.get("X-Tokenless-PR", None)
if fork_slug is None or fork_pr is None:
return None # continue to next auth class
# Get the repo
repository = self._get_repo_info_from_request_path(request)
# Tokneless is only for public repos
if repository.private:
raise exceptions.AuthenticationFailed(self.auth_failed_message)
# Get the provider service to check the tokenless claim
repository_service = RepoProviderService().get_adapter(
repository.author, repository
)
pull_request = self.get_pull_request_info(repository_service, fork_pr)
if (
pull_request["base"]["slug"]
!= f"{repository.author.username}/{repository.name}"
or pull_request["head"]["slug"] != fork_slug
):
repository, commitid = self._get_info_from_request_path(request)

if repository is None or repository.private:
raise exceptions.AuthenticationFailed(self.auth_failed_message)

return (
RepositoryAsUser(repository),
TokenlessAuth(repository),
)
branch = self.get_branch(request, commitid)

if (branch and ":" in branch) or request.method == "GET":
return (
RepositoryAsUser(repository),
TokenlessAuth(repository),
)
else:
raise exceptions.AuthenticationFailed(self.auth_failed_message)
Loading

0 comments on commit 7bed118

Please sign in to comment.