Skip to content

Commit

Permalink
window depth finder
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Aug 11, 2023
1 parent 347e3c8 commit 51140bb
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 81 deletions.
1 change: 1 addition & 0 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ message PbTaskProxy {
optional bool is_runahead = 26;
optional bool flow_wait = 27;
optional PbRuntime runtime = 28;
optional int32 graph_depth = 29;
}

message PbFamily {
Expand Down
137 changes: 68 additions & 69 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

101 changes: 89 additions & 12 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from time import time
from typing import (
Any,
Dict,
Optional,
List,
Set,
Expand Down Expand Up @@ -504,6 +505,8 @@ def __init__(self, schd):
self.n_window_edges = set()
self.n_window_node_walks = {}
self.n_window_completed_walks = set()
self.n_window_depths = {}
self.update_window_depths = False
self.db_load_task_proxies = {}
self.family_pruned_ids = set()
self.prune_trigger_nodes = {}
Expand Down Expand Up @@ -761,6 +764,10 @@ def increment_graph_window(
'done_locs': set(),
'done_ids': set(),
'walk_ids': {active_id},
'depths': {
depth: set()
for depth in range(1, self.n_edge_distance + 1)
}
}
if active_id in self.n_window_completed_walks:
self.n_window_completed_walks.remove(active_id)
Expand All @@ -787,10 +794,12 @@ def increment_graph_window(
# children will require sifting out cousin branches.
new_locs: List[str]
working_locs: List[str] = []
if c_tag in active_locs:
working_locs.extend(('cc', 'cp'))
if p_tag in active_locs:
working_locs.extend(('pp', 'pc'))
if self.n_edge_distance > 1:
if c_tag in active_locs:
working_locs.extend(('cc', 'cp'))
if p_tag in active_locs:
working_locs.extend(('pp', 'pc'))
n_depth = 2
while working_locs:
for w_loc in working_locs:
loc_done = True
Expand All @@ -817,6 +826,7 @@ def increment_graph_window(
if w_set:
active_locs[w_loc] = w_set
active_walk['walk_ids'].update(w_set)
active_walk['depths'][n_depth].update(w_set)
if loc_done:
active_walk['done_locs'].add(w_loc[:-1])
active_walk['done_ids'].update(active_locs[w_loc[:-1]])
Expand All @@ -825,6 +835,7 @@ def increment_graph_window(
if loc in active_locs and len(loc) < self.n_edge_distance:
new_locs.extend((loc + c_tag, loc + p_tag))
working_locs = new_locs
n_depth += 1

# Graph walk
node_tokens: Tokens
Expand Down Expand Up @@ -863,6 +874,7 @@ def increment_graph_window(
p_loc = location + p_tag
c_ids = set()
p_ids = set()
n_depth = len(location) + 1
# Exclude walked nodes at this location.
# This also helps avoid walking in a circle.
for node_id in loc_nodes.difference(active_walk['done_ids']):
Expand Down Expand Up @@ -890,10 +902,13 @@ def increment_graph_window(
# Children/downstream nodes
# TODO: xtrigger is workflow_state edges too
# Reference set for workflow relations
graph_children = generate_graph_children(
tdef,
get_point(node_tokens['cycle'])
)
if itask is not None and n_depth == 1:
graph_children = itask.graph_children
else:
graph_children = generate_graph_children(
tdef,
get_point(node_tokens['cycle'])
)
for items in graph_children.values():
for child_name, child_point, _ in items:
if child_point > final_point:
Expand All @@ -907,7 +922,8 @@ def increment_graph_window(
child_point,
flow_nums,
False,
None
None,
n_depth
)
self.generate_edge(
node_tokens,
Expand All @@ -934,7 +950,8 @@ def increment_graph_window(
parent_point,
flow_nums,
True,
None
None,
n_depth
)
# reverse for parent
self.generate_edge(
Expand All @@ -950,19 +967,26 @@ def increment_graph_window(
'done_ids': set(),
'done_locs': set(),
'orphans': set(),
'walk_ids': {node_id} | c_ids | p_ids
'walk_ids': {node_id} | c_ids | p_ids,
'depths': {
depth: set()
for depth in range(1, self.n_edge_distance + 1)
}
}
if c_ids:
all_walks[node_id]['locations'][c_tag] = c_ids
all_walks[node_id]['depths'][1].update(c_ids)
if p_ids:
all_walks[node_id]['locations'][p_tag] = p_ids
all_walks[node_id]['depths'][1].update(p_ids)

# Create location association
if c_ids:
active_locs.setdefault(c_loc, set()).update(c_ids)
if p_ids:
active_locs.setdefault(p_loc, set()).update(p_ids)
active_walk['walk_ids'].update(c_ids, p_ids)
active_walk['depths'][n_depth].update(c_ids, p_ids)

self.n_window_completed_walks.add(active_id)
self.n_window_nodes[active_id].update(active_walk['walk_ids'])
Expand Down Expand Up @@ -1074,14 +1098,16 @@ def add_pool_node(self, name, point):
task=name,
).id
self.all_task_pool.add(tp_id)
self.update_window_depths = True

def generate_ghost_task(
self,
tokens: Tokens,
point,
flow_nums,
is_parent=False,
itask=None
itask=None,
n_depth=0,
):
"""Create task-point element populated with static data.
Expand Down Expand Up @@ -1154,8 +1180,10 @@ def generate_ghost_task(
in self.schd.pool.tasks_to_hold
),
depth=task_def.depth,
graph_depth=n_depth,
name=name,
)
self.n_window_depths.setdefault(n_depth, set()).add(tp_id)

tproxy.namespace[:] = task_def.namespace
if is_orphan:
Expand Down Expand Up @@ -1639,6 +1667,10 @@ def update_data_structure(self, reloaded=False):
self.window_resize_rewalk()
self.next_n_edge_distance = None

# Find depth changes and create deltas
if self.update_window_depths:
self.window_depth_finder()

# load database history for flagged nodes
self.apply_task_proxy_db_history()

Expand Down Expand Up @@ -1705,6 +1737,51 @@ def window_resize_rewalk(self):
))
)

def window_depth_finder(self):
    """Recalculate n-window graph depths, creating depth deltas.

    Rebuilds the depth => task-proxy-ID mapping from the stored node
    walks, then stages a ``graph_depth`` delta for every task proxy
    whose depth differs from the previously recorded mapping
    (``self.n_window_depths``).
    """
    # Setup new window depths: tasks in the active pool are depth 0.
    # NOTE: annotations use subscription (Dict[...]), not call syntax —
    # Dict(int, Set(str)) is invalid typing and breaks type checkers.
    n_window_depths: Dict[int, Set[str]] = {
        0: set(self.all_task_pool)
    }

    # Walk outward from the smallest depth; a node belongs only to the
    # smallest depth at which it is found, so exclude anything already
    # assigned a depth.
    depth_found_tasks: Set[str] = set(self.all_task_pool)
    depth = 1
    while depth <= self.n_edge_distance:
        n_window_depths[depth] = set().union(*(
            self.n_window_node_walks[n_id]['depths'][depth]
            for n_id in self.all_task_pool
            if (
                n_id in self.n_window_node_walks
                and depth in self.n_window_node_walks[n_id]['depths']
            )
        )).difference(depth_found_tasks)
        depth_found_tasks.update(n_window_depths[depth])
        # Calculate next depth parameters.
        depth += 1

    # Create deltas of those whose depth has changed. A node should
    # only appear once across all depths, so the diff will only contain
    # it at a single depth, and only if it didn't appear at the same
    # depth previously.
    update_time = time()
    for depth, node_set in n_window_depths.items():
        node_set_diff = node_set.difference(
            self.n_window_depths.setdefault(depth, set())
        )
        if not self.updates_pending and node_set_diff:
            self.updates_pending = True
        for tp_id in node_set_diff:
            tp_delta = self.updated[TASK_PROXIES].setdefault(
                tp_id, PbTaskProxy(id=tp_id)
            )
            tp_delta.stamp = f'{tp_id}@{update_time}'
            tp_delta.graph_depth = depth
    # Set old to new.
    self.n_window_depths = n_window_depths
    self.update_window_depths = False

def prune_data_store(self):
"""Remove flagged nodes and edges not in the set of active paths."""

Expand Down
5 changes: 5 additions & 0 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,11 @@ class Meta:
depth = Int(
description='The family inheritance depth',
)
graph_depth = Int(
description=sstrip('''
The n-window graph edge depth from closest active task(s).
'''),
)
job_submits = Int(
description='The number of job submissions for this task instance.',
)
Expand Down

0 comments on commit 51140bb

Please sign in to comment.