BullMQ queue (#194)
* Chore: Try python 3.10

* Feature: Update to Python 3.11 (#195)

* Update Python and PyTorch

* Use 3.10

* Back to 3.12

* Use sudo

* Avoid dev

* Remove manual install of 3.12

* Relax scipy

* Try and fix gensim

* Update pip and wheel globally

* Install Rust for tokenizers

* Activate Rust

* Unpin transformers

* Try to remove Rust

* Bump setuptools

* Add comma

* Unpin faiss-cpu

* Downgrade to 3.11

* Fix Specter import

* Import from adapters

* Check what's different with specter

* Bump celery

* Remove print statement

* Pin packages

* Feat: Initial bullmq setup

* Add headers to tests

* Feat: Create first working version of bullmq queue

* Feat: Add update status to the expertise service

* Fix: Add jobs in its own queue

* Feat: Add legacy expertise endpoint back

* Refactor: Change the name queue

* Feat: Format name and logs for the queue

* Feat: Print traceback in logs

* Feat: Disallow repeated jobs

* Feat: Remove legacy code

* Feat: Add concurrency option to config

* Feat: Add match_group and alternate_match_group to the job logs

---------

Co-authored-by: Harold Rubio <[email protected]>
Co-authored-by: Harold Rubio <[email protected]>
3 people authored Dec 17, 2024
1 parent 1362f71 commit ecc8a88
Showing 6 changed files with 444 additions and 136 deletions.
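
Below is a minimal, self-contained sketch of the Queue/Worker pairing this commit introduces. It uses only bullmq calls that appear in the expertise.py diff further down (Queue, Worker with autorun and concurrency, queue.add, worker.run, close); the demo processor, payload, and timing are illustrative assumptions, not part of the commit. It assumes the bullmq Python package and a Redis instance on localhost:6379.

    # Sketch only (not the service code).
    import asyncio
    from bullmq import Queue, Worker

    async def process(job, token):
        # The real worker_process loads a JobConfig from Redis via job.data['job_id'];
        # here we just echo the payload to show the processor signature.
        print('processing', job.data)

    async def main():
        connection = {'host': 'localhost', 'port': 6379, 'db': 10}
        queue = Queue('Expertise', {'prefix': 'bullmq:expertise', 'connection': connection})
        worker = Worker(
            'Expertise', process,
            {'prefix': 'bullmq:expertise', 'connection': connection,
             'autorun': False, 'concurrency': 1}       # concurrency mirrors ACTIVE_JOBS
        )
        run_task = asyncio.create_task(worker.run())   # autorun is off, so start it explicitly
        await queue.add('demo', {'job_id': 'demo-job'}, {'jobId': 'demo-job'})
        await asyncio.sleep(2)                         # give the worker time to pick the job up
        await worker.close()
        await queue.close()
        run_task.cancel()

    asyncio.run(main())
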
1 change: 1 addition & 0 deletions expertise/service/config/default.cfg
@@ -10,6 +10,7 @@ REDIS_ADDR = 'localhost'
 REDIS_PORT = 6379
 REDIS_CONFIG_DB = 10
 REDIS_EMBEDDINGS_DB = 11
+ACTIVE_JOBS = 1
 DEFAULT_CONFIG = {
     "dataset": {},
     "model": "specter+mfr",
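
The new ACTIVE_JOBS key is the only change in this file. The expertise.py diff below passes it straight through as the BullMQ worker's concurrency, so the default of 1 means at most one expertise job runs at a time. A tiny illustration, with the dict literal standing in for the loaded Flask config:

    # ACTIVE_JOBS (above) becomes the Worker 'concurrency' option (see expertise.py below).
    config = {'ACTIVE_JOBS': 1}
    worker_opts = {
        'autorun': False,                      # the service starts the worker explicitly
        'concurrency': config['ACTIVE_JOBS'],  # 1 => one expertise job at a time
    }
    print(worker_opts)
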
277 changes: 268 additions & 9 deletions expertise/service/expertise.py
@@ -8,15 +8,17 @@
 from openreview import OpenReviewException
 from enum import Enum
 from threading import Lock
+from bullmq import Queue, Worker
+from expertise.execute_expertise import execute_create_dataset, execute_expertise
+import asyncio
+import threading
 
 from .utils import JobConfig, APIRequest, JobDescription, JobStatus, SUPERUSER_IDS, RedisDatabase
 
 user_index_file_lock = Lock()
 class ExpertiseService(object):
 
-    def __init__(self, client, config, logger, client_v2 = None):
-        self.client = client
-        self.client_v2 = client_v2
+    def __init__(self, config, logger):
         self.logger = logger
         self.server_config = config
         self.default_expertise_config = config['DEFAULT_CONFIG']
@@ -29,13 +31,47 @@ def __init__(self, client, config, logger, client_v2 = None):
             port = config['REDIS_PORT'],
             db = config['REDIS_CONFIG_DB']
         )
+        self.queue = Queue(
+            'Expertise',
+            {
+                'prefix': 'bullmq:expertise',
+                'connection': {
+                    "host": config['REDIS_ADDR'],
+                    "port": config['REDIS_PORT'],
+                    "db": config['REDIS_CONFIG_DB'],
+                }
+            }
+        )
+        self.start_queue_in_thread()
+
+        self.worker = Worker(
+            'Expertise',
+            self.worker_process,
+            {
+                'prefix': 'bullmq:expertise',
+                'connection': {
+                    "host": config['REDIS_ADDR'],
+                    "port": config['REDIS_PORT'],
+                    "db": config['REDIS_CONFIG_DB'],
+                },
+                'autorun': False,
+                'concurrency': config['ACTIVE_JOBS'],
+            }
+        )
+        self.start_worker_in_thread()
 
         # Define expected/required API fields
         self.req_fields = ['name', 'match_group', 'user_id', 'job_id']
         self.optional_model_params = ['use_title', 'use_abstract', 'average_score', 'max_score', 'skip_specter']
         self.optional_fields = ['model', 'model_params', 'exclusion_inv', 'token', 'baseurl', 'baseurl_v2', 'paper_invitation', 'paper_id']
         self.path_fields = ['work_dir', 'scores_path', 'publications_path', 'submissions_path']
 
+    def set_client(self, client):
+        self.client = client
+
+    def set_client_v2(self, client_v2):
+        self.client_v2 = client_v2
+
     def _filter_config(self, running_config):
         """
         Filters out certain server-side fields of a config file in order to
@@ -141,29 +177,252 @@ def _get_score_and_metadata_dir(self, search_dir, group_scoring=False):

return file_dir, metadata_dir

def update_status(self, config, new_status, desc=None):
"""
Updates the config of a given job to the new status
Optionally allows manual setting of the description
:param config: JobConfig of a given job
:type config: JobConfig
:param new_status: The new status for the job - a value from the JobStatus enumeration
:type new_status: str
"""
descriptions = JobDescription.VALS.value
config.status = new_status
if desc is None:
config.description = descriptions[new_status]
else:
if 'num_samples=0' in desc:
desc += '. Please check that there is at least 1 member of the match group with at least 1 publication on OpenReview.'
if 'Dimension out of range' in desc:
desc += '. Please check that you have at least 1 submission submitted and that you have run the Post Submission stage.'
config.description = desc
config.mdate = int(time.time() * 1000)
self.redis.save_job(config)

async def worker_process(self, job, token):
job_id = job.data['job_id']
user_id = job.data['user_id']
config = self.redis.load_job(job_id, user_id)
or_token = job.data['token']
openreview_client = openreview.Client(
token=or_token,
baseurl=config.baseurl
)
openreview_client_v2 = openreview.api.OpenReviewClient(
token=or_token,
baseurl=config.baseurl_v2
)
try:
execute_create_dataset(openreview_client, openreview_client_v2, config=config.to_json())
self.update_status(config, JobStatus.RUN_EXPERTISE)
execute_expertise(config=config.to_json())
self.update_status(config, JobStatus.COMPLETED)
except Exception as e:
self.update_status(config, JobStatus.ERROR, str(e))
# Re-raise the exception so that it appears in the queue
exception = e.with_traceback(e.__traceback__)
raise exception
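
    # Illustrative sketch (not part of this commit's diff): worker_process above records the
    # ERROR state in Redis via update_status, then re-raises so BullMQ also marks the job as
    # failed and keeps the traceback in its own log. do_work and record_error are hypothetical
    # stand-ins for the real calls.
    def do_work():
        raise ValueError('num_samples=0')

    def record_error(message):
        print('status -> ERROR:', message)   # stands in for update_status(config, JobStatus.ERROR, message)

    try:
        do_work()
    except Exception as e:
        record_error(str(e))
        raise e.with_traceback(e.__traceback__)   # re-raise with the original traceback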

def _get_job_name(self, request):
job_name_parts = [request.get('name', 'No name provided')]
entities = []
if request.get('entityA', {}).get('type'):
entities.append(request['entityA'])
else:
job_name_parts.append('No Entity A Type Found')
if request.get('entityB', {}).get('type'):
entities.append(request['entityB'])
else:
job_name_parts.append('No Entity B Type Found')

for entity in entities:
if entity['type'] == 'Group':
job_name_parts.append(entity.get('memberOf', 'No Group Found'))
elif entity['type'] == 'Note':
if entity.get('id'):
job_name_parts.append(entity['id'])
elif entity.get('invitation'):
job_name_parts.append(entity['invitation'])
elif entity.get('withVenueid'):
job_name_parts.append(entity['withVenueid'])
else:
job_name_parts.append('No Note Information Found')

return f'{job_name_parts[0]}: {job_name_parts[1]} - {job_name_parts[2]}'

def _get_log_from_request(self, request):
log = []
if request.get('entityA'):
log.append(f"Entity A: {json.dumps(request.get('entityA', {}), indent=4)}")
if request.get('entityB'):
log.append(f"Entity B: {json.dumps(request.get('entityB', {}), indent=4)}")

return '\n'.join(log)

def _get_log_from_config(self, config):
log = []
if config.name:
log.append(f"Job name: {config.name}")
if config.paper_id:
log.append(f"Paper ID: {config.paper_id}")
if config.paper_invitation:
log.append(f"Paper invitation: {config.paper_invitation}")
if config.paper_venueid:
log.append(f"Paper venue ID: {config.paper_venueid}")
if config.match_group:
log.append(f"Match group: {config.match_group}")
if config.alternate_match_group:
log.append(f"Alternate match group: {config.alternate_match_group}")
if config.model:
log.append(f"Model: {config.model}")
if config.model_params:
log.append(f"Model params: {json.dumps(config.to_json().get('model_params', {}), indent=4)}")

return '\n'.join(log)

def get_key_from_request(self, request):
key_parts = []
entities = []
if request.get('entityA', {}).get('type'):
entities.append(request['entityA'])
else:
key_parts.append('NoEntityA')

if request.get('entityB', {}).get('type'):
entities.append(request['entityB'])
else:
key_parts.append('NoEntityB')

for entity in entities:
if entity['type'] == 'Group':
key_parts.append(entity.get('memberOf', 'NoGroupFound'))
elif entity['type'] == 'Note':
if entity.get('id'):
key_parts.append(entity['id'])
elif entity.get('invitation'):
key_parts.append(entity['invitation'])
elif entity.get('withVenueid'):
key_parts.append(entity['withVenueid'])
else:
key_parts.append('NoNoteInformation')

if request.get('model', {}).get('name'):
key_parts.append(request['model']['name'])

return ':'.join(key_parts)
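
    # Illustrative example (not part of this commit's diff): what get_key_from_request builds
    # for a hypothetical Group-vs-Note request; start_expertise below rejects a submission
    # whose key already belongs to a waiting or active job.
    request = {
        'name': 'reviewer_matching',
        'entityA': {'type': 'Group', 'memberOf': 'ABC.cc/Reviewers'},
        'entityB': {'type': 'Note', 'invitation': 'ABC.cc/-/Submission'},
        'model': {'name': 'specter+mfr'},
    }
    # get_key_from_request(request) -> 'ABC.cc/Reviewers:ABC.cc/-/Submission:specter+mfr'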

def start_expertise(self, request):
descriptions = JobDescription.VALS.value

from .celery_tasks import run_userpaper
job_name = self._get_job_name(request)
request_log = self._get_log_from_request(request)

request_key = self.get_key_from_request(request)

try:
future = asyncio.run_coroutine_threadsafe(
self.queue.getJobs([
'active',
'delayed',
'paused',
'waiting',
'waiting-children',
'prioritized',
]),
self.queue_loop,
)
jobs = future.result()
except Exception as e:
jobs = []

for job in jobs:
if job.data.get('request_key') == request_key:
raise openreview.OpenReviewException("Request already in queue")

config, token = self._prepare_config(request)
job_id = config.job_id

config_log = self._get_log_from_config(config)

config.mdate = int(time.time() * 1000)
config.status = JobStatus.QUEUED
config.description = descriptions[JobStatus.QUEUED]

# Config has passed validation - add it to the user index
self.logger.info('just before submitting')
run_userpaper.apply_async(
(config, token, self.logger),
queue='userpaper',
task_id=job_id
)

self.logger.info(f"\nconf: {config.to_json()}\n")
self.redis.save_job(config)

future = asyncio.run_coroutine_threadsafe(
self.queue.add(
job_name,
{
"job_id": job_id,
"request_key": request_key,
"user_id": config.user_id,
"token": token
},
{
'jobId': job_id,
'removeOnComplete': {
'count': 100,
},
'removeOnFail': {
'age': 2592000
},
}
),
self.queue_loop
)
job = future.result()

future = asyncio.run_coroutine_threadsafe(job.log(request_log), self.queue_loop)
future.result()

future = asyncio.run_coroutine_threadsafe(job.log(config_log), self.queue_loop)
future.result()

return job_id

def start_queue_in_thread(self):
def run_event_loop(loop):
# set the loop for the current thread
asyncio.set_event_loop(loop)
# run the event loop until stopped
loop.run_forever()

# create a new event loop (low-level api)
self.queue_loop = asyncio.new_event_loop()

# create a new thread to execute a target coroutine
thread = threading.Thread(target=run_event_loop, args=(self.queue_loop,), daemon=True)
# start the new thread
thread.start()

def start_worker_in_thread(self):
def run_event_loop(loop):
# set the loop for the current thread
asyncio.set_event_loop(loop)
# run the event loop until stopped
loop.run_forever()

# create a new event loop (low-level api)
loop = asyncio.new_event_loop()

# create a new thread to execute a target coroutine
thread = threading.Thread(target=run_event_loop, args=(loop,), daemon=True)
# start the new thread
thread.start()

asyncio.run_coroutine_threadsafe(self.worker.run(), loop)
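
    # Illustrative sketch (not part of this commit's diff): both helpers above follow the same
    # pattern - run a dedicated asyncio event loop in a daemon thread, then hand it coroutines
    # from synchronous code via run_coroutine_threadsafe. The say coroutine is a stand-in for
    # queue.add(), queue.getJobs(), or worker.run().
    import asyncio
    import threading

    def _run(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()

    def start_loop_in_thread():
        loop = asyncio.new_event_loop()
        threading.Thread(target=_run, args=(loop,), daemon=True).start()
        return loop

    async def say(msg):
        await asyncio.sleep(0.1)
        return f'done: {msg}'

    loop = start_loop_in_thread()
    future = asyncio.run_coroutine_threadsafe(say('hello'), loop)   # schedule from sync code
    print(future.result())                                          # blocks until 'done: hello'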

async def close(self):
await self.worker.close()
await self.queue.close()

def get_expertise_all_status(self, user_id, query_params):
"""
Searches the server for all jobs submitted by a user
[Diffs for the remaining 4 changed files are not shown here.]