-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathreposcanner.py
executable file
·292 lines (256 loc) · 10.4 KB
/
reposcanner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
#!/usr/bin/python3
import argparse
import binascii
import datetime
import math
import os
import re
import signal
import sys
from collections import Counter
try:
from git import *
from git.exc import NoSuchPathError
except ImportError:
print("\nPython git module missing (apt-get install python3-git)\n")
sys.exit(1)
###########
# Classes #
###########
class col:
    """ANSI colour escape codes used when printing results.

    Every attribute is an empty string when stdout is not a terminal,
    because colours mess up redirected output.
    """

    green, blue, red, brown, grey, end = (
        ('\033[32m', '\033[94m', '\033[31m', '\033[33m', '\033[90m', '\033[0m')
        if sys.stdout.isatty()
        else ("", "", "", "", "", "")
    )
class Commit():
    """A commit in which at least one suspicious string was found.

    Attributes:
        paths: maps a file path to the Path object holding its findings.
        sha: the commit hash.
        date: commit timestamp formatted in UTC as DD/MM/YYYY HH:MM:SS.
        author: the commit author as supplied by the caller.
        branch: the branch the commit was found on.
    """

    def __init__(self, sha, author, date, branch):
        """Create the record; date is a POSIX timestamp in seconds."""
        self.paths = {}
        self.sha = sha
        # datetime.utcfromtimestamp() is deprecated since Python 3.12; an
        # aware UTC datetime produces exactly the same formatted string.
        utc_moment = datetime.datetime.fromtimestamp(date, tz=datetime.timezone.utc)
        self.date = utc_moment.strftime('%d/%m/%Y %H:%M:%S')
        self.author = author
        self.branch = branch

    def add_path(self, path):
        """Register path with a fresh, empty Path (replacing any existing one)."""
        self.paths[path] = Path(path)

    def get_path(self, path):
        """Return path if it has been registered on this commit, else False."""
        if path in self.paths:
            return path
        return False

    def add_item(self, path, key_string, full_string):
        """Record a finding under path, registering the path first if needed."""
        # Previously this raised KeyError when add_path() had not been called.
        if path not in self.paths:
            self.add_path(path)
        self.paths[path].add_item(key_string, full_string)

    def get_items(self, path):
        """Return the (key_string, full_string) pairs recorded for path."""
        return self.paths[path].get_items()
class Path():
    """Findings collected for a single file path inside one commit."""

    def __init__(self, path):
        """Start with no findings for the given path."""
        self.items = {}
        self.path = path

    def add_item(self, key_string, full_string):
        """Store full_string under key_string, overwriting an earlier entry."""
        self.items.update({key_string: full_string})

    def get_items(self):
        """Return a view of the (key_string, full_string) pairs."""
        findings = self.items
        return findings.items()
#############
# Functions #
#############
# Calculate entropy
def get_entropy(data):
    """Return the Shannon entropy of data, in bits per element.

    Inputs with fewer than two elements have no measurable entropy and
    yield 0.
    """
    size = len(data)
    if size < 2:
        return 0
    total = float(size)
    occurrences = Counter(data)
    return -sum(
        seen / total * math.log(seen / total, 2)
        for seen in occurrences.values()
    )
def final_output(loot, error=False):
    """Print every finding grouped by commit and path, then exit the process.

    Args:
        loot: dict mapping a commit sha to its Commit record.
        error: when true the process exits with status 1, otherwise 0.

    Reads the module-level args (repo location) and col (colour codes).
    Never returns: always ends with sys.exit().
    """
    for sha, commit in loot.items():
        # Use the branch stored on the record. The module-level "branch" only
        # holds whichever branch was scanned last (or nothing at all yet), so
        # it mislabels findings from every other branch.
        print("%sCommit : %s@%s%s" % (col.blue, commit.branch, sha, col.end))
        print("%sDate : %s%s" % (col.blue, commit.date, col.end))
        print("%sAuthor : %s%s" % (col.blue, commit.author, col.end))
        for path in commit.paths:
            if "github.com" in args.repo.lower():
                # GitHub repos get a direct link to the file at that commit
                url = args.repo + "/blob/" + sha + "/" + col.brown + path
                print("%s%s%s" % (col.grey, url, col.end))
            else:
                print("%s%s%s" % (col.brown, path, col.end))
            for key, full in commit.get_items(path):
                # Highlight the suspicious part inside the full line
                print(full.replace(key, col.green + key + col.end))
            print("")
        print("")
    sys.exit(1 if error else 0)
