forked from brian1917/veracode-mitigation-copier
-
Notifications
You must be signed in to change notification settings - Fork 10
/
MitigationCopier.py
559 lines (457 loc) · 29 KB
/
MitigationCopier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
import sys
import argparse
import logging
import json
import datetime
import os
import anticrlf
from veracode_api_py.api import VeracodeAPI as vapi, Applications, Findings, SCAApplications, Sandboxes
from veracode_api_py.constants import Constants
from veracode_api_signing.credentials import get_credentials
# Module-level logger. setup_logger() looks up the same logger name to attach
# the file handler and set the level.
log = logging.getLogger(__name__)
# Annotation actions this script will copy. update_mitigation_info_rest() and
# update_sca_mitigation_info_rest() log a warning and skip any action that is
# not in this list.
ALLOWED_ACTIONS = ['COMMENT', 'FP', 'APPDESIGN', 'OSENV', 'NETENV', 'REJECTED', 'ACCEPTED', 'LIBRARY', 'ACCEPTRISK',
'APPROVE', 'REJECT', 'BYENV', 'BYDESIGN', 'LEGAL', 'COMMERCIAL', 'EXPERIMENTAL', 'INTERNAL', 'APPROVED']
class VeracodeApiCredentials():
    """Hold one API key id/secret pair and run callables with it exported to the environment.

    NOTE(review): the environment variable names used below are lowercase;
    confirm the Veracode libraries read them under this spelling on platforms
    where environment variable names are case-sensitive.
    """

    # Class-level defaults; __init__ always replaces them on the instance.
    api_key_id = None
    api_key_secret = None

    def __init__(self, api_key_id, api_key_secret):
        """Store the key id and secret that run_with_credentials() will export."""
        self.api_key_id = api_key_id
        self.api_key_secret = api_key_secret

    def run_with_credentials(self, to_run):
        """Call ``to_run(None)`` while this key pair is set in ``os.environ``.

        The previous environment is restored afterwards, even when ``to_run``
        raises. A variable that was not set before the call is removed again
        rather than being left behind as an empty string.

        Args:
            to_run: callable taking one (ignored) positional argument.

        Returns:
            Whatever ``to_run`` returns.
        """
        env_names = ('veracode_api_key_id', 'veracode_api_key_secret')
        # None marks "was not set", so it can be told apart from an empty value.
        previous = {name: os.environ.get(name) for name in env_names}
        try:
            # Assign inside the try so a failure on the second assignment
            # still restores the first.
            os.environ['veracode_api_key_id'] = self.api_key_id
            os.environ['veracode_api_key_secret'] = self.api_key_secret
            return to_run(None)
        finally:
            for name, old_value in previous.items():
                if old_value is None:
                    os.environ.pop(name, None)
                else:
                    os.environ[name] = old_value
def setup_logger():
    """Send this module's log records to MitigationCopier.log at INFO level.

    The file is opened as UTF-8 and records are formatted by
    anticrlf.LogFormatter with timestamp, level, function name and message.
    """
    file_handler = logging.FileHandler('MitigationCopier.log', encoding='utf8')
    record_format = '%(asctime)s - %(levelname)s - %(funcName)s - %(message)s'
    file_handler.setFormatter(anticrlf.LogFormatter(record_format))

    # Same logger name as the module-level `log`, so both refer to one logger.
    module_logger = logging.getLogger(__name__)
    module_logger.addHandler(file_handler)
    module_logger.setLevel(logging.INFO)
def creds_expire_days_warning():
    """Print a notice when fewer than 7 whole days remain before the API credentials expire."""
    credentials = vapi().get_creds()
    expires_at = datetime.datetime.strptime(credentials['expiration_ts'], "%Y-%m-%dT%H:%M:%S.%f%z")
    # The parsed expiry carries a UTC offset, so "now" is made timezone-aware
    # before subtracting.
    time_left = expires_at - datetime.datetime.now().astimezone()
    if time_left.days >= 7:
        return
    print('These API credentials expire ', credentials['expiration_ts'])
def prompt_for_app(prompt_text):
    """Ask the user for an application name and return the chosen application's GUID.

    One match is returned directly. Several matches are listed for the user to
    pick by number. An empty string is returned when nothing matches or the
    selection is not a valid number in range.
    """
    search_term = input(prompt_text)
    matches = Applications().get_by_name(search_term)
    match_count = len(matches)

    if match_count == 0:
        print("No matches were found!")
        return ""
    if match_count == 1:
        return matches[0].get('guid')

    print("Please choose an application:")
    for position, candidate in enumerate(matches, start=1):
        print("{}) {}".format(position, candidate["profile"]["name"]))
    answer = input("Enter number: ")
    try:
        choice = int(answer)
    except ValueError:
        return ""
    if 0 < choice <= match_count:
        return matches[choice - 1].get('guid')
    return ""
