Skip to content
This repository has been archived by the owner on Jun 27, 2018. It is now read-only.

Commit

Permalink
query 2 added to demo
Browse files Browse the repository at this point in the history
  • Loading branch information
minmit committed Mar 5, 2016
1 parent 9f2c99f commit 262819d
Showing 1 changed file with 94 additions and 106 deletions.
200 changes: 94 additions & 106 deletions pyretic/examples/demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from datetime import datetime

import threading
import itertools

TEN_IP = '10.0.0.0/24'

Expand Down Expand Up @@ -36,22 +37,47 @@ def forwarding_policy():

def count_callback(test_num):
def actual_callback(counts):
print "***", str(datetime.now()), "| In user callback for bucket",
print test_num
print "Bucket", test_num, "(packet, byte) counts:", counts
print "Traffic statistics for query %s" % test_num
print "%d Packets" % counts[0]
print "%d Bytes" % counts[1]
print "-----------------------------------"
return actual_callback

def agg_callback(test_num):
first_hop_stat = {}
second_hop_stat = {}
third_hop_stat = {}
stat_dict_list = [first_hop_stat, second_hop_stat, third_hop_stat]
stat_lock = threading.Lock()

def aggr_callback(test_num, id_list):
def actual_callback(agg, res):
print '**************'
print datetime.now()
print 'Test', test_num, ' -- got a callback from installed path query!'
print res
print 'from aggregate', agg
print '**************'
sw_list = tuple([agg[i][0]['switch'] for i in id_list])
stat_dict = stat_dict_list[len(sw_list) - 1]
with stat_lock:
stat_dict[sw_list] = res
return actual_callback


def print_stat_dict(d):
for k in d:
if d[k][0] > 0:
print ",".join([str(x) for x in k]), ": ", "%d Packets, %d Bytes" % tuple(d[k])

def print_stats(interval):
while True:
with stat_lock:
print "First Hop"
print_stat_dict(first_hop_stat)
print "************"
print "Second Hop"
print_stat_dict(second_hop_stat)
print "************"
print "Third Hop"
print_stat_dict(third_hop_stat)
print "************"

time.sleep(interval)

def fw_callback(test_num):
def actual_callback(res):
print '**************'
Expand All @@ -63,118 +89,80 @@ def actual_callback(res):

def query_func(bucket, interval):
while True:
output = str(datetime.now())
output += " Pulling stats for bucket " + repr(bucket)
# output += bucket.get_matches()
print output
bucket.pull_stats()
time.sleep(interval)

def query_callback(test_num):
only_count_results = True

def actual_callback(pkt):
ac = actual_callback

def touch_vars():
""" Initialize function-specific counters, if uninitialized. """
try:
val = ac.pkt_count
val = ac.byte_count
val = ac.predwise_pkt_count
val = ac.predwise_byte_count
except AttributeError:
ac.pkt_count = 0
ac.byte_count = 0
ac.predwise_pkt_count = {}
ac.predwise_byte_count = {}

def get_count_key(pkt):
predwise_count_key = ['ethtype', 'srcip', 'dstip', 'switch', 'port']
return util.frozendict({k: pkt[k] for k in predwise_count_key})

def update_predwise_counts(pkt):
curr_key = get_count_key(pkt)
curr_pkt_count = ac.predwise_pkt_count.get(curr_key, 0)
ac.predwise_pkt_count[curr_key] = curr_pkt_count + 1
curr_byte_count = ac.predwise_byte_count.get(curr_key, 0)
ac.predwise_byte_count[curr_key] = (curr_byte_count +
pkt['payload_len'])

def get_key_str(pred):
try:
out = "int:%s,ethtype:%s,srcip:%s,dstip:%s" % (
"s%d-eth%d" % (pred['switch'], pred['port']),
"ip" if pred['ethtype']==2048 else "arp",
str(pred['srcip']), str(pred['dstip']))
except KeyError:
raise RuntimeError("Missing keys from count predicate!")
return out

def print_predwise_entries():
pkt_counts = ac.predwise_pkt_count
byte_counts = ac.predwise_byte_count
for pred in pkt_counts.keys():
assert pred in byte_counts.keys()
print "Bucket %s %s counts: [%d, %d]" % (
str(test_num),
get_key_str(pred),
pkt_counts[pred],
byte_counts[pred])

def print_total_entries():
print "Bucket %s total counts: [%d, %d]" % (
str(test_num),
ac.pkt_count,
ac.byte_count)

