diff --git a/.gitignore b/.gitignore
index 46a54b99..b8c321d6 100755
--- a/.gitignore
+++ b/.gitignore
@@ -36,7 +36,10 @@ secret-docker-compose.yml
*worker_config.prod.yaml
*docker-compose-apps*
-distributed/kubernetes/apps/*
-dev-secret.yaml
-*flask-deployment.yaml
\ No newline at end of file
+kubernetes/*
+secret.yaml
+*scheduler-deployment.yaml
+*outputs-processor-deployment.yaml
+
+*google-creds.json
\ No newline at end of file
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index e69dac2d..1d33e314 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,6 +1,6 @@
repos:
-- repo: https://github.com/ambv/black
- rev: 18.9b0
- hooks:
- - id: black
- language_version: python3
\ No newline at end of file
+ - repo: https://github.com/ambv/black
+ rev: 19.10b0
+ hooks:
+ - id: black
+ language_version: python3
diff --git a/distributed/.dockerignore b/distributed/.dockerignore
deleted file mode 100644
index e1804ef7..00000000
--- a/distributed/.dockerignore
+++ /dev/null
@@ -1,25 +0,0 @@
-*.pyc
-*.db
-*.env
-db.sqlite3
-
-node_modules
-bower_components
-
-webapp_test.sh
-
-logs/
-
-dump.rdb
-
-.idea
-.cache
-.ipynb_checkpoints
-
-*.pem
-
-_build
-
-.DS_Store
-*.egg-info
-staticfiles
diff --git a/distributed/api/__init__.py b/distributed/api/__init__.py
deleted file mode 100644
index 172050c6..00000000
--- a/distributed/api/__init__.py
+++ /dev/null
@@ -1,23 +0,0 @@
-from flask import Flask
-
-
-def create_app(test_config=None):
- app = Flask(__name__)
-
- @app.route('/hello')
- def hello():
- return 'Hello, World!'
-
- if test_config is not None:
- app.config.update(test_config)
-
- from api import endpoints
- app.register_blueprint(endpoints.bp)
-
- return app
-
-try:
- app = create_app()
-except Exception as e:
- print("got exception on import: ", e)
- app = None
\ No newline at end of file
diff --git a/distributed/api/celery_app/__init__.py b/distributed/api/celery_app/__init__.py
deleted file mode 100644
index 1c2c7882..00000000
--- a/distributed/api/celery_app/__init__.py
+++ /dev/null
@@ -1,148 +0,0 @@
-import os
-import time
-import functools
-import re
-import traceback
-
-import requests
-from celery import Celery
-from celery.signals import task_postrun
-from celery.result import AsyncResult
-
-import cs_storage
-
-
-try:
- from cs_config import functions
-except ImportError as ie:
- if os.environ.get("IS_FLASK", "False") == "True":
- functions = None
- else:
- raise ie
-
-
-COMP_URL = os.environ.get("COMP_URL")
-COMP_API_TOKEN = os.environ.get("COMP_API_TOKEN")
-
-CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
-CELERY_RESULT_BACKEND = os.environ.get(
- "CELERY_RESULT_BACKEND", "redis://localhost:6379"
-)
-
-OUTPUTS_VERSION = os.environ.get("OUTPUTS_VERSION")
-
-
-def get_task_routes():
- def clean(name):
- return re.sub("[^0-9a-zA-Z]+", "", name).lower()
-
- print(f"getting config from: {COMP_URL}/publish/api/")
- resp = requests.get(f"{COMP_URL}/publish/api/")
- if resp.status_code != 200:
- raise Exception(f"Response status code: {resp.status_code}")
- data = resp.json()
- task_routes = {}
- for project in data:
- owner = clean(project["owner"])
- title = clean(project["title"])
- model = f"{owner}_{title}"
-
- # all apps use celery workers for handling their inputs.
- routes = {
- f"{model}_tasks.inputs_get": {"queue": f"{model}_inputs_queue"},
- f"{model}_tasks.inputs_parse": {"queue": f"{model}_inputs_queue"},
- f"{model}_tasks.inputs_version": {"queue": f"{model}_inputs_queue"},
- }
-
- # only add sim routes for models that use celery workers.
- if project["cluster_type"] == "single-core":
- routes[f"{model}_tasks.sim"] = {"queue": f"{model}_queue"}
-
- task_routes.update(routes)
- return task_routes
-
-
-task_routes = get_task_routes()
-
-
-celery_app = Celery(
- "celery_app", broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND
-)
-celery_app.conf.update(
- task_serializer="json",
- accept_content=["msgpack", "json"],
- task_routes=task_routes,
- worker_prefetch_multiplier=1,
- task_acks_late=True,
-)
-
-
-def task_wrapper(func):
- @functools.wraps(func)
- def f(*args, **kwargs):
- task = args[0]
- task_id = task.request.id
- start = time.time()
- traceback_str = None
- res = {}
- try:
- outputs = func(*args, **kwargs)
- if task.name.endswith("sim"):
- version = outputs.pop("version", OUTPUTS_VERSION)
- if version == "v0":
- res["model_version"] = "NA"
- res.update(dict(outputs, **{"version": version}))
- else:
- outputs = cs_storage.write(task_id, outputs)
- res.update(
- {
- "model_version": functions.get_version(),
- "outputs": outputs,
- "version": version,
- }
- )
- else:
- res.update(outputs)
- except Exception:
- traceback_str = traceback.format_exc()
- finish = time.time()
- if "meta" not in res:
- res["meta"] = {}
- res["meta"]["task_times"] = [finish - start]
- if traceback_str is None:
- res["status"] = "SUCCESS"
- else:
- res["status"] = "FAIL"
- res["traceback"] = traceback_str
- return res
-
- return f
-
-
-@task_postrun.connect
-def post_results(sender=None, headers=None, body=None, **kwargs):
- print(f'task_id: {kwargs["task_id"]}')
- print(f'task: {kwargs["task"]} {kwargs["task"].name}')
- print(f'is sim: {kwargs["task"].name.endswith("sim")}')
- print(f'state: {kwargs["state"]}')
- kwargs["retval"]["job_id"] = kwargs["task_id"]
- if kwargs["task"].name.endswith("sim"):
- print(f"posting data to {COMP_URL}/outputs/api/")
- resp = requests.put(
- f"{COMP_URL}/outputs/api/",
- json=kwargs["retval"],
- headers={"Authorization": f"Token {COMP_API_TOKEN}"},
- )
- print("resp", resp.status_code)
- if resp.status_code == 400:
- print("errors", resp.json())
- if kwargs["task"].name.endswith("parse"):
- print(f"posting data to {COMP_URL}/inputs/api/")
- resp = requests.put(
- f"{COMP_URL}/inputs/api/",
- json=kwargs["retval"],
- headers={"Authorization": f"Token {COMP_API_TOKEN}"},
- )
- print("resp", resp.status_code)
- if resp.status_code == 400:
- print("errors", resp.json())
diff --git a/distributed/api/endpoints.py b/distributed/api/endpoints.py
deleted file mode 100644
index 206fc96a..00000000
--- a/distributed/api/endpoints.py
+++ /dev/null
@@ -1,239 +0,0 @@
-import functools
-import json
-import os
-import re
-import time
-import traceback
-import uuid
-from collections import defaultdict
-
-from flask import Blueprint, request, make_response
-from celery.result import AsyncResult
-from celery import chord
-from distributed import Client, Future, fire_and_forget
-import redis
-import requests
-
-from api.celery_app import celery_app
-from cs_dask_sim import dask_sim, done_callback
-
-
-COMP_URL = os.environ.get("COMP_URL")
-COMP_API_TOKEN = os.environ.get("COMP_API_TOKEN")
-
-bp = Blueprint("endpoints", __name__)
-
-queue_name = "celery"
-client = redis.Redis.from_url(
- os.environ.get("CELERY_BROKER_URL", "redis://redis-master/0")
-)
-
-
-def clean(word):
- return re.sub("[^0-9a-zA-Z]+", "", word).lower()
-
-
-def get_cs_config():
- print(f"getting config from: {COMP_URL}/publish/api/")
- resp = requests.get(f"{COMP_URL}/publish/api/")
- if resp.status_code != 200:
- raise Exception(f"Response status code: {resp.status_code}")
- data = resp.json()
- print("got config: ", data)
- config = {}
-
- for model in data:
- model_id = clean(model["owner"]), clean(model["title"])
- config[model_id] = {
- "cluster_type": model["cluster_type"],
- "time_out": model["exp_task_time"] * 1.25,
- }
- print("made config: ", config)
- return config
-
-
-CONFIG = get_cs_config()
-
-
-def get_cluster_type(owner, app_name):
- model_id = clean(owner), clean(app_name)
- # allowed to return None
- return CONFIG.get(model_id, {}).get("cluster_type")
-
-
-def get_time_out(owner, app_name):
- model_id = clean(owner), clean(app_name)
- return CONFIG[model_id]["time_out"]
-
-
-def dask_scheduler_address(owner, app_name):
- owner, app_name = clean(owner), clean(app_name)
- return f"{owner}-{app_name}-dask-scheduler:8786"
-
-
-def async_endpoint(owner, app_name, compute_task):
- print(f"async endpoint {compute_task}")
- data = request.get_data()
- inputs = json.loads(data)
- print("inputs", inputs)
- result = celery_app.signature(compute_task, kwargs=inputs).delay()
- length = client.llen(f"{owner}_{app_name}_queue") + 1
- data = {"job_id": str(result), "qlength": length}
- return json.dumps(data)
-
-
-def sync_endpoint(owner, app_name, compute_task):
- print(f"io endpoint {compute_task}")
- data = request.get_data()
- print("got data", data)
- if not data:
- inputs = {}
- else:
- inputs = json.loads(data)
- print("inputs", inputs)
- result = celery_app.signature(compute_task, kwargs=inputs).delay()
- print("getting...")
- result = result.get()
- return json.dumps(result)
-
-
-def dask_endpoint(owner, app_name, action):
- """
- Route dask simulations to the appropriate dask scheduler.
- """
- print(f"dask endpoint: {owner}/{app_name}/{action}")
- data = request.get_data()
- inputs = json.loads(data)
- print("inputs", inputs)
- addr = dask_scheduler_address(owner, app_name)
- job_id = str(uuid.uuid4())
-
- # The worker needs the job_id to push the results back to the
- # webapp.
- # The URL and API token are passed as args instead of env
- # variables so that the wrapper has access to them
- # but the model does not.
- inputs.update(
- {
- "job_id": job_id,
- "comp_url": os.environ.get("COMP_URL"),
- "comp_api_token": os.environ.get("COMP_API_TOKEN"),
- "timeout": get_time_out(owner, app_name),
- }
- )
-
- with Client(addr) as c:
- fut = c.submit(dask_sim, **inputs)
- fire_and_forget(fut)
- return {"job_id": job_id, "qlength": 1}
-
-
-def route_to_task(owner, app_name, endpoint, action):
- owner, app_name = clean(owner), clean(app_name)
- print("getting...", owner, app_name, endpoint, action)
- task_name = f"{owner}_{app_name}_tasks.{action}"
- print("got task_name", task_name)
- print("map", celery_app.amqp.routes)
- if task_name in celery_app.amqp.routes[0].map:
- return endpoint(owner, app_name, task_name)
- else:
- return json.dumps({"error": "invalid endpoint"}), 404
-
-
-@bp.route("/