Add priority queue functionality to HTEX #3575

Draft
wants to merge 17 commits into
base: master

matthewc2003
Contributor

Description

Adds a second pending-task queue to the interchange. It is used to prioritize user-labeled tasks when distributing tasks to managers.

Changed Behaviour

Users can now pass the parsl_resource_specification keyword to Python apps to specify a 'priority'. A lower value means a higher priority, and higher-priority tasks are sent out first.

Users can also set a 'queue_threshold' in the HTEX config. This is the cutoff value that decides whether a task goes to the priority queue or to the general FIFO queue.
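
To make the routing rule concrete, here is a minimal standard-library sketch. It assumes the comparison shown in the diff fragment further down (`priority < self.queue_threshold`) decides which queue a task lands in. The `route_task` helper and the plain list standing in for the sorted container are hypothetical names for illustration, not code from this PR.

```python
import queue
from typing import Any


def route_task(msg: dict[str, Any],
               priority_tasks: list[dict[str, Any]],
               general_tasks: "queue.Queue[dict[str, Any]]",
               queue_threshold: float) -> str:
    """Place msg in the priority list or the FIFO queue and report which one."""
    resource_spec = msg.get("resource_spec") or {}
    priority = resource_spec.get("priority")
    if priority is not None and priority < queue_threshold:
        priority_tasks.append(msg)
        # Keep the most urgent task (lowest priority value) at the front.
        # list.sort is stable, so equal priorities stay in arrival order.
        priority_tasks.sort(key=lambda m: m["resource_spec"]["priority"])
        return "priority"
    # No priority given, or priority at/above the threshold: plain FIFO.
    general_tasks.put(msg)
    return "general"


priority_tasks: list[dict[str, Any]] = []
general_tasks: "queue.Queue[dict[str, Any]]" = queue.Queue()

specs = [{"priority": 5}, {}, {"priority": 1}, {"priority": 50}]
for task_id, spec in enumerate(specs):
    route_task({"task_id": task_id, "resource_spec": spec},
               priority_tasks, general_tasks, queue_threshold=10)

priority_order = [m["task_id"] for m in priority_tasks]
general_order = [general_tasks.get_nowait()["task_id"]
                 for _ in range(general_tasks.qsize())]
print(priority_order, general_order)
```

With a threshold of 10, the tasks labeled 5 and 1 go to the priority side (the one labeled 1 first), while the unlabeled task and the one labeled 50 stay in FIFO order.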

Fixes

Part of the work on #3323

Type of change

Choose which options apply, and delete the ones which do not apply.

  • New feature

if self.enable_mpi_mode:
    validate_resource_spec(resource_specification, self.enable_mpi_mode)
else:
    if resource_specification and isinstance(resource_specification, dict):
Collaborator

this all belongs in validate_resource_spec()

try implementing #3519 as a separate PR first to get the structure in place in that function for "mpi vs non-mpi" resource validation (with, I guess, no resource spec allowed for non-mpi mode?) and you can get a parsl issue fixed as a nice side effect
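
One possible shape for a single validation entry point covering both modes is sketched below. Everything here is an assumption for illustration: the function name `check_resource_spec`, the exception type, and both key sets are hypothetical and are not taken from Parsl or from this PR. The non-MPI set could just as well start empty, as the comment above suggests, and gain 'priority' later.

```python
from typing import Any, Optional


class InvalidResourceSpec(Exception):
    """Hypothetical error type used only in this sketch."""


# Assumed key sets, for illustration only.
MPI_ONLY_KEYS = frozenset({"num_nodes", "ranks_per_node", "num_ranks"})
COMMON_KEYS = frozenset({"priority"})


def check_resource_spec(resource_spec: Optional[dict[str, Any]],
                        mpi_mode: bool) -> None:
    """Raise InvalidResourceSpec if the spec is not acceptable for the mode."""
    if resource_spec is None:
        return
    if not isinstance(resource_spec, dict):
        raise InvalidResourceSpec("resource specification must be a dict")

    # MPI mode accepts the MPI keys plus the common ones; non-MPI mode
    # accepts only the common ones.
    allowed = COMMON_KEYS | MPI_ONLY_KEYS if mpi_mode else COMMON_KEYS
    unknown = set(resource_spec) - allowed
    if unknown:
        raise InvalidResourceSpec(f"unsupported keys: {sorted(unknown)}")

    priority = resource_spec.get("priority")
    if priority is not None:
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(priority, bool) or not isinstance(priority, (int, float)):
            raise InvalidResourceSpec("'priority' must be a number")


check_resource_spec({"priority": 1}, mpi_mode=False)   # accepted
check_resource_spec({"num_nodes": 2}, mpi_mode=True)   # accepted
```

Keeping the mode switch inside one function means the executor's submit path only makes one call, and each mode's rules live next to each other.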

@@ -131,7 +133,8 @@ def __init__(self,
self.hub_address = hub_address
self.hub_zmq_port = hub_zmq_port

self.pending_task_queue: queue.Queue[Any] = queue.Queue(maxsize=10 ** 6)
self.priority_pending_task_queue: SortedList[Any] = SortedList(key=lambda msg: -msg['resource_spec']['priority'])
Collaborator

my gut says to avoid two queues and have a single queue. tasks without any priority effectively receive an infinitely-weak priority in your implementation, I think, and you can implement that same behaviour with a single priority queue?
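
A rough sketch of the single-queue idea, using only the standard library: tasks without a 'priority' get `math.inf`, so they sort behind every labeled task, and an arrival counter keeps FIFO order among equal priorities. The `PendingTasks` class is a hypothetical name, and `heapq` is swapped in here for the `SortedList` used in the PR.

```python
import heapq
import itertools
import math
from typing import Any


class PendingTasks:
    """Single pending-task queue: lowest priority value first, FIFO on ties."""

    def __init__(self) -> None:
        self._heap: list[tuple[float, int, dict[str, Any]]] = []
        self._arrival = itertools.count()

    def put(self, msg: dict[str, Any]) -> None:
        spec = msg.get("resource_spec") or {}
        # Unlabeled tasks get an infinitely weak priority.
        priority = spec.get("priority", math.inf)
        # The unique arrival number breaks ties, so the msg dicts
        # themselves are never compared.
        heapq.heappush(self._heap, (priority, next(self._arrival), msg))

    def get(self) -> dict[str, Any]:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


pending = PendingTasks()
pending.put({"task_id": 0, "resource_spec": {}})
pending.put({"task_id": 1, "resource_spec": {"priority": 5}})
pending.put({"task_id": 2})
pending.put({"task_id": 3, "resource_spec": {"priority": 1}})

dispatch_order = [pending.get()["task_id"] for _ in range(len(pending))]
print(dispatch_order)
```

The labeled tasks come out first (3, then 1), followed by the unlabeled ones in the order they arrived (0, then 2), which matches the two-queue behaviour without a second container.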

resource_spec = msg.get('resource_spec', {})
if 'priority' in resource_spec:
    priority = resource_spec['priority']
    if priority < self.queue_threshold:
Collaborator

i'm unclear what semantics you're trying to express with queue_threshold -- I'm going to give you a low priority but under a particular priority, parsl should round it to (effectively) +infinity?

@@ -253,7 +276,7 @@ def _command_server(self) -> NoReturn:
 command_req = self.command_channel.recv_pyobj()
 logger.debug("Received command request: {}".format(command_req))
 if command_req == "OUTSTANDING_C":
-    outstanding = self.pending_task_queue.qsize()
+    outstanding = self.priority_pending_task_queue.__len__() + self.general_pending_task_queue.qsize()
Collaborator

@benclifford Aug 8, 2024

use len(priority_pending_task_queue) rather than invoking __len__
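
For illustration, a tiny sketch of the suggested form. The `PriorityBacklog` class is a hypothetical stand-in for the sorted container, since any object that defines `__len__` works with the built-in `len()`.

```python
import queue


class PriorityBacklog:
    """Hypothetical stand-in for a sorted container of pending tasks."""

    def __init__(self, items: list[int]) -> None:
        self._items = sorted(items)

    def __len__(self) -> int:
        return len(self._items)


priority_pending = PriorityBacklog([3, 1, 2])
general_pending: "queue.Queue[str]" = queue.Queue()
general_pending.put("task")

# Built-in len() dispatches to __len__; no need to call the dunder directly.
outstanding = len(priority_pending) + general_pending.qsize()
print(outstanding)
```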

2 participants