From 4d9d55e335aa609084a9f045926fe5b2197dc770 Mon Sep 17 00:00:00 2001 From: minmit Date: Sun, 13 Mar 2016 08:04:47 -0700 Subject: [PATCH] added up_and_down, and per_switch_hop to demo --- pyretic/examples/demo.py | 167 ++++++++++++++++++++++++++++++++++----- 1 file changed, 146 insertions(+), 21 deletions(-) diff --git a/pyretic/examples/demo.py b/pyretic/examples/demo.py index 2ef592fb..253d0a6a 100644 --- a/pyretic/examples/demo.py +++ b/pyretic/examples/demo.py @@ -3,11 +3,14 @@ from datetime import datetime import time from cmd import Cmd - +import copy import threading import itertools TEN_IP = '10.0.0.0/24' +EDGE = match(switch = 1, port = 3) | match(switch = 1, port = 4) | \ + match(switch = 3, port = 3) | match(switch = 3, port = 4) +TEN_SUBNET = match(srcip = TEN_IP, dstip = TEN_IP, ethtype=2048) def forwarding_policy(): return if_(match(dstip = '10.0.0.1'), @@ -41,8 +44,6 @@ def count_callback(test_num): def actual_callback(counts): print "Traffic statistics for query %s" % test_num print "%d Packets" % counts[0] - print "%d Bytes" % counts[1] - print "-----------------------------------" return actual_callback first_hop_stat = {} @@ -50,20 +51,40 @@ def actual_callback(counts): third_hop_stat = {} stat_dict_list = [first_hop_stat, second_hop_stat, third_hop_stat] stat_lock = threading.Lock() +threshold = 0 +pull_cnt = 0 + + +def print_gb_stats(): + print "First Hop" + print_stat_dict(first_hop_stat) + print "************" + print "Second Hop" + print_stat_dict(second_hop_stat) + print "************" + print "Third Hop" + print_stat_dict(third_hop_stat) + print "************" def aggr_callback(test_num, id_list): def actual_callback(agg, res): + global threshold + global pull_cnt sw_list = tuple([agg[i][0]['switch'] for i in id_list]) stat_dict = stat_dict_list[len(sw_list) - 1] with stat_lock: stat_dict[sw_list] = res + pull_cnt += 1 + if pull_cnt == threshold: + print_gb_stats() + pull_cnt = 0 return actual_callback def print_stat_dict(d): for k in d: if d[k][0] > 0: - print ",".join([str(x) for x in k]), ": ", "%d Packets, %d Bytes" % tuple(d[k]) + print ",".join([str(x) for x in k]), ": ", "%d Packets" % d[k][0] def print_stats(interval): while True: @@ -100,8 +121,50 @@ def pull_func(bucket_list, interval): buc.pull_stats() time.sleep(interval) -EDGE = match(switch = 1, port = 3) | match(switch = 1, port = 4) | \ - match(switch = 3, port = 3) | match(switch = 3, port = 4) + +def up_and_down(q): + q1 = q + q2 = copy.copy(q) + cb1 = CountBucket() + q1.set_bucket(cb1) + q1.register_callback(count_callback("Upstream")) + q1.measure_upstream() + + cb2 = CountBucket() + q2.set_bucket(cb2) + q2.register_callback(count_callback("Downstream")) + return [q1 + q2, cb1, cb2] + +def per_switch_hop(pred, cnt): + + fvlist = {'switch': range(1,5)} + + resq = None + + for i in range(1, cnt + 1): + q = in_group(pred, groupby=['switch']) + id_list = [id(q)] + for j in range(i - 1): + gb = in_group(identity, groupby=['switch']) + id_list.append(id(gb)) + q = q ^ gb + cb = CountBucket() + q.set_bucket(cb) + q.register_callback(aggr_callback("Hop %d" % i, id_list)) + + queries = path_grouping.expand_groupby(q, fvlist) + if resq is None: + resq = queries + else: + resq += queries + + + buckets = [p.piped_policy for p in resq.path_policies] + + global threshold + threshold = len(buckets) + return [resq] + buckets + class DemoCLI(Cmd): """ Command-line prompt for demo queries. """ @@ -115,9 +178,18 @@ def __init__(self, dpp, default_cb): assert isinstance(dpp, dynamic_path_policy) self.dpp = dpp self.default_cb = default_cb - self.refq_map = {id(dpp.path_policy): dpp.path_policy} + self.query_cnt = 0 + zid = self.get_new_id() + self.refq_map = {zid: dpp.path_policy} + self.buck_map = {zid : []} + self.active_queries = set([0]) Cmd.__init__(self) + def get_new_id(self): + res = self.query_cnt + self.query_cnt += 1 + return res + def get_qid(self, qidstr): try: qid = int(qidstr) @@ -134,37 +206,89 @@ def emptyline(self): def do_help(self, line): print "The available commands are:" + print "\tinit\tInitialize a new query" + print "\tupstream\tmeasure a query upstream" + print "\tdownstream\tmeasure a query downstream" print "\tadd\tAdd a new query" - print "\trm\tRemove an existing query" print "\tpull\tPull statistics for an existing query" - print "\tshow\tShow installed queries" + print "\trm\tRemove an existing query" + print "\tshow_all\tShow all queries" + print "\tshow_active\tShow active queries" - def do_add(self, qstr): + def do_init(self, qstr): + buckets = [] try: """ Dangerous!!! But this is the quickest way to do this. """ q = eval(qstr) except Exception as e: print "*** Not a well formed expression:", e return - if not isinstance(q, path_policy): + if isinstance(q, list): + if len(q) > 0 and isinstance(q[0], path_policy): + for cb in q[1:]: + if isinstance(cb, CountBucket): + buckets.append(cb) + q = q[0] + else: + print "*** Not a well formed query." + return + + elif not isinstance(q, path_policy): print "*** Not a well formed query." return - cb = CountBucket() - cb.register_callback(self.default_cb(id(q))) - q.set_bucket(cb) - self.refq_map.update({id(q): q}) - self.dpp.path_policy += q - print "Added query: reference %d" % id(q) + + + qid = self.get_new_id() + + if len(buckets) == 0: + cb = CountBucket() + cb.register_callback(self.default_cb(qid)) + q.set_bucket(cb) + buckets = [cb] + + self.refq_map.update({qid: q}) + self.buck_map[qid] = buckets + print "Initialized query: reference %d" % qid + + def do_upstream(self, qidstr): + qid = self.get_qid(qidstr) + if qid: + q = self.refq_map[qid] + q.measure_upstream() + print "Changed measurement of query to upstream: reference %d" % qid + + def do_downstream(self, qidstr): + qid = self.get_qid(qidstr) + if qid: + q = self.refq_map[qid] + q.measure_downstream() + print "Changed measurement of query to downstream: reference %d" % qid - def do_show(self, line): - print "Showing installed queries:" + def do_add(self, qidstr): + qid = self.get_qid(qidstr) + if qid: + q = self.refq_map[qid] + self.active_queries.add(qid) + self.dpp.path_policy += q + print "Added query: reference %d" % qid + + def do_show_all(self, line): + print "Showing queries:" for (qid, q) in self.refq_map.items(): print qid, ':', q + + def do_show_active(self, line): + print "Showing active queries:" + for qid in self.active_queries: + if qid in self.refq_map: + print qid, ":", self.refq_map[qid] def do_rm(self, qidstr): qid = self.get_qid(qidstr) if qid: del self.refq_map[qid] + del self.buck_map[qid] + self.active_queries.discard(qid) queries = self.refq_map.values() if len(queries) > 1: self.dpp.path_policy = path_policy_union(self.refq_map.values()) @@ -176,8 +300,9 @@ def do_rm(self, qidstr): def do_pull(self, qidstr): qid = self.get_qid(qidstr) if qid: - q = self.refq_map[qid] - q.get_policy().pull_stats() + for cb in self.buck_map[qid]: + cb.pull_stats() + time.sleep(3) def do_EOF(self, line): print '\n'