Skip to content

Commit

Permalink
v0.4.1
Browse files Browse the repository at this point in the history
- Added a better approach for the reverse DNS Lookup via the Driftnet API.
- Error handling.
  • Loading branch information
B0lg0r0v committed Sep 11, 2024
1 parent 430e6c0 commit d10b886
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 55 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
__pycache__
header
header*
elyzer.spec
mail.ico
Elyzer_Logo.png
Elyzer_Logo_Black.png
Elyzer_Logo.ico
__pycache__
13 changes: 13 additions & 0 deletions .idea/.idea.Elyzer.dir/.idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions .idea/.idea.Elyzer.dir/.idea/encodings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/.idea.Elyzer.dir/.idea/indexLayout.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/.idea.Elyzer.dir/.idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

149 changes: 95 additions & 54 deletions core/spoofing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from email.parser import BytesParser
from core.colors import Colors
from core.utils import Utils
import json


class Spoofing:
Expand All @@ -18,6 +19,7 @@ def __init__(self, header):
self.indent = " "
self.resolver = dns.resolver.Resolver()
self.api_key = "YOUR VT API KEY" # Get your API key here: https://www.virustotal.com/api-d/signup/
self.api_key_driftnet = "YOUR DRIFTNET API KEY" # Get your API key here: https://driftnet.io
self.base_url = "https://www.virustotal.com/api/v3"


Expand Down Expand Up @@ -78,8 +80,13 @@ def spoofing_all_checks(self):
#------------------------Regex and Field definitions------------------------#

x = next(iter(reversed(self.utils.getReceivedFields())), None)
ipv4 = re.findall(r'[\[\(](\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})[\]\)]', x, re.IGNORECASE)
ipv6 = re.findall(r'[\[\(]([A-Fa-f0-9:]+)[\]\)]', x, re.IGNORECASE)
if x is not None:
ipv4 = re.findall(r'[\[\(](\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})[\]\)]', x, re.IGNORECASE)
#ipv6 = re.findall(r'[\[\(]([A-Fa-f0-9:]+)[\]\)]', x, re.IGNORECASE)
else:
ipv4 = []
print(self.colors.red("No 'Received' fields found in the email header."))

# get rid of the localhost IP Address
filteredIpv4 = [ip for ip in ipv4 if ip != '127.0.0.1']

Expand Down Expand Up @@ -108,7 +115,12 @@ def spoofing_all_checks(self):
authResultOrigIP = None

# Getting the Domain Name from the "From" Field
fromEmailDomain = fromMatch.group(1).split('@')[1]
if fromMatch is not None:
fromEmailDomain = fromMatch.group(1).split('@')[1]
else:
print(self.colors.white("Could not detect SMTP Server. Manual reviewing required."))
report.append(f'Could not detect SMTP Server. Manual reviewing required.\n')

# Getting the MX Records from the Domain Name
try:
getMx = self.resolver.resolve(fromEmailDomain, 'MX')
Expand All @@ -131,8 +143,8 @@ def spoofing_all_checks(self):
print(self.colors.green("No Mismatch detected."))
report.append(f'No Mismatch detected.')
else:
print(self.colors.yellow(f'Potential SMTP Server Mismatch detected. Sender SMTP Server is "{fromEmailDomain} [{"".join(filteredIpv4[0])}]" and should be "{fromEmailDomain} [{", ".join(aRecordsOfMx)}]" <- (current MX Record(s) for this domain.)'))
report.append(f'Potential SMTP Server Mismatch detected. Sender SMTP Server is "{fromEmailDomain} [{"".join(filteredIpv4[0])}]" and should be "{fromEmailDomain} [{", ".join(aRecordsOfMx)}]" <- (current MX Record(s) for this domain.)')
print(self.colors.yellow(f'{self.indent}Potential SMTP Server Mismatch detected. Sender SMTP Server is "{fromEmailDomain} [{"".join(filteredIpv4[0])}]" and should be "{fromEmailDomain} [{", ".join(aRecordsOfMx)}]" <- (current MX Record(s) for this domain.)'))
report.append(f'{self.indent}Potential SMTP Server Mismatch detected. Sender SMTP Server is "{fromEmailDomain} [{"".join(filteredIpv4[0])}]" and should be "{fromEmailDomain} [{", ".join(aRecordsOfMx)}]" <- (current MX Record(s) for this domain.)')
else:

