Skip to content

Commit

Permalink
Merge pull request #73 from OFFIS-DAI/development
Browse files Browse the repository at this point in the history
v1.1.4
  • Loading branch information
rcschrg authored Mar 26, 2024
2 parents edb27f4 + 0a2a3c3 commit f9de33d
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 37 deletions.
9 changes: 8 additions & 1 deletion mango/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Every agent must live in a container. Containers are responsible for making
connections to other agents.
"""

import asyncio
import logging
from abc import ABC
Expand Down Expand Up @@ -464,9 +465,15 @@ async def shutdown(self):
self._check_inbox_task.remove_done_callback(self.raise_exceptions)
self._check_inbox_task.cancel()
await self._check_inbox_task

except asyncio.CancelledError:
pass
try:
await self._scheduler.stop()
except asyncio.CancelledError:
pass
try:
await self._scheduler.shutdown()
except asyncio.CancelledError:
pass
finally:
logger.info("Agent %s: Shutdown successful", self.aid)
37 changes: 35 additions & 2 deletions mango/agent/role.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"""
import asyncio
from abc import ABC
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union, Callable

from mango.agent.core import Agent, AgentContext, AgentDelegates
from mango.util.scheduling import Scheduler
Expand Down Expand Up @@ -113,6 +113,7 @@ def __init__(self, agent_context, scheduler):
self._role_model_type_to_subs = {}
self._message_subs = []
self._send_msg_subs = {}
self._role_event_type_to_handler = {}
self._agent_context = agent_context
self._scheduler = scheduler
self._data = DataContainer()
Expand Down Expand Up @@ -284,12 +285,23 @@ def subscribe_message(self, role, method, message_condition, priority=0):
elif i == len(self._message_subs) - 1:
self._message_subs.append((role, message_condition, method, priority))

def subscribe_send(self, role, method):
def subscribe_send(self, role: Role, method: Callable):
if role in self._send_msg_subs:
self._send_msg_subs[role].append(method)
else:
self._send_msg_subs[role] = [method]

def emit_event(self, event: Any, event_source: Any = None):
subs = self._role_event_type_to_handler[type(event)]
for _, method in subs:
method(event, event_source)

def subscribe_event(self, role: Role, event_type: type, method: Callable):
if not event_type in self._role_event_type_to_handler:
self._role_event_type_to_handler[event_type] = []

self._role_event_type_to_handler[event_type] += [(role, method)]


class RoleContext(AgentDelegates):
"""Implementation of the RoleContext."""
Expand Down Expand Up @@ -407,6 +419,27 @@ async def send_acl_message(
**kwargs
)

def emit_event(self, event: Any, event_source: Any = None):
"""Emit an custom event to other roles.
:param event: the event
:type event: Any
:param event_source: emitter of the event (mostly the emitting role), defaults to None
:type event_source: Any, optional
"""
self._role_handler.emit_event(event, event_source)

def subscribe_event(self, role: Role, event_type: Any, handler_method: Callable):
"""Subscribe to specific event types. The listener will be evaluated based
on their order of subscription
:param role: the role in which you want to handle the event
:type role: Role
:param event_type: the event type you want to handle
:type event_type: Any
"""
self._role_handler.subscribe_event(role, event_type, handler_method)

