From d73696799b5044250460700905c4b0f1ed959f46 Mon Sep 17 00:00:00 2001 From: jptrindade Date: Tue, 24 Sep 2024 16:00:22 +0100 Subject: [PATCH 01/10] Introduce repair and redeploy timers to the new agent --- .../unreleased/introduce-timers-to-agent.yml | 3 + src/inmanta/agent/agent_new.py | 77 ++++++++- .../deploy/e2e/test_autostarted.py | 162 +++++++++++++++++- .../deploy/scheduler_test_util.py | 3 + 4 files changed, 242 insertions(+), 3 deletions(-) create mode 100644 changelogs/unreleased/introduce-timers-to-agent.yml diff --git a/changelogs/unreleased/introduce-timers-to-agent.yml b/changelogs/unreleased/introduce-timers-to-agent.yml new file mode 100644 index 0000000000..8686a7dac3 --- /dev/null +++ b/changelogs/unreleased/introduce-timers-to-agent.yml @@ -0,0 +1,3 @@ +description: Introduce repair and redeploy timers to the new agent +change-type: minor +destination-branches: [master] diff --git a/src/inmanta/agent/agent_new.py b/src/inmanta/agent/agent_new.py index 74f97cff7f..cb49ce61b4 100644 --- a/src/inmanta/agent/agent_new.py +++ b/src/inmanta/agent/agent_new.py @@ -16,11 +16,13 @@ Contact: code@inmanta.com """ +import datetime import logging import os +import random import uuid from concurrent.futures.thread import ThreadPoolExecutor -from typing import Any, Optional +from typing import Any, Callable, Coroutine, Optional, Union from inmanta import config, const, protocol from inmanta.agent import config as cfg @@ -31,7 +33,7 @@ from inmanta.deploy.scheduler import ResourceScheduler from inmanta.protocol import SessionEndpoint, methods, methods_v2 from inmanta.types import Apireturn -from inmanta.util import join_threadpools +from inmanta.util import CronSchedule, IntervalSchedule, ScheduledTask, Scheduler, TaskMethod, TaskSchedule, join_threadpools LOGGER = logging.getLogger(__name__) @@ -67,6 +69,75 @@ def __init__( self.scheduler = ResourceScheduler(self._env_id, self.executor_manager, self._client) self.working = False + self._sched = Scheduler("new agent endpoint") + self._time_triggered_actions: set[ScheduledTask] = set() + + self._set_deploy_and_repair_intervals() + + def _set_deploy_and_repair_intervals(self): + # do regular deploys + self._deploy_interval = cfg.agent_deploy_interval.get() + deploy_splay_time = cfg.agent_deploy_splay_time.get() + self._deploy_splay_value = random.randint(0, deploy_splay_time) + + # do regular repair runs + self._repair_interval: Union[int, str] = cfg.agent_repair_interval.get() + repair_splay_time = cfg.agent_repair_splay_time.get() + self._repair_splay_value = random.randint(0, repair_splay_time) + + def _enable_time_triggers(self) -> None: + + def periodic_schedule( + kind: str, + action: Callable[[], Coroutine[object, None, object]], + interval: Union[int, str], + splay_value: int, + ) -> bool: + """ + Schedule a periodic task + + :param kind: Name of the task (value to display in logs) + :param action: The action to schedule periodically + :param interval: The interval at which to schedule the task. Can be specified as either a number of + seconds, or a cron string. + :param splay_value: When specifying the interval as a number of seconds, this parameter specifies + the number of seconds by which to delay the initial execution of this action. 
+ """ + now = datetime.datetime.now().astimezone() + + if isinstance(interval, int) and interval > 0: + LOGGER.info( + "Scheduling periodic %s with interval %d and splay %d (first run at %s)", + kind, + interval, + splay_value, + (now + datetime.timedelta(seconds=splay_value)).strftime(const.TIME_LOGFMT), + ) + interval_schedule: IntervalSchedule = IntervalSchedule( + interval=float(interval), initial_delay=float(splay_value) + ) + self._enable_time_trigger(action, interval_schedule) + return True + + if isinstance(interval, str): + LOGGER.info("Scheduling periodic %s with cron expression '%s'", kind, interval) + cron_schedule = CronSchedule(cron=interval) + self._enable_time_trigger(action, cron_schedule) + return True + return False + + periodic_schedule("deploy", self.scheduler.deploy, self._deploy_interval, self._deploy_splay_value) + periodic_schedule("repair", self.scheduler.repair, self._repair_interval, self._repair_splay_value) + + def _enable_time_trigger(self, action: TaskMethod, schedule: TaskSchedule) -> None: + self._sched.add_action(action, schedule) + self._time_triggered_actions.add(ScheduledTask(action=action, schedule=schedule)) + + def _disable_time_triggers(self) -> None: + for task in self._time_triggered_actions: + self._sched.remove(task) + self._time_triggered_actions.clear() + def create_executor_manager(self) -> executor.ExecutorManager[executor.Executor]: assert self._env_id is not None return forking_executor.MPManager( @@ -102,12 +173,14 @@ async def start_working(self) -> None: self.working = True await self.executor_manager.start() await self.scheduler.start() + self._enable_time_triggers() async def stop_working(self) -> None: """Stop working, connection lost""" if not self.working: return self.working = False + self._disable_time_triggers() await self.executor_manager.stop() await self.scheduler.stop() diff --git a/tests/agent_server/deploy/e2e/test_autostarted.py b/tests/agent_server/deploy/e2e/test_autostarted.py index 7705d9e8d6..2b759b268e 100644 --- a/tests/agent_server/deploy/e2e/test_autostarted.py +++ b/tests/agent_server/deploy/e2e/test_autostarted.py @@ -18,12 +18,16 @@ import asyncio import logging +import time import uuid +from uuid import UUID import pytest from inmanta import const, data -from utils import _wait_until_deployment_finishes +from inmanta.config import Config +from inmanta.util import get_compiler_version +from utils import _wait_until_deployment_finishes, resource_action_consistency_check logger = logging.getLogger("inmanta.test.server_agent") @@ -84,3 +88,159 @@ async def test_auto_deploy_no_splay(server, client, clienthelper, resource_conta assert len(result.result["agents"]) == 1 assert result.result["agents"][0]["name"] == const.AGENT_SCHEDULER_ID + + +@pytest.mark.parametrize( + "agent_deploy_interval", + ["2", "*/2 * * * * * *"], +) +async def test_spontaneous_deploy( + server, + client, + agent, + resource_container, + environment, + clienthelper, + caplog, + agent_deploy_interval, +): + """ + Test that a deploy run is executed every 2 seconds in the new agent + as specified in the agent_repair_interval (using a cron or not) + """ + with caplog.at_level(logging.DEBUG): + resource_container.Provider.reset() + + env_id = UUID(environment) + + Config.set("config", "agent-deploy-interval", agent_deploy_interval) + Config.set("config", "agent-deploy-splay-time", "2") + Config.set("config", "agent-repair-interval", "0") + + # This is just so we can reuse the agent from the fixtures with the new config options + 
agent._set_deploy_and_repair_intervals() + agent._enable_time_triggers() + + resource_container.Provider.set_fail("agent1", "key1", 1) + + version = await clienthelper.get_version() + + resources = [ + { + "key": "key1", + "value": "value1", + "id": "test::Resource[agent1,key=key1],v=%d" % version, + "purged": False, + "send_event": False, + "requires": [], + } + ] + + await clienthelper.put_version_simple(resources, version) + + # do a deploy + start = time.time() + + result = await client.release_version(env_id, version, False) + assert result.code == 200 + + assert not result.result["model"]["deployed"] + assert result.result["model"]["released"] + assert result.result["model"]["total"] == 1 + assert result.result["model"]["result"] == "deploying" + + result = await client.get_version(env_id, version) + assert result.code == 200 + + await clienthelper.wait_for_deployed() + + await clienthelper.wait_full_success(env_id) + + duration = time.time() - start + + result = await client.get_version(env_id, version) + assert result.result["model"]["done"] == 1 + + assert resource_container.Provider.isset("agent1", "key1") + + # approximate check, the number of heartbeats can vary, but not by a factor of 10 + beats = [message for logger_name, log_level, message in caplog.record_tuples if "Received heartbeat from" in message] + assert ( + len(beats) < duration * 10 + ), f"Sent {len(beats)} heartbeats over a time period of {duration} seconds, sleep mechanism is broken" + + +@pytest.mark.parametrize( + "agent_repair_interval", + [ + "2", + "*/2 * * * * * *", + ], +) +async def test_spontaneous_repair(server, client, agent, resource_container, environment, clienthelper, agent_repair_interval): + """ + Test that a repair run is executed every 2 seconds in the new agent + as specified in the agent_repair_interval (using a cron or not) + """ + resource_container.Provider.reset() + env_id = environment + + Config.set("config", "agent-repair-interval", agent_repair_interval) + Config.set("config", "agent-repair-splay-time", "2") + Config.set("config", "agent-deploy-interval", "0") + + # This is just so we can reuse the agent from the fixtures with the new config options + agent._set_deploy_and_repair_intervals() + agent._enable_time_triggers() + version = await clienthelper.get_version() + + resources = [ + { + "key": "key1", + "value": "value1", + "id": "test::Resource[agent1,key=key1],v=%d" % version, + "purged": False, + "send_event": False, + "requires": [], + }, + ] + + result = await client.put_version( + tid=env_id, version=version, resources=resources, unknowns=[], version_info={}, compiler_version=get_compiler_version() + ) + assert result.code == 200 + + # do a deploy + result = await client.release_version(env_id, version, True, const.AgentTriggerMethod.push_full_deploy) + assert result.code == 200 + assert not result.result["model"]["deployed"] + assert result.result["model"]["released"] + assert result.result["model"]["total"] == 1 + assert result.result["model"]["result"] == "deploying" + + result = await client.get_version(env_id, version) + assert result.code == 200 + + await clienthelper.wait_full_success(env_id) + + async def verify_deployment_result(): + result = await client.get_version(env_id, version) + # A repair run may put one resource from the deployed state to the deploying state. 
+ assert len(resources) - 1 <= result.result["model"]["done"] <= len(resources) + + assert resource_container.Provider.isset("agent1", "key1") + assert resource_container.Provider.get("agent1", "key1") == "value1" + + await verify_deployment_result() + + # Manual change + resource_container.Provider.set("agent1", "key1", "another_value") + # Wait until repair restores the state + now = time.time() + while resource_container.Provider.get("agent1", "key1") != "value1": + if time.time() > now + 10: + raise Exception("Timeout occurred while waiting for repair run") + await asyncio.sleep(0.1) + + await verify_deployment_result() + await resource_action_consistency_check() diff --git a/tests/agent_server/deploy/scheduler_test_util.py b/tests/agent_server/deploy/scheduler_test_util.py index 136a539bab..3feadd8748 100644 --- a/tests/agent_server/deploy/scheduler_test_util.py +++ b/tests/agent_server/deploy/scheduler_test_util.py @@ -129,3 +129,6 @@ class ClientHelper(utils.ClientHelper): async def wait_for_deployed(self, version: int = -1) -> None: await _wait_until_deployment_finishes(self.client, self.environment) + + async def wait_full_success(self, environment: str) -> None: + await wait_full_success(self.client, environment) From 70f71211a24494d6adb17368c49ab5fb22f03c2d Mon Sep 17 00:00:00 2001 From: jptrindade Date: Tue, 24 Sep 2024 16:03:24 +0100 Subject: [PATCH 02/10] mypy and some docstrings --- src/inmanta/agent/agent_new.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/inmanta/agent/agent_new.py b/src/inmanta/agent/agent_new.py index cb49ce61b4..f3a0895ae9 100644 --- a/src/inmanta/agent/agent_new.py +++ b/src/inmanta/agent/agent_new.py @@ -74,7 +74,10 @@ def __init__( self._set_deploy_and_repair_intervals() - def _set_deploy_and_repair_intervals(self): + def _set_deploy_and_repair_intervals(self) -> None: + """ + Fetch the settings related to automatic deploys and repairs from the config + """ # do regular deploys self._deploy_interval = cfg.agent_deploy_interval.get() deploy_splay_time = cfg.agent_deploy_splay_time.get() From 94d22cd29c6e8e5e292bab6710182bb7fe32dc41 Mon Sep 17 00:00:00 2001 From: jptrindade Date: Thu, 26 Sep 2024 14:44:05 +0100 Subject: [PATCH 03/10] initial commit --- ...lement-priority-mechanism-on-scheduler.yml | 4 +++ src/inmanta/agent/agent_new.py | 15 ++++++++--- src/inmanta/deploy/scheduler.py | 15 ++++++----- src/inmanta/deploy/work.py | 27 ++++++++++++++----- 4 files changed, 43 insertions(+), 18 deletions(-) create mode 100644 changelogs/unreleased/implement-priority-mechanism-on-scheduler.yml diff --git a/changelogs/unreleased/implement-priority-mechanism-on-scheduler.yml b/changelogs/unreleased/implement-priority-mechanism-on-scheduler.yml new file mode 100644 index 0000000000..91996e8dc4 --- /dev/null +++ b/changelogs/unreleased/implement-priority-mechanism-on-scheduler.yml @@ -0,0 +1,4 @@ +description: Implement priority mechanism on scheduler +change-type: minor +destination-branches: [master] + diff --git a/src/inmanta/agent/agent_new.py b/src/inmanta/agent/agent_new.py index f3a0895ae9..23074f4d4c 100644 --- a/src/inmanta/agent/agent_new.py +++ b/src/inmanta/agent/agent_new.py @@ -31,6 +31,7 @@ from inmanta.const import AGENT_SCHEDULER_ID from inmanta.data.model import AttributeStateChange, ResourceVersionIdStr from inmanta.deploy.scheduler import ResourceScheduler +from inmanta.deploy.work import TaskPriority from inmanta.protocol import SessionEndpoint, methods, methods_v2 from inmanta.types import Apireturn from 
inmanta.util import CronSchedule, IntervalSchedule, ScheduledTask, Scheduler, TaskMethod, TaskSchedule, join_threadpools @@ -129,8 +130,14 @@ def periodic_schedule( return True return False - periodic_schedule("deploy", self.scheduler.deploy, self._deploy_interval, self._deploy_splay_value) - periodic_schedule("repair", self.scheduler.repair, self._repair_interval, self._repair_splay_value) + async def interval_deploy() -> None: + await self.scheduler.deploy(TaskPriority.INTERVAL_DEPLOY) + + async def interval_repair() -> None: + await self.scheduler.repair(TaskPriority.INTERVAL_REPAIR) + + periodic_schedule("deploy", interval_deploy, self._deploy_interval, self._deploy_splay_value) + periodic_schedule("repair", interval_repair, self._repair_interval, self._repair_splay_value) def _enable_time_trigger(self, action: TaskMethod, schedule: TaskSchedule) -> None: self._sched.add_action(action, schedule) @@ -236,9 +243,9 @@ async def trigger_update(self, env: uuid.UUID, agent: str, incremental_deploy: b assert env == self.environment assert agent == AGENT_SCHEDULER_ID if incremental_deploy: - await self.scheduler.deploy() + await self.scheduler.deploy(TaskPriority.USER_DEPLOY) else: - await self.scheduler.repair() + await self.scheduler.repair(TaskPriority.USER_REPAIR) return 200 @protocol.handle(methods.trigger_read_version, env="tid", agent="id") diff --git a/src/inmanta/deploy/scheduler.py b/src/inmanta/deploy/scheduler.py index e106872c76..c260a2d1f5 100644 --- a/src/inmanta/deploy/scheduler.py +++ b/src/inmanta/deploy/scheduler.py @@ -32,7 +32,7 @@ from inmanta.deploy import work from inmanta.deploy.state import DeploymentResult, ModelState, ResourceDetails, ResourceState, ResourceStatus from inmanta.deploy.tasks import DryRun, RefreshFact, Task -from inmanta.deploy.work import PrioritizedTask +from inmanta.deploy.work import PrioritizedTask, TaskPriority from inmanta.protocol import Client from inmanta.resources import Id @@ -168,20 +168,20 @@ async def stop(self) -> None: self._work.agent_queues.send_shutdown() await asyncio.gather(*self._workers.values()) - async def deploy(self) -> None: + async def deploy(self, priority: TaskPriority) -> None: """ Trigger a deploy """ async with self._scheduler_lock: - self._work.deploy_with_context(self._state.dirty, stale_deploys=self._deploying_stale) + self._work.deploy_with_context(self._state.dirty, priority, stale_deploys=self._deploying_stale) - async def repair(self) -> None: + async def repair(self, priority: TaskPriority) -> None: """ Trigger a repair, i.e. mark all resources as dirty, then trigger a deploy. 
""" async with self._scheduler_lock: self._state.dirty.update(self._state.resources.keys()) - self._work.deploy_with_context(self._state.dirty, stale_deploys=self._deploying_stale) + self._work.deploy_with_context(self._state.dirty, priority, stale_deploys=self._deploying_stale) async def dryrun(self, dry_run_id: uuid.UUID, version: int) -> None: resources = await self._build_resource_mappings_from_db(version) @@ -194,7 +194,7 @@ async def dryrun(self, dry_run_id: uuid.UUID, version: int) -> None: resource_details=resource, dry_run_id=dry_run_id, ), - priority=10, + priority=TaskPriority.DRYRUN, ) ) @@ -203,7 +203,7 @@ async def get_facts(self, resource: dict[str, object]) -> None: self._work.agent_queues.queue_put_nowait( PrioritizedTask( task=RefreshFact(resource=rid), - priority=10, + priority=TaskPriority.FACT_REFRESH, ) ) @@ -313,6 +313,7 @@ async def _new_version( # ensure deploy for ALL dirty resources, not just the new ones self._work.deploy_with_context( self._state.dirty, + TaskPriority.NEW_VERSION_DEPLOY, stale_deploys=self._deploying_stale, added_requires=added_requires, dropped_requires=dropped_requires, diff --git a/src/inmanta/deploy/work.py b/src/inmanta/deploy/work.py index 58f5d578a7..1cd64ac21b 100644 --- a/src/inmanta/deploy/work.py +++ b/src/inmanta/deploy/work.py @@ -21,6 +21,7 @@ import itertools from collections.abc import Iterator, Mapping, Set from dataclasses import dataclass +from enum import IntEnum from typing import Callable, Generic, Optional, TypeVar from inmanta.data.model import ResourceIdStr @@ -34,6 +35,17 @@ T = TypeVar("T", bound=tasks.Task, covariant=True) +class TaskPriority(IntEnum): + TERMINATED = -1 + USER_DEPLOY = 0 + NEW_VERSION_DEPLOY = 1 + USER_REPAIR = 2 + DRYRUN = 3 + INTERVAL_DEPLOY = 4 + FACT_REFRESH = 5 + INTERVAL_REPAIR = 6 + + @dataclass(frozen=True, kw_only=True) class PrioritizedTask(Generic[T]): """ @@ -45,7 +57,7 @@ class PrioritizedTask(Generic[T]): """ task: T - priority: int + priority: TaskPriority @functools.total_ordering @@ -198,7 +210,8 @@ def send_shutdown(self) -> None: """ poison_pill = TaskQueueItem( task=PrioritizedTask( - task=tasks.PoisonPill(resource=ResourceIdStr("system::Terminate[all,stop=True]")), priority=-1 + task=tasks.PoisonPill(resource=ResourceIdStr("system::Terminate[all,stop=True]")), + priority=TaskPriority.TERMINATED, ), insert_order=0, ) @@ -294,6 +307,7 @@ def reset(self) -> None: def deploy_with_context( self, resources: Set[ResourceIdStr], + priority: TaskPriority, *, stale_deploys: Optional[Set[ResourceIdStr]] = None, added_requires: Optional[Mapping[ResourceIdStr, Set[ResourceIdStr]]] = None, @@ -306,6 +320,7 @@ def deploy_with_context( :param resources: Set of resources that should be deployed. Adds a deploy task to the scheduled work for each of these, unless it is already scheduled. + :param priority: The priority of this deploy. :param stale_deploys: Set of resources for which a stale deploy is in progress, i.e. a deploy for an outdated resource intent. :param added_requires: Requires edges that were added since the previous state update, if any. 
@@ -378,11 +393,10 @@ def extend_blocked_on(resource: ResourceIdStr, new_blockers: set[ResourceIdStr]) # discard rather than remove because task may already be running, in which case we leave it run its course # and simply add a new one task: tasks.Deploy = tasks.Deploy(resource=resource) - priority: Optional[int] = self.agent_queues.discard(task) + task_priority: Optional[TaskPriority] = self.agent_queues.discard(task) queued.remove(resource) self._waiting[resource] = BlockedDeploy( - # FIXME[#8015]: default priority - task=PrioritizedTask(task=task, priority=priority if priority is not None else 0), + task=PrioritizedTask(task=task, priority=task_priority if task_priority is not None else priority), # task was previously ready to execute => assume no other blockers than this one blocked_on=new_blockers, ) @@ -401,8 +415,7 @@ def extend_blocked_on(resource: ResourceIdStr, new_blockers: set[ResourceIdStr]) dependency for dependency in self.requires.get(resource, ()) if is_scheduled(dependency) } self._waiting[resource] = BlockedDeploy( - # FIXME[#8015]: priority - task=PrioritizedTask(task=tasks.Deploy(resource=resource), priority=0), + task=PrioritizedTask(task=tasks.Deploy(resource=resource), priority=priority), blocked_on=blocked_on, ) not_scheduled.discard(resource) From f8890d33877fb7fdcc9c97b69584b72e2f03c547 Mon Sep 17 00:00:00 2001 From: jptrindade Date: Fri, 27 Sep 2024 10:38:14 +0100 Subject: [PATCH 04/10] [WIP] --- src/inmanta/deploy/work.py | 9 ++++- .../deploy/test_scheduler_agent.py | 38 ++++++++++++++++++- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/src/inmanta/deploy/work.py b/src/inmanta/deploy/work.py index 1cd64ac21b..90930bef85 100644 --- a/src/inmanta/deploy/work.py +++ b/src/inmanta/deploy/work.py @@ -407,8 +407,15 @@ def extend_blocked_on(resource: ResourceIdStr, new_blockers: set[ResourceIdStr]) # ensure desired resource deploys are scheduled for resource in resources: if is_scheduled(resource): - # Deploy is already scheduled / running. No need to do anything here. If any of its dependencies are to be + # Deploy is already scheduled / running. Check to see if this task has a higher priority than the one already + # scheduled. If it has, update the priority. If any of its dependencies are to be # scheduled as well, they will follow the provides relation to ensure this deploy waits its turn. 
+ task = PrioritizedTask(task=tasks.Deploy(resource=resource), priority=priority) + if tasks.Deploy(resource=resource) in self.agent_queues: + self.agent_queues.queue_put_nowait(task) + if resource in self._waiting: + if self._waiting[resource].task.priority > priority: + self._waiting[resource].task = task continue # task is not yet scheduled, schedule it now blocked_on: set[ResourceIdStr] = { diff --git a/tests/agent_server/deploy/test_scheduler_agent.py b/tests/agent_server/deploy/test_scheduler_agent.py index ec6280778f..eac0c8ba18 100644 --- a/tests/agent_server/deploy/test_scheduler_agent.py +++ b/tests/agent_server/deploy/test_scheduler_agent.py @@ -36,6 +36,7 @@ from inmanta.config import Config from inmanta.data import ResourceIdStr from inmanta.deploy import state +from inmanta.deploy.work import TaskPriority from inmanta.protocol.common import custom_json_encoder from inmanta.util import retry_limited @@ -120,7 +121,7 @@ async def build_resource_mappings_from_db(version: int | None) -> Mapping[Resour @pytest.fixture def environment() -> uuid.UUID: - return "83d604a0-691a-11ef-ae04-c8f750463317" + return uuid.UUID("83d604a0-691a-11ef-ae04-c8f750463317") @pytest.fixture @@ -272,3 +273,38 @@ async def done(): await retry_limited(done, 5) assert agent.executor_manager.executors["agent1"].facts_count == 1 + + +async def test_scheduler_priority(agent: TestAgent, environment, make_resource_minimal): + """ + Ensure that the tasks are placed in the queue in the correct order + """ + + await agent.stop() + + rid1 = "test::Resource[agent1,name=1]" + rid2 = "test::Resource[agent1,name=2]" + resources = { + ResourceIdStr(rid1): make_resource_minimal(rid1, values={"value": "a"}, requires=[], version=5), + } + + agent.scheduler.mock_versions[5] = resources + + await agent.scheduler.get_facts({"id": rid1}) + + await agent.scheduler._new_version(5, resources, make_requires(resources)) + + await agent.scheduler.deploy(TaskPriority.INTERVAL_DEPLOY) + + dryrun = uuid.uuid4() + await agent.scheduler.dryrun(dryrun, 5) + + await agent.trigger_update(environment, "$__scheduler", incremental_deploy=False) + await agent.trigger_update(environment, "$__scheduler", incremental_deploy=True) + + await agent.scheduler.repair(TaskPriority.INTERVAL_REPAIR) + + agent_1_queue = agent.scheduler._work.agent_queues.sorted("agent1") + assert len(agent_1_queue) == 7 + + await agent.start_working() From 27fa3251d4a03694fc687057f49788a2d9572082 Mon Sep 17 00:00:00 2001 From: jptrindade Date: Mon, 30 Sep 2024 15:11:44 +0100 Subject: [PATCH 05/10] merge master --- src/inmanta/deploy/scheduler.py | 1 - tests/agent_server/deploy/e2e/test_autostarted.py | 3 --- 2 files changed, 4 deletions(-) diff --git a/src/inmanta/deploy/scheduler.py b/src/inmanta/deploy/scheduler.py index ab0cf0c844..a332034f00 100644 --- a/src/inmanta/deploy/scheduler.py +++ b/src/inmanta/deploy/scheduler.py @@ -176,7 +176,6 @@ async def deploy(self, priority: TaskPriority) -> None: async with self._scheduler_lock: self._work.deploy_with_context(self._state.dirty, priority, deploying=self._deploying_stale) - async def repair(self, priority: TaskPriority) -> None: """ Trigger a repair, i.e. mark all resources as dirty, then trigger a deploy. 
diff --git a/tests/agent_server/deploy/e2e/test_autostarted.py b/tests/agent_server/deploy/e2e/test_autostarted.py index 5a9b93482f..58ee11df69 100644 --- a/tests/agent_server/deploy/e2e/test_autostarted.py +++ b/tests/agent_server/deploy/e2e/test_autostarted.py @@ -27,9 +27,6 @@ from inmanta import const, data from inmanta.config import Config from inmanta.util import get_compiler_version -from utils import _wait_until_deployment_finishes, resource_action_consistency_check -from inmanta.config import Config -from inmanta.util import get_compiler_version from utils import _wait_until_deployment_finishes, resource_action_consistency_check, retry_limited logger = logging.getLogger("inmanta.test.server_agent") From 595ce5d713ba645d65372b2945869449a3b03eef Mon Sep 17 00:00:00 2001 From: jptrindade Date: Tue, 1 Oct 2024 10:52:04 +0100 Subject: [PATCH 06/10] added comment --- src/inmanta/deploy/scheduler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/inmanta/deploy/scheduler.py b/src/inmanta/deploy/scheduler.py index a332034f00..3c43cdb6a7 100644 --- a/src/inmanta/deploy/scheduler.py +++ b/src/inmanta/deploy/scheduler.py @@ -385,7 +385,9 @@ async def report_resource_state( if details.attributes.get("send_event", False): provides: Set[ResourceIdStr] = self._state.requires.provides_view().get(resource, set()) if provides: - self._work.deploy_with_context(provides, deploying=self._deploying) + # Not sure about this priority. I believe this method is called when a resource has a new state + # and hence, new version. + self._work.deploy_with_context(provides, priority=TaskPriority.NEW_VERSION_DEPLOY, deploying=self._deploying) def get_types_for_agent(self, agent: str) -> Collection[ResourceType]: return list(self._state.types_per_agent[agent]) From 0c36c840b18ee478755f68732380a53f65260fda Mon Sep 17 00:00:00 2001 From: jptrindade Date: Wed, 2 Oct 2024 17:05:39 +0100 Subject: [PATCH 07/10] fixed test case --- src/inmanta/agent/agent_new.py | 10 +- src/inmanta/deploy/scheduler.py | 4 +- src/inmanta/deploy/work.py | 6 +- .../deploy/test_scheduler_agent.py | 103 +++++++++++++++--- 4 files changed, 103 insertions(+), 20 deletions(-) diff --git a/src/inmanta/agent/agent_new.py b/src/inmanta/agent/agent_new.py index 319c496b78..83ec65fe4e 100644 --- a/src/inmanta/agent/agent_new.py +++ b/src/inmanta/agent/agent_new.py @@ -133,8 +133,14 @@ def periodic_schedule( return True return False - periodic_schedule("deploy", self.scheduler.deploy, self._deploy_interval, self._deploy_splay_value) - periodic_schedule("repair", self.scheduler.repair, self._repair_interval, self._repair_splay_value) + async def interval_deploy() -> None: + await self.scheduler.deploy(TaskPriority.INTERVAL_DEPLOY) + + async def interval_repair() -> None: + await self.scheduler.deploy(TaskPriority.INTERVAL_REPAIR) + + periodic_schedule("deploy", interval_deploy, self._deploy_interval, self._deploy_splay_value) + periodic_schedule("repair", interval_repair, self._repair_interval, self._repair_splay_value) def _enable_time_trigger(self, action: TaskMethod, schedule: TaskSchedule) -> None: self._sched.add_action(action, schedule) diff --git a/src/inmanta/deploy/scheduler.py b/src/inmanta/deploy/scheduler.py index 3c43cdb6a7..6c8e485c01 100644 --- a/src/inmanta/deploy/scheduler.py +++ b/src/inmanta/deploy/scheduler.py @@ -387,7 +387,9 @@ async def report_resource_state( if provides: # Not sure about this priority. I believe this method is called when a resource has a new state # and hence, new version. 
- self._work.deploy_with_context(provides, priority=TaskPriority.NEW_VERSION_DEPLOY, deploying=self._deploying) + self._work.deploy_with_context( + provides, priority=TaskPriority.NEW_VERSION_DEPLOY, deploying=self._deploying + ) def get_types_for_agent(self, agent: str) -> Collection[ResourceType]: return list(self._state.types_per_agent[agent]) diff --git a/src/inmanta/deploy/work.py b/src/inmanta/deploy/work.py index 77e25cc08f..3e9a0810b3 100644 --- a/src/inmanta/deploy/work.py +++ b/src/inmanta/deploy/work.py @@ -392,10 +392,12 @@ def extend_blocked_on(resource: ResourceIdStr, new_blockers: set[ResourceIdStr]) # discard rather than remove because task may already be running, in which case we leave it run its course # and simply add a new one task: tasks.Deploy = tasks.Deploy(resource=resource) - task_priority: Optional[TaskPriority] = self.agent_queues.discard(task) + task_priority: Optional[int] = self.agent_queues.discard(task) queued.remove(resource) self._waiting[resource] = BlockedDeploy( - task=PrioritizedTask(task=task, priority=task_priority if task_priority is not None else priority), + task=PrioritizedTask( + task=task, priority=TaskPriority(task_priority) if task_priority is not None else priority + ), # task was previously ready to execute => assume no other blockers than this one blocked_on=new_blockers, ) diff --git a/tests/agent_server/deploy/test_scheduler_agent.py b/tests/agent_server/deploy/test_scheduler_agent.py index 5c1cce998e..95532af44b 100644 --- a/tests/agent_server/deploy/test_scheduler_agent.py +++ b/tests/agent_server/deploy/test_scheduler_agent.py @@ -36,7 +36,7 @@ from inmanta.agent.executor import ResourceDetails, ResourceInstallSpec from inmanta.config import Config from inmanta.data import ResourceIdStr -from inmanta.deploy import state +from inmanta.deploy import state, tasks from inmanta.deploy.work import TaskPriority from inmanta.protocol.common import custom_json_encoder from inmanta.util import retry_limited @@ -397,7 +397,7 @@ async def test_get_facts(agent: TestAgent, make_resource_minimal): ResourceIdStr(rid2): make_resource_minimal(rid2, {"value": "a"}, [rid1]), } - await agent.scheduler._new_version(5, resources, make_requires(resources)) + await agent.scheduler._new_version(1, resources, make_requires(resources)) await agent.scheduler.get_facts({"id": rid1}) @@ -415,33 +415,106 @@ async def done(): async def test_scheduler_priority(agent: TestAgent, environment, make_resource_minimal): """ Ensure that the tasks are placed in the queue in the correct order + And that existing tasks in the queue are replaced if a task that + does the same thing with higher priority is added to the queue """ - await agent.stop() - - rid1 = "test::Resource[agent1,name=1]" - rid2 = "test::Resource[agent1,name=2]" + rid1 = ResourceIdStr("test::Resource[agent1,name=1]") resources = { - ResourceIdStr(rid1): make_resource_minimal(rid1, values={"value": "a"}, requires=[], version=5), + rid1: make_resource_minimal(rid1, values={"value": "a"}, requires=[]), } - agent.scheduler.mock_versions[5] = resources + executor1: ManagedExecutor = ManagedExecutor() + agent.executor_manager.register_managed_executor("agent1", executor1) + + # We add two different tasks and assert that they are consumed in the correct order + # Add a new version deploy to the queue + await agent.scheduler._new_version(1, resources, make_requires(resources)) + agent.scheduler.mock_versions[1] = resources + # And then a dryrun + dryrun = uuid.uuid4() + await agent.scheduler.dryrun(dryrun, 1) + + # 
The tasks are consumed in the priority order + first_task = await agent.scheduler._work.agent_queues.queue_get("agent1") + assert isinstance(first_task, tasks.Deploy) + second_task = await agent.scheduler._work.agent_queues.queue_get("agent1") + assert isinstance(second_task, tasks.DryRun) + + # The same is true if a task with lesser priority is added first + # Add a fact refresh task to the queue await agent.scheduler.get_facts({"id": rid1}) - await agent.scheduler._new_version(5, resources, make_requires(resources)) + # Then add an interval deploy task to the queue + agent.scheduler._state.dirty.add(rid1) + await agent.scheduler.deploy(TaskPriority.INTERVAL_DEPLOY) + # The tasks are consumed in the priority order + first_task = await agent.scheduler._work.agent_queues.queue_get("agent1") + assert isinstance(first_task, tasks.Deploy) + second_task = await agent.scheduler._work.agent_queues.queue_get("agent1") + assert isinstance(second_task, tasks.RefreshFact) + # Assert that all tasks were consumed + queue = agent.scheduler._work.agent_queues._get_queue("agent1")._queue + assert len(queue) == 0 + + # Add an interval deploy task to the queue + agent.scheduler._state.dirty.add(rid1) await agent.scheduler.deploy(TaskPriority.INTERVAL_DEPLOY) + # Add a dryrun to the queue (which has more priority) dryrun = uuid.uuid4() - await agent.scheduler.dryrun(dryrun, 5) + await agent.scheduler.dryrun(dryrun, 1) - await agent.trigger_update(environment, "$__scheduler", incremental_deploy=False) + # Assert that we have both tasks in the queue + queue = agent.scheduler._work.agent_queues._get_queue("agent1")._queue + assert len(queue) == 2 + + # Add a user deploy + # It has more priority than interval deploy, so it will replace it in the queue + # It also has more priority than dryrun, so it will be consumed first + + await agent.trigger_update(environment, "$__scheduler", incremental_deploy=True) + + first_task = await agent.scheduler._work.agent_queues.queue_get("agent1") + assert isinstance(first_task, tasks.Deploy) + second_task = await agent.scheduler._work.agent_queues.queue_get("agent1") + assert isinstance(second_task, tasks.DryRun) + + # Interval deploy is still in the queue but marked as deleted + queue = agent.scheduler._work.agent_queues._get_queue("agent1")._queue + assert len(queue) == 1 + assert queue[0].deleted + + # Force clean queue + agent.scheduler._work.agent_queues._get_queue("agent1")._queue = [] + + # If a task to deploy a resource is added to the queue, + # but a task to deploy that same resource is already present with higher priority, + # it will be ignored and not added to the queue + + # Add a dryrun to the queue + dryrun = uuid.uuid4() + await agent.scheduler.dryrun(dryrun, 1) + + # Add a user deploy await agent.trigger_update(environment, "$__scheduler", incremental_deploy=True) - await agent.scheduler.repair(TaskPriority.INTERVAL_REPAIR) + # Try to add an interval deploy task to the queue + agent.scheduler._state.dirty.add(rid1) + await agent.scheduler.deploy(TaskPriority.INTERVAL_DEPLOY) + + # Assert that we still have only 2 tasks in the queue + queue = agent.scheduler._work.agent_queues._get_queue("agent1")._queue + assert len(queue) == 2 - agent_1_queue = agent.scheduler._work.agent_queues.sorted("agent1") - assert len(agent_1_queue) == 7 + # The order is unaffected, the interval deploy was essentially ignored + first_task = await agent.scheduler._work.agent_queues.queue_get("agent1") + assert isinstance(first_task, tasks.Deploy) + second_task = await 
agent.scheduler._work.agent_queues.queue_get("agent1") + assert isinstance(second_task, tasks.DryRun) - await agent.start_working() + # All tasks were consumed + queue = agent.scheduler._work.agent_queues._get_queue("agent1")._queue + assert len(queue) == 0 From 891c4b841f4fe37f87ff5224073afe305e261c99 Mon Sep 17 00:00:00 2001 From: jptrindade Date: Thu, 3 Oct 2024 09:00:20 +0100 Subject: [PATCH 08/10] fixed typo on repair --- src/inmanta/agent/agent_new.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/inmanta/agent/agent_new.py b/src/inmanta/agent/agent_new.py index 83ec65fe4e..6909e019f2 100644 --- a/src/inmanta/agent/agent_new.py +++ b/src/inmanta/agent/agent_new.py @@ -137,7 +137,7 @@ async def interval_deploy() -> None: await self.scheduler.deploy(TaskPriority.INTERVAL_DEPLOY) async def interval_repair() -> None: - await self.scheduler.deploy(TaskPriority.INTERVAL_REPAIR) + await self.scheduler.repair(TaskPriority.INTERVAL_REPAIR) periodic_schedule("deploy", interval_deploy, self._deploy_interval, self._deploy_splay_value) periodic_schedule("repair", interval_repair, self._repair_interval, self._repair_splay_value) From a80630f9614cb3a1036ce0a9955ef85d5f3399cf Mon Sep 17 00:00:00 2001 From: jptrindade Date: Thu, 3 Oct 2024 09:04:24 +0100 Subject: [PATCH 09/10] removed some code duplication --- src/inmanta/deploy/work.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/inmanta/deploy/work.py b/src/inmanta/deploy/work.py index 3e9a0810b3..77fadb7832 100644 --- a/src/inmanta/deploy/work.py +++ b/src/inmanta/deploy/work.py @@ -407,23 +407,23 @@ def extend_blocked_on(resource: ResourceIdStr, new_blockers: set[ResourceIdStr]) # ensure desired resource deploys are scheduled for resource in resources: + prioritized_task = PrioritizedTask(task=tasks.Deploy(resource=resource), priority=priority) if is_scheduled(resource): # Deploy is already scheduled / running. Check to see if this task has a higher priority than the one already # scheduled. If it has, update the priority. If any of its dependencies are to be # scheduled as well, they will follow the provides relation to ensure this deploy waits its turn. 
- task = PrioritizedTask(task=tasks.Deploy(resource=resource), priority=priority) - if tasks.Deploy(resource=resource) in self.agent_queues: - self.agent_queues.queue_put_nowait(task) + if prioritized_task.task in self.agent_queues: + self.agent_queues.queue_put_nowait(prioritized_task) if resource in self._waiting: if self._waiting[resource].task.priority > priority: - self._waiting[resource].task = task + self._waiting[resource].task = prioritized_task continue # task is not yet scheduled, schedule it now blocked_on: set[ResourceIdStr] = { dependency for dependency in self.requires.get(resource, ()) if is_scheduled(dependency) } self._waiting[resource] = BlockedDeploy( - task=PrioritizedTask(task=tasks.Deploy(resource=resource), priority=priority), + task=prioritized_task, blocked_on=blocked_on, ) not_scheduled.discard(resource) From f34ef117789ba4ea9194a32b0a8c2e09315f1029 Mon Sep 17 00:00:00 2001 From: jptrindade Date: Thu, 3 Oct 2024 15:33:04 +0100 Subject: [PATCH 10/10] updated return type --- src/inmanta/deploy/work.py | 10 ++++------ tests/agent_server/deploy/test_scheduler_agent.py | 2 +- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/inmanta/deploy/work.py b/src/inmanta/deploy/work.py index 77fadb7832..c17aedc833 100644 --- a/src/inmanta/deploy/work.py +++ b/src/inmanta/deploy/work.py @@ -147,7 +147,7 @@ def get_tasks_for_resource(self, resource: ResourceIdStr) -> set[tasks.Task]: """ return set(self._tasks_by_resource.get(resource, {}).keys()) - def remove(self, task: tasks.Task) -> int: + def remove(self, task: tasks.Task) -> TaskPriority: """ Removes the given task from its associated agent queue. Raises KeyError if it is not in the queue. Returns the priority at which the deleted task was queued. @@ -160,7 +160,7 @@ def remove(self, task: tasks.Task) -> int: del self._tasks_by_resource[task.resource] return queue_item.task.priority - def discard(self, task: tasks.Task) -> Optional[int]: + def discard(self, task: tasks.Task) -> Optional[TaskPriority]: """ Removes the given task from its associated agent queue if it is present. Returns the priority at which the deleted task was queued, if it was at all. 
@@ -392,12 +392,10 @@ def extend_blocked_on(resource: ResourceIdStr, new_blockers: set[ResourceIdStr]) # discard rather than remove because task may already be running, in which case we leave it run its course # and simply add a new one task: tasks.Deploy = tasks.Deploy(resource=resource) - task_priority: Optional[int] = self.agent_queues.discard(task) + task_priority: Optional[TaskPriority] = self.agent_queues.discard(task) queued.remove(resource) self._waiting[resource] = BlockedDeploy( - task=PrioritizedTask( - task=task, priority=TaskPriority(task_priority) if task_priority is not None else priority - ), + task=PrioritizedTask(task=task, priority=task_priority if task_priority is not None else priority), # task was previously ready to execute => assume no other blockers than this one blocked_on=new_blockers, ) diff --git a/tests/agent_server/deploy/test_scheduler_agent.py b/tests/agent_server/deploy/test_scheduler_agent.py index 95532af44b..8588c2467d 100644 --- a/tests/agent_server/deploy/test_scheduler_agent.py +++ b/tests/agent_server/deploy/test_scheduler_agent.py @@ -415,7 +415,7 @@ async def done(): async def test_scheduler_priority(agent: TestAgent, environment, make_resource_minimal): """ Ensure that the tasks are placed in the queue in the correct order - And that existing tasks in the queue are replaced if a task that + and that existing tasks in the queue are replaced if a task that does the same thing with higher priority is added to the queue """
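
A note on the priority semantics these patches introduce: the agent queues serve the task with the lowest TaskPriority value first and fall back to insertion order within a single priority level. The sketch below reproduces that ordering in plain Python. The TaskPriority values are copied from the diff above, but QueueItem, the heapq-based queue, and the task names are illustrative stand-ins rather than the inmanta classes; treat it as a minimal model of the mechanism, not project code.

# Standalone sketch (not the inmanta API): lowest priority value wins,
# and an insertion counter keeps FIFO order within one priority level.
import heapq
import itertools
from dataclasses import dataclass, field
from enum import IntEnum


class TaskPriority(IntEnum):
    # Values copied from src/inmanta/deploy/work.py in this series
    TERMINATED = -1
    USER_DEPLOY = 0
    NEW_VERSION_DEPLOY = 1
    USER_REPAIR = 2
    DRYRUN = 3
    INTERVAL_DEPLOY = 4
    FACT_REFRESH = 5
    INTERVAL_REPAIR = 6


@dataclass(order=True)
class QueueItem:
    # Comparison uses (priority, insert_order); the name is excluded,
    # mirroring how PrioritizedTask pairs a task with a priority.
    priority: TaskPriority
    insert_order: int
    name: str = field(compare=False)


queue: list[QueueItem] = []
counter = itertools.count()
for name, priority in [
    ("interval deploy", TaskPriority.INTERVAL_DEPLOY),
    ("dryrun", TaskPriority.DRYRUN),
    ("user deploy", TaskPriority.USER_DEPLOY),
    ("fact refresh", TaskPriority.FACT_REFRESH),
]:
    heapq.heappush(queue, QueueItem(priority, next(counter), name))

while queue:
    print(heapq.heappop(queue).name)
# Prints: user deploy, dryrun, interval deploy, fact refresh

This matches the order asserted in test_scheduler_priority: a user deploy overtakes a pending dryrun, which in turn overtakes an interval deploy. Using an IntEnum also keeps the stored priorities comparable as plain integers, which is presumably why the series can move agent_queues.discard() between returning Optional[int] and Optional[TaskPriority] without changing queue behaviour.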