Skip to content

Commit

Permalink
Run SPaaS services as daemons.
Browse files Browse the repository at this point in the history
Adds the following:

- daemonie(), start_daemon(), stop_daemon() utility funcs.
- RPC service app constructor and service daemon.
- SPaaS Service Stack daemon
- Entrypoints for the SPaaS stack and the SPaaS RPC daemons.
  • Loading branch information
brainbot-devops committed Aug 9, 2019
1 parent 9520f63 commit 39e1631
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 90 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ common-service = "scenario_player.services.common.blueprints"
[tool.flit.scripts]
# CLI commands installed along with out package.
scenario_player = "scenario_player.__main__:main"
spaas-rpc = "scenario_player.services.rpc.app:service_daemon"
spaas-stack = "scenario_player.services.common.app:service_daemon"

#######################
# ISORT CONFIGURATION #
Expand Down
31 changes: 23 additions & 8 deletions scenario_player/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import os
import subprocess
import sys
import tarfile
import traceback
Expand Down Expand Up @@ -127,9 +128,24 @@ def main(ctx, chains, data_path):
default=sys.stdout.isatty(),
help="En-/disable console UI. [default: auto-detect]",
)
@click.option(
"--spaas-no-teardown",
"no_spaas_teardown",
default=False,
help="Do NOT tear down the spaas stack after scenario execution completes. "
"Allows runnign scenarios concurrently. [Default: False]",
)
@click.pass_context
def run(
ctx, mailgun_api_key, auth, password, keystore_file, scenario_file, notify_tasks, enable_ui
ctx,
mailgun_api_key,
auth,
password,
keystore_file,
scenario_file,
notify_tasks,
enable_ui,
no_spaas_teardown,
):
scenario_file = Path(scenario_file.name).absolute()
data_path = ctx.obj["data_path"]
Expand Down Expand Up @@ -158,10 +174,9 @@ def run(
# Dynamically import valid Task classes from sceanrio_player.tasks package.
collect_tasks(tasks)

# Start our Services
service_process = ServiceProcess()

service_process.start()
# Start the SPaaS Service stack.
# This is a no-op if it's already running.
subprocess.run("spaas-stack start")

# Run the scenario using the configurations passed.
runner = ScenarioRunner(
Expand Down Expand Up @@ -217,14 +232,14 @@ def run(
log.warning("Press q to exit")
while not ui_greenlet.dead:
gevent.sleep(1)
service_process.stop()
except ServiceProcessException:
service_process.kill()

finally:
runner.node_controller.stop()
if ui_greenlet is not None and not ui_greenlet.dead:
ui_greenlet.kill(ExitMainLoop)
ui_greenlet.join()
if not no_spaas_teardown:
subprocess.run("spaas-stack stop")


@main.command(name="reclaim-eth")
Expand Down
114 changes: 34 additions & 80 deletions scenario_player/services/common/app.py
Original file line number Diff line number Diff line change
@@ -1,96 +1,50 @@
import multiprocessing as mp
import logging

import requests
import structlog
import waitress

from scenario_player.exceptions.services import ServiceProcessException
from scenario_player.services.utils.factories import construct_flask_app
from scenario_player.services.utils.factories import (
construct_flask_app,
default_service_daemon_cli,
start_daemon,
stop_daemon,
)

log = structlog.getLogger(__name__)


class ServiceProcess(mp.Process):
""":class:`multiprocessing.Process` subclass for running the SP services.
def serve_spaas_stack(logfile_path, host, port):
"""Run an RPC flask app as a daemonized process."""
logging.basicConfig(filename=logfile_path, filemode="a+", level=logging.DEBUG)
log = structlog.getLogger()

The class's :meth:`.stop` method checks the server status and shuts it down
via a POST request.
app = construct_flask_app()

Instances of this class have their :attr:`.daemon` attribute **always** set
to `True`, regardless of any keywords passed to the class constructor.
It can only be overridden by setting it after class initialization.
log.info("Starting SPaaS Service Stack", host=host, port=port)
waitress.serve(app, host=host, port=port)

Should the service not be reachable, it calls :meth:`.kill` instead, since
the assumption is that it is stuck in a deadlock.

Also offers a :func:`.app_is_responsive` property, which returns a boolean.
This return `True` if a `GET /status` request returns a response (the status
code does not matter) and `False` if a connection error or timeout occurred.
"""
def service_daemon():
parser = default_service_daemon_cli()

def __init__(self, *args, host: str = "127.0.0.1", port: int = 5000, **kwargs):
if "target" in kwargs:
raise ValueError("'target' is not supported by this class!")
super(ServiceProcess, self).__init__(*args, **kwargs)
self.daemon = True
self.host = host
self.port = port
args = parser.parse_args()

@property
def is_reachable(self) -> bool:
try:
requests.get(f"http://{self.host}:{self.port}/status")
except (requests.ConnectionError, requests.Timeout):
# The service does not appear to be reachable.
return False
else:
return True
logfile_path = args.raiden_dir.joinpath("spaas")
logfile_path.mkdir(exist_ok=True, parents=True)
logfile_path = logfile_path.joinpath("SPaaS-Stack.log")
logfile_path.touch()

def stop(self) -> None:
"""Gracefully stop the service, if possible. Otherwise, kill it.
PIDFILE = args.raiden_dir.joinpath("spaas", "service-stack.pid")

This depends on if and how the Service's `/shutdown` endpoint responds.
If we cannot connect to it, or our request times out, we assume the service
is dead in the water and kill it.
If we do get a response, but its status code is not in the `2xx` range,
we check if the service is otherwise working, by making a GET request to
the `/status` endpoint. If this does succeed, we must assume the
shutdown sequence is faulty, and raise a :exc:`ServiceProcessException`.
.. Note::
In order for this method to work correctly, it requires the
:var:`scenario_player.services.common.blueprints.admin_blueprint`
to be registered with the app to gracefully shut it down.
Should the blueprint not be registered, it will *always* call :meth:`.kill`.
:raises ServiceProcessException: if we failed to shut down the service.
"""
shutdown_type = "werkzeug.server.shutdown"
try:
resp = requests.post(f"http://{self.host}:{self.port}/shutdown")
except (requests.ConnectionError, requests.Timeout):
# The server is not responding. Kill it with a hammer.
shutdown_type = "SIGKILL"
return self.kill()
else:
try:
resp.raise_for_status()
except requests.HTTPError:
if self.is_reachable:
# The server doesn't want to play ball - "gently" terminate its ass.
shutdown_type = "SIGTERM"
self.terminate()
finally:
if self.is_reachable:
# The server still exists.Notify a human about its insubordinantion.
raise ServiceProcessException("Shutdown sequence could not be initialized!")

log.info("SPaaS Server shutdown", shutdown_type=shutdown_type)

def run(self):
"""Run the Service."""
app = construct_flask_app()
waitress.serve(app, host=self.host, port=self.port)
if args.command == "start":
start_daemon(
PIDFILE,
serve_spaas_stack,
logfile_path,
args.host,
args.port,
stdout=logfile_path,
stderr=logfile_path,
)
elif args.command == "stop":
stop_daemon(PIDFILE)
88 changes: 88 additions & 0 deletions scenario_player/services/rpc/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""Utility script to run an RPC Service instance from the command line.
TODO: Actually make use of the `--log-service` argument and configure
logging accordingly, if given.
"""
import logging
import pathlib

import flask
import structlog
import waitress

from scenario_player.services.common.blueprints import admin_blueprint, metrics_blueprint
from scenario_player.services.rpc.blueprints import (
instances_blueprint,
tokens_blueprint,
transactions_blueprint,
)
from scenario_player.services.rpc.utils import RPCRegistry
from scenario_player.services.utils.factories import (
default_service_daemon_cli,
start_daemon,
stop_daemon,
)


def rpc_app():
"""Create a :mod:`flask` app using only the RPC blueprints."""
from scenario_player import __version__

log = logging.getLogger()
NAME = "SPaaS-RPC-Service"

log.info("Creating RPC Flask App", version=__version__, name=NAME)

app = flask.Flask(NAME)

log.debug("Creating RPC Client Registry")
app.config["rpc-client"] = RPCRegistry()

blueprints = [
admin_blueprint,
instances_blueprint,
metrics_blueprint,
tokens_blueprint,
transactions_blueprint,
]
for bp in blueprints:
log.debug("Registering blueprint", blueprint=bp.name)
app.register_blueprint(bp)
return app


def serve_rpc(logfile_path, host, port):
"""Run an RPC flask app as a daemonized process."""
logging.basicConfig(filename=logfile_path, filemode="a+", level=logging.DEBUG)
log = structlog.getLogger()

app = rpc_app()

log.info("Starting RPC Service", host=host, port=port)
waitress.serve(app, host=host, port=port)


def service_daemon():
parser = default_service_daemon_cli()

args = parser.parse_args()

logfile_path = args.raiden_dir.joinpath("spaas")
logfile_path.mkdir(exist_ok=True, parents=True)
logfile_path = logfile_path.joinpath("SPaaS-RPC.log")
logfile_path.touch()

PIDFILE = args.raiden_dir.joinpath("spaas", "rpc-service.pid")

if args.command == "start":
start_daemon(
PIDFILE,
serve_rpc,
logfile_path,
args.host,
args.port,
stdout=logfile_path,
stderr=logfile_path,
)
elif args.command == "stop":
stop_daemon(PIDFILE)
2 changes: 1 addition & 1 deletion scenario_player/services/rpc/blueprints/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from scenario_player.services.rpc.blueprints.transactions import transactions_blueprint
from scenario_player.services.rpc.utils import RPCRegistry

__all__ = ["transactions_blueprint", "instances_blueprint"]
__all__ = ["transactions_blueprint", "instances_blueprint", "tokens_blueprint"]


HOOK_IMPL = pluggy.HookimplMarker(HOST_NAMESPACE)
Expand Down
Loading

0 comments on commit 39e1631

Please sign in to comment.