@property
def addr(self):
return self._agent_context.addr
Expand Down
1 change: 1 addition & 0 deletions mango/container/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
This module contains the abstract Container class and the subclasses
TCPContainer and MQTTContainer
"""

import asyncio
import logging
import time
Expand Down
132 changes: 106 additions & 26 deletions mango/util/scheduling.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,49 @@
"""
Module for commonly used time based scheduled task executed inside one agent.
"""

import asyncio
import concurrent.futures
import datetime
from abc import abstractmethod
from multiprocessing import Manager
from multiprocessing import Manager, Event
from typing import Any, List, Tuple
from dataclasses import dataclass
from multiprocessing.synchronize import Event as MultiprocessingEvent

from dateutil.rrule import rrule

from mango.util.clock import AsyncioClock, Clock, ExternalClock
from asyncio import Future


@dataclass
class ScheduledProcessControl:
run_task_event: MultiprocessingEvent
kill_process_event: MultiprocessingEvent

def kill_process(self):
self.kill_process_event.set()

def init_process(self):
self.kill_process_event.clear()

def resume_task(self):
self.run_task_event.set()

def suspend_task(self):
self.run_task_event.clear()


class Suspendable:
"""
Wraps a coroutine, intercepting __await__ to add the functionality of suspending.
"""

def __init__(self, coro, ext_contr_event=None):
def __init__(self, coro, ext_contr_event=None, kill_event=None):
self._coro = coro

self._kill_event = kill_event
if ext_contr_event is not None:
self._can_run = ext_contr_event
else:
Expand All @@ -43,6 +66,9 @@ def __await__(self):
except BaseException as err:
send, message = iter_throw, err

if self._kill_event is not None and self._kill_event.is_set():
return None

try:
# throw error or resume coroutine
signal = send(message)
Expand Down Expand Up @@ -91,6 +117,13 @@ def coro(self):
# asyncio tasks


def _close_coro(coro):
try:
coro.close()
except:
pass


class ScheduledTask:
"""
Base class for scheduled tasks in mango. Within this class it is possible to
Expand Down Expand Up @@ -131,6 +164,11 @@ def on_stop(self, fut: asyncio.Future = None):
self._on_stop_hook_in(fut)
if self._is_observable:
self._is_done.set_result(True)
self.close()

def close(self):
"""Perform closing actions"""
pass


class TimestampScheduledTask(ScheduledTask):
Expand All @@ -155,6 +193,9 @@ async def run(self):
await self._wait(self._timestamp)
return await self._coro

def close(self):
_close_coro(self._coro)


class AwaitingTask(ScheduledTask):
"""
Expand All @@ -175,6 +216,10 @@ async def run(self):
self.notify_running()
return await self._coroutine

def close(self):
_close_coro(self._awaited_coroutine)
_close_coro(self._coroutine)


class InstantScheduledTask(TimestampScheduledTask):
"""
Expand All @@ -188,6 +233,9 @@ def __init__(self, coroutine, clock: Clock = None, on_stop=None, observable=True
coroutine, clock.time, clock=clock, on_stop=on_stop, observable=observable
)

def close(self):
_close_coro(self._coro)


class PeriodicScheduledTask(ScheduledTask):
"""
Expand Down Expand Up @@ -274,6 +322,9 @@ async def run(self):
self.notify_running()
return await self._coro

def close(self):
_close_coro(self._coro)


# process tasks

Expand Down Expand Up @@ -407,17 +458,25 @@ def __init__(
Tuple[ScheduledTask, asyncio.Future, Suspendable, Any]
] = []
self.clock = clock if clock is not None else AsyncioClock()
self._scheduled_process_tasks = []
self._process_pool_exec = concurrent.futures.ProcessPoolExecutor(
max_workers=num_process_parallel, initializer=_create_asyncio_context
)
self._scheduled_process_tasks: List[
Tuple[ScheduledProcessTask, Future, ScheduledProcessControl, Any]
] = []
self._manager = None
self._num_process_parallel = num_process_parallel
self._process_pool_exec = None
self._suspendable = suspendable
self._observable = observable

@staticmethod
def _run_task_in_p_context(task, suspend_event):
def _run_task_in_p_context(
task, scheduled_process_control: ScheduledProcessControl
):
try:
coro = Suspendable(task.run(), ext_contr_event=suspend_event)
coro = Suspendable(
task.run(),
ext_contr_event=scheduled_process_control.run_task_event,
kill_event=scheduled_process_control.kill_process_event,
)

return asyncio.get_event_loop().run_until_complete(coro)
finally:
Expand Down Expand Up @@ -607,18 +666,35 @@ def schedule_process_task(self, task: ScheduledProcessTask, src=None):
:type src: Object
"""

