diff --git a/cosima_core/mango_direct_connection/mango_communication_network.py b/cosima_core/mango_direct_connection/mango_communication_network.py index 8e0287b..aa44710 100644 --- a/cosima_core/mango_direct_connection/mango_communication_network.py +++ b/cosima_core/mango_direct_connection/mango_communication_network.py @@ -1,4 +1,5 @@ import asyncio +import datetime import math from mango.container.external_coupling import ExternalSchedulingContainer @@ -11,7 +12,6 @@ from cosima_core.util.util_functions import create_protobuf_messages, check_omnet_connection, start_omnet, \ get_dict_from_protobuf_message -UNTIL = 1000000000 logging_level = 'info' @@ -44,7 +44,8 @@ class MangoCommunicationNetwork: """ def __init__(self, client_container_mapping: Dict[str, ExternalSchedulingContainer], port: int, - start_timestamp=0.0, start_mode='cmd', network='SimbenchNetwork'): + duration_s: int, start_timestamp=0.0, start_mode='cmd', network='SimbenchNetwork', + results_recorder=None): """ Initialize the MangoCommunicationNetwork instance. @@ -56,6 +57,10 @@ def __init__(self, client_container_mapping: Dict[str, ExternalSchedulingContain port (int): The port to establish socket communication with OMNeT++. """ self._start_time = start_timestamp + print('duration in seconds: ', duration_s) + self._simulation_end_time = start_timestamp + duration_s + print('Simulation start time: ', datetime.datetime.fromtimestamp(self._start_time)) + print('Simulation end time: ', datetime.datetime.fromtimestamp(self._simulation_end_time)) self._client_container_mapping = client_container_mapping self._next_activities = list() self._current_time = 0 @@ -72,6 +77,8 @@ def __init__(self, client_container_mapping: Dict[str, ExternalSchedulingContain self._number_of_messages_sent = 0 self._number_of_messages_received = 0 + self.results_recorder = results_recorder + async def _start_scenario(self): """ Start the communication network in the agent-system scenario. @@ -81,8 +88,8 @@ async def _start_scenario(self): """ initial_msg = { 'msg_id': 'InitialMessage', - 'max_advance': UNTIL, - 'until': UNTIL, + 'max_advance': self._simulation_end_time, + 'until': self._simulation_end_time, 'stepSize': 1000, 'logging_level': logging_level, 'max_byte_size_per_msg_group': MAX_BYTE_SIZE_PER_MSG_GROUP @@ -109,6 +116,8 @@ async def run_scenario(self): # therefore, we advance in time in mango smallest_next_activity = min(self._next_activities) self._current_time = smallest_next_activity + if self._current_time > self._simulation_end_time: + return await self.step_mango_containers(received_messages=[]) if len(self._message_buffer) == 0: # no messages occurred in mango @@ -152,6 +161,18 @@ async def step_mango_containers(self, received_messages: list): Returns: None """ + if self.results_recorder: + for message in received_messages: + if type(message) == InfoMessage: + try: + self.results_recorder.add_comm_results(msg_id=message.msg_id, + time_send=message.creation_time, + time_receive=message.sim_time, + sender=message.sender, + receiver=message.receiver, + content=message.content) + except AttributeError: + print('!!', message) for container_name, container in self._client_container_mapping.items(): received_messages_for_container = [str.encode(get_dict_from_protobuf_message(message)['content']) for message in received_messages @@ -174,7 +195,7 @@ def process_mango_outputs(self, container_name, output): output: Output received from the MosaikContainer's step. """ if output.next_activity is None: - next_activity = UNTIL + self._start_time + next_activity = self._simulation_end_time else: next_activity = math.ceil(output.next_activity * MANGO_CONVERSION_FACTOR) self._next_activities.append(next_activity) @@ -210,7 +231,7 @@ async def handle_synchronization_with_waiting_messages(self): 'msg_type': SynchronisationMessage.MsgType.WAITING, 'msg_id': f'WaitingMessage_{self._waiting_msgs_counter}', 'sim_time': self._current_time - self._start_time, - 'max_advance': UNTIL, + 'max_advance': self._simulation_end_time, } self._waiting_msgs_counter += 1 self._message_buffer.append((waiting_msg, SynchronisationMessage))