# Candidate secrets are runs of 16+ base64-style characters or 12+ hex digits.
# Group 1 captures the whole line, group 3 the candidate itself.
_CANDIDATE_RE = re.compile(r"((^|\n).*?([a-zA-Z0-9+/=]{16,}|[A-Fa-f0-9]{12,}).*?($|\n))")


def _is_boring(key_string, full_string):
    """Return True if the candidate matches a known false-positive pattern."""
    if any(re.search(regex, key_string) for regex in regexes_key):
        return True
    return any(re.match(regex, full_string) for regex in regexes_full)


# Main parsing loop
def scan_branch(repo, branch, count):
    """Scan the commits of one branch and record suspicious strings in loot.

    Each commit yielded by repo.iter_commits() is diffed against the commit
    yielded just before it, and every added line of the patch is searched for
    long, high-entropy strings.

    Args:
        repo: repository object providing iter_commits() (a GitPython Repo
            in this script).
        branch: the branch (name or reference) to walk.
        count: maximum number of commits to walk.

    Findings are stored in the module-level loot dict; key_strings_found and
    scanned_commits are updated so that later branches skip work already done.
    """
    prev_commit = None
    print("Scanning branch %s" % str(branch))
    # Honour the count parameter (the original ignored it and shadowed it
    # with the loop counter).
    commit_count = sum(1 for _ in repo.iter_commits(branch, max_count=count)) - 1
    for number, commit in enumerate(repo.iter_commits(branch, max_count=count)):
        if sys.stdout.isatty():
            print("Parsing commit %s%s/%s%s\r" % (col.green, number, commit_count, col.end), end="", flush=True)
        if prev_commit is None:
            prev_commit = commit
            continue
        sha = prev_commit.hexsha
        if sha in scanned_commits:
            prev_commit = commit
            continue
        diff = commit.diff(prev_commit)
        # The diff runs from commit to prev_commit and the finding is filed
        # under prev_commit's sha, so the author and date must come from
        # prev_commit as well (they used to be taken from commit).
        author = prev_commit.author
        date = prev_commit.authored_date
        for index in diff:
            path = index.a_path
            if path.lower().endswith(ignored_extensions) or path.lower().endswith(ignored_files):
                continue
            difftext = commit.diff(prev_commit, create_patch=True, paths=path)
            for blob in difftext:
                # Undecodable bytes are replaced rather than falling back to
                # str(bytes): that repr has no real newlines and starts with
                # "b'", so the whole patch was silently skipped.
                lines = blob.diff.decode("utf-8", errors="replace").split("\n")
                for line in lines:
                    # Only lines added by the patch are of interest
                    if not line.startswith("+"):
                        continue
                    for m in _CANDIDATE_RE.findall(line[1:]):
                        full_string = m[0].lstrip()
                        key_string = m[2]
                        # Very long strings are probably embedded files
                        if len(full_string) > args.length:
                            continue
                        entropy = get_entropy(key_string)
                        # Check if string is hexadecimal
                        try:
                            binascii.unhexlify(key_string)
                        except ValueError:
                            pass
                        else:
                            # Lower entropy requirement for hex strings
                            entropy += 1.3
                        if entropy <= args.entropy:
                            continue
                        # Ignore strings we've already seen
                        if key_string in key_strings_found:
                            continue
                        # Check against regexes of boring patterns to exclude
                        if _is_boring(key_string, full_string):
                            continue
                        key_strings_found.append(key_string)
                        if args.verbose:
                            print("%s : %s" % (entropy, key_string))
                        # Add the commit if it doesn't exist
                        if sha not in loot:
                            loot[sha] = Commit(sha, author, date, branch)
                        if not loot[sha].get_path(path):
                            loot[sha].add_path(path)
                        loot[sha].add_item(path, key_string, full_string)
        prev_commit = commit
        scanned_commits.append(sha)
    print("")
# Declare some variables
key_strings_found = []  # candidate strings already reported, to avoid duplicates
found = []
loot = {}  # commit sha -> Commit record holding that commit's findings
scanned_commits = []  # shas already processed while scanning an earlier branch

# Paths ending in one of these suffixes are skipped (compared lower-cased)
ignored_extensions = (
    ".css", ".woff", ".woff2", ".jpg", ".jpeg", ".png", ".gif", ".ico", ".svg", ".tiff",
    ".ttf", ".eot", ".pyc", ".exe", ".dll", ".jar", ".apk", ".gz", ".zip", ".csproj",
)
ignored_files = (
    "composer.lock",
    "vendor/composer/installed.json",
    "gemfile.lock",
    "yarn.lock",
    "package-lock.json",
)