#------------------------ If we get no IP addresses from the received from field, we will try to get the IP Address from the "Authentication-Results-Original" Field and do diverse lookups with it. ------------------------#
Expand Down Expand Up @@ -554,33 +566,47 @@ def get_mx_records_from_vt(self, domain):
return []


def passive_reverse_dns_hackertarget(self, ip):
url = f"https://api.hackertarget.com/reverseiplookup/?q={ip}"
def passive_reverse_dns_driftnet(self, ip):
url = f"https://api.driftnet.io/v1/domain/rdns?ip={ip}"

headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'
'Authorization': f'Bearer {self.api_key_driftnet}',
'Content-Type': 'application/json'
}

try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
# The API returns a newline-separated list of domains
domains = response.text.strip().split('\n')

# Filter out any error messages or empty strings
domains = [domain for domain in domains if domain and not domain.startswith('error')]

if domains:
return domains
else:
print(f"No reverse DNS entries found for IP: {ip}")
return []
parsed_data = self.parse_driftnet_response(response.json())
return parsed_data['ptr_records']
else:
print(f"Error: Received status code {response.status_code} from API")
print(self.colors.red(f"Error getting reverse DNS for {ip}: {response.status_code}"))
return []
except requests.RequestException as e:
print(f"An error occurred while querying the API: {str(e)}")
except Exception as e:
print(self.colors.red(f"Error getting reverse DNS for {ip}: {str(e)}"))
return []

def parse_driftnet_response(self, data):
try:
values = []
ptr_records = []

for result in data.get('results', []):
for item in result.get('items', []):
value = item.get('value')
if value:
values.append(value)
if item.get('context') == 'dns-ptr':
ptr_records.append(value)

return {
'all_values': values,
'ptr_records': ptr_records
}

except KeyError as e:
print(f"Error: Missing expected key in JSON structure: {e}")
return {'all_values': [], 'ptr_records': []}


def spoofing_passive_dns(self):
Expand All @@ -600,8 +626,13 @@ def spoofing_passive_dns(self):
#------------------------Regex and Field definitions------------------------#

x = next(iter(reversed(self.utils.getReceivedFields())), None)
ipv4 = re.findall(r'[\[\(](\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})[\]\)]', x, re.IGNORECASE)
ipv6 = re.findall(r'[\[\(]([A-Fa-f0-9:]+)[\]\)]', x, re.IGNORECASE)
if x is not None:
ipv4 = re.findall(r'[\[\(](\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})[\]\)]', x, re.IGNORECASE)
#ipv6 = re.findall(r'[\[\(]([A-Fa-f0-9:]+)[\]\)]', x, re.IGNORECASE)
else:
ipv4 = []
print(self.colors.red("No 'Received' fields found in the email header."))

# get rid of the localhost IP Address
filteredIpv4 = [ip for ip in ipv4 if ip != '127.0.0.1']

Expand Down Expand Up @@ -630,7 +661,7 @@ def spoofing_passive_dns(self):
authResultOrigIP = None

# Getting the Domain Name from the "From" Field
if fromMatch:
if fromMatch is not None:
fromEmailDomain = fromMatch.group(1).split('@')[1]
else:
print(self.colors.white("Could not detect SMTP Server. Manual reviewing required."))
Expand All @@ -642,9 +673,13 @@ def spoofing_passive_dns(self):
print(self.colors.red("Could not retrieve data from VirusTotal."))
return ''.join(report)

#print(VT_DATA)
mx_records = VT_DATA.get('MX', [])
mx.append([record['value'].lower() for record in mx_records if 'value' in record])
if mx_records:
mx.append([record['value'].lower() for record in mx_records if 'value' in record])

else:
print(self.colors.light_red("No MX records found via VT.."))
report.append(f'No MX records found via VT..\n')