if self._process_pool_exec is None:
self._process_pool_exec = concurrent.futures.ProcessPoolExecutor(
max_workers=self._num_process_parallel,
initializer=_create_asyncio_context,
)
loop = asyncio.get_running_loop()
manager = Manager()
event = manager.Event()
event.set()
if self._manager is None:
self._manager = Manager()

scheduled_process_control = ScheduledProcessControl(
run_task_event=self._manager.Event(),
kill_process_event=self._manager.Event(),
)
scheduled_process_control.init_process()
scheduled_process_control.resume_task()

l_task = asyncio.ensure_future(
loop.run_in_executor(
self._process_pool_exec, Scheduler._run_task_in_p_context, task, event
self._process_pool_exec,
Scheduler._run_task_in_p_context,
task,
scheduled_process_control,
)
)
l_task.add_done_callback(self._remove_process_task)
l_task.add_done_callback(task.on_stop)
self._scheduled_process_tasks.append((task, l_task, event, src))
self._scheduled_process_tasks.append(
(task, l_task, scheduled_process_control, src)
)
return l_task

def schedule_timestamp_process_task(
Expand Down Expand Up @@ -746,9 +822,9 @@ def suspend(self, given_src):
for _, _, coro, src in self._scheduled_tasks:
if src == given_src and coro is not None:
coro.suspend()
for _, _, event, src in self._scheduled_process_tasks:
if src == given_src and event is not None:
event.clear()
for _, _, scheduled_process_control, src in self._scheduled_process_tasks:
if src == given_src:
scheduled_process_control.suspend_task()

def resume(self, given_src):
"""Resume a set of tasks triggered by the given src object.
Expand All @@ -762,16 +838,17 @@ def resume(self, given_src):
for _, _, coro, src in self._scheduled_tasks:
if src == given_src and coro is not None:
coro.resume()
for _, _, event, src in self._scheduled_process_tasks:
if src == given_src and event is not None:
event.set()
for _, _, scheduled_process_control, src in self._scheduled_process_tasks:
if src == given_src:
scheduled_process_control.resume_task()

def _remove_process_task(self, fut=asyncio.Future):
for i in range(len(self._scheduled_process_tasks)):
_, task, event, _ = self._scheduled_process_tasks[i]
_, task, scheduled_process_control, _ = self._scheduled_process_tasks[i]
if task == fut:
scheduled_process_control.resume_task()
scheduled_process_control.kill_process()
del self._scheduled_process_tasks[i]
event.set()
break

# methods for removing tasks, stopping or shutting down
Expand Down Expand Up @@ -830,12 +907,15 @@ async def tasks_complete_or_sleeping(self):
# we need to recognize how many sleeping tasks we have in order to find out if all tasks are done
sleeping_tasks.append(scheduled_task)

def shutdown(self):
async def shutdown(self):
"""
Shutdown internal process executor pool.
"""
# resume all process so they can get shutdown
for _, _, event, _ in self._scheduled_process_tasks:
if event is not None:
event.set()
self._process_pool_exec.shutdown()
for _, _, scheduled_process_control, _ in self._scheduled_process_tasks:
scheduled_process_control.kill_process()
for task, _, _, _ in self._scheduled_tasks:
task.close()
await self.stop()
if self._process_pool_exec is not None:
self._process_pool_exec.shutdown()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
EMAIL = "[email protected]"
AUTHOR = "mango Team"
REQUIRES_PYTHON = ">=3.7.0"
VERSION = "1.1.3"
VERSION = "1.1.4"

# What packages are required for this module to be executed?
REQUIRED = [
Expand Down
Loading

0 comments on commit f9de33d

Please sign in to comment.