Skip to content

Commit

Permalink
Merge pull request #42 from bento-platform/features/public-last-ingestion-retrival-test
Browse files Browse the repository at this point in the history

feat: public endpoint for last completed ingestions
  • Loading branch information
noctillion authored Aug 1, 2023
2 parents 7aa0a1c + 01b03cf commit 4854d38
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 17 deletions.
26 changes: 26 additions & 0 deletions bento_wes/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"update_stuck_runs",
"update_db",
"run_request_dict",
"run_request_dict_public",
"run_log_dict",
"task_log_dict",
"get_task_logs",
Expand Down Expand Up @@ -165,6 +166,23 @@ def run_request_dict(run_request: sqlite3.Row) -> dict:
}


def run_request_dict_public(run_request: sqlite3.Row) -> dict:
    """Serialize a run request row into its public (redacted) representation.

    Only a whitelisted subset of the request's tags and workflow metadata is
    exposed; all other stored fields are omitted.
    """
    all_tags = json.loads(run_request["tags"])
    metadata = all_tags["workflow_metadata"]

    public_metadata = {
        "data_type": metadata["data_type"],
        "id": metadata["id"],
    }
    public_tags = {
        "table_id": all_tags["table_id"],
        "workflow_id": all_tags["workflow_id"],
        "workflow_metadata": public_metadata,
    }

    return {"workflow_type": run_request["workflow_type"], "tags": public_tags}


def _strip_first_slash(string: str):
return string[1:] if len(string) > 0 and string[0] == "/" else string

Expand All @@ -189,6 +207,14 @@ def run_log_dict(run_id: Union[uuid.UUID, str], run_log: sqlite3.Row) -> dict:
}


def run_log_dict_public(run_id: Union[uuid.UUID, str], run_log: sqlite3.Row) -> dict:
    """Public view of a run log: identifier and timing fields only.

    ``run_id`` is not used here; the parameter is kept so this function's
    signature mirrors ``run_log_dict``'s.
    """
    return {key: run_log[key] for key in ("id", "start_time", "end_time")}


def task_log_dict(task_log: sqlite3.Row) -> dict:
return {
"name": task_log["name"],
Expand Down
60 changes: 44 additions & 16 deletions bento_wes/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,16 @@
count_bento_workflow_file_outputs,
)

from .db import get_db, run_request_dict, run_log_dict, get_task_logs, get_run_details, update_run_state_and_commit
from .db import (
get_db,
run_request_dict,
run_request_dict_public,
run_log_dict,
run_log_dict_public,
get_task_logs,
get_run_details,
update_run_state_and_commit
)


bp_runs = Blueprint("runs", __name__)
Expand Down Expand Up @@ -288,6 +297,37 @@ def _create_run(db, c):
return flask_bad_request_error("Assertion error: bad run request format")


def fetch_run_details(c, public_endpoint=False):
    """Fetch every run, joined with its request and run log.

    :param c: An open SQLite cursor on the WES database.
    :param public_endpoint: If True, serialize requests/logs with the redacted
        ``*_public`` helpers, omit task logs, and include only runs whose state
        is COMPLETE.
    :return: A list of run detail dictionaries.
    """
    c.execute(
        "SELECT r.id AS run_id, r.state AS state, rr.*, rl.* "
        "FROM runs AS r, run_requests AS rr, run_logs AS rl "
        "WHERE r.request = rr.id AND r.run_log = rl.id"
    )

    runs = []
    for r in c.fetchall():
        run_id = r["run_id"]
        state = r["state"]

        # Public responses only list completed ingestions; skip everything else
        # early instead of building and discarding the detail dict.
        # (Original condition `not public_endpoint or (public_endpoint and
        # state == "COMPLETE")` carried a redundant `public_endpoint and`.)
        if public_endpoint and state != "COMPLETE":
            continue

        request = run_request_dict_public(r) if public_endpoint else run_request_dict(r)
        run_log = run_log_dict_public(run_id, r) if public_endpoint else run_log_dict(run_id, r)

        runs.append({
            "run_id": run_id,
            "state": state,
            "details": {
                "run_id": run_id,
                "state": state,
                "request": request,
                "run_log": run_log,
                # Task logs are internal-only; never exposed on the public endpoint.
                "task_logs": None if public_endpoint else get_task_logs(c, run_id),
            },
        })

    return runs


@bp_runs.route("/runs", methods=["GET", "POST"])
@flask_permissions_owner # TODO: Allow others to submit analysis runs?
def run_list():
Expand All @@ -298,6 +338,8 @@ def run_list():
return _create_run(db, c)

