Skip to content

Commit

Permalink
Store duration per agent in each COHDA step
Browse files Browse the repository at this point in the history
  • Loading branch information
Frost, Emilie committed Nov 22, 2023
1 parent c81d231 commit 3ed1e75
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 99 deletions.
195 changes: 96 additions & 99 deletions mango/container/core.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
import asyncio
import copy
import logging
import warnings
import os
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from multiprocessing import Process, Event
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Tuple, Union, List

from ..messages.codecs import ACLMessage, Codec
from ..util.clock import Clock
from ..util.multiprocessing import aioduplex, AioDuplex, PipeToWriteQueue


import dill # do not remove! Necessary for the auto loaded pickle reg extensions

logger = logging.getLogger(__name__)

AGENT_PATTERN_NAME_PRE = "agent"
Expand Down Expand Up @@ -73,14 +70,14 @@ async def cancel_and_wait_for_task(task):


def create_agent_process_environment(
container_data: ContainerData,
agent_creator,
mirror_container_creator,
message_pipe: AioDuplex,
main_queue: asyncio.Queue,
event_pipe: AioDuplex,
terminate_event: Event,
process_initialized_event: Event,
container_data: ContainerData,
agent_creator,
mirror_container_creator,
message_pipe: AioDuplex,
main_queue: asyncio.Queue,
event_pipe: AioDuplex,
terminate_event: Event,
process_initialized_event: Event,
):
"""Create the agent process environment for using agent subprocesses
in a mango container. This routine will create a new event loop and run
Expand Down Expand Up @@ -192,7 +189,7 @@ def create_agent_process(self, agent_creator, container, mirror_container_creato
raise NotImplementedError()

def pre_hook_send_internal_message(
self, message, receiver_id, priority, default_meta
self, message, receiver_id, priority, default_meta
):
"""Hook in before an internal message is sent. Capable of preventing the default
send_internal_message call.
Expand Down Expand Up @@ -251,9 +248,9 @@ class MirrorContainerProcessManager(BaseContainerProcessManager):
"""

def __init__(
self,
container,
mirror_data: ContainerMirrorData,
self,
container,
mirror_data: ContainerMirrorData,
) -> None:
self._container = container
self._mirror_data = mirror_data
Expand Down Expand Up @@ -293,7 +290,7 @@ async def _execute_dispatch_event(self, event_pipe: AioDuplex):
logger.exception("The Dispatch Event Loop has failed!")

async def _move_incoming_messages_to_inbox(
self, message_pipe: AioDuplex, terminate_event: Event
self, message_pipe: AioDuplex, terminate_event: Event
):
try:
async with message_pipe.open_readonly() as rx:
Expand All @@ -312,7 +309,7 @@ async def _move_incoming_messages_to_inbox(
logger.exception("The Move Message Task Loop has failed!")

async def _send_to_message_pipe(
self, message_pipe: AioDuplex, terminate_event: Event
self, message_pipe: AioDuplex, terminate_event: Event
):
try:
async with message_pipe.open_writeonly() as tx:
Expand All @@ -326,7 +323,7 @@ async def _send_to_message_pipe(
logger.exception("The Send Message Task Loop has failed!")

def pre_hook_send_internal_message(
self, message, receiver_id, priority, default_meta
self, message, receiver_id, priority, default_meta
):
self._out_queue.put_nowait((message, receiver_id, priority, default_meta))
return True, None
Expand All @@ -348,8 +345,8 @@ class MainContainerProcessManager(BaseContainerProcessManager):
"""

def __init__(
self,
container,
self,
container,
) -> None:
self._active = False
self._container = container
Expand Down Expand Up @@ -416,7 +413,7 @@ async def _handle_process_message(self, pipe: AioDuplex):
logger.exception("The Process Message Loop has failed!")

