From f8a7312a304e40c1ff632b42f24ab9bd42811d63 Mon Sep 17 00:00:00 2001
From: nizar-m <19857260+nizar-m@users.noreply.github.com>
Date: Fri, 19 Jun 2020 22:40:17 +0530
Subject: [PATCH] Regression benchmarks setup (#3310)

* Benchmark GraphQL queries using wrk
* fix console assets dir
* Store wrk parameters as well
* Add details about storing results in Readme
* Remove files in bench-wrk while computing server shasum
* Instead of just getting maximum throughput per query per version, create plots using wrk2 for a given set of requests per second. The maximum throughput is used to see what values of requests per second are feasible.
* Add id for version dropdown
* Allow specifying env and args for GraphQL Engine
  1) Arguments defined after -- will be applied as arguments to Hasura GraphQL Engine
  2) The script will also pass the environment variables to the Hasura GraphQL Engine instances
  Hasura GraphQL Engine can be run with the given environment variables and arguments as follows
  $ export HASURA_GRAPHQL_...=....
  $ python3 hge_wrk_bench.py -- --hge_arg1 val1 --hge_arg2 val2 ...
* Use matplotlib instead of plotly for figures
* Show throughput graph also. It may be useful in checking performance regressions across versions
* Support storing results in s3
  Use --upload-root-uri 's3://bucket/path' to upload results inside the given path. When specified, the results will be uploaded to the bucket, including latencies, latency histogram, and the test setup info. The s3 credentials should be provided as given in the AWS boto3 documentation.
* Allow specifying a name for the test scenario
* Fix open latency uri bug
* Update wrk docker image
* Keep ylim a little higher than the maximum so that the throughput plot is clearly visible
* Show throughput plots for multiple queries at the same time
* 1) Adjust size of dropdowns 2) Make label for requests/sec invisible when plot type is throughput
* 1) Adding boto3 to requirements.txt 2) Removing CPU Key print line 3) Adding info about the tests that will be run with wrk2
* Docker builder for wrk-websocket-server
* Make it optional to set up remote graphql-engine
* Listen on all interfaces and enable ping thread
* Add bench_scripts to wrk-websocket-server docker
* Use 127.0.0.1 instead of 'localhost' to address local hge
  For some reason it seems wrk was hanging trying to resolve 'localhost'. ping was able to resolve it fine from the same container, so I'm not sure what the deal was. Probably some local misconfiguration on my machine, but maybe this change will also help others.
* Store latency samples in subdirectory, server_shasum just once at start, additional docs * Add a note on running the benchmarks in the simplest way * Add a new section on how to run benchmarks on a new linux hosted instance Co-authored-by: Nizar Malangadan Co-authored-by: Brandon Simmons Co-authored-by: Karthikeyan Chinnakonda Co-authored-by: Brandon Simmons Co-authored-by: Vamshi Surabhi <0x777@users.noreply.github.com> --- server/bench-wrk/.gitignore | 5 + server/bench-wrk/.python-version | 1 + server/bench-wrk/Readme.md | 139 +++++ server/bench-wrk/gen-version.sh | 46 ++ server/bench-wrk/get-server-sha.sh | 6 + server/bench-wrk/hge_wrk_bench.py | 558 ++++++++++++++++++ server/bench-wrk/plot.py | 516 ++++++++++++++++ server/bench-wrk/port_allocator.py | 27 + server/bench-wrk/queries.graphql | 128 ++++ server/bench-wrk/requirements-top-level.txt | 17 + server/bench-wrk/requirements.txt | 52 ++ server/bench-wrk/results_schema.yaml | 167 ++++++ server/bench-wrk/run_hge.py | 464 +++++++++++++++ server/bench-wrk/run_postgres.py | 195 ++++++ server/bench-wrk/sportsdb_setup.py | 321 ++++++++++ .../wrk-websocket-server/.dockerignore | 2 + .../bench-wrk/wrk-websocket-server/.gitignore | 35 ++ .../wrk-websocket-server/CHANGELOG.md | 5 + .../bench-wrk/wrk-websocket-server/Dockerfile | 38 ++ server/bench-wrk/wrk-websocket-server/LICENSE | 1 + .../bench-wrk/wrk-websocket-server/Setup.hs | 2 + .../wrk-websocket-server/app/Main.hs | 24 + .../bench_scripts/bench-lib-wrk.lua | 54 ++ .../bench_scripts/bench-lib-wrk2.lua | 64 ++ .../bench_scripts/bench-wrk.lua | 20 + .../bench_scripts/bench-wrk2.lua | 23 + .../wrk-websocket-server/cabal.project | 5 + .../wrk-websocket-server/cabal.project.dev | 1 + .../wrk-websocket-server/cabal.project.freeze | 156 +++++ .../wrk-websocket-server/cabal.project.local | 8 + .../wrk-websocket-server/cabal.project.local~ | 0 .../example_benchmark_request.yaml | 20 + .../wrk-websocket-server/src/Wrk/Server.hs | 257 ++++++++ .../src/Wrk/Server/Types.hs | 249 ++++++++ .../wrk-websocket-server.cabal | 48 ++ server/cabal.project.dev | 2 + 36 files changed, 3656 insertions(+) create mode 100644 server/bench-wrk/.gitignore create mode 100644 server/bench-wrk/.python-version create mode 100644 server/bench-wrk/Readme.md create mode 100755 server/bench-wrk/gen-version.sh create mode 100755 server/bench-wrk/get-server-sha.sh create mode 100644 server/bench-wrk/hge_wrk_bench.py create mode 100755 server/bench-wrk/plot.py create mode 100644 server/bench-wrk/port_allocator.py create mode 100644 server/bench-wrk/queries.graphql create mode 100644 server/bench-wrk/requirements-top-level.txt create mode 100644 server/bench-wrk/requirements.txt create mode 100644 server/bench-wrk/results_schema.yaml create mode 100644 server/bench-wrk/run_hge.py create mode 100644 server/bench-wrk/run_postgres.py create mode 100644 server/bench-wrk/sportsdb_setup.py create mode 100644 server/bench-wrk/wrk-websocket-server/.dockerignore create mode 100644 server/bench-wrk/wrk-websocket-server/.gitignore create mode 100644 server/bench-wrk/wrk-websocket-server/CHANGELOG.md create mode 100644 server/bench-wrk/wrk-websocket-server/Dockerfile create mode 120000 server/bench-wrk/wrk-websocket-server/LICENSE create mode 100644 server/bench-wrk/wrk-websocket-server/Setup.hs create mode 100644 server/bench-wrk/wrk-websocket-server/app/Main.hs create mode 100644 server/bench-wrk/wrk-websocket-server/bench_scripts/bench-lib-wrk.lua create mode 100644 
server/bench-wrk/wrk-websocket-server/bench_scripts/bench-lib-wrk2.lua create mode 100644 server/bench-wrk/wrk-websocket-server/bench_scripts/bench-wrk.lua create mode 100644 server/bench-wrk/wrk-websocket-server/bench_scripts/bench-wrk2.lua create mode 100644 server/bench-wrk/wrk-websocket-server/cabal.project create mode 100644 server/bench-wrk/wrk-websocket-server/cabal.project.dev create mode 100644 server/bench-wrk/wrk-websocket-server/cabal.project.freeze create mode 100644 server/bench-wrk/wrk-websocket-server/cabal.project.local create mode 100644 server/bench-wrk/wrk-websocket-server/cabal.project.local~ create mode 100644 server/bench-wrk/wrk-websocket-server/example_benchmark_request.yaml create mode 100644 server/bench-wrk/wrk-websocket-server/src/Wrk/Server.hs create mode 100644 server/bench-wrk/wrk-websocket-server/src/Wrk/Server/Types.hs create mode 100644 server/bench-wrk/wrk-websocket-server/wrk-websocket-server.cabal
diff --git a/server/bench-wrk/.gitignore b/server/bench-wrk/.gitignore new file mode 100644 index 0000000000000..eeae8a1df74b8 --- /dev/null +++ b/server/bench-wrk/.gitignore @@ -0,0 +1,5 @@ +__pycache__ +test_output +.previous_work_dir +.#* +venv
diff --git a/server/bench-wrk/.python-version b/server/bench-wrk/.python-version new file mode 100644 index 0000000000000..897e56be0b691 --- /dev/null +++ b/server/bench-wrk/.python-version @@ -0,0 +1 @@ +3.7.6
diff --git a/server/bench-wrk/Readme.md b/server/bench-wrk/Readme.md new file mode 100644 index 0000000000000..728d99f908ecb --- /dev/null +++ b/server/bench-wrk/Readme.md @@ -0,0 +1,139 @@
+## Benchmarking Hasura GraphQL Engine ##
+
+The script `hge_wrk_bench.py` helps benchmark a given version of Hasura
+GraphQL Engine using a set of GraphQL queries. The results are stored (in the
+*results GraphQL engine*) along with details such as the version of GraphQL engine
+against which the benchmark is run, the version of the Postgres database, etc. The
+stored results can help in comparing benchmarks of different versions of
+GraphQL engine.
+
+### Setup ###
+
+The setup includes two Postgres databases with
+[sportsdb](https://www.thesportsdb.com/) schema and data, and two GraphQL
+engines running on the Postgres databases. One of the GraphQL engines is then
+added as a remote schema to the other GraphQL engine.
+
+The data will be the same in both databases. The tables reside in different
+database schemas in order to avoid GraphQL schema conflicts.
+
+The methods in the script `sportsdb_setup.py` help in setting up the databases,
+starting the Hasura GraphQL engines, and setting up relationships. This script
+can either take the URLs of already running Postgres databases as input, or it can
+start the databases as Docker instances. The GraphQL engines can be run either
+with `cabal run` or as Docker containers.
+
+### Run benchmark ###
+- Install Python 3.7.6 using pyenv
+```sh
+$ pyenv install 3.7.6
+```
+- Install dependencies for the Python script in a virtual environment.
+```sh
+$ python3 -m venv venv
+$ source venv/bin/activate
+$ pip3 install -r requirements.txt
+```
+- To run benchmarks, do
+```sh
+$ python3 hge_wrk_bench.py
+```
+This script uses [wrk](https://github.com/wg/wrk) to benchmark Hasura GraphQL
+Engine against a list of queries defined in `queries.graphql`. The results are
+then stored through a results Hasura GraphQL Engine.
+
+You can configure the build and runtime parameters for the graphql-engines
+under test by modifying your local `cabal.project.local` file.
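+
+The script also passes your environment variables through to the graphql-engine
+instances it launches, and forwards everything after `--` on the command line as
+arguments to graphql-engine (see the commit notes above). A minimal sketch of
+such an invocation is shown below; the exported variable and the arguments after
+`--` are placeholders taken from the commit notes, and the `--connections` and
+`--duration` values are simply the documented defaults:
+```sh
+# Placeholder values; substitute real graphql-engine env vars and arguments.
+$ export HASURA_GRAPHQL_...=...
+$ python3 hge_wrk_bench.py --connections 50 --duration 300 \
+    -- --hge_arg1 val1 --hge_arg2 val2
+```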
+
+### Interpreting the plots
+
+For each query under test we first run `wrk` to try to determine the maximum
+throughput we can sustain for that query. This result is plotted under the `max
+throughput` graph. This can be considered the point after which graphql-engine
+will start to fall over.
+
+Then for each query we measure latency under several different loads (but
+making sure not to approach max throughput) using `wrk2`, which measures latency
+in a principled way. Latency can be viewed as a continuous histogram or as a
+violin plot that also plots each latency sample. The latter provides the most
+visual information and can be useful for observing clustering or other
+patterns, or validating the benchmark run.
+
+### Cleaning up test runs
+
+Data will be stored locally in the work directory (`test_output` by default).
+This entire directory can be deleted safely.
+
+If you are using the default results graphql-engine and want to just remove old
+benchmark runs but avoid rebuilding the sportsdb data, you can do:
+
+```
+$ sudo rm -r test_output/{benchmark_runs,sportsdb_data}
+```
+
+### Arguments ###
+- For the list of arguments supported, do
+```sh
+$ python3 hge_wrk_bench.py --help
+```
+
+#### Postgres ####
+ - In order to use already running Postgres databases, use argument `--pg-urls PG_URL,REMOTE_PG_URL`, or environment variable `export HASURA_BENCH_PG_URLS=PG_URL,REMOTE_PG_URL`
+ - Set the docker image using argument `--pg-docker-image DOCKER_IMAGE`, or environment variable `HASURA_BENCH_PG_DOCKER_IMAGE`
+
+#### GraphQL Engine ####
+ - In order to run as a docker container, use argument `--hge-docker-image DOCKER_IMAGE`, or environment variable `HASURA_BENCH_HGE_DOCKER_IMAGE`
+ - To skip the stack build, use argument `--skip-stack-build`
+
+#### wrk ####
+ - The number of open connections can be set using argument `--connections CONNECTIONS`, or environment variable `HASURA_BENCH_CONNECTIONS`
+ - The duration of the tests can be controlled using argument `--duration DURATION`, or environment variable `HASURA_BENCH_DURATION`
+ - If plots should not be shown at the end of the benchmarks, use argument `--skip-plots`
+ - The Hasura GraphQL Engine to which results should be pushed can be specified using argument
+ `--results-hge-url HGE_URL`, or environment variable `HASURA_BENCH_RESULTS_HGE_URL`. By
+ default the launched (non-"remote") graphql-engine will be used, and its data stored in
+ `test_output/sportsdb_data`. The admin secret for this GraphQL engine can be specified
+ using environment variable `HASURA_BENCH_RESULTS_HGE_ADMIN_SECRET`.
+
+### Work directory ###
+- The files used by Postgres docker containers, logs of Hasura GraphQL engines run with `cabal run`, and other artifacts are stored in the work directory.
+- Storing the data volumes of the Postgres docker containers in the work directory (`test_output` by default) helps avoid database setup time for benchmarks after the first-time setup.
+- The logs of Hasura GraphQL engines (when they are run using `cabal run`) are stored in the files *hge.log* and *remote\_hge.log*.
+
+### Default settings ###
+- Postgres databases will be run as docker containers
+- Hasura GraphQL Engines by default will be run using `cabal run`
+- With wrk
+  - The number of threads used by *wrk* will be the number of CPUs
+  - Number of connections = 50
+  - Test duration = 5 minutes (300 sec)
+- By default the results are stored in the Hasura GraphQL Engine used for benchmarking.
+
+### Storing results ###
+- The results are stored in schema `hge_bench`.
+- For the schema, see the file `results_schema.yaml`
+- The main table is `hge_bench.results`. This table stores the following details
+  - *cpu_key*: This is a foreign key reference to *cpu_info(key)*. The table *cpu_info* captures the various parameters of the CPU on which the benchmark was run, including the model and the number of vCPUs
+  - *query_name*: This is a foreign key reference to *gql_query(name)*. The table *gql_query* stores the name of the query and the query itself used for the tests.
+  - *docker_image*: Stores the docker image of Hasura GraphQL Engine when HGE is run as a docker container
+  - *server_shasum*, *version*: These are stored when HGE is run with `cabal run`. *version* stores the version generated by the script *gen-version.sh*. *server_shasum* stores the shasum of the files in the server folder (excluding the tests folder). This shasum shows whether the server code has actually changed between commits.
+  - *postgres_version*: Stores the version of Postgres
+  - *latency*, *requests_per_sec*: Store the benchmark latency and requests\_per\_sec results
+  - *wrk_parameters*: Stores the parameters used by wrk during benchmarking, including the number of threads, the total number of open connections, and the duration of the tests
+
+### The simplest way to setup the benchmark ###
+- Note: This method currently only works on Linux instances
+- Run the benchmarks on a docker image using
+```
+python3 hge_wrk_bench.py --hge-docker-image DOCKER_IMAGE
+```
+- The command will prompt for a ``WORK_DIR`` which will store all the results, volumes, and databases.
+- To compare the results with another docker build, run the same command again with the modified ``DOCKER_IMAGE`` and the same ``WORK_DIR``.
+- If the catalog versions of the two docker builds are not the same, run the benchmarks first on the docker image with the lower
+  catalog version and then run the benchmarks on the docker image with the higher catalog version.
+
+### Steps to run benchmarks on a new Linux-hosted instance ###
+- Install docker and python3
+- Optional: install ghcup (cabal and GHC will be installed with it); you'll need cabal to be set up only when
+  you want to run the benchmarks on a branch directly (i.e. there's no docker image for it).
+- Run the benchmarks following the steps in the ``The simplest way to setup the benchmark`` diff --git a/server/bench-wrk/gen-version.sh b/server/bench-wrk/gen-version.sh new file mode 100755 index 0000000000000..e7655ba673e02 --- /dev/null +++ b/server/bench-wrk/gen-version.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env sh +set -e + +get_changes_hash() +{ + # We use this to determine if there are any new additions + local GIT_STATUS="$(git status --porcelain)" + # To determine if there are any changes + local GIT_DIFF_INDEX="$(git diff-index -p HEAD --)" + # Whether anything changed in the repo + export GIT_DIRTY="$GIT_STATUS$GIT_DIFF_INDEX" + if [ -n "$GIT_DIRTY" ]; then + DIRTY_HASH_SHORT="$(echo $GIT_DIRTY | sha256sum | awk '{print $1}' | tail -c 9)" + echo -dirty-$DIRTY_HASH_SHORT + else + echo '' + fi +} + +get_main_version() +{ + # Get the branch name + local GIT_BRANCH="$(git rev-parse --abbrev-ref HEAD)" + # Get the current commit id + local COMMIT_HASH_SHORT="$(git rev-parse --short HEAD)" + + case "$GIT_BRANCH" in + + # The master branch + 'master') + echo $COMMIT_HASH_SHORT;; + + # The release branches + release-*) + local RELEASE_VER="$(git describe --match "v[0-9]*" HEAD 2>/dev/null)" + test -n "$RELEASE_VER" || + RELEASE_VER="$(expr "$GIT_BRANCH" : release-*'\(.*\)')"-$COMMIT_HASH_SHORT + echo $RELEASE_VER;; + + # Everything else + *) + echo $GIT_BRANCH-$COMMIT_HASH_SHORT;; + esac +} + +echo "$(get_main_version)$(get_changes_hash)" diff --git a/server/bench-wrk/get-server-sha.sh b/server/bench-wrk/get-server-sha.sh new file mode 100755 index 0000000000000..b5ce66eae9483 --- /dev/null +++ b/server/bench-wrk/get-server-sha.sh @@ -0,0 +1,6 @@ +#!/bin/bash +# We are generating shasum of files in the server directory (excluding packaging and test files). +# This is to track whether server directory has changed or not +dir=$(dirname $0) +cd $dir/../.. +git ls-files -- ':!tests-py' . ':!packaging' . ':!.*' . 
':!bench-wrk' | sort | xargs cat | shasum | awk '{print $1}' | tail -c 9 diff --git a/server/bench-wrk/hge_wrk_bench.py b/server/bench-wrk/hge_wrk_bench.py new file mode 100644 index 0000000000000..612dde60b8139 --- /dev/null +++ b/server/bench-wrk/hge_wrk_bench.py @@ -0,0 +1,558 @@ +from sportsdb_setup import HGETestSetup, HGETestSetupArgs +from run_hge import HGE +import graphql +import multiprocessing +import json +import os +import docker +import ruamel.yaml as yaml +import cpuinfo +import subprocess +import threading +import time +import datetime +from colorama import Fore, Style +from plot import run_dash_server +import webbrowser +import pathlib +from urllib.parse import urlparse, urlunparse +import boto3 + + +fileLoc = os.path.dirname(os.path.abspath(__file__)) + +def uri_path_join(uri, *paths): + p = urlparse(uri) + new_path = os.path.join(p.path, *paths) + return urlunparse(p._replace(path=new_path)) + + +class HGEWrkBench(HGETestSetup): + + wrk_docker_image = 'hasura/wrk:v0.3' + + # We'll bind mount the lua script dir to this directory within the wrk container: + lua_dir = '/tmp/bench_scripts' + + rps_steps = [10, 20, 50, 100, 200, 500, 1000, 2000, 5000] + + def __init__( + self, pg_url, remote_pg_url, pg_docker_image, hge_url=None, + remote_hge_url=None, hge_docker_image=None, + hge_args=[], skip_stack_build=False, + graphql_queries_file='queries.graphql', connections=50, + duration=300, results_hge_url = None, results_hge_admin_secret = None + ): + self.load_queries(graphql_queries_file) + super().__init__( + pg_url = pg_url, + remote_pg_url = remote_pg_url, + pg_docker_image = pg_docker_image, + hge_url = hge_url, + remote_hge_url = remote_hge_url, + hge_docker_image = hge_docker_image, + hge_args = hge_args, + skip_stack_build = skip_stack_build + ) + self.connections = connections + self.duration = duration + self.results_hge_url = results_hge_url + self.results_hge_admin_secret = results_hge_admin_secret + self.extract_cpu_info() + # NOTE: we generally want to do this just once; otherwise if we happen + # to be editing the tree while this script is running the shasum will + # keep changing: + self.server_shasum = self.get_server_shasum() + + def load_queries(self, graphql_queries_file): + self.graphql_queries_file = graphql_queries_file + with open(self.graphql_queries_file) as f: + queries = f.read() + self.query_names = [] + self.queries = [] + for oper in graphql.parse(queries).definitions: + self.query_names.append(oper.name.value) + self.queries.append(oper) + + def get_wrk2_params(self): + cpu_count = multiprocessing.cpu_count() + return { + 'threads': cpu_count, + 'connections': self.connections, + 'duration': self.duration + } + + def get_current_user(self): + return '{}:{}'.format(os.geteuid(), os.getegid()) + + def wrk2_test(self, query, rps): + def upload_files(files): + if self.upload_root_uri: + p = urlparse(self.upload_root_uri) + if p.scheme == 's3': + bucket = p.netloc + key = p.path.lstrip('/') + s3_client = boto3.client('s3') + for (f, f_key) in files: + s3_client.upload_file(f, bucket, os.path.join(key, f_key)) + + query_str = graphql.print_ast(query) + params = self.get_wrk2_params() + print(Fore.GREEN + "Running benchmark wrk2 for at {} req/s (duration: {}) for query\n".format(rps, params['duration']), query_str + Style.RESET_ALL) + bench_script = os.path.join(self.lua_dir, 'bench-wrk2.lua') + graphql_url = self.hge.url + '/v1/graphql' + timestamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + results_dir = self.results_root_dir + tests_path 
= [str(rps), timestamp] + results_dir = os.path.join(results_dir, *tests_path) + os.makedirs(results_dir, exist_ok=True) + wrk2_command = [ + 'wrk2', + '-R', str(rps), + '-t', str(params['threads']), + '-c', str(params['connections']), + '-d', str(params['duration']), + '--latency', + '-s', bench_script, + graphql_url, + query_str, + results_dir + ] + volumes = self.get_scripts_vol() + volumes[results_dir] = { + 'bind': results_dir, + 'mode': 'rw' + } + self.docker_client = docker.from_env() + result = self.docker_client.containers.run( + self.wrk_docker_image, + detach = False, + stdout = True, + stderr = False, + command = wrk2_command, + network_mode = 'host', + environment = self.get_lua_env(), + volumes = volumes, + remove = True, + user = self.get_current_user() + ).decode('ascii') + histogram_file = os.path.join(results_dir, 'latencies.hgrm') + histogram = self.get_latency_histogram(result, histogram_file) + + summary_file = os.path.join(results_dir, 'summary.json') + with open(summary_file) as f: + summary = json.load(f) + latencies_file = os.path.join(results_dir, 'latencies') + + def extract_data(v): + return v['data'] if isinstance(v, dict) and 'data' in v else v + tests_info = { k:extract_data(v) for (k, v) in self.gen_test_info(query, rps).items() } + tests_setup_file = os.path.join(results_dir, 'test_setup.json') + with open(tests_setup_file, 'w') as f: + f.write(json.dumps(tests_info, indent=2)) + + upload_files([ + (x, os.path.join(*tests_path,y)) + for (x,y) in [ + (summary_file, 'summary.json'), + (latencies_file, 'latencies'), + (histogram_file, 'latencies.hgrm'), + (tests_setup_file, 'test_setup.json') + ] + ]) + if self.upload_root_uri: + latencies_uri = uri_path_join(self.upload_root_uri, *tests_path, 'latencies') + else: + latencies_uri = pathlib.Path(latencies_file).as_uri() + self.insert_result(query, rps, summary, histogram, latencies_uri) + return (summary, histogram) + + def get_latency_histogram(self, result, write_histogram_file): + const_true = lambda l : True + state_changes = { + 'start' : { + (lambda l: 'Detailed Percentile spectrum' in l) : 'histogram_start' + }, + 'histogram_start': { + (lambda l: 'Value' in l and 'Percentile' in l): 'histogram_headers' + }, + 'histogram_headers': { + const_true: 'histogram_empty_line' + }, + 'histogram_empty_line' : { + const_true: 'histogram_values' + }, + 'histogram_values': { + (lambda l: l.strip().startswith('#')): 'histogram_summary' + }, + 'histogram_summary': { + (lambda l: not l.strip().startswith('#')): 'histogram_end' + } + } + state = 'start' + histogram = [] + print(Fore.CYAN + "Latency histogram summary" + Style.RESET_ALL) + with open(write_histogram_file, 'w') as f: + for line in result.splitlines(): + # Change the state + for (check, next_state) in state_changes[state].items(): + if check(line): + state = next_state + break + if state == 'start': + continue + elif state == 'histogram_end': + break + if state == 'histogram_summary': + print(Fore.CYAN + line + Style.RESET_ALL) + if state in ['histogram_headers','histogram_values','histogram_summary']: + f.write(line+'\n') + + if state == 'histogram_values': + (val, percentile, total_count, _) = line.strip().split() + histogram.append({ + 'percentile': float(percentile), + 'latency': float(val), + 'total_count': float(total_count) + }) + return histogram + + # The appropriate Lua env vars for execution within wrk container: + def get_lua_env(self): + return { + 'LUA_PATH': '/usr/share/lua/5.1/?.lua;' + + os.path.join(self.lua_dir, '?.lua') + ';;', + 
'LUA_CPATH': '/usr/lib/lua/5.1/?.so;/usr/lib/x86_64-linux-gnu/lua/5.1/?.so;;' + } + + def get_scripts_vol(self): + return { + os.path.join(fileLoc, 'wrk-websocket-server', 'bench_scripts'): { + 'bind' : self.lua_dir, + 'mode' : 'ro' + } + } + + def max_rps_test(self, query): + query_str = graphql.print_ast(query) + print(Fore.GREEN + "(Compute maximum Request per second) Running wrk benchmark for query\n", query_str + Style.RESET_ALL) + self.hge.graphql_q(query_str) # Test query once for errors + bench_script = os.path.join(self.lua_dir + '/bench-wrk.lua') + graphql_url = self.hge.url + '/v1/graphql' + params = self.get_wrk2_params() + duration = 30 + wrk_command = [ + 'wrk', + '-t', str(params['threads']), + '-c', str(params['connections']), + '-d', str(duration), + '--latency', + '-s', bench_script, + graphql_url, + query_str + ] + self.docker_client = docker.from_env() + + result = self.docker_client.containers.run( + self.wrk_docker_image, + detach = False, + stdout = False, + stderr = True, + command = wrk_command, + network_mode = 'host', + environment = self.get_lua_env(), + volumes = self.get_scripts_vol(), + remove = True, + user = self.get_current_user() + ) + summary = json.loads(result)['summary'] + # TODO explain this calculation. Why aren't we using wrk's reported 'max'? Should we call this avg_sustained_rps or something? + max_rps = round(summary['requests']/float(duration)) + self.insert_max_rps_result(query, max_rps) + print("Max RPS", max_rps) + return max_rps + + def get_version(self): + script = os.path.join(fileLoc, 'gen-version.sh') + return subprocess.check_output([script]).decode('ascii').strip() + + def get_server_shasum(self): + script = os.path.join(fileLoc, 'get-server-sha.sh') + return subprocess.check_output([script]).decode('ascii').strip() + + def extract_cpu_info(self): + self.cpu_info = cpuinfo.get_cpu_info() + for k in ['flags', 'python_version', 'hz_actual', 'hz_actual_raw']: + if self.cpu_info.get(k): + del self.cpu_info[k] + + def get_results(self): + query = ''' +query results { + latency: hge_bench_latest_results { + query_name + requests_per_sec + docker_image + version + latencies_uri + latency_histogram { + percentile + latency + } + } + max_rps: hge_bench_avg_query_max_rps { + query_name + docker_image + version + max_rps + } +} + ''' + output = self.results_hge.graphql_q(query) + return output['data'] + + def set_cpu_info(self, insert_var): + cpu_key = self.cpu_info['brand'] + ' vCPUs: ' + str(self.cpu_info['count']) + insert_var['cpu']= { + 'data' : { + 'info': self.cpu_info, + 'key': cpu_key + }, + "on_conflict": { + "constraint": "cpu_info_pkey", + "update_columns": "key" + } + } + + + def set_query_info(self, insert_var, query): + insert_var["query"] = { + "data": { + "name" : query.name.value, + "query" : graphql.print_ast(query) + }, + "on_conflict" : { + "constraint": "gql_query_query_key", + "update_columns": "query" + } + } + + #TODO add executable shasum also + def set_version_info(self, insert_var): + if self.hge_docker_image: + insert_var["docker_image"] = self.hge_docker_image + else: + insert_var["version"] = self.get_version() + insert_var["server_shasum"] = self.server_shasum + + insert_var['postgres_version'] = self.pg.get_server_version() + if self.scenario_name: + insert_var['scenario_name'] = self.scenario_name + + def set_hge_args_env_vars(self, insert_var): + to_hide_env = ['HASURA_GRAPHQL_' + env for env in + [ 'ADMIN_SECRET', 'DATABASE_URL', 'JWT_SECRET'] + ] + env = { k:v for (k,v) in self.hge.get_hge_env().items() if 
(k.startswith('HASURA_GRAPHQL') and k not in to_hide_env) or k in ['GHCRTS'] } + args = self.hge.args + insert_var['hge_conf'] = { + 'env': env, + 'args': args + } + + def gen_max_rps_insert_var(self, query, max_rps): + insert_var = dict() + self.set_cpu_info(insert_var) + self.set_query_info(insert_var, query) + self.set_version_info(insert_var) + self.set_hge_args_env_vars(insert_var) + insert_var['max_rps'] = max_rps + insert_var['wrk_parameters'] = self.get_wrk2_params() + return insert_var + + def plot_results(self): + def open_plot_in_browser(): + time.sleep(1) + webbrowser.open_new_tab('http://127.0.0.1:8050/') + threading.Thread(target=open_plot_in_browser).start() + run_dash_server(self.get_results()) + + # Collect info about the test environment + def gen_test_info(self, query, rps): + test_info = dict() + self.set_cpu_info(test_info) + self.set_query_info(test_info, query) + self.set_version_info(test_info) + self.set_hge_args_env_vars(test_info) + test_info["requests_per_sec"] = rps + test_info['wrk2_parameters'] = self.get_wrk2_params() + return test_info + + def gen_result_insert_var(self, query, rps, summary, latency_histogram, latencies_uri): + insert_var = self.gen_test_info(query, rps) + insert_var["summary"] = summary + insert_var['latency_histogram'] = { + 'data' : latency_histogram + } + insert_var['latencies_uri'] = latencies_uri + return insert_var + + def insert_result(self, query, rps, summary, latency_histogram, latencies_uri): + result_var = self.gen_result_insert_var(query, rps, summary, latency_histogram, latencies_uri) + insert_query = """ +mutation insertResult($result: hge_bench_results_insert_input!) { + insert_hge_bench_results(objects: [$result]){ + affected_rows + } +}""" + variables = {'result': result_var} + self.results_hge.graphql_q(insert_query, variables) + + def insert_max_rps_result(self, query, max_rps): + result_var = self.gen_max_rps_insert_var(query, max_rps) + insert_query = """ +mutation insertMaxRps($result: hge_bench_query_max_rps_insert_input!) { + insert_hge_bench_query_max_rps(objects: [$result]){ + affected_rows + } +}""" + variables = {'result': result_var} + self.results_hge.graphql_q(insert_query, variables) + + def setup_results_schema(self): + if not self.results_hge_url: + self.results_hge_url = self.hge.url + self.results_hge_admin_secret = self.hge.admin_secret() + if self.results_hge_admin_secret: + results_hge_args = ['--admin-secret', self.results_hge_admin_secret] + else: + results_hge_args = [] + self.results_hge = HGE(None, None, args=results_hge_args, log_file=None, url=self.results_hge_url) + results_table = { + 'name' : 'results', + 'schema': 'hge_bench' + } + if results_table in self.results_hge.get_all_tracked_tables(): + return + schema_file = os.path.join(fileLoc, 'results_schema.yaml') + with open(schema_file) as f: + queries = yaml.safe_load(f) + self.results_hge.run_bulk(queries) + + + def run_query_benchmarks(self): + def get_results_root_dir(query): + if self.hge_docker_image: + ver_info = 'docker-tag-' + self.hge_docker_image.split(':')[1] + else: + ver_info = self.get_version() + query_name = query.name.value + # Store versioned runs under e.g. 
test_output/benchmark_runs// + results_root_dir = os.path.abspath(os.path.join(self.work_dir, 'benchmark_runs')) + return os.path.join(results_root_dir, ver_info, query_name) + + for query in self.queries: + try: + self.results_root_dir = get_results_root_dir(query) + max_rps = self.max_rps_test(query) + # The tests should definitely not be running very close to or higher than maximum requests per second + rps_steps = [ r for r in self.rps_steps if r < 0.6*max_rps] + print("Benchmarking queries with wrk2 for the following requests/sec", rps_steps) + for rps in rps_steps: + if rps < int(0.6*max_rps): + self.wrk2_test(query, rps) + except Exception: + print(Fore.RED + "Benchmarking Graphql Query '" + query.name.value + "' failed" + Style.RESET_ALL) + raise + + def run_tests(self): + with self.graphql_engines_setup(): + self.setup_results_schema() + if self.run_benchmarks: + self.run_query_benchmarks() + if not self.skip_plots: + self.plot_results() + + +class HGEWrkBenchArgs(HGETestSetupArgs): + + def __init__(self): + self.set_arg_parse_options() + self.parse_args() + + def set_arg_parse_options(self): + HGETestSetupArgs.set_arg_parse_options(self) + self.set_wrk_options() + + def parse_args(self): + HGETestSetupArgs.parse_args(self) + self.parse_wrk_options() + + def set_wrk_options(self): + def boolean_string(s): + s = s.lower() + if s not in {'false', 'true'}: + raise ValueError('Not a valid boolean string') + return s == 'true' + wrk_opts = self.arg_parser.add_argument_group('wrk') + wrk_opts.add_argument('--queries-file', metavar='HASURA_BENCH_QUERIES_FILE', help='Queries file for benchmarks', default='queries.graphql') + wrk_opts.add_argument('--connections', metavar='HASURA_BENCH_CONNECTIONS', help='Total number of open connections', default=50) + wrk_opts.add_argument('--duration', metavar='HASURA_BENCH_DURATION', help='Duration of tests in seconds', default=300) + wrk_opts.add_argument('--upload-root-uri', metavar='HASURA_BENCH_UPLOAD_ROOT_URI', help='The URI to which the latency results should be uploaded. Curently only s3 is supported', required=False) + wrk_opts.add_argument('--set-scenario-name', metavar='HASURA_BENCH_SCENARIO_NAME', help='Set a name for the test scenario. 
This will be shown in logs', required=False) + wrk_opts.add_argument('--results-hge-url', metavar='HASURA_BENCH_RESULTS_HGE_URL', help='The GraphQL engine to which the results should be uploaded', required=False) + wrk_opts.add_argument('--results-hge-admin-secret', metavar='HASURA_BENCH_RESULTS_HGE_ADMIN_SECRET', help='Admin secret of the GraphQL engine to which the results should be uploaded', required=False) + wrk_opts.add_argument('--skip-plots', help='Skip plotting', action='store_true', required=False) + wrk_opts.add_argument('--run-benchmarks', metavar='HASURA_BENCH_RUN_BENCHMARKS', help='Whether benchmarks should be run or not', default=True, type=boolean_string) + + def get_s3_caller_identity(self): + return boto3.client('sts').get_caller_identity() + + def parse_wrk_options(self): + self.connections, self.duration, self.graphql_queries_file, self.res_hge_url, upload_root_uri, self.res_hge_admin_secret, self.run_benchmarks, self.scenario_name = \ + self.get_params([ + ('connections', 'HASURA_BENCH_CONNECTIONS'), + ('duration', 'HASURA_BENCH_DURATION'), + ('queries_file', 'HASURA_BENCH_QUERIES_FILE'), + ('results_hge_url', 'HASURA_BENCH_RESULTS_HGE_URL'), + ('upload_root_uri', 'HASURA_BENCH_UPLOAD_ROOT_URI'), + ('results_hge_admin_secret', 'HASURA_BENCH_RESULTS_HGE_ADMIN_SECRET'), + ('run_benchmarks', 'HASURA_BENCH_RUN_BENCHMARKS'), + ('set_scenario_name', 'HASURA_BENCH_SCENARIO_NAME'), + ]) + self.upload_root_uri = None + if upload_root_uri: + p = urlparse(upload_root_uri) + if p.scheme == 's3': + # Check if aws credentials are set + self.get_s3_caller_identity() + self.upload_root_uri = upload_root_uri + + + + self.skip_plots = self.parsed_args.skip_plots + + +class HGEWrkBenchWithArgs(HGEWrkBenchArgs, HGEWrkBench): + + def __init__(self): + HGEWrkBenchArgs.__init__(self) + HGEWrkBench.__init__( + self, + pg_url = self.pg_url, + remote_pg_url = self.remote_pg_url, + pg_docker_image = self.pg_docker_image, + hge_url = self.hge_url, + remote_hge_url = self.remote_hge_url, + hge_docker_image = self.hge_docker_image, + hge_args = self.hge_args, + skip_stack_build = self.skip_stack_build, + graphql_queries_file = self.graphql_queries_file, + connections = self.connections, + duration = self.duration + ) + +if __name__ == "__main__": + bench = HGEWrkBenchWithArgs() + bench.run_tests() + diff --git a/server/bench-wrk/plot.py b/server/bench-wrk/plot.py new file mode 100755 index 0000000000000..65ec88df59bc4 --- /dev/null +++ b/server/bench-wrk/plot.py @@ -0,0 +1,516 @@ +#!/usr/bin/env python3 + +# Avoid tkinter dependency +import matplotlib +matplotlib.use('agg') + +import dash +from dash.dependencies import Input, Output, State +import dash_core_components as dcc +import dash_bootstrap_components as dbc +import dash_html_components as html +from urllib.request import urlopen +import pandas as pd +import json + +import numpy as np +import matplotlib.pyplot as plt +import matplotlib.patches as mpatches +import matplotlib.ticker as ticker +import seaborn as sns +from io import BytesIO +import sys +import argparse +import base64 +from urllib.parse import urlparse +import boto3 + +def as_pairs(l, pair_size=2): + if len(l) < pair_size: + return [l] + else: + return [l[0:pair_size]] + as_pairs(l[pair_size:], pair_size) + + +def get_scenario_results(results, scenarios): + out_results = [] + for snro in scenarios: + def allowed_value(x): + for k in snro: + if x[k] != snro[k]: + return False + return True + req_results = [x for x in results if allowed_value(x)] + out_results.append((snro, 
req_results)) + return out_results + +def throughput_data(max_rps_results, scenarios): + results = get_scenario_results(max_rps_results, scenarios) + data = [] + for (snro, req_results) in results: + ver_info = snro['version'] or snro['docker_image'].split(':')[1] + if req_results: + data.append({ + 'version': ver_info, + 'query name': snro['query_name'], + 'max throughput': float(req_results[0]['max_rps']) + }) + return pd.DataFrame(data) + +def throughput_figure(df): + sns.set_style("whitegrid") + fig, ax = plt.subplots(figsize=(14,6)) + uniq_versions = set(df['version']) + if len(uniq_versions) >1: + sns.lineplot(x='version', y='max throughput', hue='query name', data=df, ax=ax) + else: + sns.barplot(x='version', y='max throughput', hue='query name', data=df, ax=ax) + ax.grid() + ax.set( + xlabel='Version/Docker image', + ylabel='Throughput (req/s)' + ) + ymax = round(max(df['max throughput'])*1.05) + plt.ylim(0, ymax) + out_fig = gen_plot_figure_data(plt) + plt.close() + return out_fig + +def gen_plot_figure_data(plt): + with BytesIO() as out_img: + plt.savefig(out_img, format='png', bbox_inches='tight', dpi=100) + out_img.seek(0) + encoded = base64.b64encode(out_img.read()).decode('ascii').replace('\n','') + return "data:image/png;base64,{}".format(encoded) + + +def uri_readlines(uri): + print('Latency file:', uri) + p = urlparse(uri) + if p.scheme == 'file': + return urlopen(uri).readlines() + elif p.scheme == 's3': + s3 = boto3.resource('s3') + obj = s3.Object(bucket_name=p.netloc, key=p.path.lstrip('/')) + with BytesIO() as data: + obj.download_fileobj(data) + return data.getvalue().splitlines() + + +def violin_plot_data(latency_results, scenarios): + y_label = 'latency (ms)' + x_label = 'version' + category_label = 'req/sec' + data = [] + results = get_scenario_results(latency_results, scenarios) + for (snro, req_results) in results: + ver_info = snro['version'] or snro['docker_image'].split(':')[1] + if req_results: + latencies = uri_readlines(req_results[0]['latencies_uri']) + for x in latencies: + if isinstance(x, bytes): + x = x.decode() + val_ms = float(x)/1000.0 + data.append({ + y_label: val_ms, + x_label: ver_info, + category_label: snro['requests_per_sec'] + }) + return pd.DataFrame(data) + + +def violin_plot_figure(df): + y_label = 'latency (ms)' + x_label = 'version' + category_label = 'req/sec' + sns.set_style("whitegrid") + plt.figure(figsize=(14,6)) + + # All points latency plot + ax = sns.stripplot( + x=x_label, y=y_label, hue=category_label, + data=df, palette=sns.color_palette(["#AAAAAA"]), + jitter=0.40, size=2.2, dodge=True) + plt.setp(ax.collections, zorder=-1000) + + # Plot percentiles using boxplot whisker caps: + percentiles = [(99.9, 'red'), (99, 'blue'), (95, 'green')] + for pctl, color in percentiles: + ax = sns.boxplot( + x=x_label, y=y_label, hue=category_label, data=df, + showfliers=False, showbox=False, + # Showing bottom percentiles just seemed to add visual noise: + whis=[0, pctl], + capprops={'color': color, 'linewidth': 1}, + # hide all but cap: + whiskerprops={'linewidth': 0} + ) + # Keep boxplots from adding nonsense to legend: + handles, _ = ax.get_legend_handles_labels() + for h in handles: + h.remove() + + # This will get overwritten; add back below: + pctl_legend = plt.legend( + title='Percentile markers', loc='upper left', + handles=[ + mpatches.Patch(color=c, label=str(pctl)+"th") + for pctl, c in percentiles + ] + + [mpatches.Patch(color="black", label='median')] + ) + + # See: https://seaborn.pydata.org/generated/seaborn.violinplot.html 
+ sns.violinplot( + x=x_label, y=y_label, hue=category_label, data=df, palette="Set1", + scale_hue=True, + # All violins get the same area (number of samples may differ): + scale="area", + # More granular violins: + bw=.02, + # This seems to wreck things: + # width=1.5, + linewidth=0, + # inner="quartile" + ) + + # Add back percentile legend: + ax.add_artist(pctl_legend) + + approx_target_y_tics = 20 + + ax.yaxis.set_major_locator(ticker.MaxNLocator(approx_target_y_tics)) + + ax.yaxis.set_major_formatter(ticker.FuncFormatter(y_fmt)) + + plt.ylim(0, None) + out_fig = gen_plot_figure_data(plt) + plt.close() + return out_fig + +def hdrhistogram_figure(df): + sns.set_style("whitegrid") + fig, ax = plt.subplots(figsize=(14,6)) + sns.lineplot(x='percentile', y='latency', hue='version', + data=df, ax=ax) + ax.grid() + ax.set( + xlabel='Percentile', + ylabel='Latency (ms)' + ) + ax.set_xscale('logit') + ticks = [0.25, 0.5, 0.9, 0.99, 0.999, 0.9999, 0.99999] + plt.xticks(ticks) + majors = ['25%', '50%', '90%', '99%' , '99.9%', '99.99%'] + ax.xaxis.set_major_formatter(ticker.FixedFormatter(majors)) + ax.xaxis.set_minor_formatter(ticker.NullFormatter()) + out_fig = gen_plot_figure_data(plt) + plt.close() + return out_fig + + +def hdrhistogram_data(latency_results, scenarios): + results = get_scenario_results(latency_results, scenarios) + data = [] + for (snro, req_results) in results: + ver_info = snro['version'] or snro['docker_image'].split(':')[1] + if req_results: + histogram = req_results[0]['latency_histogram'] + for e in histogram: + data.append({ + 'latency': float(e['latency']), + 'percentile': float(e['percentile']), + 'req/sec': snro['requests_per_sec'], + 'version': ver_info + }) + return pd.DataFrame(data) + + +# Human friendly Y labels. +# Copy-pasta: https://stackoverflow.com/a/40573071 +def y_fmt(y, pos): + decades = [1e9, 1e6, 1e3, 1e0, 1e-3, 1e-6, 1e-9 ] + suffix = ["G", "M", "k", "" , "m" , "u", "n"] + if y == 0: + return str(0) + for i, d in enumerate(decades): + if np.abs(y) >=d: + val = y/float(d) + signf = len(str(val).split(".")[1]) + if signf == 0: + return '{val:d} {suffix}'.format(val=int(val), suffix=suffix[i]) + else: + if signf == 1: + if str(val).split(".")[1] == "0": + return '{val:d} {suffix}'.format(val=int(round(val)), suffix=suffix[i]) + tx = "{"+"val:.{signf}f".format(signf = signf) +"} {suffix}" + return tx.format(val=val, suffix=suffix[i]) + return y + +def run_dash_server(bench_results): + latency_results = bench_results['latency'] + max_rps_results = bench_results['max_rps'] + latency_results.sort(key=lambda x : x['version'] or x['docker_image']) + + app = dash.Dash(external_stylesheets=[dbc.themes.BOOTSTRAP]) + + uniq_queries = list( set( [ x['query_name'] for x in latency_results ] ) ) + children = [] + + + rows = [] + plot_types = ['latency histogram','latency violins', 'max throughput'] + plots_filters_1 = [ + dbc.Col([ + html.Label('Plot type'), + dcc.Dropdown( + id='plot-type', + options=[{'label':q, 'value': q} for q in plot_types], + value= plot_types[2] + ) + ], width=2), + dbc.Col([ + html.Label('Query'), + dcc.Dropdown( + id='query-name', + options=[{'label':q, 'value': q} for q in uniq_queries], + value= uniq_queries[:min(len(uniq_queries), 4)], + multi=True + ) + ]), + dbc.Col(html.Div( + children=[ + html.Label('Requests/sec'), + dcc.Dropdown(id='rps', multi=False) + + ], + id='rps-div' + ), width=3) + ] + rows.append(dbc.Row(plots_filters_1)) + + plots_filter_2 = [ + dbc.Col([ + html.Label('version(s)/docker_image(s)'), + dcc.Dropdown( + 
id='ver', + multi=True + ) + ]) + ] + rows.append(dbc.Row(plots_filter_2)) + + graph = dbc.Col( + html.Div([html.Img(id = 'graph', src = '')], id='plot_div') + ) + rows.append(dbc.Row(graph)) + + children.append(html.Div(dbc.Container(rows))) + + app.layout = html.Div(children=children) + + def as_list(x): + if not isinstance(x, list): + return [x] + else: + return x + + @app.callback( + Output('ver', 'options'), + [ Input('plot-type', 'value'), Input('query-name', 'value'), Input('rps', 'value') ] + ) + def updateVerOptions(plot_type, query_names, rps_list): + query_names = as_list(query_names) + rps_list = as_list(rps_list) + relvnt_q = [ + x for x in latency_results + if x['query_name'] in query_names and + (x['requests_per_sec'] in rps_list or plot_type == 'max throughput') + ] + uniq_vers = list(set([ + (x['version'], x['docker_image']) + for x in relvnt_q + ])) + print("Updating version options to", uniq_vers) + return [ + { + 'label': x[0] or x[1], + 'value': json.dumps({ + 'version': x[0], + + 'docker_image': x[1] + }) + } + for x in uniq_vers + ] + + @app.callback( + Output('ver', 'value'), + [ Input('plot-type', 'value'), Input('ver', 'options') ], + [ State('ver', 'value') ] + ) + def updateVerValue(plot_types, options, vers): + print('Updating version value, options', options) + return updateMultiValue(options, vers) + + def updateMultiValue(options, vals, def_list_size=2): + vals = as_list(vals) + new_vals = [] + allowed_vals = [ x['value'] for x in options] + def_val_size = min(def_list_size, len(allowed_vals)) + default_vals = allowed_vals[:def_val_size] + if not vals: + return default_vals + for val in vals: + if val in allowed_vals: + new_vals.append(val) + if new_vals: + return new_vals + else: + return default_vals + + def updateSingleValue(options, val): + allowed_vals = [ x['value'] for x in options] + if val and val in allowed_vals: + return val + elif allowed_vals: + return allowed_vals[0] + else: + return None + + # Change queries to multi for throughput plot + @app.callback( + Output('query-name', 'multi'), + [ Input('plot-type', 'value') ], + ) + def query_dropdown_multi(plot_type): + return plot_type == 'max throughput' + + @app.callback( + Output('query-name', 'value'), + [ Input('plot-type', 'value'), Input('query-name', 'options'), Input('query-name', 'multi') ], + [ State('query-name', 'value') ] + ) + def updateQueryValue(plot_type, options, multi, query_names): + if plot_type == 'max throughput': + return updateMultiValue(options, query_names, 4) + else: + return updateSingleValue(options, query_names) + + + @app.callback( + Output('rps', 'options'), + [ Input('plot-type', 'value'), Input('query-name', 'value') ] + ) + def updateRPSOptions(plot_type, query_name): + relvnt_q = [ x for x in latency_results if x['query_name'] == query_name ] + rps = list( set( [x['requests_per_sec'] for x in relvnt_q ] ) ) + return [ + { + 'label': str(x), + 'value': x + } for x in sorted(rps) + ] + + @app.callback( + Output('rps', 'value'), + [ Input('plot-type', 'value'), Input('rps', 'options') ], + [ State('rps', 'value') ] + ) + def updateRPSValue(plot_type, options, rps): + if plot_type == 'latency histogram': + return updateSingleValue(options, rps) + else: + rps = as_list(rps) + return updateMultiValue(options, rps) + + # Change RPS to multi for violin plot + @app.callback( + Output('rps', 'multi'), + [ Input('plot-type', 'value') ], + ) + def rps_dropdown_multi(plot_type): + return plot_type == 'latency violins' + + # Hide RPS dropdown if plot type is throughput + 
@app.callback( + Output('rps-div', 'style'), + [ Input('plot-type', 'value') ], + ) + def rps_dropdown_style(plot_type): + if plot_type == 'max throughput': + return { 'display': 'none' } + else: + return {'display': 'block'} + + def get_hdrhistogram_figure(scenarios): + df = hdrhistogram_data(latency_results, scenarios) + return hdrhistogram_figure(df) + + def get_violin_figure(scenarios): + df = violin_plot_data(latency_results, scenarios) + return violin_plot_figure(df) + + def get_throughput_figure(scenarios): + df = throughput_data(max_rps_results, scenarios) + return throughput_figure(df) + + @app.callback( + Output('graph', 'src'), + [ + Input('plot-type', 'value'), + Input('query-name', 'value'), + Input('rps', 'value'), + Input('ver', 'value') + ] + ) + def updateGraph(plot_type, query_name, rps_list, vers): + rps_list = as_list(rps_list) + def latency_scenarios(): + return [ + { + 'query_name': query_name, + 'requests_per_sec': rps, + **json.loads(v) + } + for v in set(vers) + for rps in set(rps_list) + ] + def throughput_scenarios(): + return [ + { + 'query_name': qname, + **json.loads(v) + } + for v in set(vers) + for qname in as_list(query_name) + ] + if plot_type == 'max throughput': + scenarios = throughput_scenarios() + else: + scenarios = latency_scenarios() + + + if plot_type == 'latency histogram': + return get_hdrhistogram_figure(scenarios) + elif plot_type == 'latency violins': + return get_violin_figure(scenarios) + elif plot_type == 'max throughput': + return get_throughput_figure(scenarios) + + app.run_server(host="127.0.0.1", debug=False) + +if __name__ == '__main__': + + parser = argparse.ArgumentParser() + parser.add_argument( + '--results', nargs='?', type=argparse.FileType('r'), + default=sys.stdin) + args = parser.parse_args() + bench_results = json.load(args.results) + print(bench_results) + + print("=" * 20) + print("starting dash server for graphs") + + run_dash_server(bench_results) diff --git a/server/bench-wrk/port_allocator.py b/server/bench-wrk/port_allocator.py new file mode 100644 index 0000000000000..a788743140278 --- /dev/null +++ b/server/bench-wrk/port_allocator.py @@ -0,0 +1,27 @@ +import threading +import socket + + +class PortAllocator: + + def __init__(self): + self.allocated_ports = set() + self.lock = threading.Lock() + + def get_unused_port(self, start): + port = start + if self.is_port_open(port) or port in self.allocated_ports: + return self.get_unused_port(port + 1) + else: + return port + + def is_port_open(self, port): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + res = sock.connect_ex(('127.0.0.1', port)) + return res == 0 + + def allocate_port(self, start): + with self.lock: + port = self.get_unused_port(start) + self.allocated_ports.add(port) + return port diff --git a/server/bench-wrk/queries.graphql b/server/bench-wrk/queries.graphql new file mode 100644 index 0000000000000..18efea4ccb7eb --- /dev/null +++ b/server/bench-wrk/queries.graphql @@ -0,0 +1,128 @@ + +# A query which carries a lot of data +query rows_50_nesting_level_1 { + hge_events(limit: 50){ + event_status + last_update + affiliations_events_by_event_id(limit: 2) { + affiliation{ + publisher{ + publisher_name + } + } + } + } +} + +# Simple query which returns one row with its object foreign key relationship +query one_row_nesting_level_1 { + hge_events_by_pk(id: 1) { + id + attendance + duration + event_status + last_update + publisher { + id + publisher_name + } + } +} + +# Query with nesting level of 4 +query one_row_nesting_level_4 { + 
hge_baseball_action_plays_by_pk(id: 1) { + play_type + baseball_event_state { + current_state + pitcher { + person_key + publisher { + publisher_key + } + } + } + } +} + +# Query with nesting level of 7 +query one_row_nesting_level_7 { + hge_baseball_action_plays_by_pk(id: 1) { + play_type + baseball_event_state { + current_state + pitcher { + person_key + publisher { + publisher_key + persons_by_publisher_id(limit: 1){ + person_key + baseball_event_states_by_batter_id { + current_state + event { + event_status + } + } + } + } + } + } + } +} + +# Remote query +query remote_one_row { + remote_hge_events_by_pk(id: 1) { + id + attendance + duration + event_status + last_update + publisher { + id + publisher_name + } + } +} + +# Remote query with two top level fields +query remote_two_fields { + events_1: remote_hge_events_by_pk(id: 1) { + id + attendance + duration + event_status + last_update + publisher { + id + publisher_name + } + } + + events_2: remote_hge_events_by_pk(id: 2) { + id + attendance + duration + event_status + last_update + publisher { + id + publisher_name + } + } +} + +# Remote query lot of data +query remote_50_rows { + remote_hge_events(limit: 50){ + event_status + last_update + affiliations_events_by_event_id(limit: 2) { + affiliation{ + publisher{ + publisher_name + } + } + } + } +} diff --git a/server/bench-wrk/requirements-top-level.txt b/server/bench-wrk/requirements-top-level.txt new file mode 100644 index 0000000000000..81b2a33589c5f --- /dev/null +++ b/server/bench-wrk/requirements-top-level.txt @@ -0,0 +1,17 @@ +requests-cache +ruamel.yaml +docker +psycopg2 +colorama +inflection +py-cpuinfo +dash +dash-renderer +dash-html-components +dash-bootstrap-components +dash-core-components +graphene==3.0b1 +matplotlib +pandas +boto3 +seaborn diff --git a/server/bench-wrk/requirements.txt b/server/bench-wrk/requirements.txt new file mode 100644 index 0000000000000..f217ec9fd6352 --- /dev/null +++ b/server/bench-wrk/requirements.txt @@ -0,0 +1,52 @@ +aniso8601==8.0.0 +boto3==1.13.1 +botocore==1.16.1 +Brotli==1.0.7 +certifi==2020.4.5.1 +chardet==3.0.4 +click==7.1.2 +colorama==0.4.3 +cycler==0.10.0 +dash==1.11.0 +dash-bootstrap-components==0.9.2 +dash-core-components==1.9.1 +dash-html-components==1.0.3 +dash-renderer==1.4.0 +dash-table==4.6.2 +docker==4.2.0 +docutils==0.15.2 +Flask==1.1.2 +Flask-Compress==1.5.0 +future==0.18.2 +graphene==3.0b1 +graphql-core==3.1.0 +graphql-relay==3.0.0 +idna==2.9 +inflection==0.4.0 +itsdangerous==1.1.0 +Jinja2==2.11.2 +jmespath==0.9.5 +kiwisolver==1.2.0 +MarkupSafe==1.1.1 +matplotlib==3.2.1 +numpy==1.18.4 +pandas==1.0.3 +plotly==4.6.0 +psycopg2==2.8.5 +py-cpuinfo==5.0.0 +pyparsing==2.4.7 +python-dateutil==2.8.1 +pytz==2020.1 +requests==2.23.0 +requests-cache==0.5.2 +retrying==1.3.3 +ruamel.yaml==0.16.10 +ruamel.yaml.clib==0.2.0 +s3transfer==0.3.3 +scipy==1.4.1 +seaborn==0.10.1 +six==1.14.0 +Unidecode==1.1.1 +urllib3==1.25.9 +websocket-client==0.57.0 +Werkzeug==1.0.1 diff --git a/server/bench-wrk/results_schema.yaml b/server/bench-wrk/results_schema.yaml new file mode 100644 index 0000000000000..9a8b0cc29cb25 --- /dev/null +++ b/server/bench-wrk/results_schema.yaml @@ -0,0 +1,167 @@ +- type: run_sql + args: + sql: | + create schema if not exists hge_bench; + + create table if not exists hge_bench.gql_query ( + name text primary key not null, + query text not null unique + ); + + create table if not exists hge_bench.cpu_info ( + key text primary key not null, + info jsonb not null unique + ); + + create table if not exists 
hge_bench.query_max_rps( + id serial primary key, + cpu_key text references hge_bench.cpu_info (key), + query_name text references hge_bench.gql_query (name) not null, + docker_image text, + version text, + scenario_name text, + postgres_version text, + server_shasum text, + time timestamptz not null default now(), + max_rps integer not null, + wrk_parameters jsonb, + hge_conf jsonb + constraint should_have_tag CHECK (docker_image is not null or version is not null) + ); + + create table if not exists hge_bench.results( + id serial primary key, + cpu_key text references hge_bench.cpu_info (key), + query_name text references hge_bench.gql_query (name) not null, + docker_image text, + version text, + scenario_name text, + postgres_version text, + server_shasum text, + time timestamptz not null default now(), + requests_per_sec integer not null, + summary jsonb, + latencies_uri text, + wrk2_parameters jsonb, + hge_conf jsonb + constraint should_have_tag CHECK (docker_image is not null or version is not null) + ); + + create or replace view hge_bench.latest_results as + select + distinct on (cpu_key, docker_image, version, query_name, requests_per_sec) + id, cpu_key, query_name, docker_image, version, + postgres_version, server_shasum, time, requests_per_sec, summary, + latencies_uri, wrk2_parameters + from hge_bench.results + order by cpu_key, docker_image, version, query_name, requests_per_sec, time desc; + + create table if not exists hge_bench.latency_histogram ( + id integer references hge_bench.results(id), + percentile double precision not null, + latency double precision not null, + total_count integer not null + ); + + create or replace view hge_bench.avg_query_max_rps as + select cpu_key, query_name, docker_image, version, avg(max_rps) as max_rps + from hge_bench.query_max_rps + group by cpu_key, query_name, docker_image, version; + +- type: track_table + args: + schema: hge_bench + name: gql_query + +- type: track_table + args: + schema: hge_bench + name: results + +- type: track_table + args: + schema: hge_bench + name: query_max_rps + +- type: track_table + args: + schema: hge_bench + name: cpu_info + +- type: track_table + args: + schema: hge_bench + name: latency_histogram + +- type: track_table + args: + schema: hge_bench + name: latest_results + +- type: track_table + args: + schema: hge_bench + name: avg_query_max_rps + +- type: create_object_relationship + args: + table: + schema: hge_bench + name: results + name: query + using: + foreign_key_constraint_on: query_name + +- type: create_object_relationship + args: + table: + schema: hge_bench + name: results + name: cpu + using: + foreign_key_constraint_on: cpu_key + +- type: create_object_relationship + args: + table: + schema: hge_bench + name: query_max_rps + name: query + using: + foreign_key_constraint_on: query_name + +- type: create_object_relationship + args: + table: + schema: hge_bench + name: query_max_rps + name: cpu + using: + foreign_key_constraint_on: cpu_key + +- type: create_array_relationship + args: + table: + schema: hge_bench + name: results + name: latency_histogram + using: + foreign_key_constraint_on: + table: + schema: hge_bench + name: latency_histogram + column: id + +- type: create_array_relationship + args: + table: + schema: hge_bench + name: latest_results + name: latency_histogram + using: + manual_configuration: + remote_table: + schema: hge_bench + name: latency_histogram + column_mapping: + id: id diff --git a/server/bench-wrk/run_hge.py b/server/bench-wrk/run_hge.py new file mode 100644 
index 0000000000000..bdb578174ff92 --- /dev/null +++ b/server/bench-wrk/run_hge.py @@ -0,0 +1,464 @@ +import os +import subprocess +import argparse +import json +import signal +import time +import contextlib +import requests +import inflection +import docker +from colorama import Fore, Style + + +def rm_file_if_exists(f): + """Remove a file if it exists""" + with contextlib.suppress(FileNotFoundError): + os.remove(f) + + +class HGEError(Exception): + """Exception type for class HGE""" + + +class HGE: + + default_graphql_env = { + 'HASURA_GRAPHQL_ENABLE_TELEMETRY': 'false', + 'EVENT_WEBHOOK_HEADER': "MyEnvValue", + 'HASURA_GRAPHQL_STRINGIFY_NUMERIC_TYPES': 'true', + 'HASURA_GRAPHQL_CONSOLE_ASSETS_DIR' : '../../console/static/dist/', + 'HASURA_GRAPHQL_ENABLE_CONSOLE' : 'true' + } + + def __init__(self, pg, port_allocator, docker_image=None, log_file='hge.log', url=None, args=[]): + self.pg = pg + self.log_file = log_file + if self.log_file: + self.tix_file = self.log_file[:-4] + '.tix' + self.docker_image = docker_image + self.introspection = None + self.obj_fk_rels = set() + self.arr_fk_rels = set() + self.port_allocator = port_allocator + self.url = url + self.proc = None + self.container = None + self.args = args + + + def admin_secret(self): + admin_secret_env = os.environ.get('HASURA_GRAPHQL_ADMIN_SECRET') + parser = argparse.ArgumentParser() + parser.add_argument('--admin-secret', metavar='HASURA_GRAPHQL_ADMIN_SECRET', required=False) + admin_secret_arg = parser.parse_known_args(self.args)[0].admin_secret + return admin_secret_arg or admin_secret_env + + @classmethod + def do_stack_build(cls): + print(Fore.YELLOW + "Performing Stack build first" + Style.RESET_ALL) + # 'stack run' below will also build, but we want to make sure that's a + # noop so the server starts right away + subprocess.check_call( ['cabal', 'new-build', 'exe:graphql-engine']) + + def get_hge_env(self): + hge_env = { + **os.environ, + **self.default_graphql_env.copy(), + 'HASURA_GRAPHQL_DATABASE_URL': self.pg.url, + 'HASURA_GRAPHQL_SERVER_PORT': str(self.port), + 'HASURA_GRAPHQL_SERVER_HOST': '127.0.0.1', + 'HPCTIXFILE' : self.tix_file + } + return hge_env + + def run(self): + if self.url: + return + if self.docker_image: + self.run_with_docker() + else: + self.run_with_cabal() + + def run_with_docker(self): + if self.url: + return + self.port = self.port_allocator.allocate_port(8080) + hge_env = self.get_hge_env() + process_args = ['graphql-engine', 'serve', *self.args] + docker_ports = {str(self.port) + '/tcp': ('127.0.0.1', self.port)} + self.docker_client = docker.from_env() + print("Running GraphQL Engine docker with image:", + self.docker_image, '(port:{})'.format(self.port)) + print(process_args) + self.container = self.docker_client.containers.run( + self.docker_image, + command=process_args, + detach=True, + ports=docker_ports, + environment=hge_env, + network_mode='host', + volumes={} + ) + self.url = 'http://127.0.0.1:' + str(self.port) + print("Waiting for GraphQL Engine to be running.", end='') + self.wait_for_start() + + + def run_with_cabal(self): + if self.url: + return + self.port = self.port_allocator.allocate_port(8080) + rm_file_if_exists(self.tix_file) + hge_env = self.get_hge_env() + process_args = ['cabal', 'new-run', '--', 'exe:graphql-engine', 'serve', *self.args] + print("Running GraphQL with 'cabal run': (port:{})".format(self.port)) + print(process_args) + self.log_fp = open(self.log_file, 'w') + self.proc = subprocess.Popen( + process_args, + env=hge_env, + shell=False, + bufsize=-1, + 
start_new_session=True, + stdout=self.log_fp, + stderr=subprocess.STDOUT + ) + self.url = 'http://127.0.0.1:' + str(self.port) + print("Waiting for GraphQL Engine to be running.", end='') + self.wait_for_start() + + def check_if_process_is_running(self): + if self.proc.poll() is not None: + with open(self.log_file) as fr: + raise HGEError( + "GraphQL engine failed with error: " + fr.read()) + + def check_if_container_is_running(self): + self.container.reload() + if self.container.status == 'exited': + raise HGEError( + "GraphQL engine failed with error: \n" + + self.container.logs(stdout=True, stderr=True).decode('ascii') + ) + + def wait_for_start(self, timeout=120): + if timeout <= 0: + raise HGEError("Timeout waiting for graphql process to start") + if self.proc: + self.check_if_process_is_running() + elif self.container: + self.check_if_container_is_running() + try: + q = { 'query': 'query { __typename }' } + r = requests.post(self.url + '/v1/graphql',json.dumps(q),headers=self.admin_auth_headers()) + if r.status_code == 200: + print() + return + except requests.exceptions.ConnectionError: + pass + except ConnectionError: + pass + print(".", end="", flush=True), + sleep_time = 0.5 + time.sleep(sleep_time) + self.wait_for_start(timeout - sleep_time) + + def teardown(self): + if getattr(self, 'log_fp', None): + self.log_fp.close() + self.log_fp = None + if self.proc: + self.cleanup_process() + elif self.container: + self.cleanup_docker() + + def cleanup_process(self): + # TODO hangs + print(Fore.YELLOW + "Stopping graphql engine at port:", self.port, Style.RESET_ALL) + + pgrp = os.getpgid(self.proc.pid) + os.killpg(pgrp, signal.SIGTERM) + # NOTE this doesn't seem to work, although a SIGINT from terminal does ... + # self.proc.send_signal(signal.SIGINT) + self.proc.wait() + self.proc = None + + def cleanup_docker(self): + cntnr_info = "HGE docker container " + self.container.name + " " + repr(self.container.image) + print(Fore.YELLOW + "Stopping " + cntnr_info + Style.RESET_ALL) + self.container.stop() + print(Fore.YELLOW + "Removing " + cntnr_info + Style.RESET_ALL) + self.container.remove() + self.container = None + + def admin_auth_headers(self): + headers = {} + if self.admin_secret(): + headers['X-Hasura-Admin-Secret'] = self.admin_secret() + return headers + + def v1q(self, q, exp_status=200): + resp = requests.post(self.url + '/v1/query', json.dumps(q), headers=self.admin_auth_headers()) + assert resp.status_code == exp_status, (resp.status_code, resp.json()) + return resp.json() + + def graphql_q(self, query, variables={}, exp_status = 200): + q = {'query': query} + if variables: + q['variables'] = variables + resp = requests.post(self.url + '/v1/graphql', json.dumps(q), headers=self.admin_auth_headers()) + assert resp.status_code == exp_status, (resp.status_code, resp.json()) + assert 'errors' not in resp.json(), resp.json() + return resp.json() + + def track_all_tables_in_schema(self, schema='public'): + print("Track all tables in schema ", schema) + all_tables = self.pg.get_all_tables_in_a_schema(schema) + all_tables = [ {'schema': schema, 'name': t} + for t in all_tables ] + return self.track_tables(all_tables) + + def run_bulk(self, queries, exp_status = 200): + bulk_q = { + 'type': 'bulk', + 'args': queries + } + return self.v1q(bulk_q, exp_status) + + def select_simple(self, table, columns): + query = { + 'type': 'select', + 'args': { + 'table': table, + 'columns': columns + } + } + return self.v1q(query) + + def get_all_tracked_tables(self): + table = { + 'schema': 
'hdb_catalog', + 'name': 'hdb_table' + } + columns = ['table_schema', 'table_name'] + resp = self.select_simple(table, columns) + tables = [] + for row in resp: + tables.append({ + 'schema': row['table_schema'], + 'name': row['table_name'] + }) + return tables + + + def track_tables(self, tables, exp_status=200): + queries = [] + for table in tables: + q = { + 'type' : 'track_table', + 'args' : table + } + queries.append(q) + return self.run_bulk(queries, exp_status) + + def track_table(self, table, exp_status=200): + q = self.mk_track_table_q(table) + return self.v1q(q, exp_status) + + def mk_track_table_q(self, table): + return { + 'type' : 'track_table', + 'args' : table + } + + def add_remote_schema(self, name, remote_url, headers={}, client_hdrs=False): + def hdr_name_val_pair(headers): + nvp = [] + for (k,v) in headers.items(): + nvp.append({'name': k, 'value': v}) + return nvp + if len(headers) > 0: + client_hdrs = True + q = { + 'type' : 'add_remote_schema', + 'args': { + 'name': name, + 'comment': name, + 'definition': { + 'url': remote_url, + 'headers': hdr_name_val_pair(headers), + 'forward_client_headers': client_hdrs + } + } + } + return self.v1q(q) + + + def create_remote_obj_rel_to_itself(self, tables_schema, remote, remote_tables_schema): + print("Creating remote relationship to the tables in schema {} to itself using remote {}".format(tables_schema, remote)) + fk_constrnts = self.pg.get_all_fk_constraints(tables_schema) + for (s, _, t, c, _, ft, _) in fk_constrnts: + table_cols = self.pg.get_all_columns_of_a_table(t, s) + if not 'id' in table_cols: + continue + rel_name = 'remote_' + inflection.singularize(t) + '_via_' + c + query ={ + 'type': 'create_remote_relationship', + 'args' : { + 'name' : rel_name, + 'table' : { + 'schema': s, + 'name': t + }, + 'remote_schema': remote, + 'hasura_fields': ['id', c], + 'remote_field': { + remote_tables_schema + '_' + ft + '_by_pk' : { + 'arguments': { + 'id': '$' + c + }, + 'field': { + inflection.pluralize(t) + '_by_' + c: { + 'arguments' : { + 'where': { + c : { + '_eq': '$id' + } + } + } + } + } + } + } + } + } + print(query) + self.v1q(query) + + def create_remote_obj_fk_ish_relationships(self, tables_schema, remote, remote_tables_schema): + print("Creating object foreign key ish relationships for tables in schema {} using remote {}".format(tables_schema, remote)) + fk_constrnts = self.pg.get_all_fk_constraints(tables_schema) + for (s, _, t, c, _, ft, _) in fk_constrnts: + rel_name = inflection.singularize(ft) + if c.endswith('_id'): + rel_name = c[:-3] + rel_name = 'remote_' + rel_name + query ={ + 'type': 'create_remote_relationship', + 'args' : { + 'name' : rel_name, + 'table' : { + 'schema': s, + 'name': t + }, + 'remote_schema': remote, + 'hasura_fields': [c], + 'remote_field': { + remote_tables_schema + '_' + ft + '_by_pk' : { + 'arguments' : { + 'id': '$' + c + } + } + } + + } + } + print(query) + self.v1q(query) + + def create_obj_fk_relationships(self, schema='public'): + print("Creating object foreign key relationships for tables in schema ", schema) + fk_constrnts = self.pg.get_all_fk_constraints(schema) + queries = [] + for (s, _, t, c, _, ft, _) in fk_constrnts: + rel_name = inflection.singularize(ft) + if c.endswith('_id'): + rel_name = c[:-3] + table_cols = self.pg.get_all_columns_of_a_table(t, s) + if rel_name in table_cols: + rel_name += '_' + inflection.singularize(ft) + queries.append({ + 'type' : 'create_object_relationship', + 'args': { + 'table': { + 'schema': s, + 'name': t + }, + 'name': rel_name, + 
'using': { + 'foreign_key_constraint_on': c + } + } + }) + self.obj_fk_rels.add(((s,t),rel_name)) + return self.run_bulk(queries) + + def create_remote_arr_fk_ish_relationships(self, tables_schema, remote, remote_tables_schema): + fk_constrnts = self.pg.get_all_fk_constraints(tables_schema) + for (_, _, t, c, fs, ft, _) in fk_constrnts: + rel_name = 'remote_' + inflection.pluralize(t) + '_by_' + c + query ={ + 'type': 'create_remote_relationship', + 'args' : { + 'name' : rel_name, + 'table' : { + 'schema': fs, + 'name': ft + }, + 'remote_schema': remote, + 'hasura_fields': ['id'], + 'remote_field': { + remote_tables_schema + '_' + t : { + 'arguments' : { + 'where': { + c : { + '_eq': '$id' + } + } + } + } + } + } + } + print(query) + self.v1q(query) + + def create_arr_fk_relationships(self, schema='public'): + print("Creating array foreign key relationships for tables in schema ", schema) + fk_constrnts = self.pg.get_all_fk_constraints(schema) + queries = [] + for (s, _, t, c, fs, ft, _) in fk_constrnts: + rel_name = inflection.pluralize(t) + '_by_' + c + queries.append({ + 'type' : 'create_array_relationship', + 'args': { + 'table': { + 'schema': fs, + 'name': ft + }, + 'name': rel_name, + 'using': { + 'foreign_key_constraint_on': { + 'table': { + 'schema': s, + 'name': t + }, + 'column': c + } + } + } + }) + self.arr_fk_rels.add(((fs,ft),rel_name)) + return self.run_bulk(queries) + + def run_sql(self, sql): + """Run given SQL query""" + def mk_run_sql_q(sql): + return { + 'type' : 'run_sql', + 'args': { + 'sql' : sql + } + } + return self.v1q(mk_run_sql_q(sql)) diff --git a/server/bench-wrk/run_postgres.py b/server/bench-wrk/run_postgres.py new file mode 100644 index 0000000000000..13d3839e57fa8 --- /dev/null +++ b/server/bench-wrk/run_postgres.py @@ -0,0 +1,195 @@ +import docker +import time +from contextlib import contextmanager +import psycopg2 +from psycopg2.sql import SQL, Identifier +from colorama import Fore, Style +import os + + +class PostgresError(Exception): + pass + + +class Postgres: + + def __init__(self, docker_image, db_data_dir, port_allocator, url): + self.port_allocator = port_allocator + self.docker_image = docker_image + self.db_data_dir = os.path.abspath(db_data_dir) + self.url = url + + def setup(self): + if self.docker_image and not self.url: + self.start_postgres_docker() + + def start_postgres_docker(self): + if self.url: + print("Postgres is already running") + return + self.port = self.port_allocator.allocate_port(5433) + self.user = 'hge_test' + self.password = 'hge_pass' + self.database = 'hge_test' + env = { + 'POSTGRES_USER' : self.user, + 'POSTGRES_PASSWORD' : self.password, + 'POSTGRES_DB' : self.database + } + docker_ports = {'5432/tcp': ('127.0.0.1', self.port)} + docker_vols = { + self.db_data_dir: { + 'bind': '/var/lib/postgresql/data', + 'mode': 'rw' + } + } + + self.docker_client = docker.from_env() + print("Running postgres docker with image:", + self.docker_image, '(port:{})'.format(self.port)) + cntnr = self.docker_client.containers.run( + self.docker_image, + detach=True, + ports=docker_ports, + environment=env, + volumes = docker_vols + ) + self.pg_container = cntnr + self.url = 'postgresql://' + self.user + ':' + self.password + '@localhost:' + str(self.port) + '/' + self.database + print("Waiting for database to be up and running.", end="", flush=True) + self.wait_for_db_start() + print("") + + def check_if_container_is_running(self): + self.pg_container.reload() + if self.pg_container.status == 'exited': + raise PostgresError( + "Postgres 
docker failed with error: \n" + + self.pg_container.logs(stdout=True, stderr=True).decode('ascii') + ) + + def wait_for_db_start(self, timeout=60): + if self.pg_container: + self.check_if_container_is_running() + if timeout > 0: + try: + self.run_sql('select 1') + return + except Exception as e: + if timeout < 5: + print("\nWaiting for database to be up and running:" + repr(e), end=""), + else: + print(".", end="", flush=True), + sleep_time = 0.5 + time.sleep(sleep_time) + self.wait_for_db_start(timeout-sleep_time) + else: + raise PostgresError("Timeout waiting for database to start") + + @contextmanager + def cursor(self): + with psycopg2.connect(self.url) as conn: + with conn.cursor() as cursor: + yield cursor + + def get_all_fk_constraints(self, schema='public'): + with self.cursor() as cursor: + cursor.execute(''' + SELECT + tc.table_schema, + tc.constraint_name, + tc.table_name, + kcu.column_name, + ccu.table_schema AS foreign_table_schema, + ccu.table_name AS foreign_table_name, + ccu.column_name AS foreign_column_name + FROM + information_schema.table_constraints AS tc + JOIN information_schema.key_column_usage AS kcu + ON tc.constraint_name = kcu.constraint_name + AND tc.table_schema = kcu.table_schema + JOIN information_schema.constraint_column_usage AS ccu + ON ccu.constraint_name = tc.constraint_name + AND ccu.table_schema = tc.table_schema + WHERE tc.constraint_type = 'FOREIGN KEY' and tc.table_schema=%s + ''',(schema,)) + return cursor.fetchall() + + def move_tables_to_schema(self, cur_schema, target_schema): + print("Moving tables in schema {} to schema {}".format(cur_schema, target_schema)) + table_names = self.get_all_tables_in_a_schema(cur_schema) + with self.cursor() as cursor: + cursor.execute(SQL('CREATE SCHEMA IF NOT EXISTS {} ').format(Identifier(target_schema))) + for table in table_names: + cursor.execute(SQL('''ALTER TABLE {}.{} SET SCHEMA {};''').format( Identifier(cur_schema), Identifier(table), Identifier(target_schema) )) + + def get_all_columns_of_a_table(self, table_name, table_schema='public'): + columns = [] + with self.cursor() as cursor: + cursor.execute(''' + SELECT column_name + FROM information_schema.columns + WHERE table_name= %s and table_schema=%s; + ''', (table_name, table_schema)) + for row in cursor.fetchall(): + columns.append(row[0]) + return columns + + def get_all_tables_with_column(self, column, schema='public'): + tables = [] + with self.cursor() as cursor: + cursor.execute(''' + SELECT table_name + FROM information_schema.columns + WHERE column_name= %s and table_schema=%s; + ''', (column,schema)) + for row in cursor.fetchall(): + tables.append(row[0]) + return tables + + def set_id_as_primary_key_for_tables(self, schema='public'): + print("Setting id as primary key for all tables in schema ", schema) + tables_with_id_col = self.get_all_tables_with_column('id', schema) + with self.cursor() as cursor: + for table in tables_with_id_col: + self.set_id_as_primary_key_for_table(cursor, table, schema) + + def set_id_as_primary_key_for_table(self, cursor, table, schema='public'): + cursor.execute(SQL('''ALTER TABLE {}.{} ADD PRIMARY KEY (id);''').format( Identifier(schema), Identifier(table) )) + + def get_all_tables_in_a_schema(self, schema='public'): + tables = [] + with self.cursor() as cursor: + cursor.execute(''' + SELECT table_name + FROM information_schema.tables + WHERE table_schema=%s; + ''', (schema,)) + for row in cursor.fetchall(): + tables.append(row[0]) + return tables + + def run_sql(self, sql): + with self.cursor() as cursor: + 
cursor.execute(sql) + + def get_server_version(self): + with self.cursor() as cursor: + cursor.execute("select setting from pg_config where name = 'VERSION';") + return cursor.fetchall()[0][0] + + def run_sql_from_file(self, sql_file): + print("Running sql from file:", sql_file) + self.run_sql(open(sql_file, 'r').read()) + + def teardown(self): + self.cleanup_docker() + + def cleanup_docker(self): + if getattr(self, 'pg_container', None): + cntnr_info = "Postgres docker container " + self.pg_container.name + " " + self.pg_container.image.tags[0] + print(Fore.YELLOW + "Stopping " + cntnr_info + Style.RESET_ALL) + self.pg_container.stop() + print(Fore.YELLOW + "Removing " + cntnr_info + Style.RESET_ALL) + self.pg_container.remove() + self.pg_container = None diff --git a/server/bench-wrk/sportsdb_setup.py b/server/bench-wrk/sportsdb_setup.py new file mode 100644 index 0000000000000..a33a3b08e51f9 --- /dev/null +++ b/server/bench-wrk/sportsdb_setup.py @@ -0,0 +1,321 @@ +import argparse +from zipfile import ZipFile +import os +from contextlib import contextmanager +import threading +import requests +import requests_cache +from colorama import Fore, Style + +from port_allocator import PortAllocator +from run_postgres import Postgres +from run_hge import HGE + + +def _first_true(iterable, default=False, pred=None): + return next(filter(pred, iterable), default) + + +class HGETestSetup: + + sportsdb_url='http://www.sportsdb.org/modules/sd/assets/downloads/sportsdb_sample_postgresql.zip' + + default_work_dir = 'test_output' + + previous_work_dir_file = '.previous_work_dir' + + def __init__(self, pg_url, remote_pg_url, pg_docker_image, hge_url, remote_hge_url, hge_docker_image=None, hge_args=[], skip_remote_graphql_setup=False, skip_stack_build=False): + self.pg_url = pg_url + self.remote_pg_url = remote_pg_url + self.pg_docker_image = pg_docker_image + self.hge_url = hge_url + self.remote_hge_url = remote_hge_url + self.hge_docker_image = hge_docker_image + self.hge_args = hge_args + self.skip_remote_graphql_setup = skip_remote_graphql_setup + self.skip_stack_build = skip_stack_build + self.port_allocator = PortAllocator() + self.init_work_dir() + self.init_pgs() + self.init_hges() + self.set_previous_work_dir() + + def get_previous_work_dir(self): + """Get the work directory of the previous error""" + try: + with open(self.previous_work_dir_file) as f: + return f.read() + except FileNotFoundError: + return None + + def set_previous_work_dir(self): + """ + Set the current work directory as previous work directory + This directory will be used as the default work directory choice + """ + with open(self.previous_work_dir_file, 'w') as f: + return f.write(self.work_dir) + + def get_work_dir(self): + """ + Get the work directory from environmental variable, or from the user input + """ + default_work_dir = self.get_previous_work_dir() or self.default_work_dir + return os.environ.get('WORK_DIR') \ + or input(Fore.YELLOW + '(Set WORK_DIR environmental variable to avoid this)\n' + + 'Please specify the work directory. 
(default:{}):'.format(default_work_dir) + + Style.RESET_ALL).strip() \ + or default_work_dir + + def init_work_dir(self): + """ + Get the current work directory from user input or environmental variables + and create the work directory if it is not present + """ + self.work_dir = self.get_work_dir() + print ("WORK_DIR: ", self.work_dir) + os.makedirs(self.work_dir, exist_ok=True) + requests_cache.install_cache(self.work_dir + '/sportsdb_cache') + + def init_pgs(self): + def _init_pg(data_dir, url): + return Postgres( + port_allocator=self.port_allocator, docker_image=self.pg_docker_image, + db_data_dir= self.work_dir + '/' + data_dir, url=url + ) + + self.pg = _init_pg('sportsdb_data', self.pg_url) + + if not self.skip_remote_graphql_setup: + self.remote_pg = _init_pg('remote_sportsdb_data', self.remote_pg_url) + + def init_hges(self): + def _init_hge(pg, hge_url, log_file): + return HGE( + pg=pg, url=hge_url, port_allocator=self.port_allocator, + args=self.hge_args, log_file= self.work_dir + '/' + log_file, + docker_image=self.hge_docker_image + ) + + self.hge = _init_hge(self.pg, self.hge_url, 'hge.log') + + if not self.skip_remote_graphql_setup: + self.remote_hge = _init_hge(self.remote_pg, self.remote_hge_url, 'remote_hge.log') + + @contextmanager + def graphql_engines_setup(self): + try: + self._setup_graphql_engines() + yield + finally: + self.teardown() + + def _setup_graphql_engines(self): + + if not self.hge_docker_image and not self.skip_stack_build: + HGE.do_stack_build() + + def run_concurrently(threads): + for thread in threads: + thread.start() + + for thread in threads: + thread.join() + + def run_concurrently_fns(*fns): + threads = [threading.Thread(target=fn) for fn in fns] + return run_concurrently(threads) + + def set_hge(hge, schema, hge_type): + pg = hge.pg + # Schema and data + pg.run_sql_from_file(sql_file) + pg.set_id_as_primary_key_for_tables(schema='public') + pg.move_tables_to_schema('public', schema) + + # Metadata stuff + hge.track_all_tables_in_schema(schema) + hge.create_obj_fk_relationships(schema) + hge.create_arr_fk_relationships(schema) + + def start_remote_postgres_docker(): + if not self.skip_remote_graphql_setup: + self.remote_pg.start_postgres_docker() + + run_concurrently_fns( + self.pg.start_postgres_docker, + start_remote_postgres_docker) + + print("Postgres url:", self.pg.url) + + if not self.skip_remote_graphql_setup: + print("Remote Postgres url:", self.remote_pg.url) + self.remote_hge.run() + + self.hge.run() + + # Skip if the tables are already present + tables = self.pg.get_all_tables_in_a_schema('hge') + if len(tables) > 0: + return + + # Download sportsdb + zip_file = self.download_sportsdb_zip(self.work_dir+ '/sportsdb.zip') + sql_file = self.unzip_sql_file(zip_file) + + def set_remote_hge(): + if not self.skip_remote_graphql_setup: + set_hge(self.remote_hge, 'remote_hge', 'Remote') + + # Create the required tables and move them to required schemas + hge_thread = threading.Thread( + target=set_hge, args=(self.hge, 'hge', 'Main')) + remote_hge_thread = threading.Thread( + target=set_remote_hge) + run_concurrently([hge_thread, remote_hge_thread]) + + if not self.skip_remote_graphql_setup: + # Add remote_hge as remote schema + self.hge.add_remote_schema( + 'remote_hge', self.remote_hge.url + '/v1/graphql', + self.remote_hge.admin_auth_headers() + ) + + # TODO update the remote schema url if needed + tables = self.pg.get_all_tables_in_a_schema('hdb_catalog') + + # Create remote relationships only if it is supported + if 
'hdb_remote_relationship' not in tables: + return + + # Create remote relationships + if not self.skip_remote_graphql_setup: + self.create_remote_relationships() + + def create_remote_relationships(self): + self.hge.create_remote_obj_rel_to_itself('hge', 'remote_hge', 'remote_hge') + self.hge.create_remote_arr_fk_ish_relationships('hge', 'remote_hge', 'remote_hge') + self.hge.create_remote_obj_fk_ish_relationships('hge', 'remote_hge', 'remote_hge') + + def teardown(self): + for res in [self.hge, self.pg]: + res.teardown() + if not self.skip_remote_graphql_setup: + for res in [self.remote_hge, self.remote_pg]: + res.teardown() + + def unzip_sql_file(self, zip_file): + with ZipFile(zip_file, 'r') as zip: + sql_file = zip.infolist()[0] + print('DB SQL file:', sql_file.filename) + zip.extract(sql_file, self.work_dir) + return self.work_dir + '/' + sql_file.filename + + def download_sportsdb_zip(self, filename, url=sportsdb_url): + with requests.get(url, stream=True) as r: + r.raise_for_status() + total = 0 + print() + with open(filename, 'wb') as f: + for chunk in r.iter_content(chunk_size=8192): + if chunk: + total += len(chunk) + print("\rDownloaded: ", int(total/1024) , 'KB', end='') + f.write(chunk) + print('\nDB Zip File:', filename) + return filename + + +class HGETestSetupArgs: + + default_pg_docker_image = 'circleci/postgres:11.5-alpine-postgis' + + def __init__(self): + self.set_arg_parse_options() + self.parse_args() + + def set_arg_parse_options(self): + self.arg_parser = argparse.ArgumentParser() + self.set_pg_options() + self.set_hge_options() + + def parse_args(self): + self.parsed_args = self.arg_parser.parse_args() + self.set_pg_confs() + self.set_hge_confs() + + def set_pg_confs(self): + self.pg_url, self.remote_pg_url, self.pg_docker_image = self.get_params( + ['pg_url', 'remote_pg_url', 'pg_docker_image'] + ) + if not self.pg_url: + self.pg_docker_image = self.pg_docker_image or self.default_pg_docker_image + + def set_hge_confs(self): + self.hge_docker_image, self.hge_url, self.remote_hge_url = self.get_params( + ['hge_docker_image', 'hge_url', 'remote_hge_url'] + ) + self.skip_stack_build = self.parsed_args.skip_stack_build + self.skip_remote_graphql_setup = self.parsed_args.skip_remote_graphql_setup + self.hge_args = self.parsed_args.hge_args[1:] + + def set_pg_options(self): + pg_opts = self.arg_parser.add_argument_group('Postgres').add_mutually_exclusive_group() + pg_opts.add_argument('--pg-url', metavar='HASURA_BENCH_PG_URLS', help='Postgres database url to be used for tests', required=False) + pg_opts.add_argument('--remote-pg-url', metavar='HASURA_BENCH_REMOTE_PG_URLS', help='Url of Postgres database which is attached/has to be attached, with remote graphql-engine', required=False) + pg_opts.add_argument('--pg-docker-image', metavar='HASURA_BENCH_PG_DOCKER_IMAGE', help='Postgres docker image to be used for tests', required=False) + + def set_hge_options(self): + hge_opts = self.arg_parser.add_argument_group('Hasura GraphQL Engine') + hge_opts.add_argument('--hge-url', metavar='HASURA_BENCH_HGE_URL', help='Url of Hasura graphql-engine') + hge_opts.add_argument('--remote-hge-url', metavar='HASURA_BENCH_REMOTE_HGE_URL', help='Url of remote Hasura graphql-engine') + hge_opts.add_argument('--hge-docker-image', metavar='HASURA_BENCH_HGE_DOCKER_IMAGE', help='GraphQl engine docker image to be used for tests', required=False) + hge_opts.add_argument('--skip-stack-build', help='Skip stack build if this option is set', action='store_true', required=False) + 
hge_opts.add_argument('--skip-remote-graphql-setup', help='Skip setting up of remote graphql engine', action='store_true', required=False) + self.arg_parser.add_argument('hge_args', nargs=argparse.REMAINDER) + + def default_env(self, attr): + return 'HASURA_BENCH_' + attr.upper() + + def get_param(self, attr, env_key=None): + if not env_key: + env_key = self.default_env(attr) + return _first_true([getattr(self.parsed_args, attr), os.getenv(env_key)]) + + def get_params(self, params_loc): + params_out = [] + for param_loc in params_loc: + # You can specify just the attribute name for the parameter + if isinstance(param_loc, str): + attr = param_loc + env = self.default_env(attr) + # Or you can specify attribute and environmental variables as a tuple + else: + (attr, env) = param_loc + param = self.get_param(attr, env) + params_out.append(param) + return params_out + + +class HGETestSetupWithArgs(HGETestSetup, HGETestSetupArgs): + + def __init__(self): + HGETestSetupArgs.__init__(self) + HGETestSetup.__init__( + self, + pg_url = self.pg_url, + remote_pg_url = self.remote_pg_url, + pg_docker_image = self.pg_docker_image, + hge_url = self.hge_url, + remote_hge_url = self.remote_hge_url, + hge_docker_image = self.hge_docker_image, + skip_stack_build = self.skip_stack_build, + skip_remote_graphql_setup = self.skip_remote_graphql_setup, + hge_args = self.hge_args + ) + +if __name__ == "__main__": + test_setup = HGETestSetupWithArgs() + with test_setup.graphql_engines_setup(): + print("Hasura GraphQL engine is running on URL:",test_setup.hge.url+ '/v1/graphql') + input(Fore.BLUE+'Press Enter to stop GraphQL engine' + Style.RESET_ALL) diff --git a/server/bench-wrk/wrk-websocket-server/.dockerignore b/server/bench-wrk/wrk-websocket-server/.dockerignore new file mode 100644 index 0000000000000..21d9e6c0d3d93 --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/.dockerignore @@ -0,0 +1,2 @@ +dist +dist-newstyle diff --git a/server/bench-wrk/wrk-websocket-server/.gitignore b/server/bench-wrk/wrk-websocket-server/.gitignore new file mode 100644 index 0000000000000..1fa2bd01155ec --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/.gitignore @@ -0,0 +1,35 @@ +__pycache__/ +dist +dist-newstyle +cabal-dev +Pipfile +Pipfile.lock +*.o +*.hi +*.chi +*.chs.h +*.dyn_o +*.dyn_hi +.virtualenv +.python-venv +.hpc +*.tix +.hsenv +.cabal-sandbox/ +cabal.sandbox.config +cabal.config +*.prof* +*.aux +*.hp +TAGS +.stack-work* +*.log + +# ws related +ws/ +build/rootfs + +random*.sql + +# example related +sample/data diff --git a/server/bench-wrk/wrk-websocket-server/CHANGELOG.md b/server/bench-wrk/wrk-websocket-server/CHANGELOG.md new file mode 100644 index 0000000000000..1654733b0fc9b --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/CHANGELOG.md @@ -0,0 +1,5 @@ +# Revision history for wrk-websocket-server + +## 0.1.0.0 -- YYYY-mm-dd + +* First version. Released on an unsuspecting world. 
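Note (not part of the patch): the Python harness above (run_postgres.py, run_hge.py and sportsdb_setup.py) is normally driven end to end by HGETestSetup, but the individual classes also compose on their own. A minimal sketch of standing up a disposable Postgres-plus-graphql-engine pair by hand, assuming the default work directory and the default Postgres image from HGETestSetupArgs:

    from port_allocator import PortAllocator
    from run_postgres import Postgres
    from run_hge import HGE

    ports = PortAllocator()
    pg = Postgres(docker_image='circleci/postgres:11.5-alpine-postgis',
                  db_data_dir='test_output/sportsdb_data',
                  port_allocator=ports, url=None)
    pg.setup()                                   # starts the Postgres container and waits until it accepts queries
    hge = HGE(pg, ports, log_file='test_output/hge.log')
    hge.run()                                    # no docker image given, so this falls back to 'cabal new-run'
    hge.track_all_tables_in_schema('public')     # track whatever tables exist in the public schema
    hge.teardown()
    pg.teardown()

Used this way, ports are assigned through the shared PortAllocator instance, mirroring what HGETestSetup does internally.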
diff --git a/server/bench-wrk/wrk-websocket-server/Dockerfile b/server/bench-wrk/wrk-websocket-server/Dockerfile new file mode 100644 index 0000000000000..6476f0ac984c4 --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/Dockerfile @@ -0,0 +1,38 @@ +# Building wrk-websocket-server +FROM alpine:3.11.3 as server-builder +RUN apk add cabal ghc +RUN apk add ca-certificates +RUN apk add wget +RUN apk add musl-dev +RUN apk add zlib zlib-dev +RUN apk add git +RUN apk add postgresql postgresql-dev +RUN apk add binutils-gold +COPY wrk-websocket-server.cabal cabal.project.local cabal.project cabal.project.freeze /root/wrk-websocket-dir/ +WORKDIR /root/wrk-websocket-dir +RUN cabal new-update +RUN cabal new-build --only-dependencies +COPY src/ src/ +COPY app/ app/ +RUN cabal new-build +RUN cp $(cabal new-exec which wrk-websocket-server) /tmp + +# Building wrk2 +FROM alpine:3.11.3 as wrk2-builder +ARG tag=print-latencies +RUN apk add alpine-sdk openssl-dev luajit-dev +RUN apk add zlib zlib-dev +RUN wget -O /tmp/wrk2.zip https://github.com/hasura/wrk2/archive/$tag.zip +RUN unzip /tmp/wrk2.zip -d /tmp/ +RUN make -C /tmp/wrk2-${tag} + +# Install wrk as an alpine package +FROM alpine:3.11.3 +ARG tag=print-latencies +ENV HASURA_BENCH_WRK_LUA_SCRIPT=/root/bench_scripts/bench-wrk.lua +ENV HASURA_BENCH_WRK2_LUA_SCRIPT=/root/bench_scripts/bench-wrk2.lua +MAINTAINER nizar@hasura.io +RUN apk add wrk lua5.1-cjson gmp libffi-dev +COPY --from=server-builder /tmp/wrk-websocket-server /usr/bin/wrk-websocket-server +COPY --from=wrk2-builder /tmp/wrk2-$tag/wrk /usr/bin/wrk2 +COPY bench_scripts /root/bench_scripts diff --git a/server/bench-wrk/wrk-websocket-server/LICENSE b/server/bench-wrk/wrk-websocket-server/LICENSE new file mode 120000 index 0000000000000..5853aaea53bc0 --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/LICENSE @@ -0,0 +1 @@ +../../../LICENSE \ No newline at end of file diff --git a/server/bench-wrk/wrk-websocket-server/Setup.hs b/server/bench-wrk/wrk-websocket-server/Setup.hs new file mode 100644 index 0000000000000..9a994af677b0d --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/Setup.hs @@ -0,0 +1,2 @@ +import Distribution.Simple +main = defaultMain diff --git a/server/bench-wrk/wrk-websocket-server/app/Main.hs b/server/bench-wrk/wrk-websocket-server/app/Main.hs new file mode 100644 index 0000000000000..2fa56126218db --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/app/Main.hs @@ -0,0 +1,24 @@ +{-# LANGUAGE FlexibleContexts #-} +module Main where + +import Control.Monad (fail, void) + +import qualified Control.Concurrent.STM.TMVar as TMVar +import qualified Network.WebSockets as WS +import qualified System.Directory as Dir + +import Wrk.Server + +main :: IO () +main = do + void checkIfWrksPresent + lock <- TMVar.newEmptyTMVarIO + print "Running wrk websocket server or port 9160" + WS.runServer "0.0.0.0" 9160 $ benchWsApp lock + +checkIfWrksPresent :: IO [FilePath] +checkIfWrksPresent = mapM findExec ["wrk", "wrk2"] + where + findExec e = Dir.findExecutable e >>= maybe (noExecErr e) return + noExecErr e = fail $ "Could not find executable " <> e + diff --git a/server/bench-wrk/wrk-websocket-server/bench_scripts/bench-lib-wrk.lua b/server/bench-wrk/wrk-websocket-server/bench_scripts/bench-lib-wrk.lua new file mode 100644 index 0000000000000..6de58409f0968 --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/bench_scripts/bench-lib-wrk.lua @@ -0,0 +1,54 @@ +-- A library that can be used in custom lua scripts for benchmarking + +local _M = {} + +local json = require 
"cjson" +local os = require "os" + +function _M.init(args) + local query = args[1] + return json.encode({query=query}), args[2], args[3] +end + +function _M.request(wrk, req_body, auth_header_key, auth_header_value) + wrk.method = "POST" + wrk.headers["Content-Type"] = "application/json" + if auth_header_key ~= nil and auth_header_value ~= nil then + wrk.headers[auth_header_key] = auth_header_value + end + wrk.body = req_body + return wrk.format() +end + +local function get_stat_summary(stat) + local dist = {} + for _, p in pairs({ 95, 98, 99 }) do + dist[tostring(p)] = stat:percentile(p) + end + return { + min=stat.min, + max=stat.max, + stdev=stat.stdev, + mean=stat.mean, + dist=dist + } +end + +local function getTime() + return os.date("%c %Z") +end + +function _M.done(summary, latency, requests) + io.stderr:write( + json.encode({ + time=getTime(), + -- Latency info from wrk framework is not that useful + -- latency=get_stat_summary(latency), + summary=summary, + requests=get_stat_summary(requests) + }) + ) + io.stderr:write('\n') +end + +return _M diff --git a/server/bench-wrk/wrk-websocket-server/bench_scripts/bench-lib-wrk2.lua b/server/bench-wrk/wrk-websocket-server/bench_scripts/bench-lib-wrk2.lua new file mode 100644 index 0000000000000..fcda8961383b8 --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/bench_scripts/bench-lib-wrk2.lua @@ -0,0 +1,64 @@ +-- A library that can be used in custom lua scripts for benchmarking + +local _M = {} + +local json = require "cjson" +local os = require "os" +-- Use `null` to represent any NaN or Inf: +json.encode_invalid_numbers("null") + +function _M.init(args) + local query = args[1] + return json.encode({query=query}), args[2], args[3], args[4] +end + +function _M.request(wrk, req_body, auth_header_key, auth_header_value) + wrk.method = "POST" + wrk.headers["Content-Type"] = "application/json" + if auth_header_key ~= nil and auth_header_value ~= nil then + wrk.headers[auth_header_key] = auth_header_value + end + wrk.body = req_body + return wrk.format() +end + +local function get_stat_summary(stat) + local dist = {} + for _, p in pairs({ 95, 98, 99 }) do + dist[tostring(p)] = stat:percentile(p) + end + return { + min=stat.min, + max=stat.max, + stdev=stat.stdev, + mean=stat.mean, + dist=dist + } +end + +local function getTime() + return os.date("%c %Z") +end + +function _M.done(summary, latency, requests, results_dir) + local summary_file = io.open(results_dir .. '/summary.json','w') + local summary_output = json.encode({ + time=getTime(), + latency=get_stat_summary(latency), + summary=summary, + requests=get_stat_summary(requests) + }) + io.stderr:write(summary_output) + summary_file:write(summary_output .. '\n') + summary_file:close() + latencies_file = io.open(results_dir .. '/latencies','w') + for i = 1, summary.requests do + if (latency[i] ~= 0) + then + latencies_file:write(latency[i] .. '\n') + end + end + latencies_file:close() +end + +return _M diff --git a/server/bench-wrk/wrk-websocket-server/bench_scripts/bench-wrk.lua b/server/bench-wrk/wrk-websocket-server/bench_scripts/bench-wrk.lua new file mode 100644 index 0000000000000..5f8c80b1ef356 --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/bench_scripts/bench-wrk.lua @@ -0,0 +1,20 @@ +-- This is the default lua script that is used when +-- a script is not specified by the user. + +-- This expects that the url specified confirms to +-- the graphql url POST spec. 
+ +local gqbench = require "bench-lib-wrk" + +local req_body = "" +function init(args) + req_body, auth_header_key, auth_header_value = gqbench.init(args) +end + +function request() + return gqbench.request(wrk, req_body, auth_header_key, auth_header_value) +end + +function done(s, l, r) + gqbench.done(s, l, r) +end diff --git a/server/bench-wrk/wrk-websocket-server/bench_scripts/bench-wrk2.lua b/server/bench-wrk/wrk-websocket-server/bench_scripts/bench-wrk2.lua new file mode 100644 index 0000000000000..04a6101419785 --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/bench_scripts/bench-wrk2.lua @@ -0,0 +1,23 @@ +-- This is the default lua script that is used when +-- a script is not specified by the user. + +-- This expects that the url specified confirms to +-- the graphql url POST spec. + +local gqbench = require "bench-lib-wrk2" + +local req_body = "" + +local results_dir = nil + +function init(args) + req_body, results_dir, auth_header_key, auth_header_value = gqbench.init(args) +end + +function request() + return gqbench.request(wrk, req_body, auth_header_key, auth_header_value) +end + +function done(s, l, r) + return gqbench.done(s, l, r, results_dir) +end diff --git a/server/bench-wrk/wrk-websocket-server/cabal.project b/server/bench-wrk/wrk-websocket-server/cabal.project new file mode 100644 index 0000000000000..3a0089d8a478d --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/cabal.project @@ -0,0 +1,5 @@ +packages: . + +package wrk-websocket-server + ghc-options: -j + diff --git a/server/bench-wrk/wrk-websocket-server/cabal.project.dev b/server/bench-wrk/wrk-websocket-server/cabal.project.dev new file mode 100644 index 0000000000000..9b7c8351f5485 --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/cabal.project.dev @@ -0,0 +1 @@ +shared: True diff --git a/server/bench-wrk/wrk-websocket-server/cabal.project.freeze b/server/bench-wrk/wrk-websocket-server/cabal.project.freeze new file mode 100644 index 0000000000000..b7f939e6e6144 --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/cabal.project.freeze @@ -0,0 +1,156 @@ +constraints: any.Cabal ==2.4.0.1, + any.RSA ==2.4.1, + any.SHA ==1.6.4.4, + SHA -exe, + any.StateVar ==1.2, + any.adjunctions ==4.4, + any.aeson ==1.4.6.0, + aeson -bytestring-builder -cffi -developer -fast, + any.aeson-casing ==0.2.0.0, + any.ansi-terminal ==0.10.3, + ansi-terminal -example, + any.ansi-wl-pprint ==0.6.9, + ansi-wl-pprint -example, + any.array ==0.5.3.0, + any.asn1-encoding ==0.9.6, + any.asn1-parse ==0.9.5, + any.asn1-types ==0.3.4, + any.async ==2.2.2, + async -bench, + any.attoparsec ==0.13.2.3, + attoparsec -developer, + any.authenticate-oauth ==1.6.0.1, + any.base ==4.12.0.0, + any.base-compat ==0.11.0, + any.base-orphans ==0.8.1, + any.base16-bytestring ==0.1.1.6, + any.base64-bytestring ==1.0.0.3, + any.basement ==0.0.11, + any.bifunctors ==5.5.7, + bifunctors +semigroups +tagged, + any.binary ==0.8.6.0, + any.blaze-builder ==0.4.1.0, + any.bytestring ==0.10.8.2, + any.bytestring-builder ==0.10.8.2.0, + bytestring-builder +bytestring_has_builder, + any.cabal-doctest ==1.0.8, + any.call-stack ==0.2.0, + any.case-insensitive ==1.2.1.0, + any.cereal ==0.5.8.1, + cereal -bytestring-builder, + any.clock ==0.8, + clock -llvm, + any.colour ==2.3.5, + any.comonad ==5.0.6, + comonad +containers +distributive +test-doctests, + any.connection ==0.3.1, + any.containers ==0.6.0.1, + any.contravariant ==1.5.2, + contravariant +semigroups +statevar +tagged, + any.cookie ==0.4.5, + any.crypto-api ==0.13.3, + crypto-api -all_cpolys, 
+ any.crypto-pubkey-types ==0.4.3, + any.cryptonite ==0.26, + cryptonite -check_alignment +integer-gmp -old_toolchain_inliner +support_aesni +support_deepseq -support_pclmuldq +support_rdrand -support_sse, + any.data-default ==0.7.1.1, + any.data-default-class ==0.1.2.0, + any.data-default-instances-containers ==0.0.1, + any.data-default-instances-dlist ==0.0.1, + any.data-default-instances-old-locale ==0.0.1, + any.deepseq ==1.4.4.0, + any.directory ==1.3.3.0, + any.distributive ==0.6.2, + distributive +semigroups +tagged, + any.dlist ==0.8.0.7, + any.entropy ==0.4.1.5, + entropy -halvm, + any.exceptions ==0.10.4, + exceptions +transformers-0-4, + any.filepath ==1.4.2.1, + any.free ==5.1.3, + any.ghc-boot-th ==8.6.5, + any.ghc-prim ==0.5.3, + any.hashable ==1.3.0.0, + hashable -examples +integer-gmp +sse2 -sse41, + any.hourglass ==0.2.12, + any.hsc2hs ==0.68.6, + hsc2hs -in-ghc-tree, + any.http-client ==0.6.4.1, + http-client +network-uri, + any.http-client-tls ==0.3.5.3, + any.http-types ==0.12.3, + any.integer-gmp ==1.0.2.0, + any.integer-logarithms ==1.0.3, + integer-logarithms -check-bounds +integer-gmp, + any.invariant ==0.5.3, + any.kan-extensions ==5.2, + any.lens ==4.19.2, + lens -benchmark-uniplate -dump-splices +inlining -j -old-inline-pragmas -safe +test-doctests +test-hunit +test-properties +test-templates +trustworthy, + any.lens-aeson ==1.1, + lens-aeson +test-doctests, + any.memory ==0.15.0, + memory +support_basement +support_bytestring +support_deepseq +support_foundation, + any.mime-types ==0.1.0.9, + any.mtl ==2.2.2, + any.network ==3.1.1.1, + any.network-uri ==2.6.3.0, + any.old-locale ==1.0.0.7, + any.optparse-applicative ==0.15.1.0, + any.parallel ==3.2.2.0, + any.parsec ==3.1.13.0, + any.pem ==0.2.4, + any.pretty ==1.1.3.6, + any.primitive ==0.7.0.0, + any.process ==1.6.5.0, + any.profunctors ==5.5.2, + any.psqueues ==0.2.7.2, + any.random ==1.1, + any.reflection ==2.1.5, + reflection -slow +template-haskell, + any.rts ==1.0, + any.scientific ==0.3.6.2, + scientific -bytestring-builder -integer-simple, + any.semigroupoids ==5.3.4, + semigroupoids +comonad +containers +contravariant +distributive +doctests +tagged +unordered-containers, + any.semigroups ==0.19.1, + semigroups +binary +bytestring -bytestring-builder +containers +deepseq +hashable +tagged +template-haskell +text +transformers +unordered-containers, + any.socks ==0.6.1, + any.stm ==2.5.0.0, + any.streaming-commons ==0.2.1.2, + streaming-commons -use-bytestring-builder, + any.tagged ==0.8.6, + tagged +deepseq +transformers, + any.template-haskell ==2.14.0.0, + any.text ==1.2.3.1, + any.th-abstraction ==0.3.1.0, + any.time ==1.8.0.2, + any.time-compat ==1.9.2.2, + time-compat -old-locale, + any.time-locale-compat ==0.1.1.5, + time-locale-compat -old-locale, + any.tls ==1.5.4, + tls +compat -hans +network, + any.transformers ==0.5.6.2, + any.transformers-base ==0.4.5.2, + transformers-base +orphaninstances, + any.transformers-compat ==0.6.5, + transformers-compat -five +five-three -four +generic-deriving +mtl -three -two, + any.unix ==2.7.2.2, + any.unordered-containers ==0.2.10.0, + unordered-containers -debug, + any.uuid-types ==1.0.3, + any.vector ==0.12.0.3, + vector +boundschecks -internalchecks -unsafechecks -wall, + any.void ==0.7.3, + void -safe, + any.websockets ==0.12.7.0, + websockets -example, + any.wreq ==0.5.3.2, + wreq -aws -developer +doctest -httpbin, + any.x509 ==1.7.5, + any.x509-store ==1.6.7, + any.x509-system ==1.6.6, + any.x509-validation ==1.6.11, + any.zlib ==0.6.2.1, + zlib 
-non-blocking-ffi -pkg-config diff --git a/server/bench-wrk/wrk-websocket-server/cabal.project.local b/server/bench-wrk/wrk-websocket-server/cabal.project.local new file mode 100644 index 0000000000000..042a733a40b72 --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/cabal.project.local @@ -0,0 +1,8 @@ + +shared: True + + +package wrk-websocket-server + optimization: 0 + documentation: false + ghc-options: -j diff --git a/server/bench-wrk/wrk-websocket-server/cabal.project.local~ b/server/bench-wrk/wrk-websocket-server/cabal.project.local~ new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/server/bench-wrk/wrk-websocket-server/example_benchmark_request.yaml b/server/bench-wrk/wrk-websocket-server/example_benchmark_request.yaml new file mode 100644 index 0000000000000..3dd0bb5c03229 --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/example_benchmark_request.yaml @@ -0,0 +1,20 @@ +framework: wrk +arguments: + rate: 10 + duration: 15 + threads: 2 + graphql_url: http://127.0.0.1:8181/v1/graphql + query: | + query rows_50_nesting_level_1 { + hge_events(limit: 50){ + event_status + last_update + affiliations_events_by_event_id(limit: 2) { + affiliation{ + publisher{ + publisher_name + } + } + } + } + } diff --git a/server/bench-wrk/wrk-websocket-server/src/Wrk/Server.hs b/server/bench-wrk/wrk-websocket-server/src/Wrk/Server.hs new file mode 100644 index 0000000000000..53e1d5e0cf00e --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/src/Wrk/Server.hs @@ -0,0 +1,257 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RecordWildCards #-} +module Wrk.Server +where + +import Wrk.Server.Types + +import Control.Applicative (liftA2, many, (<|>)) +import Control.Concurrent (forkIO) +import Control.Lens ((&), (^.), (^?), (.~)) +import Control.Monad (forever, unless, void, when) +import Control.Monad.Except (MonadError, runExceptT, throwError) +import Control.Monad.IO.Class (MonadIO, liftIO) +import Data.Text.Encoding (encodeUtf8) +import System.Environment (lookupEnv, setEnv) +import System.FilePath.Posix (takeDirectory) + +import qualified Control.Concurrent.STM as STM +import qualified Control.Concurrent.STM.TMVar as TMVar +import qualified Control.Exception as E +import qualified Data.Aeson as J +import qualified Data.Aeson.Lens as J +import qualified Data.Attoparsec.Text as AT +import qualified Data.ByteString.Lazy as BL +import qualified Data.ByteString.Lazy.Char8 as BLC +import qualified Data.CaseInsensitive as CI +import qualified Data.Default as Def +import qualified Data.Text as T +import qualified Data.Text.IO as T +import qualified Data.Text.Read as T +import qualified Network.WebSockets as WS +import qualified Network.Wreq as NW +import qualified System.Directory as Dir +import qualified System.Exit as SE +import qualified System.Process as Proc + +benchWsApp :: TMVar.TMVar () -> WS.ServerApp +benchWsApp lock pending = do + conn <- WS.acceptRequest pending + WS.withPingThread conn 30 (return ()) $ + forever $ do + msg <- WS.receiveData conn + fork_ $ processReq conn msg lock + where + fork_ = void . forkIO + +processReq :: WS.Connection -> BLC.ByteString -> TMVar.TMVar () -> IO () +processReq conn msg lock = do + resE <- catchIO $ runExceptT $ processReq' conn msg lock + either sendErrMsg sendResult resE + where + sendJson = WS.sendTextData conn . J.encode + sendErrMsg = sendJson . BMError + sendResult = sendJson . 
uncurry BMResult + catchIO :: IO (Either ErrorMessage a) -> IO (Either ErrorMessage a) + catchIO f = do + resIOE <- E.try f + either (return . Left . ioExToErr) return resIOE + ioExToErr :: E.SomeException -> ErrorMessage + ioExToErr e = ErrorMessage $ J.object ["IOError" J..= show e ] + +processReq' :: (MonadIO m, MonadError ErrorMessage m) + => WS.Connection -> BL.ByteString -> TMVar.TMVar () + -> m (BenchConf, BenchResult) +processReq' conn msg lock = do + conf <- parseIncomingMsg + res <- withLock $ runBench conn conf + return (conf, res) + where + parseIncomingMsg = eitherToMonadErr (ErrorMessage . J.toJSON) $ J.eitherDecode msg + withLock f = do + locked <- acquireLock + unless locked $ throwError $ ErrorMessage $ J.toJSON ("A benchmark is already running" :: T.Text) + resE <- liftIO $ flip E.finally releaseLock $ runExceptT f + eitherToMonadErr id resE + acquireLock = atomic_ $ TMVar.tryPutTMVar lock () + releaseLock = STM.atomically $ TMVar.takeTMVar lock + atomic_ = liftIO . STM.atomically + +runBench :: (MonadIO m, MonadError ErrorMessage m) + => WS.Connection -> BenchConf -> m BenchResult +runBench conn conf= do + sendStartMsg + result <- case conf of + BCWrk args -> BRWrk <$> runWrkBench args + BCWrk2 args -> BRWrk2 <$> runWrk2Bench args + void sendFinishMsg + return result + where + sendStartMsg = sendJson $ BMStart conf + sendFinishMsg = sendJson $ BMFinish conf + sendJson = liftIO . WS.sendTextData conn . J.encode + +runWrkBench :: (MonadIO m, MonadError ErrorMessage m) + => WrkBenchArgs -> m WrkResultOut +runWrkBench args@WrkBenchArgs{..} = do + _ <- liftIO $ runQueryOnce -- run the GraphQL query once to ensure there are no errors + script <- liftIO wrkScript + liftIO $ setLuaEnv $ show script + (exitCode, _, stderr) <- liftIO $ Proc.readProcessWithExitCode "wrk" (wrkArgs script) "" + liftIO $ putStr stderr + case exitCode of + SE.ExitSuccess -> wrkResult stderr + SE.ExitFailure _ -> throwError $ ErrorMessage $ J.toJSON $ "Failed with error " <> stderr + where + runQueryOnce = runQuery wbaGraphqlUrl wbaQuery wbaAuth + wrkResult stderr = jsonDecode $ BLC.pack stderr + wrkArgs script = toArgsList (`notElem` ["query","graphql-url"]) args <> luaScriptArgs script + luaScriptArgs script = ["-s", show script, wbaGraphqlUrl, T.unpack $ getQuery wbaQuery] <> authHeader + wrkScript = maybe Def.def WrkScript <$> lookupEnv "HASURA_BENCH_WRK_LUA_SCRIPT" + authHeader = maybe [] (\AuthHeader{..} -> map T.unpack [ahKey, ahValue]) wbaAuth + +setLuaEnv :: FilePath -> IO () +setLuaEnv wrkScript = do + setEnv "LUA_PATH" $ "/usr/share/lua/5.1/?.lua;" <> wrkScriptDir <> "/?.lua" + setEnv "LUA_CPATH" "/usr/lib/lua/5.1/?.so;/usr/lib/x86_64-linux-gnu/lua/5.1/?.so;;" + where wrkScriptDir = takeDirectory wrkScript + +-- TODO duration cannot be less than 10 seconds +runWrk2Bench :: (MonadIO m, MonadError ErrorMessage m) + => Wrk2BenchArgs -> m Wrk2ResultOut +runWrk2Bench args@Wrk2BenchArgs{..} = do + _ <- liftIO runQueryOnce -- run the GraphQL query once to ensure there are no errors + liftIO $ Dir.createDirectoryIfMissing True resultsDir + script <- liftIO wrk2Script + liftIO $ setLuaEnv $ show script + (exitCode, stdout, stderr) <- liftIO $ Proc.readProcessWithExitCode "wrk2" (wrk2Args script) "" + liftIO $ putStr stdout + case exitCode of + SE.ExitSuccess -> wrk2Result stdout + SE.ExitFailure e -> throwError $ ErrorMessage $ J.toJSON $ + "wrk2 exited with ExitCode" <> show e <> "\nError: " <> stderr + where + runQueryOnce = runQuery w2baGraphqlUrl w2baQuery w2baAuth + wrk2Result stdout = do + 
resultStr <- liftIO $ BLC.readFile summaryFile + -- Read summary from summary file + resultIn <- jsonDecode resultStr + -- Parse histogram values from stdout + histogram <- eitherToMonadErr (ErrorMessage . J.toJSON) $ AT.parseOnly histogramParser $ T.pack stdout + -- Read latency values from latencies-file + strLatencies <- liftIO $ T.lines <$> T.readFile latenciesFile + -- Convert to number from string + numLatencies <- eitherToMonadErr asErrMessage $ + mapM ( fmap (/1000.0) . readDoubleT) strLatencies + return $ makeResultOut resultIn histogram numLatencies + + makeResultOut (Wrk2ResultIn summ reqSumm latSumm) hist latVals = + Wrk2ResultOut summ reqSumm $ LatencyResultOut latVals hist latSumm + + wrk2Args script = toArgsList (`notElem` ["query","graphql_url"]) args <> ["--latency"] <> luaScriptArgs script + luaScriptArgs script = ["-s", show script] <> [ w2baGraphqlUrl, query, resultsDir] <> authHeaderArgs + query = T.unpack $ getQuery w2baQuery + resultsDir = "/tmp/results" + latenciesFile = resultsDir <> "/latencies" + summaryFile = resultsDir <> "/summary.json" + authHeaderArgs = maybe [] (\AuthHeader{..} -> map T.unpack [ahKey, ahValue]) w2baAuth + wrk2Script = maybe Def.def Wrk2Script <$> lookupEnv "HASURA_BENCH_WRK2_LUA_SCRIPT" + +runQuery :: GraphQLURL -> Query -> Maybe AuthHeader -> IO BLC.ByteString +runQuery url query authHeaders = do + resp <- NW.postWith queryHeaders url $ J.object [ "query" J..= getQuery query ] + let err = foldl1 (<|>) $ map (\x -> resp ^? NW.responseBody . J.key x) ["errors", "error"] + maybe (return $ resp ^. NW.responseBody) (fail . BLC.unpack . J.encode ) err + where + queryHeaders = maybe NW.defaults (\AuthHeader{..} -> NW.defaults & NW.header (CI.mk $ encodeUtf8 ahKey) .~ [encodeUtf8 ahValue] ) authHeaders + +simpleIntrospectQuery :: Query +simpleIntrospectQuery = Query "\ + \ query foo { \ + \ __schema { \ + \ queryType { \ + \ kind \ + \ } \ + \ } \ + \ }" + + +jsonDecode :: (J.FromJSON a, MonadError ErrorMessage m) => BLC.ByteString -> m a +jsonDecode = eitherToMonadErr (ErrorMessage . J.toJSON) . J.eitherDecode + +asErrMessage :: String -> ErrorMessage +asErrMessage = ErrorMessage . J.toJSON + +eitherToMonadErr :: (MonadError e' m) => (e -> e') -> Either e a -> m a +eitherToMonadErr modifyErr = either (throwError . modifyErr) return + +readDoubleT :: T.Text -> Either String Double +readDoubleT s = do + (a, rest) <- T.double s + unless (T.null rest) $ Left "Not a rational number" + return a + +histogramParser :: AT.Parser HdrHistogram +histogramParser = do + -- Histogram starts in the line after this match + _ <- takeIncludingFirstMatch "Detailed Percentile spectrum" AT. "Histogram start heading" + -- Take the rest of the line + void takeLineText AT. "Till the end of histogram start heading" + -- Take upto the next line with histogram headers. + _ <- takeIncludingFirstMatch "Value" AT. "Till Histogram headers" + void takeLineText AT. "Till Histogram headers" + -- Take the next empty line + void AT.skipSpace AT. "Skip space till histogram values" + -- The values start here + histRows <- parseHistogramRows AT. 
"Histogram rows" + histSummary <- parseHistogramSummary + return $ HdrHistogram histSummary histRows + where + parseHistogramRows = many parseHistogramRow + + parseHistogramSummary = do + (mean, stdDev) <- collectSummaryPairs + ("Mean", AT.double, "StdDeviation", AT.double) + (maxVal, tc) <- collectSummaryPairs + ("Max", AT.double, "Total count" , AT.decimal) + (bkts, subBkts) <- collectSummaryPairs + ("Buckets", AT.decimal, "SubBuckets", AT.decimal) + return $ HistogramSummary mean stdDev maxVal tc bkts subBkts + + collectSummaryPairs (name1,parser1,name2,parser2) = do + _ <- AT.string ("#[" <> name1) AT. ("1. Parse " <> T.unpack name1) + void spacedEqualTo AT. ("2. Parse " <> T.unpack name1) + d1 <- parser1 + _ <- AT.string (", " <> name2) AT. ("1. Parse " <> T.unpack name2) + void spacedEqualTo AT. ("2. Parse " <> T.unpack name1) + d2 <- parser2 + void takeLineText + return (d1, d2) + + spacedEqualTo = AT.skipSpace >> AT.char '=' >> AT.skipSpace + + parseHistogramRow = do + char <- AT.peekChar' + when (char == '#') $ fail "Reached histogram summary" + val <- withSkipSpace AT.double + percentile <- withSkipSpace AT.double + count <- withSkipSpace AT.decimal + void takeLineText + return $ HistogramRow val percentile count + + withSkipSpace p = AT.skipSpace >> p + +takeLineText :: AT.Parser T.Text +takeLineText = AT.takeTill AT.isEndOfLine <* AT.endOfLine + +uptoFirstMatch :: T.Text -> AT.Parser T.Text +uptoFirstMatch str = fmap T.concat $ AT.manyTill nextPossibleMatch $ AT.string str + where + nextPossibleMatch = liftA2 T.cons AT.anyChar $ AT.takeWhile (/= T.head str) + +takeIncludingFirstMatch :: T.Text -> AT.Parser T.Text +takeIncludingFirstMatch str = withSubStr <|> errMsg + where + withSubStr = fmap (`T.append` str) $ uptoFirstMatch str + errMsg = fail $ "Could not find sub-string: " <> T.unpack str diff --git a/server/bench-wrk/wrk-websocket-server/src/Wrk/Server/Types.hs b/server/bench-wrk/wrk-websocket-server/src/Wrk/Server/Types.hs new file mode 100644 index 0000000000000..df53e56004ccb --- /dev/null +++ b/server/bench-wrk/wrk-websocket-server/src/Wrk/Server/Types.hs @@ -0,0 +1,249 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE TemplateHaskell #-} +module Wrk.Server.Types +where + +import Control.Lens ((^@..)) +import Data.Default (Default (..)) + +import qualified Data.Aeson as J +import qualified Data.Aeson.Casing as J +import qualified Data.Aeson.Lens as J +import qualified Data.Aeson.TH as J +import qualified Data.ByteString.Lazy.Char8 as BLC +import qualified Data.HashMap.Strict as HM +import qualified Data.Text as T + +newtype ErrorMessage = ErrorMessage { getError :: J.Value } + deriving (Show, Eq, J.ToJSON, J.FromJSON) + +newtype Duration = Duration { getDuration :: Int} + deriving (Show, Eq, J.ToJSON, J.FromJSON) + +newtype Threads = Threads { getThreads :: Int} + deriving (Show, Eq, J.ToJSON, J.FromJSON) + +newtype Connections = Connections { getConnections :: Int} + deriving (Show, Eq, J.ToJSON, J.FromJSON) + +newtype Rate = Rate { getRate :: Int} + deriving (Show, Eq, J.ToJSON, J.FromJSON) + +newtype Query = Query { getQuery :: T.Text } + deriving (Show, Eq, J.ToJSON, J.FromJSON) + +type GraphQLURL = String + +data ServerConf + = ServerConf + { scGraphQLUrl :: !GraphQLURL + } + +data AuthHeader + = AuthHeader + { ahKey :: !T.Text + , ahValue :: !T.Text + } + deriving (Show, Eq) + +instance J.ToJSON AuthHeader where + toJSON AuthHeader{..} = J.object 
+
+instance J.FromJSON AuthHeader where
+  parseJSON = J.withObject "AuthHeader key value" $ \o -> case HM.toList o of
+    [(k, J.String v)] -> return $ AuthHeader k v
+    _ -> fail "Expecting only a single (key: value) pair as the authentication header"
+
+data WrkBenchArgs = WrkBenchArgs
+  { wbaDuration :: !(Maybe Duration)
+  , wbaThreads :: !(Maybe Threads)
+  , wbaConnections :: !(Maybe Connections)
+  , wbaQuery :: !Query
+  , wbaGraphqlUrl :: !GraphQLURL
+  , wbaAuth :: !(Maybe AuthHeader)
+  }
+  deriving (Show, Eq)
+
+$(J.deriveJSON (J.aesonDrop 3 J.trainCase){J.omitNothingFields = True} ''WrkBenchArgs)
+
+data Wrk2BenchArgs = Wrk2BenchArgs
+  { w2baDuration :: !(Maybe Duration)
+  , w2baThreads :: !(Maybe Threads)
+  , w2baConnections :: !(Maybe Connections)
+  , w2baQuery :: !Query
+  , w2baRate :: !Rate
+  , w2baGraphqlUrl :: !GraphQLURL
+  , w2baAuth :: !(Maybe AuthHeader)
+  }
+  deriving (Show, Eq)
+
+$(J.deriveJSON (J.aesonDrop 4 J.trainCase){J.omitNothingFields = True} ''Wrk2BenchArgs)
+
+-- If the JSON encoding of a value is an object of (k, v) pairs, convert every pair whose v is neither an object nor a list into a pair of arguments of the form "--k v"
+toArgsList :: (J.ToJSON a) => (T.Text -> Bool) -> a -> [String]
+toArgsList keysFilter a = concatMap toArg $ filter (keysFilter . fst) $ J.toJSON a ^@.. J.members
+  where
+    toArg (_, J.Object _) = []
+    toArg (_, J.Array _) = []
+    toArg (k, v) = ["--" <> T.unpack k, BLC.unpack $ J.encode v]
+
+data BenchConf
+  = BCWrk WrkBenchArgs
+  | BCWrk2 Wrk2BenchArgs
+  deriving (Show, Eq)
+
+$(J.deriveJSON (J.aesonDrop 2 J.trainCase)
+  { J.sumEncoding = J.TaggedObject "framework" "arguments"
+  , J.constructorTagModifier = J.trainCase . drop 2
+  }
+  ''BenchConf)
+
+data HistogramSummary = HistogramSummary
+  { hsMean :: !Double
+  , hsStdDeviation :: !Double
+  , hsMax :: !Double
+  , hsTotalCount :: !Integer
+  , hsBuckets :: !Integer
+  , hsSubBuckets :: !Integer
+  }
+  deriving (Show, Eq)
+
+$(J.deriveJSON (J.aesonDrop 2 J.camelCase) ''HistogramSummary)
+
+data HistogramRow = HistogramRow
+  { hrValue :: !Double
+  , hrPercentile :: !Double
+  , hrCount :: !Integer
+  }
+  deriving (Show, Eq)
+
+$(J.deriveJSON (J.aesonDrop 2 J.camelCase) ''HistogramRow)
+
+data HdrHistogram = HdrHistogram
+  { hdrSummary :: !HistogramSummary
+  , hdrData :: ![HistogramRow]
+  }
+  deriving (Show, Eq)
+
+$(J.deriveJSON (J.aesonDrop 2 J.camelCase) ''HdrHistogram)
+
+type ValuesDist = HM.HashMap Double Double
+
+data StatsSummary = StatsSummary
+  { rqsMin :: Double
+  , rqsMax :: Double
+  , rqsMean :: Double
+  , rqsStdev :: Double
+  , rqsDist :: ValuesDist
+  }
+  deriving (Show, Eq)
+
+$(J.deriveJSON (J.aesonDrop 3 J.camelCase) ''StatsSummary)
+
+newtype RequestsSummary = RequestsSummary
+  { getReqSummary :: StatsSummary }
+  deriving (Show, Eq, J.ToJSON, J.FromJSON)
+
+newtype LatencySummary = LatencySummary
+  { getLatencySummary :: StatsSummary }
+  deriving (Show, Eq, J.ToJSON, J.FromJSON)
+
+data WrkResult = WrkResult
+  { wrkSummary :: J.Value
+  , wrkRequests :: RequestsSummary
+  }
+  deriving (Show, Eq)
+
+type WrkResultOut = WrkResult
+
+$(J.deriveJSON (J.aesonDrop 3 J.camelCase) ''WrkResult)
+
+data Wrk2ResultIn = Wrk2ResultIn
+  { wrk2Summary :: !J.Value
+  , wrk2Requests :: !RequestsSummary
+  , wrk2Latency :: !LatencySummary
+  }
+  deriving (Show, Eq)
+
+$(J.deriveJSON (J.aesonDrop 4 J.camelCase) ''Wrk2ResultIn)
+
+type LatencyValues = Double
+
+data LatencyResultOut = LatencyResultOut
+  { lroRaw :: ![LatencyValues]
+  , lroHistogram :: !HdrHistogram
+  , lroSummary :: !LatencySummary
+  }
+  deriving (Show, Eq)
+
+$(J.deriveJSON (J.aesonDrop 3 J.camelCase) ''LatencyResultOut)
+
+data Wrk2ResultOut = Wrk2ResultOut
+  { wrk2oSummary :: !J.Value
+  , wrk2oRequests :: !RequestsSummary
+  , wrk2oLatency :: !LatencyResultOut
+  }
+  deriving (Show, Eq)
+
+$(J.deriveJSON (J.aesonDrop 5 J.camelCase) ''Wrk2ResultOut)
+
+data BenchResult
+  = BRWrk WrkResultOut
+  | BRWrk2 Wrk2ResultOut
+  deriving (Show, Eq)
+
+$(J.deriveJSON J.defaultOptions {J.sumEncoding = J.UntaggedValue}
+  ''BenchResult)
+
+data BenchMessage
+  = BMRunBenchmark
+    { bmConfiguration :: !BenchConf }
+  | BMStart
+    { bmConfiguration :: !BenchConf }
+  | BMResult
+    { bmConfiguration :: !BenchConf
+    , bmResult :: !BenchResult
+    }
+  | BMFinish
+    { bmConfiguration :: !BenchConf }
+  | BMError
+    { bmMessage :: ErrorMessage }
+  deriving (Show, Eq)
+
+$(J.deriveJSON J.defaultOptions
+  { J.sumEncoding = J.ObjectWithSingleField
+  , J.constructorTagModifier = J.trainCase . drop 2
+  , J.fieldLabelModifier = J.trainCase . drop 2
+  }
+  ''BenchMessage
+  )
+
+newtype WrkScriptsDir = WrkScriptsDir { getWrkScriptsDir :: FilePath }
+  deriving (Eq)
+
+instance Show WrkScriptsDir where
+  show (WrkScriptsDir d) = d
+
+instance Default WrkScriptsDir where
+  def = WrkScriptsDir "bench_scripts"
+
+newtype WrkScript = WrkScript { getWrkScript :: FilePath }
+  deriving (Eq)
+
+instance Show WrkScript where
+  show (WrkScript s) = s
+
+instance Default WrkScript where
+  def = WrkScript $ show (def :: WrkScriptsDir) <> "/bench-wrk.lua"
+
+newtype Wrk2Script = Wrk2Script { getWrk2Script :: FilePath }
+  deriving (Eq)
+
+instance Show Wrk2Script where
+  show (Wrk2Script s) = s
+
+instance Default Wrk2Script where
+  def = Wrk2Script $ show (def :: WrkScriptsDir) <> "/bench-wrk2.lua"
diff --git a/server/bench-wrk/wrk-websocket-server/wrk-websocket-server.cabal b/server/bench-wrk/wrk-websocket-server/wrk-websocket-server.cabal
new file mode 100644
index 0000000000000..2aa402a37cd9c
--- /dev/null
+++ b/server/bench-wrk/wrk-websocket-server/wrk-websocket-server.cabal
@@ -0,0 +1,48 @@
+cabal-version: >=1.10
+-- Initial package description 'wrk-websocket-server.cabal' generated by
+-- 'cabal init'. For further documentation, see
+-- http://haskell.org/cabal/users-guide/
+
+name:                wrk-websocket-server
+version:             0.1.0.0
+-- synopsis:
+-- description:
+-- bug-reports:
+license:             Apache-2.0
+author:              Nizar Malangadan
+maintainer:          nizar-m@users.noreply.github.com
+-- copyright:
+-- category:
+build-type:          Simple
+
+executable wrk-websocket-server
+  ghc-options: -Wall -Wcompat
+
+  main-is:             Main.hs
+  hs-source-dirs:      src
+                     , app
+  other-modules:       Wrk.Server
+                     , Wrk.Server.Types
+
+  -- other-extensions:
+  build-depends:       base >=4.12 && <4.13
+                     , websockets
+                     , stm
+                     , aeson
+                     , aeson-casing
+                     , process
+                     , mtl
+                     , bytestring >= 0.10
+                     , lens-aeson
+                     , lens
+                     , text
+                     , directory
+                     , attoparsec
+                     , unordered-containers
+                     , data-default
+                     , wreq
+                     , optparse-applicative
+                     , filepath
+                     , case-insensitive
+  -- hs-source-dirs:
+  default-language:    Haskell2010
diff --git a/server/cabal.project.dev b/server/cabal.project.dev
index bddc8b1230467..817488041e79f 100644
--- a/server/cabal.project.dev
+++ b/server/cabal.project.dev
@@ -6,6 +6,8 @@
 -- Or, if you want to customize the configuration:
 -- $ cp cabal.project.dev cabal.project.local

+with-compiler: ghc-8.6.5
+
 package *
   documentation: true
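
Note: a short, illustrative sketch of how the pieces above fit together; nothing below is part of the patch, and the field values are made up. With the derived options, a BenchConf value serializes to a tagged object along the lines of {"framework": "wrk2", "arguments": {...}}, and toArgsList from Wrk.Server.Types is what turns such an arguments object into wrk/wrk2 command-line flags:

    >>> :set -XOverloadedStrings
    >>> import qualified Data.Aeson as J
    >>> import Wrk.Server.Types (toArgsList)
    >>> -- Scalar fields become "--key value" pairs; filtered keys and any
    >>> -- nested objects or arrays are dropped.
    >>> let args = J.object [ "rate" J..= (100 :: Int), "query" J..= ("query { heroes { name } }" :: String) ]
    >>> toArgsList (/= "query") args
    ["--rate","100"]

String-valued fields come out in their JSON-encoded form (quotes included), since toArgsList goes through J.encode; presumably this is why wrk2Args filters the query and GraphQL URL keys out of toArgsList and passes them, along with the results directory, to the Lua benchmark script positionally via luaScriptArgs.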