Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move timeouts #42

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ RUN apt-get update && apt-get install --no-install-recommends -yy -qq \
unzip \
python-is-python3

## Add stretch backports
#RUN echo 'deb http://ftp.debian.org/debian stretch-backports main' | sudo tee /etc/apt/sources.list.d/stretch-backports.list
#RUN apt-get update && apt-get install -yy -qq \
# openjdk-11-jdk
RUN apt-get update && apt-get install -yy -qq \
openjdk-11-jdk

# Runtime dependencies
RUN python -m pip install --upgrade wheel setuptools
Expand Down
9 changes: 2 additions & 7 deletions reckon/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,15 @@

print("BENCHMARK: Starting Test")

from multiprocessing import Process

p = Process(target = run_test, args=(
run_test(
args.result_location,
clients,
ops_provider,
args.duration,
system,
cluster,
failures,
))
p.start()
p.join(max(args.duration * 10, 150))
p.terminate()
)
finally:
for stopper in stoppers.values():
stopper()
Expand Down
69 changes: 65 additions & 4 deletions reckon/client_runner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from threading import Thread
import threading
import time
import sys
import selectors
Expand All @@ -12,8 +12,67 @@

from tqdm import tqdm

class MBox:
def __init__(self, value=None):
self.value = value
self.mtx = threading.Lock()

def preload(ops_provider: t.AbstractWorkload, duration: float) -> int:
def __get__(self, instance, owner):
with self.mtx:
return self.value

def __set__(self, instance, value):
with self.mtx:
self.value = value

class StallCheck:
def __init__(self, init):
self._value = init
self.prev_read = self._value

self.mtx = threading.Lock()

def incr(self, delta = 1):
with self.mtx:
self._value += delta

def check_if_stalled(self) -> bool:
with self.mtx:
current_val = self._value
prev_val = self.prev_read

self.prev_read = current_val

return current_val == prev_val

class StalledException(Exception):
pass

def terminate_if_stalled(task, timeout=30):
stall_check = StallCheck(0)
result = MBox()
def wrapped_task(sc=stall_check, result=result):
result.value = task(sc)

thread = threading.Thread(target=wrapped_task, daemon=True)
thread.start()

ticker = 0

while thread.is_alive():
time.sleep(1.)
if not stall_check.check_if_stalled():
ticker = 0
continue

ticker += 1
if ticker > timeout:
raise StalledException

thread.join(10)
return result.value

def preload(stall_checker: StallCheck, ops_provider: t.AbstractWorkload, duration: float) -> int:
logging.debug("PRELOAD: begin")

for op, client in zip(ops_provider.prerequisites, it.cycle(ops_provider.clients)):
Expand All @@ -23,6 +82,7 @@ def preload(ops_provider: t.AbstractWorkload, duration: float) -> int:
total_reqs = 0
with tqdm(total=duration) as pbar:
for client, op in ops_provider.workload:
stall_checker.incr()
if op.time >= duration:
break

Expand All @@ -33,6 +93,7 @@ def preload(ops_provider: t.AbstractWorkload, duration: float) -> int:
sim_t = op.time

for client in ops_provider.clients:
stall_checker.incr()
client.send(t.finalise())