# GET
# CHORD Extension: Include run public details with /runs request
public_endpoint = request.args.get("public", "false").lower() == "true"
# CHORD Extension: Include run details with /runs request
with_details = request.args.get("with_details", "false").lower() == "true"

Expand All @@ -309,21 +351,7 @@ def run_list():
"state": run["state"]
} for run in c.fetchall()])

c.execute("SELECT r.id AS run_id, r.state AS state, rr.*, rl.* "
"FROM runs AS r, run_requests AS rr, run_logs AS rl "
"WHERE r.request = rr.id AND r.run_log = rl.id")

return jsonify([{
"run_id": r["run_id"],
"state": r["state"],
"details": {
"run_id": r["run_id"],
"state": r["state"],
"request": run_request_dict(r),
"run_log": run_log_dict(r["run_id"], r),
"task_logs": get_task_logs(c, r["run_id"])
}
} for r in c.fetchall()])
return jsonify(fetch_run_details(c, public_endpoint=public_endpoint))


@bp_runs.route("/runs/<uuid:run_id>", methods=["GET"])
Expand Down
1 change: 1 addition & 0 deletions tests/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"tags": {
"workflow_id": "phenopackets_json",
"workflow_metadata": {
"id": "phenopackets_json",
"name": "Bento Phenopackets-Compatible JSON",
"description": "This ingestion workflow will validate and import a Phenopackets schema-compatible "
"JSON document.",
Expand Down
55 changes: 54 additions & 1 deletion tests/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import responses
import uuid

from bento_wes.states import STATE_QUEUED
from bento_lib.events import EventBus

from .constants import EXAMPLE_RUN, EXAMPLE_RUN_BODY

from bento_wes.db import get_db, run_request_dict_public, update_run_state_and_commit
from bento_wes.states import STATE_QUEUED, STATE_COMPLETE


def _add_workflow_response(r):
with open(os.path.join(os.path.dirname(__file__), "phenopackets_json.wdl"), "r") as wf:
Expand Down Expand Up @@ -170,6 +173,56 @@ def test_run_cancel_endpoint(client, mocked_responses):
assert len(error["errors"]) == 1
assert error["errors"][0]["message"].startswith("No Celery ID present")


event_bus = EventBus(allow_fake=True) # mock event bus


def test_runs_public_endpoint(client, mocked_responses):
    """End-to-end check of the public runs listing (``?public=true``).

    Creates a run, forces it to COMPLETE (the public endpoint only lists
    completed runs), then validates the redacted response shape for every
    returned run.
    """
    # Local import: json is used below but may not be imported at module level.
    import json

    _add_workflow_response(mocked_responses)
    _add_ott_response(mocked_responses)

    # First, create a run, so we have something to fetch
    rv = client.post("/runs", data=EXAMPLE_RUN_BODY)
    assert rv.status_code == 200  # 200 is WES spec, even though 201 would be better (?)

    # Make sure the run is COMPLETE, otherwise the public endpoint won't list it
    db = get_db()
    c = db.cursor()
    update_run_state_and_commit(db, c, event_bus, rv.get_json()["run_id"], STATE_COMPLETE)

    # Validate the public runs endpoint
    rv = client.get("/runs?with_details=true&public=true")
    assert rv.status_code == 200
    data = rv.get_json()

    # Guard against a vacuous pass: the original version relied on loop
    # variables after the loop and would NameError (not fail cleanly) on an
    # empty listing.
    assert len(data) >= 1

    expected_keys = {"run_id", "state", "details"}
    expected_details_keys = {"request", "run_id", "run_log", "state", "task_logs"}
    expected_request_keys = {"tags", "workflow_type"}
    expected_tags_keys = {"table_id", "workflow_id", "workflow_metadata"}
    expected_metadata_keys = {"data_type", "id"}
    expected_run_log_keys = {"end_time", "id", "start_time"}

    for run in data:
        assert set(run.keys()) == expected_keys
        details = run["details"]
        assert set(details.keys()) == expected_details_keys
        request = details["request"]
        assert set(request.keys()) == expected_request_keys
        tags = request["tags"]
        assert set(tags.keys()) == expected_tags_keys
        metadata = tags["workflow_metadata"]
        assert set(metadata.keys()) == expected_metadata_keys
        run_log = details["run_log"]
        assert set(run_log.keys()) == expected_run_log_keys

        # run_request_dict_public should round-trip the redacted request
        # exactly; checked per-run instead of once on leaked loop variables.
        mock_run_request = {
            "workflow_type": request["workflow_type"],
            "tags": json.dumps(tags),
        }
        assert request == run_request_dict_public(mock_run_request)

# TODO: Get celery running for tests

# rv = client.post(f"/runs/{cr_data['run_id']}/cancel")
Expand Down

0 comments on commit 4854d38

Please sign in to comment.