
Implement priority mechanism in new scheduler #8139

Open. Wants to merge 12 commits into base: master.

Conversation

@jptrindade (Contributor) commented Sep 30, 2024

Description

Added a priority mechanism to the new scheduler, with priorities for every existing task.

closes #8015

Self Check:

Strike through any lines that are not applicable (~~line~~) then check the box

  • Attached issue to pull request
  • Changelog entry
  • Type annotations are present
  • Code is clear and sufficiently documented
  • No (preventable) type errors (check using make mypy or make mypy-diff)
  • Sufficient test cases (reproduces the bug/tests the requested feature)
  • Correct, in line with design
  • End user documentation is included or an issue is created for end-user documentation (add ref to issue here: )
  • If this PR fixes a race condition in the test suite, also push the fix to the relevant stable branch(es) (see test-fixes for more info)

async def test_scheduler_priority(agent: TestAgent, environment, make_resource_minimal):
"""
Ensure that the tasks are placed in the queue in the correct order
And that existing tasks in the queue are replaced if a task that
Contributor:

Suggested change:
- And that existing tasks in the queue are replaced if a task that
+ and that existing tasks in the queue are replaced if a task that

@@ -377,11 +392,12 @@ def extend_blocked_on(resource: ResourceIdStr, new_blockers: set[ResourceIdStr])
# discard rather than remove because task may already be running, in which case we leave it run its course
# and simply add a new one
task: tasks.Deploy = tasks.Deploy(resource=resource)
priority: Optional[int] = self.agent_queues.discard(task)
task_priority: Optional[int] = self.agent_queues.discard(task)
Contributor:

Should we update the typing as well? E.g.:

Suggested change:
- task_priority: Optional[int] = self.agent_queues.discard(task)
+ task_priority: Optional[TaskPriority] = self.agent_queues.discard(task)

And similarly for the method's signature?

Comment on lines +39 to +46
TERMINATED = -1
USER_DEPLOY = 0
NEW_VERSION_DEPLOY = 1
USER_REPAIR = 2
DRYRUN = 3
INTERVAL_DEPLOY = 4
FACT_REFRESH = 5
INTERVAL_REPAIR = 6
Contributor:

Maybe a non-issue, but should we reserve some leeway by using these numbers * 10, in case we need to insert other tasks in between these ones in the future?

Contributor (author):

I don't think it is much of an issue; we can change the numbers however we like, because in the code we just use the task's name. @sanderr what do you think?

Contributor:

Indeed, I care mostly about the relative order. I do think that a comment stating that these can be freely updated would be useful.

If we ever start writing priorities to the database it's different, but afaik we don't plan to.
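
A minimal sketch of what such a comment could look like, assuming TaskPriority is an IntEnum (the enclosing class definition is not visible in this diff):

```python
from enum import IntEnum


class TaskPriority(IntEnum):
    """
    Relative priority of scheduler tasks; lower values are picked up first.

    Only the relative order matters: these values are never persisted (e.g. to the
    database), so they can be renumbered freely whenever a new task type needs to
    be inserted between two existing levels.
    """

    TERMINATED = -1
    USER_DEPLOY = 0
    NEW_VERSION_DEPLOY = 1
    USER_REPAIR = 2
    DRYRUN = 3
    INTERVAL_DEPLOY = 4
    FACT_REFRESH = 5
    INTERVAL_REPAIR = 6
```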

Contributor:

@jptrindade could you add such a comment?

sanderr (Contributor) left a comment:

I still have to go over the tests.

Comment on lines +136 to +140
async def interval_deploy() -> None:
await self.scheduler.deploy(TaskPriority.INTERVAL_DEPLOY)

async def interval_repair() -> None:
await self.scheduler.repair(TaskPriority.INTERVAL_REPAIR)
Contributor:

I have a slight preference for functools.partial over defining custom methods for this. But if you prefer it like this I'm fine with it.
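
For illustration, a sketch of the functools.partial alternative, reusing self.scheduler and TaskPriority from the hunk above (the surrounding timer wiring is assumed):

```python
import functools

# Instead of defining interval_deploy()/interval_repair() wrapper coroutines,
# bind the priority argument up front; awaiting the partial behaves the same.
interval_deploy = functools.partial(self.scheduler.deploy, TaskPriority.INTERVAL_DEPLOY)
interval_repair = functools.partial(self.scheduler.repair, TaskPriority.INTERVAL_REPAIR)
```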

@@ -169,20 +169,20 @@ async def stop(self) -> None:
self._work.agent_queues.send_shutdown()
await asyncio.gather(*self._workers.values())

async def deploy(self) -> None:
async def deploy(self, priority: TaskPriority) -> None:
Contributor:

I think it might make sense to add a default = TaskPriority.USER_DEPLOY and similar for the repair. We'll probably move the timers to the scheduler itself before we release, in which case outside callers should not be concerned with priorities unless they really want to override the default. If we do add the default I would make sure to update callers not to explicitly set the user-triggered priority levels.

Wdyt?
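
For example, a sketch of the suggested defaults (the USER_REPAIR default for repair is an assumption by analogy; method bodies are elided):

```python
async def deploy(self, priority: TaskPriority = TaskPriority.USER_DEPLOY) -> None:
    ...

async def repair(self, priority: TaskPriority = TaskPriority.USER_REPAIR) -> None:
    ...
```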

"""
Trigger a deploy
"""
async with self._scheduler_lock:
self._work.deploy_with_context(self._state.dirty, deploying=self._deploying)
self._work.deploy_with_context(self._state.dirty, priority, deploying=self._deploying_stale)
Contributor:

I believe deploying=self._deploying -> deploying=self._deploying_stale is incorrect. Might this be a merge conflict that was resolved incorrectly?

We'll have another one between our PRs, but it shouldn't be too difficult to resolve.

@@ -305,6 +305,7 @@ async def _new_version(
# ensure deploy for ALL dirty resources, not just the new ones
self._work.deploy_with_context(
self._state.dirty,
TaskPriority.NEW_VERSION_DEPLOY,
Contributor:

Nice, I hadn't considered this to be a separate case.

Comment on lines +388 to +389
# Not sure about this priority. I believe this method is called when a resource has a new state
# and hence, new version.
Contributor:

Reminder to drop this comment.

As to the question: this method is called when a resource finishes a deploy (or at least we only reach this branch of it in that case). The deploy that is triggered here is the propagation of events to its dependents, e.g. a service's resource deploys successfully -> we schedule the lsm state transfer resource for deploy.

I wonder if we might want a separate priority level for event propagation. I would rank it pretty high (urgent) for fast event response, but perhaps given how eagerly we currently send them out (because we'll still default to receive_events=True), that might not be the best idea at the moment.

Overall, perhaps we should simply schedule it with the same priority as the resource that just finished? We could pass in the information to this method, or we could just query the agent queues mapping. The first is more direct, but it feels a bit clunky, given how generic this method is, so I think I prefer the second.

Contributor:

Hmm, we don't even keep that information at the moment. I need to give this some more thought. Preliminary suggestion: make AgentQueues._in_progress a mapping from Task to PrioritizedTask instead of the current set, then query that here.

Contributor:

Or perhaps have the queue_get method return PrioritizedTask instead of Task, and pass on the task context to this method?

I'll get back to this. I have to consider what fits best.
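
A rough sketch of the preliminary suggestion above, using Task, PrioritizedTask and TaskPriority from this PR (the helper method and the rest of the class are hypothetical and elided):

```python
from typing import Optional


class AgentQueues:
    def __init__(self) -> None:
        # was: self._in_progress: set[Task]
        self._in_progress: dict[Task, PrioritizedTask] = {}

    def in_progress_priority(self, task: Task) -> Optional[TaskPriority]:
        # Hypothetical helper: return the priority a running task was scheduled
        # with, so event propagation can reuse it for its dependents' deploys.
        prioritized = self._in_progress.get(task)
        return None if prioritized is None else prioritized.priority
```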


Comment on lines +40 to +41
USER_DEPLOY = 0
NEW_VERSION_DEPLOY = 1
Contributor:

I'm inclined to swap these two (0 and 1). @wouterdb WDYT?

@@ -294,6 +307,7 @@ def reset(self) -> None:
def deploy_with_context(
self,
resources: Set[ResourceIdStr],
priority: TaskPriority,
Contributor:

I'm very much in favor of making this kw-only like the other args.
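
In other words, something along these lines (the remaining keyword-only parameters, e.g. deploying, are left out of the sketch):

```python
def deploy_with_context(
    self,
    resources: Set[ResourceIdStr],
    *,  # priority must now be passed by keyword, like the existing keyword-only args
    priority: TaskPriority,
) -> None:
    ...
```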

queued.remove(resource)
self._waiting[resource] = BlockedDeploy(
# FIXME[#8015]: default priority
task=PrioritizedTask(task=task, priority=priority if priority is not None else 0),
task=PrioritizedTask(task=task, priority=task_priority if task_priority is not None else priority),
Contributor:

Shouldn't we bump the priority here if it's higher?
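
A sketch of that bump, assuming a lower numeric value means more urgent (as in the TaskPriority ordering) and reusing the variable names from this hunk; the other BlockedDeploy arguments are omitted:

```python
# Keep the more urgent of the priority the task already had in the queue
# and the priority of the newly requested deploy.
effective_priority = priority if task_priority is None else min(task_priority, priority)
self._waiting[resource] = BlockedDeploy(
    task=PrioritizedTask(task=task, priority=effective_priority),
    # ... remaining BlockedDeploy arguments unchanged ...
)
```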

# scheduled as well, they will follow the provides relation to ensure this deploy waits its turn.
if prioritized_task.task in self.agent_queues:
self.agent_queues.queue_put_nowait(prioritized_task)
Contributor:

Suggested change:
- self.agent_queues.queue_put_nowait(prioritized_task)
+ # simply add it again, the queue will make sure only the highest priority is kept
+ self.agent_queues.queue_put_nowait(prioritized_task)
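
A sketch of the queue behaviour that the suggested comment alludes to; the internal attributes here (_queued, _heap) are hypothetical and may not match the PR's actual AgentQueues implementation:

```python
def queue_put_nowait(self, prioritized_task: PrioritizedTask) -> None:
    # Hypothetical duplicate handling: if the task is already queued with an
    # equal or more urgent priority, re-adding it is a no-op; otherwise the new
    # entry supersedes the old one, so only the most urgent priority is kept.
    existing = self._queued.get(prioritized_task.task)
    if existing is not None and existing.priority <= prioritized_task.priority:
        return
    self._queued[prioritized_task.task] = prioritized_task
    self._heap.put_nowait(prioritized_task)  # stale entries are skipped on get
```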


Successfully merging this pull request may close these issues:

  • resource scheduler: priorities

3 participants