def pre_hook_send_internal_message(
self, message, receiver_id, priority, default_meta
self, message, receiver_id, priority, default_meta
):
target_inbox = None
if self._active:
Expand All @@ -435,48 +432,48 @@ def create_agent_process(self, agent_creator, container, mirror_container_creato
self._init_mp()
self._active = True

from_pipe_message, to_pipe_message = aioduplex()
from_pipe, to_pipe = aioduplex()
process_initialized = Event()
with to_pipe.detach() as to_pipe, to_pipe_message.detach() as to_pipe_message:
agent_process = Process(
target=create_agent_process_environment,
args=(
ContainerData(
addr=container.addr,
codec=container.codec,
clock=container.clock,
kwargs=container._kwargs,
from_pipe_message, to_pipe_message = aioduplex()
from_pipe, to_pipe = aioduplex()
process_initialized = Event()
with to_pipe.detach() as to_pipe, to_pipe_message.detach() as to_pipe_message:
agent_process = Process(
target=create_agent_process_environment,
args=(
ContainerData(
addr=container.addr,
codec=container.codec,
clock=container.clock,
kwargs=container._kwargs,
),
agent_creator,
mirror_container_creator,
to_pipe_message,
self._main_queue,
to_pipe,
self._terminate_sub_processes,
process_initialized,
),
agent_creator,
mirror_container_creator,
to_pipe_message,
self._main_queue,
to_pipe,
self._terminate_sub_processes,
process_initialized,
),
)
self._agent_processes.append(agent_process)
agent_process.daemon = True
agent_process.start()

self._pid_to_message_pipe[agent_process.pid] = from_pipe_message
self._pid_to_pipe[agent_process.pid] = from_pipe
self._handle_process_events_tasks.append(
asyncio.create_task(self._handle_process_events(from_pipe))
)
self._handle_sp_messages_tasks.append(
asyncio.create_task(self._handle_process_message(from_pipe_message))
)
self._agent_processes.append(agent_process)
agent_process.daemon = True
agent_process.start()

self._pid_to_message_pipe[agent_process.pid] = from_pipe_message
self._pid_to_pipe[agent_process.pid] = from_pipe
self._handle_process_events_tasks.append(
asyncio.create_task(self._handle_process_events(from_pipe))
)
self._handle_sp_messages_tasks.append(
asyncio.create_task(self._handle_process_message(from_pipe_message))
)

async def wait_for_process_initialized():
while not process_initialized.is_set():
await asyncio.sleep(WAIT_STEP)
async def wait_for_process_initialized():
while not process_initialized.is_set():
await asyncio.sleep(WAIT_STEP)

return AgentProcessHandle(
asyncio.create_task(wait_for_process_initialized()), agent_process.pid
)
return AgentProcessHandle(
asyncio.create_task(wait_for_process_initialized()), agent_process.pid
)

def dispatch_to_agent_process(self, pid: int, coro_func, *args):
assert pid in self._pid_to_pipe
Expand Down Expand Up @@ -513,16 +510,16 @@ class Container(ABC):
"""Superclass for a mango container"""

def __init__(
self,
*,
addr,
name: str,
codec,
loop,
clock: Clock,
copy_internal_messages=False,
mirror_data=None,
**kwargs,
self,
*,
addr,
name: str,
codec,
loop,
clock: Clock,
copy_internal_messages=False,
mirror_data=None,
**kwargs,
):
self.name: str = name
self.addr = addr
Expand Down Expand Up @@ -564,8 +561,8 @@ def _all_aids(self):

def _check_agent_aid_pattern_match(self, aid):
return (
aid.startswith(AGENT_PATTERN_NAME_PRE)
and aid[len(AGENT_PATTERN_NAME_PRE) :].isnumeric()
aid.startswith(AGENT_PATTERN_NAME_PRE)
and aid[len(AGENT_PATTERN_NAME_PRE):].isnumeric()
)

def is_aid_available(self, aid):
Expand Down Expand Up @@ -621,12 +618,12 @@ def deregister_agent(self, aid):

@abstractmethod
async def send_message(
self,
content,
receiver_addr: Union[str, Tuple[str, int]],
*,
receiver_id: Optional[str] = None,
**kwargs,
self,
content,
receiver_addr: Union[str, Tuple[str, int]],
*,
receiver_id: Optional[str] = None,
**kwargs,
) -> bool:
"""
The Container sends a message to an agent according the container protocol.
Expand All @@ -640,14 +637,14 @@ async def send_message(
raise NotImplementedError

async def send_acl_message(
self,
content,
receiver_addr: Union[str, Tuple[str, int]],
*,
receiver_id: Optional[str] = None,
acl_metadata: Optional[Dict[str, Any]] = None,
is_anonymous_acl=False,
**kwargs,
self,
content,
receiver_addr: Union[str, Tuple[str, int]],
*,
receiver_id: Optional[str] = None,
acl_metadata: Optional[Dict[str, Any]] = None,
is_anonymous_acl=False,
**kwargs,
) -> bool:
"""
The Container sends a message, wrapped in an ACL message, to an agent according the container protocol.
Expand All @@ -674,12 +671,12 @@ async def send_acl_message(
)

def _create_acl(
self,
content,
receiver_addr: Union[str, Tuple[str, int]],
receiver_id: Optional[str] = None,
acl_metadata: Optional[Dict[str, Any]] = None,
is_anonymous_acl=False,
self,
content,
receiver_addr: Union[str, Tuple[str, int]],
receiver_id: Optional[str] = None,
acl_metadata: Optional[Dict[str, Any]] = None,
is_anonymous_acl=False,
):
"""
:param content:
Expand Down Expand Up @@ -726,12 +723,12 @@ def _create_acl(
return message

def _send_internal_message(
self,
message,
receiver_id,
priority=0,
default_meta=None,
inbox=None,
self,
message,
receiver_id,
priority=0,
default_meta=None,
inbox=None,
) -> bool:
target_inbox = inbox

Expand Down
2 changes: 2 additions & 0 deletions mango/container/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ async def send_message(
:param receiver_id: The agent id of the receiver
:param kwargs: Additional parameters to provide protocol specific settings
"""
print('send msg', self.addr)
self.msgs += 1
if isinstance(receiver_addr, str) and ":" in receiver_addr:
receiver_addr = receiver_addr.split(":")
Expand Down Expand Up @@ -247,6 +248,7 @@ async def _send_external_message(self, addr, message) -> bool:
:param message: The message
:return:
"""
print('send external msg', self.addr)
self.msgs += 1
if addr is None or not isinstance(addr, (tuple, list)) or len(addr) != 2:
logger.warning(
Expand Down

0 comments on commit 3ed1e75

Please sign in to comment.