Skip to content

Commit

Permalink
Add handling of infrastructure changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
malinradtke committed Sep 24, 2024
1 parent f40c109 commit 759f7cb
Showing 1 changed file with 85 additions and 33 deletions.
118 changes: 85 additions & 33 deletions cosima_core/mango_direct_connection/mango_communication_network.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import asyncio
import datetime
import math
from enum import Enum

from mango.container.external_coupling import ExternalSchedulingContainer
from typing import Dict

import scenario_config
from cosima_core.messages.message_pb2 import InfoMessage, InitialMessage, SynchronisationMessage, TrafficMessage
from cosima_core.messages.message_pb2 import InfoMessage, InitialMessage, SynchronisationMessage
from cosima_core.simulators.omnetpp_connection import OmnetppConnection
from cosima_core.util.general_config import MAX_BYTE_SIZE_PER_MSG_GROUP, MANGO_CONVERSION_FACTOR
from cosima_core.util.util_functions import create_protobuf_messages, check_omnet_connection, start_omnet, \
get_dict_from_protobuf_message

logging_level = 'debug'
logging_level = 'info'


class MangoCommunicationNetwork:
Expand Down Expand Up @@ -43,9 +44,14 @@ class MangoCommunicationNetwork:
_number_of_messages_received (int): Count of received messages.
"""

class ClientStatus(Enum):
CONNECTED = 0
DISCONNECTED = 1

def __init__(self, client_container_mapping: Dict[str, ExternalSchedulingContainer], port: int,
duration_s: int, start_timestamp=0.0, start_mode='cmd', network='SimbenchNetwork',
results_recorder=None, traffic_configuration=[]):
results_recorder=None, traffic_configuration=None,
infrastructure_changes=None):
"""
Initialize the MangoCommunicationNetwork instance.
Expand All @@ -56,6 +62,10 @@ def __init__(self, client_container_mapping: Dict[str, ExternalSchedulingContain
MosaikContainer instances (mango).
port (int): The port to establish socket communication with OMNeT++.
"""
if traffic_configuration is None:
traffic_configuration = []
if infrastructure_changes is None:
infrastructure_changes = []
self._start_time = start_timestamp
print('duration in seconds: ', duration_s)
self._simulation_end_time = start_timestamp + duration_s
Expand All @@ -77,6 +87,12 @@ def __init__(self, client_container_mapping: Dict[str, ExternalSchedulingContain
self._number_of_messages_sent = 0
self._number_of_messages_received = 0
self._traffic_configurations = traffic_configuration
self._infrastructure_changes = infrastructure_changes
self.scenario_finished = asyncio.Future()

self._client_status = {client_name: self.ClientStatus.CONNECTED
for client_name in client_container_mapping.keys()}
self.update_client_status()

self.results_recorder = results_recorder

Expand All @@ -89,8 +105,8 @@ async def _start_scenario(self):
"""
initial_msg = {
'msg_id': 'InitialMessage',
'max_advance': self._simulation_end_time,
'until': self._simulation_end_time,
'max_advance': self._simulation_end_time - self._start_time,
'until': self._simulation_end_time - self._start_time,
'stepSize': 1000,
'logging_level': logging_level,
'max_byte_size_per_msg_group': MAX_BYTE_SIZE_PER_MSG_GROUP
Expand Down Expand Up @@ -127,6 +143,28 @@ async def _start_scenario(self):
await self.send_messages_to_omnetpp()
await self.run_scenario()

def update_client_status(self):
# Create a new list for infrastructure changes that have not been processed yet
remaining_changes = []

for change in self._infrastructure_changes:
# Check if the event's simulation time has passed or is at the current time
if self._current_time >= (self._start_time + change['simulation_time_ms']):
# Get the type of change and the module to change
module = change['change_module']
if change['type'] == 'disconnect':
# Disconnect the module
self._client_status[module] = self.ClientStatus.DISCONNECTED
elif change['type'] == 'connect':
# Reconnect the module
self._client_status[module] = self.ClientStatus.CONNECTED
else:
# If the change hasn't been processed yet, keep it in the list
remaining_changes.append(change)

# Update the infrastructure changes to only include unprocessed changes
self._infrastructure_changes = remaining_changes

async def run_scenario(self):
"""
Run the communication network simulation scenario loop.
Expand All @@ -135,20 +173,26 @@ async def run_scenario(self):
performing container steps, handling the synchronization and sending messages to OMNeT++.
"""
while True:
if self._current_time > self._simulation_end_time:
print('--Simulation end, reason: exceeded the simulation time')
return
while not self.waiting_for_messages_from_omnetpp() \
and len(self._next_activities) > 0:
# we are not waiting for messages from OMNeT++ and have no next activities in mango
# therefore, we advance in time in mango
smallest_next_activity = min(self._next_activities)
self._current_time = smallest_next_activity
self.update_client_status()
if self._current_time > self._simulation_end_time:
print('--Simulation end, reason: exceeded the simulation time')
return
await self.step_mango_containers(received_messages=[])
if len(self._message_buffer) == 0:
# no messages occurred in mango
if len(self._next_activities) == 0:
# no next activities
# therefore, simulation is finished
print('--Simulation end, reason: no messages in buffer and no next activities')
return
# new next activities in mango
# therefore, continue advancing in time in mango
Expand All @@ -163,6 +207,7 @@ async def run_scenario(self):
received_info_msgs = [msg for msg in received_messages if type(msg) == InfoMessage]
self._number_of_messages_received += len(received_info_msgs)
self._current_time = received_messages[0].sim_time + self._start_time
self.update_client_status()

# perform container steps in mango
await self.step_mango_containers(received_messages=received_messages)
Expand All @@ -186,18 +231,21 @@ async def step_mango_containers(self, received_messages: list):
Returns:
None
"""
if self.results_recorder:
for message in received_messages:
if type(message) == InfoMessage:
self.results_recorder.update_received_message(msg_id=message.msg_id,
time_receive=message.sim_time)

for container_name, container in self._client_container_mapping.items():
received_messages_for_container = [str.encode(get_dict_from_protobuf_message(message)['content'])
for message in received_messages
if type(message) == InfoMessage
and 'Traffic' not in message.msg_id
and message.receiver == container_name]
if self._client_status.get(container_name) == self.ClientStatus.CONNECTED:
if self.results_recorder:
for message in received_messages:
if type(message) == InfoMessage:
self.results_recorder.update_received_message(msg_id=message.msg_id,
time_receive=message.sim_time)
received_messages_for_container = [str.encode(get_dict_from_protobuf_message(message)['content'])
for message in received_messages
if type(message) == InfoMessage
and 'Traffic' not in message.msg_id
and message.receiver == container_name]
else:
print(container_name, ' not connected. Discard incoming messages. ')
received_messages_for_container = []
output = await container.step(simulation_time=self._current_time,
incoming_messages=received_messages_for_container)
self.process_mango_outputs(container_name, output)
Expand All @@ -221,23 +269,27 @@ def process_mango_outputs(self, container_name, output):
self._next_activities.append(next_activity)
for mango_output in output.messages:
msg_output_time = math.ceil(mango_output.time * MANGO_CONVERSION_FACTOR)
msg_id = f'AgentMessage_{container_name}_{self._msg_counter}'
message_dict = {'msg_id': msg_id,
'max_advance': next_activity - self._start_time,
'sim_time': msg_output_time - self._start_time,
'sender': container_name,
'receiver': mango_output.receiver,
'content': mango_output.message.decode(),
'creation_time': msg_output_time - self._start_time,
}
self.results_recorder.add_comm_results(msg_id=msg_id,
time_send=msg_output_time - self._start_time,
time_receive=None,
sender=container_name,
receiver=mango_output.receiver,
content=mango_output.message.decode())
self._message_buffer.append((message_dict, InfoMessage))
self._msg_counter += 1
if self._client_status.get(container_name) == self.ClientStatus.CONNECTED:
msg_id = f'AgentMessage_{container_name}_{self._msg_counter}'
message_dict = {'msg_id': msg_id,
'max_advance': next_activity - self._start_time,
'sim_time': msg_output_time - self._start_time,
'sender': container_name,
'receiver': mango_output.receiver,
'content': mango_output.message.decode(),
'creation_time': msg_output_time - self._start_time,
}
self.results_recorder.add_comm_results(msg_id=msg_id,
time_send=msg_output_time - self._start_time,
time_receive=None,
sender=container_name,
receiver=mango_output.receiver,
content=mango_output.message.decode())
self._message_buffer.append((message_dict, InfoMessage))
self._msg_counter += 1
else:
print(container_name, ' is not connected. Message will not be sent at time ',
msg_output_time - self._start_time)

async def handle_synchronization_with_waiting_messages(self):
"""
Expand Down

0 comments on commit 759f7cb

Please sign in to comment.