diff --git a/pyproject.toml b/pyproject.toml
index 9006d32fa..e5fe62fe3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -87,6 +87,8 @@ common-service = "scenario_player.services.common.blueprints"
 [tool.flit.scripts]
 # CLI commands installed along with out package.
 scenario_player = "scenario_player.__main__:main"
+spaas-rpc = "scenario_player.services.rpc.app:service_daemon"
+spaas-stack = "scenario_player.services.common.app:service_daemon"
 
 #######################
 # ISORT CONFIGURATION #
diff --git a/scenario_player/main.py b/scenario_player/main.py
index 5e3781e77..54244003d 100644
--- a/scenario_player/main.py
+++ b/scenario_player/main.py
@@ -2,6 +2,7 @@
 import json
 import os
+import subprocess
 import sys
 import tarfile
 import traceback
 
@@ -127,9 +128,24 @@ def main(ctx, chains, data_path):
     default=sys.stdout.isatty(),
     help="En-/disable console UI. [default: auto-detect]",
 )
+@click.option(
+    "--spaas-no-teardown",
+    "no_spaas_teardown",
+    is_flag=True,
+    help="Do NOT tear down the spaas stack after scenario execution completes. "
+    "Allows running scenarios concurrently. [Default: False]",
+)
 @click.pass_context
 def run(
-    ctx, mailgun_api_key, auth, password, keystore_file, scenario_file, notify_tasks, enable_ui
+    ctx,
+    mailgun_api_key,
+    auth,
+    password,
+    keystore_file,
+    scenario_file,
+    notify_tasks,
+    enable_ui,
+    no_spaas_teardown,
 ):
     scenario_file = Path(scenario_file.name).absolute()
     data_path = ctx.obj["data_path"]
