Skip to content

Commit

Permalink
Simplify the code by using GitHub CLI
Browse files Browse the repository at this point in the history
Simplify the code by using GitHub CLIand the GitHub REST API. Also add
tests.
  • Loading branch information
seanh committed Oct 25, 2023
1 parent 0a12996 commit 711a606
Show file tree
Hide file tree
Showing 11 changed files with 309 additions and 210 deletions.
2 changes: 1 addition & 1 deletion .cookiecutter/cookiecutter.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"template": "https://github.com/hypothesis/cookiecutters",
"checkout": null,
"directory": "pypackage",
"ignore": ["tests/unit/dependabot_alerts/core_test.py"],
"ignore": ["tests/functional/sanity_test.py"],
"extra_context": {
"name": "Dependabot Alerts",
"package_name": "dependabot_alerts",
Expand Down
1 change: 0 additions & 1 deletion .cookiecutter/includes/setuptools/install_requires

This file was deleted.

1 change: 1 addition & 0 deletions .cookiecutter/includes/tox/passenv
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
dev: GITHUB_ACTIONS
dev: GITHUB_TOKEN
1 change: 0 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package_dir =
packages = find:
python_requires = >=3.11
install_requires =
requests

[options.packages.find]
where = src
Expand Down
69 changes: 31 additions & 38 deletions src/dependabot_alerts/cli.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,42 @@
import subprocess
from argparse import ArgumentParser
from importlib.metadata import version
from os import environ

from dependabot_alerts.core import GitHubClient, Vulnerability, fetch_alerts
from dependabot_alerts.core import GitHub


def cli(_argv=None): # pragma: no cover
def cli(argv=None):
parser = ArgumentParser()
parser.add_argument(
"-v", "--version", action="version", version=version("dependabot-alerts")
)
parser.add_argument("organization", help="GitHub user or organization")

args = parser.parse_args(_argv)

gh_client = GitHubClient.init()
vulns = fetch_alerts(gh_client, args.organization)
print(format_slack_message(args.organization, vulns))

return 0


def format_slack_message(
organization: str, vulns: list[Vulnerability]
) -> str: # pragma: no cover
"""
Format a Slack status report from a list of vulnerabilities.
Returns a message using Slack's "mrkdwn" format. See
https://api.slack.com/reference/surfaces/formatting.
"""
if not vulns:
return "Found no open vulnerabilities."

n_repos = len(set(vuln.repo for vuln in vulns))

msg_parts = []
msg_parts.append(f"*Found {len(vulns)} vulnerabilities in {n_repos} repositories.*")

for vuln in vulns:
vuln_msg = []
vuln_msg.append(
f"{organization}/{vuln.repo}: <{vuln.url}|{vuln.package_name} {vuln.severity} - {vuln.title}>"
parser.add_argument("organization", help="GitHub organization")

args = parser.parse_args(argv)
organization = args.organization

github = GitHub(subprocess.run)
repos = github.alerts(organization)

if not repos:
return

if environ.get("GITHUB_ACTIONS") == "true":
print(f"*Found Dependabot security alerts in {len(repos)} repos:*")
print()
for repo, packages in repos.items():
for package, alerts in packages.items():
print(
f"- <https://github.com/{repo}/security/dependabot|{repo}>: {package} ({len(alerts)} alerts)"
)
print()
print(
"Message generated by the `alerts.yml` workflow <https://github.com/hypothesis/dependabot-alerts/blob/main/.github/workflows/alert.yml|in dependabot-alerts>"
)
if vuln.pr:
vuln_msg.append(f" Resolved by {vuln.pr}")
msg_parts.append("\n".join(vuln_msg))
else:
for repo, packages in repos.items():
for package, alerts in packages.items():
print(f"{repo}: {package} ({len(alerts)} alerts)")

return "\n\n".join(msg_parts)
return
208 changes: 41 additions & 167 deletions src/dependabot_alerts/core.py
Original file line number Diff line number Diff line change
@@ -1,184 +1,58 @@
import json
import os
from collections import defaultdict
from dataclasses import dataclass
from getpass import getpass
from typing import Optional

import requests

def safe_get(dict_, keys):
keys = list(reversed(keys))
value = dict_

class GitHubClient: # pragma: no cover
"""
Client for GitHub's GraphQL API.
while keys:
key = keys.pop()
try:
value = value[key]
except Exception: # pylint:disable=broad-exception-caught
return None

See https://docs.github.com/en/graphql.
"""
return value

def __init__(self, token):
self.token = token
self.endpoint = "https://api.github.com/graphql"

def query(self, query, variables=None):
data = {"query": query, "variables": variables if variables is not None else {}}
result = requests.post(
url=self.endpoint,
headers={"Authorization": f"Bearer {self.token}"},
data=json.dumps(data),
timeout=(30, 30),
)
body = result.json()
result.raise_for_status()
if "errors" in body:
errors = body["errors"]
raise RuntimeError(f"Query failed: {json.dumps(errors)}")
return body["data"]

@classmethod
def init(cls):
"""
Initialize an authenticated GitHubClient.
This will read from the `GITHUB_TOKEN` env var if set, or prompt for
a token otherwise.
"""
access_token = os.environ.get("GITHUB_TOKEN")
if not access_token:
access_token = getpass("GitHub API token: ")
return GitHubClient(access_token)


@dataclass
class Vulnerability: # pylint:disable=too-many-instance-attributes
@dataclass(frozen=True)
class Alert:
repo: str
"""Repository where this vulnerability was reported."""

created_at: str
"""ISO date when this alert was created."""

package_name: str
"""Name of the vulnerable package."""

ecosystem: str
"""Package ecosytem (eg. npm) that the package comes from."""

severity: str
"""Vulnerability severity level."""

version_range: str
"""Version ranges of package affected by vulnerability."""

number: str
"""Number of this vulnerability report."""

url: str
"""Link to the vulernability report on GitHub."""
package: str

pr: Optional[str]
"""Link to the Dependabot update PR that resolves this vulnerability."""

title: str
"""Summary of what the vulnerability is."""


def fetch_alerts(
gh: GitHubClient, organization: str
) -> list[Vulnerability]: # pragma: no cover
"""
Fetch details of all open vulnerability alerts in `organization`.
To reduce the volume of noise, especially for repositories which include the
same dependency in multiple lockfiles, only one vulnerability is reported
per package per repository.
Vulnerabilities are not reported from archived repositories.
"""
# pylint:disable=too-many-locals

query = """
query($organization: String!, $cursor: String) {
organization(login: $organization) {
repositories(first: 100, after: $cursor) {
pageInfo {
endCursor
hasNextPage
}
nodes {
name
vulnerabilityAlerts(first: 100, states:OPEN) {
nodes {
number
createdAt
dependabotUpdate {
pullRequest {
url
}
}
securityAdvisory {
summary
}
securityVulnerability {
package {
name
ecosystem
}
severity
vulnerableVersionRange
}
}
}
}
}
}
}
"""

vulns = []
cursor = None
has_next_page = True

while has_next_page:
result = gh.query(
query=query, variables={"organization": organization, "cursor": cursor}
@classmethod
def make(cls, alert_dict):
return cls(
repo=safe_get(alert_dict, ["repository", "full_name"]),
ecosystem=safe_get(alert_dict, ["dependency", "package", "ecosystem"]),
package=safe_get(alert_dict, ["dependency", "package", "name"]),
)
page_info = result["organization"]["repositories"]["pageInfo"]
cursor = page_info["endCursor"]
has_next_page = page_info["hasNextPage"]

for repo in result["organization"]["repositories"]["nodes"]:
alerts = repo["vulnerabilityAlerts"]["nodes"]

if alerts:
repo_name = repo["name"]
vulnerable_packages = set()

for alert in alerts:
sa = alert["securityAdvisory"]
sv = alert["securityVulnerability"]
number = alert["number"]
package_name = sv["package"]["name"]

if package_name in vulnerable_packages:
continue
vulnerable_packages.add(package_name)

pr = None
class GitHub:
def __init__(self, run):
self._run = run

def alerts(self, organization):
result = self._run(
[
"gh",
"api",
"--paginate",
f"/orgs/{organization}/dependabot/alerts?state=open",
],
check=True,
capture_output=True,
)
alert_dicts = json.loads(result.stdout)

dep_update = alert["dependabotUpdate"]
if dep_update and dep_update["pullRequest"]:
pr = dep_update["pullRequest"]["url"]
alerts = defaultdict(lambda: defaultdict(list))

vuln = Vulnerability(
repo=repo_name,
created_at=alert["createdAt"],
ecosystem=sv["package"]["ecosystem"],
number=number,
package_name=sv["package"]["name"],
pr=pr,
severity=sv["severity"],
title=sa["summary"],
url=f"https://github.com/{organization}/{repo_name}/security/dependabot/{number}",
version_range=sv["vulnerableVersionRange"],
)
vulns.append(vuln)
for alert_dict in alert_dicts:
alert = Alert.make(alert_dict)
alerts[alert.repo][(alert.ecosystem, alert.package)].append(alert)

return vulns
return alerts
40 changes: 40 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import factory
from pytest_factoryboy import named_model, register

from dependabot_alerts.core import Alert


@register
class AlertFactory(factory.Factory):
"""Factory for Alert objects."""

class Meta:
model = Alert
exclude = ("organization",)

organization = factory.Sequence(lambda n: f"organization-{n}")
repo = factory.LazyAttributeSequence(lambda o, n: f"{o.organization}/repo-{n}")
ecosystem = factory.Sequence(lambda n: f"ecosystem-{n}")
package = factory.Sequence(lambda n: f"package-{n}")


@register
class AlertDictFactory(AlertFactory):
"""Factory for alert dicts as returned by the GitHub API."""

class Meta:
model = named_model(dict, "AlertDict")

@factory.post_generation
def post(obj, *_args, **_kwargs): # pylint:disable=no-self-argument
repo = obj.pop("repo") # pylint:disable=no-member
ecosystem = obj.pop("ecosystem") # pylint:disable=no-member
package = obj.pop("package") # pylint:disable=no-member

obj["repository"] = {"full_name": repo}
obj["dependency"] = {
"package": {
"ecosystem": ecosystem,
"name": package,
}
}
2 changes: 0 additions & 2 deletions tests/functional/sanity_test.py

This file was deleted.

Loading

0 comments on commit 711a606

Please sign in to comment.