-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added an illuminator engine to parse yaml files and create models (#47)
(cherry picked from commit b60e580) Co-authored-by: Mano-Rom <[email protected]>
- Loading branch information
Showing
7 changed files
with
468 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
""" | ||
A simple data collector that prints all data when the simulation finishes. | ||
""" | ||
import collections | ||
|
||
import mosaik_api_v3 | ||
|
||
|
||
META = { | ||
'type': 'event-based', | ||
'models': { | ||
'Monitor': { | ||
'public': True, | ||
'any_inputs': True, | ||
'params': [], | ||
'attrs': [], | ||
}, | ||
}, | ||
} | ||
|
||
|
||
class Collector(mosaik_api_v3.Simulator): | ||
def __init__(self): | ||
super().__init__(META) | ||
self.eid = None | ||
self.data = collections.defaultdict(lambda: | ||
collections.defaultdict(dict)) | ||
|
||
def init(self, sid, time_resolution): | ||
return self.meta | ||
|
||
def create(self, num, model): | ||
if num > 1 or self.eid is not None: | ||
raise RuntimeError('Can only create one instance of Monitor.') | ||
|
||
self.eid = 'Monitor' | ||
return [{'eid': self.eid, 'type': model}] | ||
|
||
def step(self, time, inputs, max_advance): | ||
data = inputs.get(self.eid, {}) | ||
for attr, values in data.items(): | ||
for src, value in values.items(): | ||
self.data[src][attr][time] = value | ||
|
||
return None | ||
|
||
def finalize(self): | ||
print('Collected data:') | ||
for sim, sim_data in sorted(self.data.items()): | ||
print('- %s:' % sim) | ||
for attr, values in sorted(sim_data.items()): | ||
print(' - %s: %s' % (attr, values)) | ||
|
||
|
||
if __name__ == '__main__': | ||
mosaik_api_v3.start_simulation(Collector()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
import mosaik_api_v3 | ||
import importlib.util | ||
|
||
class StateSpaceSimulator(mosaik_api_v3.Simulator): | ||
meta = { | ||
'models': { | ||
'StateSpaceModel': { | ||
'public': True, | ||
'params': [], | ||
'attrs': [], | ||
'trigger': [], | ||
}, | ||
}, | ||
} | ||
|
||
def __init__(self): | ||
super().__init__(meta=self.meta) | ||
self.entities = {} # Holds the model instances | ||
self.model_name = None | ||
self.model_data = None | ||
|
||
def init(self, sid, time_resolution, **sim_params): | ||
print(f"running extra init") | ||
# This is the standard Mosaik init method signature | ||
self.sid = sid | ||
self.time_resolution = time_resolution | ||
|
||
# Assuming sim_params is structured as {'sim_params': {model_name: model_data}} | ||
sim_params = sim_params.get('sim_params', {}) | ||
if len(sim_params) != 1: | ||
raise ValueError("Expected sim_params to contain exactly one model.") | ||
|
||
# Extract the model_name and model_data | ||
self.model_name, self.model_data = next(iter(sim_params.items())) | ||
self.model = self.load_model_class(self.model_data['model_path'], self.model_data['model_type']) | ||
return self.model_data['meta'] | ||
|
||
def load_model_class(self, model_path, model_type): | ||
# Get the module name from the model path, e.g., 'Models/adder_model' -> 'adder_model' | ||
module_name = model_path.replace('/', '.').rstrip('.py').split('.')[-1] | ||
|
||
# Load the module from the specified file path | ||
spec = importlib.util.spec_from_file_location(module_name, model_path) | ||
module = importlib.util.module_from_spec(spec) | ||
spec.loader.exec_module(module) | ||
|
||
# Retrieve the class from the module using the model_type | ||
model_class = getattr(module, model_type) | ||
|
||
return model_class | ||
|
||
def create(self, num, model, **model_params): | ||
if num != 1: | ||
raise ValueError("Can only create one instance of the model.") | ||
entities = [] | ||
|
||
for i in range(num): | ||
eid = f"{self.model_name}" | ||
# Create an instance of the model | ||
model_instance = self.model( | ||
self.model_data['inputs'], | ||
self.model_data['outputs'], | ||
self.model_data['parameters'], | ||
self.model_data['states'], | ||
self.model_data['step_size'], | ||
self.model_data['start_time'] | ||
) | ||
# Store the instance | ||
self.entities[eid] = model_instance | ||
entities.append({'eid': eid, 'type': "Model"}) | ||
|
||
return entities | ||
|
||
def step(self, time, inputs, max_advance): | ||
# Update inputs | ||
for eid, entity_inputs in inputs.items(): | ||
model_instance = self.entities[eid] | ||
for input_name, input_value in entity_inputs.items(): | ||
model_instance.inputs[input_name] = next(iter(input_value.values())) | ||
|
||
# Step all models | ||
for model_instance in self.entities.values(): | ||
model_instance.step() | ||
|
||
return time + self.model_data['step_size'] | ||
|
||
def get_data(self, outputs): | ||
data = {} | ||
|
||
for eid, attrs in outputs.items(): | ||
model_instance = self.entities[eid] | ||
data[eid] = {} | ||
for attr in attrs: | ||
if attr in model_instance.outputs: | ||
data[eid][attr] = model_instance.outputs[attr] | ||
else: | ||
data[eid][attr] = model_instance.states[attr] | ||
|
||
return data | ||
|
||
def main(): | ||
return mosaik_api_v3.start_simulation(StateSpaceSimulator()) | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
import yaml | ||
from datetime import datetime | ||
import os | ||
|
||
class YamlInterpreter: | ||
def __init__(self, file_path): | ||
self.file_path = file_path | ||
self.yaml_data = self.load_yaml(self.file_path) | ||
|
||
self.load_scenario() # Load scenario settings | ||
self.load_simulators() # Load simulators and models | ||
self.load_connections() # Load connections | ||
self.load_monitors() # Load monitors | ||
|
||
def load_scenario(self): | ||
self.scenario = self.yaml_data.get('scenario', 'DefaultScenario') | ||
self.start_time = self.yaml_data.get('start_time', '2012-01-01 00:00:00') | ||
self.end_time = self.yaml_data.get('end_time', 1440) | ||
|
||
# Convert start_time to datetime if necessary | ||
if isinstance(self.start_time, str): | ||
self.start_time = datetime.fromisoformat(self.start_time) | ||
|
||
def load_simulators(self): | ||
available_models = self.get_model_files() | ||
self.simulators = {} # Key: simulator name, value: simulator data | ||
|
||
for sim_conf in self.yaml_data.get('simulators', []): | ||
model_type = sim_conf['model_type'] | ||
step_size = sim_conf.get('step_size', 1) # Default step size if not specified | ||
model_path = available_models.get(model_type.lower(), None) | ||
|
||
for model_conf in sim_conf.get('models', []): | ||
model_name = model_conf['name'] | ||
|
||
# Collect inputs, outputs, parameters, states, triggers | ||
inputs = model_conf.get('Inputs') or {} | ||
outputs = model_conf.get('Outputs') or {} | ||
parameters = model_conf.get('Parameters') or {} | ||
states = model_conf.get('States') or {} | ||
triggers = model_conf.get('Triggers') or {} | ||
|
||
# Create a model data dictionary | ||
model_data = { | ||
'model_type': model_type, | ||
'inputs': inputs, | ||
'outputs': outputs, | ||
'parameters': parameters, | ||
'states': states, | ||
'triggers': triggers, | ||
'model_path': model_path, | ||
'step_size': step_size, | ||
'start_time': self.start_time, | ||
} | ||
meta = self.generate_meta(model_data) | ||
model_data['meta'] = meta | ||
|
||
self.simulators[model_name] = model_data | ||
|
||
def generate_meta(self, model_data): | ||
inputs = set(model_data['inputs'].keys()) | ||
outputs = set(model_data['outputs'].keys()) | ||
parameters = set(model_data['parameters'].keys()) | ||
states = set(model_data['states'].keys()) | ||
triggers = set(model_data['triggers'].keys()) | ||
|
||
models_meta = { | ||
"Model": { | ||
'public': True, | ||
'params': list(parameters | states), | ||
'attrs': list(inputs | outputs | states | triggers), | ||
'any_inputs': False, | ||
'trigger': list(triggers), | ||
} | ||
} | ||
|
||
meta = { | ||
'api_version': '3.0', | ||
'type': 'hybrid', | ||
'models': models_meta, | ||
} | ||
return meta | ||
|
||
def load_connections(self): | ||
self.connections = [] | ||
for conn_conf in self.yaml_data.get('connections', []): | ||
from_str = conn_conf['from'] | ||
to_str = conn_conf['to'] | ||
|
||
# from: source_model.attribute | ||
# to: dest_model.attribute | ||
from_model_attr = from_str.split('.') | ||
to_model_attr = to_str.split('.') | ||
if len(from_model_attr) != 2 or len(to_model_attr) != 2: | ||
print(f"Invalid connection format: {conn_conf}") | ||
continue | ||
|
||
from_model, from_attr = from_model_attr | ||
to_model, to_attr = to_model_attr | ||
|
||
connection = { | ||
'from_model': from_model, | ||
'from_attr': from_attr, | ||
'to_model': to_model, | ||
'to_attr': to_attr, | ||
} | ||
|
||
self.connections.append(connection) | ||
|
||
def load_monitors(self): | ||
self.monitors = [] | ||
for monitor_str in self.yaml_data.get('monitor', []): | ||
# monitor_str is in the format 'ModelName.Attribute' | ||
model_attr = monitor_str.split('.') | ||
if len(model_attr) != 2: | ||
print(f"Invalid monitor format: {monitor_str}") | ||
continue | ||
|
||
model_name, attr_name = model_attr | ||
monitor = { | ||
'model_name': model_name, | ||
'attribute': attr_name, | ||
} | ||
self.monitors.append(monitor) | ||
|
||
@staticmethod | ||
def get_model_files(models_folder='Models'): | ||
""" | ||
Get a dictionary of available model names and their corresponding file paths. | ||
""" | ||
model_files = {} | ||
# Ensure the Models folder exists | ||
if not os.path.isdir(models_folder): | ||
print(f"The folder '{models_folder}' does not exist.") | ||
return model_files | ||
|
||
# List all files in the Models folder | ||
for filename in os.listdir(models_folder): | ||
# Check if the file matches the pattern *_model.py | ||
if filename.endswith('_model.py'): | ||
# Extract the model_name from the filename | ||
model_name = filename[:-len('_model.py')] | ||
# Get the full file path | ||
file_path = os.path.join(models_folder, filename) | ||
# Add the model_name and file_path to the dictionary | ||
model_files[model_name] = file_path | ||
|
||
return model_files | ||
|
||
@staticmethod | ||
def load_yaml(file_path): | ||
with open(file_path, 'r') as f: | ||
data = yaml.safe_load(f) | ||
return data | ||
|
||
if __name__ == "__main__": | ||
interpreter = YamlInterpreter('config.yaml') | ||
print(interpreter.scenario) | ||
print(interpreter.start_time) | ||
print(interpreter.end_time) | ||
print(interpreter.simulators) | ||
print(interpreter.connections) | ||
print(interpreter.monitors) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
import datetime | ||
|
||
class Adder: | ||
def __init__(self, inputs:dict={}, outputs:dict={}, parameters:dict={}, states:dict={}, step_size:int=1, time:datetime.datetime=None): | ||
self.inputs = inputs | ||
self.outputs = outputs | ||
self.parameters = parameters | ||
self.states = states | ||
self.step_size = step_size | ||
self.time = time | ||
|
||
def step(self): | ||
""" | ||
A simple adder model. | ||
The output "Out1" is the sum of inputs "In1" and "In2". | ||
""" | ||
self.outputs["Out1"] = self.inputs["In1"] + self.inputs["In2"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
import datetime | ||
|
||
class Multiplier: | ||
def __init__(self, inputs:dict={}, outputs:dict={}, parameters:dict={}, states:dict={}, step_size:int=1, time:datetime.datetime=None): | ||
self.inputs = inputs | ||
self.outputs = outputs | ||
self.parameters = parameters | ||
self.states = states | ||
self.step_size = step_size | ||
self.time = time | ||
|
||
def step(self): | ||
""" | ||
A simple multiplier model. | ||
Every step, it multiplies its internal product (state) by a multiplier (parameter). | ||
The product is then set to "Out1" output. | ||
""" | ||
self.states["Product"] = self.states["Product"] + self.parameters["Multiplier"] | ||
self.outputs["Out1"] = self.states["Product"] | ||
|
Oops, something went wrong.