diff --git a/process_bigraph/composite.py b/process_bigraph/composite.py index 1df335f..1f6d420 100644 --- a/process_bigraph/composite.py +++ b/process_bigraph/composite.py @@ -310,6 +310,12 @@ def __init__(self, config=None, core=None): self.config_schema, config) + # TODO: validate your config after filling, report if anything + # is off + # print(self.core.validate_state( + # self.config_schema, + # config)) + def initial_state(self): return {} @@ -594,8 +600,17 @@ class Composite(Process): 'bridge': { 'inputs': 'wires', 'outputs': 'wires'}, - 'global_time_precision': 'maybe[float]', - } + 'emitter': { + 'path': { + '_type': 'path', + '_default': ['emitter']}, + 'address': { + '_type': 'string', + '_default': 'local:ram-emitter'}, + 'config': 'tree[any]', + 'mode': 'emitter_mode', + 'emit': 'wires'}, + 'global_time_precision': 'maybe[float]'} def __init__(self, config=None, core=None): @@ -665,6 +680,11 @@ def __init__(self, config=None, core=None): self.global_time_precision = self.config[ 'global_time_precision'] + emitter_config = self.config.get('emitter') + if emitter_config and not emitter_config.get('mode', 'none') == 'none': + self.add_emitter( + emitter_config) + self.step_triggers = {} for step_path, step in self.step_paths.items(): @@ -730,12 +750,68 @@ def outputs(self): return self.process_schema.get('outputs', {}) + def read_emitter_config(self, emitter_config): + address = emitter_config.get('address', 'local:ram-emitter') + config = emitter_config.get('config', {}) + mode = emitter_config.get('mode', 'none') + + if mode == 'all': + inputs = { + key: [emitter_config.get('inputs', {}).get(key, key)] + for key in self.state.keys() + if not is_schema_key(key)} + + elif mode == 'none': + inputs = emitter_config.get('emit', {}) + + elif mode == 'bridge': + inputs = {} + + elif mode == 'ports': + inputs = {} + + if not 'emit' in config: + config['emit'] = { + input: 'any' + for input in inputs} + + return { + '_type': 'step', + 'address': address, + 'config': config, + 'inputs': inputs} + + + def add_emitter(self, emitter_config): + path = tuple(emitter_config['path']) + + step_config = self.read_emitter_config(emitter_config) + emitter = set_path( + {}, path, step_config) + self.merge(emitter) + _, instance = self.core.slice( + self.composition, + self.state, + path) + + self.emitter_paths[path] = instance + self.step_paths[path] = instance + + + # TODO: merge needs to be schema aware, + # and since the results of the merge may + # entail a schema update, we need to return + # the new schema def merge(self, initial_state): self.state = self.core.merge( self.composition, self.state, initial_state) + self.composition, self.state = self.core.complete( + self.composition, + self.state) + def process_update( self, @@ -1074,6 +1150,9 @@ def gather_results(self, queries=None): emitter = get_path(self.state, path) results[path] = emitter['instance'].query(query) + # TODO: unnest the results? + # TODO: allow the results to be transposed + return results def update(self, state, interval): @@ -1163,6 +1242,9 @@ def query(self, query=None): return result +# def StateEmitter(Emitter): + + # def test_emitter(): # composite = Composite({}) diff --git a/process_bigraph/experiments/comets.py b/process_bigraph/experiments/comets.py index ed3bbfb..fe39cfe 100644 --- a/process_bigraph/experiments/comets.py +++ b/process_bigraph/experiments/comets.py @@ -465,28 +465,39 @@ def run_comets(): 'fields': ['fields'] } }, - 'emitter': { - '_type': 'step', - 'address': 'local:ram-emitter', - 'config': { - 'emit': { - 'fields': 'map', - 'time': 'float', - } - }, - 'inputs': { - 'fields': ['fields'], - 'time': ['global_time'] - } - } + # 'emitter': { + # '_type': 'step', + # 'address': 'local:ram-emitter', + # 'config': { + # 'emit': { + # 'fields': 'map', + # 'time': 'float', + # } + # }, + # 'inputs': { + # 'fields': ['fields'], + # 'time': ['global_time'] + # } + # } } - sim = Composite({'state': composite_state}, core=core) + sim = Composite({ + 'state': composite_state, + 'emitter': { + 'mode': 'all'}}, core=core) - sim.update({}, 100.0) + # TODO: this should fail validation + # sim = Composite({ + # 'state': composite_state, + # 'emitter': { + # 'mode': 'pluto'}}, core=core) + + sim.update({}, 10.0) results = sim.gather_results() + import ipdb; ipdb.set_trace() + print(results) diff --git a/process_bigraph/process_types.py b/process_bigraph/process_types.py index 72e719a..d5230ad 100644 --- a/process_bigraph/process_types.py +++ b/process_bigraph/process_types.py @@ -168,6 +168,9 @@ def deserialize_step(schema, encoded, core): '_type': 'protocol', '_inherit': 'string'}, + # TODO: have the default enum be the first option + 'emitter_mode': 'enum[none,all,paths,bridge,port]', + 'interval': { '_type': 'interval', '_inherit': 'float', @@ -188,6 +191,7 @@ def deserialize_step(schema, encoded, core): 'address': 'protocol', 'config': 'tree[any]'}, + # TODO: slice process to allow for navigating through a port 'process': { '_type': 'process', '_inherit': 'edge', diff --git a/process_bigraph/tests.py b/process_bigraph/tests.py index e883055..ee7629c 100644 --- a/process_bigraph/tests.py +++ b/process_bigraph/tests.py @@ -384,6 +384,66 @@ def test_reaction(): 'inner': ['inner']}}}}}} +def test_emitter(core): + composite_schema = { + 'bridge': { + 'inputs': { + 'DNA': ['DNA'], + 'mRNA': ['mRNA']}, + 'outputs': { + 'DNA': ['DNA'], + 'mRNA': ['mRNA']}}, + + 'state': { + 'interval': { + '_type': 'step', + 'address': 'local:!process_bigraph.experiments.minimal_gillespie.GillespieInterval', + 'config': {'ktsc': '6e0'}, + 'inputs': { + 'DNA': ['DNA'], + 'mRNA': ['mRNA']}, + 'outputs': { + 'interval': ['event', 'interval']}}, + + 'event': { + '_type': 'process', + 'address': 'local:!process_bigraph.experiments.minimal_gillespie.GillespieEvent', + 'config': {'ktsc': 6e0}, + 'inputs': { + 'DNA': ['DNA'], + 'mRNA': ['mRNA']}, + 'outputs': { + 'mRNA': ['mRNA']}, + 'interval': '3.0'}}, + + 'emitter': { + 'emit': { + 'time': ['global_time'], + 'mRNA': ['mRNA'], + 'interval': ['event', 'interval']}}} + + gillespie = Composite( + composite_schema, + core=core) + + updates = gillespie.update({ + 'DNA': { + 'A gene': 11.0, + 'B gene': 5.0}, + 'mRNA': { + 'A mRNA': 33.3, + 'B mRNA': 2.1}}, + 1000.0) + + # TODO: make this work + results = gillespie.gather_results() + + assert 'mRNA' in updates[0] + # TODO: support omit as well as emit + + + + if __name__ == '__main__': core = ProcessTypes() @@ -394,4 +454,5 @@ def test_reaction(): test_infer(core) test_step_initialization(core) test_dependencies(core) + test_emitter(core) # test_reaction()