Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement priority mechanism in new scheduler #8139

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
description: Implement priority mechanism on scheduler
change-type: minor
destination-branches: [master]

15 changes: 11 additions & 4 deletions src/inmanta/agent/agent_new.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from inmanta.const import AGENT_SCHEDULER_ID
from inmanta.data.model import AttributeStateChange, ResourceVersionIdStr
from inmanta.deploy.scheduler import ResourceScheduler
from inmanta.deploy.work import TaskPriority
from inmanta.protocol import SessionEndpoint, methods, methods_v2
from inmanta.types import Apireturn
from inmanta.util import CronSchedule, IntervalSchedule, ScheduledTask, Scheduler, TaskMethod, TaskSchedule, join_threadpools
Expand Down Expand Up @@ -132,8 +133,14 @@ def periodic_schedule(
return True
return False

periodic_schedule("deploy", self.scheduler.deploy, self._deploy_interval, self._deploy_splay_value)
periodic_schedule("repair", self.scheduler.repair, self._repair_interval, self._repair_splay_value)
# Thin wrappers that bind the interval-triggered priority levels, so the
# periodic scheduler can invoke deploy/repair without knowing about priorities.
async def interval_deploy() -> None:
    await self.scheduler.deploy(TaskPriority.INTERVAL_DEPLOY)

async def interval_repair() -> None:
    await self.scheduler.repair(TaskPriority.INTERVAL_REPAIR)
Comment on lines +136 to +140
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a slight preference for functools.partial over defining custom methods for this. But if you prefer it like this I'm fine with it.


periodic_schedule("deploy", interval_deploy, self._deploy_interval, self._deploy_splay_value)
periodic_schedule("repair", interval_repair, self._repair_interval, self._repair_splay_value)

def _enable_time_trigger(self, action: TaskMethod, schedule: TaskSchedule) -> None:
    """Register *action* with this agent's task scheduler to run on *schedule*."""
    self._sched.add_action(action, schedule)
Expand Down Expand Up @@ -240,10 +247,10 @@ async def trigger_update(self, env: uuid.UUID, agent: str, incremental_deploy: b
assert agent == AGENT_SCHEDULER_ID
if incremental_deploy:
LOGGER.info("Agent %s got a trigger to run deploy in environment %s", agent, env)
await self.scheduler.deploy()
await self.scheduler.deploy(TaskPriority.USER_DEPLOY)
else:
LOGGER.info("Agent %s got a trigger to run repair in environment %s", agent, env)
await self.scheduler.repair()
await self.scheduler.repair(TaskPriority.USER_REPAIR)
return 200

@protocol.handle(methods.trigger_read_version, env="tid", agent="id")
Expand Down
21 changes: 13 additions & 8 deletions src/inmanta/deploy/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from inmanta.deploy import work
from inmanta.deploy.state import DeploymentResult, ModelState, ResourceDetails, ResourceState, ResourceStatus
from inmanta.deploy.tasks import DryRun, RefreshFact, Task
from inmanta.deploy.work import PrioritizedTask
from inmanta.deploy.work import PrioritizedTask, TaskPriority
from inmanta.protocol import Client
from inmanta.resources import Id

Expand Down Expand Up @@ -169,20 +169,20 @@ async def stop(self) -> None:
self._work.agent_queues.send_shutdown()
await asyncio.gather(*self._workers.values())

async def deploy(self, priority: TaskPriority = TaskPriority.USER_DEPLOY) -> None:
    """
    Trigger a deploy of all resources currently marked as dirty.

    :param priority: Priority with which to schedule the resulting deploy tasks.
        Defaults to the user-triggered level so outside callers don't need to be
        concerned with priorities unless they explicitly want to override it.
    """
    async with self._scheduler_lock:
        # NOTE(review): `deploying=self._deploying_stale` here was flagged as an
        # incorrectly resolved merge conflict; `self._deploying` is consistent
        # with the other deploy_with_context() call sites in this class.
        self._work.deploy_with_context(self._state.dirty, priority, deploying=self._deploying)
async def repair(self, priority: TaskPriority = TaskPriority.USER_REPAIR) -> None:
    """
    Trigger a repair, i.e. mark all resources as dirty, then trigger a deploy.

    :param priority: Priority with which to schedule the resulting deploy tasks.
        Defaults to the user-triggered repair level so outside callers don't need
        to be concerned with priorities unless they explicitly want to override it.
    """
    async with self._scheduler_lock:
        self._state.dirty.update(self._state.resources.keys())
        # NOTE(review): use `self._deploying` (not `_deploying_stale`), consistent
        # with the other deploy_with_context() call sites in this class.
        self._work.deploy_with_context(self._state.dirty, priority, deploying=self._deploying)

async def dryrun(self, dry_run_id: uuid.UUID, version: int) -> None:
resources = await self._build_resource_mappings_from_db(version)
Expand All @@ -195,7 +195,7 @@ async def dryrun(self, dry_run_id: uuid.UUID, version: int) -> None:
resource_details=resource,
dry_run_id=dry_run_id,
),
priority=10,
priority=TaskPriority.DRYRUN,
)
)

