Skip to content

Commit

Permalink
add rmb_tester tool
Browse files Browse the repository at this point in the history
  • Loading branch information
sameh-farouk committed Dec 24, 2023
1 parent d869bec commit d0fe665
Show file tree
Hide file tree
Showing 6 changed files with 371 additions and 0 deletions.
88 changes: 88 additions & 0 deletions tools/rmb_tester/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# RMB tools (CLI tools/scripts)

You can find here CLI tools and scripts that can be used for testing and benchmarking [RMB](https://github.com/threefoldtech/rmb-rs). You can use either RMB_Tester, RMB_echo, or both to quickly test the communications over RMB.

## Installation:
- clone the repo
- create a new env
```py
python3 -m venv venv
```
- activate the new env
```py
source ./venv/bin/activate
```
- install dependencies
```py
pip install -r requirements.txt
```

## Usage:
RMB tools comprise two Python programs that can be used independently or in conjunction with each other.

### RMB_Tester
RMB_Tester is a CLI tool that serves as an RMB client to automate the process of crafting a specified number of test messages to be sent to one or more destinations. The number of messages, command, data, destination list, and other parameters can be configured through the command line. The tool will wait for the correct number of responses and report some statistics.

Please ensure that there is a process running on the destination side that can handle this command and respond back or use RMB_echo for this purpose.

example:
```sh
# We sending to two destinations
# The default test command will be used and can be handled by RMB_echo process
python3 ./rmb_tester.py --dest 41 55
```

to just print the summary use `--short` option

to override default command use the `--command`
```sh
# The `rmb.version` command will be handled by RMB process itself
python3 ./rmb_tester.py --dest 41 --command rmb.version
```

for all optional args see
```sh
python3 ./rmb_tester.py -h
```

### RMB_Echo (message handler)
This tool will automate handling the messages coming to $queue and respond with same message back to the source and display the count of processed messages.

example:
```sh
python3 ./msg_handler.py
```

or specify the redis queue (command) to handle the messages from
```sh
python3 ./msg_handler.py --queue helloworld
```

for all optional args see
```sh
python3 ./msg_handler.py -h
```

## Recipes:
- Test all online nodes (based on up reports) to ensure that they are reachable over RMB
```sh
# The nodes.sh script when used with `--likely-up` option will output the IDs of the online nodes in the network using the gridproxy API.
python3 ./rmb_tester.py -d $(./scripts/twins.sh --likely-up main) -c "rmb.version" -t 600 -e 600
```
Note: this tool is for testing purposes and not optimized for speed, for large number of destinations use appropriate expiration and timeout values.

you can copy and paste all non responsive twins and run `./twinid_to_nodeid.sh` with the list of twins ids for easy lookup node id and verfiying the status (like know if node in standby mode).
```sh
./scripts/twinid_to_nodeid.sh main 2562 5666 2086 2092
```

First arg is network (one of `dev`, `qa`, `test`, `main`)
Then you follow it with space separated list of twin ids

the output would be like
```sh
twin ID: 2562 node ID: 1419 status: up
twin ID: 5666 node ID: 3568 status: up
twin ID: 2086 node ID: 943 status: up
twin ID: 2092 node ID: 949 status: up
```
28 changes: 28 additions & 0 deletions tools/rmb_tester/msg_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env python3

import redis
from rmb_tester import Message
import time
import argparse

def listen(q):
count = 0
while True:
result = r.blpop(f'msgbus.{q}')
msg = Message.from_json(result[1])
msg.epoch = int(time.time())
msg.twin_dst, msg.twin_src = msg.twin_src, msg.twin_dst
r.lpush(msg.reply_to, msg.to_json())
count += 1
print(f"Responses sent out: {count}", end='\r')

if __name__ == "__main__":
parser = argparse.ArgumentParser("RMB_echo")
parser.add_argument("-q", "--queue", help="redis queue name. defaults to 'testme'", type=str, default='testme')
parser.add_argument("-p", "--redis-port", help="redis port for the instance used by rmb-peer", type=int, default=6380)
args = parser.parse_args()

r = redis.Redis(host='localhost', port=args.redis_port, db=0)
print("RMB_echo")
print(f"handling command msgbus.{args.queue}")
listen(args.queue)
3 changes: 3 additions & 0 deletions tools/rmb_tester/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
alive-progress==2.4.1
redis==4.3.1
hiredis==2.0.0
160 changes: 160 additions & 0 deletions tools/rmb_tester/rmb_tester.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#!/usr/bin/env python3

from dataclasses import dataclass
import uuid
import time
from timeit import default_timer as timer
from alive_progress import alive_bar
import json
import base64
import redis
import argparse
import string
import random

@dataclass
class Message:
version: int
ref: str
command: str
expiration: int
data: str
#tags: str
twin_dst: list
reply_to: uuid.UUID
schema: str
epoch: time.struct_time
err: dict
twin_src: string

def to_json(self):
msg_dct = {
"ver": self.version,
"ref": self.ref,
"cmd": self.command,
"exp": self.expiration,
"dat": base64.b64encode(self.data.encode('utf-8')).decode('utf-8'),
# "tag": self.tags,
"dst": self.twin_dst,
"ret": self.reply_to,
"shm": self.schema,
"now": self.epoch,
}
return json.dumps(msg_dct)

@classmethod
def from_json(cls, json_data):
msg_dict = json.loads(json_data)
return cls(
version=msg_dict.get('ver'),
ref=msg_dict.get('ref'),
command=msg_dict.get('cmd'),
expiration=msg_dict.get('exp'),
data=base64.b64decode(msg_dict.get('dat')).decode('utf-8'),
reply_to=msg_dict.get('ret'),
schema=msg_dict.get('shm'),
epoch=msg_dict.get('now'),
err= msg_dict.get('err'),
twin_src=msg_dict.get('src'),
twin_dst=msg_dict.get('dst')
)

# init new message
def new_message(command: str, twin_dst: list, data: dict = {}, expiration: int = 120):
version = 1
reply_to = str(uuid.uuid4())
schema = "application/json"
epoch = int(time.time())
err = {"code": 0, "message": ""}
ref = ""
# tags = ""
return Message(version, ref, command, expiration, data, twin_dst, reply_to, schema, epoch, err, "")

def send_all(messages):
responses_expected = 0
return_queues = []
with alive_bar(len(messages), title='Sending ..', title_length=12) as bar:
for msg in messages:
r.lpush("msgbus.system.local", msg.to_json())
responses_expected += len(msg.twin_dst)
return_queues += [msg.reply_to]
bar()
return responses_expected, return_queues

def wait_all(responses_expected, return_queues, timeout):
responses = []
err_count = 0
success_count = 0
with alive_bar(responses_expected, title='Waiting ..', title_length=12) as bar:
for _ in range(responses_expected):
start = timer()
result = r.blpop(return_queues, timeout=timeout)
if not result:
break
timeout = timeout - round(timer() - start, 3)
response = Message.from_json(result[1])
responses.append(response)
if response.err is not None:
err_count += 1
bar.text('received an error ❌')
else:
success_count += 1
bar.text(f'received a response from twin {response.twin_src} ✅')
bar()
return responses, err_count, success_count

def main():
global r
parser = argparse.ArgumentParser("RMB_tester", "python3 rmb_tester.py --dest 41 -c rmb.version")
parser.add_argument("-d", "--dest", help="list of twin ids(integer) to send message/s to. (required at least one)", nargs='+', type=int, required=True)
parser.add_argument("-n", "--count", help="count of messages to send. defaults to 1.", type=int, default=1)
parser.add_argument("-c", "--command", help="command which will handle the message. defaults to 'testme'", type=str, default='testme')
parser.add_argument("--data", help="data to send. defaults to random chars.", type=str, default=''.join(random.choices(string.ascii_uppercase + string.digits, k = 56)) )
parser.add_argument("-e", "--expiration", help="message expiration time in seconds. defaults to 120.", type=int, default=120)
parser.add_argument("-t", "--timeout", help="client will give up waiting if no new message received during the amount of seconds. defaults to 120.", type=int, default=120)
parser.add_argument("--short", help="omit responses output and shows only the stats.", action='store_true')
parser.add_argument("-p", "--redis-port", help="redis port for the instance used by rmb-peer", type=int, default=6379)
args = parser.parse_args()
r = redis.Redis(host='localhost', port=args.redis_port, db=0)
# print(args)
msg = new_message(args.command, args.dest, data=args.data, expiration=args.expiration)
# print(msg.to_json())
msgs = [msg] * args.count
start = timer()
responses_expected, return_queues = send_all(msgs)
if args.timeout < args.expiration:
print("Note: The timeout value you provided is less than the message expiration (TTL) value. As a result, responses may arrive after the client has given up waiting for them.")
responses, err_count, success_count = wait_all(responses_expected, return_queues, timeout=args.timeout)
elapsed_time = timer() - start
no_responses = responses_expected - len(responses)
print("=======================")
print("Summary:")
print("=======================")
print(f"sent: {len(msgs)}")
print(f"expected_responses: {responses_expected}")
print(f"received_success: {success_count}")
print(f"received_errors: {err_count}")
print(f"no response errors (client give up): {no_responses}")
responding = {int(response.twin_src) for response in responses}
not_responding = set(args.dest) - responding
print(f"twins not responding (twin IDs): {' '.join(map(str, not_responding))}")
print(f"elapsed time: {elapsed_time}")
print("=======================")
if not args.short:
print("Responses:")
print("=======================")
for response in responses:
print(response)
print("=======================")
print("Errors:")
print("=======================")
for response in responses:
if response.err is not None:
print(f"Error: {response.err}")
print(f"Source: {'Twin '+response.twin_src if response.twin_src != '0' else 'Relay'}")


if __name__ == "__main__":
NUM_RETRY = 3
r = None
main()
36 changes: 36 additions & 0 deletions tools/rmb_tester/scripts/twinid_to_nodeid.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/bin/bash

# Check if the script receives a list of numbers as arguments
if [ $# -lt 2 ]; then
echo "Too few arguments, expecting at least 2"
exit 1
fi

case $1 in
main|dev|qa|test|"" ) # Ok
;;
*)
# The wrong first argument.
echo 'Expected "dev", "qa", "test", or "main" as second arg' >&2
exit 1
esac

