Skip to content

Commit

Permalink
Merge branch 'main' into download-buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
francesconazzaro committed Jan 30, 2025
2 parents 279e691 + c8ac301 commit 86b53d1
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 14 deletions.
1 change: 1 addition & 0 deletions cads_broker/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class BrokerConfig(pydantic_settings.BaseSettings):
broker_requeue_limit: int = 3
broker_max_internal_scheduler_tasks: int = 500
broker_max_accepted_requests: int = 2000
broker_max_dismissed_requests: int = 100
broker_cancel_stuck_requests_cache_ttl: int = 60
broker_stuck_requests_limit_minutes: int = 15

Expand Down
3 changes: 2 additions & 1 deletion cads_broker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ def sync_database(self, session: sa.orm.Session) -> None:
# the retrieve API sets the status to "dismissed",
# here the broker fixes the QoS and queue status accordingly
dismissed_requests = db.get_dismissed_requests(
session, limit=CONFIG.broker_max_accepted_requests
session, limit=CONFIG.broker_max_dismissed_requests
)
for request in dismissed_requests:
if future := self.futures.pop(request.request_uid, None):
Expand All @@ -450,6 +450,7 @@ def sync_database(self, session: sa.orm.Session) -> None:
# try to cancel the job directly on the scheduler
cancel_jobs_on_scheduler(self.client, job_ids=[request.request_uid])
kill_job_on_worker(self.client, request.request_uid)
kill_job_on_worker(self.client, request.request_uid)
session = self.manage_dismissed_request(request, session)
session.commit()

Expand Down
93 changes: 82 additions & 11 deletions cads_broker/entry_points.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import enum
import os
import random
import time
import uuid
from pathlib import Path
from typing import Any, List, Optional