def get_app_guid_from_legacy_id(app_id):
    """Return the GUID of the first application returned for a legacy ID, or None if the lookup returns None."""
    lookup = Applications().get(legacy_id=app_id)
    if lookup is not None:
        return lookup['_embedded']['applications'][0]['guid']
    return None
def get_application_name(guid):
    """Return the profile name of the application identified by ``guid``."""
    profile = Applications().get(guid)['profile']
    return profile['name']
def get_findings_by_type(app_guid, scan_type='STATIC', sandbox_guid=None):
    """Fetch annotated findings of one scan type for an application.

    STATIC requests pass the sandbox through; DYNAMIC requests do not.
    Any other scan type yields an empty list.
    """
    if scan_type == 'STATIC':
        return Findings().get_findings(app_guid, scantype=scan_type, annot='TRUE', sandbox=sandbox_guid)
    if scan_type == 'DYNAMIC':
        return Findings().get_findings(app_guid, scantype=scan_type, annot='TRUE')
    return []
def logprint(log_msg):
    """Write ``log_msg`` to the module logger at INFO level, then echo it to stdout."""
    for emit in (log.info, print):
        emit(log_msg)
def filter_approved(findings, id_list, skip_id_list):
    """Return the findings whose resolution status is APPROVED, after optional ID filtering.

    Args:
        findings: finding dicts with 'issue_id' and
            ['finding_status']['resolution_status'].
        id_list: when not None (and skip_id_list is None), keep only these
            issue IDs.
        skip_id_list: when not None, drop these issue IDs; takes precedence
            over id_list.

    Returns:
        A new list containing the approved findings that survived the filter.
    """
    if skip_id_list is not None:
        # Report the list that is actually applied (previously logged id_list by mistake).
        log.info('Skipping the following findings provided in skip_id_list: {}'.format(skip_id_list))
        findings = [f for f in findings if f['issue_id'] not in skip_id_list]
    elif id_list is not None:
        log.info('Only copying the following findings provided in id_list: {}'.format(id_list))
        findings = [f for f in findings if f['issue_id'] in id_list]
    return [f for f in findings if f['finding_status']['resolution_status'] == 'APPROVED']
def format_file_path(file_path):
    """Normalise a finding's file path so that TeamCity work directories do not break matching.

    TeamCity work directories look like
    ``teamcity/buildagent/work/d2a72efd0db7f7d7``. When that prefix appears
    anywhere in the path (including at the very start), everything up to and
    including the hash directory and its trailing slash is dropped.

    Args:
        file_path: the path string, or None.

    Returns:
        '' for None, the stripped path when the TeamCity prefix is present,
        otherwise the path unchanged.
    """
    if file_path is None:
        return ''
    teamcity_prefix = 'teamcity/buildagent/work/'
    prefix_at = file_path.find(teamcity_prefix)
    # find() returns -1 when absent; 0 is a valid hit at the start of the path.
    if prefix_at < 0:
        return file_path
    # Skip the prefix (25 chars), the 16-character hash directory and the '/' after it.
    return file_path[prefix_at + len(teamcity_prefix) + 17:]
def create_match_format_policy(app_guid, sandbox_guid, policy_findings, finding_type):
    """Flatten raw findings into the per-finding dicts used for matching.

    STATIC records carry CWE, procedure, relative location, normalised source
    file and line. DYNAMIC records carry CWE, path and vulnerable parameter.
    Each record keeps the raw finding under 'finding'. Any other finding type
    yields an empty list.
    """
    if finding_type == 'STATIC':
        static_records = []
        for raw in policy_findings:
            issue_id = raw['issue_id']
            resolution = raw['finding_status']['resolution']
            details = raw['finding_details']
            static_records.append({
                'app_guid': app_guid,
                'sandbox_guid': sandbox_guid,
                'id': issue_id,
                'resolution': resolution,
                'cwe': details['cwe']['id'],
                'procedure': details.get('procedure'),
                'relative_location': details.get('relative_location'),
                'source_file': format_file_path(details.get('file_path')),
                'line': details.get('file_line_number'),
                'finding': raw,
            })
        return static_records

    if finding_type == 'DYNAMIC':
        dynamic_records = []
        for raw in policy_findings:
            issue_id = raw['issue_id']
            resolution = raw['finding_status']['resolution']
            details = raw['finding_details']
            dynamic_records.append({
                'app_guid': app_guid,
                'id': issue_id,
                'resolution': resolution,
                'cwe': details['cwe']['id'],
                'path': details['path'],
                # vulnerable_parameter may not be populated for some info leak findings
                'vulnerable_parameter': details.get('vulnerable_parameter', ''),
                'finding': raw,
            })
        return dynamic_records

    return []
