diff --git a/process_bigraph/emitter.py b/process_bigraph/emitter.py index 2a45b83..73c1f65 100644 --- a/process_bigraph/emitter.py +++ b/process_bigraph/emitter.py @@ -266,48 +266,6 @@ def format_data( } } - def emit(self, data: Dict[str, Any] = None, **unformatted_data) -> None: - """Checks that the data size of the passed `data` is less than the emit limit, which can be - derived from `self.config['emit_limit'] and then emits data to the MongoDB collection read in from - the passed `data['table']`, which is the `table_id` of the given collection. - - Args: - data:`data: Dict[str, Union[str, Dict[str, Union[int, str, NoneType, Dict]]]]`: data to be saved - to the collection. The `NoneType` in the type annotations accounts for `time` (timestamp) not - being passed. Defaults to `None` in which case `**unformatted_data must be passed`. - If passing `data`, the type annotation provided suggests the following schema for this argument: - data = { - 'table': table(collection) id to which data will be saved, - 'data': { - 'time': timestamp of simulation by which to slice and retrieve the collection data, - 'values': {value name: value} - } - } - - **unformatted_data:`Dict[str, Union[str, Dict]]`: args to pass as input for `self.format_data` which - are as follows: table_id(str), time(int, str), values({simulation values}) - """ - if not data: - data = self.format_data( - unformatted_data['table_id'], - unformatted_data['time'], - values=unformatted_data['values'] - ) - table_id = data['table'] - table = self.db.get_collection(table_id) - time = data['data'].pop('time', None) - data['data'] = assoc_path({}, self.embed_path, data['data']) - # Analysis scripts expect the time to be at the top level of the - # dictionary, but some emits, like configuration emits, lack a - # time key. - if time is not None: - data['data']['time'] = time - emit_data = data.copy() - emit_data.pop('table', None) - emit_data['experiment_id'] = self.experiment_id - print(f'the emit data: {emit_data}') - self.write_emit(table, emit_data) - def write_emit(self, table: Collection, emit_data: Dict[str, Any]) -> None: """Check that data size is less than emit limit. Break up large emits into smaller pieces and emit them individually. @@ -340,18 +298,6 @@ def write_emit(self, table: Collection, emit_data: Dict[str, Any]) -> None: d['data']['time'] = time table.insert_one(d) - def get_data(self, query: Optional[List[Tuple[str]]] = None) -> Dict: - """Get data based on the passed query. - - Args: - query: a list of tuples pointing to fields within the experiment data. - In the format: [('path', 'to', 'field1'), ('path', 'to', 'field2')] - - Returns: - `Dict` - """ - return get_history_data_db(self.history, self.experiment_id, query) - def query(self, query: Optional[List[Tuple[str]]] = None) -> Dict: """API contract-wrapper for `self.get_data`. Get data based on the passed query. @@ -362,7 +308,7 @@ def query(self, query: Optional[List[Tuple[str]]] = None) -> Dict: Returns: `Dict` """ - return self.get_data(query) + return get_history_data_db(self.history, self.experiment_id, query) def update(self, state): table_id = state['table'] diff --git a/process_bigraph/experiments/minimal_gillespie.py b/process_bigraph/experiments/minimal_gillespie.py index 95ed853..b89e2b7 100644 --- a/process_bigraph/experiments/minimal_gillespie.py +++ b/process_bigraph/experiments/minimal_gillespie.py @@ -211,7 +211,7 @@ def test_gillespie_composite(): 'emitter': { '_type': 'step', - 'address': 'local:ram-emitter', + 'address': 'local:database-emitter', 'config': { 'ports': { 'inputs': {