forked from axon-git/DeleFriend
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
executable file
·95 lines (74 loc) · 3.45 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import yaml
import argparse
from google.auth.credentials import Credentials
from src.gcp_sa_enum import ServiceAccountEnumerator
from googleapiclient.errors import HttpError
from google.auth.exceptions import RefreshError
from src import oauth_scope_enumrator
from src.domain_users_enum import DomainUserEnumerator
import os
import time
# Command-line interface: the configuration path is mandatory, verbosity is
# an optional flag.
parser = argparse.ArgumentParser(description="DeleFriend Tool")
parser.add_argument('-c', '--config', type=str, required=True, help="Path to the GCP IAM configuration file")
parser.add_argument('-v', '--verbose', action='store_true', help="Enable verbose/debugging mode")
args = parser.parse_args()

# Load configuration. yaml.safe_load returns None for an empty document (and
# a list/scalar for non-mapping content), so reject anything that is not a
# mapping here instead of failing later with an AttributeError on .get().
with open(args.config, 'r', encoding='utf-8') as file:
    config = yaml.safe_load(file)
if not isinstance(config, dict):
    parser.error(f"configuration file '{args.config}' must contain a YAML mapping")

# Both lookups yield None when the key is absent from the configuration.
OAUTH_TOKEN = config.get('oauth_token')
WORKSPACE_USER_EMAIL = config.get('workspace_user_email')
SCOPES_FILE = 'src/oauth_scopes.txt'  # scopes file
KEY_FOLDER = 'SA_private_keys'  # folder name handed to the OAuth enumerator
class CustomCredentials(Credentials):
    """Credentials object wrapping an already-issued bearer access token.

    The token is attached to outgoing requests as-is and is never refreshed.
    """

    def __init__(self, token):
        """Store the bearer token.

        Args:
            token: Access token placed in the ``Authorization`` header.
        """
        # Initialise the base class first so the attributes it manages exist
        # on this instance, then store the caller's token.
        super().__init__()
        self.token = token

    def apply(self, headers, token=None):
        """Write the bearer token into ``headers`` in place.

        Args:
            headers: Mutable mapping of HTTP request headers.
            token: Optional token used instead of the stored one; accepted so
                the override stays call-compatible with the base ``apply``.
        """
        headers['Authorization'] = f'Bearer {token or self.token}'

    def before_request(self, request, method, url, headers):
        """Add the Authorization header; ``request``, ``method`` and ``url`` are unused."""
        self.apply(headers)

    def refresh(self, request):
        """Do nothing: the supplied token is used as given and not renewed."""
        pass
def results():
    """Save the valid OAuth scopes found for each service-account key.

    Writes ``results/results_<unix timestamp>.txt``. For every key path whose
    scope collection is non-empty the file receives the key's base file name,
    a ``Valid OAuth Scopes:`` heading, one scope per line and a ``---``
    separator. Entries with no valid scopes are skipped.
    """
    timestamp = int(time.time())
    result_folder = 'results'
    # exist_ok replaces the exists()-then-makedirs() sequence, which could
    # fail if the folder appeared between the check and the creation.
    os.makedirs(result_folder, exist_ok=True)
    filename = f'results_{timestamp}.txt'
    filepath = os.path.join(result_folder, filename)
    print(f"\n\n[+] Saving results to results/{filename} ...")

    # NOTE: the __main__ section rebinds the module-level name
    # oauth_scope_enumrator to an OAuthEnumerator instance before calling
    # this function, so the lookup below resolves to that instance.
    # Fetch the data before opening the file so a failure here does not
    # leave an empty results file behind.
    valid_results = oauth_scope_enumrator.get_valid_results()

    with open(filepath, 'w', encoding='utf-8') as f:
        for json_path, valid_scopes in valid_results.items():
            if not valid_scopes:
                continue
            f.write(f'Service Account Key Name: {os.path.basename(json_path)}\n')
            f.write('Valid OAuth Scopes:\n')
            for scope in valid_scopes:
                f.write(f'{scope}\n')
            f.write('---\n')
def info():
    """Print the tool's banner and author credit to standard output."""
    banner = """
┳┓ ┓ ┏┓ • ┓
┃┃┏┓┃┏┓┣ ┏┓┓┏┓┏┓┏┫
┻┛┗ ┗┗ ┻ ┛ ┗┗ ┛┗┗┻
By Axon - Hunters.security"""
    print(banner)
if __name__ == "__main__":
    # Script entry point: show the banner, enumerate service accounts and
    # domain users, probe OAuth scopes, clean up keys, then save the results.
    try:
        info()

        # Wrap the configured bearer token and hand it to the enumerator.
        token_credentials = CustomCredentials(OAUTH_TOKEN)
        sa_enumerator = ServiceAccountEnumerator(token_credentials, verbose=args.verbose)
        print("\n[+] Enumerating GCP Resources: Projects and Service Accounts...")
        sa_enumerator.list_service_accounts()

        user_enumerator = DomainUserEnumerator(sa_enumerator)
        print("\n[+] Enumerating unique org domain and users on GCP (ONE user per domain) ...")
        user_enumerator.print_unique_domain_users()

        # Intentionally rebinds the imported module name to the enumerator
        # instance: results() reads get_valid_results() through this global.
        oauth_scope_enumrator = oauth_scope_enumrator.OAuthEnumerator(
            sa_enumerator,
            WORKSPACE_USER_EMAIL,
            SCOPES_FILE,
            KEY_FOLDER,
            verbose=args.verbose,
        )
        print("\n[+] Enumerating OAuth scopes and private key access tokens... (it might take a while based on the number of the JWT combinations) ")
        oauth_scope_enumrator.run()

        # Judging by the method name, this removes the keys that were not
        # confirmed for domain-wide delegation - verify in key_creator.
        dwd_keys = oauth_scope_enumrator.confirmed_dwd_keys
        sa_enumerator.key_creator.delete_keys_without_dwd(dwd_keys)

        results()
    except HttpError as http_err:
        # A 401 carrying this marker gets a dedicated hint; every other HTTP
        # error is printed as-is.
        unsupported_token = (
            http_err.resp.status == 401
            and b"ACCESS_TOKEN_TYPE_UNSUPPORTED" in http_err.content
        )
        if not unsupported_token:
            print(f"An error occurred: {http_err}")
        else:
            print("\nThe provided Bearer access token isn't valid. Refresh a new one.")