From 8845bca360acba9c2db2c7f9116b480acade60e9 Mon Sep 17 00:00:00 2001
From: huitema
Date: Tue, 11 Feb 2025 15:05:30 -0800
Subject: [PATCH] Pivot by CC+AS

---
 resolver/rsv_as_study.py | 37 +++++++-------
 resolver/rsv_log_parse.py | 105 +++++++++++++++++++++-----------------
 resolver/top_as.py | 22 +++++++-
 3 files changed, 97 insertions(+), 67 deletions(-)

diff --git a/resolver/rsv_as_study.py b/resolver/rsv_as_study.py
index 26f7f56..f0ca119 100644
--- a/resolver/rsv_as_study.py
+++ b/resolver/rsv_as_study.py
@@ -69,7 +69,7 @@ def usage():
     nb_events = ppq.quicker_load(log_file, ip2a4, ip2a6, as_names, experiment=['0du'], \
         rr_types = [ 'A', 'AAAA', 'HTTPS' ], query_ASes = target_ASes, \
         time_start=time_start)
-    print("Quick load of " + str(len(ppq.ASes)) + " ASes with " + str(nb_events) + " events.")
+    print("Quick load of " + str(len(ppq.cc_AS_list)) + " CC+AS with " + str(nb_events) + " events.")
     time_file_read = time.time()
     print("File read at " + str(time_file_read - time_start) + " seconds.")

@@ -87,21 +87,22 @@ def usage():
     ppq.compute_delta_t()
     time_delays_computed = time.time()
     print("Delays computed at " + str(time_file_read - time_start) + " seconds.")
-    # Compute the list of ASes for which we have data to graph, i.e. the intersection
-    # of the list that we filter and the list of ASes present in the log file.
+
+    # Compute the list of (Country, AS) tuples for which we want a graph
     if len(target_ASes) == 0:
-        as_list = ppq.AS_list()
+        key_list = ppq.key_list()
     else:
-        as_list = []
-        for asn in target_ASes:
-            if asn in ppq.ASes:
-                as_list.append(asn)
+        key_list = []
+        for key in ppq.key_list():
+            for asn in target_ASes:
+                if key.endswith(asn):
+                    key_list.append(key)

-    # get the summaries per AS
-    summary_df = ppq.get_summaries(as_list, False);
+    # get the summaries per cc + AS
+    summary_df = ppq.get_summaries(key_list, False);
     summary_file = os.path.join(image_dir, "summary.csv" )
     summary_df.to_csv(summary_file, sep=",")
-    print("Published summaries for " + str(len(as_list)) + " Ases" + " in " + summary_file)
+    print("Published summaries for " + str(len(key_list)) + " CC+AS" + " in " + summary_file)
     time_summaries_computed = time.time()
     print("Summaries computed at " + str(time_summaries_computed - time_start) + " seconds.")

@@ -116,13 +117,13 @@ def usage():
     # Analyse the spread of delays for the AS that have a sufficient share of UID with events
     # from both ISP resolvers and public resolvers.
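# Illustrative sketch of the cc+AS keying that the new key_list filtering above relies on:
# a key is assumed to be the concatenation of a two-letter country code and an ASN string,
# e.g. "TR" + "AS9121" -> "TRAS9121". The helper names below are hypothetical.
def make_key(query_cc, query_AS):
    # same construction as pivoted_per_query.process_event: key = query_cc + query_AS
    return query_cc + query_AS

def filter_keys(all_keys, target_ASes):
    # keep every cc+AS key whose AS part matches one of the targets,
    # the same test as "if key.endswith(asn)" in the loop above
    return [key for key in all_keys if any(key.endswith(asn) for asn in target_ASes)]

# filter_keys(["TRAS9121", "SAAS25019"], ["AS9121"]) -> ["TRAS9121"]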
     nb_published = 0
-    for target_AS in as_list:
-        if ppq.ASes[target_AS].nb_both > target_threshold:
-            dot_df = ppq.get_delta_t_both(target_AS)
-            plot_delay_file = os.path.join(image_dir, target_AS + "_plot_delays" )
-            rsv_log_parse.do_graph(target_AS, dot_df, plot_delay_file, x_delay=True, log_y=True)
-            host_delay_files = os.path.join(image_dir, target_AS + "_hist_delays" )
-            rsv_log_parse.do_hist(target_AS, dot_df, image_file=host_delay_files)
+    for key in key_list:
+        if ppq.cc_AS_list[key].nb_both > target_threshold:
+            dot_df = ppq.get_delta_t_both(key)
+            plot_delay_file = os.path.join(image_dir, key[:2] + "_" + key[2:] + "_plot_delays" )
+            rsv_log_parse.do_graph(key, dot_df, plot_delay_file, x_delay=True, log_y=True)
+            host_delay_files = os.path.join(image_dir, key[:2] + "_" + key[2:] + "_hist_delays" )
+            rsv_log_parse.do_hist(key, dot_df, image_file=host_delay_files)
             nb_published += 1
             if (nb_published%100) == 0:
                 print("Published " + str(nb_published) + " AS graphs")
diff --git a/resolver/rsv_log_parse.py b/resolver/rsv_log_parse.py
index 889b5df..7edba6a 100644
--- a/resolver/rsv_log_parse.py
+++ b/resolver/rsv_log_parse.py
@@ -376,7 +376,8 @@ def row(self):

 class pivoted_record:
     # Record is created for the first time an event appears in an AS record.
-    def __init__(self, qt, tag, query_AS, uid):
+    def __init__(self, qt, tag, query_cc, query_AS, uid):
+        self.query_cc = query_cc
         self.query_AS = query_AS
         self.query_user_id = uid
         self.first_tag = tag
@@ -421,27 +422,30 @@ def compute_delta_t(self, delta_max = 0.5):
         self.has_public = True

 class subnet_record:
-    def __init__(self, query_AS, resolver_AS, subnet, count):
+    def __init__(self, query_cc, query_AS, resolver_AS, subnet, count):
+        self.query_cc = query_cc
         self.query_AS = query_AS
         self.resolver_AS = resolver_AS
         self.subnet = str(subnet)
         self.count = count

     def headers():
-        return [ "query_AS", "resolver_AS", "subnet", "count" ]
+        return [ "query_cc", "query_AS", "resolver_AS", "subnet", "count" ]

-    def key(query_AS, resolver_AS, subnet):
-        return query_AS + "_" + resolver_AS + "_" + str(subnet)
+    def key(query_cc, query_AS, resolver_AS, subnet):
+        return query_cc + query_AS + "_" + resolver_AS + "_" + str(subnet)

-    def as_list(self):
-        return [
+    def subnet_row(self):
+        return [
+            self.query_cc,
             self.query_AS,
             self.resolver_AS,
             self.subnet,
             self.count ]

-class pivoted_AS_record:
-    def __init__(self,query_AS):
+class pivoted_cc_AS_record:
+    def __init__(self, query_cc,query_AS):
+        self.query_cc = query_cc
         self.query_AS = query_AS
         self.rqt = dict()
         self.user_ids = set()
@@ -454,11 +458,11 @@ def __init__(self,query_AS):

     # process event
     # For each UID, we compute a pivoted record, which contains a dict() of "tags".
     # If a tag is present in the dict, we only retain the earliest time for that tag.
-    def process_event(self, qt, tag, query_AS, uid, resolver_IP, resolver_AS):
+    def process_event(self, qt, tag, query_cc, query_AS, uid, resolver_IP, resolver_AS):
         if uid in self.rqt:
             self.rqt[uid].add_event2(qt, tag)
         else:
-            self.rqt[uid] = pivoted_record(qt, tag, query_AS, uid)
+            self.rqt[uid] = pivoted_record(qt, tag, query_cc, query_AS, uid)
         if not uid in self.user_ids:
             self.user_ids.add(uid)
@@ -469,11 +473,11 @@ def process_event(self, qt, tag, query_AS, uid, resolver_IP, resolver_AS):
                 subnet = ipaddress.IPv6Network(resolver_IP + "/40", strict=False)
             else:
                 subnet = ipaddress.IPv4Network(resolver_IP + "/16", strict=False)
-            key = subnet_record.key(query_AS, resolver_AS, subnet)
+            key = subnet_record.key(query_cc, query_AS, resolver_AS, subnet)
             if key in self.subnets:
                 self.subnets[key].count += 1
             else:
-                self.subnets[key] = subnet_record(query_AS, resolver_AS, subnet, 1)
+                self.subnets[key] = subnet_record(query_cc, query_AS, resolver_AS, subnet, 1)
         except Exception as exc:
             traceback.print_exc()
             print('\nCode generated an exception: %s' % (exc))
@@ -496,16 +500,18 @@ def compute_delta_t(self, delta_max = 0.5):

     # Produce a one line summary record for the ASN
     # Return a list of values:
-    # r[0] = ASN
-    # r[1] = total number of UIDs
-    # r[2] = total number of queries (should be same as total number UIDs)
-    # r[3] = total number of ISP only queries
-    # r[4] = total number of public DNS only queries
-    # r[5] = total number of queries served by both ISP and public DNS
-    # r[6]..[5+N] = total number of queries served by a given category
+    # r[0] = CC
+    # r[1] = ASN
+    # r[2] = total number of UIDs
+    # r[3] = total number of queries (should be same as total number UIDs)
+    # r[4] = total number of ISP only queries
+    # r[5] = total number of public DNS only queries
+    # r[6] = total number of queries served by both ISP and public DNS
+    # r[7]..[6+N] = total number of queries served by a given category
     def get_summary(self, first_only):
         r = [
+            self.query_cc,
             self.query_AS,
             len(self.user_ids),
             self.nb_total,
@@ -518,7 +524,7 @@ def get_summary(self, first_only):

         for key in self.rqt:
             rqt_r = self.rqt[key]
-            rank = 6
+            rank = 7
             for tag in tag_list:
                 if tag in rqt_r.rsv_times:
                     r[rank] += 1
@@ -526,7 +532,7 @@ def get_summary(self, first_only):
         return r

     # get_delta_t_both:
-    # we produce a list of dots" records suitable for statistics and graphs
+    # we produce a list of "dots" records suitable for statistics and graphs
     def get_delta_t_both(self):
         dots = []
         for key in self.rqt:
@@ -545,20 +551,21 @@ def get_delta_t_both(self):
     def get_subnets(self):
         snts = []
         for key in self.subnets:
-            snts.append(self.subnets[key].as_list())
-        snts.sort(key=lambda x: x[3], reverse=True)
+            snts.append(self.subnets[key].subnet_row())
+        snts.sort(key=lambda x: x[4], reverse=True)
         return snts

 class pivoted_per_query:
     def __init__(self):
-        self.ASes = dict()
+        self.cc_AS_list = dict()
         self.tried = 0

-    def process_event(self, qt, tag, query_AS, uid, resolver_IP, resolver_AS):
-        if not query_AS in self.ASes:
-            self.ASes[query_AS] = pivoted_AS_record(query_AS)
+    def process_event(self, qt, tag, query_cc, query_AS, uid, resolver_IP, resolver_AS):
+        key = query_cc + query_AS
+        if not key in self.cc_AS_list:
+            self.cc_AS_list[key] = pivoted_cc_AS_record(query_cc,query_AS)

-        self.ASes[query_AS].process_event(qt, tag, query_AS, uid, resolver_IP, resolver_AS)
+        self.cc_AS_list[key].process_event(qt, tag, query_cc, query_AS, uid, resolver_IP, resolver_AS)
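# Illustrative sketch of the summary-row layout documented in get_summary above:
# the first seven columns are fixed (cc, ASN, number of UIDs, total queries,
# ISP only, public DNS only, both), and the per-tag counters start at index 7,
# which is why rank now starts at 7. The tag names below are placeholders for
# the module's tag_list.
placeholder_tags = ["tag_a", "tag_b", "tag_c"]

def empty_summary_row(query_cc, query_AS, tags=placeholder_tags):
    return [query_cc, query_AS, 0, 0, 0, 0, 0] + [0] * len(tags)

row = empty_summary_row("TR", "AS9121")
# the counter column for a given tag sits at index 7 + tags.index(tag)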

     def quicker_load(self, file_name, ip2a4, ip2a6, as_table, rr_types=[], experiment=[], query_ASes=[], log_threshold = 15625, time_start=0):
         nb_events = 0
@@ -584,7 +591,7 @@ def quicker_load(self, file_name, ip2a4, ip2a6, as_table, rr_types=[], experimen
                 if parsed:
                     if (not filtering) or x.filter(rr_types=rr_types, experiment=experiment, query_ASes=q_set):
                         x.set_resolver_AS(ip2a4, ip2a6, as_table)
-                        self.process_event(x.query_time, x.resolver_tag, x.query_AS, x.query_user_id, x.resolver_IP, x.resolver_AS)
+                        self.process_event(x.query_time, x.resolver_tag, x.query_cc, x.query_AS, x.query_user_id, x.resolver_IP, x.resolver_AS)
                         nb_events += 1
                         if (nb_events%lth) == 0:
                             if time_start > 0:
@@ -596,16 +603,18 @@ def quicker_load(self, file_name, ip2a4, ip2a6, as_table, rr_types=[], experimen

         return nb_events

-    def AS_list(self):
-        return list(self.ASes.keys())
+    def key_list(self):
+        return list(self.cc_AS_list.keys())

     def compute_delta_t(self):
-        for query_AS in self.ASes:
-            self.ASes[query_AS].compute_delta_t()
+        for key in self.cc_AS_list:
+            self.cc_AS_list[key].compute_delta_t()

-    def get_summaries(self, AS_list, first_only):
+    def get_summaries(self, key_list, first_only):
         # compose the headers
-        headers = [ 'q_AS', \
+        headers = [ \
+            'q_cc', \
+            'q_AS', \
             'uids',
             'q_total',
             'isp',
@@ -614,24 +623,24 @@ def get_summaries(self, AS_list, first_only):
         for tag in tag_list:
             headers.append(tag)
         s_list = []
-        for target_AS in AS_list:
-            if target_AS in self.ASes:
-                s_list.append(self.ASes[target_AS].get_summary(first_only))
-        s_list.sort(key=lambda x: x[1], reverse=True)
+        for key in key_list:
+            if key in self.cc_AS_list:
+                s_list.append(self.cc_AS_list[key].get_summary(first_only))
+        s_list.sort(key=lambda x: x[2], reverse=True)
         df = pd.DataFrame(s_list, columns=headers)
         return df

-    def get_delta_t_both(self, target_AS):
-        return self.ASes[target_AS].get_delta_t_both()
+    def get_delta_t_both(self, key):
+        return self.cc_AS_list[key].get_delta_t_both()

     def get_subnets(self):
         sn = []
-        for target_AS in self.ASes:
-            sn += self.ASes[target_AS].get_subnets()
+        for key in self.cc_AS_list:
+            sn += self.cc_AS_list[key].get_subnets()
         sn_df = pd.DataFrame(sn, columns=subnet_record.headers())
         return sn_df

-def do_graph(asn, dot_df, image_file="", x_delay=False, log_y=False):
+def do_graph(key, dot_df, image_file="", x_delay=False, log_y=False):
     if log_y:
         # replace 0 by low value so logy plots will work
         dot_df.loc[dot_df['delay'] == 0, 'delay'] += 0.00001
@@ -655,7 +664,7 @@ def do_graph(asn, dot_df, image_file="", x_delay=False, log_y=False):
             sub_df[i].plot.scatter(ax=axa, x=x_value, y="delay", logy=log_y, alpha=0.25, color=rsv_color)
             is_first = False
             legend_list.append(rsv)
-    plt.title("Delay (seconds) per provider for " + asn)
+    plt.title("Delay (seconds) per provider for " + key[:2] + "/" + key[2:])
     plt.legend(legend_list)
     if len(image_file) == 0:
         plt.show()
@@ -664,7 +673,7 @@ def do_graph(asn, dot_df, image_file="", x_delay=False, log_y=False):
     plt.close()


-def do_hist(asn, dot_df, image_file):
+def do_hist(key, dot_df, image_file):
     # get a frame from the list
     dot_df.loc[dot_df['delay'] == 0, 'delay'] += 0.00001
     is_first = True
@@ -696,7 +705,7 @@ def do_hist(asn, dot_df, image_file):
     if not is_first:
         logbins = np.logspace(np.log10(x_min),np.log10(x_max), num=20)
         axa = plt.hist(row_list, logbins, histtype='bar', color=clrs)
-        plt.title("Histogram of delays (seconds) per provider for " + asn)
+        plt.title("Histogram of delays (seconds) per provider for " + key[:2] + "/" + key[2:])
         plt.legend(legend_list)
         plt.xscale('log')
         if len(image_file) == 0:
diff --git a/resolver/top_as.py b/resolver/top_as.py
index b5fe8c2..13d0ea9 100644
--- a/resolver/top_as.py
+++ b/resolver/top_as.py
@@ -48,7 +48,27 @@
     "AS3215": ["Orange", "EU", "Europe"],
     "AS3269": ["Telecom Italia", "EU", "Europe"],
     "AS3320": ["Deutsche Telekom", "EU", "Europe"],
-    "AS3352": ["Telefonica", "EU", "Europe"]}
+    "AS3352": ["Telefonica", "EU", "Europe"],
+    "AS197207": ["Mobile Communication of Iran PLC", "IR", "Mobile"],
+    "AS58224": ["TCI[Iran]", "IR", "Mix"],
+    "AS203214": ["Hulum Almustakbal", "IQ", "ISP"],
+    "AS199739": ["Earthlink [Iran]", "IR", "Mix"],
+    "AS48832": ["ZAIN[Jordan]", "JO", "Mix"],
+    "AS28885": ["Omantel [Oman]", "OM", "Mix"],
+    "AS43766": ["Zain [Saudi Arabia]", "SA", "Mix"],
+    "AS39801": ["Al Jawal [Saudi Arabia]", "SA", "Mobile"],
+    "AS35819": ["Etihad Etisalat [Saudi Arabia]", "SA", "Mix"],
+    "AS25019": ["Saudinetstc [Saudi Arabia]", "SA", "ISP"],
+    "AS29256": ["Int pdn ste [Syria]", "SY", "Mix"],
+    "AS34984": ["Tellcom [Turkiye]", "TR", "ISP"],
+    "AS20978": ["TT Mobil [Turkiye]", "TR", "Mobile"],
+    "AS16135": ["Turkcell [Turkiye]", "TR", "Mobile"],
+    "AS15897": ["Vodafone Turkiye [Turkiye]", "TR", "Mix"],
+    "AS9121": ["Turk Telekom [Turkiye]", "TR", "Mix"],
+    "AS8966": ["Etisalat [UAE]", "AE", "Mix"],
+    "AS5384": ["Emirates Telecommunications [UAE]", "AE", "Mix"],
+    "AS30873": ["Public Telecommunication [Yemen]", "YE", "Mix"]
+}


 def top_as_list():
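# Illustrative sketch of how a cc+AS key is split back into its two parts for file
# names and plot titles, assuming the country code is always two characters; this
# mirrors the key[:2] / key[2:] slices used in rsv_as_study.py and in do_graph /
# do_hist above (the helper name is hypothetical).
def split_key(key):
    return key[:2], key[2:]

cc, asn = split_key("TRAS9121")                     # -> ("TR", "AS9121")
plot_name = cc + "_" + asn + "_plot_delays"         # e.g. "TR_AS9121_plot_delays"
title = "Delay (seconds) per provider for " + cc + "/" + asn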