# Search for these strings in the key string
regexes_key = [
    re.compile(r"[a-z]+/[a-z]+/[a-z]+/[a-z]+", re.IGNORECASE),  # Path
    re.compile(r"abcdef", re.IGNORECASE),  # Alphabet
    # camelCase with at least three humps. The second hump used to be written
    # "[A-Z[a-z]+", a single character class missing its closing bracket.
    re.compile(r"[a-z]+[A-Z][a-z]+[A-Z][a-z]+[A-Z][a-zA-Z]+"),
]

# Match against the full string (raw strings keep "\." a valid escape)
regexes_full = [
    re.compile(r"Subproject commit [a-f0-9]{40}"),  # Subproject commit
    re.compile(r'"commit": "[a-f0-9]{40}"'),  # Commit message
    re.compile(r'publicKeyToken="[a-f0-9]{16}"'),  # .NET Public Key Token
    re.compile(r".*[a-f0-9]{12,}\.(css|js)", re.IGNORECASE),  # CSS or JS filenames
    re.compile(r"[<>]{7} [a-f0-9]{40}", re.IGNORECASE),  # Seven < or > characters followed by a 40-char hash
]
# Catch Ctrl+C
def signal_handler(signal, frame):
    """Handle SIGINT: announce the interruption, then print the findings so far.

    Control passes to final_output() with its error flag set.
    """
    notice = "%sCaught Ctrl+C, exiting..%s" % (col.red, col.end)
    print(notice)
    final_output(loot, error=True)
signal.signal(signal.SIGINT, signal_handler)

# Parse arguments
parser = argparse.ArgumentParser('reposcanner.py', formatter_class=lambda prog:argparse.HelpFormatter(prog,max_help_position=40))
parser.add_argument('-r', '--repo', help='Repo to scan', dest='repo', required=True)
parser.add_argument('-c', '--count', help='Number of commits to scan (default all)', dest='count', default=sys.maxsize, type=int)
parser.add_argument('-e', '--entropy', help='Minimum entropy to report (default 4.3)', dest='entropy', default=4.3, type=float)
parser.add_argument('-l', '--length', help='Maxmimum line length (default 500)', dest='length', default=500, type=int)
parser.add_argument('-b', '--branch', help='Branch to scan', dest='branch' )
parser.add_argument('-v', '--verbose', help='Verbose output', dest='verbose', action='store_true', default=False)
args = parser.parse_args()

# Check if repo exists locally, otherwise try and clone it.
# The local directory name is the last component of the repo argument; a
# trailing "/" is dropped first so it does not produce an empty name.
repo_name = args.repo.rstrip("/").rsplit("/", 1)[-1]

if os.path.isdir(repo_name):
    try:
        repo = Repo(repo_name)
        print("Using local copy of repo...")
    except (InvalidGitRepositoryError, NoSuchPathError):
        # An existing directory that is not a git repository raises
        # InvalidGitRepositoryError, which previously escaped as a traceback.
        print(col.red + "Invalid repo " + repo_name + col.end)
        sys.exit(1)
else:
    try:
        print("Trying to clone repo %s from %s..." % (repo_name, args.repo))
        repo = Repo.clone_from(args.repo, repo_name)
        print("Repo cloned sucessfully.\n")
    except GitCommandError as e:
        print("\n%sFailed to clone repo%s\n" % (col.red, col.end))
        print(e)
        sys.exit(1)

branches = repo.refs

# Off by one: scan_branch() uses the first commit it is given only as the
# comparison point for the next one, so one extra commit is requested.
args.count += 1

# Scan the requested branch, or every remote branch if none specified
if args.branch:
    branch = "origin/" + args.branch
    if branch in branches:
        scan_branch(repo, branch, args.count)
    else:
        print("%sInvalid branch specified%s\n" % (col.red, col.end))
        sys.exit(1)
else:
    for branch in branches:
        # Skip tags, HEAD and any invalid branches
        if (
            isinstance(branch, TagReference)
            or str(branch) == "origin/HEAD"
            or not str(branch).startswith("origin")
        ):
            continue
        scan_branch(repo, branch, args.count)

# Output
if sys.stdout.isatty():
    # Overwrite the leftover "Parsing commit" progress line
    print(" \r", end="")
final_output(loot)