if [[ "$1" == "main" ]]; then
gridproxy_url="https://gridproxy.grid.tf"
else
gridproxy_url="https://gridproxy.$2.grid.tf"
fi


# Store the arguments but the firts one in an array
numbers=("${@:2}")

# Query the api and store the json response in a variable
response=$(curl -s "${gridproxy_url}"/nodes?size=20000)

# Loop through the json objects and find the ones that match the twinid
for number in "${numbers[@]}"; do
# Use jq to filter the objects by twinid and print the twinid and nodeid values
# echo "Objects with twinid = ${number}:"
jq -r --arg number "${number}" '.[] | select(.twinId == ($number | tonumber)) | "twin ID: \(.twinId) node ID: \(.nodeId) status: \(.status)"' <<< "${response}"
done
56 changes: 56 additions & 0 deletions tools/rmb_tester/scripts/twins.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/bin/bash

if [[ $# -gt 3 ]]; then
echo 'Too many arguments, expecting max of 3' >&2
exit 1
fi

case $1 in
--likely-up|--likely-down|--standby|all ) # Ok
;;
*)
# The wrong first argument.
echo 'Expected "--likely-up", "--likely-down", "--standby", "all" as first arg' >&2
exit 1
esac

case $2 in
main|dev|qa|test ) # Ok
;;
*)
# The wrong first argument.
echo 'Expected "dev", "qa", "test", or "main" as second arg' >&2
exit 1
esac

case $3 in
*[!0-9,]*)
echo 'Expected number or comma seprated list of farm numbers as third arg' >&2
exit 1
;;
*)
esac

if [[ "$2" == "main" ]]; then
gridproxy_url="https://gridproxy.grid.tf"
else
gridproxy_url="https://gridproxy.$2.grid.tf"
fi
if [[ "$1" == "--likely-up" ]]; then
url="$gridproxy_url/nodes?status=up&size=20000&farm_ids=$3"
elif [[ "$1" == "--likely-down" ]]; then
url="$gridproxy_url/nodes?status=down&size=20000&farm_ids=$3"
elif [[ "$1" == "--standby" ]]; then
url="$gridproxy_url/nodes?status=standby&size=20000&farm_ids=$3"
else
url="$gridproxy_url/nodes?size=20000&farm_ids=$3"
fi

# query gridproxy for the registred nodes and retrun a list of nodes' twin IDs
response=$(curl -s -X 'GET' \
"$url" \
-H 'accept: application/json')

twinIds=$(echo "$response" | jq -r '.[] | .twinId')

echo "${twinIds[*]}" | tr '\n' ' '

0 comments on commit d0fe665

Please sign in to comment.