print '**************'
print datetime.now()
print 'Test', test_num, ' -- got a callback from installed path query!'
if only_count_results:
if isinstance(pkt, pyretic.core.packet.Packet):
touch_vars()
ac.pkt_count += 1
ac.byte_count += pkt['payload_len']
update_predwise_counts(pkt)
# print_predwise_entries()
print_total_entries()
else:
print "Bucket %s (packet, byte) counts: %s" % (
str(test_num), pkt)
else:
print pkt
print '**************'
return actual_callback
def pull_func(bucket_list, interval):
while True:
for buc in bucket_list:
buc.pull_stats()
time.sleep(interval)

EDGE = match(switch = 1, port = 3) | match(switch = 1, port = 4) | \
match(switch = 3, port = 3) | match(switch = 3, port = 4)

def query1():
edge_network = match(switch = 1, port = 3) | match(switch = 1, port = 4) | \
match(switch = 3, port = 3) | match(switch = 3, port = 4)
tenant_pred = edge_network & match(srcip = TEN_IP, dstip = TEN_IP, ethtype=2048)
q = in_atom(tenant_pred) ** out_atom(edge_network)
tenant_pred = EDGE & match(srcip = TEN_IP, dstip = TEN_IP, ethtype=2048)
q = in_atom(tenant_pred) ** out_atom(EDGE)
q |= in_out_atom(tenant_pred, EDGE)
cb = CountBucket()
q.set_bucket(cb)
# q.register_callback(query_callback(1))
q.register_callback(count_callback(1))
q.register_callback(count_callback("1"))
query_thread = threading.Thread(target=query_func, args=(cb,5))
query_thread.daemon = True
query_thread.start()
'''
fb = FwdBucket()
q.set_bucket(fb)
q.register_callback(fw_callback(1))
'''
return q

def query2():
fvlist = {'switch': range(1,5)}
tenant_pred = EDGE & match(srcip = TEN_IP, dstip = TEN_IP, ethtype=2048)

# first hop
q1 = in_group(tenant_pred, groupby=['switch'])
cb1 = CountBucket()
q1.set_bucket(cb1)
q1.register_callback(aggr_callback("2 - first hop", [id(q1)]))
res_q = path_grouping.expand_groupby(q1, fvlist)

# second hop
gb1 = in_group(tenant_pred, groupby=['switch'])
gb2 = in_group(identity, groupby=['switch'])
q2 = gb1 ^ gb2
cb2 = CountBucket()
q2.set_bucket(cb2)
q2.register_callback(aggr_callback("2 - second hop", [id(gb1), id(gb2)]))
res_q += path_grouping.expand_groupby(q2, fvlist)

# third hop
gb1 = in_group(tenant_pred, groupby=['switch'])
gb2 = in_group(identity, groupby=['switch'])
gb3 = in_group(identity, groupby=['switch'])
q3 = gb1 ^ gb2 ^ gb3
cb3 = CountBucket()
q3.set_bucket(cb3)
q3.register_callback(aggr_callback("2 - third hop", [id(gb1), id(gb2), id(gb3)]))
res_q += path_grouping.expand_groupby(q3, fvlist)

buckets = [p.piped_policy for p in res_q.path_policies]
# start pulling thread
query_thread = threading.Thread(target=pull_func, args=(buckets,5))
query_thread.daemon = True
query_thread.start()

print_thread = threading.Thread(target=print_stats, args=(10,))
print_thread.daemon = True
print_thread.start()

return res_q

def query_test():
fvlist = {'switch': range(1,5)}
edge_network = match(switch = 1, port = 3) | match(switch = 1, port = 4) | \
EDGE = match(switch = 1, port = 3) | match(switch = 1, port = 4) | \
match(switch = 3, port = 3) | match(switch = 3, port = 4)
tenant_pred = edge_network & match(srcip = TEN_IP, dstip = TEN_IP, ethtype=2048)
q = in_group(tenant_pred, groupby=['switch']) ** out_group(edge_network, groupby=['switch'])
tenant_pred = EDGE & match(srcip = TEN_IP, dstip = TEN_IP, ethtype=2048)
q = in_group(tenant_pred, groupby=['switch']) ** out_group(EDGE, groupby=['switch'])
fb = CountBucket()
fb.register_callback(agg_callback("tm_groupby"))
fb.register_callback(aggr_callback("tm_groupby"))
q.set_bucket(fb)
res_paths = path_grouping.expand_groupby(q, fvlist)
buckets = [p.piped_policy for p in res_paths.path_policies]
Expand All @@ -183,7 +171,7 @@ def query_test():


def path_main(**kwargs):
return query1()
return query2()

def main():
return forwarding_policy()

0 comments on commit 262819d

Please sign in to comment.