@@ -158,10 +174,9 @@ def run(
     # Dynamically import valid Task classes from sceanrio_player.tasks package.
     collect_tasks(tasks)
 
-    # Start our Services
-    service_process = ServiceProcess()
-
-    service_process.start()
+    # Start the SPaaS Service stack.
+    # This is a no-op if it's already running.
+    subprocess.run(["spaas-stack", "start"])
 
     # Run the scenario using the configurations passed.
     runner = ScenarioRunner(
@@ -217,14 +232,14 @@ def run(
             log.warning("Press q to exit")
             while not ui_greenlet.dead:
                 gevent.sleep(1)
 
-        service_process.stop()
-    except ServiceProcessException:
-        service_process.kill()
+    finally:
         runner.node_controller.stop()
         if ui_greenlet is not None and not ui_greenlet.dead:
             ui_greenlet.kill(ExitMainLoop)
             ui_greenlet.join()
+        if not no_spaas_teardown:
+            subprocess.run(["spaas-stack", "stop"])
 
 
 @main.command(name="reclaim-eth")
diff --git a/scenario_player/services/common/app.py b/scenario_player/services/common/app.py
index dc168b4f6..006c4ee76 100644
--- a/scenario_player/services/common/app.py
+++ b/scenario_player/services/common/app.py
@@ -1,96 +1,50 @@
-import multiprocessing as mp
+import logging
 
-import requests
 import structlog
 import waitress
 
-from scenario_player.exceptions.services import ServiceProcessException
-from scenario_player.services.utils.factories import construct_flask_app
+from scenario_player.services.utils.factories import (
+    construct_flask_app,
+    default_service_daemon_cli,
+    start_daemon,
+    stop_daemon,
+)
 
 log = structlog.getLogger(__name__)
 
 
-class ServiceProcess(mp.Process):
-    """:class:`multiprocessing.Process` subclass for running the SP services.
+def serve_spaas_stack(logfile_path, host, port):
+    """Run the SPaaS service-stack flask app as a daemonized process."""
+    logging.basicConfig(filename=logfile_path, filemode="a+", level=logging.DEBUG)
+    log = structlog.getLogger()
 
-    The class's :meth:`.stop` method checks the server status and shuts it down
-    via a POST request.
+    app = construct_flask_app()
 
-    Instances of this class have their :attr:`.daemon` attribute **always** set
-    to `True`, regardless of any keywords passed to the class constructor.
-    It can only be overridden by setting it after class initialization.
+ log.info("Starting SPaaS Service Stack", host=host, port=port) + waitress.serve(app, host=host, port=port) - Should the service not be reachable, it calls :meth:`.kill` instead, since - the assumption is that it is stuck in a deadlock. - Also offers a :func:`.app_is_responsive` property, which returns a boolean. - This return `True` if a `GET /status` request returns a response (the status - code does not matter) and `False` if a connection error or timeout occurred. - """ +def service_daemon(): + parser = default_service_daemon_cli() - def __init__(self, *args, host: str = "127.0.0.1", port: int = 5000, **kwargs): - if "target" in kwargs: - raise ValueError("'target' is not supported by this class!") - super(ServiceProcess, self).__init__(*args, **kwargs) - self.daemon = True - self.host = host - self.port = port + args = parser.parse_args() - @property - def is_reachable(self) -> bool: - try: - requests.get(f"http://{self.host}:{self.port}/status") - except (requests.ConnectionError, requests.Timeout): - # The service does not appear to be reachable. - return False - else: - return True + logfile_path = args.raiden_dir.joinpath("spaas") + logfile_path.mkdir(exist_ok=True, parents=True) + logfile_path = logfile_path.joinpath("SPaaS-Stack.log") + logfile_path.touch() - def stop(self) -> None: - """Gracefully stop the service, if possible. Otherwise, kill it. + PIDFILE = args.raiden_dir.joinpath("spaas", "service-stack.pid") - This depends on if and how the Service's `/shutdown` endpoint responds. - If we cannot connect to it, or our request times out, we assume the service - is dead in the water and kill it. - - If we do get a response, but its status code is not in the `2xx` range, - we check if the service is otherwise working, by making a GET request to - the `/status` endpoint. If this does succeed, we must assume the - shutdown sequence is faulty, and raise a :exc:`ServiceProcessException`. - - .. 
Note:: - - In order for this method to work correctly, it requires the - :var:`scenario_player.services.common.blueprints.admin_blueprint` - to be registered with the app to gracefully shut it down. - - Should the blueprint not be registered, it will *always* call :meth:`.kill`. - - :raises ServiceProcessException: if we failed to shut down the service. - """ - shutdown_type = "werkzeug.server.shutdown" - try: - resp = requests.post(f"http://{self.host}:{self.port}/shutdown") - except (requests.ConnectionError, requests.Timeout): - # The server is not responding. Kill it with a hammer. - shutdown_type = "SIGKILL" - return self.kill() - else: - try: - resp.raise_for_status() - except requests.HTTPError: - if self.is_reachable: - # The server doesn't want to play ball - "gently" terminate its ass. - shutdown_type = "SIGTERM" - self.terminate() - finally: - if self.is_reachable: - # The server still exists.Notify a human about its insubordinantion. - raise ServiceProcessException("Shutdown sequence could not be initialized!") - - log.info("SPaaS Server shutdown", shutdown_type=shutdown_type) - - def run(self): - """Run the Service.""" - app = construct_flask_app() - waitress.serve(app, host=self.host, port=self.port) + if args.command == "start": + start_daemon( + PIDFILE, + serve_spaas_stack, + logfile_path, + args.host, + args.port, + stdout=logfile_path, + stderr=logfile_path, + ) + elif args.command == "stop": + stop_daemon(PIDFILE) diff --git a/scenario_player/services/rpc/app.py b/scenario_player/services/rpc/app.py new file mode 100644 index 000000000..f58c3a902 --- /dev/null +++ b/scenario_player/services/rpc/app.py @@ -0,0 +1,88 @@ +"""Utility script to run an RPC Service instance from the command line. + +TODO: Actually make use of the `--log-service` argument and configure + logging accordingly, if given. 
+"""
+import logging
+import pathlib
+
+import flask
+import structlog
+import waitress
+
+from scenario_player.services.common.blueprints import admin_blueprint, metrics_blueprint
+from scenario_player.services.rpc.blueprints import (
+    instances_blueprint,
+    tokens_blueprint,
+    transactions_blueprint,
+)
+from scenario_player.services.rpc.utils import RPCRegistry
+from scenario_player.services.utils.factories import (
+    default_service_daemon_cli,
+    start_daemon,
+    stop_daemon,
+)
+
+
+def rpc_app():
+    """Create a :mod:`flask` app using only the RPC blueprints."""
+    from scenario_player import __version__
+
+    log = structlog.getLogger()
+    NAME = "SPaaS-RPC-Service"
+
+    log.info("Creating RPC Flask App", version=__version__, name=NAME)
+
+    app = flask.Flask(NAME)
+
+    log.debug("Creating RPC Client Registry")
+    app.config["rpc-client"] = RPCRegistry()
+
+    blueprints = [
+        admin_blueprint,
+        instances_blueprint,
+        metrics_blueprint,
+        tokens_blueprint,
+        transactions_blueprint,
+    ]
+    for bp in blueprints:
+        log.debug("Registering blueprint", blueprint=bp.name)
+        app.register_blueprint(bp)
+    return app
+
+
+def serve_rpc(logfile_path, host, port):
+    """Run an RPC flask app as a daemonized process."""
+    logging.basicConfig(filename=logfile_path, filemode="a+", level=logging.DEBUG)
+    log = structlog.getLogger()
+
+    app = rpc_app()
+
+    log.info("Starting RPC Service", host=host, port=port)
+    waitress.serve(app, host=host, port=port)
+
+
+def service_daemon():
+    parser = default_service_daemon_cli()
+
+    args = parser.parse_args()
+
+    logfile_path = args.raiden_dir.joinpath("spaas")
+    logfile_path.mkdir(exist_ok=True, parents=True)
+    logfile_path = logfile_path.joinpath("SPaaS-RPC.log")
+    logfile_path.touch()
+
+    PIDFILE = args.raiden_dir.joinpath("spaas", "rpc-service.pid")
+
+    if args.command == "start":
+        start_daemon(
+            PIDFILE,
+            serve_rpc,
+            logfile_path,
+            args.host,
+            args.port,
+            stdout=logfile_path,
+            stderr=logfile_path,
+        )
+    elif args.command == "stop":
stop_daemon(PIDFILE) diff --git a/scenario_player/services/rpc/blueprints/__init__.py b/scenario_player/services/rpc/blueprints/__init__.py index 9fa789415..ff1746eba 100644 --- a/scenario_player/services/rpc/blueprints/__init__.py +++ b/scenario_player/services/rpc/blueprints/__init__.py @@ -6,7 +6,7 @@ from scenario_player.services.rpc.blueprints.transactions import transactions_blueprint from scenario_player.services.rpc.utils import RPCRegistry -__all__ = ["transactions_blueprint", "instances_blueprint"] +__all__ = ["transactions_blueprint", "instances_blueprint", "tokens_blueprint"] HOOK_IMPL = pluggy.HookimplMarker(HOST_NAMESPACE) diff --git a/scenario_player/services/utils/factories.py b/scenario_player/services/utils/factories.py index cc93b19ec..4bb130705 100644 --- a/scenario_player/services/utils/factories.py +++ b/scenario_player/services/utils/factories.py @@ -1,3 +1,9 @@ +import argparse +import atexit +import os +import pathlib +import signal +import sys from typing import Mapping import flask @@ -12,7 +18,7 @@ def construct_flask_app( config_file: str = "config.py", enable_plugins: bool = True, ) -> flask.Flask: - """Construct a flask app with the given blueprints registered. + """Construct a flask app with a set of default blueprints registered. By default all constructed apps use the :var:`admin_blueprint` and :var:`metrics_blueprint`, and therefore have the following endpoints: @@ -45,3 +51,118 @@ def construct_flask_app( SP_PM.hook.register_blueprints(app=app) return app + + +def daemonize(pidfile: pathlib.Path, *, stdin="/dev/null", stdout="/dev/null", stderr="/dev/null"): + """Daemonize the currently run script, using a double fork. + + Any commands executed after this function is called will be run in the daemonized process. 
+ + Usage:: + + if __name__ == "__main__": + daemonize("/tmp/my_daemon.pid") + my_func_to_run_daemonized() + + """ + if pidfile.exists(): + raise RuntimeError("Already running") + + # First fork (detaches from parent) + try: + if os.fork() > 0: + raise SystemExit(0) # Parent exit + except OSError as e: + raise RuntimeError("fork #1 failed.") + + os.chdir("/") + os.umask(0) + os.setsid() + # Second fork (relinquish session leadership) + try: + if os.fork() > 0: + raise SystemExit(0) + except OSError as e: + raise RuntimeError("fork #2 failed.") + + # Flush I/O buffers + sys.stdout.flush() + sys.stderr.flush() + + # Replace file descriptors for stdin, stdout, and stderr + with open(stdin, "rb", 0) as f: + os.dup2(f.fileno(), sys.stdin.fileno()) + with open(stdout, "ab", 0) as f: + os.dup2(f.fileno(), sys.stdout.fileno()) + with open(stderr, "ab", 0) as f: + os.dup2(f.fileno(), sys.stderr.fileno()) + + # Write the PID file + pidfile.write_text(str(os.getpid())) + + # Arrange to have the PID file removed on exit/signal + atexit.register(lambda: pidfile.unlink()) + + # Signal handler for termination (required) + def sigterm_handler(signo, frame): + raise SystemExit(1) + + signal.signal(signal.SIGTERM, sigterm_handler) + + +def start_daemon(pid_file: pathlib.Path, func, *args, stdout=None, stderr=None, **kwargs): + """Run a function as a daemon process. + + Takes care of redirecting stdout and stderr to a logfile, instead of /dev/null. + + Any additional args and kwargs are passed to `func`. 
+    """
+    stdout = stdout or f"/var/log/scenario-player/{func.__name__}.stdout"
+    stderr = stderr or f"/var/log/scenario-player/{func.__name__}.stderr"
+    try:
+        daemonize(pid_file, stdout=stdout, stderr=stderr)
+    except RuntimeError as e:
+        print(e, file=sys.stderr)
+        raise SystemExit(1)
+
+    func(*args, **kwargs)
+
+
+def stop_daemon(pid_file: pathlib.Path):
+    """Stop the daemon with the given `pid_file`."""
+    if pid_file.exists():
+        pid = int(pid_file.read_text())
+        os.kill(pid, signal.SIGTERM)
+    else:
+        print("Not running", file=sys.stderr)
+        raise SystemExit(1)
+
+
+def default_service_daemon_cli():
+    """Create an :class:`argparse.ArgumentParser` with a minimalistic set of CLI options.
+
+    Configures the following options:
+
+    * (required) - must be one of `start` or `stop`.
+    * --port (optional) - the port to assign to the service. Defaults to 5000.
+    * --host (optional) - the host to assign to the service. Defaults to 127.0.0.1
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument("command", choices=["start", "stop"])
+    parser.add_argument(
+        "--port",
+        default=5000,
+        help="Port number to run this service on. Defaults to '5000'",
+        type=int,
+    )
+    parser.add_argument(
+        "--host", default="127.0.0.1", help="Host to run this service on. Defaults to '127.0.0.1'"
+    )
+    parser.add_argument("--log-service", default=None, help="netloc of a SPaaS Logging Service.")
+    parser.add_argument(
+        "--raiden-dir",
+        default=pathlib.Path.home().joinpath(".raiden"),
+        help="Path to the .raiden dir. defaults to ~/.raiden",
+        type=pathlib.Path,
+    )
+    return parser