Implement priority mechanism in new scheduler #8139
base: master
Changes from all commits
Commits: d736967, 70f7121, 94d22cd, f8890d3, 4d6b1f5, 27fa325, 595ce5d, b3811d2, 0c36c84, 891c4b8, a80630f, f34ef11
@@ -0,0 +1,4 @@
+description: Implement priority mechanism on scheduler
+change-type: minor
+destination-branches: [master]
@@ -32,7 +32,7 @@
 from inmanta.deploy import work
 from inmanta.deploy.state import DeploymentResult, ModelState, ResourceDetails, ResourceState, ResourceStatus
 from inmanta.deploy.tasks import DryRun, RefreshFact, Task
-from inmanta.deploy.work import PrioritizedTask
+from inmanta.deploy.work import PrioritizedTask, TaskPriority
 from inmanta.protocol import Client
 from inmanta.resources import Id
@@ -169,20 +169,20 @@ async def stop(self) -> None:
         self._work.agent_queues.send_shutdown()
         await asyncio.gather(*self._workers.values())

-    async def deploy(self) -> None:
+    async def deploy(self, priority: TaskPriority) -> None:

Review comment: I think it might make sense to add a default. Wdyt?

         """
         Trigger a deploy
         """
         async with self._scheduler_lock:
-            self._work.deploy_with_context(self._state.dirty, deploying=self._deploying)
+            self._work.deploy_with_context(self._state.dirty, priority, deploying=self._deploying_stale)

Review comment: We'll have another one between our PRs but it shouldn't be too difficult to resolve.

-    async def repair(self) -> None:
+    async def repair(self, priority: TaskPriority) -> None:
         """
         Trigger a repair, i.e. mark all resources as dirty, then trigger a deploy.
         """
         async with self._scheduler_lock:
             self._state.dirty.update(self._state.resources.keys())
-            self._work.deploy_with_context(self._state.dirty, deploying=self._deploying)
+            self._work.deploy_with_context(self._state.dirty, priority, deploying=self._deploying_stale)

     async def dryrun(self, dry_run_id: uuid.UUID, version: int) -> None:
         resources = await self._build_resource_mappings_from_db(version)
@@ -195,7 +195,7 @@ async def dryrun(self, dry_run_id: uuid.UUID, version: int) -> None:
                     resource_details=resource,
                     dry_run_id=dry_run_id,
                 ),
-                priority=10,
+                priority=TaskPriority.DRYRUN,
             )
         )
@@ -204,7 +204,7 @@ async def get_facts(self, resource: dict[str, object]) -> None:
         self._work.agent_queues.queue_put_nowait(
             PrioritizedTask(
                 task=RefreshFact(resource=rid),
-                priority=10,
+                priority=TaskPriority.FACT_REFRESH,
             )
         )
@@ -305,6 +305,7 @@ async def _new_version(
             # ensure deploy for ALL dirty resources, not just the new ones
             self._work.deploy_with_context(
                 self._state.dirty,
+                TaskPriority.NEW_VERSION_DEPLOY,

Review comment: Nice, I hadn't considered this to be a separate case.

                 deploying=self._deploying,
                 added_requires=added_requires,
                 dropped_requires=dropped_requires,
@@ -384,7 +385,11 @@ async def report_resource_state(
         if details.attributes.get("send_event", False):
             provides: Set[ResourceIdStr] = self._state.requires.provides_view().get(resource, set())
             if provides:
-                self._work.deploy_with_context(provides, deploying=self._deploying)
+                # Not sure about this priority. I believe this method is called when a resource has a new state
+                # and hence, new version.
+                self._work.deploy_with_context(
+                    provides, priority=TaskPriority.NEW_VERSION_DEPLOY, deploying=self._deploying
+                )

Review comment (on lines +388 to +389): Reminder to drop this comment. As to the question: this method is called when a resource finishes a deploy (or at least we only reach this branch in that case). The deploy that is triggered here is the propagation of events to its dependents, e.g. a service's resource deploys successfully -> we schedule the lsm state transfer resource for deploy. I wonder if we might want a separate priority level for event propagation. I would rank it pretty high (urgent) for fast event response, but perhaps given how eagerly we currently send them out (because we'll still default to ...). Overall, perhaps we should simply schedule it with the same priority as the resource that just finished? We could pass the information in to this method, or we could just query the agent queues mapping. The first is more direct, but it feels a bit clunky, given how generic this method is, so I think I prefer the second.

Reply: Hmm, we don't even keep that information at the moment. I need to give this some more thought. Preliminary suggestion: make ...

Reply: Or perhaps have the ... I'll get back to this. I have to consider what fits best.

     def get_types_for_agent(self, agent: str) -> Collection[ResourceType]:
         return list(self._state.types_per_agent[agent])
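The review thread above debates which priority event propagation should get. The reviewer's second option, reusing the priority of the deploy that just finished, could be sketched roughly as follows. This is only an illustration: the names `last_deploy_priority` and `event_priority` are hypothetical, not part of the PR, and the enum below only mirrors a few members from the diff.

```python
from enum import IntEnum


class TaskPriority(IntEnum):
    """Partial mirror of the enum added in this PR: lower value = more urgent."""

    USER_DEPLOY = 0
    NEW_VERSION_DEPLOY = 1
    INTERVAL_DEPLOY = 4


# Hypothetical record of the priority each resource's deploy ran at; per the
# discussion, the real scheduler does not track this today and would have to
# derive it from the agent queues.
last_deploy_priority: dict[str, TaskPriority] = {
    "svc::Resource[agent1,name=a]": TaskPriority.USER_DEPLOY,
}


def event_priority(finished: str, default: TaskPriority = TaskPriority.NEW_VERSION_DEPLOY) -> TaskPriority:
    # Propagate events at the priority of the deploy that just finished,
    # falling back to a default when it is unknown.
    return last_deploy_priority.get(finished, default)


print(event_priority("svc::Resource[agent1,name=a]").name)  # USER_DEPLOY
```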
@@ -21,6 +21,7 @@
 import itertools
 from collections.abc import Iterator, Mapping, Set
 from dataclasses import dataclass
+from enum import IntEnum
 from typing import Callable, Generic, Optional, TypeVar

 from inmanta.data.model import ResourceIdStr
@@ -34,6 +35,17 @@
 T = TypeVar("T", bound=tasks.Task, covariant=True)


+class TaskPriority(IntEnum):
+    TERMINATED = -1
+    USER_DEPLOY = 0
+    NEW_VERSION_DEPLOY = 1
+    USER_REPAIR = 2
+    DRYRUN = 3
+    INTERVAL_DEPLOY = 4
+    FACT_REFRESH = 5
+    INTERVAL_REPAIR = 6

Review comment (on lines +40 to +41): I'm inclined to swap these two (0 and 1). @wouterdb WDYT?

Review comment (on lines +39 to +46): Maybe a non-issue, but should we reserve some leeway by using these numbers * 10, in case we need to insert some other tasks in between these ones in the future?

Reply: I don't think it is much of an issue; we can change the numbers how we like because in the code we just use the task's name. @sanderr what do you think?

Reply: Indeed, I care mostly about the relative order. I do think that a comment stating that these can be freely updated would be useful. If we ever start writing priorities to the database it's different, but afaik we don't plan to.

Reply: @jptrindade could you add such a comment?


 @dataclass(frozen=True, kw_only=True)
 class PrioritizedTask(Generic[T]):
     """
@@ -45,7 +57,7 @@ class PrioritizedTask(Generic[T]):
     """

     task: T
-    priority: int
+    priority: TaskPriority


 @functools.total_ordering
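Because `TaskPriority` derives from `IntEnum`, its members compare as plain integers, which is what lets the agent queues order tasks by urgency; as the reviewers note, only the relative order matters and the concrete values can be changed freely. A small self-contained illustration, mirroring the enum values from the diff:

```python
from enum import IntEnum


# Mirror of the TaskPriority enum from the diff; lower value = more urgent.
class TaskPriority(IntEnum):
    TERMINATED = -1
    USER_DEPLOY = 0
    NEW_VERSION_DEPLOY = 1
    USER_REPAIR = 2
    DRYRUN = 3
    INTERVAL_DEPLOY = 4
    FACT_REFRESH = 5
    INTERVAL_REPAIR = 6


# IntEnum members compare as plain ints, so they can be used directly as sort
# or heap keys without any extra __lt__ implementation.
pending = [TaskPriority.FACT_REFRESH, TaskPriority.USER_DEPLOY, TaskPriority.DRYRUN]
print(sorted(pending)[0].name)  # USER_DEPLOY: the most urgent pending task
```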
@@ -135,7 +147,7 @@ def get_tasks_for_resource(self, resource: ResourceIdStr) -> set[tasks.Task]:
         """
         return set(self._tasks_by_resource.get(resource, {}).keys())

-    def remove(self, task: tasks.Task) -> int:
+    def remove(self, task: tasks.Task) -> TaskPriority:
         """
         Removes the given task from its associated agent queue. Raises KeyError if it is not in the queue.
         Returns the priority at which the deleted task was queued.
@@ -148,7 +160,7 @@ def remove(self, task: tasks.Task) -> int:
             del self._tasks_by_resource[task.resource]
         return queue_item.task.priority

-    def discard(self, task: tasks.Task) -> Optional[int]:
+    def discard(self, task: tasks.Task) -> Optional[TaskPriority]:
         """
         Removes the given task from its associated agent queue if it is present.
         Returns the priority at which the deleted task was queued, if it was at all.
@@ -198,7 +210,8 @@ def send_shutdown(self) -> None:
         """
         poison_pill = TaskQueueItem(
             task=PrioritizedTask(
-                task=tasks.PoisonPill(resource=ResourceIdStr("system::Terminate[all,stop=True]")), priority=-1
+                task=tasks.PoisonPill(resource=ResourceIdStr("system::Terminate[all,stop=True]")),
+                priority=TaskPriority.TERMINATED,
             ),
             insert_order=0,
         )
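The poison pill above is queued at `TaskPriority.TERMINATED` (-1), which sorts ahead of all real work, so a worker shuts down the next time it polls its queue. A simplified stand-alone sketch of that pattern, not the inmanta implementation: the sentinel object and the `(priority, insert_order, item)` queue layout here are assumptions for illustration.

```python
import asyncio

POISON_PILL = object()  # hypothetical sentinel standing in for tasks.PoisonPill
TERMINATED = -1  # same value as TaskPriority.TERMINATED in the diff


async def worker(queue: asyncio.PriorityQueue, done: list) -> None:
    while True:
        priority, insert_order, item = await queue.get()
        if item is POISON_PILL:
            break  # shut down without draining the remaining work
        done.append(item)


async def main() -> list:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    done: list = []
    queue.put_nowait((0, 0, "deploy-a"))
    queue.put_nowait((4, 1, "interval-deploy-b"))
    # TERMINATED sorts before all real priorities, so the pill is the very
    # next item the worker dequeues
    queue.put_nowait((TERMINATED, 2, POISON_PILL))
    await worker(queue, done)
    return done


print(asyncio.run(main()))  # [] - the worker stopped before any task ran
```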
@@ -294,6 +307,7 @@ def reset(self) -> None:
     def deploy_with_context(
         self,
         resources: Set[ResourceIdStr],
+        priority: TaskPriority,

Review comment: I'm very much in favor of making this kw-only like the other args.

         *,
         # TODO: update docstring + consider in_progress_deploys for name?
         deploying: Optional[Set[ResourceIdStr]] = None,
@@ -307,6 +321,7 @@ def deploy_with_context(

         :param resources: Set of resources that should be deployed. Adds a deploy task to the scheduled work for each
             of these, unless it is already scheduled.
+        :param priority: The priority of this deploy.
         :param stale_deploys: Set of resources for which a stale deploy is in progress, i.e. a deploy for an outdated
             resource intent.
         :param added_requires: Requires edges that were added since the previous state update, if any.
@@ -377,11 +392,10 @@ def extend_blocked_on(resource: ResourceIdStr, new_blockers: set[ResourceIdStr])
             # discard rather than remove because task may already be running, in which case we leave it run its course
             # and simply add a new one
             task: tasks.Deploy = tasks.Deploy(resource=resource)
-            priority: Optional[int] = self.agent_queues.discard(task)
+            task_priority: Optional[TaskPriority] = self.agent_queues.discard(task)
             queued.remove(resource)
             self._waiting[resource] = BlockedDeploy(
-                # FIXME[#8015]: default priority
-                task=PrioritizedTask(task=task, priority=priority if priority is not None else 0),
+                task=PrioritizedTask(task=task, priority=task_priority if task_priority is not None else priority),

Review comment: Shouldn't we bump the priority here if it's higher?

                 # task was previously ready to execute => assume no other blockers than this one
                 blocked_on=new_blockers,
             )
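On the reviewer's question about bumping: because lower values are more urgent, keeping the more urgent of the previously queued priority and the incoming one amounts to taking the `min()` of the two. A hypothetical helper sketching that, not code from the PR, with the enum only partially mirrored:

```python
from enum import IntEnum
from typing import Optional


class TaskPriority(IntEnum):  # partial mirror of the PR's enum; lower = more urgent
    USER_DEPLOY = 0
    NEW_VERSION_DEPLOY = 1
    INTERVAL_DEPLOY = 4


def effective_priority(queued: Optional[TaskPriority], incoming: TaskPriority) -> TaskPriority:
    # Keep the more urgent (numerically smaller) of the priority the task was
    # queued at and the priority of the current request; fall back to the
    # incoming one when the task was not queued at all.
    return incoming if queued is None else min(queued, incoming)


print(effective_priority(TaskPriority.INTERVAL_DEPLOY, TaskPriority.USER_DEPLOY).name)  # USER_DEPLOY
```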
@@ -391,17 +405,23 @@ def extend_blocked_on(resource: ResourceIdStr, new_blockers: set[ResourceIdStr])

         # ensure desired resource deploys are scheduled
         for resource in resources:
+            prioritized_task = PrioritizedTask(task=tasks.Deploy(resource=resource), priority=priority)
             if is_scheduled(resource):
-                # Deploy is already scheduled / running. No need to do anything here. If any of its dependencies are to be
+                # Deploy is already scheduled / running. Check to see if this task has a higher priority than the one
+                # already scheduled. If it has, update the priority. If any of its dependencies are to be
                 # scheduled as well, they will follow the provides relation to ensure this deploy waits its turn.
+                if prioritized_task.task in self.agent_queues:
+                    self.agent_queues.queue_put_nowait(prioritized_task)
+                if resource in self._waiting:
+                    if self._waiting[resource].task.priority > priority:
+                        self._waiting[resource].task = prioritized_task
                 continue
             # task is not yet scheduled, schedule it now
             blocked_on: set[ResourceIdStr] = {
                 dependency for dependency in self.requires.get(resource, ()) if is_scheduled(dependency)
             }
             self._waiting[resource] = BlockedDeploy(
-                # FIXME[#8015]: priority
-                task=PrioritizedTask(task=tasks.Deploy(resource=resource), priority=0),
+                task=prioritized_task,
                 blocked_on=blocked_on,
             )
             not_scheduled.discard(resource)
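The `queue_put_nowait(prioritized_task)` call above relies on re-queueing to change an already-queued task's priority. With a heap-backed queue this is typically implemented by pushing a fresh entry and lazily skipping the stale one on pop; the sketch below shows that general pattern under assumed mechanics, it is not the actual agent-queue code.

```python
import heapq

# A heap of [priority, insert_order, resource] entries plus a map of the
# newest entry per resource; re-queueing at a better priority pushes a fresh
# entry, and the stale one is skipped when popped (lazy deletion).
heap: list = []
newest: dict = {}
counter = 0


def schedule(resource: str, priority: int) -> None:
    global counter
    entry = [priority, counter, resource]
    counter += 1
    newest[resource] = entry  # any older entry for this resource goes stale
    heapq.heappush(heap, entry)


def pop() -> str:
    while heap:
        entry = heapq.heappop(heap)
        if newest.get(entry[2]) is entry:  # ignore stale duplicates
            del newest[entry[2]]
            return entry[2]
    raise IndexError("pop from empty queue")


schedule("std::File[agent1,path=/tmp/x]", 4)  # queued by an interval deploy
schedule("std::File[agent1,path=/tmp/x]", 0)  # a user deploy bumps it
print(pop())  # served once, at the bumped (more urgent) priority
```

Lazy deletion avoids an O(n) heap search on every bump, at the cost of occasionally popping and discarding stale entries.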
Review comment: I have a slight preference for functools.partial over defining custom methods for this. But if you prefer it like this, I'm fine with it.