SparseMeshes: Added support for different output destinations, and extracted output-related schema and code from CreateMeshes into a utility file.
stuarteberg committed Nov 5, 2024
1 parent 77dfe7d commit fcdc8f6
Showing 3 changed files with 390 additions and 251 deletions.
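Only the diff for createmeshes.py is reproduced below; the other changed files, including the new utility module itself, are not shown on this page. As orientation, here is a rough sketch of the extracted interface, assuming it simply relocates the code removed from CreateMeshes. The names are taken from the new import line in the diff; the signatures are inferred from the call sites and the bodies mirror the deleted methods, so treat this as an assumption rather than the actual contents of mesh_workflow_utils.py.

# Sketch (assumed) of flyemflows/workflow/util/mesh_workflow_utils.py
from flyemflows.volumes import DvidVolumeService

# MeshOutputSchema: presumably the 'oneOf' of the directory/tarsupervoxels/keyvalue
# output schemas deleted from CreateMeshes below, with default {"directory": "meshes"}.

def input_is_labelmap(input_service):
    """True if the input's base service is a DVID volume source."""
    return isinstance(input_service.base_service, DvidVolumeService)

def input_is_labelmap_supervoxels(input_service):
    """True if the input is a DVID labelmap read in supervoxel mode."""
    if isinstance(input_service.base_service, DvidVolumeService):
        return input_service.base_service.supervoxels
    return False

def input_is_labelmap_bodies(input_service):
    """True if the input is a DVID labelmap read in body (agglomerated) mode."""
    if isinstance(input_service.base_service, DvidVolumeService):
        return not input_service.base_service.supervoxels
    return False

def prepare_mesh_output(output_cfg, output_fmt, input_service):
    """Create the output destination (directory or DVID instance) if necessary."""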
232 changes: 22 additions & 210 deletions flyemflows/workflow/createmeshes.py
@@ -14,9 +14,9 @@
from distributed import get_client, worker_client

from neuclease.util import Timer, SparseBlockMask, compute_nonzero_box, box_intersection, extract_subvol, switch_cwd, iter_batches, box_shape
from neuclease.dvid import (fetch_mappings, fetch_repo_instances, create_tarsupervoxel_instance,
create_instance, is_locked, post_load, post_keyvalues, fetch_exists, fetch_keys,
fetch_supervoxels, fetch_server_info, fetch_mapping, resolve_ref, set_default_dvid_session_timeout)
from neuclease.dvid import (fetch_mappings,
post_load, post_keyvalues, fetch_exists, fetch_keys,
fetch_supervoxels, fetch_mapping, set_default_dvid_session_timeout)

from dvid_resource_manager.client import ResourceManagerClient
from dvidutils import LabelMapper
@@ -25,7 +25,8 @@

from ..util.dask_util import drop_empty_partitions, release_collection
from .util.config_helpers import BodyListSchema, load_body_list
from ..volumes import VolumeService, SegmentationVolumeSchema, DvidVolumeService
from .util.mesh_workflow_utils import MeshOutputSchema, prepare_mesh_output, input_is_labelmap, input_is_labelmap_supervoxels, input_is_labelmap_bodies
from ..volumes import VolumeService, SegmentationVolumeSchema
from ..brick import BrickWall
from . import Workflow

@@ -36,79 +37,6 @@ class CreateMeshes(Workflow):
"""
Generate meshes for many (or all) segments in a volume.
"""
GenericDvidInstanceSchema = \
{
"description": "Parameters to specify a generic dvid instance (server/uuid/instance).\n"
"Omitted values will be copied from the input, or given default values.",
"type": "object",
"required": ["server", "uuid"],

# "default": {}, # Must not have default. (Appears below in a 'oneOf' context.)
"additionalProperties": False,
"properties": {
"server": {
"description": "location of DVID server to READ.",
"type": "string",
"default": ""
},
"uuid": {
"description": "version node from dvid",
"type": "string",
"default": ""
},
"instance": {
"description": "Name of the instance to create",
"type": "string"
},
"sync-to": {
"description": "When creating a tarsupervoxels instance, it should be sync'd to a labelmap instance.\n"
"Give the instance name here.",
"type": "string",
"default": ""
},
"timeout": {
"description": "",
"type": "number",
"default": 600.0
},
"create-if-necessary": {
"description": "Whether or not to create the instance if it doesn't already exist.\n"
"If you expect the instance to exist on the server already, leave this\n"
"set to False to avoid confusion in the case of typos, UUID mismatches, etc.\n",
"type": "boolean",
"default": False
},
}
}

TarsupervoxelsOutputSchema = \
{
"additionalProperties": False,
"properties": {
"tarsupervoxels": GenericDvidInstanceSchema
}
}

KeyvalueOutputSchema = \
{
"additionalProperties": False,
"properties": {
"keyvalue": GenericDvidInstanceSchema
}
}

DirectoryOutputSchema = \
{
"additionalProperties": False,
"properties": {
"directory": {
"description": "Directory to write supervoxel meshes into.",
"type": "string",
# "default": "" # Must not have default. (Appears below in a 'oneOf' context.)
}
}
}
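# --- Illustrative examples (not part of this commit) ---
# Three 'output' settings that would validate against the schemas above, one per
# destination type.  The server/uuid/instance values below are hypothetical placeholders.
example_directory_output = {"directory": "meshes"}  # also the default

example_tarsupervoxels_output = {
    "tarsupervoxels": {
        "server": "http://dvid.example.org:8000",  # if left blank, copied from the input
        "uuid": "abc123",                          # if left blank, copied from the input
        "instance": "segmentation_sv_meshes",
        "sync-to": "segmentation",                 # labelmap instance to sync to
        "create-if-necessary": True
    }
}

example_keyvalue_output = {
    "keyvalue": {
        "server": "http://dvid.example.org:8000",
        "uuid": "abc123",
        "instance": "segmentation_meshes",
        "create-if-necessary": True
    }
}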

MeshParametersSchema = \
{
# TODO: skip-decimation-body-size
@@ -326,15 +254,7 @@ class CreateMeshes(Workflow):
Schema["properties"].update({
"input": SegmentationVolumeSchema,

"output": {
"oneOf": [
DirectoryOutputSchema,
TarsupervoxelsOutputSchema,
KeyvalueOutputSchema,
],
"default": {"directory": "meshes"}
},

"output": MeshOutputSchema,
"createmeshes": CreateMeshesOptionsSchema,

})
@@ -359,127 +279,15 @@ def _sanitize_config(self):
if options['subset-batch-size'] > 0 and not (options['subset-supervoxels'] or options['subset-bodies']):
raise RuntimeError("The batch feature is not supported unless you explicitly specify subset-supervoxels or subset-bodies.")

if 'directory-of-tarfiles' in self.config['output']:
raise RuntimeError("Sorry, 'directory-of-tarfiles' isn't yet supported as an output structure for this workflow.")

def _init_input_service(self):
input_config = self.config["input"]
resource_config = self.config["resource-manager"]
self.resource_mgr_client = ResourceManagerClient(resource_config["server"], resource_config["port"])
input_service = VolumeService.create_from_config(input_config, self.resource_mgr_client)
self.input_service = input_service

def _prepare_output(self):
"""
If necessary, create the output directory or
DVID instance so that meshes can be written to it.
"""
output_cfg = self.config["output"]
output_fmt = self.config["createmeshes"]["format"]

## directory output
if 'directory' in output_cfg:
# Convert to absolute so we can chdir with impunity later.
output_cfg['directory'] = os.path.abspath(output_cfg['directory'])
os.makedirs(output_cfg['directory'], exist_ok=True)
return

##
## DVID output (either keyvalue or tarsupervoxels)
##
(instance_type,) = output_cfg.keys()

set_default_dvid_session_timeout(
output_cfg[instance_type]["timeout"],
output_cfg[instance_type]["timeout"]
)

server = output_cfg[instance_type]['server']
uuid = output_cfg[instance_type]['uuid']
instance = output_cfg[instance_type]['instance']

# If the output server or uuid is left blank,
# we assume it should be auto-filled from the input settings.
if server == "" or uuid == "":
base_input = self.input_service.base_service
if not isinstance(base_input, DvidVolumeService):
# Can't copy from the input if the input ain't a dvid source
raise RuntimeError("Output destination server/uuid was left blank.")

if server == "":
server = base_input.server
output_cfg[instance_type]['server'] = server

if uuid == "":
uuid = base_input.uuid
output_cfg[instance_type]['uuid'] = uuid

# Resolve in case a branch was given instead of a specific uuid
uuid = resolve_ref(server, uuid)

if is_locked(server, uuid):
info = fetch_server_info(server)
if "Mode" in info and info["Mode"] == "allow writes on committed nodes":
logger.warning(f"Output is a locked node ({uuid}), but server is in full-write mode. Proceeding.")
elif os.environ.get("DVID_ADMIN_TOKEN", ""):
logger.warning(f"Output is a locked node ({uuid}), but you defined DVID_ADMIN_TOKEN. Proceeding.")
else:
raise RuntimeError(f"Can't write to node {uuid} because it is locked.")

if instance_type == 'tarsupervoxels' and not self.input_is_labelmap_supervoxels():
msg = ("You shouldn't write to a tarsupervoxels instance unless "
"you're reading supervoxels from a labelmap input.\n"
"Use a labelmap input source, and set supervoxels: true")
raise RuntimeError(msg)

existing_instances = fetch_repo_instances(server, uuid)
if instance in existing_instances:
# Instance exists -- nothing to do.
return

if not output_cfg[instance_type]['create-if-necessary']:
msg = (f"Output instance '{instance}' does not exist, "
"and your config did not specify create-if-necessary")
raise RuntimeError(msg)

assert instance_type in ('tarsupervoxels', 'keyvalue')

## keyvalue output
if instance_type == "keyvalue":
create_instance(server, uuid, instance, "keyvalue", tags=["type=meshes"])
return

## tarsupervoxels output
sync_instance = output_cfg["tarsupervoxels"]["sync-to"]

if not sync_instance:
# Auto-fill a default 'sync-to' instance using the input segmentation, if possible.
base_input = self.input_service.base_service
if isinstance(base_input, DvidVolumeService):
if base_input.instance_name in existing_instances:
sync_instance = base_input.instance_name

if not sync_instance:
msg = ("Can't create a tarsupervoxels instance unless "
"you specify a 'sync-to' labelmap instance name.")
raise RuntimeError(msg)

if sync_instance not in existing_instances:
msg = ("Can't sync to labelmap instance '{sync_instance}': "
"it doesn't exist on the output server.")
raise RuntimeError(msg)

create_tarsupervoxel_instance(server, uuid, instance, sync_instance, output_fmt)

def input_is_labelmap(self):
return isinstance(self.input_service.base_service, DvidVolumeService)

def input_is_labelmap_supervoxels(self):
if isinstance(self.input_service.base_service, DvidVolumeService):
return self.input_service.base_service.supervoxels
return False

def input_is_labelmap_bodies(self):
if isinstance(self.input_service.base_service, DvidVolumeService):
return not self.input_service.base_service.supervoxels
return False
input_config = self.config["input"]
self.input_service = VolumeService.create_from_config(input_config, self.resource_mgr_client)

def execute(self):
"""
Expand Down Expand Up @@ -514,7 +322,11 @@ def execute(self):
"""
self._sanitize_config()
self._init_input_service()
self._prepare_output()
prepare_mesh_output(
self.config["output"],
self.config["format"],
self.input_service
)
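# NOTE (assumption, not in this diff): prepare_mesh_output() presumably reproduces the
# steps of the _prepare_output() method deleted above, keyed on the configured output flavor:
#   - 'directory':       create the directory (os.makedirs)
#   - 'tarsupervoxels':  create the DVID tarsupervoxels instance if needed (synced to a labelmap)
#   - 'keyvalue':        create the DVID keyvalue instance if needed
# with blank server/uuid fields auto-filled from the input service.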

try:
subset_supervoxels, existing_svs = self._load_subset_supervoxels()
@@ -670,7 +482,7 @@ def _load_subset_supervoxels(self):
if len(subset_supervoxels) and len(subset_bodies):
raise RuntimeError("Can't use both subset-supervoxels and subset-bodies. Choose one.")

if len(subset_bodies) and not self.input_is_labelmap():
if len(subset_bodies) and not input_is_labelmap(self.input_service):
raise RuntimeError("Can't use 'subset-bodies' unless your input is a DVID labelmap. Try subset-supervoxels.")

# Load subset_bodies from a list, CSV, or from the kafka log
@@ -683,7 +495,7 @@ def _load_subset_supervoxels(self):
subset_bodies = self._determine_changed_labelmap_bodies(subset_bodies)

# If user supplied bodies, convert to supervoxels.
if self.input_is_labelmap_supervoxels() and len(subset_bodies) > 0:
if input_is_labelmap_supervoxels(self.input_service) and len(subset_bodies) > 0:
with Timer(f"Fetching supervoxel set for {len(subset_bodies)} labelmap bodies", logger):
seg_instance = self.input_service.base_service.instance_triple

@@ -712,7 +524,7 @@ def fetch_svs(body):

logger.info(f"Selected bodies contain {len(subset_supervoxels)} supervoxels")

if self.input_is_labelmap_bodies():
if input_is_labelmap_bodies(self.input_service):
assert len(subset_supervoxels) == 0, \
"Can't use subset-supervoxels when reading from a labelmap in body mode. Please use subset-bodies."

@@ -750,7 +562,7 @@ def _determine_changed_labelmap_bodies(self, kafka_timestamp_string):
Returns:
list of body IDs
"""
if not self.input_is_labelmap():
if not input_is_labelmap(self.input_service):
raise RuntimeError("Can't specify subset-bodies as a kafka timestamp for sources other than DVID labelmap")

subset_bodies = self.input_service.base_service.determine_changed_labelmap_bodies(kafka_timestamp_string)
@@ -835,7 +647,7 @@ def _compute_brick_labelcounts(self, batch_index, bricks_ddf, subset_supervoxels

def _compute_body_stats(self, batch_index, brick_counts_df):
with Timer(f"Batch {batch_index:02}: Aggregating brick stats into body stats", logger):
if self.input_is_labelmap_supervoxels():
if input_is_labelmap_supervoxels(self.input_service):
seg_instance = self.input_service.base_service.instance_triple

brick_counts_df['sv'] = brick_counts_df['label'].values
@@ -1407,7 +1219,7 @@ def serialize_mesh(sv, mesh, path=None, fmt=None, log=True):

try:
return mesh.serialize(path, fmt)
except:
except Exception:
logger = logging.getLogger(__name__)

msg = f"Failed to serialize mesh as {fmt}."