Skip to content
This repository has been archived by the owner on Jun 27, 2018. It is now read-only.

Commit

Permalink
added up_and_down, and per_switch_hop to demo
Browse files Browse the repository at this point in the history
  • Loading branch information
minmit committed Mar 13, 2016
1 parent b4cae7c commit 4d9d55e
Showing 1 changed file with 146 additions and 21 deletions.
167 changes: 146 additions & 21 deletions pyretic/examples/demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
from datetime import datetime
import time
from cmd import Cmd

import copy
import threading
import itertools

TEN_IP = '10.0.0.0/24'
EDGE = match(switch = 1, port = 3) | match(switch = 1, port = 4) | \
match(switch = 3, port = 3) | match(switch = 3, port = 4)
TEN_SUBNET = match(srcip = TEN_IP, dstip = TEN_IP, ethtype=2048)

def forwarding_policy():
return if_(match(dstip = '10.0.0.1'),
Expand Down Expand Up @@ -41,29 +44,47 @@ def count_callback(test_num):
def actual_callback(counts):
print "Traffic statistics for query %s" % test_num
print "%d Packets" % counts[0]
print "%d Bytes" % counts[1]
print "-----------------------------------"
return actual_callback

first_hop_stat = {}
second_hop_stat = {}
third_hop_stat = {}
stat_dict_list = [first_hop_stat, second_hop_stat, third_hop_stat]
stat_lock = threading.Lock()
threshold = 0
pull_cnt = 0


def print_gb_stats():
print "First Hop"
print_stat_dict(first_hop_stat)
print "************"
print "Second Hop"
print_stat_dict(second_hop_stat)
print "************"
print "Third Hop"
print_stat_dict(third_hop_stat)
print "************"

def aggr_callback(test_num, id_list):
def actual_callback(agg, res):
global threshold
global pull_cnt
sw_list = tuple([agg[i][0]['switch'] for i in id_list])
stat_dict = stat_dict_list[len(sw_list) - 1]
with stat_lock:
stat_dict[sw_list] = res
pull_cnt += 1
if pull_cnt == threshold:
print_gb_stats()
pull_cnt = 0
return actual_callback


def print_stat_dict(d):
for k in d:
if d[k][0] > 0:
print ",".join([str(x) for x in k]), ": ", "%d Packets, %d Bytes" % tuple(d[k])
print ",".join([str(x) for x in k]), ": ", "%d Packets" % d[k][0]

def print_stats(interval):
while True:
Expand Down Expand Up @@ -100,8 +121,50 @@ def pull_func(bucket_list, interval):
buc.pull_stats()
time.sleep(interval)

EDGE = match(switch = 1, port = 3) | match(switch = 1, port = 4) | \
match(switch = 3, port = 3) | match(switch = 3, port = 4)

def up_and_down(q):
q1 = q
q2 = copy.copy(q)
cb1 = CountBucket()
q1.set_bucket(cb1)
q1.register_callback(count_callback("Upstream"))
q1.measure_upstream()

cb2 = CountBucket()
q2.set_bucket(cb2)
q2.register_callback(count_callback("Downstream"))
return [q1 + q2, cb1, cb2]

def per_switch_hop(pred, cnt):

fvlist = {'switch': range(1,5)}

resq = None

for i in range(1, cnt + 1):
q = in_group(pred, groupby=['switch'])
id_list = [id(q)]
for j in range(i - 1):
gb = in_group(identity, groupby=['switch'])
id_list.append(id(gb))
q = q ^ gb
cb = CountBucket()
q.set_bucket(cb)
q.register_callback(aggr_callback("Hop %d" % i, id_list))

queries = path_grouping.expand_groupby(q, fvlist)
if resq is None:
resq = queries
else:
resq += queries


buckets = [p.piped_policy for p in resq.path_policies]

global threshold
threshold = len(buckets)
return [resq] + buckets


class DemoCLI(Cmd):
""" Command-line prompt for demo queries. """
Expand All @@ -115,9 +178,18 @@ def __init__(self, dpp, default_cb):
assert isinstance(dpp, dynamic_path_policy)
self.dpp = dpp
self.default_cb = default_cb
self.refq_map = {id(dpp.path_policy): dpp.path_policy}
self.query_cnt = 0
zid = self.get_new_id()
self.refq_map = {zid: dpp.path_policy}
self.buck_map = {zid : []}
self.active_queries = set([0])
Cmd.__init__(self)

def get_new_id(self):
res = self.query_cnt
self.query_cnt += 1
return res

def get_qid(self, qidstr):
try:
qid = int(qidstr)
Expand All @@ -134,37 +206,89 @@ def emptyline(self):

def do_help(self, line):
print "The available commands are:"
print "\tinit\tInitialize a new query"
print "\tupstream\tmeasure a query upstream"
print "\tdownstream\tmeasure a query downstream"
print "\tadd\tAdd a new query"
print "\trm\tRemove an existing query"
print "\tpull\tPull statistics for an existing query"
print "\tshow\tShow installed queries"
print "\trm\tRemove an existing query"
print "\tshow_all\tShow all queries"
print "\tshow_active\tShow active queries"

def do_add(self, qstr):
def do_init(self, qstr):
buckets = []
try:
""" Dangerous!!! But this is the quickest way to do this. """
q = eval(qstr)
except Exception as e:
print "*** Not a well formed expression:", e
return
if not isinstance(q, path_policy):
if isinstance(q, list):
if len(q) > 0 and isinstance(q[0], path_policy):
for cb in q[1:]:
if isinstance(cb, CountBucket):
buckets.append(cb)
q = q[0]
else:
print "*** Not a well formed query."
return

elif not isinstance(q, path_policy):
print "*** Not a well formed query."
return
cb = CountBucket()
cb.register_callback(self.default_cb(id(q)))
q.set_bucket(cb)
self.refq_map.update({id(q): q})
self.dpp.path_policy += q
print "Added query: reference %d" % id(q)


qid = self.get_new_id()

if len(buckets) == 0:
cb = CountBucket()
cb.register_callback(self.default_cb(qid))
q.set_bucket(cb)
buckets = [cb]

self.refq_map.update({qid: q})
self.buck_map[qid] = buckets
print "Initialized query: reference %d" % qid

def do_upstream(self, qidstr):
qid = self.get_qid(qidstr)
if qid:
q = self.refq_map[qid]
q.measure_upstream()
print "Changed measurement of query to upstream: reference %d" % qid

def do_downstream(self, qidstr):
qid = self.get_qid(qidstr)
if qid:
q = self.refq_map[qid]
q.measure_downstream()
print "Changed measurement of query to downstream: reference %d" % qid

def do_show(self, line):
print "Showing installed queries:"
def do_add(self, qidstr):
qid = self.get_qid(qidstr)
if qid:
q = self.refq_map[qid]
self.active_queries.add(qid)
self.dpp.path_policy += q
print "Added query: reference %d" % qid

def do_show_all(self, line):
print "Showing queries:"
for (qid, q) in self.refq_map.items():
print qid, ':', q

def do_show_active(self, line):
print "Showing active queries:"
for qid in self.active_queries:
if qid in self.refq_map:
print qid, ":", self.refq_map[qid]

def do_rm(self, qidstr):
qid = self.get_qid(qidstr)
if qid:
del self.refq_map[qid]
del self.buck_map[qid]
self.active_queries.discard(qid)
queries = self.refq_map.values()
if len(queries) > 1:
self.dpp.path_policy = path_policy_union(self.refq_map.values())
Expand All @@ -176,8 +300,9 @@ def do_rm(self, qidstr):
def do_pull(self, qidstr):
qid = self.get_qid(qidstr)
if qid:
q = self.refq_map[qid]
q.get_policy().pull_stats()
for cb in self.buck_map[qid]:
cb.pull_stats()
time.sleep(3)

def do_EOF(self, line):
print '\n'
Expand Down

0 comments on commit 4d9d55e

Please sign in to comment.