Expand All @@ -204,7 +204,7 @@ async def get_facts(self, resource: dict[str, object]) -> None:
self._work.agent_queues.queue_put_nowait(
PrioritizedTask(
task=RefreshFact(resource=rid),
priority=10,
priority=TaskPriority.FACT_REFRESH,
)
)

Expand Down Expand Up @@ -305,6 +305,7 @@ async def _new_version(
# ensure deploy for ALL dirty resources, not just the new ones
self._work.deploy_with_context(
self._state.dirty,
TaskPriority.NEW_VERSION_DEPLOY,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I hadn't considered this to be a separate case.

deploying=self._deploying,
added_requires=added_requires,
dropped_requires=dropped_requires,
Expand Down Expand Up @@ -384,7 +385,11 @@ async def report_resource_state(
if details.attributes.get("send_event", False):
provides: Set[ResourceIdStr] = self._state.requires.provides_view().get(resource, set())
if provides:
self._work.deploy_with_context(provides, deploying=self._deploying)
# Not sure about this priority. I believe this method is called when a resource has a new state
# and hence, new version.
Comment on lines +388 to +389
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder to drop this comment.

As to the question: this method is called when a resource finishes deploy (or at least we only reach this branch of it in that case). The deploy that is triggered here is the propagation of events to its dependents. e.g. a service's resource deploys successfully -> we schedule the lsm state transfer resource for deploy.

I wonder if we might want a separate priority level for event propagation. I would rank it pretty high (urgent) for fast event response, but perhaps given how eagerly we currently send them out (because we'll still default to receive_events=True), that might not be the best idea at the moment.

Overall, perhaps we should simply schedule it with the same priority as the resource that just finished? We could pass in the information to this method, or we could just query the agent queues mapping. The first is more direct, but it feels a bit clunky, given how generic this method is, so I think I prefer the second.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, we don't even keep that information at the moment. I need to give this some more thought. Preliminary suggestion: make AgentQueues._in_progress a mapping from Task to PrioritizedTask instead of the current set, then query that here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or perhaps have the queue_get method return PrioritizedTask instead of Task, and pass on the task context to this method?

I'll get back to this. I have to consider what fits best.

self._work.deploy_with_context(
provides, priority=TaskPriority.NEW_VERSION_DEPLOY, deploying=self._deploying
)

def get_types_for_agent(self, agent: str) -> Collection[ResourceType]:
return list(self._state.types_per_agent[agent])
38 changes: 30 additions & 8 deletions src/inmanta/deploy/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import itertools
from collections.abc import Iterator, Mapping, Set
from dataclasses import dataclass
from enum import IntEnum
from typing import Callable, Generic, Optional, TypeVar

from inmanta.data.model import ResourceIdStr
Expand All @@ -34,6 +35,17 @@
T = TypeVar("T", bound=tasks.Task, covariant=True)


class TaskPriority(IntEnum):
    """
    Priority levels for tasks placed on the scheduler's agent queues.
    Lower values are picked up first.

    Only the *relative* order of these members matters: the numeric values are
    never persisted (e.g. to the database), so they can be freely changed to
    insert new levels in between existing ones if the need arises.
    """

    TERMINATED = -1
    USER_DEPLOY = 0
    NEW_VERSION_DEPLOY = 1
    USER_REPAIR = 2
    DRYRUN = 3
    INTERVAL_DEPLOY = 4
    FACT_REFRESH = 5
    INTERVAL_REPAIR = 6
Comment on lines +39 to +46
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a non-issue, but should we reserve some leeway by using these numbers * 10 in case we need to insert some other tasks in-between these ones in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is much of an issue, we can change the numbers how we like because in the code we just use the task's name. @sanderr what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, I care mostly about the relative order. I do think that a comment stating that these can be freely updated would be useful.

If we ever start writing priorities to the database it's different, but afaik we don't plan to.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jptrindade could you add such a comment?



@dataclass(frozen=True, kw_only=True)
class PrioritizedTask(Generic[T]):
"""
Expand All @@ -45,7 +57,7 @@ class PrioritizedTask(Generic[T]):
"""

task: T
priority: int
priority: TaskPriority


@functools.total_ordering
Expand Down Expand Up @@ -198,7 +210,8 @@ def send_shutdown(self) -> None:
"""
poison_pill = TaskQueueItem(
task=PrioritizedTask(
task=tasks.PoisonPill(resource=ResourceIdStr("system::Terminate[all,stop=True]")), priority=-1
task=tasks.PoisonPill(resource=ResourceIdStr("system::Terminate[all,stop=True]")),
priority=TaskPriority.TERMINATED,
),
insert_order=0,
)
Expand Down Expand Up @@ -294,6 +307,7 @@ def reset(self) -> None:
def deploy_with_context(
self,
resources: Set[ResourceIdStr],
priority: TaskPriority,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm very much in favor of making this kw-only like the other args.

*,
# TODO: update docstring + consider in_progress_deploys for name?
deploying: Optional[Set[ResourceIdStr]] = None,
Expand All @@ -307,6 +321,7 @@ def deploy_with_context(

:param resources: Set of resources that should be deployed. Adds a deploy task to the scheduled work for each
of these, unless it is already scheduled.
:param priority: The priority of this deploy.
:param stale_deploys: Set of resources for which a stale deploy is in progress, i.e. a deploy for an outdated resource
intent.
:param added_requires: Requires edges that were added since the previous state update, if any.
Expand Down Expand Up @@ -377,11 +392,12 @@ def extend_blocked_on(resource: ResourceIdStr, new_blockers: set[ResourceIdStr])
# discard rather than remove because task may already be running, in which case we leave it run its course
# and simply add a new one
task: tasks.Deploy = tasks.Deploy(resource=resource)
priority: Optional[int] = self.agent_queues.discard(task)
task_priority: Optional[int] = self.agent_queues.discard(task)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update the typing as well ? e.g.:

Suggested change
task_priority: Optional[int] = self.agent_queues.discard(task)
task_priority: Optional[TaskPriority] = self.agent_queues.discard(task)

And similarly for the method's signature ?

queued.remove(resource)
self._waiting[resource] = BlockedDeploy(
# FIXME[#8015]: default priority
task=PrioritizedTask(task=task, priority=priority if priority is not None else 0),
task=PrioritizedTask(
task=task, priority=TaskPriority(task_priority) if task_priority is not None else priority
),
# task was previously ready to execute => assume no other blockers than this one
blocked_on=new_blockers,
)
Expand All @@ -391,17 +407,23 @@ def extend_blocked_on(resource: ResourceIdStr, new_blockers: set[ResourceIdStr])

# ensure desired resource deploys are scheduled
for resource in resources:
prioritized_task = PrioritizedTask(task=tasks.Deploy(resource=resource), priority=priority)
if is_scheduled(resource):
# Deploy is already scheduled / running. No need to do anything here. If any of its dependencies are to be
# Deploy is already scheduled / running. Check to see if this task has a higher priority than the one already
# scheduled. If it has, update the priority. If any of its dependencies are to be
# scheduled as well, they will follow the provides relation to ensure this deploy waits its turn.
if prioritized_task.task in self.agent_queues:
self.agent_queues.queue_put_nowait(prioritized_task)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.agent_queues.queue_put_nowait(prioritized_task)
# simply add it again, the queue will make sure only the highest priority is kept
self.agent_queues.queue_put_nowait(prioritized_task)

if resource in self._waiting:
if self._waiting[resource].task.priority > priority:
self._waiting[resource].task = prioritized_task
continue
# task is not yet scheduled, schedule it now
blocked_on: set[ResourceIdStr] = {
dependency for dependency in self.requires.get(resource, ()) if is_scheduled(dependency)
}
self._waiting[resource] = BlockedDeploy(
# FIXME[#8015]: priority
task=PrioritizedTask(task=tasks.Deploy(resource=resource), priority=0),
task=prioritized_task,
blocked_on=blocked_on,
)
not_scheduled.discard(resource)
Expand Down
115 changes: 112 additions & 3 deletions tests/agent_server/deploy/test_scheduler_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
from inmanta.agent.executor import ResourceDetails, ResourceInstallSpec
from inmanta.config import Config
from inmanta.data import ResourceIdStr
from inmanta.deploy import state
from inmanta.deploy import state, tasks
from inmanta.deploy.work import TaskPriority
from inmanta.protocol.common import custom_json_encoder
from inmanta.util import retry_limited

Expand Down Expand Up @@ -167,7 +168,7 @@ async def build_resource_mappings_from_db(version: int | None) -> Mapping[Resour

@pytest.fixture
def environment() -> uuid.UUID:
    """Return a fixed environment id, as a uuid.UUID to match the annotation."""
    return uuid.UUID("83d604a0-691a-11ef-ae04-c8f750463317")


@pytest.fixture
Expand Down Expand Up @@ -396,7 +397,7 @@ async def test_get_facts(agent: TestAgent, make_resource_minimal):
ResourceIdStr(rid2): make_resource_minimal(rid2, {"value": "a"}, [rid1]),
}

await agent.scheduler._new_version(5, resources, make_requires(resources))
await agent.scheduler._new_version(1, resources, make_requires(resources))

await agent.scheduler.get_facts({"id": rid1})

Expand All @@ -409,3 +410,111 @@ async def done():
await retry_limited(done, 5)

assert agent.executor_manager.executors["agent1"].facts_count == 1


async def test_scheduler_priority(agent: TestAgent, environment, make_resource_minimal):
    """
    Ensure that the tasks are placed in the queue in the correct order
    and that existing tasks in the queue are replaced if a task that
    does the same thing with higher priority is added to the queue.
    """

    rid1 = ResourceIdStr("test::Resource[agent1,name=1]")
    resources = {
        rid1: make_resource_minimal(rid1, values={"value": "a"}, requires=[]),
    }

    executor1: ManagedExecutor = ManagedExecutor()
    agent.executor_manager.register_managed_executor("agent1", executor1)

    # We add two different tasks and assert that they are consumed in the correct order
    # Add a new version deploy to the queue
    await agent.scheduler._new_version(1, resources, make_requires(resources))
    agent.scheduler.mock_versions[1] = resources

    # And then a dryrun
    dryrun = uuid.uuid4()
    await agent.scheduler.dryrun(dryrun, 1)

    # The tasks are consumed in the priority order
    first_task = await agent.scheduler._work.agent_queues.queue_get("agent1")
    assert isinstance(first_task, tasks.Deploy)
    second_task = await agent.scheduler._work.agent_queues.queue_get("agent1")
    assert isinstance(second_task, tasks.DryRun)

    # The same is true if a task with lesser priority is added first
    # Add a fact refresh task to the queue
    await agent.scheduler.get_facts({"id": rid1})

    # Then add an interval deploy task to the queue
    agent.scheduler._state.dirty.add(rid1)
    await agent.scheduler.deploy(TaskPriority.INTERVAL_DEPLOY)

    # The tasks are consumed in the priority order
    first_task = await agent.scheduler._work.agent_queues.queue_get("agent1")
    assert isinstance(first_task, tasks.Deploy)
    second_task = await agent.scheduler._work.agent_queues.queue_get("agent1")
    assert isinstance(second_task, tasks.RefreshFact)
    # Assert that all tasks were consumed
    queue = agent.scheduler._work.agent_queues._get_queue("agent1")._queue
    assert len(queue) == 0

    # Add an interval deploy task to the queue
    agent.scheduler._state.dirty.add(rid1)
    await agent.scheduler.deploy(TaskPriority.INTERVAL_DEPLOY)

    # Add a dryrun to the queue (which has more priority)
    dryrun = uuid.uuid4()
    await agent.scheduler.dryrun(dryrun, 1)

    # Assert that we have both tasks in the queue
    queue = agent.scheduler._work.agent_queues._get_queue("agent1")._queue
    assert len(queue) == 2

    # Add a user deploy
    # It has more priority than interval deploy, so it will replace it in the queue
    # It also has more priority than dryrun, so it will be consumed first
    await agent.trigger_update(environment, "$__scheduler", incremental_deploy=True)

    first_task = await agent.scheduler._work.agent_queues.queue_get("agent1")
    assert isinstance(first_task, tasks.Deploy)
    second_task = await agent.scheduler._work.agent_queues.queue_get("agent1")
    assert isinstance(second_task, tasks.DryRun)

    # Interval deploy is still in the queue but marked as deleted
    queue = agent.scheduler._work.agent_queues._get_queue("agent1")._queue
    assert len(queue) == 1
    assert queue[0].deleted

    # Force clean queue
    agent.scheduler._work.agent_queues._get_queue("agent1")._queue = []

    # If a task to deploy a resource is added to the queue,
    # but a task to deploy that same resource is already present with higher priority,
    # it will be ignored and not added to the queue

    # Add a dryrun to the queue
    dryrun = uuid.uuid4()
    await agent.scheduler.dryrun(dryrun, 1)

    # Add a user deploy
    await agent.trigger_update(environment, "$__scheduler", incremental_deploy=True)

    # Try to add an interval deploy task to the queue
    agent.scheduler._state.dirty.add(rid1)
    await agent.scheduler.deploy(TaskPriority.INTERVAL_DEPLOY)

    # Assert that we still have only 2 tasks in the queue
    queue = agent.scheduler._work.agent_queues._get_queue("agent1")._queue
    assert len(queue) == 2

    # The order is unaffected, the interval deploy was essentially ignored
    first_task = await agent.scheduler._work.agent_queues.queue_get("agent1")
    assert isinstance(first_task, tasks.Deploy)
    second_task = await agent.scheduler._work.agent_queues.queue_get("agent1")
    assert isinstance(second_task, tasks.DryRun)

    # All tasks were consumed
    queue = agent.scheduler._work.agent_queues._get_queue("agent1")._queue
    assert len(queue) == 0