|
| 1 | +""" |
| 2 | +Script to run stitching modules. |
| 3 | +Creates overview and stitched links for unprocessed acquisitions using the ddbclient |
| 4 | +
|
| 5 | +last updated: 2022/08/17 |
| 6 | +@author: shbhas |
| 7 | +""" |
| 8 | +import time |
| 9 | +import os |
| 10 | +import numpy as np |
| 11 | +from acpreprocessing.utils.metadata import parse_metadata |
| 12 | +from acpreprocessing.stitching_modules.convert_to_n5 import acquisition_dir_to_n5_dir |
| 13 | +from acpreprocessing.utils.nglink import update_state, create_overview |
| 14 | +from acpreprocessing.stitching_modules.stitch import create_json, stitch |
| 15 | +import argschema |
| 16 | +from argschema.fields import Boolean, Int, Str |
| 17 | +from ddbclient import acquisition, client, utils |
| 18 | +import posixpath |
| 19 | + |
| 20 | + |
| 21 | +def find_entry(response_json): |
| 22 | + """returns subset of responses that are high res (filters out overviews)""" |
| 23 | + for entry in response_json: |
| 24 | + uri = entry["data_location"]["ACTiff_dir_ispim"]["uri"] |
| 25 | + if uri.find("high_res") != -1: |
| 26 | + return [uri,entry["scope"]] |
| 27 | + return [] |
| 28 | + |
| 29 | + |
| 30 | +start = time.time() |
| 31 | + |
| 32 | +# ping db to find new samples to process |
| 33 | +acq_client = acquisition.AcquisitionClient("http://bigkahuna.corp.alleninstitute.org/transfer_temp/api/api", subpath="") |
| 34 | + |
| 35 | +#for testing.. |
| 36 | +# url_base = acq_client.base_url |
| 37 | +# # "http://api/acquisition/{acquisition_id}/update_fields" |
| 38 | +# stitching_status_dict = {"stitching_status": "STITCHING_COMPLETE"} |
| 39 | +# acq = response_json[0] |
| 40 | +# print(acq_client.base_url) |
| 41 | +# acq_id = acq["acquisition_id"] |
| 42 | +# url = posixpath.join( |
| 43 | +# url_base, |
| 44 | +# 'acquisition', |
| 45 | +# acq_id, |
| 46 | +# 'stitching_status' |
| 47 | +# utils.put_json(url, "STITCHING_COMPLETE") |
| 48 | + |
| 49 | +# test_acq = {"section_num": "0", "session_id": "220722", "specimen_id": "test", "scope": "iSPIM2", "data_location": {"ACTiff_dir_m2": {"status": "DELETED", "uri": "", "metadata": {}},"ACTiff_dir_ispim": {"status": "COMPLETE", "uri": "qnap-ispim2:/workflow_data/iSPIM2/test_S0_220722_high_res", "metadata": {"contains": ["high_res_Pos0/high_res_0_0.tif", "high_res_Pos0/high_res_0_1.tif", "high_res_Pos0/high_res_0_2.tif"]}}}, "acquisition_metadata": {}, "acquisition_time_utc": "2022-07-22T19:50:03.717883+00:00", "acquisition_id": ""} |
| 50 | +# acq_client.post(test_acq) |
| 51 | + |
| 52 | +response_json = acq_client.query( |
| 53 | +{ |
| 54 | + "filter": { |
| 55 | + "stitching_status": {"status": {"$eq": "pending"}} |
| 56 | + # "specimen_id": "test" |
| 57 | + }, |
| 58 | + "projection": { |
| 59 | + "specimen_id": True, |
| 60 | + "session_id": True, |
| 61 | + "section_num": True, |
| 62 | + "scope": True, |
| 63 | + "acquisition_id": True, |
| 64 | + "data_location": True, |
| 65 | + "stitching_status": True |
| 66 | + } |
| 67 | +}) |
| 68 | +print(response_json) |
| 69 | + |
| 70 | +if not response_json: |
| 71 | + print("Error: Could not find acquisition") |
| 72 | +else: |
| 73 | + info = find_entry(response_json) |
| 74 | + # print(info) |
| 75 | + if not info: |
| 76 | + "Error: Unable to find high_res data location" |
| 77 | + else: |
| 78 | + dir_ACdata = info[0] |
| 79 | + rootDir = "/ACdata"+dir_ACdata.split(':')[1] |
| 80 | + # print(rootDir) |
| 81 | + scope = info[1] |
| 82 | + dirname = rootDir.split("/")[-1] |
| 83 | + print(dirname) |
| 84 | + |
| 85 | +run_input = { |
| 86 | + "outputDir": "/ACdata/processed/"+scope+"/"+dirname+"/", |
| 87 | + "rootDir": rootDir + "/", |
| 88 | + "ds_name": dirname, |
| 89 | + "mip_level": 3, |
| 90 | + "md_filename": rootDir+"/acqinfo_metadata.json", |
| 91 | + "consolidate_pos": True, |
| 92 | + "reverse_stitch": False, |
| 93 | + "deskew": False, |
| 94 | + "stitch_channel": 0, |
| 95 | + "stitch_json": "stitch-test.json", |
| 96 | + "stitch_final": "stitch-final.json", |
| 97 | + "state_json": "state-test.json", |
| 98 | + "nglink_name": "nglink-test.txt", |
| 99 | + "stitched_nglink": "stitched-nglink-test.txt", |
| 100 | + "stitched_state": "stitched-state.json" |
| 101 | +} |
| 102 | +print(run_input) |
| 103 | + |
| 104 | + |
| 105 | +class RunModulesSchema(argschema.ArgSchema): |
| 106 | + outputDir = Str(required=True, description='output directory') |
| 107 | + rootDir = Str(required=True, description='raw tiff root directory') |
| 108 | + ds_name = Str(required=True) |
| 109 | + mip_level = Int(required=False, default=3, |
| 110 | + description='downsample level to perform stitching with') |
| 111 | + md_filename = Str(required=False, default="acqinfo_metadata.json", |
| 112 | + description='metadata file name') |
| 113 | + consolidate_pos = Boolean(required=False, default=True, |
| 114 | + description="Whether to consolidate all position" |
| 115 | + "strips in one neuroglancer layer or" |
| 116 | + "separate them into independent layers") |
| 117 | + reverse_stitch = Boolean(required=False,default=False, description="Whether to reverse direction of stitching or not") |
| 118 | + deskew = Boolean(required=False,default=False, description="Whether to deskew or not") |
| 119 | + stitch_channel = Int(required=False,default=0, description="Which channel to compute stitching with") |
| 120 | + stitch_json = Str(required=False,default="stitch.json", description="Name of stitching parameters json file") |
| 121 | + stitch_final = Str(required=False,default="stitch-final.json", description="Name of final stitching result json file") |
| 122 | + state_json = Str(required=False,default="state.json", description="Name of overview state json file") |
| 123 | + nglink_name = Str(required=False,default="nglink.txt", description="Name of nglink txt file") |
| 124 | + stitched_nglink = Str(required=False,default="stitched-nglink.txt", description="Name of stitched nglink txt file") |
| 125 | + stitched_state = Str(required=False,default="stitched-state.json", description="Name of stitched state json file") |
| 126 | + |
| 127 | + |
| 128 | +class RunModules(argschema.ArgSchemaParser): |
| 129 | + default_schema = RunModulesSchema |
| 130 | + |
| 131 | + def run(self): |
| 132 | + # set variables needed for processing |
| 133 | + md_input = { |
| 134 | + "rootDir": run_input['rootDir'], |
| 135 | + "fname": run_input['md_filename'] |
| 136 | + } |
| 137 | + # md = parse_metadata.ParseMetadata(input_data=md_input).get_md() |
| 138 | + # dsname = md["stripdirs"][0].split("_Pos")[0] |
| 139 | + # run_input["ds_name"]=dsname |
| 140 | + run_input["reverse_stitch"]=parse_metadata.ParseMetadata(input_data=md_input).get_direction() |
| 141 | + deskew = 0 |
| 142 | + if run_input['deskew']: |
| 143 | + deskew = np.cos(parse_metadata.ParseMetadata(input_data=md_input).get_angle()) |
| 144 | + n_channels = parse_metadata.ParseMetadata(input_data=md_input).get_number_of_channels() |
| 145 | + n_pos = parse_metadata.ParseMetadata(input_data=md_input).get_number_of_positions() |
| 146 | + if n_pos == 0: |
| 147 | + print("No positions to process") |
| 148 | + return -1 |
| 149 | + |
| 150 | + |
| 151 | + # if not converted to n5, do so |
| 152 | + convert_input = { |
| 153 | + "input_dir": f"{run_input['rootDir']}", |
| 154 | + "output_dir": f"{run_input['outputDir']}", |
| 155 | + "max_mip": 5, |
| 156 | + "position_concurrency": 5 |
| 157 | + } |
| 158 | + if not os.path.isdir(convert_input['output_dir']): |
| 159 | + acquisition_dir_to_n5_dir.AcquisitionDirToN5Dir(input_data=convert_input).run() |
| 160 | + else: |
| 161 | + print(f"Skipping conversion, {dirname} directory already exists") |
| 162 | + |
| 163 | + |
| 164 | + # Create overview nglink, initialize state for each channel |
| 165 | + state = {"showDefaultAnnotations": False, "layers": []} |
| 166 | + overview_input={ |
| 167 | + "run_input": run_input |
| 168 | + } |
| 169 | + create_overview.Overview(input_data=overview_input).run(n_channels, n_pos, dirname, deskew, state=state) |
| 170 | + |
| 171 | + |
| 172 | + # Create Stitch.json which is input for stitching code (using channel 0) |
| 173 | + create_json_input = { |
| 174 | + 'rootDir': run_input['rootDir']+"/", |
| 175 | + 'outputDir': run_input['outputDir']+"/", |
| 176 | + "mip_level": run_input['mip_level'], |
| 177 | + "reverse": run_input['reverse_stitch'], |
| 178 | + "dirname": dirname, |
| 179 | + "stitch_channel": run_input['stitch_channel'], |
| 180 | + "stitch_json": run_input['stitch_json'] |
| 181 | + } |
| 182 | + create_json.CreateJson(input_data=create_json_input).run() |
| 183 | + # Run Stitching with stitch.json |
| 184 | + stitchjsonpath = os.path.join(create_json_input['outputDir'], run_input['stitch_json']) |
| 185 | + stitch_input = { |
| 186 | + "stitchjson": stitchjsonpath |
| 187 | + } |
| 188 | + # Perform stitching if not done yet |
| 189 | + if not os.path.exists(os.path.join(run_input['outputDir'], run_input["stitch_final"])): |
| 190 | + stitch.Stitch(input_data=stitch_input).run() |
| 191 | + else: |
| 192 | + print("Skipped stitching - already computed") |
| 193 | + |
| 194 | + |
| 195 | + # update state json with stitched coord |
| 196 | + update_state_input = { |
| 197 | + 'rootDir': run_input['rootDir'], |
| 198 | + "outputDir": run_input['outputDir'], |
| 199 | + 'mip_level': run_input['mip_level'], |
| 200 | + "fname": "stitched-nglink.txt", |
| 201 | + "consolidate_pos": run_input['consolidate_pos'], |
| 202 | + "n_channels": n_channels, |
| 203 | + "state_json": run_input["state_json"], |
| 204 | + "stitch_final": run_input["stitch_final"], |
| 205 | + "stitched_nglink": run_input["stitched_nglink"], |
| 206 | + "stitched_state": run_input["stitched_state"] |
| 207 | + } |
| 208 | + update_state.UpdateState(input_data=update_state_input).run() |
| 209 | + |
| 210 | + print('Done processing') |
| 211 | + print(str((time.time()-start)/60) + " minutes elapsed") |
| 212 | + |
| 213 | + |
| 214 | +if __name__ == '__main__': |
| 215 | + RunModules(run_input).run() |
0 commit comments