def format_application_name(guid, app_name, sandbox_guid=None):
    """Build the human-readable label used in log and console messages.

    With a sandbox GUID the label names the sandbox and its application;
    without one it names the application only.
    """
    if sandbox_guid is not None:
        return 'sandbox {} in application {} (guid: {})'.format(sandbox_guid, app_name, guid)
    return 'application {} (guid: {})'.format(app_name, guid)
def submit_sca_mitigation(app_guid, action, comment, component_id, annotation_type, issue_id):
    """Submit one SCA annotation and report whether it succeeded.

    Args:
        app_guid: application receiving the annotation.
        action: annotation action to apply.
        comment: annotation comment text.
        component_id: SCA component the annotation applies to.
        annotation_type: "vulnerability" sends a VULNERABILITY annotation keyed
            by CVE name; any other value sends a LICENSE annotation keyed by
            license ID.
        issue_id: CVE name or license ID, depending on annotation_type.

    Returns:
        True when the annotation call completed, False when it raised.
    """
    try:
        if annotation_type == "vulnerability":
            SCAApplications().add_annotation(app_guid=app_guid, action=action, comment=comment, annotation_type="VULNERABILITY",
                                             component_id=component_id, cve_name=issue_id)
        else:
            SCAApplications().add_annotation(app_guid=app_guid, action=action, comment=comment, annotation_type="LICENSE",
                                             component_id=component_id, license_id=issue_id)
    except Exception:
        # Catch Exception rather than everything so Ctrl-C / SystemExit still
        # propagate; log.exception records the traceback with the message.
        log.exception(f'Unable to submit {annotation_type} mitigation information to {action} for component {component_id} and issue_id {issue_id} in application {app_guid}')
        return False
    log.info(f'Updated {annotation_type} mitigation information to {action} for component {component_id} and issue_id {issue_id} in application {app_guid}')
    return True
def update_sca_mitigation_info_rest(app_guid, action, comment, annotation_type, component_id, issue_id, propose_only):
    """Validate one SCA mitigation action and submit it.

    Returns the result of submit_sca_mitigation(), or None when the action is
    not in ALLOWED_ACTIONS or when an 'APPROVE' action is skipped because
    propose_only is set.
    """
    # Comments longer than 2048 characters are cut down to that length.
    comment = comment[:2048]

    if action not in ALLOWED_ACTIONS:
        log.warning(f'Cannot copy {action} mitigation for component {component_id} and issue_id {issue_id} in {app_guid}')
        return None
    if action == 'APPROVE' and propose_only:
        log.warning(f'propose_only set to True; skipping applying approval for component {component_id} and issue_id {issue_id} in {app_guid}')
        return None
    return submit_sca_mitigation(app_guid, action, comment, component_id, annotation_type, issue_id)
def update_mitigation_info_rest(to_app_guid,flaw_id,action,comment,sandbox_guid=None, propose_only=False):
    """Apply one mitigation annotation to a flaw in the target application or sandbox.

    Nothing is submitted when the action is not in ALLOWED_ACTIONS, or when an
    'APPROVED' action is skipped because propose_only is set.
    """
    # Comments longer than 2048 characters are cut down to that length.
    comment = comment[:2048]

    if action not in ALLOWED_ACTIONS:
        log.warning('Cannot copy {} mitigation for Flaw ID {} in {}'.format(action, flaw_id, to_app_guid))
        return
    if action == 'APPROVED' and propose_only:
        log.info('propose_only set to True; skipping applying approval for flaw_id {}'.format(flaw_id))
        return

    # Translate the copied action through the library's annotation-type table.
    action = Constants.ANNOT_TYPE[action]
    flaw_ids = [flaw_id]

    # Only pass the sandbox keyword when a sandbox was requested.
    sandbox_kwargs = {}
    if sandbox_guid is not None:
        sandbox_kwargs['sandbox'] = sandbox_guid
    Findings().add_annotation(to_app_guid, flaw_ids, comment, action, **sandbox_kwargs)

    log.info(
        'Updated mitigation information to {} for Flaw ID {} in {}'.format(action, str(flaw_ids), to_app_guid))
def set_in_memory_flaw_to_approved(findings_to,to_id):
    """Mark every in-memory target record with ID ``to_id`` as APPROVED.

    This updates only the local data, so that a target flaw which matches
    several source flaws has mitigations copied to it once. Records lacking
    an 'id' or 'finding' key are ignored.
    """
    for record in findings_to:
        if "id" not in record or "finding" not in record:
            continue
        if record["id"] == to_id:
            record['finding']['finding_status']['resolution_status'] = 'APPROVED'
