Skip to content

Commit 12717c0

Browse files
committed
pcgv3 supervoxel splitting
1 parent eaab3d0 commit 12717c0

File tree

10 files changed

+168
-8
lines changed

10 files changed

+168
-8
lines changed

pychunkedgraph/graph/attributes.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,18 @@ class Hierarchy:
160160
serializer=serializers.NumPyValue(dtype=basetypes.NODE_ID),
161161
)
162162

163+
# Supervoxel-lineage column: IDs this node replaced. Written onto each
# new ID produced by a supervoxel split (see
# edits_sv.copy_parents_and_create_lineage, which stores `[old_id]` here).
FormerIdentity = _Attribute(
    key=b"former_ids",
    family_id="0",
    serializer=serializers.NumPyArray(dtype=basetypes.NODE_ID),
)

# Supervoxel-lineage column: IDs that replaced this node. Written onto
# the old ID with the array of new IDs created by the split.
NewIdentity = _Attribute(
    key=b"new_ids",
    family_id="0",
    serializer=serializers.NumPyArray(dtype=basetypes.NODE_ID),
)
174+
163175

164176
class GraphMeta:
165177
key = b"meta"

pychunkedgraph/graph/client/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ def create_node_ids(self, chunk_id):
122122
def create_node_id(self, chunk_id):
123123
"""Generate a unique ID in the chunk."""
124124

125+
@abstractmethod
def set_max_node_id(self, chunk_id, node_id):
    """Set the current maximum node ID in the chunk."""
128+
125129
@abstractmethod
126130
def get_max_node_id(self, chunk_id):
127131
"""Gets the current maximum node ID in the chunk."""

