Commit 2528bb4

breakup long fn
1 parent c340b21 commit 2528bb4

File tree

1 file changed: +48 / -48 lines changed


pychunkedgraph/ingest/create/parent_layer.py

Lines changed: 48 additions & 48 deletions
@@ -154,13 +154,50 @@ def _write_components_helper(args):
     _write(cg, layer, pcoords, ccs, node_layer_d, time_stamp)


+def _children_rows(
+    cg: ChunkedGraph, parent_id, children: Sequence, cx_edges_d: dict, time_stamp
+):
+    """
+    Update children rows to point to the parent_id, collect cached children
+    cross chunk edges to lift and update parent cross chunk edges.
+    Returns list of mutations to children and list of children cross edges.
+    """
+    rows = []
+    children_cx_edges = []
+    for child in children:
+        node_layer = cg.get_chunk_layer(child)
+        row_id = serializers.serialize_uint64(child)
+        val_dict = {attributes.Hierarchy.Parent: parent_id}
+        node_cx_edges_d = cx_edges_d.get(child, {})
+        if not node_cx_edges_d:
+            rows.append(cg.client.mutate_row(row_id, val_dict, time_stamp))
+            continue
+        for layer in range(node_layer, cg.meta.layer_count):
+            if not layer in node_cx_edges_d:
+                continue
+            layer_edges = node_cx_edges_d[layer]
+            nodes = np.unique(layer_edges)
+            parents = cg.get_roots(nodes, stop_layer=node_layer, ceil=False)
+            edge_parents_d = dict(zip(nodes, parents))
+            layer_edges = fastremap.remap(
+                layer_edges, edge_parents_d, preserve_missing_labels=True
+            )
+            layer_edges = np.unique(layer_edges, axis=0)
+            col = attributes.Connectivity.CrossChunkEdge[layer]
+            val_dict[col] = layer_edges
+            node_cx_edges_d[layer] = layer_edges
+        children_cx_edges.append(node_cx_edges_d)
+        rows.append(cg.client.mutate_row(row_id, val_dict, time_stamp))
+    return rows, children_cx_edges
+
+
 def _write(
     cg: ChunkedGraph,
     layer_id,
     parent_coords,
     components,
     node_layer_d,
-    time_stamp,
+    ts,
     use_threads=True,
 ):
     parent_layers = range(layer_id, cg.meta.layer_count + 1)
@@ -175,71 +212,34 @@ def _write(
     x, y, z = parent_coords
     parent_chunk_id = cg.get_chunk_id(layer=layer_id, x=x, y=y, z=z)
     parent_chunk_id_dict = cg.get_parent_chunk_id_dict(parent_chunk_id)
-
     for parent_layer in parent_layers:
         if len(cc_connections[parent_layer]) == 0:
             continue
-
         parent_chunk_id = parent_chunk_id_dict[parent_layer]
         reserved_parent_ids = cg.id_client.create_node_ids(
             parent_chunk_id,
             size=len(cc_connections[parent_layer]),
             root_chunk=parent_layer == cg.meta.layer_count and use_threads,
         )
-
-        for i_cc, node_ids in enumerate(cc_connections[parent_layer]):
-            parent_id = reserved_parent_ids[i_cc]
-
+        for i_cc, children in enumerate(cc_connections[parent_layer]):
+            parent = reserved_parent_ids[i_cc]
             if layer_id == 3:
                 # when layer 3 is being processed, children chunks are at layer 2
                 # layer 2 chunks at this time will only have atomic cross edges
-                cx_edges_d = cg.get_atomic_cross_edges(node_ids)
+                cx_edges_d = cg.get_atomic_cross_edges(children)
             else:
-                # children are from abstract chunks
-                cx_edges_d = cg.get_cross_chunk_edges(node_ids, raw_only=True)
-
-            children_cx_edges = []
-            for node in node_ids:
-                node_layer = cg.get_chunk_layer(node)
-                row_id = serializers.serialize_uint64(node)
-                val_dict = {attributes.Hierarchy.Parent: parent_id}
-
-                node_cx_edges_d = cx_edges_d.get(node, {})
-                if not node_cx_edges_d:
-                    rows.append(cg.client.mutate_row(row_id, val_dict, time_stamp))
-                    continue
-
-                for layer in range(node_layer, cg.meta.layer_count):
-                    if not layer in node_cx_edges_d:
-                        continue
-                    layer_edges = node_cx_edges_d[layer]
-                    nodes = np.unique(layer_edges)
-                    parents = cg.get_roots(nodes, stop_layer=node_layer, ceil=False)
-
-                    edge_parents_d = dict(zip(nodes, parents))
-                    layer_edges = fastremap.remap(
-                        layer_edges, edge_parents_d, preserve_missing_labels=True
-                    )
-                    layer_edges = np.unique(layer_edges, axis=0)
-
-                    col = attributes.Connectivity.CrossChunkEdge[layer]
-                    val_dict[col] = layer_edges
-                    node_cx_edges_d[layer] = layer_edges
-                children_cx_edges.append(node_cx_edges_d)
-                rows.append(cg.client.mutate_row(row_id, val_dict, time_stamp))
-
-            row_id = serializers.serialize_uint64(parent_id)
-            val_dict = {attributes.Hierarchy.Child: node_ids}
-            parent_cx_edges_d = concatenate_cross_edge_dicts(
-                children_cx_edges, unique=True
-            )
+                cx_edges_d = cg.get_cross_chunk_edges(children, raw_only=True)
+            _rows, cx_edges = _children_rows(cg, parent, children, cx_edges_d, ts)
+            rows.extend(_rows)
+            row_id = serializers.serialize_uint64(parent)
+            val_dict = {attributes.Hierarchy.Child: children}
+            parent_cx_edges_d = concatenate_cross_edge_dicts(cx_edges, unique=True)
             for layer in range(parent_layer, cg.meta.layer_count):
                 if not layer in parent_cx_edges_d:
                     continue
                 col = attributes.Connectivity.CrossChunkEdge[layer]
                 val_dict[col] = parent_cx_edges_d[layer]
-
-            rows.append(cg.client.mutate_row(row_id, val_dict, time_stamp))
+            rows.append(cg.client.mutate_row(row_id, val_dict, ts))
            if len(rows) > 100000:
                cg.client.write(rows)
                rows = []
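For readers skimming the diff, the core of the extracted helper is the per-layer "lifting" of cached cross chunk edges to parent IDs, followed by a merge of the lifted dictionaries for the parent row. The standalone sketch below illustrates only that step, with made-up node IDs: the edge_parents_d dict stands in for cg.get_roots(nodes, stop_layer=node_layer, ceil=False), and merge_cx_edge_dicts is a hypothetical stand-in for concatenate_cross_edge_dicts; it assumes only numpy and fastremap are installed.

# Illustrative sketch only; IDs and helper names are made up.
import numpy as np
import fastremap

def lift_layer_edges(layer_edges: np.ndarray, edge_parents_d: dict) -> np.ndarray:
    # Remap both endpoints of each edge to their parent IDs; endpoints without
    # a mapping are kept unchanged, then duplicate edges are dropped row-wise.
    lifted = fastremap.remap(layer_edges, edge_parents_d, preserve_missing_labels=True)
    return np.unique(lifted, axis=0)

def merge_cx_edge_dicts(dicts: list) -> dict:
    # Concatenate per-layer edge arrays from all children and deduplicate,
    # a hypothetical stand-in for concatenate_cross_edge_dicts(..., unique=True).
    merged = {}
    for d in dicts:
        for layer, edges in d.items():
            merged.setdefault(layer, []).append(edges)
    return {
        layer: np.unique(np.concatenate(chunks, axis=0), axis=0)
        for layer, chunks in merged.items()
    }

# Cached cross chunk edges of two children at layer 3 (made-up IDs).
child_a = {3: np.array([[101, 205], [102, 205]], dtype=np.uint64)}
child_b = {3: np.array([[103, 206]], dtype=np.uint64)}

# Parents of the edge endpoints at the children's layer (made-up mapping).
edge_parents_d = {101: 11, 102: 11, 103: 12, 205: 22, 206: 22}

lifted = [
    {layer: lift_layer_edges(edges, edge_parents_d) for layer, edges in d.items()}
    for d in (child_a, child_b)
]
print(merge_cx_edge_dicts(lifted))  # {3: array([[11, 22], [12, 22]], dtype=uint64)}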