def match_sca(findings_from_approved, from_app_guid, to_app_guid, dry_run, annotation_type, propose_only, from_credentials, to_credentials,
              include_original_user=False, include_profile_name=False):
    """Replay the annotation history of approved SCA findings onto the target application.

    Args:
        findings_from_approved: approved SCA annotation records from the source
            application; each is read for 'component', 'history' and either
            'license' or 'vulnerability'.
        from_app_guid: source application GUID.
        to_app_guid: target application GUID.
        dry_run: when true, nothing is submitted.
        annotation_type: "license" keys findings by license ID; any other value
            keys them by CVE name.
        propose_only: forwarded to update_sca_mitigation_info_rest().
        from_credentials / to_credentials: VeracodeApiCredentials used for the
            source and target API calls respectively.
        include_original_user: add the original submitter to copied comments.
        include_profile_name: label the source by formatted profile name
            instead of "APP <guid>".

    Returns:
        The number of findings whose whole history was processed without a
        falsy result from update_sca_mitigation_info_rest(); 0 when there is
        nothing to copy.
    """
    results_from_app_name = from_credentials.run_with_credentials(lambda _: get_application_name(from_app_guid))
    formatted_from = format_application_name(from_app_guid, results_from_app_name)
    logprint('Getting SCA findings for {}'.format(formatted_from))
    count_from = len(findings_from_approved)
    if count_from == 0:
        logprint('No approved findings in "from" {}. Exiting.'.format(formatted_from))
        return 0
    logprint('Found {} approved mitigations on SCA findings in {}'.format(count_from, formatted_from))

    results_to_app_name = to_credentials.run_with_credentials(lambda _: get_application_name(to_app_guid))
    formatted_to = format_application_name(to_app_guid, results_to_app_name)

    # Source label is the same for every copied comment, so build it once.
    copied_from = formatted_from if include_profile_name else 'APP {}'.format(from_app_guid)

    counter = 0
    for sca_finding in findings_from_approved:
        component_file_name = sca_finding['component']['filename']
        component_id = sca_finding['component']['id']
        if annotation_type == "license":
            issue_id = sca_finding['license']['license_id']
        else:
            issue_id = sca_finding['vulnerability']['cve_name']

        # might need to add some logic to skip already mitigated findings
        mitigation_list = sca_finding['history']
        logprint(f'Applying {len(mitigation_list)} {annotation_type} annotations to {component_file_name} with issue id {issue_id} in {formatted_to}...')

        history_applied = True
        for mitigation_action in reversed(mitigation_list):  # SCA mitigations API puts most recent action first
            proposal_action = mitigation_action['annotation_action']
            # Built with str.format instead of a nested f-string: reusing the
            # outer quote inside a replacement field is a SyntaxError before
            # Python 3.12.
            if include_original_user:
                origin = '{} - originally submitted by {}'.format(copied_from, mitigation_action['user_name'])
            else:
                origin = copied_from
            proposal_comment = '(COPIED FROM {}) {}'.format(origin, mitigation_action['comment'])

            if dry_run:
                continue
            submitted = to_credentials.run_with_credentials(
                lambda _: update_sca_mitigation_info_rest(to_app_guid, proposal_action, proposal_comment,
                                                          annotation_type, component_id, issue_id, propose_only))
            if not submitted:
                # A falsy result (failed submit, disallowed action, or approval
                # skipped by propose_only) stops this finding's history and
                # leaves it uncounted.
                history_applied = False
                break

        if history_applied:
            counter += 1

    print('[*] Updated {} flaws in {}. See log file for details.'.format(str(counter), formatted_to))
    return counter
def get_formatted_app_name(app_guid, sandbox_guid):
    """Look up the application's name and return its display label (with sandbox, if given)."""
    return format_application_name(app_guid, get_application_name(app_guid), sandbox_guid)
def get_findings_from(from_app_guid, scan_type, from_sandbox_guid=None):
    """Fetch the source application's findings for one scan type, logging progress before and after."""
    source_label = get_formatted_app_name(from_app_guid, from_sandbox_guid)
    scan_label = scan_type.lower()
    logprint('Getting {} findings for {}'.format(scan_label, source_label))
    source_findings = get_findings_by_type(from_app_guid, scan_type=scan_type, sandbox_guid=from_sandbox_guid)
    logprint('Found {} {} findings in "from" {}'.format(len(source_findings), scan_label, source_label))
    return source_findings