pychunkedgraph/graph/client/bigtable/client.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,17 @@ def create_node_id(
615615
"""Generate a unique node ID in the chunk."""
616616
return self.create_node_ids(chunk_id, 1, root_chunk=root_chunk)[0]
617617

618+
def set_max_node_id(
    self, chunk_id: np.uint64, node_id: np.uint64
) -> None:
    """Set max segment ID counter for a given chunk.

    NOTE(review): this *increments* the chunk's counter cell by
    ``chunk_id ^ node_id`` (presumably the segment part of the node ID);
    it only truly "sets" the maximum when the counter is currently 0 —
    confirm this matches intended usage (e.g. first write during ingest).
    """
    size = int(np.uint64(chunk_id) ^ np.uint64(node_id))
    key = serialize_uint64(chunk_id, counter=True)
    column = attributes.Concurrency.Counter
    row = self._table.append_row(key)
    row.increment_cell_value(column.family_id, column.key, size)
    # commit() returns the appended-row result, which is not needed here.
    row.commit()
628+
618629
def get_max_node_id(
619630
self, chunk_id: basetypes.CHUNK_ID, root_chunk=False
620631
) -> basetypes.NODE_ID:

pychunkedgraph/graph/edits_sv.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
"""
2+
Supervoxel splitting and managing new IDs.
3+
"""
4+
5+
from typing import Iterable
6+
7+
import numpy as np
8+
9+
from pychunkedgraph.graph import ChunkedGraph
10+
from pychunkedgraph.graph.utils import basetypes
11+
from pychunkedgraph.graph.attributes import Hierarchy
12+
from pychunkedgraph.graph.utils.serializers import serialize_uint64
13+
14+
15+
def split_supervoxel(
    cg: "ChunkedGraph", supervoxel_id: "basetypes.NODE_ID"
) -> "Iterable[basetypes.NODE_ID]":
    """
    Lookup coordinates of given supervoxel in segmentation.
    Split it and update the coordinates with new IDs.
    Return new IDs.

    Raises:
        NotImplementedError: always, until the split logic is implemented.
    """
    # Fail loudly instead of silently returning None, which callers
    # could mistake for "no new IDs were created".
    raise NotImplementedError("supervoxel splitting is not implemented yet")
23+
24+
25+
def copy_parents_and_create_lineage(
26+
cg: ChunkedGraph, old_id: basetypes.NODE_ID, new_ids: Iterable[basetypes.NODE_ID]
27+
) -> list:
28+
"""
29+
Copy parents column from `old_id` to each of `new_ids`.
30+
This makes it easy to get old hierarchy with `new_ids` using an older timestamp.
31+
Link `old_id` and `new_ids` to create a lineage at supervoxel layer.
32+
Returns a list of mutations to be persisted.
33+
"""
34+
result = []
35+
parent_cells = cg.client.read_node(old_id, properties=Hierarchy.Parent)
36+
37+
for new_id in new_ids:
38+
val_dict = {
39+
Hierarchy.FormerIdentity: np.array([old_id], dtype=basetypes.NODE_ID)
40+
}
41+
result.append(cg.client.mutate_row(serialize_uint64(new_id), val_dict))
42+
43+
for cell in parent_cells:
44+
result.append(
45+
cg.client.mutate_row(
46+
serialize_uint64(new_id),
47+
{Hierarchy.Parent: cell.value},
48+
time_stamp=cell.timestamp,
49+
)
50+
)
51+
52+
val_dict = {Hierarchy.NewIdentity: np.array(new_ids, dtype=basetypes.NODE_ID)}
53+
result.append(cg.client.mutate_row(serialize_uint64(old_id), val_dict))
54+
return result

pychunkedgraph/graph/ocdbt.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import os
2+
import numpy as np
3+
import tensorstore as ts
4+
5+
from pychunkedgraph.graph import ChunkedGraph
6+
7+
OCDBT_SEG_COMPRESSION_LEVEL = 22
8+
9+
10+
def get_seg_source_and_destination_ocdbt(
    cg: ChunkedGraph, create: bool = False
) -> tuple:
    """Open the watershed segmentation and its OCDBT-backed mirror.

    Returns ``(src, dst)`` TensorStore handles: ``src`` is the original
    neuroglancer_precomputed watershed volume, ``dst`` is the same volume
    stored in an OCDBT key-value store with zstd compression. When
    ``create`` is True, the destination is (re)created from the source
    schema (``delete_existing`` is set accordingly).
    """
    source = ts.open(
        {
            "driver": "neuroglancer_precomputed",
            "kvstore": cg.meta.data_source.WATERSHED,
        }
    ).result()
    src_schema = source.schema

    destination_spec = {
        "driver": "neuroglancer_precomputed",
        "kvstore": {
            "driver": "ocdbt",
            "base": os.path.join(cg.meta.data_source.WATERSHED, "ocdbt", "base"),
            "config": {
                "compression": {"id": "zstd", "level": OCDBT_SEG_COMPRESSION_LEVEL},
            },
        },
    }
    destination = ts.open(
        destination_spec,
        create=create,
        rank=src_schema.rank,
        dtype=src_schema.dtype,
        codec=src_schema.codec,
        domain=src_schema.domain,
        shape=src_schema.shape,
        chunk_layout=src_schema.chunk_layout,
        dimension_units=src_schema.dimension_units,
        delete_existing=create,
    ).result()
    return (source, destination)
45+
46+
47+
def copy_ws_chunk(cg: ChunkedGraph, coords: list, source, destination):
    """Copy one watershed chunk from `source` to `destination`.

    ``coords`` are chunk-grid coordinates; the voxel box is derived from
    the graph's chunk size and clipped per-axis to the dataset's upper
    voxel bounds. (Assumes the lower bound needs no clipping —
    TODO confirm for datasets with a non-zero voxel offset.)
    """
    chunk_size = cg.meta.graph_config.CHUNK_SIZE
    start = np.array(coords, dtype=int) * chunk_size
    end = np.minimum(start + chunk_size, cg.meta.voxel_bounds[:, 1])

    x0, y0, z0 = start
    x1, y1, z1 = end
    block = source[x0:x1, y0:y1, z0:z1].read().result()
    destination[x0:x1, y0:y1, z0:z1].write(block).result()

pychunkedgraph/ingest/cli.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from .simple_tests import run_all
2424
from .create.parent_layer import add_parent_chunk
2525
from ..graph.chunkedgraph import ChunkedGraph
26+
from ..graph.ocdbt import get_seg_source_and_destination_ocdbt
2627
from ..utils.redis import get_redis_connection, keys as r_keys
2728

2829
group_name = "ingest"
@@ -71,6 +72,7 @@ def ingest_graph(
7172

7273
imanager = IngestionManager(ingest_config, meta)
7374
enqueue_l2_tasks(imanager, create_atomic_chunk)
75+
get_seg_source_and_destination_ocdbt(cg, create=True)
7476

7577

7678
@ingest_cli.command("imanager")

pychunkedgraph/ingest/cluster.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from .upgrade.parent_layer import update_chunk as update_parent_chunk
2727
from ..graph.edges import EDGE_TYPES, Edges, put_edges
2828
from ..graph import ChunkedGraph, ChunkedGraphMeta
29+
from ..graph.ocdbt import copy_ws_chunk, get_seg_source_and_destination_ocdbt
2930
from ..graph.chunks.hierarchy import get_children_chunk_coords
3031
from ..graph.utils.basetypes import NODE_ID
3132
from ..io.edges import get_chunk_edges
@@ -127,6 +128,9 @@ def create_atomic_chunk(coords: Sequence[int]):
127128
logging.debug(f"{k}: {len(v)}")
128129
for k, v in chunk_edges_active.items():
129130
logging.debug(f"active_{k}: {len(v)}")
131+
132+
src, dst = get_seg_source_and_destination_ocdbt(imanager.cg)
133+
copy_ws_chunk(imanager.cg, coords, src, dst)
130134
_post_task_completion(imanager, 2, coords)
131135

132136

pychunkedgraph/ingest/create/atomic_layer.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ def add_atomic_chunk(
3535
return
3636

3737
chunk_ids = cg.get_chunk_ids_from_node_ids(chunk_node_ids)
38-
assert len(np.unique(chunk_ids)) == 1
38+
assert len(np.unique(chunk_ids)) == 1, np.unique(chunk_ids)
39+
40+
max_node_id = np.max(chunk_node_ids)
41+
cg.id_client.set_max_node_id(chunk_ids[0], max_node_id)
3942

4043
graph, _, _, unique_ids = build_gt_graph(chunk_edge_ids, make_directed=True)
4144
ccs = connected_components(graph)

pychunkedgraph/repair/fake_edges.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
from os import environ
1010
from typing import Optional
1111

12-
environ["BIGTABLE_PROJECT"] = "<>"
13-
environ["BIGTABLE_INSTANCE"] = "<>"
14-
environ["GOOGLE_APPLICATION_CREDENTIALS"] = "<path>"
12+
# environ["BIGTABLE_PROJECT"] = "<>"
13+
# environ["BIGTABLE_INSTANCE"] = "<>"
14+
# environ["GOOGLE_APPLICATION_CREDENTIALS"] = "<path>"
1515

1616
from pychunkedgraph.graph import edits
1717
from pychunkedgraph.graph import ChunkedGraph

pychunkedgraph/tests/test_uncategorized.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ def test_build_single_node(self, gen_graph):
107107
cg = gen_graph(n_layers=2)
108108
# Add Chunk A
109109
create_chunk(cg, vertices=[to_label(cg, 1, 0, 0, 0, 0)])
110+
chunk_id = to_label(cg, 1, 0, 0, 0, 0)
111+
assert cg.id_client.get_max_node_id(chunk_id) == chunk_id
110112

111113
res = cg.client._table.read_rows()
112114
res.consume_all()
@@ -130,7 +132,7 @@ def test_build_single_node(self, gen_graph):
130132
assert len(children) == 1 and children[0] == to_label(cg, 1, 0, 0, 0, 0)
131133
# Make sure there are not any more entries in the table
132134
# include counters, meta and version rows
133-
assert len(res.rows) == 1 + 1 + 1 + 1 + 1
135+
assert len(res.rows) == 1 + 1 + 1 + 1 + 1 + 1
134136

135137
@pytest.mark.timeout(30)
136138
def test_build_single_edge(self, gen_graph):
@@ -151,6 +153,8 @@ def test_build_single_edge(self, gen_graph):
151153
vertices=[to_label(cg, 1, 0, 0, 0, 0), to_label(cg, 1, 0, 0, 0, 1)],
152154
edges=[(to_label(cg, 1, 0, 0, 0, 0), to_label(cg, 1, 0, 0, 0, 1), 0.5)],
153155
)
156+
chunk_id = to_label(cg, 1, 0, 0, 0, 0)
157+
assert cg.id_client.get_max_node_id(chunk_id) == to_label(cg, 1, 0, 0, 0, 1)
154158

155159
res = cg.client._table.read_rows()
156160
res.consume_all()
@@ -183,7 +187,7 @@ def test_build_single_edge(self, gen_graph):
183187

184188
# Make sure there are not any more entries in the table
185189
# include counters, meta and version rows
186-
assert len(res.rows) == 2 + 1 + 1 + 1 + 1
190+
assert len(res.rows) == 2 + 1 + 1 + 1 + 1 + 1
187191

188192
@pytest.mark.timeout(30)
189193
def test_build_single_across_edge(self, gen_graph):
@@ -285,7 +289,7 @@ def test_build_single_across_edge(self, gen_graph):
285289

286290
# Make sure there are not any more entries in the table
287291
# include counters, meta and version rows
288-
assert len(res.rows) == 2 + 2 + 1 + 3 + 1 + 1
292+
assert len(res.rows) == 2 + 2 + 1 + 3 + 1 + 1 + 2
289293

290294
@pytest.mark.timeout(30)
291295
def test_build_single_edge_and_single_across_edge(self, gen_graph):
@@ -311,13 +315,19 @@ def test_build_single_edge_and_single_across_edge(self, gen_graph):
311315
],
312316
)
313317

