Skip to content

Commit

Permalink
Extends results recording for message information.
Browse files Browse the repository at this point in the history
  • Loading branch information
malinradtke committed Sep 11, 2024
1 parent 3af314a commit b226649
Showing 1 changed file with 27 additions and 6 deletions.
33 changes: 27 additions & 6 deletions cosima_core/mango_direct_connection/mango_communication_network.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import datetime
import math

from mango.container.external_coupling import ExternalSchedulingContainer
Expand All @@ -11,7 +12,6 @@
from cosima_core.util.util_functions import create_protobuf_messages, check_omnet_connection, start_omnet, \
get_dict_from_protobuf_message

UNTIL = 1000000000
logging_level = 'info'


Expand Down Expand Up @@ -44,7 +44,8 @@ class MangoCommunicationNetwork:
"""

def __init__(self, client_container_mapping: Dict[str, ExternalSchedulingContainer], port: int,
start_timestamp=0.0, start_mode='cmd', network='SimbenchNetwork'):
duration_s: int, start_timestamp=0.0, start_mode='cmd', network='SimbenchNetwork',
results_recorder=None):
"""
Initialize the MangoCommunicationNetwork instance.
Expand All @@ -56,6 +57,10 @@ def __init__(self, client_container_mapping: Dict[str, ExternalSchedulingContain
port (int): The port to establish socket communication with OMNeT++.
"""
self._start_time = start_timestamp
print('duration in seconds: ', duration_s)
self._simulation_end_time = start_timestamp + duration_s
print('Simulation start time: ', datetime.datetime.fromtimestamp(self._start_time))
print('Simulation end time: ', datetime.datetime.fromtimestamp(self._simulation_end_time))
self._client_container_mapping = client_container_mapping
self._next_activities = list()
self._current_time = 0
Expand All @@ -72,6 +77,8 @@ def __init__(self, client_container_mapping: Dict[str, ExternalSchedulingContain
self._number_of_messages_sent = 0
self._number_of_messages_received = 0

self.results_recorder = results_recorder

async def _start_scenario(self):
"""
Start the communication network in the agent-system scenario.
Expand All @@ -81,8 +88,8 @@ async def _start_scenario(self):
"""
initial_msg = {
'msg_id': 'InitialMessage',
'max_advance': UNTIL,
'until': UNTIL,
'max_advance': self._simulation_end_time,
'until': self._simulation_end_time,
'stepSize': 1000,
'logging_level': logging_level,
'max_byte_size_per_msg_group': MAX_BYTE_SIZE_PER_MSG_GROUP
Expand All @@ -109,6 +116,8 @@ async def run_scenario(self):
# therefore, we advance in time in mango
smallest_next_activity = min(self._next_activities)
self._current_time = smallest_next_activity
if self._current_time > self._simulation_end_time:
return
await self.step_mango_containers(received_messages=[])
if len(self._message_buffer) == 0:
# no messages occurred in mango
Expand Down Expand Up @@ -152,6 +161,18 @@ async def step_mango_containers(self, received_messages: list):
Returns:
None
"""
if self.results_recorder:
for message in received_messages:
if type(message) == InfoMessage:
try:
self.results_recorder.add_comm_results(msg_id=message.msg_id,
time_send=message.creation_time,
time_receive=message.sim_time,
sender=message.sender,
receiver=message.receiver,
content=message.content)
except AttributeError:
print('!!', message)
for container_name, container in self._client_container_mapping.items():
received_messages_for_container = [str.encode(get_dict_from_protobuf_message(message)['content'])
for message in received_messages
Expand All @@ -174,7 +195,7 @@ def process_mango_outputs(self, container_name, output):
output: Output received from the MosaikContainer's step.
"""
if output.next_activity is None:
next_activity = UNTIL + self._start_time
next_activity = self._simulation_end_time
else:
next_activity = math.ceil(output.next_activity * MANGO_CONVERSION_FACTOR)
self._next_activities.append(next_activity)
Expand Down Expand Up @@ -210,7 +231,7 @@ async def handle_synchronization_with_waiting_messages(self):
'msg_type': SynchronisationMessage.MsgType.WAITING,
'msg_id': f'WaitingMessage_{self._waiting_msgs_counter}',
'sim_time': self._current_time - self._start_time,
'max_advance': UNTIL,
'max_advance': self._simulation_end_time,
}
self._waiting_msgs_counter += 1
self._message_buffer.append((waiting_msg, SynchronisationMessage))
Expand Down

0 comments on commit b226649

Please sign in to comment.