diff --git a/tests/scheduling/conftest.py b/tests/scheduling/conftest.py index 350e094..5e894bc 100644 --- a/tests/scheduling/conftest.py +++ b/tests/scheduling/conftest.py @@ -1,8 +1,27 @@ import types -import psyneulink as pnl import pytest +import graph_scheduler as gs + + +class SimpleTestNode(types.SimpleNamespace): + def __init__(self, name=None, is_finished_flag=True, value=0): + super().__init__(name=name, is_finished_flag=is_finished_flag, value=value) + + def __hash__(self): + return id(self) + + def __repr__(self): + return str(self.name) + + def is_finished(self, execution_id): + return self.is_finished_flag + + def add(self, n=1): + self.value += n + return self.value + def pytest_assertrepr_compare(op, left, right): if isinstance(left, list) and isinstance(right, list) and op == '==': @@ -37,12 +56,25 @@ def create_node(function=lambda x: x): @pytest.fixture -def three_node_linear_composition(): - A = pnl.TransferMechanism(name='A') - B = pnl.TransferMechanism(name='B') - C = pnl.TransferMechanism(name='C') +def three_node_linear_scheduler(): + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') - comp = pnl.Composition() - comp.add_linear_processing_pathway([A, B, C]) + sched = gs.Scheduler(graph=create_graph_from_pathways([A, B, C])) + + return sched.nodes, sched + + +@pytest.helpers.register +def get_test_node(): + return SimpleTestNode + + +@pytest.helpers.register +def run_scheduler(scheduler, func=lambda test_node: test_node, **run_kwargs): + for execution_set in scheduler.run(**run_kwargs): + for node in execution_set: + func(node) - return comp.nodes, comp + return [node.value for node in scheduler.nodes] diff --git a/tests/scheduling/test_condition.py b/tests/scheduling/test_condition.py index fb798e4..ad73fdd 100644 --- a/tests/scheduling/test_condition.py +++ b/tests/scheduling/test_condition.py @@ -1,13 +1,14 @@ import logging -import graph_scheduler as gs import numpy as np -import psyneulink as pnl import pytest +import graph_scheduler as gs logger = logging.getLogger(__name__) +SimpleTestNode = pytest.helpers.get_test_node() + class TestConditionSet: # maintain through v1.x @@ -37,27 +38,27 @@ def test_keyword_arg(self, conds): class TestCondition: def test_invalid_input_WhenFinished(self): - with pytest.raises(pnl.ConditionError): - pnl.WhenFinished(None).is_satisfied() + with pytest.raises(gs.ConditionError): + gs.WhenFinished(None).is_satisfied() def test_invalid_input_WhenFinishedAny_1(self): - with pytest.raises(pnl.ConditionError): - pnl.WhenFinished(None).is_satisfied() + with pytest.raises(gs.ConditionError): + gs.WhenFinished(None).is_satisfied() def test_invalid_input_WhenFinishedAny_2(self): - with pytest.raises(pnl.ConditionError): - pnl.WhenFinished({None}).is_satisfied() + with pytest.raises(gs.ConditionError): + gs.WhenFinished({None}).is_satisfied() def test_invalid_input_WhenFinishedAll_1(self): - with pytest.raises(pnl.ConditionError): - pnl.WhenFinished(None).is_satisfied() + with pytest.raises(gs.ConditionError): + gs.WhenFinished(None).is_satisfied() def test_invalid_input_WhenFinishedAll_2(self): - with pytest.raises(pnl.ConditionError): - pnl.WhenFinished({None}).is_satisfied() + with pytest.raises(gs.ConditionError): + gs.WhenFinished({None}).is_satisfied() def test_additional_args(self): - class OneSatisfied(pnl.Condition): + class OneSatisfied(gs.Condition): def __init__(self, a): def func(a, b): return a or b @@ -72,7 +73,7 @@ def func(a, b): assert not cond.is_satisfied(False) def test_additional_kwargs(self): - class OneSatisfied(pnl.Condition): + class OneSatisfied(gs.Condition): def __init__(self, a, c=True): def func(a, b, c=True): return a or b or c @@ -94,104 +95,84 @@ def func(a, b, c=True): assert not cond.is_satisfied(False, c=False) assert not cond.is_satisfied(False, c=False, extra_arg=True) - @pytest.mark.psyneulink class TestGeneric: def test_WhileNot_AtPass(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.WhileNot(lambda sched: sched.get_clock(sched.default_execution_id).get_total_times_relative(pnl.TimeScale.PASS, pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) == 0, sched)) + A = 'A' + sched = gs.Scheduler(graph={'A': set()}) + sched.add_condition(A, gs.WhileNot(lambda sched: sched.get_clock(sched.default_execution_id).get_total_times_relative(gs.TimeScale.PASS, gs.TimeScale.ENVIRONMENT_STATE_UPDATE) == 0, sched)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [set(), A, A, A, A] assert output == pytest.helpers.setify_expected_output(expected_output) def test_WhileNot_AtPass_in_middle(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.WhileNot(lambda sched: sched.get_clock(sched.default_execution_id).get_total_times_relative(pnl.TimeScale.PASS, pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) == 2, sched)) + A = 'A' + sched = gs.Scheduler(graph={'A': set()}) + sched.add_condition(A, gs.WhileNot(lambda sched: sched.get_clock(sched.default_execution_id).get_total_times_relative(gs.TimeScale.PASS, gs.TimeScale.ENVIRONMENT_STATE_UPDATE) == 2, sched)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, A, set(), A, A] assert output == pytest.helpers.setify_expected_output(expected_output) - @pytest.mark.psyneulink class TestRelative: def test_Any_end_before_one_finished(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - for m in [A]: - comp.add_node(m) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + A = 'A' + sched = gs.Scheduler(graph={A: set()}) - sched.add_condition(A, pnl.EveryNPasses(1)) + sched.add_condition(A, gs.EveryNPasses(1)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.Any(pnl.AfterNCalls(A, 10), pnl.AtPass(5)) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.Any(gs.AfterNCalls(A, 10), gs.AtPass(5)) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A for _ in range(5)] assert output == pytest.helpers.setify_expected_output(expected_output) def test_All_end_after_one_finished(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - for m in [A]: - comp.add_node(m) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + A = 'A' + sched = gs.Scheduler(graph={A: set()}) - sched.add_condition(A, pnl.EveryNPasses(1)) + sched.add_condition(A, gs.EveryNPasses(1)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.Any(pnl.AfterNCalls(A, 5), pnl.AtPass(10)) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.Any(gs.AfterNCalls(A, 5), gs.AtPass(10)) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A for _ in range(5)] assert output == pytest.helpers.setify_expected_output(expected_output) def test_Not_AtPass(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.Not(pnl.AtPass(0))) + A = 'A' + sched = gs.Scheduler(graph={'A': set()}) + sched.add_condition(A, gs.Not(gs.AtPass(0))) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [set(), A, A, A, A] assert output == pytest.helpers.setify_expected_output(expected_output) def test_Not_AtPass_in_middle(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.Not(pnl.AtPass(2))) + A = 'A' + sched = gs.Scheduler(graph={'A': set()}) + sched.add_condition(A, gs.Not(gs.AtPass(2))) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, A, set(), A, A] @@ -205,291 +186,223 @@ def test_Not_AtPass_in_middle(self): ] ) def test_NWhen_AfterNCalls(self, n, expected_output): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='B') - for m in [A, B]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.Always()) - sched.add_condition(B, pnl.NWhen(pnl.AfterNCalls(A, 3), n)) + A = 'A' + B = 'B' + sched = gs.Scheduler(graph={'A': set(), 'B': {'A'}}) + sched.add_condition(A, gs.Always()) + sched.add_condition(B, gs.NWhen(gs.AfterNCalls(A, 3), n)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(A, 6) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(A, 6) output = list(sched.run(termination_conds=termination_conds)) - expected_output = [A if x == 'A' else B for x in expected_output] - assert output == pytest.helpers.setify_expected_output(expected_output) - @pytest.mark.psyneulink class TestTimePNL: def test_BeforeConsiderationSetExecution(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.BeforeConsiderationSetExecution(2)) + A = 'A' + sched = gs.Scheduler(graph={'A': set()}) + sched.add_condition(A, gs.BeforeConsiderationSetExecution(2)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, A, set(), set(), set()] assert output == pytest.helpers.setify_expected_output(expected_output) def test_BeforeConsiderationSetExecution_2(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - B = pnl.TransferMechanism(name='B') - comp.add_node(A) - comp.add_node(B) - - comp.add_projection(pnl.MappingProjection(), A, B) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.BeforeConsiderationSetExecution(2)) - sched.add_condition(B, pnl.Always()) + A = 'A' + B = 'B' + sched = gs.Scheduler(graph={'A': set(), 'B': {'A'}}) + sched.add_condition(A, gs.BeforeConsiderationSetExecution(2)) + sched.add_condition(B, gs.Always()) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, B, B, B, B, B] assert output == pytest.helpers.setify_expected_output(expected_output) def test_AtConsiderationSetExecution(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.AtConsiderationSetExecution(0)) + A = 'A' + sched = gs.Scheduler(graph={'A': set()}) + sched.add_condition(A, gs.AtConsiderationSetExecution(0)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, set(), set(), set(), set()] assert output == pytest.helpers.setify_expected_output(expected_output) def test_BeforePass(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.BeforePass(2)) + A = 'A' + sched = gs.Scheduler(graph={'A': set()}) + sched.add_condition(A, gs.BeforePass(2)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, A, set(), set(), set()] assert output == pytest.helpers.setify_expected_output(expected_output) def test_AtPass(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.AtPass(0)) + A = 'A' + sched = gs.Scheduler(graph={A: set()}) + sched.add_condition(A, gs.AtPass(0)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, set(), set(), set(), set()] assert output == pytest.helpers.setify_expected_output(expected_output) def test_AtPass_underconstrained(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), B, C) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.AtPass(0)) - sched.add_condition(B, pnl.Always()) - sched.add_condition(C, pnl.Always()) + A = 'A' + B = 'B' + C = 'C' + sched = gs.Scheduler(graph={A: set(), B: {A}, C: {B}}) + sched.add_condition(A, gs.AtPass(0)) + sched.add_condition(B, gs.Always()) + sched.add_condition(C, gs.Always()) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 2) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 2) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, B, C, B, C] assert output == pytest.helpers.setify_expected_output(expected_output) def test_AtPass_in_middle(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.AtPass(2)) + A = 'A' + sched = gs.Scheduler(graph={A: set()}) + sched.add_condition(A, gs.AtPass(2)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [set(), set(), A, set(), set()] assert output == pytest.helpers.setify_expected_output(expected_output) def test_AtPass_at_end(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.AtPass(5)) + A = 'A' + sched = gs.Scheduler(graph={A: set()}) + sched.add_condition(A, gs.AtPass(5)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [set(), set(), set(), set(), set()] assert output == pytest.helpers.setify_expected_output(expected_output) def test_AtPass_after_end(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.AtPass(6)) + A = 'A' + sched = gs.Scheduler(graph={A: set()}) + sched.add_condition(A, gs.AtPass(6)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [set(), set(), set(), set(), set()] assert output == pytest.helpers.setify_expected_output(expected_output) def test_AfterPass(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.AfterPass(0)) + A = 'A' + sched = gs.Scheduler(graph={A: set()}) + sched.add_condition(A, gs.AfterPass(0)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [set(), A, A, A, A] assert output == pytest.helpers.setify_expected_output(expected_output) def test_AfterNPasses(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.AfterNPasses(1)) + A = 'A' + sched = gs.Scheduler(graph={A: set()}) + sched.add_condition(A, gs.AfterNPasses(1)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [set(), A, A, A, A] assert output == pytest.helpers.setify_expected_output(expected_output) def test_BeforeEnvironmentStateUpdate(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.BeforeEnvironmentStateUpdate(4)) + A = 'A' + sched = gs.Scheduler(graph={A: set()}) + sched.add_condition(A, gs.BeforeEnvironmentStateUpdate(4)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(5) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(1) - comp.run( - inputs={A: range(6)}, - scheduler=sched, - termination_processing=termination_conds - ) - output = sched.execution_list[comp.default_execution_id] + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(5) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(1) + for _ in range(6): + list(sched.run(termination_conds)) + output = sched.execution_list[None] expected_output = [A, A, A, A, set()] assert output == pytest.helpers.setify_expected_output(expected_output) def test_AtEnvironmentStateUpdate(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.Always()) + A = 'A' + sched = gs.Scheduler(graph={A: set()}) + sched.add_condition(A, gs.Always()) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AtEnvironmentStateUpdate(4) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(1) - comp.run( - inputs={A: range(6)}, - scheduler=sched, - termination_processing=termination_conds - ) - output = sched.execution_list[comp.default_execution_id] + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AtEnvironmentStateUpdate(4) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(1) + for _ in range(5): + list(sched.run(termination_conds)) + output = sched.execution_list[None] expected_output = [A, A, A, A] assert output == pytest.helpers.setify_expected_output(expected_output) def test_AfterEnvironmentStateUpdate(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.Always()) + A = 'A' + sched = gs.Scheduler(graph={A: set()}) + sched.add_condition(A, gs.Always()) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterEnvironmentStateUpdate(4) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(1) - comp.run( - inputs={A: range(6)}, - scheduler=sched, - termination_processing=termination_conds - ) - output = sched.execution_list[comp.default_execution_id] + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterEnvironmentStateUpdate(4) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(1) + for _ in range(6): + list(sched.run(termination_conds)) + output = sched.execution_list[None] expected_output = [A, A, A, A, A] assert output == pytest.helpers.setify_expected_output(expected_output) def test_AfterNEnvironmentStateUpdates(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.AfterNPasses(1)) + A = 'A' + sched = gs.Scheduler(graph={A: set()}) + sched.add_condition(A, gs.AfterNPasses(1)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [set(), A, A, A, A] @@ -503,8 +416,8 @@ class TestTime: pytest.param( gs.AfterNPasses(1), { - pnl.TimeScale.ENVIRONMENT_SEQUENCE: gs.AfterNEnvironmentStateUpdates(1), - pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterConsiderationSetExecution(4) + gs.TimeScale.ENVIRONMENT_SEQUENCE: gs.AfterNEnvironmentStateUpdates(1), + gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterConsiderationSetExecution(4) }, [set(), 'A', 'A', 'A', 'A'], id='AfterConsiderationSetExecution' @@ -512,8 +425,8 @@ class TestTime: pytest.param( gs.AfterNPasses(1), { - pnl.TimeScale.ENVIRONMENT_SEQUENCE: gs.AfterNEnvironmentStateUpdates(1), - pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNConsiderationSetExecutions(5) + gs.TimeScale.ENVIRONMENT_SEQUENCE: gs.AfterNEnvironmentStateUpdates(1), + gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNConsiderationSetExecutions(5) }, [set(), 'A', 'A', 'A', 'A'], id='AfterNConsiderationSetExecutions' @@ -536,7 +449,7 @@ def test_single_node( [ pytest.param( gs.AtEnvironmentStateUpdateNStart(2), - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNPasses(1)}, + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNPasses(1)}, [[set(), set(), 'A', set()]], 1, 4, @@ -544,7 +457,7 @@ def test_single_node( ), pytest.param( gs.AtEnvironmentSequence(4), - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNPasses(1)}, + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNPasses(1)}, [[set()], [set()], [set()], [set()], ['A'], [set()]], 6, 1, @@ -552,7 +465,7 @@ def test_single_node( ), pytest.param( gs.AfterEnvironmentSequence(3), - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNPasses(1)}, + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNPasses(1)}, [[set()], [set()], [set()], [set()], ['A'], ['A']], 6, 1, @@ -560,7 +473,7 @@ def test_single_node( ), pytest.param( gs.AfterNEnvironmentSequences(4), - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNPasses(1)}, + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNPasses(1)}, [[set()], [set()], [set()], [set()], ['A'], ['A']], 6, 1, @@ -568,7 +481,7 @@ def test_single_node( ), pytest.param( gs.AtEnvironmentSequenceStart(), - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNPasses(1)}, + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNPasses(1)}, [['A', set()], ['A', set()]], 2, 2, @@ -576,7 +489,7 @@ def test_single_node( ), pytest.param( gs.AtEnvironmentSequenceNStart(1), - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNPasses(1)}, + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNPasses(1)}, [[set(), set()], ['A', set()], [set(), set()]], 3, 2, @@ -610,20 +523,16 @@ def test_single_node_n_sequences( for i in range(n_sequences): assert output[i] == pytest.helpers.setify_expected_output(expected_output[i]), f'ENVIRONMENT_SEQUENCE {i}' - @pytest.mark.psyneulink class TestComponentBased: def test_BeforeNCalls(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.BeforeNCalls(A, 3)) + A = 'A' + sched = gs.Scheduler(graph={'A': set()}) + sched.add_condition(A, gs.BeforeNCalls(A, 3)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, A, A, set(), set()] @@ -635,138 +544,109 @@ def test_BeforeNCalls(self): # A fix could invalidate key assumptions and affect many other conditions # Since this condition is unlikely to be used, it's best to leave it for now # def test_AtCall(self): - # comp = pnl.Composition() - # A = pnl.TransferMechanism(function = pnl.Linear(slope=5.0, intercept = 2.0), name = 'A') - # B = pnl.TransferMechanism(function = pnl.Linear(intercept = 4.0), name = 'B') - # C = pnl.TransferMechanism(function = pnl.Linear(intercept = 1.5), name = 'C') - # for m in [A,B]: - # comp.add_node(m) + # A = 'A' + # B = 'B' - # sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - # sched.add_condition(A, pnl.Always()) - # sched.add_condition(B, AtCall(A, 3)) + # sched = gs.Scheduler(graph={A: set(), B: set()}) + # sched.add_condition(A, gs.Always()) + # sched.add_condition(B, gs.AtCall(A, 3)) # termination_conds = {} - # termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - # termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + # termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + # termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) # output = list(sched.run(termination_conds=termination_conds)) # expected_output = [A, A, set([A, B]), A, A] # assert output == pytest.helpers.setify_expected_output(expected_output) def test_AfterCall(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='B') - for m in [A, B]: - comp.add_node(m) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(B, pnl.AfterCall(A, 3)) + A = 'A' + B = 'B' + sched = gs.Scheduler(graph={A: set(), B: set()}) + sched.add_condition(B, gs.AfterCall(A, 3)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, A, A, set([A, B]), set([A, B])] assert output == pytest.helpers.setify_expected_output(expected_output) def test_AfterNCalls(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='B') - for m in [A, B]: - comp.add_node(m) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(A, pnl.Always()) - sched.add_condition(B, pnl.AfterNCalls(A, 3)) + A = 'A' + B = 'B' + sched = gs.Scheduler(graph={A: set(), B: set()}) + sched.add_condition(A, gs.Always()) + sched.add_condition(B, gs.AfterNCalls(A, 3)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, A, set([A, B]), set([A, B]), set([A, B])] assert output == pytest.helpers.setify_expected_output(expected_output) - @pytest.mark.psyneulink class TestConvenience: def test_AtEnvironmentStateUpdateStart(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(name='A') - B = pnl.TransferMechanism(name='B') - comp.add_linear_processing_pathway([A, B]) + A = 'A' + B = 'B' - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - sched.add_condition(B, pnl.AtEnvironmentStateUpdateStart()) + sched = gs.Scheduler(graph={A: set(), B: {A}}) + sched.add_condition(B, gs.AtEnvironmentStateUpdateStart()) termination_conds = { - pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.AtPass(3) + gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AtPass(3) } output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, B, A, A] assert output == pytest.helpers.setify_expected_output(expected_output) - @pytest.mark.psyneulink def test_composite_condition_multi(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), B, C) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 2)) - sched.add_condition(C, pnl.All( - pnl.Any( - pnl.AfterPass(6), - pnl.AfterNCalls(B, 2) + A = 'A' + B = 'B' + C = 'C' + sched = gs.Scheduler(graph={A: set(), B: {A}, C: {B}}) + + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 2)) + sched.add_condition(C, gs.All( + gs.Any( + gs.AfterPass(6), + gs.AfterNCalls(B, 2) ), - pnl.Any( - pnl.AfterPass(2), - pnl.AfterNCalls(B, 3) + gs.Any( + gs.AfterPass(2), + gs.AfterNCalls(B, 3) ) ) ) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 3) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 3) output = list(sched.run(termination_conds=termination_conds)) expected_output = [ A, A, B, A, A, B, C, A, C, A, B, C ] assert output == pytest.helpers.setify_expected_output(expected_output) - @pytest.mark.psyneulink def test_AfterNCallsCombined(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - A.is_finished_flag = True - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='B') - B.is_finished_flag = True - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), B, C) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 2)) - sched.add_condition(C, pnl.EveryNCalls(B, 2)) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + + sched = gs.Scheduler(graph={A: set(), B: {A}, C: {B}}) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 2)) + sched.add_condition(C, gs.EveryNCalls(B, 2)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCallsCombined(B, C, n=4) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCallsCombined(B, C, n=4) output = list(sched.run(termination_conds=termination_conds)) expected_output = [ @@ -774,27 +654,19 @@ def test_AfterNCallsCombined(self): ] assert output == pytest.helpers.setify_expected_output(expected_output) - @pytest.mark.psyneulink def test_AllHaveRun(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - A.is_finished_flag = False - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='B') - B.is_finished_flag = True - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), B, C) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 2)) - sched.add_condition(C, pnl.EveryNCalls(B, 2)) + A = SimpleTestNode('A', False) + B = SimpleTestNode('B') + C = SimpleTestNode('C') + + sched = gs.Scheduler(graph={A: set(), B: {A}, C: {B}}) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 2)) + sched.add_condition(C, gs.EveryNCalls(B, 2)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AllHaveRun(A, B, C) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AllHaveRun(A, B, C) output = list(sched.run(termination_conds=termination_conds)) expected_output = [ @@ -802,25 +674,19 @@ def test_AllHaveRun(self): ] assert output == pytest.helpers.setify_expected_output(expected_output) - @pytest.mark.psyneulink def test_AllHaveRun_2(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), B, C) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 2)) - sched.add_condition(C, pnl.EveryNCalls(B, 2)) + A = 'A' + B = 'B' + C = 'C' + sched = gs.Scheduler(graph={A: set(), B: {A}, C: {B}}) + + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 2)) + sched.add_condition(C, gs.EveryNCalls(B, 2)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AllHaveRun() + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AllHaveRun() output = list(sched.run(termination_conds=termination_conds)) expected_output = [ @@ -828,66 +694,50 @@ def test_AllHaveRun_2(self): ] assert output == pytest.helpers.setify_expected_output(expected_output) - @pytest.mark.psyneulink @pytest.mark.parametrize( 'parameter, indices, default_variable, integration_rate, expected_results', [ - ('value', None, None, 1, [[[10]]]), - ('value', (0, 0), [[0, 0]], [1, 2], [[[10, 20]]]), - ('value', (0, 1), [[0, 0]], [1, 2], [[[5, 10]]]), - ('num_executions', pnl.TimeScale.TRIAL, None, 1, [[[10]]]), + ('value', None, None, 1, [10]), ] ) @pytest.mark.parametrize('threshold', [10, 10.0]) def test_Threshold_parameters( self, parameter, indices, default_variable, integration_rate, expected_results, threshold, ): + A = SimpleTestNode('A') + sched = gs.Scheduler(graph={A: set()}) - A = pnl.TransferMechanism( - default_variable=default_variable, - integrator_mode=True, - integrator_function=pnl.SimpleIntegrator, - integration_rate=integration_rate, - ) - comp = pnl.Composition(pathways=[A]) - - comp.termination_processing = { - pnl.TimeScale.TRIAL: gs.Threshold(A, parameter, threshold, '>=', indices=indices) + sched.termination_conds = { + gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.Threshold(A, parameter, threshold, '>=', indices=indices) } - comp.run(inputs={A: np.ones(A.defaults.variable.shape)}) - - np.testing.assert_array_equal(comp.results, expected_results) + results = pytest.helpers.run_scheduler(sched, lambda node: node.add(integration_rate)) + np.testing.assert_array_equal(results, expected_results) - @pytest.mark.psyneulink @pytest.mark.parametrize( 'comparator, increment, threshold, expected_results', [ - ('>', 1, 5, [[[6]]]), - ('>=', 1, 5, [[[5]]]), - ('<', -1, -5, [[[-6]]]), - ('<=', -1, -5, [[[-5]]]), - ('==', 1, 5, [[[5]]]), - ('!=', 1, 0, [[[1]]]), - ('!=', -1, 0, [[[-1]]]), + ('>', 1, 5, [6]), + ('>=', 1, 5, [5]), + ('<', -1, -5, [-6]), + ('<=', -1, -5, [-5]), + ('==', 1, 5, [5]), + ('!=', 1, 0, [1]), + ('!=', -1, 0, [-1]), ] ) def test_Threshold_comparators( self, comparator, increment, threshold, expected_results ): - A = pnl.TransferMechanism( - integrator_mode=True, - integrator_function=pnl.AccumulatorIntegrator(rate=1, increment=increment), - ) - comp = pnl.Composition(pathways=[A]) + A = SimpleTestNode('A') + sched = gs.Scheduler(graph={A: set()}) - comp.termination_processing = { - pnl.TimeScale.TRIAL: gs.Threshold(A, 'value', threshold, comparator) + sched.termination_conds = { + gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.Threshold(A, 'value', threshold, comparator) } - comp.run(inputs={A: np.ones(A.defaults.variable.shape)}) - - np.testing.assert_array_equal(comp.results, expected_results) + results = pytest.helpers.run_scheduler(sched, lambda node: node.add(increment)) + np.testing.assert_array_equal(results, expected_results) def test_Threshold_custom_methods(self): class CustomDependency: @@ -911,76 +761,48 @@ def has_parameter(self, parameter): assert d.parameters['a'] == 3 - @pytest.mark.psyneulink @pytest.mark.parametrize( 'comparator, increment, threshold, atol, rtol, expected_results', [ - ('==', 1, 10, 1, 0.1, [[[8]]]), - ('==', 1, 10, 1, 0, [[[9]]]), - ('==', 1, 10, 0, 0.1, [[[9]]]), - ('!=', 1, 2, 1, 0.5, [[[5]]]), - ('!=', 1, 1, 1, 0, [[[3]]]), - ('!=', 1, 1, 0, 1, [[[3]]]), - ('!=', -1, -2, 1, 0.5, [[[-5]]]), - ('!=', -1, -1, 1, 0, [[[-3]]]), - ('!=', -1, -1, 0, 1, [[[-3]]]), + ('==', 1, 10, 1, 0.1, [8]), + ('==', 1, 10, 1, 0, [9]), + ('==', 1, 10, 0, 0.1, [9]), + ('!=', 1, 2, 1, 0.5, [5]), + ('!=', 1, 1, 1, 0, [3]), + ('!=', 1, 1, 0, 1, [3]), + ('!=', -1, -2, 1, 0.5, [-5]), + ('!=', -1, -1, 1, 0, [-3]), + ('!=', -1, -1, 0, 1, [-3]), ] ) def test_Threshold_tolerances( self, comparator, increment, threshold, atol, rtol, expected_results ): - A = pnl.TransferMechanism( - integrator_mode=True, - integrator_function=pnl.AccumulatorIntegrator(rate=1, increment=increment), - ) - comp = pnl.Composition(pathways=[A]) + A = SimpleTestNode('A') + sched = gs.Scheduler(graph={A: set()}) - comp.termination_processing = { - pnl.TimeScale.TRIAL: gs.Threshold(A, 'value', threshold, comparator, atol=atol, rtol=rtol) + sched.termination_conds = { + gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.Threshold(A, 'value', threshold, comparator, atol=atol, rtol=rtol) } - comp.run(inputs={A: np.ones(A.defaults.variable.shape)}) + results = pytest.helpers.run_scheduler(sched, lambda node: node.add(increment)) + np.testing.assert_array_equal(results, expected_results) - np.testing.assert_array_equal(comp.results, expected_results) - -@pytest.mark.psyneulink class TestWhenFinished: - - @classmethod - def setup_class(self): - self.orig_is_finished_flag = pnl.TransferMechanism.is_finished_flag - self.orig_is_finished = pnl.TransferMechanism.is_finished - pnl.TransferMechanism.is_finished_flag = True - pnl.TransferMechanism.is_finished = lambda self, context: self.is_finished_flag - - @classmethod - def teardown_class(self): - del pnl.TransferMechanism.is_finished_flag - del pnl.TransferMechanism.is_finished - pnl.TransferMechanism.is_finished_flag = self.orig_is_finished_flag - pnl.TransferMechanism.is_finished = self.orig_is_finished - def test_WhenFinishedAny_1(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - A.is_finished_flag = True - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='B') - B.is_finished_flag = True - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, C) - comp.add_projection(pnl.MappingProjection(), B, C) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNPasses(1)) - sched.add_condition(C, pnl.WhenFinishedAny(A, B)) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + + sched = gs.Scheduler(graph={A: set(), B: set(), C: {A, B}}) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNPasses(1)) + sched.add_condition(C, gs.WhenFinishedAny(A, B)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 1) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 1) output = list(sched.run(termination_conds=termination_conds)) expected_output = [ set([A, B]), C @@ -988,25 +810,18 @@ def test_WhenFinishedAny_1(self): assert output == pytest.helpers.setify_expected_output(expected_output) def test_WhenFinishedAny_2(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - A.is_finished_flag = False - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='B') - B.is_finished_flag = True - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, C) - comp.add_projection(pnl.MappingProjection(), B, C) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNPasses(1)) - sched.add_condition(C, pnl.WhenFinishedAny(A, B)) + A = SimpleTestNode('A', False) + B = SimpleTestNode('B') + C = SimpleTestNode('C') + + sched = gs.Scheduler(graph={A: set(), B: set(), C: {A, B}}) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNPasses(1)) + sched.add_condition(C, gs.WhenFinishedAny(A, B)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(A, 5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(A, 5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [ set([A, B]), C, set([A, B]), C, set([A, B]), C, set([A, B]), C, set([A, B]) @@ -1014,24 +829,18 @@ def test_WhenFinishedAny_2(self): assert output == pytest.helpers.setify_expected_output(expected_output) def test_WhenFinishedAny_noargs(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='C') - for m in [A, B, C]: - m.is_finished_flag = False - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, C) - comp.add_projection(pnl.MappingProjection(), B, C) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.Always()) - sched.add_condition(B, pnl.Always()) - sched.add_condition(C, pnl.Always()) + A = SimpleTestNode('A', False) + B = SimpleTestNode('B', False) + C = SimpleTestNode('C', False) + + sched = gs.Scheduler(graph={A: set(), B: set(), C: {A, B}}) + sched.add_condition(A, gs.Always()) + sched.add_condition(B, gs.Always()) + sched.add_condition(C, gs.Always()) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.WhenFinishedAny() + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.WhenFinishedAny() output = [] i = 0 for step in sched.run(termination_conds=termination_conds): @@ -1048,25 +857,17 @@ def test_WhenFinishedAny_noargs(self): assert output == pytest.helpers.setify_expected_output(expected_output) def test_WhenFinishedAll_1(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - A.is_finished_flag = True - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='B') - B.is_finished_flag = True - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, C) - comp.add_projection(pnl.MappingProjection(), B, C) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNPasses(1)) - sched.add_condition(C, pnl.WhenFinishedAll(A, B)) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + sched = gs.Scheduler(graph={A: set(), B: set(), C: {A, B}}) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNPasses(1)) + sched.add_condition(C, gs.WhenFinishedAll(A, B)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 1) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 1) output = list(sched.run(termination_conds=termination_conds)) expected_output = [ set([A, B]), C @@ -1074,25 +875,17 @@ def test_WhenFinishedAll_1(self): assert output == pytest.helpers.setify_expected_output(expected_output) def test_WhenFinishedAll_2(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - A.is_finished_flag = False - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='B') - B.is_finished_flag = True - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, C) - comp.add_projection(pnl.MappingProjection(), B, C) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNPasses(1)) - sched.add_condition(C, pnl.WhenFinishedAll(A, B)) + A = SimpleTestNode('A', False) + B = SimpleTestNode('B') + C = SimpleTestNode('C') + sched = gs.Scheduler(graph={A: set(), B: set(), C: {A, B}}) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNPasses(1)) + sched.add_condition(C, gs.WhenFinishedAll(A, B)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(A, 5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(A, 5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [ set([A, B]), set([A, B]), set([A, B]), set([A, B]), set([A, B]), @@ -1100,24 +893,18 @@ def test_WhenFinishedAll_2(self): assert output == pytest.helpers.setify_expected_output(expected_output) def test_WhenFinishedAll_noargs(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='C') - for m in [A, B, C]: - comp.add_node(m) - m.is_finished_flag = False - comp.add_projection(pnl.MappingProjection(), A, C) - comp.add_projection(pnl.MappingProjection(), B, C) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.Always()) - sched.add_condition(B, pnl.Always()) - sched.add_condition(C, pnl.Always()) + A = SimpleTestNode('A', False) + B = SimpleTestNode('B', False) + C = SimpleTestNode('C', False) + + sched = gs.Scheduler(graph={A: set(), B: set(), C: {A, B}}) + sched.add_condition(A, gs.Always()) + sched.add_condition(B, gs.Always()) + sched.add_condition(C, gs.Always()) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.WhenFinishedAll() + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.WhenFinishedAll() output = [] i = 0 for step in sched.run(termination_conds=termination_conds): @@ -1134,59 +921,56 @@ def test_WhenFinishedAll_noargs(self): assert output == pytest.helpers.setify_expected_output(expected_output) -@pytest.mark.psyneulink class TestAbsolute: - A = pnl.TransferMechanism(name='scheduler-pytests-A') - B = pnl.TransferMechanism(name='scheduler-pytests-B') - C = pnl.TransferMechanism(name='scheduler-pytests-C') + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') @pytest.mark.parametrize( 'conditions, termination_conds', [ ( - {A: pnl.TimeInterval(repeat=8), B: pnl.TimeInterval(repeat=4), C: pnl.TimeInterval(repeat=2)}, - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.AfterNCalls(A, 2)}, + {A: gs.TimeInterval(repeat=8), B: gs.TimeInterval(repeat=4), C: gs.TimeInterval(repeat=2)}, + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNCalls(A, 2)}, ), ( - {A: pnl.TimeInterval(repeat=5), B: pnl.TimeInterval(repeat=3), C: pnl.TimeInterval(repeat=1)}, - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.AfterNCalls(A, 2)}, + {A: gs.TimeInterval(repeat=5), B: gs.TimeInterval(repeat=3), C: gs.TimeInterval(repeat=1)}, + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNCalls(A, 2)}, ), ( - {A: pnl.TimeInterval(repeat=3), B: pnl.TimeInterval(repeat=2)}, - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.AfterNCalls(A, 2)}, + {A: gs.TimeInterval(repeat=3), B: gs.TimeInterval(repeat=2)}, + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNCalls(A, 2)}, ), ( - {A: pnl.TimeInterval(repeat=5), B: pnl.TimeInterval(repeat=7)}, - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.AfterNCalls(B, 2)}, + {A: gs.TimeInterval(repeat=5), B: gs.TimeInterval(repeat=7)}, + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNCalls(B, 2)}, ), ( - {A: pnl.TimeInterval(repeat=1200), B: pnl.TimeInterval(repeat=1000)}, - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.AfterNCalls(A, 3)}, + {A: gs.TimeInterval(repeat=1200), B: gs.TimeInterval(repeat=1000)}, + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNCalls(A, 3)}, ), ( - {A: pnl.TimeInterval(repeat=0.33333), B: pnl.TimeInterval(repeat=0.66666)}, - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.AfterNCalls(B, 3)}, + {A: gs.TimeInterval(repeat=0.33333), B: gs.TimeInterval(repeat=0.66666)}, + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNCalls(B, 3)}, ), # smaller than default units cause floating point issue without mitigation ( - {A: pnl.TimeInterval(repeat=2 * pnl._unit_registry.us), B: pnl.TimeInterval(repeat=4 * pnl._unit_registry.us)}, - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.AfterNCalls(B, 3)}, + {A: gs.TimeInterval(repeat=2 * gs._unit_registry.us), B: gs.TimeInterval(repeat=4 * gs._unit_registry.us)}, + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.AfterNCalls(B, 3)}, ), ] ) def test_TimeInterval_linear_everynms(self, conditions, termination_conds): - comp = pnl.Composition() - - comp.add_linear_processing_pathway([self.A, self.B, self.C]) - comp.scheduler.add_condition_set(conditions) + sched = gs.Scheduler(graph=pytest.helpers.create_graph_from_pathways([self.A, self.B, self.C])) + sched.add_condition_set(conditions) - list(comp.scheduler.run(termination_conds=termination_conds)) + list(sched.run(termination_conds=termination_conds)) for node, cond in conditions.items(): executions = [ - comp.scheduler.execution_timestamps[comp.default_execution_id][i].absolute - for i in range(len(comp.scheduler.execution_list[comp.default_execution_id])) - if node in comp.scheduler.execution_list[comp.default_execution_id][i] + sched.execution_timestamps[sched.default_execution_id][i].absolute + for i in range(len(sched.execution_list[sched.default_execution_id])) + if node in sched.execution_list[sched.default_execution_id][i] ] for i in range(1, len(executions)): @@ -1197,75 +981,74 @@ def test_TimeInterval_linear_everynms(self, conditions, termination_conds): [ ( { - A: pnl.TimeInterval(repeat=10, start=100), - B: pnl.TimeInterval(repeat=10, start=300), - C: pnl.TimeInterval(repeat=10, start=400) + A: gs.TimeInterval(repeat=10, start=100), + B: gs.TimeInterval(repeat=10, start=300), + C: gs.TimeInterval(repeat=10, start=400) }, - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.TimeInterval(start=500)} + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.TimeInterval(start=500)} ), ( { - A: pnl.TimeInterval(start=100), - B: pnl.TimeInterval(start=300), - C: pnl.TimeInterval(start=400) + A: gs.TimeInterval(start=100), + B: gs.TimeInterval(start=300), + C: gs.TimeInterval(start=400) }, - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.TimeInterval(start=500)} + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.TimeInterval(start=500)} ), ( { - A: pnl.TimeInterval(repeat=2, start=105), - B: pnl.TimeInterval(repeat=7, start=317), - C: pnl.TimeInterval(repeat=11, start=431) + A: gs.TimeInterval(repeat=2, start=105), + B: gs.TimeInterval(repeat=7, start=317), + C: gs.TimeInterval(repeat=11, start=431) }, - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.TimeInterval(start=597)} + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.TimeInterval(start=597)} ), ( { - A: pnl.TimeInterval(repeat=10, start=100, start_inclusive=False), - B: pnl.TimeInterval(repeat=10, start=300), - C: pnl.TimeInterval(repeat=10, start=400) + A: gs.TimeInterval(repeat=10, start=100, start_inclusive=False), + B: gs.TimeInterval(repeat=10, start=300), + C: gs.TimeInterval(repeat=10, start=400) }, - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.TimeInterval(start=500)} + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.TimeInterval(start=500)} ), ( { - A: pnl.TimeInterval(repeat=10, start=100, start_inclusive=False), - B: pnl.TimeInterval(repeat=10, start=300), - C: pnl.TimeInterval(repeat=10, start=400) + A: gs.TimeInterval(repeat=10, start=100, start_inclusive=False), + B: gs.TimeInterval(repeat=10, start=300), + C: gs.TimeInterval(repeat=10, start=400) }, - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.TimeInterval(start=500, start_inclusive=False)} + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.TimeInterval(start=500, start_inclusive=False)} ), ( { - A: pnl.TimeInterval(repeat=10, start=100), - B: pnl.TimeInterval(repeat=10, start=100, end=200), - C: pnl.TimeInterval(repeat=10, start=400) + A: gs.TimeInterval(repeat=10, start=100), + B: gs.TimeInterval(repeat=10, start=100, end=200), + C: gs.TimeInterval(repeat=10, start=400) }, - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.TimeInterval(start=500, start_inclusive=False)} + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.TimeInterval(start=500, start_inclusive=False)} ), ( { - A: pnl.TimeInterval(repeat=10, start=100), - B: pnl.TimeInterval(repeat=10, start=100, end=200, end_inclusive=False), - C: pnl.TimeInterval(repeat=10, start=400) + A: gs.TimeInterval(repeat=10, start=100), + B: gs.TimeInterval(repeat=10, start=100, end=200, end_inclusive=False), + C: gs.TimeInterval(repeat=10, start=400) }, - {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.TimeInterval(start=500)} + {gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.TimeInterval(start=500)} ), ] ) def test_TimeInterval_no_dependencies(self, conditions, termination_conds): - comp = pnl.Composition() - comp.add_nodes([self.A, self.B, self.C]) - comp.scheduler.add_condition_set(conditions) - consideration_set_execution_abs_value = comp.scheduler._get_absolute_consideration_set_execution_unit(termination_conds) + sched = gs.Scheduler(graph={self.A: set(), self.B: set(), self.C: set()}) + sched.add_condition_set(conditions) + consideration_set_execution_abs_value = sched._get_absolute_consideration_set_execution_unit(termination_conds) - list(comp.scheduler.run(termination_conds=termination_conds)) + list(sched.run(termination_conds=termination_conds)) for node, cond in conditions.items(): executions = [ - comp.scheduler.execution_timestamps[comp.default_execution_id][i].absolute - for i in range(len(comp.scheduler.execution_list[comp.default_execution_id])) - if node in comp.scheduler.execution_list[comp.default_execution_id][i] + sched.execution_timestamps[sched.default_execution_id][i].absolute + for i in range(len(sched.execution_list[sched.default_execution_id])) + if node in sched.execution_list[sched.default_execution_id][i] ] for i in range(1, len(executions)): @@ -1284,8 +1067,8 @@ def test_TimeInterval_no_dependencies(self, conditions, termination_conds): # this test only runs a single ENVIRONMENT_STATE_UPDATE, so this # timestamp corresponds to its last - final_timestamp = comp.scheduler.execution_timestamps[comp.default_execution_id][-1].absolute - term_cond = termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] + final_timestamp = sched.execution_timestamps[sched.default_execution_id][-1].absolute + term_cond = termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] if term_cond.start_inclusive: assert term_cond.start - consideration_set_execution_abs_value == final_timestamp @@ -1295,43 +1078,43 @@ def test_TimeInterval_no_dependencies(self, conditions, termination_conds): @pytest.mark.parametrize( 'repeat, unit, expected_repeat', [ - (1, None, 1 * pnl._unit_registry.ms), - ('1ms', None, 1 * pnl._unit_registry.ms), - (1 * pnl._unit_registry.ms, None, 1 * pnl._unit_registry.ms), - (1, 'ms', 1 * pnl._unit_registry.ms), - (1, pnl._unit_registry.ms, 1 * pnl._unit_registry.ms), - ('1', pnl._unit_registry.ms, 1 * pnl._unit_registry.ms), - (1 * pnl._unit_registry.ms, pnl._unit_registry.ns, 1 * pnl._unit_registry.ms), - (1000 * pnl._unit_registry.ms, None, 1000 * pnl._unit_registry.ms), + (1, None, 1 * gs._unit_registry.ms), + ('1ms', None, 1 * gs._unit_registry.ms), + (1 * gs._unit_registry.ms, None, 1 * gs._unit_registry.ms), + (1, 'ms', 1 * gs._unit_registry.ms), + (1, gs._unit_registry.ms, 1 * gs._unit_registry.ms), + ('1', gs._unit_registry.ms, 1 * gs._unit_registry.ms), + (1 * gs._unit_registry.ms, gs._unit_registry.ns, 1 * gs._unit_registry.ms), + (1000 * gs._unit_registry.ms, None, 1000 * gs._unit_registry.ms), ] ) def test_TimeInterval_time_specs(self, repeat, unit, expected_repeat): if unit is None: - c = pnl.TimeInterval(repeat=repeat) + c = gs.TimeInterval(repeat=repeat) else: - c = pnl.TimeInterval(repeat=repeat, unit=unit) + c = gs.TimeInterval(repeat=repeat, unit=unit) assert c.repeat == expected_repeat @pytest.mark.parametrize( 'repeat, inclusive, last_time', [ - (10, True, 10 * pnl._unit_registry.ms), - (10, False, 11 * pnl._unit_registry.ms), + (10, True, 10 * gs._unit_registry.ms), + (10, False, 11 * gs._unit_registry.ms), ] ) def test_TimeTermination( self, - three_node_linear_composition, + three_node_linear_scheduler, repeat, inclusive, last_time ): - _, comp = three_node_linear_composition + _, sched = three_node_linear_scheduler - comp.scheduler.termination_conds = { - pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.TimeTermination(repeat, inclusive) + sched.termination_conds = { + gs.TimeScale.ENVIRONMENT_STATE_UPDATE: gs.TimeTermination(repeat, inclusive) } - list(comp.scheduler.run()) + list(sched.run()) - assert comp.scheduler.get_clock(comp.scheduler.default_execution_id).time.absolute == last_time + assert sched.get_clock(sched.default_execution_id).time.absolute == last_time diff --git a/tests/scheduling/test_scheduler.py b/tests/scheduling/test_scheduler.py index 64f6873..4bd835b 100644 --- a/tests/scheduling/test_scheduler.py +++ b/tests/scheduling/test_scheduler.py @@ -1,16 +1,16 @@ import fractions import logging +import types + import networkx as nx -import numpy as np -import psyneulink as pnl import pytest -import types import graph_scheduler as gs - logger = logging.getLogger(__name__) +SimpleTestNode = pytest.helpers.get_test_node() + class TestScheduler: stroop_paths = [ @@ -53,7 +53,7 @@ def test_create_multiple_contexts(self): graph = {'A': set()} scheduler = gs.Scheduler(graph) - scheduler.get_clock(scheduler.default_execution_id)._increment_time(pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) + scheduler.get_clock(scheduler.default_execution_id)._increment_time(gs.TimeScale.ENVIRONMENT_STATE_UPDATE) eid = 'eid' eid1 = 'eid1' @@ -61,7 +61,7 @@ def test_create_multiple_contexts(self): assert scheduler.clocks[eid].time.environment_state_update == 0 - scheduler.get_clock(scheduler.default_execution_id)._increment_time(pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) + scheduler.get_clock(scheduler.default_execution_id)._increment_time(gs.TimeScale.ENVIRONMENT_STATE_UPDATE) assert scheduler.clocks[eid].time.environment_state_update == 0 @@ -69,170 +69,32 @@ def test_create_multiple_contexts(self): assert scheduler.clocks[eid1].time.environment_state_update == 2 - @pytest.mark.psyneulink - def test_two_compositions_one_scheduler(self): - comp1 = pnl.Composition() - comp2 = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - comp1.add_node(A) - comp2.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp1)) - - sched.add_condition(A, pnl.BeforeNCalls(A, 5, time_scale=pnl.TimeScale.LIFE)) - - termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(6) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNPasses(1) - comp1.run( - inputs={A: [[0], [1], [2], [3], [4], [5]]}, - scheduler=sched, - termination_processing=termination_conds - ) - output = sched.execution_list[comp1.default_execution_id] - - expected_output = [ - A, A, A, A, A, set() - ] - # pprint.pprint(output) - assert output == pytest.helpers.setify_expected_output(expected_output) - - comp2.run( - inputs={A: [[0], [1], [2], [3], [4], [5]]}, - scheduler=sched, - termination_processing=termination_conds - ) - output = sched.execution_list[comp2.default_execution_id] - - expected_output = [ - A, A, A, A, A, set() - ] - # pprint.pprint(output) - assert output == pytest.helpers.setify_expected_output(expected_output) - - @pytest.mark.psyneulink - def test_one_composition_two_contexts(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - comp.add_node(A) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.BeforeNCalls(A, 5, time_scale=pnl.TimeScale.LIFE)) - - termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(6) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNPasses(1) - eid = 'eid' - comp.run( - inputs={A: [[0], [1], [2], [3], [4], [5]]}, - scheduler=sched, - termination_processing=termination_conds, - context=eid, - ) - output = sched.execution_list[eid] - - expected_output = [ - A, A, A, A, A, set() - ] - # pprint.pprint(output) - assert output == pytest.helpers.setify_expected_output(expected_output) - - comp.run( - inputs={A: [[0], [1], [2], [3], [4], [5]]}, - scheduler=sched, - termination_processing=termination_conds, - context=eid, - ) - output = sched.execution_list[eid] - - expected_output = [ - A, A, A, A, A, set(), set(), set(), set(), set(), set(), set() - ] - # pprint.pprint(output) - assert output == pytest.helpers.setify_expected_output(expected_output) - - eid = 'eid1' - comp.run( - inputs={A: [[0], [1], [2], [3], [4], [5]]}, - scheduler=sched, - termination_processing=termination_conds, - context=eid, - ) - output = sched.execution_list[eid] - - expected_output = [ - A, A, A, A, A, set() - ] - # pprint.pprint(output) - assert output == pytest.helpers.setify_expected_output(expected_output) - - @pytest.mark.psyneulink - def test_change_termination_condition(self): - D = pnl.DDM( - function=pnl.DriftDiffusionIntegrator( - threshold=10, time_step_size=1.0 - ), - execute_until_finished=False, - reset_stateful_function_when=pnl.Never() - ) - C = pnl.Composition(pathways=[D]) - - D.set_log_conditions(pnl.VALUE) - - def change_termination_processing(): - if C.termination_processing is None: - C.scheduler.termination_conds = {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.WhenFinished(D)} - C.termination_processing = {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.WhenFinished(D)} - elif isinstance(C.termination_processing[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE], pnl.AllHaveRun): - C.scheduler.termination_conds = {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.WhenFinished(D)} - C.termination_processing = {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.WhenFinished(D)} - else: - C.scheduler.termination_conds = {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.AllHaveRun()} - C.termination_processing = {pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.AllHaveRun()} - - change_termination_processing() - C.run(inputs={D: [[1.0], [2.0]]}, - # termination_processing={pnl.TimeScale.ENVIRONMENT_STATE_UPDATE: pnl.WhenFinished(D)}, - call_after_trial=change_termination_processing, - reset_stateful_functions_when=pnl.AtConsiderationSetExecution(0), - num_trials=4) - # EnvironmentStateUpdate 0: - # input = 1.0, termination condition = pnl.WhenFinished - # 10 passes (value = 1.0, 2.0 ... 9.0, 10.0) - # EnvironmentStateUpdate 1: - # input = 2.0, termination condition = pnl.AllHaveRun - # 1 pass (value = 2.0) - expected_results = [[np.array([10.]), np.array([10.])], - [np.array([2.]), np.array([1.])], - [np.array([10.]), np.array([10.])], - [np.array([2.]), np.array([1.])]] - assert np.allclose(expected_results, np.asfarray(C.results)) - - @pytest.mark.psyneulink def test_default_condition_1(self): - A = pnl.TransferMechanism(name='A') - B = pnl.TransferMechanism(name='B') - C = pnl.TransferMechanism(name='C') + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') - comp = pnl.Composition(pathways=[[A, C], [A, B, C]]) - comp.scheduler.add_condition(A, pnl.AtPass(1)) - comp.scheduler.add_condition(B, pnl.Always()) + sched = gs.Scheduler( + graph=pytest.helpers.create_graph_from_pathways([A, C], [A, B, C]) + ) + sched.add_condition(A, gs.AtPass(1)) + sched.add_condition(B, gs.Always()) - output = list(comp.scheduler.run()) + output = list(sched.run()) expected_output = [B, A, B, C] assert output == pytest.helpers.setify_expected_output(expected_output) - @pytest.mark.psyneulink def test_default_condition_2(self): - A = pnl.TransferMechanism(name='A') - B = pnl.TransferMechanism(name='B') - C = pnl.TransferMechanism(name='C') + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') - comp = pnl.Composition(pathways=[[A, B], [C]]) - comp.scheduler.add_condition(C, pnl.AtPass(1)) + sched = gs.Scheduler( + graph=pytest.helpers.create_graph_from_pathways([A, B], [C]) + ) + sched.add_condition(C, gs.AtPass(1)) - output = list(comp.scheduler.run()) + output = list(sched.run()) expected_output = [A, B, {C, A}] assert output == pytest.helpers.setify_expected_output(expected_output) @@ -243,8 +105,8 @@ def test_exact_time_mode(self): ) # these cannot run at same execution set unless in EXACT_TIME - sched.add_condition('A', pnl.TimeInterval(start=1)) - sched.add_condition('B', pnl.TimeInterval(start=1)) + sched.add_condition('A', gs.TimeInterval(start=1)) + sched.add_condition('B', gs.TimeInterval(start=1)) list(sched.run()) @@ -301,38 +163,16 @@ def test_delete_counts(self): assert sched.execution_list[eid_repeat] == repeat_run_2 + repeat_run_2 -@pytest.mark.psyneulink class TestLinear: - - @classmethod - def setup_class(self): - self.orig_is_finished_flag = pnl.TransferMechanism.is_finished_flag - self.orig_is_finished = pnl.TransferMechanism.is_finished - pnl.TransferMechanism.is_finished_flag = True - pnl.TransferMechanism.is_finished = lambda self, context: self.is_finished_flag - - @classmethod - def teardown_class(self): - del pnl.TransferMechanism.is_finished_flag - del pnl.TransferMechanism.is_finished - pnl.TransferMechanism.is_finished_flag = self.orig_is_finished_flag - pnl.TransferMechanism.is_finished = self.orig_is_finished - def test_no_termination_conds(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='scheduler-pytests-C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), B, C) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + sched = gs.Scheduler(graph=pytest.helpers.create_graph_from_pathways([A, B, C])) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 2)) - sched.add_condition(C, pnl.EveryNCalls(B, 3)) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 2)) + sched.add_condition(C, gs.EveryNCalls(B, 3)) output = list(sched.run()) @@ -344,24 +184,18 @@ def test_no_termination_conds(self): # tests below are copied from old scheduler, need renaming def test_1(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='scheduler-pytests-C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), B, C) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + sched = gs.Scheduler(graph=pytest.helpers.create_graph_from_pathways([A, B, C])) - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 2)) - sched.add_condition(C, pnl.EveryNCalls(B, 3)) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 2)) + sched.add_condition(C, gs.EveryNCalls(B, 3)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 4, time_scale=pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 4, time_scale=gs.TimeScale.ENVIRONMENT_STATE_UPDATE) output = list(sched.run(termination_conds=termination_conds)) expected_output = [ @@ -374,24 +208,18 @@ def test_1(self): assert output == pytest.helpers.setify_expected_output(expected_output) def test_1b(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='scheduler-pytests-C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), B, C) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + sched = gs.Scheduler(graph=pytest.helpers.create_graph_from_pathways([A, B, C])) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.Any(pnl.EveryNCalls(A, 2), pnl.AfterPass(1))) - sched.add_condition(C, pnl.EveryNCalls(B, 3)) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.Any(gs.EveryNCalls(A, 2), gs.AfterPass(1))) + sched.add_condition(C, gs.EveryNCalls(B, 3)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 4, time_scale=pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 4, time_scale=gs.TimeScale.ENVIRONMENT_STATE_UPDATE) output = list(sched.run(termination_conds=termination_conds)) expected_output = [ @@ -404,48 +232,36 @@ def test_1b(self): assert output == pytest.helpers.setify_expected_output(expected_output) def test_2(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='scheduler-pytests-C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), B, C) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + sched = gs.Scheduler(graph=pytest.helpers.create_graph_from_pathways([A, B, C])) - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 2)) - sched.add_condition(C, pnl.EveryNCalls(B, 2)) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 2)) + sched.add_condition(C, gs.EveryNCalls(B, 2)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 1, time_scale=pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 1, time_scale=gs.TimeScale.ENVIRONMENT_STATE_UPDATE) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, A, B, A, A, B, C] assert output == pytest.helpers.setify_expected_output(expected_output) def test_3(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='scheduler-pytests-C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), B, C) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + sched = gs.Scheduler(graph=pytest.helpers.create_graph_from_pathways([A, B, C])) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 2)) - sched.add_condition(C, pnl.All(pnl.AfterNCalls(B, 2), pnl.EveryNCalls(B, 1))) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 2)) + sched.add_condition(C, gs.All(gs.AfterNCalls(B, 2), gs.EveryNCalls(B, 1))) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 4, time_scale=pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 4, time_scale=gs.TimeScale.ENVIRONMENT_STATE_UPDATE) output = list(sched.run(termination_conds=termination_conds)) expected_output = [ @@ -455,24 +271,18 @@ def test_3(self): assert output == pytest.helpers.setify_expected_output(expected_output) def test_6(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='scheduler-pytests-C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), B, C) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + sched = gs.Scheduler(graph=pytest.helpers.create_graph_from_pathways([A, B, C])) - sched.add_condition(A, pnl.BeforePass(5)) - sched.add_condition(B, pnl.AfterNCalls(A, 5)) - sched.add_condition(C, pnl.AfterNCalls(B, 1)) + sched.add_condition(A, gs.BeforePass(5)) + sched.add_condition(B, gs.AfterNCalls(A, 5)) + sched.add_condition(C, gs.AfterNCalls(B, 1)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 3) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 3) output = list(sched.run(termination_conds=termination_conds)) expected_output = [ @@ -482,30 +292,22 @@ def test_6(self): assert output == pytest.helpers.setify_expected_output(expected_output) def test_6_two_environment_state_updates(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='scheduler-pytests-C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), B, C) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + sched = gs.Scheduler(graph=pytest.helpers.create_graph_from_pathways([A, B, C])) - sched.add_condition(A, pnl.BeforePass(5)) - sched.add_condition(B, pnl.AfterNCalls(A, 5)) - sched.add_condition(C, pnl.AfterNCalls(B, 1)) + sched.add_condition(A, gs.BeforePass(5)) + sched.add_condition(B, gs.AfterNCalls(A, 5)) + sched.add_condition(C, gs.AfterNCalls(B, 1)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(2) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 3) - comp.run( - inputs={A: [[0], [1], [2], [3], [4], [5]]}, - scheduler=sched, - termination_processing=termination_conds - ) - output = sched.execution_list[comp.default_execution_id] + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(2) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 3) + for _ in range(5): + list(sched.run(termination_conds)) + + output = sched.execution_list[None] expected_output = [ A, A, A, A, A, B, C, B, C, B, C, @@ -515,63 +317,48 @@ def test_6_two_environment_state_updates(self): assert output == pytest.helpers.setify_expected_output(expected_output) def test_7(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - for m in [A, B]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + sched = gs.Scheduler(graph={A: set(), B: {A}}) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 2)) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 2)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.Any(pnl.AfterNCalls(A, 1), pnl.AfterNCalls(B, 1)) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.Any(gs.AfterNCalls(A, 1), gs.AfterNCalls(B, 1)) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A] assert output == pytest.helpers.setify_expected_output(expected_output) def test_8(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - for m in [A, B]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + sched = gs.Scheduler(graph={A: set(), B: {A}}) - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 2)) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 2)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.All(pnl.AfterNCalls(A, 1), pnl.AfterNCalls(B, 1)) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.All(gs.AfterNCalls(A, 1), gs.AfterNCalls(B, 1)) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, A, B] assert output == pytest.helpers.setify_expected_output(expected_output) def test_9(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - for m in [A, B]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + sched = gs.Scheduler(graph={A: set(), B: {A}}) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.WhenFinished(A)) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.WhenFinished(A)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(B, 2) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(B, 2) output = [] i = 0 @@ -586,256 +373,184 @@ def test_9(self): assert output == pytest.helpers.setify_expected_output(expected_output) def test_9b(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - A.is_finished_flag = False - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - for m in [A, B]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) + A = SimpleTestNode('A', False) + B = SimpleTestNode('B') - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + sched = gs.Scheduler(graph={A: set(), B: {A}}) - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.WhenFinished(A)) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.WhenFinished(A)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, A, A, A, A] assert output == pytest.helpers.setify_expected_output(expected_output) def test_10(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - A.is_finished_flag = True - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - - for m in [A, B]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) + A = SimpleTestNode('A') + B = SimpleTestNode('B') - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + sched = gs.Scheduler(graph={A: set(), B: {A}}) - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.Any(pnl.WhenFinished(A), pnl.AfterNCalls(A, 3))) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.Any(gs.WhenFinished(A), gs.AfterNCalls(A, 3))) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(B, 5) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(B, 5) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, B, A, B, A, B, A, B, A, B] assert output == pytest.helpers.setify_expected_output(expected_output) def test_10b(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - A.is_finished_flag = False - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - - for m in [A, B]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) + A = SimpleTestNode('A', False) + B = SimpleTestNode('B') - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + sched = gs.Scheduler(graph={A: set(), B: {A}}) - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.Any(pnl.WhenFinished(A), pnl.AfterNCalls(A, 3))) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.Any(gs.WhenFinished(A), gs.AfterNCalls(A, 3))) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(B, 4) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(B, 4) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, A, A, B, A, B, A, B, A, B] assert output == pytest.helpers.setify_expected_output(expected_output) def test_10c(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - A.is_finished_flag = True - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - - for m in [A, B]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) + A = SimpleTestNode('A') + B = SimpleTestNode('B') - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + sched = gs.Scheduler(graph={A: set(), B: {A}}) - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.All(pnl.WhenFinished(A), pnl.AfterNCalls(A, 3))) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.All(gs.WhenFinished(A), gs.AfterNCalls(A, 3))) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(B, 4) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(B, 4) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, A, A, B, A, B, A, B, A, B] assert output == pytest.helpers.setify_expected_output(expected_output) def test_10d(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - A.is_finished_flag = False - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - - for m in [A, B]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) + A = SimpleTestNode('A', False) + B = SimpleTestNode('B') - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + sched = gs.Scheduler(graph={A: set(), B: {A}}) - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.All(pnl.WhenFinished(A), pnl.AfterNCalls(A, 3))) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.All(gs.WhenFinished(A), gs.AfterNCalls(A, 3))) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AtPass(10) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AtPass(10) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, A, A, A, A, A, A, A, A, A] assert output == pytest.helpers.setify_expected_output(expected_output) ######################################## - # tests with linear compositions + # tests with linear schedositions ######################################## def test_linear_AAB(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - for m in [A, B]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + sched = gs.Scheduler(graph={A: set(), B: {A}}) - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 2)) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 2)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNCalls(B, 2, time_scale=pnl.TimeScale.ENVIRONMENT_SEQUENCE) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(B, 2, time_scale=pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNCalls(B, 2, time_scale=gs.TimeScale.ENVIRONMENT_SEQUENCE) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(B, 2, time_scale=gs.TimeScale.ENVIRONMENT_STATE_UPDATE) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, A, B, A, A, B] assert output == pytest.helpers.setify_expected_output(expected_output) def test_linear_ABB(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - for m in [A, B]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + sched = gs.Scheduler(graph={A: set(), B: {A}}) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.Any(pnl.AtPass(0), pnl.EveryNCalls(B, 2))) - sched.add_condition(B, pnl.Any(pnl.EveryNCalls(A, 1), pnl.EveryNCalls(B, 1))) + sched.add_condition(A, gs.Any(gs.AtPass(0), gs.EveryNCalls(B, 2))) + sched.add_condition(B, gs.Any(gs.EveryNCalls(A, 1), gs.EveryNCalls(B, 1))) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(B, 8, time_scale=pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(B, 8, time_scale=gs.TimeScale.ENVIRONMENT_STATE_UPDATE) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, B, B, A, B, B, A, B, B, A, B, B] assert output == pytest.helpers.setify_expected_output(expected_output) def test_linear_ABBCC(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='scheduler-pytests-C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), B, C) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + sched = gs.Scheduler(graph=pytest.helpers.create_graph_from_pathways([A, B, C])) - sched.add_condition(A, pnl.Any(pnl.AtPass(0), pnl.EveryNCalls(C, 2))) - sched.add_condition(B, pnl.Any(pnl.JustRan(A), pnl.JustRan(B))) - sched.add_condition(C, pnl.Any(pnl.EveryNCalls(B, 2), pnl.JustRan(C))) + sched.add_condition(A, gs.Any(gs.AtPass(0), gs.EveryNCalls(C, 2))) + sched.add_condition(B, gs.Any(gs.JustRan(A), gs.JustRan(B))) + sched.add_condition(C, gs.Any(gs.EveryNCalls(B, 2), gs.JustRan(C))) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 4, time_scale=pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 4, time_scale=gs.TimeScale.ENVIRONMENT_STATE_UPDATE) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, B, B, C, C, A, B, B, C, C] assert output == pytest.helpers.setify_expected_output(expected_output) def test_linear_ABCBC(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='scheduler-pytests-C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), B, C) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + sched = gs.Scheduler(graph=pytest.helpers.create_graph_from_pathways([A, B, C])) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.Any(pnl.AtPass(0), pnl.EveryNCalls(C, 2))) - sched.add_condition(B, pnl.Any(pnl.EveryNCalls(A, 1), pnl.EveryNCalls(C, 1))) - sched.add_condition(C, pnl.EveryNCalls(B, 1)) + sched.add_condition(A, gs.Any(gs.AtPass(0), gs.EveryNCalls(C, 2))) + sched.add_condition(B, gs.Any(gs.EveryNCalls(A, 1), gs.EveryNCalls(C, 1))) + sched.add_condition(C, gs.EveryNCalls(B, 1)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 4, time_scale=pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 4, time_scale=gs.TimeScale.ENVIRONMENT_STATE_UPDATE) output = list(sched.run(termination_conds=termination_conds)) expected_output = [A, B, C, B, C, A, B, C, B, C] assert output == pytest.helpers.setify_expected_output(expected_output) ######################################## - # tests with small branching compositions + # tests with small branching schedositions ######################################## -@pytest.mark.psyneulink class TestBranching: - @classmethod - def setup_class(self): - self.orig_is_finished_flag = pnl.TransferMechanism.is_finished_flag - self.orig_is_finished = pnl.TransferMechanism.is_finished - pnl.TransferMechanism.is_finished_flag = True - pnl.TransferMechanism.is_finished = lambda self, context: self.is_finished_flag - - @classmethod - def teardown_class(self): - del pnl.TransferMechanism.is_finished_flag - del pnl.TransferMechanism.is_finished - pnl.TransferMechanism.is_finished_flag = self.orig_is_finished_flag - pnl.TransferMechanism.is_finished = self.orig_is_finished - # triangle: A # / \ # B C def test_triangle_1(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='scheduler-pytests-C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), A, C) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + sched = gs.Scheduler(graph={A: set(), B: {A}, C: {A}}) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 1)) - sched.add_condition(C, pnl.EveryNCalls(A, 1)) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 1)) + sched.add_condition(C, gs.EveryNCalls(A, 1)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 3, time_scale=pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 3, time_scale=gs.TimeScale.ENVIRONMENT_STATE_UPDATE) output = list(sched.run(termination_conds=termination_conds)) expected_output = [ @@ -847,24 +562,18 @@ def test_triangle_1(self): assert output == pytest.helpers.setify_expected_output(expected_output) def test_triangle_2(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='scheduler-pytests-C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), A, C) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + sched = gs.Scheduler(graph={A: set(), B: {A}, C: {A}}) - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 1)) - sched.add_condition(C, pnl.EveryNCalls(A, 2)) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 1)) + sched.add_condition(C, gs.EveryNCalls(A, 2)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 3, time_scale=pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 3, time_scale=gs.TimeScale.ENVIRONMENT_STATE_UPDATE) output = list(sched.run(termination_conds=termination_conds)) expected_output = [ @@ -879,24 +588,18 @@ def test_triangle_2(self): assert output == pytest.helpers.setify_expected_output(expected_output) def test_triangle_3(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='scheduler-pytests-C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), A, C) - - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') + sched = gs.Scheduler(graph={A: set(), B: {A}, C: {A}}) - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 2)) - sched.add_condition(C, pnl.EveryNCalls(A, 3)) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 2)) + sched.add_condition(C, gs.EveryNCalls(A, 3)) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 2, time_scale=pnl.TimeScale.ENVIRONMENT_STATE_UPDATE) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 2, time_scale=gs.TimeScale.ENVIRONMENT_STATE_UPDATE) output = list(sched.run(termination_conds=termination_conds)) expected_output = [ @@ -907,25 +610,19 @@ def test_triangle_3(self): # this is test 11 of original constraint_scheduler.py def test_triangle_4(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='scheduler-pytests-C') + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), A, C) + sched = gs.Scheduler(graph={A: set(), B: {A}, C: {A}}) - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) - - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 2)) - sched.add_condition(C, pnl.All(pnl.WhenFinished(A), pnl.AfterNCalls(B, 3))) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 2)) + sched.add_condition(C, gs.All(gs.WhenFinished(A), gs.AfterNCalls(B, 3))) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 1) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 1) output = [] i = 0 A.is_finished_flag = False @@ -940,25 +637,19 @@ def test_triangle_4(self): assert output == pytest.helpers.setify_expected_output(expected_output) def test_triangle_4b(self): - comp = pnl.Composition() - A = pnl.TransferMechanism(function=pnl.Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A') - B = pnl.TransferMechanism(function=pnl.Linear(intercept=4.0), name='scheduler-pytests-B') - C = pnl.TransferMechanism(function=pnl.Linear(intercept=1.5), name='scheduler-pytests-C') - - for m in [A, B, C]: - comp.add_node(m) - comp.add_projection(pnl.MappingProjection(), A, B) - comp.add_projection(pnl.MappingProjection(), A, C) + A = SimpleTestNode('A') + B = SimpleTestNode('B') + C = SimpleTestNode('C') - sched = pnl.Scheduler(**pytest.helpers.composition_to_scheduler_args(comp)) + sched = gs.Scheduler(graph={A: set(), B: {A}, C: {A}}) - sched.add_condition(A, pnl.EveryNPasses(1)) - sched.add_condition(B, pnl.EveryNCalls(A, 2)) - sched.add_condition(C, pnl.All(pnl.WhenFinished(A), pnl.AfterNCalls(B, 3))) + sched.add_condition(A, gs.EveryNPasses(1)) + sched.add_condition(B, gs.EveryNCalls(A, 2)) + sched.add_condition(C, gs.All(gs.WhenFinished(A), gs.AfterNCalls(B, 3))) termination_conds = {} - termination_conds[pnl.TimeScale.ENVIRONMENT_SEQUENCE] = pnl.AfterNEnvironmentStateUpdates(1) - termination_conds[pnl.TimeScale.ENVIRONMENT_STATE_UPDATE] = pnl.AfterNCalls(C, 1) + termination_conds[gs.TimeScale.ENVIRONMENT_SEQUENCE] = gs.AfterNEnvironmentStateUpdates(1) + termination_conds[gs.TimeScale.ENVIRONMENT_STATE_UPDATE] = gs.AfterNCalls(C, 1) output = [] i = 0 A.is_finished_flag = False @@ -979,24 +670,19 @@ def test_triangle_4b(self): # this is test 4 of original constraint_scheduler.py # this test has an implicit priority set of A