Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create jobs from new endpoints #375

Merged
merged 14 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ jobs:
ports:
- 5432:5432
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
rabbit-mq:
image: "rabbitmq:3.12-management"
ports:
- "5672:5672"
- "15672:15672"
env:
RABBITMQ_USER: guest
RABBITMQ_PASSWORD: guest

steps:
- name: Check out repository
Expand All @@ -119,11 +127,17 @@ jobs:
python -m pip install --upgrade pip
pip install .[test]

- name: Run tests
- name: Run tests (excluding job maker)
env:
DATABASE_URL: postgresql://postgres:password@localhost:5432/test_db
run: |
pytest test/e2e --random-order --random-order-bucket=global --cov --cov-report=xml
pytest test/e2e --random-order --random-order-bucket=global --cov --cov-report=xml --ignore=test/e2e/test_job_maker.py

- name: Run test job maker
env:
DEV_MODE: True
run: |
pytest test/e2e/test_job_maker.py --random-order --random-order-bucket=global --cov --cov-report=xml

- name: Upload coverage
uses: codecov/codecov-action@e28ff129e5465c2c0dcc6f003fc735cb6ae0c673 # v4.5.0
Expand Down
88 changes: 88 additions & 0 deletions fia_api/core/job_maker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import json
from typing import Any

from pika.adapters.blocking_connection import BlockingConnection # type: ignore[import-untyped]
from pika.connection import ConnectionParameters # type: ignore[import-untyped]
from pika.credentials import PlainCredentials # type: ignore[import-untyped]


class JobMaker:
def __init__(self, queue_host: str, username: str, password: str, queue_name: str):
credentials = PlainCredentials(username=username, password=password)
self.connection_parameters = ConnectionParameters(queue_host, 5672, credentials=credentials)
self.queue_name = queue_name
self.connection = None
self.channel = None
self._connect_to_broker()

def _connect_to_broker(self) -> None:
"""
Use this to connect to the broker
:return: None
"""
self.connection = BlockingConnection(self.connection_parameters)
self.channel = self.connection.channel() # type: ignore[attr-defined]
self.channel.exchange_declare( # type: ignore[attr-defined]
self.queue_name,
exchange_type="direct",
durable=True,
)
self.channel.queue_declare( # type: ignore[attr-defined]
self.queue_name,
durable=True,
arguments={"x-queue-type": "quorum"},
)
self.channel.queue_bind(self.queue_name, self.queue_name, routing_key="") # type: ignore[attr-defined]

def _send_message(self, message: str) -> None:
self._connect_to_broker()
# Assuming channel is set in _connect_to_broker()
self.channel.basic_publish(exchange=self.queue_name, routing_key="", body=message) # type: ignore

def rerun_job(
self,
job_id: int,
runner_image: str,
script: str,
experiment_number: int | None = None,
user_number: int | None = None,
) -> None:
"""
Submit a rerun job to the scheduled job queue in the message broker. Default to using experiment_number over
user_number.
:param job_id: The id of the job to be reran
:param runner_image: The image used as a runner on the cluster
:param script: The script to be used in the runner
:param experiment_number: the experiment number of the owner
:param user_number: the user number of the owner
:return: None
"""
json_dict: dict[str, Any] = {"job_id": job_id, "runner_image": runner_image, "script": script}
if experiment_number is not None:
json_dict["experiment_number"] = experiment_number
elif user_number is not None:
json_dict["user_number"] = user_number
else:
raise ValueError("Something needs to own the job, either experiment_number or user_number.")
self._send_message(json.dumps(json_dict))

def simple_job(
self, runner_image: str, script: str, experiment_number: int | None = None, user_number: int | None = None
) -> None:
"""
Submit a job to the scheduled job queue in the message broker. Default to using experiment_number over
user_number.
:param runner_image: The image used as a runner on the cluster
:param script: The script to be used in the runner
:param experiment_number: the experiment number of the owner
:param user_number: the user number of the owner
:return: None
"""
json_dict: dict[str, Any] = {"runner_image": runner_image, "script": script}
if experiment_number is not None:
json_dict["experiment_number"] = experiment_number
elif user_number is not None:
json_dict["user_number"] = user_number
else:
raise ValueError("Something needs to own the job, either experiment_number or user_number.")
self._send_message(json.dumps(json_dict))
41 changes: 41 additions & 0 deletions fia_api/core/services/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,42 @@
Service Layer for jobs
"""

import os
from collections.abc import Sequence
from typing import Literal

from db.data_models import Job
from pydantic import BaseModel

from fia_api.core.auth.experiments import get_experiments_for_user_number
from fia_api.core.exceptions import AuthenticationError, MissingRecordError
from fia_api.core.job_maker import JobMaker
from fia_api.core.repositories import Repo
from fia_api.core.specifications.job import JobSpecification


def job_maker() -> JobMaker:
"""Creates a JobMaker and returns it using env vars"""
queue_host = os.environ.get("QUEUE_HOST", "localhost")
queue_name = os.environ.get("EGRESS_QUEUE_NAME", "scheduled-jobs")
producer_username = os.environ.get("QUEUE_USER", "guest")
producer_password = os.environ.get("QUEUE_PASSWORD", "guest")
return JobMaker(
queue_host=queue_host, queue_name=queue_name, username=producer_username, password=producer_password
)


class SimpleJob(BaseModel):
runner_image: str
script: str


class RerunJob(BaseModel):
job_id: int
runner_image: str
script: str


OrderField = Literal[
"start",
"end",
Expand Down Expand Up @@ -124,3 +150,18 @@ def count_jobs() -> int:
:return: (int) number of jobs
"""
return _REPO.count(JobSpecification().all())