logging.debug("PRELOAD: end")
Expand Down Expand Up @@ -133,10 +194,10 @@ def test_steps(
assert(len(failures) >= 2)

workload.clients = clients
total_reqs = preload(workload, duration)
total_reqs = terminate_if_stalled(lambda stall_check: preload(stall_check, workload, duration))
ready(clients)

t_execute = Thread(
t_execute = threading.Thread(
target=execute,
args=[clients, failures, duration],
)
Expand Down
1 change: 1 addition & 0 deletions reckon/goclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,4 +351,5 @@ func Run(client_gen func() (Client, error), clientid string, new_client_per_requ
<-results_complete
send(map[string]interface{}{"kind":"finished"})
log.Print("Results sent, exiting")
os.Exit(0)
}
22 changes: 20 additions & 2 deletions reckon/reckon_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ def __init__(self, p_in: IO[bytes], p_out: IO[bytes], id: str):
def _send_packet(self, payload: str):
size = pack("<l", len(payload)) # Little endian signed long (4 bytes)
self.stdin.write(size + bytes(payload, "ascii"))
self.stdin.flush()

def _recv_packet(self) -> str:
size = self.stdout.read(4)
Expand All @@ -101,6 +100,7 @@ def _recv_packet(self) -> str:
def send(self, msg: Message):
payload = msg.json()
self._send_packet(payload)
self.stdin.flush()

def recv(self) -> Message:
pkt = self._recv_packet()
Expand All @@ -120,6 +120,15 @@ def unregister_selector(self, s: BaseSelector, e: Any) -> SelectorKey:
return s.unregister(self.stdin)
raise KeyError()

class RedirectClient(Client):
def __init__(self, file, p_in: IO[bytes], p_out: IO[bytes], id: str):
self.file = open(file, "bw")
super().__init__(p_in, p_out, id)

def _send_packet(self, payload: str):
size = pack("<l", len(payload)) # Little endian signed long (4 bytes)
self.stdin.write(size + bytes(payload, "ascii"))
self.file.write(size + bytes(payload, "ascii"))

WorkloadOperation = Tuple[Client, Operation]

Expand Down Expand Up @@ -213,7 +222,6 @@ class AbstractClient(ABC):
def cmd(self, ips: List[str], client_id: str) -> str:
pass


class AbstractSystem(ABC):
def __init__(self, args):
ctime = time.localtime()
Expand Down Expand Up @@ -265,6 +273,16 @@ def add_stdout_logging(self, cmd: str, tag: str):
log = self.log_location
return f"{cmd} > {log}/{time}_{tag}.out"

def add_tee_stdout_logging(self, cmd: str, tag: str):
time = self.creation_time
log = self.log_location
return f"{cmd} | tee {log}/{time}_{tag}.out"

def add_tee_stdin_logging(self, cmd: str, tag: str):
time = self.creation_time
log = self.log_location
return f"tee {log}/{time}_{tag}.in | {cmd}"

@abstractmethod
def stat(self, host: MininetHost) -> str:
pass
Expand Down
2 changes: 1 addition & 1 deletion reckon/systems/etcd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ docker-build-deps:
unzip /tmp/pb.zip -d /tmp/pb
cp /tmp/pb/bin/protoc /usr/bin
rm -r /tmp/pb
go get google.golang.org/protobuf/cmd/protoc-gen-go
#go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
2 changes: 1 addition & 1 deletion reckon/systems/ocons/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ def get_leader(self, cluster):
class OConsConspireLeaderDC(OConsConspireLeader):
system_kind = "conspire-leader-dc"

tick_period = 0.001
tick_period = 0.01

def start_cmd(self, tag, nid, cluster):
cmd = " ".join([
Expand Down
6 changes: 3 additions & 3 deletions reckon/systems/ocons/clients/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ module Ocons_cli_shim : Types.S = struct
let op =
match op with
| Types.Write (k, v) ->
O.Types.Write (k, v)
[|O.Types.Write (k, v)|]
| Types.Read k ->
O.Types.Read k
[|O.Types.Read k|]
in
try
let cmd =
Expand Down Expand Up @@ -47,7 +47,7 @@ module Ocons_cli_shim : Types.S = struct
match res with
| O.Types.Failure msg ->
Types.Failure (`Msg msg)
| O.Types.(Success | ReadSuccess _) ->
| O.Types.(Success _) ->
Types.Success
in
f (rid, res)
Expand Down
24 changes: 12 additions & 12 deletions reckon/systems/ocons/clients/flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion reckon/systems/zookeeper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __str__(self):


class Zookeeper(t.AbstractSystem):
bin_dir = "reckon/systems/zookeeper/bins/apache-zookeeper-3.5.10-bin"
bin_dir = "reckon/systems/zookeeper/bins/apache-zookeeper-3.5.9-bin"
electionAlg = 0

def get_client(self, args):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,26 @@ public ZKClient(String connectString) throws IOException {

@Override
public void Create(String k) throws ClientException {
try {
client.create(
key_to_path(k),
"NULL".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
null
);
} catch (KeeperException ex) {
throw new ClientException("Cause: " + ex.toString());
} catch (InterruptedException ex) {
throw new ClientException("Cause: " + ex.toString());
while(true) {
try {
client.create(
key_to_path(k),
"NULL".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
null
);
System.err.println("Successful create");
break;
} catch (KeeperException.NodeExistsException ex) {
break;
} catch (Exception ex) {
System.err.println(String.format("Failed to create key: %s", ex.toString()));
// sleep to prevent thundering herd
try {
Thread.sleep(1000, 0);
} catch (InterruptedException e) {}
}
}
}

Expand Down
Loading
Loading