From 3ed1e75d93fa35908f3c3dfa1afa0ed7bcb00fcd Mon Sep 17 00:00:00 2001 From: "Frost, Emilie" Date: Wed, 22 Nov 2023 10:31:58 +0100 Subject: [PATCH] Store duration per agent in each COHDA step --- mango/container/core.py | 195 ++++++++++++++++++++-------------------- mango/container/tcp.py | 2 + 2 files changed, 98 insertions(+), 99 deletions(-) diff --git a/mango/container/core.py b/mango/container/core.py index d37c8de..2ec4236 100644 --- a/mango/container/core.py +++ b/mango/container/core.py @@ -1,20 +1,17 @@ import asyncio import copy import logging -import warnings import os +import warnings +from abc import ABC, abstractmethod from dataclasses import dataclass from multiprocessing import Process, Event -from abc import ABC, abstractmethod from typing import Any, Dict, Optional, Tuple, Union, List from ..messages.codecs import ACLMessage, Codec from ..util.clock import Clock from ..util.multiprocessing import aioduplex, AioDuplex, PipeToWriteQueue - -import dill # do not remove! Necessary for the auto loaded pickle reg extensions - logger = logging.getLogger(__name__) AGENT_PATTERN_NAME_PRE = "agent" @@ -73,14 +70,14 @@ async def cancel_and_wait_for_task(task): def create_agent_process_environment( - container_data: ContainerData, - agent_creator, - mirror_container_creator, - message_pipe: AioDuplex, - main_queue: asyncio.Queue, - event_pipe: AioDuplex, - terminate_event: Event, - process_initialized_event: Event, + container_data: ContainerData, + agent_creator, + mirror_container_creator, + message_pipe: AioDuplex, + main_queue: asyncio.Queue, + event_pipe: AioDuplex, + terminate_event: Event, + process_initialized_event: Event, ): """Create the agent process environment for using agent subprocesses in a mango container. This routine will create a new event loop and run @@ -192,7 +189,7 @@ def create_agent_process(self, agent_creator, container, mirror_container_creato raise NotImplementedError() def pre_hook_send_internal_message( - self, message, receiver_id, priority, default_meta + self, message, receiver_id, priority, default_meta ): """Hook in before an internal message is sent. Capable of preventing the default send_internal_message call. @@ -251,9 +248,9 @@ class MirrorContainerProcessManager(BaseContainerProcessManager): """ def __init__( - self, - container, - mirror_data: ContainerMirrorData, + self, + container, + mirror_data: ContainerMirrorData, ) -> None: self._container = container self._mirror_data = mirror_data @@ -293,7 +290,7 @@ async def _execute_dispatch_event(self, event_pipe: AioDuplex): logger.exception("The Dispatch Event Loop has failed!") async def _move_incoming_messages_to_inbox( - self, message_pipe: AioDuplex, terminate_event: Event + self, message_pipe: AioDuplex, terminate_event: Event ): try: async with message_pipe.open_readonly() as rx: @@ -312,7 +309,7 @@ async def _move_incoming_messages_to_inbox( logger.exception("The Move Message Task Loop has failed!") async def _send_to_message_pipe( - self, message_pipe: AioDuplex, terminate_event: Event + self, message_pipe: AioDuplex, terminate_event: Event ): try: async with message_pipe.open_writeonly() as tx: @@ -326,7 +323,7 @@ async def _send_to_message_pipe( logger.exception("The Send Message Task Loop has failed!") def pre_hook_send_internal_message( - self, message, receiver_id, priority, default_meta + self, message, receiver_id, priority, default_meta ): self._out_queue.put_nowait((message, receiver_id, priority, default_meta)) return True, None @@ -348,8 +345,8 @@ class MainContainerProcessManager(BaseContainerProcessManager): """ def __init__( - self, - container, + self, + container, ) -> None: self._active = False self._container = container @@ -416,7 +413,7 @@ async def _handle_process_message(self, pipe: AioDuplex): logger.exception("The Process Message Loop has failed!") def pre_hook_send_internal_message( - self, message, receiver_id, priority, default_meta + self, message, receiver_id, priority, default_meta ): target_inbox = None if self._active: @@ -435,48 +432,48 @@ def create_agent_process(self, agent_creator, container, mirror_container_creato self._init_mp() self._active = True - from_pipe_message, to_pipe_message = aioduplex() - from_pipe, to_pipe = aioduplex() - process_initialized = Event() - with to_pipe.detach() as to_pipe, to_pipe_message.detach() as to_pipe_message: - agent_process = Process( - target=create_agent_process_environment, - args=( - ContainerData( - addr=container.addr, - codec=container.codec, - clock=container.clock, - kwargs=container._kwargs, + from_pipe_message, to_pipe_message = aioduplex() + from_pipe, to_pipe = aioduplex() + process_initialized = Event() + with to_pipe.detach() as to_pipe, to_pipe_message.detach() as to_pipe_message: + agent_process = Process( + target=create_agent_process_environment, + args=( + ContainerData( + addr=container.addr, + codec=container.codec, + clock=container.clock, + kwargs=container._kwargs, + ), + agent_creator, + mirror_container_creator, + to_pipe_message, + self._main_queue, + to_pipe, + self._terminate_sub_processes, + process_initialized, ), - agent_creator, - mirror_container_creator, - to_pipe_message, - self._main_queue, - to_pipe, - self._terminate_sub_processes, - process_initialized, - ), + ) + self._agent_processes.append(agent_process) + agent_process.daemon = True + agent_process.start() + + self._pid_to_message_pipe[agent_process.pid] = from_pipe_message + self._pid_to_pipe[agent_process.pid] = from_pipe + self._handle_process_events_tasks.append( + asyncio.create_task(self._handle_process_events(from_pipe)) + ) + self._handle_sp_messages_tasks.append( + asyncio.create_task(self._handle_process_message(from_pipe_message)) ) - self._agent_processes.append(agent_process) - agent_process.daemon = True - agent_process.start() - - self._pid_to_message_pipe[agent_process.pid] = from_pipe_message - self._pid_to_pipe[agent_process.pid] = from_pipe - self._handle_process_events_tasks.append( - asyncio.create_task(self._handle_process_events(from_pipe)) - ) - self._handle_sp_messages_tasks.append( - asyncio.create_task(self._handle_process_message(from_pipe_message)) - ) - async def wait_for_process_initialized(): - while not process_initialized.is_set(): - await asyncio.sleep(WAIT_STEP) + async def wait_for_process_initialized(): + while not process_initialized.is_set(): + await asyncio.sleep(WAIT_STEP) - return AgentProcessHandle( - asyncio.create_task(wait_for_process_initialized()), agent_process.pid - ) + return AgentProcessHandle( + asyncio.create_task(wait_for_process_initialized()), agent_process.pid + ) def dispatch_to_agent_process(self, pid: int, coro_func, *args): assert pid in self._pid_to_pipe @@ -513,16 +510,16 @@ class Container(ABC): """Superclass for a mango container""" def __init__( - self, - *, - addr, - name: str, - codec, - loop, - clock: Clock, - copy_internal_messages=False, - mirror_data=None, - **kwargs, + self, + *, + addr, + name: str, + codec, + loop, + clock: Clock, + copy_internal_messages=False, + mirror_data=None, + **kwargs, ): self.name: str = name self.addr = addr @@ -564,8 +561,8 @@ def _all_aids(self): def _check_agent_aid_pattern_match(self, aid): return ( - aid.startswith(AGENT_PATTERN_NAME_PRE) - and aid[len(AGENT_PATTERN_NAME_PRE) :].isnumeric() + aid.startswith(AGENT_PATTERN_NAME_PRE) + and aid[len(AGENT_PATTERN_NAME_PRE):].isnumeric() ) def is_aid_available(self, aid): @@ -621,12 +618,12 @@ def deregister_agent(self, aid): @abstractmethod async def send_message( - self, - content, - receiver_addr: Union[str, Tuple[str, int]], - *, - receiver_id: Optional[str] = None, - **kwargs, + self, + content, + receiver_addr: Union[str, Tuple[str, int]], + *, + receiver_id: Optional[str] = None, + **kwargs, ) -> bool: """ The Container sends a message to an agent according the container protocol. @@ -640,14 +637,14 @@ async def send_message( raise NotImplementedError async def send_acl_message( - self, - content, - receiver_addr: Union[str, Tuple[str, int]], - *, - receiver_id: Optional[str] = None, - acl_metadata: Optional[Dict[str, Any]] = None, - is_anonymous_acl=False, - **kwargs, + self, + content, + receiver_addr: Union[str, Tuple[str, int]], + *, + receiver_id: Optional[str] = None, + acl_metadata: Optional[Dict[str, Any]] = None, + is_anonymous_acl=False, + **kwargs, ) -> bool: """ The Container sends a message, wrapped in an ACL message, to an agent according the container protocol. @@ -674,12 +671,12 @@ async def send_acl_message( ) def _create_acl( - self, - content, - receiver_addr: Union[str, Tuple[str, int]], - receiver_id: Optional[str] = None, - acl_metadata: Optional[Dict[str, Any]] = None, - is_anonymous_acl=False, + self, + content, + receiver_addr: Union[str, Tuple[str, int]], + receiver_id: Optional[str] = None, + acl_metadata: Optional[Dict[str, Any]] = None, + is_anonymous_acl=False, ): """ :param content: @@ -726,12 +723,12 @@ def _create_acl( return message def _send_internal_message( - self, - message, - receiver_id, - priority=0, - default_meta=None, - inbox=None, + self, + message, + receiver_id, + priority=0, + default_meta=None, + inbox=None, ) -> bool: target_inbox = inbox diff --git a/mango/container/tcp.py b/mango/container/tcp.py index 5da4f70..ea30cdb 100644 --- a/mango/container/tcp.py +++ b/mango/container/tcp.py @@ -215,6 +215,7 @@ async def send_message( :param receiver_id: The agent id of the receiver :param kwargs: Additional parameters to provide protocol specific settings """ + print('send msg', self.addr) self.msgs += 1 if isinstance(receiver_addr, str) and ":" in receiver_addr: receiver_addr = receiver_addr.split(":") @@ -247,6 +248,7 @@ async def _send_external_message(self, addr, message) -> bool: :param message: The message :return: """ + print('send external msg', self.addr) self.msgs += 1 if addr is None or not isinstance(addr, (tuple, list)) or len(addr) != 2: logger.warning(