def get_experiment_number_for_job_id(job_id: int) -> int:
"""
Given a job id find and return the experiment number attached to it or will raise an exception.
:param job_id: (int) The id of the job
:return: (int) the experiment number of the job found with the id
"""
job = _REPO.find_one(JobSpecification().by_id(job_id))
if job is not None:
owner = job.owner
if owner is not None and owner.experiment_number is not None:
return owner.experiment_number
raise ValueError("Job has no owner or owner does not have an experiment number in the DB")
raise ValueError("No job found with ID in the DB")
42 changes: 42 additions & 0 deletions fia_api/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from __future__ import annotations

from http import HTTPStatus
from typing import Annotated, Any, Literal

from fastapi import APIRouter, Depends, HTTPException
Expand All @@ -12,7 +13,9 @@
from starlette.background import BackgroundTasks

from fia_api.core.auth.api_keys import APIKeyBearer
from fia_api.core.auth.experiments import get_experiments_for_user_number
from fia_api.core.auth.tokens import JWTBearer, get_user_from_token
from fia_api.core.job_maker import JobMaker
from fia_api.core.repositories import test_connection
from fia_api.core.responses import (
CountResponse,
Expand All @@ -22,11 +25,15 @@
)
from fia_api.core.services.instrument import get_specification_by_instrument_name, update_specification_for_instrument
from fia_api.core.services.job import (
RerunJob,
SimpleJob,
count_jobs,
count_jobs_by_instrument,
get_all_jobs,
get_experiment_number_for_job_id,
get_job_by_id,
get_job_by_instrument,
job_maker,
)
from fia_api.scripts.acquisition import (
get_script_by_sha,
Expand Down Expand Up @@ -220,6 +227,41 @@ async def count_all_jobs() -> CountResponse:
return CountResponse(count=count_jobs())


@ROUTER.post("/job/rerun", tags=["job"])
async def make_rerun_job(
rerun_job: RerunJob,
credentials: Annotated[HTTPAuthorizationCredentials, Depends(jwt_security)],
job_maker: Annotated[JobMaker, Depends(job_maker)],
) -> None:
user = get_user_from_token(credentials.credentials)
experiment_number = get_experiment_number_for_job_id(rerun_job.job_id)
# Forbidden if not staff, and experiment number not related to this user_number's experiment number
if user.role != "staff":
experiment_numbers = get_experiments_for_user_number(user.user_number)
if experiment_number not in experiment_numbers:
# If not staff this is not allowed
raise HTTPException(status_code=HTTPStatus.FORBIDDEN)
job_maker.rerun_job(
job_id=rerun_job.job_id,
runner_image=rerun_job.runner_image,
script=rerun_job.script,
experiment_number=experiment_number,
)


@ROUTER.post("/job/simple", tags=["job"])
async def make_simple_job(
simple_job: SimpleJob,
credentials: Annotated[HTTPAuthorizationCredentials, Depends(jwt_security)],
job_maker: Annotated[JobMaker, Depends(job_maker)],
) -> None:
user = get_user_from_token(credentials.credentials)
if user.role != "staff":
# If not staff this is not allowed
raise HTTPException(status_code=HTTPStatus.FORBIDDEN)
job_maker.simple_job(runner_image=simple_job.runner_image, script=simple_job.script, user_number=user.user_number)


@ROUTER.get("/instrument/{instrument_name}/specification", tags=["instrument"], response_model=None)
async def get_instrument_specification(
instrument_name: str, _: Annotated[HTTPAuthorizationCredentials, Depends(api_key_security)]
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ dependencies = [
"pydantic==2.8.2",
"db@git+https://github.com/fiaisis/db",
"uvicorn==0.30.3",
"requests==2.32.3"
"requests==2.32.3",
"pika==1.3.2",
]

[project.urls]
Expand Down
23 changes: 23 additions & 0 deletions test/core/services/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

from unittest.mock import Mock, patch

import faker.generator
import pytest

from fia_api.core.exceptions import AuthenticationError, MissingRecordError
from fia_api.core.services.job import (
count_jobs,
count_jobs_by_instrument,
get_all_jobs,
get_experiment_number_for_job_id,
get_job_by_id,
get_job_by_instrument,
)
Expand Down Expand Up @@ -161,3 +163,24 @@ def test_get_all_jobs_with_user_no_access(mock_get_experiments, mock_spec_class,
spec.by_experiment_numbers.assert_called_once_with([], order_by="start", order_direction="desc", limit=0, offset=0)
mock_repo.find.assert_called_once_with(spec.by_experiment_numbers())
assert jobs == []


@patch("fia_api.core.services.job._REPO")
@patch("fia_api.core.services.job.JobSpecification")
def test_get_experiment_number_from_job_id(mock_spec_class, mock_repo):
job_id = faker.generator.random.randint(1, 1000)

get_experiment_number_for_job_id(job_id)

mock_spec_class.assert_called_once_with()
mock_spec_class.return_value.by_id.assert_called_once_with(job_id)
mock_repo.find_one.assert_called_once_with(mock_spec_class().by_id())


@patch("fia_api.core.services.job._REPO")
def test_get_experiment_number_from_job_id_expect_raise(mock_repo):
job_id = faker.generator.random.randint(1, 1000)
mock_repo.find_one.return_value = None

with patch("fia_api.core.services.job.JobSpecification"), pytest.raises(ValueError): # noqa: PT011
get_experiment_number_for_job_id(job_id)
Loading