Using queue worker to compute handcrafted features #780
-
Hi! If I understand correctly, there is already a solution: add the extra `keep` parameter to the transform.

```python
import torchio as tio

# Keep a copy of 'image' under the new key 'image_after_transform'
dict_image_to_keep = {'image': 'image_after_transform'}
random_blur = tio.RandomBlur(keep=dict_image_to_keep)
```

Using this transform with the Queue will then give you patches that carry an additional image (available after the transform).
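For context, a minimal sketch of how such a transform might be wired into a queue pipeline; the subject list, sampler settings, and image names here are placeholders:

```python
import torchio as tio

transform = tio.RandomBlur(keep={'image': 'image_after_transform'})
dataset = tio.SubjectsDataset(subjects, transform=transform)
queue = tio.Queue(
    dataset,
    max_length=300,
    samples_per_volume=10,
    sampler=tio.UniformSampler(patch_size=128),
    num_workers=4,
)
```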
-
I think a simple solution would be to add a callable to the list of transforms passed to `tio.Compose`:

```python
import torchio as tio

def add_mean(subject):
    subject['mean'] = subject.t1.data.mean()
    return subject  # the next transform in the chain receives the subject

transform = tio.Compose((tio.RandomAffine(), add_mean))
```
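Roughly how that composed transform could feed a queue (the dataset and loader settings below are illustrative only). One caveat: the Queue applies transforms to whole volumes before patches are sampled, so `mean` here is computed per subject rather than per patch:

```python
import torch
import torchio as tio

dataset = tio.SubjectsDataset(subjects, transform=transform)
queue = tio.Queue(dataset, max_length=300, samples_per_volume=10,
                  sampler=tio.UniformSampler(128), num_workers=4)
loader = torch.utils.data.DataLoader(queue, batch_size=16)
for batch in loader:
    print(batch['mean'])  # one value per source volume, not per patch
```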
-
I ended up writing a callback queue inheriting from `tio.Queue`, but I hit this error:

```
Traceback (most recent call last):
  File "/home/user/Toolkits/Anaconda/envs/py3_cuda11/.guild/runs/0dff53e90bc44d40887521b5fdd5a962/.guild/sourcecode/pytorch_med_imaging/main.py", line 474, in console_entry
    main(a, config, logger)
  File "/home/user/Toolkits/Anaconda/envs/py3_cuda11/.guild/runs/0dff53e90bc44d40887521b5fdd5a962/.guild/sourcecode/pytorch_med_imaging/main.py", line 298, in main
    solver.solve_epoch(i)
  File "/home/user/Toolkits/Anaconda/envs/py3_cuda11/.guild/runs/0dff53e90bc44d40887521b5fdd5a962/.guild/sourcecode/pytorch_med_imaging/solvers/SolverBase.py", line 268, in solve_epoch
    for step_idx, mb in enumerate(self._data_loader):
  File "/home/user/Toolkits/Anaconda/envs/py3_cuda11/lib/python3.7/site-packages/torch/utils/data/dataloader.py", line 517, in __next__
    data = self._next_data()
  File "/home/user/Toolkits/Anaconda/envs/py3_cuda11/lib/python3.7/site-packages/torch/utils/data/dataloader.py", line 557, in _next_data
    data = self._dataset_fetcher.fetch(index)  # may raise StopIteration
  File "/home/user/Toolkits/Anaconda/envs/py3_cuda11/lib/python3.7/site-packages/torch/utils/data/_utils/fetch.py", line 44, in fetch
    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/home/user/Toolkits/Anaconda/envs/py3_cuda11/lib/python3.7/site-packages/torch/utils/data/_utils/fetch.py", line 44, in <listcomp>
    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/home/user/.../ThirdParty/torchio/torchio/data/queue.py", line 165, in __getitem__
    self._fill()
  File "/home/user/Toolkits/Anaconda/envs/py3_cuda11/.guild/runs/0dff53e90bc44d40887521b5fdd5a962/.guild/sourcecode/pytorch_med_imaging/PMI_data_loader/lambda_tio_adaptor.py", line 124, in _fill
    res.extend(p.get())
  File "/home/user/Toolkits/Anaconda/envs/py3_cuda11/lib/python3.7/multiprocessing/pool.py", line 657, in get
    raise self._value
  File "/home/user/Toolkits/Anaconda/envs/py3_cuda11/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
    put(task)
  File "/home/user/Toolkits/Anaconda/envs/py3_cuda11/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/user/Toolkits/Anaconda/envs/py3_cuda11/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/home/user/Toolkits/Anaconda/envs/py3_cuda11/lib/python3.7/site-packages/torch/multiprocessing/reductions.py", line 312, in reduce_storage
    metadata = storage._share_filename_()
RuntimeError: unable to open shared memory object </torch_1312678_2507994531> in read-write mode
```

I am able to finish loading the whole set of data, but I keep getting this problem at around the same (5th) epoch. In each epoch, I sample 150 patches of size 128×128 from 600 subjects. I would appreciate any input on solving this error.
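One mitigation I have seen suggested for this family of shared-memory errors, assuming the root cause is PyTorch's default `file_descriptor` tensor-sharing strategy exhausting the open-file limit, is to switch to the `file_system` strategy before any workers start; a sketch, not a confirmed fix:

```python
import torch.multiprocessing

# Assumption: the default 'file_descriptor' strategy runs out of file
# descriptors when many tensors cross process boundaries; 'file_system'
# uses named shared-memory files instead. Call this before workers start.
torch.multiprocessing.set_sharing_strategy('file_system')
```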
r"""
Args:
patch_sampling_callback (callable, Optional):
A function that takes :class:`Subject` or :class:`dict` as input and return any output. If this is None, its
behavior is same as `tio.Queue`. Default to None.
create_new_attribute (str or list of str, Optional):
If a list of str is supplied and the output of the upper is an iterable with identical lenght as this list,
each output will be mapped to each str in the list.
"""
def __init__(self,
*args,
patch_sampling_callback: Optional[TypeCallable] = None,
create_new_attribute: Optional[Union[Sequence[str], str]] = None,
**kwargs):
super(CallbackQueue, self).__init__(*args, **kwargs)
self.callback = patch_sampling_callback
self.create_new_attribute = create_new_attribute
def _fill(self):
super(CallbackQueue, self)._fill()
if self.callback is None or self.create_new_attribute is None:
return
res = []
if self.num_workers > 1:
# This results in OSError: Too many open files, don't know why, but can be worked around by
# setting ulimit -n [large number], > 100000; actually, the error changed to another error after a few epoch.
# Create thread pool
with mpi.Pool(self.num_workers) as pool:
# for each patch, execute function
p = pool.map_async(self.callback,
self.patches_list)
pool.close()
pool.join()
res.extend(p.get())
pool.terminate()
del pool, p
else:
# Do it in a single thread. Could be slow.
for p in tqdm(self.patches_list):
res.append(self.callback(p))
self._map_to_new_attr(res)
def _map_to_new_attr(self, res):
# check if list or str was supplied to self.create_new_attribute
if isinstance(self.create_new_attribute, str):
# check length
if not len(res) == len(self.patches_list):
raise IndexError(f"Expect result to have the same length (got {len(res)}) as the patch list got "
f"{len(self.patches_list)}).")
for p, r in zip(self.patches_list, res):
p[self.create_new_attribute] = r
elif isinstance(self.create_new_attribute, (str, tuple)):
for p, r in zip(self.patches_list, res):
if not len(r) == len(self.create_new_attribute):
raise IndexError(f"Expect result to have the same length (got {len(r)}) as the patch list got "
f"{len(self.patches_list)}).")
for att, rr in zip(self.create_new_attribute, r):
p[att] = rr |
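For reference, a hypothetical way this class could be instantiated; the feature function, dataset, and sampler settings are made up for illustration:

```python
import torchio as tio

def patch_std(patch):
    # Hypothetical handcrafted feature computed on each sampled patch
    return patch['image'].data.float().std()

queue = CallbackQueue(
    dataset,                  # a tio.SubjectsDataset
    300,                      # max_length
    10,                       # samples_per_volume
    tio.UniformSampler(128),  # sampler
    num_workers=4,
    patch_sampling_callback=patch_std,
    create_new_attribute='patch_std',
)
```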
-
I am using a random sampler to sample patches through the `tio.Queue` interface. Is there a proper way to use the queue workers (after the images have gone through the transforms) to, say, compute a certain feature from one of the images in the patches (preferably with a syntax consistent with the `include` and `exclude` kwargs of the transforms) and put the computed feature into a new attribute of the subject?

```
subject(image=img, segmentation=seg)
    --> tio.Queue([subjects], transform)
    --> subject(image=img_patch, segmentation=seg_patch, new_feature=new_feature)
```
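For concreteness, a minimal sketch of the setup in question, with a comment marking where the per-patch feature should appear; file names, patch size, and queue settings are placeholders:

```python
import torchio as tio

subject = tio.Subject(
    image=tio.ScalarImage('img.nii.gz'),      # placeholder path
    segmentation=tio.LabelMap('seg.nii.gz'),  # placeholder path
)
dataset = tio.SubjectsDataset([subject], transform=tio.RandomAffine())
queue = tio.Queue(dataset, max_length=300, samples_per_volume=10,
                  sampler=tio.UniformSampler(128), num_workers=4)
# Desired: each patch coming out of the queue additionally carries
# patch['new_feature'], computed by the queue workers from patch['image'].
```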