def match_for_scan_type(findings_from, from_app_guid, to_app_guid, dry_run, from_credentials, to_credentials, scan_type='STATIC',from_sandbox_guid=None,
        to_sandbox_guid=None, propose_only=False, id_list=None, skip_id_list=None, fuzzy_match=False, include_original_user=False, include_profile_name=False):
    """Copy approved mitigations of one scan type from source findings to matching target findings.

    Args:
        findings_from: source findings (already fetched) for this scan type.
        from_app_guid / to_app_guid: source and target application GUIDs.
        dry_run: when true, matches are logged but nothing is submitted.
        from_credentials / to_credentials: VeracodeApiCredentials used for the
            source and target API calls respectively.
        scan_type: 'STATIC' or 'DYNAMIC'.
        from_sandbox_guid / to_sandbox_guid: optional sandbox GUIDs.
        propose_only: forwarded to update_mitigation_info_rest().
        id_list / skip_id_list: optional issue-ID filters passed to
            filter_approved(). They default to None (no filtering); the former
            ``[]`` defaults made filter_approved() take its skip branch and
            ignore a caller-supplied id_list.
        fuzzy_match: forwarded to Findings().match() as allow_fuzzy_match.
        include_original_user: add the original submitter to copied comments.
        include_profile_name: label the source by formatted profile name
            instead of "APP <guid>".

    Returns:
        The number of target findings that had a matching source finding and
        were processed; 0 when there was nothing to copy from or to.
    """
    if len(findings_from) == 0:
        return 0  # no source findings to copy!

    from_app_name = from_credentials.run_with_credentials(lambda _: get_application_name(from_app_guid))
    # Computed once; the label is reused for every message and comment below.
    formatted_from = format_application_name(from_app_guid, from_app_name, from_sandbox_guid)

    findings_from = filter_approved(findings_from, id_list, skip_id_list)
    if len(findings_from) == 0:
        logprint('No approved {} findings in "from" {}. Exiting.'.format(scan_type.lower(), formatted_from))
        return 0

    results_to_app_name = to_credentials.run_with_credentials(lambda _: get_application_name(to_app_guid))
    formatted_to = format_application_name(to_app_guid, results_to_app_name, to_sandbox_guid)

    logprint('Getting {} findings for {}'.format(scan_type.lower(), formatted_to))
    findings_to = to_credentials.run_with_credentials(
        lambda _: get_findings_by_type(to_app_guid, scan_type=scan_type, sandbox_guid=to_sandbox_guid))
    count_to = len(findings_to)
    logprint('Found {} {} findings in "to" {}'.format(count_to, scan_type.lower(), formatted_to))
    if count_to == 0:
        return 0  # no destination findings to mitigate!

    # CREATE LIST OF UNIQUE VALUES FOR BUILD COPYING TO
    copy_array_to = create_match_format_policy(app_guid=to_app_guid, sandbox_guid=to_sandbox_guid,
                                               policy_findings=findings_to, finding_type=scan_type)

    # How many target findings received copied mitigations.
    counter = 0
    copied_from = formatted_from if include_profile_name else 'APP {}'.format(from_app_guid)

    # look for a match for each finding in the TO list and apply mitigations of the matching flaw, if found
    for this_to_finding in findings_to:
        to_id = this_to_finding['issue_id']

        if this_to_finding['finding_status']['resolution_status'] == 'APPROVED':
            logprint('Flaw ID {} in {} already has an accepted mitigation; skipped.'.format(to_id, formatted_to))
            continue

        match = Findings().match(this_to_finding, findings_from, approved_matches_only=True, allow_fuzzy_match=fuzzy_match)
        if match is None:
            log.info('No approved match found for finding {} in {}'.format(to_id, formatted_from))
            continue

        from_id = match.get('id')
        log.info('Source flaw {} in {} has a possible target match in flaw {} in {}.'.format(from_id, formatted_from, to_id, formatted_to))
        mitigation_list = match['finding']['annotations']
        logprint('Applying {} annotations for flaw ID {} in {}...'.format(len(mitigation_list), to_id, formatted_to))

        for mitigation_action in reversed(mitigation_list):  # findings API puts most recent action first
            proposal_action = mitigation_action['action']
            if include_original_user:
                proposal_comment = '(COPIED FROM {} - originally submitted by {}) {}'.format(
                    copied_from, mitigation_action['user_name'], mitigation_action['comment'])
            else:
                proposal_comment = '(COPIED FROM {}) {}'.format(copied_from, mitigation_action['comment'])

            if not dry_run:
                to_credentials.run_with_credentials(
                    lambda _: update_mitigation_info_rest(to_app_guid, to_id, proposal_action, proposal_comment,
                                                          to_sandbox_guid, propose_only))

        set_in_memory_flaw_to_approved(copy_array_to, to_id)  # so we don't attempt to mitigate approved finding twice
        counter += 1

    print('[*] Updated {} flaws in {}. See log file for details.'.format(str(counter), formatted_to))
    return counter
def get_exact_sandbox_name_match(sandbox_name, sandbox_candidates):
    """Return the GUID of the first candidate whose name equals ``sandbox_name`` exactly.

    Prints a message and returns None when no candidate matches.
    """
    exact_match = None
    for candidate in sandbox_candidates:
        if candidate["name"] == sandbox_name:
            exact_match = candidate
            break

    if exact_match is None:
        print("Unable to find sandbox named " + sandbox_name)
        return None
    return exact_match["guid"]
def get_sandbox_by_name(application_id, sandbox_name):
    """Return the GUID of the named sandbox in an application, or None if it cannot be found."""
    candidates = Sandboxes().get_all(application_id)
    if len(candidates) != 0:
        return get_exact_sandbox_name_match(sandbox_name, candidates)
    print("No sandboxes found for application " + application_id)
    return None
