Skip to content

Commit

Permalink
🐛 fix: Corrected risk_score and trend analysis to include sensitive_s…
Browse files Browse the repository at this point in the history
…ites (Fixes #4)
  • Loading branch information
DevaOnBreaches committed Dec 28, 2023
1 parent 23a3ee3 commit 88c43a7
Showing 1 changed file with 69 additions and 26 deletions.
95 changes: 69 additions & 26 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1540,48 +1540,51 @@ def get_metrics():
abort(404)


@XON.route("/v1/breach-analytics/<email>", methods=["GET"])
@XON.route("/v1/breach-analytics", methods=["GET"])
@LIMITER.limit("500 per day;100 per hour;2 per second")
def search_data_breaches(email):
def search_data_breaches():
verification_token = request.args.get("token", default=None)
email = request.args.get("email", default=None)
"""Returns summary and details of data breaches for a given email"""
email = email.lower()
if (
not email
or not validate_email(email)
or not validate_variables(email)
or not validate_url()
):
return make_response(jsonify({"Error": "Not found"}), 404)

verification_token = request.args.get("token", default="None")
if verification_token != "None" and not validate_variables(verification_token):
if not email or not validate_email(email) or not validate_url():
return make_response(jsonify({"Error": "Not found"}), 404)

try:
datastore_client = datastore.Client()
alert_datastore_client = datastore.Client()
paste_datastore_client = datastore.Client()

breach_key = datastore_client.key("xon", email)
breach_record = datastore_client.get(breach_key)

alert_key = alert_datastore_client.key("xon_alert", email)
alert_record = alert_datastore_client.get(alert_key)

include_sensitive = False

if verification_token:
if alert_record and alert_record["token"] == verification_token:
include_sensitive = True
else:
return make_response(jsonify({"Error": "Invalid token"}), 403)

breach_key = datastore_client.key("xon", email)
breach_record = datastore_client.get(breach_key)

paste_key = paste_datastore_client.key("xon_paste", email)
paste_record = paste_datastore_client.get(paste_key)

# Check if the token is set to "None" or matches the stored token
if (
verification_token != "None"
and alert_record
and alert_record["token"] != verification_token
):
raise ShieldOnException("Invalid token")

# Check if the shieldOn flag is set and raise exception if True
if alert_record and alert_record["shieldOn"]:
if alert_record and alert_record.get("shieldOn"):
raise ShieldOnException("Shield is on")
if include_sensitive:
combined_breach_data = get_combined_breach_data(
breach_record, include_sensitive
)
existing_sites = (
set(breach_record["site"].split(";"))
if "site" in breach_record and breach_record["site"]
else set()
)
unique_sites = existing_sites.union(combined_breach_data)
breach_record["site"] = ";".join(unique_sites)

(
breach_summary,
Expand Down Expand Up @@ -1616,7 +1619,6 @@ def search_data_breaches(email):
)
except ShieldOnException as shield_error:
abort(404)

except Exception as exception_details:
log_except(request.url, exception_details)
abort(404)
Expand All @@ -1626,6 +1628,47 @@ class ShieldOnException(Exception):
pass


def get_combined_breach_data(breach_record, include_sensitive=False):
"""
Combine standard and sensitive breach data based on authorization.
:param breach_record: The datastore record containing breach information.
:param include_sensitive: Flag to indicate if sensitive data should be included.
:return: A set of unique breach data.
"""
standard_sites = (
set(breach_record["site"].split(";"))
if "site" in breach_record and breach_record["site"]
else set()
)

if (
include_sensitive
and "sensitive_site" in breach_record
and breach_record["sensitive_site"]
):
sensitive_sites = set(breach_record["sensitive_site"].split(";"))
return standard_sites.union(sensitive_sites)
else:
return standard_sites


def merge_data(standard_data, sensitive_data):
"""Merge standard and sensitive data"""
merged_data = []

standard_sites = standard_data.split(";") if standard_data else []
sensitive_sites = sensitive_data.split(";") if sensitive_data else []

merged_data.extend(standard_sites)

for site in sensitive_sites:
if site not in merged_data:
merged_data.append(site)

return merged_data


@XON.route("/v1/analytics/<user_email>", methods=["GET"])
@LIMITER.limit("500 per day;100 per hour;2 per second")
def get_breach_analytics(user_email):
Expand Down

0 comments on commit 88c43a7

Please sign in to comment.