Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…ess-bigraph into trigger
  • Loading branch information
eagmon committed Nov 9, 2023
2 parents 301e5c9 + 9c89e6d commit 6f0fcd9
Show file tree
Hide file tree
Showing 6 changed files with 333 additions and 82 deletions.
1 change: 1 addition & 0 deletions process_bigraph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@
# TODO
process_registry.register('console-emitter', ConsoleEmitter)
process_registry.register('ram-emitter', RAMEmitter)

183 changes: 142 additions & 41 deletions process_bigraph/composite.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,26 +296,28 @@ def empty_front(time):

def find_leaves(d, path=None):
leaves = []
path = []
path = ()

for key, value in d.items():
if isinstance(value, dict):
subleaves = find_leaves(value, path + [key])
subleaves = find_leaves(value, path + (key,))
leaves.extend(subleaves)
else:
leaves.append(path + value)
leaves.append(path + tuple(value))

return leaves


def build_step_network(steps):
ancestors = {
step_key: {
'ancestors': [],
# 'ancestors': [],
'input_paths': None,
'output_paths': None}
for step_key in steps}

nodes = {}

for step_key, step in steps.items():
for other_key, other_step in steps.items():
if step_key == other_key:
Expand All @@ -331,49 +333,100 @@ def build_step_network(steps):
wires['inputs'])
input_paths = ancestors[step_key]['input_paths']

if ancestors[other_key]['output_paths'] is None:
ancestors[other_key]['output_paths'] = find_leaves(
other_wires['outputs'])
output_paths = ancestors[other_key]['output_paths']
if ancestors[step_key]['output_paths'] is None:
ancestors[step_key]['output_paths'] = find_leaves(
wires.get('outputs', {}))
output_paths = ancestors[step_key]['output_paths']

if any(item in output_paths for item in input_paths):
ancestors[step_key]['ancestors'].append(other_key)
for input in input_paths:
path = tuple(input)
if not path in nodes:
nodes[path] = {
'before': set([]),
'after': set([])}
nodes[path]['after'].add(step_key)

return ancestors
for output in output_paths:
path = tuple(output)
if not path in nodes:
nodes[path] = {
'before': set([]),
'after': set([])}
nodes[path]['before'].add(step_key)

return ancestors, nodes

def find_starting_steps(steps):
ancestors = build_step_network(steps)
starting = []
for step_key, before in ancestors.items():
if len(before['ancestors']) == 0:
starting.append(step_key)

return starting
def combined_step_network(steps):
steps, nodes = build_step_network(steps)

trigger_state = {
'steps': steps,
'nodes': nodes}

def find_starting_paths(steps):
ancestors = build_step_network(steps)
starting = []
for before in ancestors.values():
if len(before['ancestors']) == 0:
starting.extend(before['input_paths'])
return trigger_state

return starting

def build_trigger_state(nodes):
return {
key: value['before'].copy()
for key, value in nodes.items()}