def get_sandbox_guids_by_name(results_to_app_ids, results_to_sandbox_names):
    """Resolve a ", "-separated string of sandbox names to sandbox GUIDs.

    The n-th name is looked up in the n-th application of
    ``results_to_app_ids``. Names that cannot be resolved are left out, so the
    result can be shorter than the list of names.
    """
    resolved_guids = []
    for position, raw_name in enumerate(results_to_sandbox_names.split(", ")):
        guid = get_sandbox_by_name(results_to_app_ids[position], raw_name.strip())
        if guid is None:
            continue
        resolved_guids.append(guid)
    return resolved_guids
def get_exact_application_name_match(application_name, app_candidates):
    """Return the GUID of the first candidate whose profile name equals ``application_name`` exactly.

    Prints a message and returns None when no candidate matches.
    """
    exact_match = None
    for candidate in app_candidates:
        if candidate["profile"]["name"] == application_name:
            exact_match = candidate
            break

    if exact_match is None:
        print("Unable to find application named " + application_name)
        return None
    return exact_match["guid"]
def get_application_by_name(application_name):
    """Return the GUID for an application name, or None if it cannot be resolved.

    A single search hit is accepted as is. Several hits are narrowed to an
    exact profile-name match.
    """
    candidates = Applications().get_by_name(application_name)
    hit_count = len(candidates)
    if hit_count == 1:
        return candidates[0].get('guid')
    if hit_count > 1:
        return get_exact_application_name_match(application_name, candidates)
    print("Unable to find application named " + application_name)
    return None
def get_application_guids_by_name(application_names):
    """Resolve a ", "-separated string of application names to GUIDs, leaving out names that cannot be resolved."""
    # Lazy generator: lookups still happen one by one, in input order.
    lookups = (get_application_by_name(raw_name.strip()) for raw_name in application_names.split(", "))
    return [guid for guid in lookups if guid is not None]
def get_sca_findings_for(from_app_guid, annotation_type):
    """Return the 'approved_annotations' of the given type for an application, or [] when the response is empty."""
    response = SCAApplications().get_annotations(app_guid=from_app_guid, annotation_type=annotation_type.upper())
    return response['approved_annotations'] if response else []
def _app_ids_missing(from_app_id, to_app_ids):
    """Return True when the source ID or any target ID is absent (None or '') or there are no targets."""
    if from_app_id in (None, ''):
        return True
    if not to_app_ids:
        return True
    return any(to_app_id in (None, '') for to_app_id in to_app_ids)


