From 88c43a78fe1aea4a0abe40875eb13dee0c3d6a27 Mon Sep 17 00:00:00 2001 From: DevaOnBreaches Date: Thu, 28 Dec 2023 18:47:56 +0530 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix:=20Corrected=20risk=5Fscore?= =?UTF-8?q?=20and=20trend=20analysis=20to=20include=20sensitive=5Fsites=20?= =?UTF-8?q?(Fixes=20#4)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 95 +++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 69 insertions(+), 26 deletions(-) diff --git a/main.py b/main.py index 42c0dc0..6b5dfd0 100755 --- a/main.py +++ b/main.py @@ -1540,21 +1540,14 @@ def get_metrics(): abort(404) -@XON.route("/v1/breach-analytics/", methods=["GET"]) +@XON.route("/v1/breach-analytics", methods=["GET"]) @LIMITER.limit("500 per day;100 per hour;2 per second") -def search_data_breaches(email): +def search_data_breaches(): + verification_token = request.args.get("token", default=None) + email = request.args.get("email", default=None) """Returns summary and details of data breaches for a given email""" email = email.lower() - if ( - not email - or not validate_email(email) - or not validate_variables(email) - or not validate_url() - ): - return make_response(jsonify({"Error": "Not found"}), 404) - - verification_token = request.args.get("token", default="None") - if verification_token != "None" and not validate_variables(verification_token): + if not email or not validate_email(email) or not validate_url(): return make_response(jsonify({"Error": "Not found"}), 404) try: @@ -1562,26 +1555,36 @@ def search_data_breaches(email): alert_datastore_client = datastore.Client() paste_datastore_client = datastore.Client() - breach_key = datastore_client.key("xon", email) - breach_record = datastore_client.get(breach_key) - alert_key = alert_datastore_client.key("xon_alert", email) alert_record = alert_datastore_client.get(alert_key) + include_sensitive = False + + if verification_token: + if alert_record and alert_record["token"] == verification_token: + include_sensitive = True + else: + return make_response(jsonify({"Error": "Invalid token"}), 403) + + breach_key = datastore_client.key("xon", email) + breach_record = datastore_client.get(breach_key) + paste_key = paste_datastore_client.key("xon_paste", email) paste_record = paste_datastore_client.get(paste_key) - # Check if the token is set to "None" or matches the stored token - if ( - verification_token != "None" - and alert_record - and alert_record["token"] != verification_token - ): - raise ShieldOnException("Invalid token") - - # Check if the shieldOn flag is set and raise exception if True - if alert_record and alert_record["shieldOn"]: + if alert_record and alert_record.get("shieldOn"): raise ShieldOnException("Shield is on") + if include_sensitive: + combined_breach_data = get_combined_breach_data( + breach_record, include_sensitive + ) + existing_sites = ( + set(breach_record["site"].split(";")) + if "site" in breach_record and breach_record["site"] + else set() + ) + unique_sites = existing_sites.union(combined_breach_data) + breach_record["site"] = ";".join(unique_sites) ( breach_summary, @@ -1616,7 +1619,6 @@ def search_data_breaches(email): ) except ShieldOnException as shield_error: abort(404) - except Exception as exception_details: log_except(request.url, exception_details) abort(404) @@ -1626,6 +1628,47 @@ class ShieldOnException(Exception): pass +def get_combined_breach_data(breach_record, include_sensitive=False): + """ + Combine standard and sensitive breach data based on authorization. + + :param breach_record: The datastore record containing breach information. + :param include_sensitive: Flag to indicate if sensitive data should be included. + :return: A set of unique breach data. + """ + standard_sites = ( + set(breach_record["site"].split(";")) + if "site" in breach_record and breach_record["site"] + else set() + ) + + if ( + include_sensitive + and "sensitive_site" in breach_record + and breach_record["sensitive_site"] + ): + sensitive_sites = set(breach_record["sensitive_site"].split(";")) + return standard_sites.union(sensitive_sites) + else: + return standard_sites + + +def merge_data(standard_data, sensitive_data): + """Merge standard and sensitive data""" + merged_data = [] + + standard_sites = standard_data.split(";") if standard_data else [] + sensitive_sites = sensitive_data.split(";") if sensitive_data else [] + + merged_data.extend(standard_sites) + + for site in sensitive_sites: + if site not in merged_data: + merged_data.append(site) + + return merged_data + + @XON.route("/v1/analytics/", methods=["GET"]) @LIMITER.limit("500 per day;100 per hour;2 per second") def get_breach_analytics(user_email):