class Composite(Process):
"""Composite parent class.

def find_downstream(steps, nodes, upstream):
downstream = set(upstream)
visited = set([])
previous_len = -1

while len(downstream) > len(visited) and len(visited) > previous_len:
previous_len = len(visited)
down = set([])
for step_path in downstream:
if step_path not in visited:
for output in steps[step_path]['output_paths']:
for dependent in nodes[output]['after']:
down.add(dependent)
visited.add(step_path)
downstream ^= down

return downstream


def determine_steps(steps, remaining, fulfilled):
to_run = []

for step_path in remaining:
step_inputs = steps[step_path]['input_paths']
all_fulfilled = True
for input in step_inputs:
if len(fulfilled[input]) > 0:
all_fulfilled = False
if all_fulfilled:
to_run.append(step_path)

for step_path in to_run:
remaining.remove(step_path)
step_outputs = steps[step_path]['output_paths']
for output in step_outputs:
fulfilled[output].remove(step_path)

return to_run, remaining, fulfilled


class Composite(Process):
"""
Composite parent class.
"""


config_schema = {
# TODO: add schema type
'composition': 'tree[any]',
'state': 'tree[any]',
'schema': 'tree[any]',
'bridge': 'wires',
'global_time_precision': 'maybe[float]',
}
'global_time_precision': 'maybe[float]'}


# TODO: if processes are serialized, deserialize them first
def __init__(self, config=None, local_types=None):
Expand Down Expand Up @@ -471,12 +524,29 @@ def __init__(self, config=None, local_types=None):

self.bridge_updates = []

# this will work for dags but not for cycles
self.starting_steps = find_starting_steps(
self.step_dependencies, self.node_dependencies = build_step_network(
self.step_paths)

self.run_steps(
self.starting_steps)
self.reset_step_state(self.step_paths)
to_run = self.cycle_step_state()

self.run_steps(to_run)


def reset_step_state(self, step_paths):
self.trigger_state = build_trigger_state(
self.node_dependencies)

self.steps_remaining = set(step_paths)


def cycle_step_state(self):
to_run, self.steps_remaining, self.trigger_state = determine_steps(
self.step_dependencies,
self.steps_remaining,
self.trigger_state)

return to_run


def schema(self):
Expand Down Expand Up @@ -514,8 +584,6 @@ def process_update(

def defer_project(update, args):
schema, state, path = args
# if 'c' in update:
# import ipdb; ipdb.set_trace()
return types.project_edge(
schema,
state,
Expand All @@ -532,6 +600,7 @@ def defer_project(update, args):

return absolute


def run_process(self, path, process, end_time, full_step, force_complete):
if path not in self.front:
self.front[path] = empty_front(self.state['global_time'])
Expand Down Expand Up @@ -602,8 +671,6 @@ def apply_updates(self, updates):
series = [series]

for update in series:
# print(update)

paths = hierarchy_depth(update)
update_paths.extend(paths.keys())

Expand All @@ -621,14 +688,15 @@ def apply_updates(self, updates):
if bridge_update:
self.bridge_updates.append(bridge_update)

self.trigger_steps(update_paths)
return update_paths

# view_expire_update = self.apply_update(up, store)
# view_expire = view_expire or view_expire_update

# if view_expire:
# self.state.build_topology_views()


def run(self, interval, force_complete=False):
end_time = self.state['global_time'] + interval
while self.state['global_time'] < end_time or force_complete:
Expand Down Expand Up @@ -667,7 +735,10 @@ def run(self, interval, force_complete=False):
advance['update'] = {}
paths.append(path)

self.apply_updates(updates)
# get all update paths, then trigger steps that
# depend on those paths
update_paths = self.apply_updates(updates)
self.trigger_steps(update_paths)

# # display and emit
# if self.progress_bar:
Expand All @@ -687,6 +758,25 @@ def run(self, interval, force_complete=False):
force_complete = False


def determine_steps(self):
to_run = []
for step_key, wires in trigger_state['steps']:
fulfilled = True
for input in wires['input_paths']:
if len(trigger_state['states'][tuple(input)]) > 0:
fulfilled = False
break
if fulfilled:
to_run.append(step_key)

for step_key in to_run:
wires = trigger_state['steps'][step_key]
for output in wires['output_paths']:
trigger_state['states'][tuple(output)].remove(step_key)

return to_run, trigger_state


def run_steps(self, step_paths):
if len(step_paths) > 0:
updates = []
Expand All @@ -710,7 +800,10 @@ def run_steps(self, step_paths):

updates.append(step_update)

self.apply_updates(updates)
update_paths = self.apply_updates(updates)
to_run = self.cycle_step_state()
if len(to_run) > 0:
self.run_steps(to_run)
else:
self.steps_run = set([])

Expand All @@ -727,7 +820,15 @@ def trigger_steps(self, update_paths):
steps_to_run.append(step_path)
self.steps_run.add(step_path)

self.run_steps(steps_to_run)
steps_to_run = find_downstream(
self.step_dependencies,
self.node_dependencies,
steps_to_run)

self.reset_step_state(steps_to_run)
to_run = self.cycle_step_state()

self.run_steps(to_run)


def gather_results(self, queries=None):
Expand Down
1 change: 1 addition & 0 deletions process_bigraph/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@

#: Maps process names to :term:`protocol methods`
protocol_registry = Registry()

Loading

0 comments on commit 6f0fcd9

Please sign in to comment.