318+
chunk_id = to_label(cg, 1, 0, 0, 0, 0)
319+
assert cg.id_client.get_max_node_id(chunk_id) == to_label(cg, 1, 0, 0, 0, 1)
320+
314321
# Chunk B
315322
create_chunk(
316323
cg,
317324
vertices=[to_label(cg, 1, 1, 0, 0, 0)],
318325
edges=[(to_label(cg, 1, 1, 0, 0, 0), to_label(cg, 1, 0, 0, 0, 0), inf)],
319326
)
320327

328+
chunk_id = to_label(cg, 1, 1, 0, 0, 0)
329+
assert cg.id_client.get_max_node_id(chunk_id) == to_label(cg, 1, 1, 0, 0, 0)
330+
321331
add_parent_chunk(cg, 3, np.array([0, 0, 0]), n_threads=1)
322332
res = cg.client._table.read_rows()
323333
res.consume_all()
@@ -393,7 +403,7 @@ def test_build_single_edge_and_single_across_edge(self, gen_graph):
393403

394404
# Make sure there are not any more entries in the table
395405
# include counters, meta and version rows
396-
assert len(res.rows) == 3 + 2 + 1 + 3 + 1 + 1
406+
assert len(res.rows) == 3 + 2 + 1 + 3 + 1 + 1 + 2
397407

398408
@pytest.mark.timeout(120)
399409
def test_build_big_graph(self, gen_graph):

0 commit comments

Comments
 (0)