import prettytable
import sqlalchemy as sa
import typer
from typing_extensions import Annotated
Expand Down Expand Up @@ -64,7 +66,10 @@ def add_dummy_requests(

@app.command()
def requests_cleaner(
connection_string: Optional[str] = None, older_than_days: Optional[int] = 365
connection_string: Optional[str] = None,
older_than_days: Optional[int] = 365,
delete_bulk_size: Optional[int] = None,
delete_sleep_time: Optional[int] = None,
) -> None:
"""Remove records from the system_requests table older than `older_than_days`."""
if not connection_string:
Expand All @@ -73,18 +78,29 @@ def requests_cleaner(
engine = sa.create_engine(connection_string)
time_delta = datetime.datetime.now() - datetime.timedelta(days=older_than_days)
# clean system requests and (via cascading delete) events
with engine.begin() as conn:
database.logger.info("deleting old system_requests and events...")
database.logger.info("deleting old system_requests and events...")
curr_deleted = 1
subquery = sa.select(database.SystemRequest.request_uid).where(
database.SystemRequest.created_at <= time_delta
)
with engine.connect() as conn:
if delete_bulk_size is not None:
# delete in sized bulks to give time to db replicas for synch
subquery = subquery.limit(delete_bulk_size)
stmt = sa.delete(database.SystemRequest).where(
database.SystemRequest.created_at <= time_delta
)
result = conn.execute(stmt)
conn.commit()
num_requests_deleted = result.rowcount
database.logger.info(
f"{num_requests_deleted} old system requests "
f"successfully removed from the broker database."
database.SystemRequest.request_uid.in_(subquery)
)
while curr_deleted:
with conn.begin():
result = conn.execute(stmt)
conn.commit()
curr_deleted = result.rowcount
database.logger.info(
f"{curr_deleted} old system requests "
f"successfully removed from the broker database."
)
if delete_sleep_time is not None:
time.sleep(delete_sleep_time)
# clean adaptor_properties
with engine.begin() as conn:
try:
Expand Down Expand Up @@ -131,6 +147,61 @@ class RequestStatus(str, enum.Enum):
accepted = "accepted"


@app.command()
def get_dynamic_priority(
    request_uid: Optional[str] = None,
    request_uids_file: Annotated[
        Path, typer.Argument(exists=True, file_okay=True, dir_okay=False)
    ]
    | None = None,
    interval: float = 24 * 60 * 60,
    origin: Optional[str] = None,
    resource_mul: float = -1.0,
    last_completed_mul: float = 0.8,
):
    """Print a table with the dynamic priority of one or more requests.

    Parameters
    ----------
    request_uid: UID of a single request to inspect.
    request_uids_file: path of a text file with one request UID per line
        (used when ``request_uid`` is not given).
    interval: look-back window, in seconds (default: 24 hours).
    origin: optional origin filter passed to the users-queue query.
    resource_mul: weight applied to the user's resources used.
    last_completed_mul: weight applied to the user's last completed request.
    """
    # Resolve the list of request UIDs up front: the original code raised a
    # NameError when neither option was provided.
    if request_uid:
        request_uids = [request_uid]
    elif request_uids_file:
        # read_text() closes the file handle, unlike a bare open().read()
        request_uids = request_uids_file.read_text().splitlines()
    else:
        raise typer.BadParameter(
            "provide either a request UID or a file of request UIDs"
        )
    with database.ensure_session_obj(None)() as session:
        users_resources = database.get_users_queue_from_processing_time(
            session=session,
            interval_stop=datetime.datetime.now(),
            interval=datetime.timedelta(hours=interval / 60 / 60),
            origin=origin,
        )
        table = prettytable.PrettyTable(
            [
                "user_uid",
                "request_uid",
                "process_id",
                "entry_point",
                "user_resources_used",
                "user_last_completed_request",
                "priority",
            ]
        )
        for request_uid in request_uids:
            request = database.get_request(request_uid, session)
            # a user may have no entry in the processing-time queue:
            # treat missing users as having consumed no resources
            resources = users_resources.get(request.user_uid, 0)
            last_completed_request = database.user_last_completed_request(
                session, request.user_uid, interval
            )
            table.add_row(
                [
                    request.user_uid,
                    request_uid,
                    request.process_id,
                    request.entry_point,
                    resources,
                    last_completed_request,
                    resource_mul * resources
                    + last_completed_mul * last_completed_request,
                ]
            )
        typer.echo(table)


@app.command()
def delete_requests(
status: RequestStatus = RequestStatus.running,
Expand Down
2 changes: 1 addition & 1 deletion cads_broker/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def request_contains_all(context, key, values):
return contains_all(request_values, values)


def request_contains_any(context, key, values):
    """Return whether the request's `key` entry contains any of `values`."""
    requested = context.request.request_body.get("request").get(key)
    return contains_any(requested, values)

Expand Down
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies:
- dask-core
- distributed
- lz4 # dask distributed needs this to compress / de-compress
- prettytable
- psycopg2
- pydantic>2
- pydantic-settings
Expand Down
49 changes: 48 additions & 1 deletion tests/test_90_entry_points.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import datetime
import json
import logging
import os
import unittest.mock
import uuid
from typing import Any

import cacholote
import pytest
import sqlalchemy as sa
from psycopg import Connection
from typer.testing import CliRunner
Expand Down Expand Up @@ -147,7 +150,9 @@ def prepare_db(
# not existing case of recent properties with old requests


def test_requests_cleaner(session_obj: sa.orm.sessionmaker):
def test_requests_cleaner(
session_obj: sa.orm.sessionmaker, caplog: pytest.LogCaptureFixture
):
connection_string = session_obj.kw["bind"].url

# test remove nothing, older_than_days=365 by default
Expand Down Expand Up @@ -207,3 +212,45 @@ def test_requests_cleaner(session_obj: sa.orm.sessionmaker):
assert len(all_requests) == 5
assert len(all_events) == 5
assert len(all_props) == 5

# test remove 10 requests in bulk size of 3 (all old props have old requests)
prepare_db(
session_obj,
num_old_props_used_by_old=10,
num_recent_props_used_by_recent=5,
)
caplog.clear()
caplog.set_level(logging.INFO)
result = runner.invoke(
entry_points.app,
[
"requests-cleaner",
"--connection-string",
connection_string,
"--delete-bulk-size",
3,
"--delete-sleep-time",
1,
],
)
assert result.exit_code == 0
with session_obj() as session:
all_requests = session.query(database.SystemRequest).all()
all_events = session.query(database.Events).all()
all_props = session.query(database.AdaptorProperties).all()
assert len(all_requests) == 5
assert len(all_events) == 5
assert len(all_props) == 5
with caplog.at_level(logging.ERROR):
log_msgs = [json.loads(r.msg)["event"] for r in caplog.records]
assert log_msgs == [
"deleting old system_requests and events...",
"3 old system requests successfully removed from the broker database.",
"3 old system requests successfully removed from the broker database.",
"3 old system requests successfully removed from the broker database.",
"1 old system requests successfully removed from the broker database.",
"0 old system requests successfully removed from the broker database.",
"deleting old adaptor_properties...",
"10 old adaptor properties successfully removed from the broker database.",
]
caplog.clear()

0 comments on commit 86b53d1

Please sign in to comment.