def main():
    """Parse the command line and copy approved mitigations from one application to one or more others.

    Source findings are fetched once per selected scan type (SAST, DAST, SCA
    vulnerabilities, SCA licenses) and then matched against every target
    application. The function prints a message and returns early when the
    arguments do not identify a source and at least one target.
    """
    parser = argparse.ArgumentParser(
        description='This script looks at the results set of the FROM APP. For any flaws that have an '
                    'accepted mitigation, it checks the TO APP to see if that flaw exists. If it exists, '
                    'it copies all mitigation information.')
    parser.add_argument('-f', '--fromapp', help='App GUID to copy from')
    parser.add_argument('-fs', '--fromsandbox', help='Sandbox GUID to copy from (optional)')
    parser.add_argument('-t', '--toapp', help='App GUID to copy to')
    parser.add_argument('-ts', '--tosandbox', help="Sandbox GUID to copy to (optional)")
    parser.add_argument('-fn', '--fromappname', help='Application Name to copy from')
    parser.add_argument('-fsn', '--fromsandboxname', help='Sandbox Name to copy from')
    parser.add_argument('-tn', '--toappnames', help='Comma-delimited list of Application Names to copy to')
    parser.add_argument('-tsn', '--tosandboxnames', help='Comma-delimited list of Sandbox Names to copy to - should be in the same order as --toappnames')
    parser.add_argument('-st','--scan_types', help='Comma-delimited list of scan types to copy mitigations (default: SAST, DAST)')
    parser.add_argument('-sit','--sca_import_type', help='Comma-delimited list of types of SCA issues to import (default: licenses, vulnerabilities)')
    parser.add_argument('-p', '--prompt', action='store_true', help='Specify to prompt for the applications to copy from and to.')
    parser.add_argument('-d', '--dry_run', action='store_true', help="Log matched flaws instead of applying mitigations")
    parser.add_argument('-l', '--legacy_ids',action='store_true', help='Use legacy Veracode app IDs instead of GUIDs')
    parser.add_argument('-po', '--propose_only',action='store_true', help='Only propose mitigations, do not approve them')
    parser.add_argument('-i','--id_list',nargs='*', help='Only copy mitigations for the flaws in the id_list')
    parser.add_argument('-si','--skip_id_list',nargs='*', help='Skip mitigations for the flaws in the skip_id_list (replaces --id_list)')
    parser.add_argument('-fm','--fuzzy_match',action='store_true', help='Look within a range of line numbers for a matching flaw')
    parser.add_argument('-vid','--veracode_api_key_id', help='VERACODE_API_KEY_ID to use (if combined with --to_veracode_api_key_id and --to_veracode_api_key_secret, allows for moving mitigations between different instances of the platform)')
    parser.add_argument('-vkey','--veracode_api_key_secret', help='VERACODE_API_KEY_SECRET to use (if combined with --to_veracode_api_key_id and --to_veracode_api_key_secret, allows for moving mitigations between different instances of the platform)')
    parser.add_argument('-tid','--to_veracode_api_key_id', help='VERACODE_API_KEY_ID to use for TO apps/sandboxes (allows for moving mitigations between different instances of the platform)')
    parser.add_argument('-tkey','--to_veracode_api_key_secret', help='VERACODE_API_KEY_SECRET to use for TO apps/sandboxes (allows for moving mitigations between different instances of the platform)')
    parser.add_argument('-io','--include_original_user',action='store_true', help='Set to include original submitter/approver into the copied mitigation comments')
    parser.add_argument('-in','--include_profile_name',action='store_true', help='Set to include original application profile name instead of GUID into the copied mitigation comments')
    args = parser.parse_args()

    setup_logger()
    logprint('======== beginning MitigationCopier.py run ========')

    # CHECK FOR CREDENTIALS EXPIRATION
    creds_expire_days_warning()

    # SET VARIABLES FOR FROM AND TO APPS
    results_from_app_id = args.fromapp
    results_to_app_ids = [args.toapp]
    results_from_sandbox_id = args.fromsandbox
    # Kept as a list parallel to results_to_app_ids; it is indexed per target
    # application further down, so a bare string would be indexed by character.
    results_to_sandbox_ids = [args.tosandbox] if args.tosandbox else None
    results_from_app_name = args.fromappname
    results_from_sandbox_name = args.fromsandboxname
    results_to_app_names = args.toappnames
    results_to_sandbox_names = args.tosandboxnames
    scan_types = args.scan_types
    sca_import_type = args.sca_import_type
    prompt = args.prompt
    dry_run = args.dry_run
    legacy_ids = args.legacy_ids
    propose_only = args.propose_only
    id_list = [int(flaw_id) for flaw_id in args.id_list] if args.id_list else None
    skip_id_list = [int(flaw_id) for flaw_id in args.skip_id_list] if args.skip_id_list else None
    fuzzy_match = args.fuzzy_match
    include_original_user = args.include_original_user
    include_profile_name = args.include_profile_name

    # Source credentials come from the command line when both parts are given,
    # otherwise from get_credentials().
    if args.veracode_api_key_id and args.veracode_api_key_secret:
        from_credentials = VeracodeApiCredentials(args.veracode_api_key_id, args.veracode_api_key_secret)
    else:
        api_key_id, api_key_secret = get_credentials()
        from_credentials = VeracodeApiCredentials(api_key_id, api_key_secret)

    # Target credentials default to the source credentials.
    if args.to_veracode_api_key_id and args.to_veracode_api_key_secret:
        to_credentials = VeracodeApiCredentials(args.to_veracode_api_key_id, args.to_veracode_api_key_secret)
    else:
        to_credentials = from_credentials

    if prompt:
        results_from_app_id = from_credentials.run_with_credentials(lambda _: prompt_for_app("Enter the application name to copy mitigations from: "))
        results_to_app_ids = to_credentials.run_with_credentials(lambda _: [prompt_for_app("Enter the application name to copy mitigations to: ")])
        # ignore Sandbox arguments in the Prompt case
        results_from_sandbox_id = None
        results_to_sandbox_ids = None
    else:
        if results_from_app_name:
            from_app_guids = from_credentials.run_with_credentials(lambda _: get_application_guids_by_name(results_from_app_name))
            # An unresolved name leaves the ID as None so the validation below
            # reports it instead of an IndexError being raised here.
            results_from_app_id = from_app_guids[0] if from_app_guids else None
        if results_from_sandbox_name and results_from_app_id:
            from_sandbox_guids = from_credentials.run_with_credentials(lambda _: get_sandbox_guids_by_name([results_from_app_id], results_from_sandbox_name))
            if not from_sandbox_guids:
                print('Unable to resolve the sandbox to copy mitigations from.')
                return
            results_from_sandbox_id = from_sandbox_guids[0]
        if results_to_app_names:
            results_to_app_ids = to_credentials.run_with_credentials(lambda _: get_application_guids_by_name(results_to_app_names))
        if results_to_sandbox_names:
            results_to_sandbox_ids = to_credentials.run_with_credentials(lambda _: get_sandbox_guids_by_name(results_to_app_ids, results_to_sandbox_names))

    is_sast = False
    is_dast = False
    is_sca = False
    is_sca_vulnerabilities = False
    is_sca_licences = False

    if scan_types:
        scan_types = scan_types.lower()
        is_sast = 'sast' in scan_types
        is_dast = 'dast' in scan_types
        is_sca = 'sca' in scan_types
        if is_sca:
            if sca_import_type:
                sca_import_type = sca_import_type.lower()
                is_sca_vulnerabilities = 'vulnerabilit' in sca_import_type
                is_sca_licences = 'license' in sca_import_type
            else:
                is_sca_vulnerabilities = True
                is_sca_licences = True
        # Nothing to do only if every one of the four kinds is deselected.
        if not is_dast and not is_sast and not is_sca_vulnerabilities and not is_sca_licences:
            print('No valid scan types were provided.')
            print('Valid scan_types are: DAST, SAST, SCA.')
            print('Valid sca_import_type are: licenses, vulnerabilities.')
            return
    else:
        is_sast = True
        is_dast = True

    if _app_ids_missing(results_from_app_id, results_to_app_ids):
        print('You must provide an application to copy mitigations to and from.')
        return

    if legacy_ids:
        results_from = from_credentials.run_with_credentials(lambda _: get_app_guid_from_legacy_id(results_from_app_id))
        # Convert each target ID separately so the result stays a list of GUIDs.
        results_to = to_credentials.run_with_credentials(
            lambda _: [get_app_guid_from_legacy_id(legacy_id) for legacy_id in results_to_app_ids])
        results_from_app_id = results_from
        results_to_app_ids = results_to
        if _app_ids_missing(results_from_app_id, results_to_app_ids):
            print('Unable to resolve one or more legacy application IDs to GUIDs.')
            return

    # Sandboxes are paired with target applications by position, so a length
    # mismatch would pair a sandbox with the wrong application.
    if results_to_sandbox_ids and len(results_to_sandbox_ids) != len(results_to_app_ids):
        print('The number of "to" sandboxes must match the number of "to" applications.')
        return

    # Fetch the source findings once; they are reused for every target application.
    if is_sast:
        all_static_findings = from_credentials.run_with_credentials(
            lambda _: get_findings_from(from_app_guid=results_from_app_id, scan_type='STATIC',
                                        from_sandbox_guid=results_from_sandbox_id))
    if is_dast:
        all_dynamic_findings = from_credentials.run_with_credentials(
            lambda _: get_findings_from(from_app_guid=results_from_app_id, scan_type='DYNAMIC',
                                        from_sandbox_guid=results_from_sandbox_id))
    if is_sca_vulnerabilities:
        all_sca_vulnerabilities = from_credentials.run_with_credentials(
            lambda _: get_sca_findings_for(from_app_guid=results_from_app_id, annotation_type="vulnerability"))
    if is_sca_licences:
        all_sca_licenses = from_credentials.run_with_credentials(
            lambda _: get_sca_findings_for(from_app_guid=results_from_app_id, annotation_type="license"))

    for index, to_app_id in enumerate(results_to_app_ids):
        to_sandbox_id = results_to_sandbox_ids[index] if results_to_sandbox_ids else None

        # get static findings and apply mitigations
        if is_sast:
            match_for_scan_type(all_static_findings, from_app_guid=results_from_app_id, to_app_guid=to_app_id, dry_run=dry_run, scan_type='STATIC',
                                from_sandbox_guid=results_from_sandbox_id, to_sandbox_guid=to_sandbox_id, propose_only=propose_only,
                                id_list=id_list, skip_id_list=skip_id_list, fuzzy_match=fuzzy_match,
                                from_credentials=from_credentials, to_credentials=to_credentials,
                                include_original_user=include_original_user, include_profile_name=include_profile_name)
        if is_dast:
            match_for_scan_type(all_dynamic_findings, from_app_guid=results_from_app_id, to_app_guid=to_app_id, dry_run=dry_run,
                                scan_type='DYNAMIC', propose_only=propose_only, id_list=id_list, skip_id_list=skip_id_list,
                                from_credentials=from_credentials, to_credentials=to_credentials,
                                include_original_user=include_original_user, include_profile_name=include_profile_name)
        if is_sca_vulnerabilities:
            match_sca(all_sca_vulnerabilities, from_app_guid=results_from_app_id, to_app_guid=to_app_id, dry_run=dry_run,
                      annotation_type="vulnerability", propose_only=propose_only,
                      from_credentials=from_credentials, to_credentials=to_credentials,
                      include_original_user=include_original_user, include_profile_name=include_profile_name)
        if is_sca_licences:
            match_sca(all_sca_licenses, from_app_guid=results_from_app_id, to_app_guid=to_app_id, dry_run=dry_run,
                      annotation_type="license", propose_only=propose_only,
                      from_credentials=from_credentials, to_credentials=to_credentials,
                      include_original_user=include_original_user, include_profile_name=include_profile_name)
# Run the copier only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()