Skip to content

Commit

Permalink
Simplify the code by using GitHub CLI
Browse files Browse the repository at this point in the history
Simplify the code by using GitHub CLI and the GitHub REST API. Also add
tests.

Fixes #5
  • Loading branch information
seanh committed Nov 7, 2023
1 parent 37c123b commit 468bea0
Show file tree
Hide file tree
Showing 14 changed files with 427 additions and 221 deletions.
2 changes: 1 addition & 1 deletion .cookiecutter/cookiecutter.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"template": "https://github.com/hypothesis/cookiecutters",
"checkout": null,
"directory": "pypackage",
"ignore": ["tests/unit/dependabot_alerts/core_test.py"],
"ignore": ["tests/functional/sanity_test.py"],
"extra_context": {
"name": "Dependabot Alerts",
"package_name": "dependabot_alerts",
Expand Down
6 changes: 4 additions & 2 deletions .cookiecutter/includes/README/head.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
`dependabot-alerts` lists Dependabot security alerts for all repos of a GitHub
user or organization. You can run it from the command line:
organization. You can run it from the command line:

```terminal
$ dependabot-alerts <your_github_user_or_organization>
$ dependabot-alerts <your_github_organization>
```

You'll need to have [GitHub CLI](https://cli.github.com/) installed and logged in.

There's also a [GitHub Actions workflow](.github/workflows/alert.yml) that runs
automatically on a schedule and notifies us in Slack of any Dependabot alerts
in the `hypothesis` GitHub organization.
1 change: 0 additions & 1 deletion .cookiecutter/includes/setuptools/install_requires

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/alert.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
run: |
{
echo 'SLACK_MESSAGE<<EOF'
dependabot-alerts hypothesis
dependabot-alerts --format slack hypothesis
echo EOF
} >> "$GITHUB_OUTPUT"
env:
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
Notifications of Dependabot alerts across a GitHub organization.

`dependabot-alerts` lists Dependabot security alerts for all repos of a GitHub
user or organization. You can run it from the command line:
organization. You can run it from the command line:

```terminal
$ dependabot-alerts <your_github_user_or_organization>
$ dependabot-alerts <your_github_organization>
```

You'll need to have [GitHub CLI](https://cli.github.com/) installed and logged in.

There's also a [GitHub Actions workflow](.github/workflows/alert.yml) that runs
automatically on a schedule and notifies us in Slack of any Dependabot alerts
in the `hypothesis` GitHub organization.
Expand Down
1 change: 0 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package_dir =
packages = find:
python_requires = >=3.11
install_requires =
requests

[options.packages.find]
where = src
Expand Down
53 changes: 18 additions & 35 deletions src/dependabot_alerts/cli.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,32 @@
import subprocess
from argparse import ArgumentParser
from importlib.metadata import version

from dependabot_alerts.core import GitHubClient, Vulnerability, fetch_alerts
from dependabot_alerts.core import GitHub
from dependabot_alerts.format import format_slack, format_text


def cli(_argv=None): # pragma: no cover
def cli(argv=None):
parser = ArgumentParser()
parser.add_argument(
"-v", "--version", action="version", version=version("dependabot-alerts")
)
parser.add_argument("organization", help="GitHub user or organization")

args = parser.parse_args(_argv)

gh_client = GitHubClient.init()
vulns = fetch_alerts(gh_client, args.organization)
print(format_slack_message(args.organization, vulns))

return 0


def format_slack_message(
organization: str, vulns: list[Vulnerability]
) -> str: # pragma: no cover
"""
Format a Slack status report from a list of vulnerabilities.
parser.add_argument(
"--format", choices=["text", "slack"], default="text", help="output format"
)
parser.add_argument("organization", help="GitHub organization")
args = parser.parse_args(argv)

Returns a message using Slack's "mrkdwn" format. See
https://api.slack.com/reference/surfaces/formatting.
"""
if not vulns:
return "Found no open vulnerabilities."
alerts = GitHub(subprocess.run).alerts(args.organization)

n_repos = len(set(vuln.repo for vuln in vulns))
formatters = {
"text": format_text,
"slack": format_slack,
}

msg_parts = []
msg_parts.append(f"*Found {len(vulns)} vulnerabilities in {n_repos} repositories.*")
formatter = formatters[args.format]

for vuln in vulns:
vuln_msg = []
vuln_msg.append(
f"{organization}/{vuln.repo}: <{vuln.url}|{vuln.package_name} {vuln.severity} - {vuln.title}>"
)
if vuln.pr:
vuln_msg.append(f" Resolved by {vuln.pr}")
msg_parts.append("\n".join(vuln_msg))
output = formatter(alerts, args.organization)

return "\n\n".join(msg_parts)
if output:
print(output)
233 changes: 57 additions & 176 deletions src/dependabot_alerts/core.py
Original file line number Diff line number Diff line change
@@ -1,184 +1,65 @@
import json
import os
from dataclasses import dataclass
from getpass import getpass
from typing import Optional

import requests


class GitHubClient: # pragma: no cover
"""
Client for GitHub's GraphQL API.
from __future__ import annotations

See https://docs.github.com/en/graphql.
"""
import json
from dataclasses import dataclass, field
from subprocess import CalledProcessError

def __init__(self, token):
self.token = token
self.endpoint = "https://api.github.com/graphql"

def query(self, query, variables=None):
data = {"query": query, "variables": variables if variables is not None else {}}
result = requests.post(
url=self.endpoint,
headers={"Authorization": f"Bearer {self.token}"},
data=json.dumps(data),
timeout=(30, 30),
)
body = result.json()
result.raise_for_status()
if "errors" in body:
errors = body["errors"]
raise RuntimeError(f"Query failed: {json.dumps(errors)}")
return body["data"]
@dataclass(frozen=True)
class Alert: # pylint:disable=too-many-instance-attributes
repo_full_name: str | None
ghsa_id: str | None
html_url: str | None = field(compare=False)
package: str | None = field(compare=False)
manifest_path: str | None = field(compare=False)
summary: str | None = field(compare=False)
severity: str | None = field(compare=False)
duplicates: list[Alert] = field(compare=False, default_factory=list)

@classmethod
def init(cls):
"""
Initialize an authenticated GitHubClient.
This will read from the `GITHUB_TOKEN` env var if set, or prompt for
a token otherwise.
"""
access_token = os.environ.get("GITHUB_TOKEN")
if not access_token:
access_token = getpass("GitHub API token: ")
return GitHubClient(access_token)


@dataclass
class Vulnerability: # pylint:disable=too-many-instance-attributes
repo: str
"""Repository where this vulnerability was reported."""

created_at: str
"""ISO date when this alert was created."""

package_name: str
"""Name of the vulnerable package."""

ecosystem: str
"""Package ecosytem (eg. npm) that the package comes from."""

severity: str
"""Vulnerability severity level."""

version_range: str
"""Version ranges of package affected by vulnerability."""

number: str
"""Number of this vulnerability report."""

url: str
"""Link to the vulernability report on GitHub."""

pr: Optional[str]
"""Link to the Dependabot update PR that resolves this vulnerability."""

title: str
"""Summary of what the vulnerability is."""


def fetch_alerts(
gh: GitHubClient, organization: str
) -> list[Vulnerability]: # pragma: no cover
"""
Fetch details of all open vulnerability alerts in `organization`.
To reduce the volume of noise, especially for repositories which include the
same dependency in multiple lockfiles, only one vulnerability is reported
per package per repository.
Vulnerabilities are not reported from archived repositories.
"""
# pylint:disable=too-many-locals

query = """
query($organization: String!, $cursor: String) {
organization(login: $organization) {
repositories(first: 100, after: $cursor) {
pageInfo {
endCursor
hasNextPage
}
nodes {
name
vulnerabilityAlerts(first: 100, states:OPEN) {
nodes {
number
createdAt
dependabotUpdate {
pullRequest {
url
}
}
securityAdvisory {
summary
}
securityVulnerability {
package {
name
ecosystem
}
severity
vulnerableVersionRange
}
}
}
}
}
}
}
"""

vulns = []
cursor = None
has_next_page = True

while has_next_page:
result = gh.query(
query=query, variables={"organization": organization, "cursor": cursor}
def make(cls, alert_dict):
return cls(
repo_full_name=alert_dict["repository"]["full_name"],
ghsa_id=alert_dict["security_advisory"]["ghsa_id"],
html_url=alert_dict["html_url"],
package=alert_dict["dependency"]["package"]["name"],
manifest_path=alert_dict["dependency"]["manifest_path"],
summary=alert_dict["security_advisory"]["summary"],
severity=alert_dict["security_advisory"]["severity"],
)
page_info = result["organization"]["repositories"]["pageInfo"]
cursor = page_info["endCursor"]
has_next_page = page_info["hasNextPage"]

for repo in result["organization"]["repositories"]["nodes"]:
alerts = repo["vulnerabilityAlerts"]["nodes"]

if alerts:
repo_name = repo["name"]
vulnerable_packages = set()

for alert in alerts:
sa = alert["securityAdvisory"]
sv = alert["securityVulnerability"]
number = alert["number"]
package_name = sv["package"]["name"]

if package_name in vulnerable_packages:
continue
vulnerable_packages.add(package_name)

pr = None

dep_update = alert["dependabotUpdate"]
if dep_update and dep_update["pullRequest"]:
pr = dep_update["pullRequest"]["url"]

vuln = Vulnerability(
repo=repo_name,
created_at=alert["createdAt"],
ecosystem=sv["package"]["ecosystem"],
number=number,
package_name=sv["package"]["name"],
pr=pr,
severity=sv["severity"],
title=sa["summary"],
url=f"https://github.com/{organization}/{repo_name}/security/dependabot/{number}",
version_range=sv["vulnerableVersionRange"],
)
vulns.append(vuln)

return vulns
class GitHub:
def __init__(self, run):
self._run = run

def alerts(self, organization) -> list[Alert]:
try:
result = self._run(
[
"gh",
"api",
"--paginate",
f"/orgs/{organization}/dependabot/alerts?state=open",
],
check=True,
capture_output=True,
text=True,
)
except CalledProcessError as err: # pragma: no cover
print(err.stdout)
print(err.stderr)
raise

alert_dicts = json.loads(result.stdout)

alerts = {}

for alert_dict in alert_dicts:
alert = Alert.make(alert_dict)
if alert in alerts:
alerts[alert].duplicates.append(alert)
else:
alerts[alert] = alert

return list(alerts.values())
Loading

0 comments on commit 468bea0

Please sign in to comment.