-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Simplify the code by using GitHub CLI and the GitHub REST API #6
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,12 @@ | ||
`dependabot-alerts` lists Dependabot security alerts for all repos of a GitHub | ||
user or organization. You can run it from the command line: | ||
organization. You can run it from the command line: | ||
|
||
```terminal | ||
$ dependabot-alerts <your_github_user_or_organization> | ||
$ dependabot-alerts <your_github_organization> | ||
``` | ||
|
||
You'll need to have [GitHub CLI](https://cli.github.com/) installed and logged in. | ||
|
||
There's also a [GitHub Actions workflow](.github/workflows/alert.yml) that runs | ||
automatically on a schedule and notifies us in Slack of any Dependabot alerts | ||
in the `hypothesis` GitHub organization. |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,49 +1,32 @@ | ||
import subprocess | ||
from argparse import ArgumentParser | ||
from importlib.metadata import version | ||
|
||
from dependabot_alerts.core import GitHubClient, Vulnerability, fetch_alerts | ||
from dependabot_alerts.core import GitHub | ||
from dependabot_alerts.format import format_slack, format_text | ||
|
||
|
||
def cli(_argv=None): # pragma: no cover | ||
def cli(argv=None): | ||
parser = ArgumentParser() | ||
parser.add_argument( | ||
"-v", "--version", action="version", version=version("dependabot-alerts") | ||
) | ||
parser.add_argument("organization", help="GitHub user or organization") | ||
|
||
args = parser.parse_args(_argv) | ||
|
||
gh_client = GitHubClient.init() | ||
vulns = fetch_alerts(gh_client, args.organization) | ||
print(format_slack_message(args.organization, vulns)) | ||
|
||
return 0 | ||
|
||
|
||
def format_slack_message( | ||
organization: str, vulns: list[Vulnerability] | ||
) -> str: # pragma: no cover | ||
""" | ||
Format a Slack status report from a list of vulnerabilities. | ||
parser.add_argument( | ||
"--format", choices=["text", "slack"], default="text", help="output format" | ||
) | ||
parser.add_argument("organization", help="GitHub organization") | ||
args = parser.parse_args(argv) | ||
|
||
Returns a message using Slack's "mrkdwn" format. See | ||
https://api.slack.com/reference/surfaces/formatting. | ||
""" | ||
if not vulns: | ||
return "Found no open vulnerabilities." | ||
alerts = GitHub(subprocess.run).alerts(args.organization) | ||
|
||
n_repos = len(set(vuln.repo for vuln in vulns)) | ||
formatters = { | ||
"text": format_text, | ||
"slack": format_slack, | ||
} | ||
|
||
msg_parts = [] | ||
msg_parts.append(f"*Found {len(vulns)} vulnerabilities in {n_repos} repositories.*") | ||
formatter = formatters[args.format] | ||
|
||
for vuln in vulns: | ||
vuln_msg = [] | ||
vuln_msg.append( | ||
f"{organization}/{vuln.repo}: <{vuln.url}|{vuln.package_name} {vuln.severity} - {vuln.title}>" | ||
) | ||
if vuln.pr: | ||
vuln_msg.append(f" Resolved by {vuln.pr}") | ||
msg_parts.append("\n".join(vuln_msg)) | ||
output = formatter(alerts, args.organization) | ||
|
||
return "\n\n".join(msg_parts) | ||
if output: | ||
print(output) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,184 +1,68 @@ | ||
import json | ||
import os | ||
from dataclasses import dataclass | ||
from getpass import getpass | ||
from typing import Optional | ||
|
||
import requests | ||
|
||
|
||
class GitHubClient: # pragma: no cover | ||
""" | ||
Client for GitHub's GraphQL API. | ||
|
||
See https://docs.github.com/en/graphql. | ||
""" | ||
from __future__ import annotations | ||
|
||
def __init__(self, token): | ||
self.token = token | ||
self.endpoint = "https://api.github.com/graphql" | ||
|
||
def query(self, query, variables=None): | ||
data = {"query": query, "variables": variables if variables is not None else {}} | ||
result = requests.post( | ||
url=self.endpoint, | ||
headers={"Authorization": f"Bearer {self.token}"}, | ||
data=json.dumps(data), | ||
timeout=(30, 30), | ||
) | ||
body = result.json() | ||
result.raise_for_status() | ||
if "errors" in body: | ||
errors = body["errors"] | ||
raise RuntimeError(f"Query failed: {json.dumps(errors)}") | ||
return body["data"] | ||
import json | ||
import sys | ||
from dataclasses import dataclass, field | ||
from subprocess import CalledProcessError | ||
|
||
|
||
@dataclass(frozen=True) | ||
class Alert: # pylint:disable=too-many-instance-attributes | ||
repo_full_name: str | None | ||
ghsa_id: str | None | ||
html_url: str | None = field(compare=False) | ||
package: str | None = field(compare=False) | ||
manifest_path: str | None = field(compare=False) | ||
summary: str | None = field(compare=False) | ||
severity: str | None = field(compare=False) | ||
duplicates: list[Alert] = field(compare=False, default_factory=list) | ||
|
||
@classmethod | ||
def init(cls): | ||
""" | ||
Initialize an authenticated GitHubClient. | ||
|
||
This will read from the `GITHUB_TOKEN` env var if set, or prompt for | ||
a token otherwise. | ||
""" | ||
access_token = os.environ.get("GITHUB_TOKEN") | ||
if not access_token: | ||
access_token = getpass("GitHub API token: ") | ||
return GitHubClient(access_token) | ||
|
||
|
||
@dataclass | ||
class Vulnerability: # pylint:disable=too-many-instance-attributes | ||
repo: str | ||
"""Repository where this vulnerability was reported.""" | ||
|
||
created_at: str | ||
"""ISO date when this alert was created.""" | ||
|
||
package_name: str | ||
"""Name of the vulnerable package.""" | ||
|
||
ecosystem: str | ||
"""Package ecosytem (eg. npm) that the package comes from.""" | ||
|
||
severity: str | ||
"""Vulnerability severity level.""" | ||
|
||
version_range: str | ||
"""Version ranges of package affected by vulnerability.""" | ||
|
||
number: str | ||
"""Number of this vulnerability report.""" | ||
|
||
url: str | ||
"""Link to the vulernability report on GitHub.""" | ||
|
||
pr: Optional[str] | ||
"""Link to the Dependabot update PR that resolves this vulnerability.""" | ||
|
||
title: str | ||
"""Summary of what the vulnerability is.""" | ||
|
||
|
||
def fetch_alerts( | ||
gh: GitHubClient, organization: str | ||
) -> list[Vulnerability]: # pragma: no cover | ||
""" | ||
Fetch details of all open vulnerability alerts in `organization`. | ||
|
||
To reduce the volume of noise, especially for repositories which include the | ||
same dependency in multiple lockfiles, only one vulnerability is reported | ||
per package per repository. | ||
|
||
Vulnerabilities are not reported from archived repositories. | ||
""" | ||
# pylint:disable=too-many-locals | ||
|
||
query = """ | ||
query($organization: String!, $cursor: String) { | ||
organization(login: $organization) { | ||
repositories(first: 100, after: $cursor) { | ||
pageInfo { | ||
endCursor | ||
hasNextPage | ||
} | ||
nodes { | ||
name | ||
vulnerabilityAlerts(first: 100, states:OPEN) { | ||
nodes { | ||
number | ||
createdAt | ||
dependabotUpdate { | ||
pullRequest { | ||
url | ||
} | ||
} | ||
securityAdvisory { | ||
summary | ||
} | ||
securityVulnerability { | ||
package { | ||
name | ||
ecosystem | ||
} | ||
severity | ||
vulnerableVersionRange | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
""" | ||
|
||
vulns = [] | ||
cursor = None | ||
has_next_page = True | ||
|
||
while has_next_page: | ||
result = gh.query( | ||
query=query, variables={"organization": organization, "cursor": cursor} | ||
def make(cls, alert_dict): | ||
return cls( | ||
repo_full_name=alert_dict["repository"]["full_name"], | ||
ghsa_id=alert_dict["security_advisory"]["ghsa_id"], | ||
html_url=alert_dict["html_url"], | ||
package=alert_dict["dependency"]["package"]["name"], | ||
manifest_path=alert_dict["dependency"]["manifest_path"], | ||
summary=alert_dict["security_advisory"]["summary"], | ||
severity=alert_dict["security_advisory"]["severity"], | ||
) | ||
page_info = result["organization"]["repositories"]["pageInfo"] | ||
cursor = page_info["endCursor"] | ||
has_next_page = page_info["hasNextPage"] | ||
|
||
for repo in result["organization"]["repositories"]["nodes"]: | ||
alerts = repo["vulnerabilityAlerts"]["nodes"] | ||
|
||
if alerts: | ||
repo_name = repo["name"] | ||
vulnerable_packages = set() | ||
|
||
for alert in alerts: | ||
sa = alert["securityAdvisory"] | ||
sv = alert["securityVulnerability"] | ||
number = alert["number"] | ||
package_name = sv["package"]["name"] | ||
|
||
if package_name in vulnerable_packages: | ||
continue | ||
vulnerable_packages.add(package_name) | ||
|
||
pr = None | ||
|
||
dep_update = alert["dependabotUpdate"] | ||
if dep_update and dep_update["pullRequest"]: | ||
pr = dep_update["pullRequest"]["url"] | ||
|
||
vuln = Vulnerability( | ||
repo=repo_name, | ||
created_at=alert["createdAt"], | ||
ecosystem=sv["package"]["ecosystem"], | ||
number=number, | ||
package_name=sv["package"]["name"], | ||
pr=pr, | ||
severity=sv["severity"], | ||
title=sa["summary"], | ||
url=f"https://github.com/{organization}/{repo_name}/security/dependabot/{number}", | ||
version_range=sv["vulnerableVersionRange"], | ||
) | ||
vulns.append(vuln) | ||
|
||
return vulns | ||
class GitHub: | ||
def __init__(self, run): | ||
self._run = run | ||
|
||
def alerts(self, organization) -> list[Alert]: | ||
try: | ||
result = self._run( | ||
[ | ||
"gh", | ||
"api", | ||
"--paginate", | ||
f"/orgs/{organization}/dependabot/alerts?state=open", | ||
], | ||
check=True, | ||
capture_output=True, | ||
text=True, | ||
) | ||
except CalledProcessError as err: # pragma: no cover | ||
for line in err.stdout.splitlines(): | ||
print(f"GitHub CLI stdout> {line}") | ||
for line in err.stderr.splitlines(): | ||
print(f"GitHub CLI stderr> {line}", file=sys.stderr) | ||
raise | ||
|
||
alert_dicts = json.loads(result.stdout) | ||
|
||
alerts = {} | ||
|
||
for alert_dict in alert_dicts: | ||
alert = Alert.make(alert_dict) | ||
if alert in alerts: | ||
alerts[alert].duplicates.append(alert) | ||
else: | ||
alerts[alert] = alert | ||
Comment on lines
+63
to
+66
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the deduplication here |
||
|
||
return list(alerts.values()) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It only works for organizations, not users. (Not sure if the code on
main
actually works for user either?)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed the code on
main
does not actually work for user accounts. It does with a very slight change to the GraphQL query though - swaporganization(login: ...)
foruser(login: ...)
at the top. Support for querying user accounts might be nice to have for a general-purpose tool, but isn't need for our use case.