From dafdfd47ba2226a988c48cfb26ae71f7d45c4a3c Mon Sep 17 00:00:00 2001 From: cjen1 Date: Tue, 30 Jan 2024 13:22:39 +0000 Subject: [PATCH 1/7] reckon ammendments --- reckon/__main__.py | 2 +- reckon/client_runner.py | 2 +- reckon/reckon_types.py | 8 +- reckon/systems/ocons/__init__.py | 2 +- reckon/systems/ocons/ocons-src | 2 +- scripts/conspire.py | 115 ++++++++++ scripts/reckon_paper.py | 93 ++++++++ scripts/tester.py | 373 +------------------------------ scripts/util.py | 108 +++++++++ 9 files changed, 331 insertions(+), 374 deletions(-) create mode 100644 scripts/conspire.py create mode 100644 scripts/reckon_paper.py create mode 100644 scripts/util.py diff --git a/reckon/__main__.py b/reckon/__main__.py index 80da612..c5d1d6c 100644 --- a/reckon/__main__.py +++ b/reckon/__main__.py @@ -81,7 +81,7 @@ failures, )) p.start() - p.join(max(args.duration * 10, 150)) + p.join(max(args.duration * 10, 600)) p.terminate() finally: for stopper in stoppers.values(): diff --git a/reckon/client_runner.py b/reckon/client_runner.py index 01b12ac..e2d28e0 100644 --- a/reckon/client_runner.py +++ b/reckon/client_runner.py @@ -27,7 +27,7 @@ def preload(ops_provider: t.AbstractWorkload, duration: float) -> int: break total_reqs += 1 - client.send(t.preload(prereq=False, operation=op)) + client.send(t.preload(prereq=False, operation=op), flush=False) pbar.update(op.time - sim_t) sim_t = op.time diff --git a/reckon/reckon_types.py b/reckon/reckon_types.py index e6116c8..002bd3c 100644 --- a/reckon/reckon_types.py +++ b/reckon/reckon_types.py @@ -86,7 +86,6 @@ def __init__(self, p_in: IO[bytes], p_out: IO[bytes], id: str): def _send_packet(self, payload: str): size = pack(" str: size = self.stdout.read(4) @@ -98,9 +97,14 @@ def _recv_packet(self) -> str: logging.error(f"Tried to recv from |{self.id}|, received nothing") raise EOFError - def send(self, msg: Message): + def flush(self): + self.stdin.flush() + + def send(self, msg: Message, flush=True): payload = msg.json() self._send_packet(payload) + if flush: + self.flush() def recv(self) -> Message: pkt = self._recv_packet() diff --git a/reckon/systems/ocons/__init__.py b/reckon/systems/ocons/__init__.py index 9678ac8..6e63c53 100755 --- a/reckon/systems/ocons/__init__.py +++ b/reckon/systems/ocons/__init__.py @@ -260,7 +260,7 @@ def get_leader(self, cluster): class OConsConspireLeaderDC(OConsConspireLeader): system_kind = "conspire-leader-dc" - tick_period = 0.001 + tick_period = 0.01 def start_cmd(self, tag, nid, cluster): cmd = " ".join([ diff --git a/reckon/systems/ocons/ocons-src b/reckon/systems/ocons/ocons-src index 80c5488..2152efd 160000 --- a/reckon/systems/ocons/ocons-src +++ b/reckon/systems/ocons/ocons-src @@ -1 +1 @@ -Subproject commit 80c54887ef4bcebee4c8f73f03c0f964881a3fee +Subproject commit 2152efd051e9c713834d07bbcd60f391cb4dc032 diff --git a/scripts/conspire.py b/scripts/conspire.py new file mode 100644 index 0000000..869b4d6 --- /dev/null +++ b/scripts/conspire.py @@ -0,0 +1,115 @@ +from util import run_test +import itertools as it + +def tests(folder_path): + actions = [] + + #systems = ['ocons-raft', 'ocons-conspire-leader', 'ocons-conspire-dc', 'ocons-conspire-leader-dc'] + #systems = ['ocons-conspire-leader-dc'] + systems = ['ocons-raft', 'ocons-conspire-leader', 'ocons-conspire-leader-dc'] + + fd_timeouts = [0.01, 0.03, 0.06, 0.11, 0.21, 0.41] + + low_repeat = 5 + high_repeat = 50 + + def low_jitter_topo(system): + return { + 'topo':'wan', + 'nn': 3 if system in ['ocons-paxos', 'ocons-raft'] else 4, + 'delay':50, + 'system':system, + 'client':'ocaml', + 'tag':'low_jitter', + } + def high_jitter_topo(system): + return low_jitter_topo(system) | { + 'jitter':0.1, + 'loss':0.01, + 'tag':'high_jitter', + } + + # steady state erroneous election cost + for (system, topo_gen, fd_timeout, repeat) in it.product( + #systems, + ['ocons-conspire-leader-dc'], + [low_jitter_topo, high_jitter_topo], + fd_timeouts, + range(12), + ): + actions.append( + lambda params = topo_gen(system) | { + 'system':system, + 'rate': 10000, + 'failure_timeout':fd_timeout, + 'delay_interval': 0.1, + 'repeat':repeat, + 'failure':'none', + 'duration':10, + }: + run_test(folder_path, params) + ) + + # leader failure singles + for (system, repeat) in it.product( + systems, + range(low_repeat), + ): + actions.append( + lambda params = low_jitter_topo(system) | { + 'system':system, + 'rate': 10000, + 'failure_timeout':0.5, + 'delay_interval': 0.1, + 'repeat':repeat, + 'failure':'leader', + 'tcpdump':True, + 'duration':10, + }: + run_test(folder_path, params) + ) + + # leader failure bulk + for (system, topo_gen, fd_timeout, repeat) in it.product( + #systems, + ['ocons-conspire-leader'], + [low_jitter_topo, high_jitter_topo], + #fd_timeouts, + [0.01, 0.03], + range(high_repeat), + ): + actions.append( + lambda params = topo_gen(system) | { + 'system':system, + 'rate': 10000, + 'failure_timeout':fd_timeout, + 'delay_interval': 0.1, + 'repeat':repeat, + 'failure':'leader', + 'tcpdump':False, + 'duration':10, + }: + run_test(folder_path, params) + ) + + # Bulk performance graphs + for (system, rate, repeat) in it.product( + systems, + [20000, 40000, 60000, 80000, 100000, 120000, 140000], + range(low_repeat), + ): + actions.append( + lambda params = low_jitter_topo(system) | { + 'system':system, + 'rate': rate, + 'failure_timeout':0.5, + 'delay_interval': 0.1, + 'repeat':repeat, + 'failure':'none', + 'tcpdump':False, + 'duration':10, + }: + run_test(folder_path, params) + ) + + return actions diff --git a/scripts/reckon_paper.py b/scripts/reckon_paper.py new file mode 100644 index 0000000..d5b2169 --- /dev/null +++ b/scripts/reckon_paper.py @@ -0,0 +1,93 @@ +from util import run_test +import itertools as it + +def tests(folder_path): + params = [] + + reference_systems = ['ocons-paxos', 'ocons-raft', 'ocons-raft-pre-vote', 'ocons-raft+sbn', 'ocons-raft-pre-vote+sbn'] + industry_systems = ['etcd', 'zookeeper', 'zookeeper-fle', 'etcd-pre-vote', 'etcd+sbn', 'etcd-pre-vote+sbn'] + + fd_timeouts = [0.01, 0.03, 0.06, 0.11, 0.21, 0.41, 0.81] + + low_repeat = range(3) + high_repeat = range(25) + + def client_map(system): + if 'ocons' in system: + return 'ocaml' + if 'etcd' in system: + return 'go' + if 'zookeeper' in system: + return 'java' + + def low_jitter_topo(system): + return { + 'topo':'wan', + 'nn': 5, + 'delay':50, + 'system':system, + 'client': client_map(system), + 'tag':'low_jitter', + } + + def high_jitter_topo(system): + return low_jitter_topo(system) | { + 'jitter':0.1, + 'loss':0.01, + 'tag':'high_jitter', + } + + ## typical failure graphs + #for system, repeat in it.product( + # reference_systems + industry_systems, + # low_repeat + # ): + # params.append( + # low_jitter_topo(system) | { + # 'rate': 10000, + # 'repeat': repeat, + # 'duration': 10, + # 'failure':'leader', + # 'failure_timeout': 0.5, + # 'tcpdump': True, + # 'duration': 10, + # } + # ) + + # TTR varying rate + for rate, system, repeat in it.product( + [1000, 4000, 16000], + reference_systems + ['etcd', 'zookeeper', 'zookeeper-fle'], + high_repeat, + ): + params.append( + low_jitter_topo(system) | { + 'rate': rate, + 'repeat': repeat, + 'failure': 'leader', + 'failure_timeout': 0.5, + 'duration': 10, + } + ) + + # TTR FD_timeout + for fd_timeout, system, topo, repeat in it.product( + fd_timeouts, + reference_systems + industry_systems, + [low_jitter_topo , high_jitter_topo], + high_repeat, + ): + params.append( + topo(system) | { + 'rate': 10000, + 'repeat': repeat, + 'failure': 'leader', + 'failure_timeout': fd_timeout, + 'duration': 10, + } + ) + + for p in params: + print(p) + + return [lambda p=p: run_test(folder_path, p) for p in params] diff --git a/scripts/tester.py b/scripts/tester.py index 6ac3ae7..7f98a24 100644 --- a/scripts/tester.py +++ b/scripts/tester.py @@ -1,3 +1,4 @@ + from subprocess import call, Popen, run import shlex import itertools as it @@ -9,101 +10,6 @@ from typing import Dict, Any, AnyStr import math - -def call_tcp_dump(tcpdump_path, cmd): - tcp_dump_cmd = [ - "tcpdump", - "-i", - "any", - "-w", - tcpdump_path, - "net", - "10.0.0.0/16", - "-n", - ] - print(tcp_dump_cmd) - p = Popen(tcp_dump_cmd) - call(cmd) - p.terminate() - -default_parameters = { - 'system':'etcd', - 'client':'go', - 'topo':'simple', - 'failure':'none', - 'nn':3, - 'nc':1, - 'delay':20, - 'loss':0, - 'jitter':0, - 'ncpr':'False', - 'mtbf':1, - 'kill_n':0, - 'write_ratio':1, - 'rate':1000, - 'duration':60, - 'tag':'tag', - 'tcpdump':False, - 'arrival_process':'uniform', - 'repeat':-1, - 'failure_timeout':1, - 'delay_interval':0.1, - 'notes':{}, - } - -def run_test(folder_path, config : Dict[str, Any]): - run('rm -rf /data/*', shell=True).check_returncode() - run('mn -c', shell=True).check_returncode() - run('pkill client', shell=True) - - uid = uuid.uuid4() - - # Set params - params = default_parameters.copy() - for k,v in config.items(): - params[k] = v - del config - - print(params) - - assert (params['repeat'] != -1) - - result_folder = f"{folder_path}/{uid}/" - log_path = result_folder + f"logs" - config_path = result_folder + f"config.json" - result_path = result_folder + f"res.json" - tcpdump_path= result_folder + f"tcpdump.pcap" - - cmd = " ".join([ - f"python -m reckon {params['system']} {params['topo']} {params['failure']}", - f"--number-nodes {params['nn']} --number-clients {params['nc']} --client {params['client']}", - f"--link-latency {params['delay']} --link-loss {params['loss']} --link-jitter {params['jitter']}", - f"--new_client_per_request {params['ncpr']}", - f"--mtbf {params['mtbf']} --kill-n {params['kill_n']}", - f"--write-ratio {params['write_ratio']}", - f"--rate {params['rate']} --duration {params['duration']}", - f"--arrival-process {params['arrival_process']}", - f"--system_logs {log_path} --result-location {result_path} --data-dir=/data", - f"--failure_timeout {params['failure_timeout']}", - f"--delay_interval {params['delay_interval']}", - ]) - - run(f'mkdir -p {result_folder}', shell=True).check_returncode() - run(f'mkdir -p {log_path}', shell=True).check_returncode() - - with open(config_path, "w") as of: - json.dump(params, of) - - print(f"RUNNING TEST") - print(cmd) - - cmd = shlex.split(cmd) - - if params['tcpdump']: - call_tcp_dump(tcpdump_path, cmd) - else: - call(cmd) - from numpy.random import default_rng rng = default_rng() @@ -112,280 +18,11 @@ def run_test(folder_path, config : Dict[str, Any]): actions = [] -#low_repeat = 3 -#high_repeat = 50 -# -#nn_map = { -# 'ocons-conspire-dc': 4, -# 'ocons-conspire-mp': 4, -# 'ocons-paxos': 3, -# 'ocons-raft': 3, -# 'etcd': 3 -# } -#timeout_map = { -# 'ocons-conspire-dc': 0.1, -# 'ocons-conspire-mp': 0.5, -# 'ocons-paxos': 0.5, -# } -# -## rate-lat systems -#for system, rate, repeat in it.product( -# [('ocons-conspire-dc', 'ocaml'), ('ocons-conspire-mp', 'ocaml'), ('ocons-paxos', 'ocaml')], -# [10000, 20000, 30000, 40000, 50000, 60000, 70000, 80000, 90000], -# range(low_repeat) -# ): -# system, client = system -# nn = nn_map[system] -# timeout = timeout_map[system] -# -# actions.append( -# lambda params = { -# 'topo':'wan', -# 'nn':nn, -# 'delay':50, -# 'system':system, -# 'client':client, -# 'rate':rate, -# 'repeat':repeat, -# 'failure_timeout': timeout, -# 'delay_interval': timeout, -# 'duration':10, -# 'tag':'ss-rate-lat', -# }: -# run_test(folder_path, params)) - -## latency results steady state -#for system, repeat in it.product( -# [('ocons-conspire-dc', 0.04), ('ocons-conspire-dc', 0.05), ('ocons-conspire-dc', 0.06), -# ('ocons-conspire-mp', 0.5), -# ('ocons-paxos', 0.5), -# ], -# range(low_repeat), -# ): -# system, timeout = system -# nn = nn_map[system] -# -# actions.append( -# lambda params = { -# 'topo':'wan', -# 'nn':nn, -# 'delay':50, -# 'system':system, -# 'client':'ocaml', -# 'rate':10000, -# 'repeat':repeat, -# 'failure_timeout': timeout, -# 'delay_interval': timeout, -# 'duration':10, -# 'tag':'ss-latency', -# }: -# run_test(folder_path, params)) - -## leader failure heatmap -#for system, repeat in it.product( -# ['ocons-conspire-dc', 'ocons-conspire-mp', 'ocons-paxos'], -# range(low_repeat), -# ): -# nn = nn_map[system] -# timeout = timeout_map[system] -# actions.append( -# lambda params = { -# 'topo':'wan', -# 'nn':nn, -# 'delay':50, -# 'system':system, -# 'client':'ocaml', -# 'rate':5000, -# 'repeat':repeat, -# 'failure_timeout': timeout, -# 'delay_interval': timeout, -# 'duration':10, -# 'failure':'leader', -# 'tcpdump':True, -# 'tag':'heatmap', -# }: -# run_test(folder_path, params)) - -## conspire-dc jitter latency -#for jitter, timeout, repeat in it.product( -# [0, 0.25, 0.5], -# [0.06, 0.11, 0.16, 0.21, 0.26, 0.31], -# range(low_repeat), -# ): -# nn = 4 -# actions.append( -# lambda params = { -# 'topo':'wan', -# 'nn':nn, -# 'delay':50, -# 'jitter':jitter, -# 'system':'ocons-conspire-dc', -# 'client':'ocaml', -# 'rate':10000, -# 'repeat':repeat, -# 'failure_timeout': timeout, -# 'delay_interval': timeout, -# 'duration':30, -# 'failure':'none', -# 'tag':'latency-dc-jitter', -# }: -# run_test(folder_path, params)) -# -# -## jitter failure aggregate -#for system, timeout, jitter, repeat in it.product( -# ['ocons-conspire-mp', 'ocons-paxos'], -# [0.06, 0.11, 0.16, 0.21, 0.26, 0.31, 0.36, 0.41], -# [0, 0.25, 0.5], -# range(high_repeat), -# ): -# nn = nn_map[system] -# actions.append( -# lambda params = { -# 'topo':'wan', -# 'nn':nn, -# 'delay':50, -# 'jitter':jitter, -# 'system':system, -# 'client':'ocaml', -# 'rate':5000, -# 'repeat':repeat, -# 'failure_timeout': timeout, -# 'delay_interval': timeout, -# 'duration':10, -# 'failure':'leader', -# 'tag':'jitter-aggregate-failure', -# }: -# run_test(folder_path, params)) - -#for system, repeat in it.product( -# ['ocons-conspire-leader-dc'], -# range(20)): -# actions.append( -# lambda params = { -# 'topo':'wan', -# 'nn':4, -# 'delay':50, -# 'system':system, -# 'client':'ocaml', -# 'rate':5000, -# 'repeat':repeat, -# 'failure_timeout':0.5, -# 'delay_interval':0.1, -# 'duration':10, -# 'failure':'leader', -# 'tcpdump': True, -# }: -# run_test(folder_path, params)) - -def conspire_paper(): - #systems = ['ocons-raft', 'ocons-conspire-leader', 'ocons-conspire-dc', 'ocons-conspire-leader-dc'] - #systems = ['ocons-conspire-leader-dc'] - systems = ['ocons-raft', 'ocons-conspire-leader'] - fd_timeouts = [0.01, 0.03]#, 0.06, 0.11, 0.21, 0.41, 0.81] - - low_repeat = 5 - high_repeat = 50 - - def low_jitter_topo(system): - return { - 'topo':'wan', - 'nn': 3 if system in ['ocons-paxos', 'ocons-raft'] else 4, - 'delay':50, - 'system':system, - 'client':'ocaml', - } - def high_jitter_topo(system): - return low_jitter_topo(system) | { - 'jitter':0.1, - 'loss':0.01, - } - - # steady state erroneous election cost - for (system, topo_gen, fd_timeout, repeat) in it.product( - systems, - [low_jitter_topo, high_jitter_topo], - fd_timeouts, - range(low_repeat), - ): - if system in ['ocons-paxos', 'ocons-raft'] and fd_timeout < 0.06: - continue - actions.append( - lambda params = topo_gen(system) | { - 'system':system, - 'rate': 10000, - 'failure_timeout':fd_timeout, - 'delay_interval': 0.1, - 'repeat':repeat, - 'failure':'none', - 'duration':60, - }: - run_test(folder_path, params) - ) - - ## leader failure singles - #for (system, repeat) in it.product( - # systems, - # range(low_repeat), - # ): - # actions.append( - # lambda params = low_jitter_topo(system) | { - # 'system':system, - # 'rate': 10000, - # 'failure_timeout':0.5, - # 'delay_interval': 0.1, - # 'repeat':repeat, - # 'failure':'leader', - # 'tcpdump':True, - # 'duration':10, - # }: - # run_test(folder_path, params) - # ) - - # leader failure bulk - for (system, topo_gen, fd_timeout, repeat) in it.product( - systems, - [low_jitter_topo, high_jitter_topo], - fd_timeouts, - range(high_repeat), - ): - if system in ['ocons-paxos', 'ocons-raft'] and fd_timeout < 0.06: - continue - actions.append( - lambda params = topo_gen(system) | { - 'system':system, - 'rate': 10000, - 'failure_timeout':fd_timeout, - 'delay_interval': 0.1, - 'repeat':repeat, - 'failure':'leader', - 'tcpdump':False, - 'duration':10, - }: - run_test(folder_path, params) - ) - - ## Bulk performance graphs - #for (system, rate, repeat) in it.product( - # systems, - # [20000, 40000, 60000, 80000, 100000, 120000], - # range(low_repeat), - # ): - # actions.append( - # lambda params = low_jitter_topo(system) | { - # 'system':system, - # 'rate': rate, - # 'failure_timeout':0.5, - # 'delay_interval': 0.1, - # 'repeat':repeat, - # 'failure':'leader', - # 'tcpdump':False, - # 'duration':10, - # }: - # run_test(folder_path, params) - # ) +#import conspire +#actions += conspire.tests(folder_path) -conspire_paper() +import reckon_paper +actions += reckon_paper.tests(folder_path) # Shuffle to isolate ordering effects rng.shuffle(actions) diff --git a/scripts/util.py b/scripts/util.py new file mode 100644 index 0000000..741a0b3 --- /dev/null +++ b/scripts/util.py @@ -0,0 +1,108 @@ +from subprocess import call, Popen, run +import shlex +import itertools as it +import uuid +from datetime import datetime +import json +import os + +from typing import Dict, Any, AnyStr + +import math + +def call_tcp_dump(tcpdump_path, cmd): + tcp_dump_cmd = [ + "tcpdump", + "-i", + "any", + "-w", + tcpdump_path, + "net", + "10.0.0.0/16", + "-n", + ] + print(tcp_dump_cmd) + p = Popen(tcp_dump_cmd) + call(cmd) + p.terminate() + +default_parameters = { + 'system':'etcd', + 'client':'go', + 'topo':'simple', + 'failure':'none', + 'nn':3, + 'nc':1, + 'delay':20, + 'loss':0, + 'jitter':0, + 'ncpr':'False', + 'mtbf':1, + 'kill_n':0, + 'write_ratio':1, + 'rate':1000, + 'duration':60, + 'tag':'tag', + 'tcpdump':False, + 'arrival_process':'uniform', + 'repeat':-1, + 'failure_timeout':1, + 'delay_interval':0.1, + 'notes':{}, + } + +def run_test(folder_path, config : Dict[str, Any]): + run('rm -rf /data/*', shell=True).check_returncode() + run('mn -c', shell=True).check_returncode() + run('pkill client', shell=True) + + uid = uuid.uuid4() + + for k,_ in config.items(): + assert(k in default_parameters.keys()) + + # Set params + params = default_parameters.copy() + for k,v in config.items(): + params[k] = v + del config + + print(params) + + assert (params['repeat'] != -1) + + result_folder = f"{folder_path}/{uid}/" + log_path = result_folder + f"logs" + config_path = result_folder + f"config.json" + result_path = result_folder + f"res.json" + tcpdump_path= result_folder + f"tcpdump.pcap" + + cmd = " ".join([ + f"python -m reckon {params['system']} {params['topo']} {params['failure']}", + f"--number-nodes {params['nn']} --number-clients {params['nc']} --client {params['client']}", + f"--link-latency {params['delay']} --link-loss {params['loss']} --link-jitter {params['jitter']}", + f"--new_client_per_request {params['ncpr']}", + f"--mtbf {params['mtbf']} --kill-n {params['kill_n']}", + f"--write-ratio {params['write_ratio']}", + f"--rate {params['rate']} --duration {params['duration']}", + f"--arrival-process {params['arrival_process']}", + f"--system_logs {log_path} --result-location {result_path} --data-dir=/data", + f"--failure_timeout {params['failure_timeout']}", + f"--delay_interval {params['delay_interval']}", + ]) + + run(f'mkdir -p {result_folder}', shell=True).check_returncode() + run(f'mkdir -p {log_path}', shell=True).check_returncode() + + with open(config_path, "w") as of: + json.dump(params, of) + + print(f"RUNNING TEST") + print(cmd) + + cmd = shlex.split(cmd) + + if params['tcpdump']: + call_tcp_dump(tcpdump_path, cmd) + else: + call(cmd) From f365ba8b4dd64512e5758b3e84ef54cf3ee55987 Mon Sep 17 00:00:00 2001 From: cjen1 Date: Tue, 30 Jan 2024 15:13:05 +0000 Subject: [PATCH 2/7] Remove prints. clean up impl --- reckon/__main__.py | 9 ++------- reckon/client_runner.py | 6 +++--- scripts/tester.py | 13 ++----------- 3 files changed, 7 insertions(+), 21 deletions(-) diff --git a/reckon/__main__.py b/reckon/__main__.py index c5d1d6c..30de498 100644 --- a/reckon/__main__.py +++ b/reckon/__main__.py @@ -69,9 +69,7 @@ print("BENCHMARK: Starting Test") - from multiprocessing import Process - - p = Process(target = run_test, args=( + run_test( args.result_location, clients, ops_provider, @@ -79,10 +77,7 @@ system, cluster, failures, - )) - p.start() - p.join(max(args.duration * 10, 600)) - p.terminate() + ) finally: for stopper in stoppers.values(): stopper() diff --git a/reckon/client_runner.py b/reckon/client_runner.py index e2d28e0..fa449e8 100644 --- a/reckon/client_runner.py +++ b/reckon/client_runner.py @@ -1,5 +1,5 @@ import logging -from threading import Thread +import threading import time import sys import selectors @@ -133,10 +133,10 @@ def test_steps( assert(len(failures) >= 2) workload.clients = clients - total_reqs = preload(workload, duration) + total_reqs = terminate_if_stalled(lambda stall_check: preload(stall_check, workload, duration)) ready(clients) - t_execute = Thread( + t_execute = threading.Thread( target=execute, args=[clients, failures, duration], ) diff --git a/scripts/tester.py b/scripts/tester.py index 7f98a24..6bdb90e 100644 --- a/scripts/tester.py +++ b/scripts/tester.py @@ -1,18 +1,9 @@ - -from subprocess import call, Popen, run -import shlex -import itertools as it -import uuid from datetime import datetime -import json -import os - -from typing import Dict, Any, AnyStr - -import math from numpy.random import default_rng rng = default_rng() +import util + run_time = datetime.now().strftime("%Y%m%d%H%M%S") folder_path = f"/results/{run_time}" From 62f5243f5d0894f0d72caa2a46c6419765e6f9fc Mon Sep 17 00:00:00 2001 From: cjen1 Date: Tue, 30 Jan 2024 15:42:37 +0000 Subject: [PATCH 3/7] Add staller and improve reporting --- reckon/client_runner.py | 65 +++++++++++++++++++++++++++++++++++++++-- reckon/reckon_types.py | 8 ++--- scripts/tester.py | 3 +- 3 files changed, 67 insertions(+), 9 deletions(-) diff --git a/reckon/client_runner.py b/reckon/client_runner.py index fa449e8..fe70cf9 100644 --- a/reckon/client_runner.py +++ b/reckon/client_runner.py @@ -12,8 +12,67 @@ from tqdm import tqdm +class MBox: + def __init__(self, value=None): + self.value = value + self.mtx = threading.Lock() -def preload(ops_provider: t.AbstractWorkload, duration: float) -> int: + def __get__(self, instance, owner): + with self.mtx: + return self.value + + def __set__(self, instance, value): + with self.mtx: + self.value = value + +class StallCheck: + def __init__(self, init): + self._value = init + self.prev_read = self._value + + self.mtx = threading.Lock() + + def incr(self, delta = 1): + with self.mtx: + self._value += delta + + def check_if_stalled(self) -> bool: + with self.mtx: + current_val = self._value + prev_val = self.prev_read + + self.prev_read = current_val + + return current_val == prev_val + +class StalledException(Exception): + pass + +def terminate_if_stalled(task, timeout=30): + stall_check = StallCheck(0) + result = MBox() + def wrapped_task(sc=stall_check, result=result): + result.value = task(sc) + + thread = threading.Thread(target=wrapped_task, daemon=True) + thread.start() + + ticker = 0 + + while thread.is_alive(): + time.sleep(1.) + if not stall_check.check_if_stalled(): + ticker = 0 + continue + + ticker += 1 + if ticker > timeout: + raise StalledException + + thread.join(10) + return result.value + +def preload(stall_checker: StallCheck, ops_provider: t.AbstractWorkload, duration: float) -> int: logging.debug("PRELOAD: begin") for op, client in zip(ops_provider.prerequisites, it.cycle(ops_provider.clients)): @@ -23,16 +82,18 @@ def preload(ops_provider: t.AbstractWorkload, duration: float) -> int: total_reqs = 0 with tqdm(total=duration) as pbar: for client, op in ops_provider.workload: + stall_checker.incr() if op.time >= duration: break total_reqs += 1 - client.send(t.preload(prereq=False, operation=op), flush=False) + client.send(t.preload(prereq=False, operation=op)) pbar.update(op.time - sim_t) sim_t = op.time for client in ops_provider.clients: + stall_checker.incr() client.send(t.finalise()) logging.debug("PRELOAD: end") diff --git a/reckon/reckon_types.py b/reckon/reckon_types.py index 002bd3c..b69b20c 100644 --- a/reckon/reckon_types.py +++ b/reckon/reckon_types.py @@ -97,14 +97,10 @@ def _recv_packet(self) -> str: logging.error(f"Tried to recv from |{self.id}|, received nothing") raise EOFError - def flush(self): - self.stdin.flush() - - def send(self, msg: Message, flush=True): + def send(self, msg: Message): payload = msg.json() self._send_packet(payload) - if flush: - self.flush() + self.stdin.flush() def recv(self) -> Message: pkt = self._recv_packet() diff --git a/scripts/tester.py b/scripts/tester.py index 6bdb90e..c4e1ef1 100644 --- a/scripts/tester.py +++ b/scripts/tester.py @@ -23,7 +23,8 @@ total = len(actions) for i, act in enumerate(actions): print(bar, flush=True) - print(f"TEST-{i} out of {total}, {total - i} remaining", flush=True) + date = datetime.now().strftime("%m%d%H%M%S") + print(f"{date}: TEST-{i} out of {total}, {total - i} remaining", flush=True) print(bar, flush=True) act() From d87540033b6f68991b77e34c585168ecf1e7dbab Mon Sep 17 00:00:00 2001 From: cjen1 Date: Tue, 20 Feb 2024 12:59:44 +0000 Subject: [PATCH 4/7] Update zookeeper deps --- Dockerfile | 6 ++-- reckon/systems/zookeeper/__init__.py | 2 +- .../main/java/cjen1/reckon/zkclient/App.java | 32 ++++++++++++------- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/Dockerfile b/Dockerfile index 8e02324..0ff4685 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,10 +16,8 @@ RUN apt-get update && apt-get install --no-install-recommends -yy -qq \ unzip \ python-is-python3 -## Add stretch backports -#RUN echo 'deb http://ftp.debian.org/debian stretch-backports main' | sudo tee /etc/apt/sources.list.d/stretch-backports.list -#RUN apt-get update && apt-get install -yy -qq \ -# openjdk-11-jdk +RUN apt-get update && apt-get install -yy -qq \ + openjdk-11-jdk # Runtime dependencies RUN python -m pip install --upgrade wheel setuptools diff --git a/reckon/systems/zookeeper/__init__.py b/reckon/systems/zookeeper/__init__.py index 01a76af..fd9d72a 100644 --- a/reckon/systems/zookeeper/__init__.py +++ b/reckon/systems/zookeeper/__init__.py @@ -28,7 +28,7 @@ def __str__(self): class Zookeeper(t.AbstractSystem): - bin_dir = "reckon/systems/zookeeper/bins/apache-zookeeper-3.5.10-bin" + bin_dir = "reckon/systems/zookeeper/bins/apache-zookeeper-3.5.9-bin" electionAlg = 0 def get_client(self, args): diff --git a/reckon/systems/zookeeper/clients/app/src/main/java/cjen1/reckon/zkclient/App.java b/reckon/systems/zookeeper/clients/app/src/main/java/cjen1/reckon/zkclient/App.java index 62035c4..ed4062d 100644 --- a/reckon/systems/zookeeper/clients/app/src/main/java/cjen1/reckon/zkclient/App.java +++ b/reckon/systems/zookeeper/clients/app/src/main/java/cjen1/reckon/zkclient/App.java @@ -30,18 +30,26 @@ public ZKClient(String connectString) throws IOException { @Override public void Create(String k) throws ClientException { - try { - client.create( - key_to_path(k), - "NULL".getBytes(), - ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - null - ); - } catch (KeeperException ex) { - throw new ClientException("Cause: " + ex.toString()); - } catch (InterruptedException ex) { - throw new ClientException("Cause: " + ex.toString()); + while(true) { + try { + client.create( + key_to_path(k), + "NULL".getBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + null + ); + System.err.println("Successful create"); + break; + } catch (KeeperException.NodeExistsException ex) { + break; + } catch (Exception ex) { + System.err.println(String.format("Failed to create key: %s", ex.toString())); + // sleep to prevent thundering herd + try { + Thread.sleep(1000, 0); + } catch (InterruptedException e) {} + } } } From 18485a37a749dd9dac9741fc7426832613779237 Mon Sep 17 00:00:00 2001 From: cjen1 Date: Mon, 26 Feb 2024 15:25:06 +0000 Subject: [PATCH 5/7] Add redirect --- reckon/goclient/client.go | 1 + reckon/reckon_types.py | 20 +++++++++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/reckon/goclient/client.go b/reckon/goclient/client.go index a40bd3d..ff8b175 100755 --- a/reckon/goclient/client.go +++ b/reckon/goclient/client.go @@ -351,4 +351,5 @@ func Run(client_gen func() (Client, error), clientid string, new_client_per_requ <-results_complete send(map[string]interface{}{"kind":"finished"}) log.Print("Results sent, exiting") + os.Exit(0) } diff --git a/reckon/reckon_types.py b/reckon/reckon_types.py index b69b20c..f49bf20 100644 --- a/reckon/reckon_types.py +++ b/reckon/reckon_types.py @@ -120,6 +120,15 @@ def unregister_selector(self, s: BaseSelector, e: Any) -> SelectorKey: return s.unregister(self.stdin) raise KeyError() +class RedirectClient(Client): + def __init__(self, file, p_in: IO[bytes], p_out: IO[bytes], id: str): + self.file = open(file, "bw") + super().__init__(p_in, p_out, id) + + def _send_packet(self, payload: str): + size = pack(" str: pass - class AbstractSystem(ABC): def __init__(self, args): ctime = time.localtime() @@ -265,6 +273,16 @@ def add_stdout_logging(self, cmd: str, tag: str): log = self.log_location return f"{cmd} > {log}/{time}_{tag}.out" + def add_tee_stdout_logging(self, cmd: str, tag: str): + time = self.creation_time + log = self.log_location + return f"{cmd} | tee {log}/{time}_{tag}.out" + + def add_tee_stdin_logging(self, cmd: str, tag: str): + time = self.creation_time + log = self.log_location + return f"tee {log}/{time}_{tag}.in | {cmd}" + @abstractmethod def stat(self, host: MininetHost) -> str: pass From 6a20d9f0f7aad9485e85bb3eed9796073f931252 Mon Sep 17 00:00:00 2001 From: cjen1 Date: Sat, 7 Sep 2024 12:49:14 +0100 Subject: [PATCH 6/7] Update ocons integration --- reckon/systems/ocons/clients/client.ml | 6 +++--- reckon/systems/ocons/clients/flake.lock | 24 ++++++++++++------------ reckon/systems/ocons/ocons-src | 2 +- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/reckon/systems/ocons/clients/client.ml b/reckon/systems/ocons/clients/client.ml index 64523a2..0f83567 100644 --- a/reckon/systems/ocons/clients/client.ml +++ b/reckon/systems/ocons/clients/client.ml @@ -17,9 +17,9 @@ module Ocons_cli_shim : Types.S = struct let op = match op with | Types.Write (k, v) -> - O.Types.Write (k, v) + [|O.Types.Write (k, v)|] | Types.Read k -> - O.Types.Read k + [|O.Types.Read k|] in try let cmd = @@ -47,7 +47,7 @@ module Ocons_cli_shim : Types.S = struct match res with | O.Types.Failure msg -> Types.Failure (`Msg msg) - | O.Types.(Success | ReadSuccess _) -> + | O.Types.(Success _) -> Types.Success in f (rid, res) diff --git a/reckon/systems/ocons/clients/flake.lock b/reckon/systems/ocons/clients/flake.lock index 2de32f9..3e990ff 100644 --- a/reckon/systems/ocons/clients/flake.lock +++ b/reckon/systems/ocons/clients/flake.lock @@ -37,11 +37,11 @@ "systems": "systems" }, "locked": { - "lastModified": 1701680307, - "narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=", + "lastModified": 1710146030, + "narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=", "owner": "numtide", "repo": "flake-utils", - "rev": "4022d587cbbfd70fe950c1e2083a02621806a725", + "rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a", "type": "github" }, "original": { @@ -144,11 +144,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1704842529, - "narHash": "sha256-OTeQA+F8d/Evad33JMfuXC89VMetQbsU4qcaePchGr4=", + "lastModified": 1725534445, + "narHash": "sha256-Yd0FK9SkWy+ZPuNqUgmVPXokxDgMJoGuNpMEtkfcf84=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "eabe8d3eface69f5bb16c18f8662a702f50c20d5", + "rev": "9bb1e7571aadf31ddb4af77fc64b2d59580f9a39", "type": "github" }, "original": { @@ -212,11 +212,11 @@ "opam-repository": "opam-repository" }, "locked": { - "lastModified": 1704979972, - "narHash": "sha256-e3lD6iCYT9AX3+tMvUga379+ehh8ZnABVyoTYwH0VF0=", + "lastModified": 1721053195, + "narHash": "sha256-2m+OCkmV2KwmjZwQK0rTE/lwRZgqdfrWPd+nH4sSqaM=", "owner": "cjen1", "repo": "ocons", - "rev": "a17480852c24717dd000d049f7e7a137d8e53fd3", + "rev": "883c067f51393118a6f92fb0f0fbe45d5c1c6ec9", "type": "github" }, "original": { @@ -265,11 +265,11 @@ "opam2json": "opam2json_2" }, "locked": { - "lastModified": 1703843211, - "narHash": "sha256-z7X1i2T1H37Lj9hEIJA5T0+sdE5E+PSWiiSyvYGyGSY=", + "lastModified": 1716292162, + "narHash": "sha256-UOJNCbqvxABD56JZtZkv3C9ufdqrs7/Ep4AKkCHgPuo=", "owner": "tweag", "repo": "opam-nix", - "rev": "6a1d1802aec6bf4823c77b98512b269fa9927bf7", + "rev": "1d3cbd6d3f247db77cb581c88c9a1d72e4acad60", "type": "github" }, "original": { diff --git a/reckon/systems/ocons/ocons-src b/reckon/systems/ocons/ocons-src index 2152efd..883c067 160000 --- a/reckon/systems/ocons/ocons-src +++ b/reckon/systems/ocons/ocons-src @@ -1 +1 @@ -Subproject commit 2152efd051e9c713834d07bbcd60f391cb4dc032 +Subproject commit 883c067f51393118a6f92fb0f0fbe45d5c1c6ec9 From 0a0bc9b2de61482d4a5d6fe75f56ad09ca6ad91c Mon Sep 17 00:00:00 2001 From: cjen1 Date: Tue, 10 Sep 2024 14:39:44 +0100 Subject: [PATCH 7/7] checkpoint --- reckon/systems/etcd/Makefile | 2 +- scripts/diss.py | 152 +++++++++++++++++++++++++++++++++++ scripts/reckon_paper.py | 43 +++++----- scripts/tester.py | 10 ++- vendor/mininet | 2 +- 5 files changed, 184 insertions(+), 25 deletions(-) create mode 100644 scripts/diss.py diff --git a/reckon/systems/etcd/Makefile b/reckon/systems/etcd/Makefile index 716e1ab..a479ea0 100755 --- a/reckon/systems/etcd/Makefile +++ b/reckon/systems/etcd/Makefile @@ -29,4 +29,4 @@ docker-build-deps: unzip /tmp/pb.zip -d /tmp/pb cp /tmp/pb/bin/protoc /usr/bin rm -r /tmp/pb - go get google.golang.org/protobuf/cmd/protoc-gen-go + #go install google.golang.org/protobuf/cmd/protoc-gen-go@latest diff --git a/scripts/diss.py b/scripts/diss.py new file mode 100644 index 0000000..4925a8d --- /dev/null +++ b/scripts/diss.py @@ -0,0 +1,152 @@ +from util import run_test +import itertools as it + +def tests(folder_path): + params = [] + + debug = False + + fd_timeouts = [0.06, 0.11, 0.21, 0.31] if not debug else [0.31] + + low_repeat = range(3) if not debug else range(1) + high_repeat = range(50) if not debug else range(1) + + def client_map(system): + if 'ocons' in system: + return 'ocaml' + if 'etcd' in system: + return 'go' + if 'zookeeper' in system: + return 'java' + + def low_jitter_topo(system): + return { + 'topo':'wan', + 'nn': 5, + 'delay':50, + 'system':system, + 'client': client_map(system), + 'tag':'low_jitter', + } + + def high_jitter_topo(system): + return low_jitter_topo(system) | { + 'jitter':0.1, + 'loss':0.01, + 'tag':'high_jitter', + } + + # etcd stability + for system, repeat, fd_timeout in it.product( + ['etcd', 'zookeeper'], + high_repeat, + fd_timeouts + ): + params.append( + low_jitter_topo(system) | { + 'rate' : 5000, + 'repeat': repeat, + 'duration': 10, + 'failure' : "none", + 'failure_timeout': fd_timeout, + }) + params.append( + high_jitter_topo(system) | { + 'rate' : 5000, + 'repeat': repeat, + 'duration': 10, + 'failure' : "none", + 'failure_timeout': fd_timeout, + }) + + # etcd failure analysis + for system, repeat, fd_timeout in it.product( + ['etcd', 'etcd-pre-vote', 'etcd+sbn', 'etcd-pre-vote+sbn'], + high_repeat, + fd_timeouts, + ): + params.append( + low_jitter_topo(system) | { + 'rate' : 1000, + 'repeat': repeat, + 'duration': 10, + 'failure' : "leader", + 'failure_timeout': fd_timeout, + }) + params.append( + high_jitter_topo(system) | { + 'rate' : 1000, + 'repeat': repeat, + 'duration': 10, + 'failure' : "leader", + 'failure_timeout': fd_timeout, + }) + + # ocons failure bulk + for system, fd_timeout, repeat in it.product( + ['ocons-paxos', 'ocons-raft', 'ocons-raft-pre-vote', 'ocons-raft+sbn', 'ocons-raft-pre-vote+sbn'], + fd_timeouts, + high_repeat, + ): + params.append( + low_jitter_topo(system) | { + 'rate' : 1000, + 'repeat': repeat, + 'duration': 10, + 'failure': "leader", + 'failure_timeout': fd_timeout, + }) + + # typical failure graphs + for system, repeat in it.product( + ['ocons-paxos', 'ocons-raft', 'ocons-raft-pre-vote', 'ocons-raft+sbn', 'ocons-raft-pre-vote+sbn'] + + ['etcd', 'etcd-pre-vote'], + low_repeat + ): + params.append( + low_jitter_topo(system) | { + 'rate': 1000, + 'repeat': repeat, + 'duration': 10, + 'failure':'leader', + 'failure_timeout': 1, + 'tcpdump': True, + 'duration': 20, + }) + + # TTR varying rate + for rate, system, repeat in it.product( + [1000, 2000, 4000, 8000], + ['ocons-paxos', 'ocons-raft'] + + ['etcd', 'zookeeper'], + high_repeat, + ): + params.append( + low_jitter_topo(system) | { + 'rate': rate, + 'repeat': repeat, + 'failure': 'leader', + 'failure_timeout': 0.5, + 'duration': 10, + }) + +# # TTR varying rate with restriction +# for rate, system, repeat in it.product( +# [1000, 2000, 4000, 8000], +# ['ocons-paxos', 'ocons-raft', 'ocons-raft-pre-vote', 'ocons-raft+sbn', 'ocons-raft-pre-vote+sbn'], +# high_repeat, +# ): +# params.append( +# low_jitter_topo(system) | { +# 'rate': rate, +# 'repeat': repeat, +# 'failure': 'leader', +# 'failure_timeout': 0.5, +# 'duration': 10, +# } +# ) + + for p in params: + print(p) + + return [lambda p=p: run_test(folder_path, p) for p in params ] diff --git a/scripts/reckon_paper.py b/scripts/reckon_paper.py index d5b2169..b02a948 100644 --- a/scripts/reckon_paper.py +++ b/scripts/reckon_paper.py @@ -5,9 +5,9 @@ def tests(folder_path): params = [] reference_systems = ['ocons-paxos', 'ocons-raft', 'ocons-raft-pre-vote', 'ocons-raft+sbn', 'ocons-raft-pre-vote+sbn'] - industry_systems = ['etcd', 'zookeeper', 'zookeeper-fle', 'etcd-pre-vote', 'etcd+sbn', 'etcd-pre-vote+sbn'] + industry_systems = ['etcd', 'etcd-pre-vote', 'etcd+sbn', 'etcd-pre-vote+sbn', 'zookeeper', 'zookeeper-fle', ] - fd_timeouts = [0.01, 0.03, 0.06, 0.11, 0.21, 0.41, 0.81] + fd_timeouts = [0.06, 0.11, 0.21, 0.41, 0.81] low_repeat = range(3) high_repeat = range(25) @@ -37,27 +37,28 @@ def high_jitter_topo(system): 'tag':'high_jitter', } - ## typical failure graphs - #for system, repeat in it.product( - # reference_systems + industry_systems, - # low_repeat - # ): - # params.append( - # low_jitter_topo(system) | { - # 'rate': 10000, - # 'repeat': repeat, - # 'duration': 10, - # 'failure':'leader', - # 'failure_timeout': 0.5, - # 'tcpdump': True, - # 'duration': 10, - # } - # ) + # typical failure graphs + for system, repeat in it.product( + reference_systems + industry_systems, + #industry_systems, + low_repeat + ): + params.append( + low_jitter_topo(system) | { + 'rate': 1000, + 'repeat': repeat, + 'duration': 10, + 'failure':'leader', + 'failure_timeout': 1, + 'tcpdump': True, + 'duration': 20, + } + ) # TTR varying rate for rate, system, repeat in it.product( - [1000, 4000, 16000], - reference_systems + ['etcd', 'zookeeper', 'zookeeper-fle'], + [1000, 2000, 4000, 8000], + reference_systems + industry_systems, high_repeat, ): params.append( @@ -79,7 +80,7 @@ def high_jitter_topo(system): ): params.append( topo(system) | { - 'rate': 10000, + 'rate': 1000, 'repeat': repeat, 'failure': 'leader', 'failure_timeout': fd_timeout, diff --git a/scripts/tester.py b/scripts/tester.py index c4e1ef1..f97b621 100644 --- a/scripts/tester.py +++ b/scripts/tester.py @@ -12,8 +12,14 @@ #import conspire #actions += conspire.tests(folder_path) -import reckon_paper -actions += reckon_paper.tests(folder_path) +#import reckon_paper +#actions += reckon_paper.tests(folder_path) + +#import etcd_test +#actions += etcd_test.tests(folder_path) + +import diss +actions += diss.tests(folder_path) # Shuffle to isolate ordering effects rng.shuffle(actions) diff --git a/vendor/mininet b/vendor/mininet index 88f14e9..6eb8973 160000 --- a/vendor/mininet +++ b/vendor/mininet @@ -1 +1 @@ -Subproject commit 88f14e946a05cd0895e1a127bf89da9b3fb1d98b +Subproject commit 6eb8973c0bfd13c25c244a3871130c5e36b5fbd7