for srv_list in mx:
for servers in srv_list:
Expand Down Expand Up @@ -676,35 +711,36 @@ def spoofing_passive_dns(self):

#print(authResultOrigIP)

try:
for domain in authResultOrigIP:
associated_domain = self.passive_reverse_dns_hackertarget(domain)
#print(associated_domain) # Debug statement

if authResultOrigIP:
try:
for domain in authResultOrigIP:
associated_domain = self.passive_reverse_dns_driftnet(domain)
#print(associated_domain) # Debug statement


for subdomain in associated_domain:
tmp = subdomain.split('.')
authResultFullDomain = '.'.join(tmp[-2:])
#print(authResultFullDomain) # Debug statement
for subdomain in associated_domain:
tmp = subdomain.split('.')
authResultFullDomain = '.'.join(tmp[-2:])
#print(authResultFullDomain) # Debug statement

mx_records_from_vt = self.get_mx_records_from_vt(authResultFullDomain)
#print(mx_records_from_vt) # Debug statement
mx_records_from_vt = self.get_mx_records_from_vt(authResultFullDomain)
#print(mx_records_from_vt) # Debug statement

for x in mx_records_from_vt:
aRecordsOfMxAuthResult.append(self.get_a_records_for_mx(x))

#print(aRecordsOfMxAuthResult)
#print(aRecordsOfMx)
if any(x in aRecordsOfMxAuthResult for x in aRecordsOfMx):
print(self.colors.green(f'{self.indent}→ No Mismatch detected.'))
report.append(f'{self.indent}→ No Mismatch detected.')
for x in mx_records_from_vt:
aRecordsOfMxAuthResult.append(self.get_a_records_for_mx(x))

else:
print(self.colors.light_yellow(f'{self.indent}No IPv4 Address detected in "FROM" Field. Manual reviewing required...'))
report.append(f'{self.indent}No IPv4 Address detected in "FROM" Field. Manual reviewing required...')
#print(aRecordsOfMxAuthResult) # Moooore Debug statements
#print(aRecordsOfMx)
if any(x in aRecordsOfMxAuthResult for x in aRecordsOfMx):
print(self.colors.green(f'{self.indent}→ No Mismatch detected.'))
report.append(f'{self.indent}→ No Mismatch detected.')

except Exception as e:
print(self.colors.red(f"An error occurred while querying the API: {str(e)}"))
else:
print(self.colors.light_yellow(f'{self.indent}Could not compare values. Manual reviewing required...'))
report.append(f'{self.indent}Could not compare values. Manual reviewing required...')

except Exception as e:
print(self.colors.red(f"An error occurred while querying the API: {str(e)}"))

#------------------------Check for Field Mismatches------------------------#

Expand Down Expand Up @@ -903,8 +939,13 @@ def spoofing_no_dns(self):
#------------------------Regex and Field definitions------------------------#

x = next(iter(reversed(self.utils.getReceivedFields())), None)
ipv4 = re.findall(r'[\[\(](\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})[\]\)]', x, re.IGNORECASE)
ipv6 = re.findall(r'[\[\(]([A-Fa-f0-9:]+)[\]\)]', x, re.IGNORECASE)
if x is not None:
ipv4 = re.findall(r'[\[\(](\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})[\]\)]', x, re.IGNORECASE)
ipv6 = re.findall(r'[\[\(]([A-Fa-f0-9:]+)[\]\)]', x, re.IGNORECASE)

else:
print(self.colors.red("No 'Received' fields found in the email header."))
ipv4 = []
# get rid of the localhost IP Address
filteredIpv4 = [ip for ip in ipv4 if ip != '127.0.0.1']

Expand Down Expand Up @@ -933,7 +974,7 @@ def spoofing_no_dns(self):
authResultOrigIP = None

# Getting the Domain Name from the "From" Field
if fromMatch:
if fromMatch is not None:
fromEmailDomain = fromMatch.group(1).split('@')[1]
else:
print(self.colors.white("Could not detect SMTP Server. Manual reviewing required."))
Expand Down

0 comments on commit d10b886

Please sign in to comment.