diff --git a/.cookiecutter/cookiecutter.json b/.cookiecutter/cookiecutter.json index 1abc1b8..f0f3b1e 100644 --- a/.cookiecutter/cookiecutter.json +++ b/.cookiecutter/cookiecutter.json @@ -2,7 +2,7 @@ "template": "https://github.com/hypothesis/cookiecutters", "checkout": null, "directory": "pypackage", - "ignore": ["tests/unit/dependabot_alerts/core_test.py"], + "ignore": ["tests/functional/sanity_test.py"], "extra_context": { "name": "Dependabot Alerts", "package_name": "dependabot_alerts", diff --git a/.cookiecutter/includes/README/head.md b/.cookiecutter/includes/README/head.md index 7da0881..5e34360 100644 --- a/.cookiecutter/includes/README/head.md +++ b/.cookiecutter/includes/README/head.md @@ -1,10 +1,12 @@ `dependabot-alerts` lists Dependabot security alerts for all repos of a GitHub -user or organization. You can run it from the command line: +organization. You can run it from the command line: ```terminal -$ dependabot-alerts +$ dependabot-alerts ``` +You'll need to have [GitHub CLI](https://cli.github.com/) installed and logged in. + There's also a [GitHub Actions workflow](.github/workflows/alert.yml) that runs automatically on a schedule and notifies us in Slack of any Dependabot alerts in the `hypothesis` GitHub organization. diff --git a/.cookiecutter/includes/setuptools/install_requires b/.cookiecutter/includes/setuptools/install_requires deleted file mode 100644 index f229360..0000000 --- a/.cookiecutter/includes/setuptools/install_requires +++ /dev/null @@ -1 +0,0 @@ -requests diff --git a/.github/workflows/alert.yml b/.github/workflows/alert.yml index 59d1722..da83f34 100644 --- a/.github/workflows/alert.yml +++ b/.github/workflows/alert.yml @@ -29,7 +29,7 @@ jobs: run: | { echo 'SLACK_MESSAGE<> "$GITHUB_OUTPUT" env: diff --git a/README.md b/README.md index acf29d0..ebabf3c 100644 --- a/README.md +++ b/README.md @@ -9,12 +9,14 @@ Notifications of Dependabot alerts across a GitHub organization. `dependabot-alerts` lists Dependabot security alerts for all repos of a GitHub -user or organization. You can run it from the command line: +organization. You can run it from the command line: ```terminal -$ dependabot-alerts +$ dependabot-alerts ``` +You'll need to have [GitHub CLI](https://cli.github.com/) installed and logged in. + There's also a [GitHub Actions workflow](.github/workflows/alert.yml) that runs automatically on a schedule and notifies us in Slack of any Dependabot alerts in the `hypothesis` GitHub organization. diff --git a/setup.cfg b/setup.cfg index f1872cd..ec7c6c0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,7 +18,6 @@ package_dir = packages = find: python_requires = >=3.11 install_requires = - requests [options.packages.find] where = src diff --git a/src/dependabot_alerts/cli.py b/src/dependabot_alerts/cli.py index f245b93..8a04b8f 100644 --- a/src/dependabot_alerts/cli.py +++ b/src/dependabot_alerts/cli.py @@ -1,49 +1,32 @@ +import subprocess from argparse import ArgumentParser from importlib.metadata import version -from dependabot_alerts.core import GitHubClient, Vulnerability, fetch_alerts +from dependabot_alerts.core import GitHub +from dependabot_alerts.format import format_slack, format_text -def cli(_argv=None): # pragma: no cover +def cli(argv=None): parser = ArgumentParser() parser.add_argument( "-v", "--version", action="version", version=version("dependabot-alerts") ) - parser.add_argument("organization", help="GitHub user or organization") - - args = parser.parse_args(_argv) - - gh_client = GitHubClient.init() - vulns = fetch_alerts(gh_client, args.organization) - print(format_slack_message(args.organization, vulns)) - - return 0 - - -def format_slack_message( - organization: str, vulns: list[Vulnerability] -) -> str: # pragma: no cover - """ - Format a Slack status report from a list of vulnerabilities. + parser.add_argument( + "--format", choices=["text", "slack"], default="text", help="output format" + ) + parser.add_argument("organization", help="GitHub organization") + args = parser.parse_args(argv) - Returns a message using Slack's "mrkdwn" format. See - https://api.slack.com/reference/surfaces/formatting. - """ - if not vulns: - return "Found no open vulnerabilities." + alerts = GitHub(subprocess.run).alerts(args.organization) - n_repos = len(set(vuln.repo for vuln in vulns)) + formatters = { + "text": format_text, + "slack": format_slack, + } - msg_parts = [] - msg_parts.append(f"*Found {len(vulns)} vulnerabilities in {n_repos} repositories.*") + formatter = formatters[args.format] - for vuln in vulns: - vuln_msg = [] - vuln_msg.append( - f"{organization}/{vuln.repo}: <{vuln.url}|{vuln.package_name} {vuln.severity} - {vuln.title}>" - ) - if vuln.pr: - vuln_msg.append(f" Resolved by {vuln.pr}") - msg_parts.append("\n".join(vuln_msg)) + output = formatter(alerts, args.organization) - return "\n\n".join(msg_parts) + if output: + print(output) diff --git a/src/dependabot_alerts/core.py b/src/dependabot_alerts/core.py index c28f5f7..ced2ef4 100644 --- a/src/dependabot_alerts/core.py +++ b/src/dependabot_alerts/core.py @@ -1,184 +1,65 @@ -import json -import os -from dataclasses import dataclass -from getpass import getpass -from typing import Optional - -import requests - - -class GitHubClient: # pragma: no cover - """ - Client for GitHub's GraphQL API. +from __future__ import annotations - See https://docs.github.com/en/graphql. - """ +import json +from dataclasses import dataclass, field +from subprocess import CalledProcessError - def __init__(self, token): - self.token = token - self.endpoint = "https://api.github.com/graphql" - def query(self, query, variables=None): - data = {"query": query, "variables": variables if variables is not None else {}} - result = requests.post( - url=self.endpoint, - headers={"Authorization": f"Bearer {self.token}"}, - data=json.dumps(data), - timeout=(30, 30), - ) - body = result.json() - result.raise_for_status() - if "errors" in body: - errors = body["errors"] - raise RuntimeError(f"Query failed: {json.dumps(errors)}") - return body["data"] +@dataclass(frozen=True) +class Alert: # pylint:disable=too-many-instance-attributes + repo_full_name: str | None + ghsa_id: str | None + html_url: str | None = field(compare=False) + package: str | None = field(compare=False) + manifest_path: str | None = field(compare=False) + summary: str | None = field(compare=False) + severity: str | None = field(compare=False) + duplicates: list[Alert] = field(compare=False, default_factory=list) @classmethod - def init(cls): - """ - Initialize an authenticated GitHubClient. - - This will read from the `GITHUB_TOKEN` env var if set, or prompt for - a token otherwise. - """ - access_token = os.environ.get("GITHUB_TOKEN") - if not access_token: - access_token = getpass("GitHub API token: ") - return GitHubClient(access_token) - - -@dataclass -class Vulnerability: # pylint:disable=too-many-instance-attributes - repo: str - """Repository where this vulnerability was reported.""" - - created_at: str - """ISO date when this alert was created.""" - - package_name: str - """Name of the vulnerable package.""" - - ecosystem: str - """Package ecosytem (eg. npm) that the package comes from.""" - - severity: str - """Vulnerability severity level.""" - - version_range: str - """Version ranges of package affected by vulnerability.""" - - number: str - """Number of this vulnerability report.""" - - url: str - """Link to the vulernability report on GitHub.""" - - pr: Optional[str] - """Link to the Dependabot update PR that resolves this vulnerability.""" - - title: str - """Summary of what the vulnerability is.""" - - -def fetch_alerts( - gh: GitHubClient, organization: str -) -> list[Vulnerability]: # pragma: no cover - """ - Fetch details of all open vulnerability alerts in `organization`. - - To reduce the volume of noise, especially for repositories which include the - same dependency in multiple lockfiles, only one vulnerability is reported - per package per repository. - - Vulnerabilities are not reported from archived repositories. - """ - # pylint:disable=too-many-locals - - query = """ -query($organization: String!, $cursor: String) { - organization(login: $organization) { - repositories(first: 100, after: $cursor) { - pageInfo { - endCursor - hasNextPage - } - nodes { - name - vulnerabilityAlerts(first: 100, states:OPEN) { - nodes { - number - createdAt - dependabotUpdate { - pullRequest { - url - } - } - securityAdvisory { - summary - } - securityVulnerability { - package { - name - ecosystem - } - severity - vulnerableVersionRange - } - } - } - } - } - } -} -""" - - vulns = [] - cursor = None - has_next_page = True - - while has_next_page: - result = gh.query( - query=query, variables={"organization": organization, "cursor": cursor} + def make(cls, alert_dict): + return cls( + repo_full_name=alert_dict["repository"]["full_name"], + ghsa_id=alert_dict["security_advisory"]["ghsa_id"], + html_url=alert_dict["html_url"], + package=alert_dict["dependency"]["package"]["name"], + manifest_path=alert_dict["dependency"]["manifest_path"], + summary=alert_dict["security_advisory"]["summary"], + severity=alert_dict["security_advisory"]["severity"], ) - page_info = result["organization"]["repositories"]["pageInfo"] - cursor = page_info["endCursor"] - has_next_page = page_info["hasNextPage"] - - for repo in result["organization"]["repositories"]["nodes"]: - alerts = repo["vulnerabilityAlerts"]["nodes"] - - if alerts: - repo_name = repo["name"] - vulnerable_packages = set() - - for alert in alerts: - sa = alert["securityAdvisory"] - sv = alert["securityVulnerability"] - number = alert["number"] - package_name = sv["package"]["name"] - - if package_name in vulnerable_packages: - continue - vulnerable_packages.add(package_name) - - pr = None - - dep_update = alert["dependabotUpdate"] - if dep_update and dep_update["pullRequest"]: - pr = dep_update["pullRequest"]["url"] - vuln = Vulnerability( - repo=repo_name, - created_at=alert["createdAt"], - ecosystem=sv["package"]["ecosystem"], - number=number, - package_name=sv["package"]["name"], - pr=pr, - severity=sv["severity"], - title=sa["summary"], - url=f"https://github.com/{organization}/{repo_name}/security/dependabot/{number}", - version_range=sv["vulnerableVersionRange"], - ) - vulns.append(vuln) - return vulns +class GitHub: + def __init__(self, run): + self._run = run + + def alerts(self, organization) -> list[Alert]: + try: + result = self._run( + [ + "gh", + "api", + "--paginate", + f"/orgs/{organization}/dependabot/alerts?state=open", + ], + check=True, + capture_output=True, + text=True, + ) + except CalledProcessError as err: # pragma: no cover + print(err.stdout) + print(err.stderr) + raise + + alert_dicts = json.loads(result.stdout) + + alerts = {} + + for alert_dict in alert_dicts: + alert = Alert.make(alert_dict) + if alert in alerts: + alerts[alert].duplicates.append(alert) + else: + alerts[alert] = alert + + return list(alerts.values()) diff --git a/src/dependabot_alerts/format.py b/src/dependabot_alerts/format.py new file mode 100644 index 0000000..f9d447c --- /dev/null +++ b/src/dependabot_alerts/format.py @@ -0,0 +1,47 @@ +"""Functions for serializing lists of Alerts into different string formats.""" +from dependabot_alerts.core import Alert + + +def format_text(alerts: list[Alert], _organization: str) -> str: + return "\n".join( + [ + " ".join( + [ + alert.repo_full_name, + alert.ghsa_id, + f"alerts:{len(alert.duplicates) + 1}", + f"severity:{alert.severity}", + alert.package, + alert.html_url, + alert.summary, + ] + ) + for alert in alerts + ] + ) + + +def format_slack(alerts: list[Alert], organization: str) -> str: + if not alerts: + return f"There are no Dependabot security alerts in the {organization} GitHub organization." + + return "\n".join( + [ + f"*There are Dependabot security alerts in the {organization} GitHub organization:*", + "", + *[ + " ".join( + [ + f"<{alert.html_url}|{alert.repo_full_name} {alert.ghsa_id}>", + f"alerts:{len(alert.duplicates) + 1}", + f"severity:{alert.severity}", + f"`{alert.package}`", + alert.summary, + ] + ) + for alert in alerts + ], + "", + "Message generated by the `alerts.yml` workflow ", + ] + ) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..a2f07a7 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,79 @@ +import factory +from pytest_factoryboy import named_model, register + +from dependabot_alerts.core import Alert + + +@register +class AlertFactory(factory.Factory): + """Factory for Alert objects.""" + + class Meta: + model = Alert + exclude = ("organization", "repo") + + organization = factory.Sequence(lambda n: f"organization-{n}") + repo = factory.Sequence(lambda n: f"repo-{n}") + repo_full_name = factory.LazyAttribute(lambda o: f"{o.organization}/{o.repo}") + ghsa_id = factory.Sequence(lambda n: f"GHSA-{n}") + html_url = factory.LazyAttributeSequence( + lambda o, n: f"https://github.com/{o.organization}/{o.repo}/security/dependabot/{n}" + ) + package = factory.Sequence(lambda n: f"package-{n}") + manifest_path = factory.Sequence(lambda n: f"manifest_path-{n}") + summary = factory.Sequence(lambda n: f"summary-{n}") + severity = factory.Faker("random_element", elements=["low", "medium", "high"]) + duplicates = [] + + @classmethod + def _adjust_kwargs(cls, **kwargs): + if isinstance(kwargs["duplicates"], int): + # Enable alert_factory(duplicates=n) to generate an Alert whose + # Alert.duplicates attr is a list of n generated duplicate Alert's. + kwargs["duplicates"] = cls.create_batch( + size=kwargs["duplicates"], + # Certain attributes would be expected to be the same for an + # Alert's duplicates. + organization=kwargs["organization"], + repo=kwargs["repo"], + ghsa_id=kwargs["ghsa_id"], + package=kwargs["package"], + summary=kwargs["summary"], + severity=kwargs["severity"], + ) + + return kwargs + + +@register +class AlertDictFactory(AlertFactory): + """Factory for alert dicts as returned by the GitHub API.""" + + class Meta: + model = named_model(dict, "AlertDict") + + @factory.post_generation + def post(obj, *_args, **_kwargs): # pylint:disable=no-self-argument + """Transform the generated dict into the format returned by the GitHub API.""" + # pylint:disable=no-member + repo_full_name = obj.pop("repo_full_name") + ghsa_id = obj.pop("ghsa_id") + package = obj.pop("package") + manifest_path = obj.pop("manifest_path") + summary = obj.pop("summary") + severity = obj.pop("severity") + del obj["duplicates"] + + # Serialise a dict in the format returned by the GitHub API. + obj["repository"] = {"full_name": repo_full_name} + obj["dependency"] = { + "package": { + "name": package, + }, + "manifest_path": manifest_path, + } + obj["security_advisory"] = { + "ghsa_id": ghsa_id, + "summary": summary, + "severity": severity, + } diff --git a/tests/functional/sanity_test.py b/tests/functional/sanity_test.py deleted file mode 100644 index b76ddae..0000000 --- a/tests/functional/sanity_test.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_it(): - assert True diff --git a/tests/unit/dependabot_alerts/cli_test.py b/tests/unit/dependabot_alerts/cli_test.py index e7816ae..a1475f7 100644 --- a/tests/unit/dependabot_alerts/cli_test.py +++ b/tests/unit/dependabot_alerts/cli_test.py @@ -5,6 +5,57 @@ from dependabot_alerts.cli import cli +def test_it(GitHub, github, subprocess, capsys, format_text): + cli(["test-organization"]) + + GitHub.assert_called_once_with(subprocess.run) + github.alerts.assert_called_once_with("test-organization") + format_text.assert_called_once_with(github.alerts.return_value, "test-organization") + captured = capsys.readouterr() + assert captured.out == f"{format_text.return_value}\n" + assert not captured.err + + +def test_format_slack(capsys, github, format_slack): + cli(["--format", "slack", "test-organization"]) + + format_slack.assert_called_once_with( + github.alerts.return_value, "test-organization" + ) + captured = capsys.readouterr() + assert captured.out == f"{format_slack.return_value}\n" + + +def test_it_prints_nothing_if_there_are_no_alerts(capsys, format_text): + format_text.return_value = None + + cli(["test-organization"]) + + captured = capsys.readouterr() + assert not captured.out + assert not captured.err + + +def test_it_crashes_if_no_organization_argument_is_given(capsys): + with pytest.raises(SystemExit) as exc_info: + cli([]) + + assert exc_info.value.code == 2 + captured = capsys.readouterr() + assert not captured.out + assert "error: the following arguments are required: organization" in captured.err + + +def test_it_crashes_if_an_invalid_format_is_given(capsys): + with pytest.raises(SystemExit) as exc_info: + cli(["--format", "foo", "organization"]) + + assert exc_info.value.code == 2 + captured = capsys.readouterr() + assert not captured.out + assert "error: argument --format: invalid choice: 'foo'" in captured.err + + def test_help(): with pytest.raises(SystemExit) as exc_info: cli(["--help"]) @@ -18,3 +69,28 @@ def test_version(capsys): assert capsys.readouterr().out.strip() == version("dependabot-alerts") assert not exc_info.value.code + + +@pytest.fixture(autouse=True) +def GitHub(mocker): + return mocker.patch("dependabot_alerts.cli.GitHub") + + +@pytest.fixture +def github(GitHub): + return GitHub.return_value + + +@pytest.fixture(autouse=True) +def format_slack(mocker): + return mocker.patch("dependabot_alerts.cli.format_slack") + + +@pytest.fixture(autouse=True) +def format_text(mocker): + return mocker.patch("dependabot_alerts.cli.format_text") + + +@pytest.fixture(autouse=True) +def subprocess(mocker): + return mocker.patch("dependabot_alerts.cli.subprocess") diff --git a/tests/unit/dependabot_alerts/core_test.py b/tests/unit/dependabot_alerts/core_test.py new file mode 100644 index 0000000..62ad5bf --- /dev/null +++ b/tests/unit/dependabot_alerts/core_test.py @@ -0,0 +1,77 @@ +import json +import subprocess +from unittest.mock import Mock, create_autospec + +import pytest + +from dependabot_alerts.core import GitHub + + +class TestGitHub: + def test_alerts(self, github, run, alert_factory, alert_dict_factory): + run.return_value = Mock( + stdout=json.dumps( + [ + *alert_dict_factory.create_batch( + size=2, + organization="hypothesis", + repo="foo", + ghsa_id="GHSA-1", + ), + alert_dict_factory( + organization="hypothesis", + repo="foo", + ghsa_id="GHSA-2", + ), + alert_dict_factory( + organization="hypothesis", + repo="bar", + ghsa_id="GHSA-1", + ), + ] + ) + ) + + repos = github.alerts("hypothesis") + + run.assert_called_once_with( + [ + "gh", + "api", + "--paginate", + "/orgs/hypothesis/dependabot/alerts?state=open", + ], + check=True, + capture_output=True, + text=True, + ) + assert repos == [ + alert_factory( + organization="hypothesis", + repo="foo", + ghsa_id="GHSA-1", + duplicates=1, + ), + alert_factory( + organization="hypothesis", + repo="foo", + ghsa_id="GHSA-2", + ), + alert_factory( + organization="hypothesis", + repo="bar", + ghsa_id="GHSA-1", + ), + ] + + def test_alerts_returns_an_empty_dict_if_there_are_no_alerts(self, github, run): + run.return_value = Mock(stdout=json.dumps([])) + assert github.alerts("hypothesis") == [] + + @pytest.fixture + def run(self): + return create_autospec(subprocess.run) + + @pytest.fixture + def github(self, run): + return GitHub(run) diff --git a/tests/unit/dependabot_alerts/format_test.py b/tests/unit/dependabot_alerts/format_test.py new file mode 100644 index 0000000..bcc5d5b --- /dev/null +++ b/tests/unit/dependabot_alerts/format_test.py @@ -0,0 +1,63 @@ +from unittest.mock import sentinel + +import pytest + +from dependabot_alerts.format import format_slack, format_text + + +class TestFormatText: + def test_with_no_alerts(self): + assert not format_text([], sentinel.organization) + + def test_with_alerts(self, alerts): + assert format_text(alerts, sentinel.organization) == "\n".join( + [ + "organization/foo GHSA-1 alerts:1 severity:low package_1 html_url_1 summary_1", + "organization/bar GHSA-2 alerts:3 severity:high package_2 html_url_2 summary_2", + ] + ) + + +class TestFormatSlack: + def test_with_no_alerts(self): + assert ( + format_slack([], "hypothesis") + == "There are no Dependabot security alerts in the hypothesis GitHub organization." + ) + + def test_with_alerts(self, alerts): + assert format_slack(alerts, "hypothesis") == "\n".join( + [ + "*There are Dependabot security alerts in the hypothesis GitHub organization:*", + "", + " alerts:1 severity:low `package_1` summary_1", + " alerts:3 severity:high `package_2` summary_2", + "", + "Message generated by the `alerts.yml` workflow ", + ] + ) + + +@pytest.fixture +def alerts(alert_factory): + return [ + alert_factory( + organization="organization", + repo="foo", + ghsa_id="GHSA-1", + package="package_1", + html_url="html_url_1", + summary="summary_1", + severity="low", + ), + alert_factory( + organization="organization", + repo="bar", + ghsa_id="GHSA-2", + package="package_2", + html_url="html_url_2", + summary="summary_2", + severity="high", + duplicates=2, + ), + ]