Skip to content

Commit

Permalink
add miniscope interface
Browse files Browse the repository at this point in the history
  • Loading branch information
h-mayorquin committed Oct 16, 2024
1 parent 705e74e commit 31f8ef5
Showing 1 changed file with 309 additions and 0 deletions.
309 changes: 309 additions & 0 deletions src/cai_lab_to_nwb/zaki_2024/imaginginterface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
from roiextractors.imagingextractor import ImagingExtractor
from roiextractors.multiimagingextractor import MultiImagingExtractor
from roiextractors.extraction_tools import PathType, DtypeType, get_package

from typing import Optional
import json
import datetime

from pydantic import DirectoryPath
from pathlib import Path
import numpy as np

from copy import deepcopy
from pathlib import Path
from typing import Literal, Optional

import numpy as np
from pydantic import DirectoryPath, validate_call
from pynwb import NWBFile

from neuroconv.datainterfaces.ophys.baseimagingextractorinterface import BaseImagingExtractorInterface
from neuroconv.utils import DeepDict, dict_deep_update


class MiniscopeImagingExtractor(MultiImagingExtractor):

def __init__(self, folder_path: DirectoryPath):
self.folder_path = Path(folder_path)

miniscope_videos_folder_path = self.folder_path / "Miniscope"
assert miniscope_videos_folder_path.exists(), f"Miniscope videos folder not found in {self.folder_path}"

self._miniscope_avi_file_paths = [p for p in miniscope_videos_folder_path.iterdir() if p.suffix == ".avi"]
import natsort

self._miniscope_avi_file_paths = natsort.natsorted(self._miniscope_avi_file_paths)

imaging_extractors = []
for file_path in self._miniscope_avi_file_paths:
extractor = _MiniscopeSingleVideoExtractor(file_path=file_path)
imaging_extractors.append(extractor)

super().__init__(imaging_extractors=imaging_extractors)

self._sampling_frequency = self._imaging_extractors[0].get_sampling_frequency()
self._image_size = self._imaging_extractors[0].get_image_size()
self._dtype = self._imaging_extractors[0].get_dtype()

def get_num_frames(self) -> int:
return self._num_frames

def get_num_channels(self) -> int:
return 1

def get_image_size(self) -> tuple[int, int]:
return self._image_size

def get_sampling_frequency(self):
return self._sampling_frequency

def get_dtype(self) -> DtypeType:
return self._dtype

def get_channel_names(self) -> list[str]:
return ["OpticalChannel"]


class _MiniscopeSingleVideoExtractor(ImagingExtractor):
"""An auxiliar extractor to get data from a single Miniscope video (.avi) file.
This format consists of a single video (.avi)
Multiple _MiniscopeSingleVideoExtractor are combined into the MiniscopeImagingExtractor for public access.
"""

extractor_name = "_MiniscopeSingleVideo"

def __init__(self, file_path: PathType):
"""Create a _MiniscopeSingleVideoExtractor instance from a file path.
Parameters
----------
file_path: PathType
The file path to the Miniscope video (.avi) file.
"""
from neuroconv.datainterfaces.behavior.video.video_utils import VideoCaptureContext

self._video_capture = VideoCaptureContext
self._cv2 = get_package(package_name="cv2", installation_instructions="pip install opencv-python-headless")
self.file_path = file_path
super().__init__()

cap = self._cv2.VideoCapture(str(self.file_path))

self._num_frames = int(cap.get(self._cv2.CAP_PROP_FRAME_COUNT))

# Get the frames per second (fps)
self._sampling_frequency = cap.get(self._cv2.CAP_PROP_FPS)
self.frame_width = int(cap.get(self._cv2.CAP_PROP_FRAME_WIDTH))
self.frame_height = int(cap.get(self._cv2.CAP_PROP_FRAME_HEIGHT))

_, frame = cap.read()
self._dtype = frame.dtype

# Release the video capture object
cap.release()

def get_num_frames(self) -> int:
return self._num_frames

def get_num_channels(self) -> int:
return 1

def get_image_size(self) -> tuple[int, int]:
return (self.frame_height, self.frame_width)

def get_sampling_frequency(self):
return self._sampling_frequency

def get_dtype(self) -> DtypeType:
return self._dtype

def get_channel_names(self) -> list[str]:
return ["OpticalChannel"]

def get_video(
self, start_frame: Optional[int] = None, end_frame: Optional[int] = None, channel: int = 0
) -> np.ndarray:
"""Get the video frames.
Parameters
----------
start_frame: int, optional
Start frame index (inclusive).
end_frame: int, optional
End frame index (exclusive).
channel: int, optional
Channel index.
Returns
-------
video: numpy.ndarray
The video frames.
Notes
-----
The grayscale conversion is based on minian
https://github.com/denisecailab/minian/blob/f64c456ca027200e19cf40a80f0596106918fd09/minian/utilities.py#LL272C12-L272C12
"""
if channel != 0:
raise NotImplementedError(
f"The {self.extractor_name}Extractor does not currently support multiple color channels."
)

end_frame = end_frame or self.get_num_frames()
start_frame = start_frame or 0

video = np.empty(shape=(end_frame - start_frame, *self.get_image_size()), dtype=self.get_dtype())
with self._video_capture(file_path=str(self.file_path)) as video_obj:
# Set the starting frame position
video_obj.current_frame = start_frame
for frame_number in range(end_frame - start_frame):
frame = next(video_obj)
video[frame_number] = self._cv2.cvtColor(frame, self._cv2.COLOR_RGB2GRAY)

return video


class MiniscopeImagingInterface(BaseImagingExtractorInterface):
"""Data Interface for MiniscopeImagingExtractor."""

Extractor = MiniscopeImagingExtractor
display_name = "Miniscope Imaging"
associated_suffixes = (".avi", ".csv", ".json")
info = "Interface for Miniscope imaging data."

@classmethod
def get_source_schema(cls) -> dict:
source_schema = super().get_source_schema()
source_schema["properties"]["folder_path"][
"description"
] = "The main Miniscope folder. The microscope movie files are expected to be in sub folders within the main folder."

return source_schema

@validate_call
def __init__(self, folder_path: DirectoryPath):
"""
Initialize reading the Miniscope imaging data.
Parameters
----------
folder_path : DirectoryPath
The main Miniscope folder.
The microscope movie files are expected to be in sub folders within the main folder.
"""
from ndx_miniscope.utils import get_recording_start_times, read_miniscope_config

super().__init__(folder_path=folder_path)

self.folder_path = Path(folder_path)
miniscope_folder = Path(folder_path) / "Miniscope"
self._miniscope_config = read_miniscope_config(folder_path=miniscope_folder)

self.photon_series_type = "OnePhotonSeries"

def _get_session_start_time(self):

general_metadata_json = self.folder_path / "metaData.json"
assert general_metadata_json.exists(), f"General metadata json not found in {self.folder_path}"

## Read metadata
with open(general_metadata_json) as f:
general_metadata = json.load(f)

session_start_time = datetime.datetime(
year=general_metadata["year"],
month=general_metadata["month"],
day=general_metadata["day"],
hour=general_metadata["hour"],
minute=general_metadata["minute"],
second=general_metadata["second"],
microsecond=general_metadata["msec"] * 1000, # Convert milliseconds to microseconds
)

return session_start_time

def get_metadata(self) -> DeepDict:
from neuroconv.tools.roiextractors import get_nwb_imaging_metadata

metadata = super().get_metadata()
default_metadata = get_nwb_imaging_metadata(self.imaging_extractor, photon_series_type=self.photon_series_type)
metadata = dict_deep_update(metadata, default_metadata)
metadata["Ophys"].pop("TwoPhotonSeries", None)

session_start_time = self._get_session_start_time()
metadata["NWBFile"].update(session_start_time=session_start_time)

device_metadata = metadata["Ophys"]["Device"][0]
miniscope_config = deepcopy(self._miniscope_config)
device_name = miniscope_config.pop("name")
device_metadata.update(name=device_name, **miniscope_config)
# Add link to Device for ImagingPlane
imaging_plane_metadata = metadata["Ophys"]["ImagingPlane"][0]
imaging_plane_metadata.update(
device=device_name,
imaging_rate=self.imaging_extractor.get_sampling_frequency(),
)
one_photon_series_metadata = metadata["Ophys"]["OnePhotonSeries"][0]
one_photon_series_metadata.update(unit="px")

return metadata

def get_metadata_schema(self) -> dict:
metadata_schema = super().get_metadata_schema()
metadata_schema["properties"]["Ophys"]["definitions"]["Device"]["additionalProperties"] = True
return metadata_schema

def get_original_timestamps(self) -> np.ndarray:

timestamps_file_path = self.folder_path / "Miniscope" / "timeStamps.csv"
assert timestamps_file_path.exists(), f"Miniscope timestamps file not found in {self.folder_path}"

import pandas as pd

timetsamps_df = pd.read_csv(timestamps_file_path)
timestamps_milliseconds = timetsamps_df["Time Stamp (ms)"].values.astype(float)
timestamps_seconds = timestamps_milliseconds / 1000.0

# Shift when the first timestamp is negative
# TODO: Figure why, I copied from miniscope
if timestamps_seconds[0] < 0.0:
timestamps_seconds += abs(timestamps_seconds[0])

return np.asarray(timestamps_seconds)

def add_to_nwbfile(
self,
nwbfile: NWBFile,
metadata: Optional[dict] = None,
photon_series_type: Literal["TwoPhotonSeries", "OnePhotonSeries"] = "OnePhotonSeries",
stub_test: bool = False,
stub_frames: int = 100,
):
from ndx_miniscope.utils import add_miniscope_device

from neuroconv.tools.roiextractors import add_photon_series_to_nwbfile

miniscope_timestamps = self.get_original_timestamps()
imaging_extractor = self.imaging_extractor

if stub_test:
stub_frames = min([stub_frames, self.imaging_extractor.get_num_frames()])
imaging_extractor = self.imaging_extractor.frame_slice(start_frame=0, end_frame=stub_frames)
miniscope_timestamps = miniscope_timestamps[:stub_frames]

imaging_extractor.set_times(times=miniscope_timestamps)

device_metadata = metadata["Ophys"]["Device"][0]
# Cast to string because miniscope extension requires so
device_metadata["gain"] = str(device_metadata["gain"])
device_metadata.pop("ewl")
add_miniscope_device(nwbfile=nwbfile, device_metadata=device_metadata)

add_photon_series_to_nwbfile(
imaging=imaging_extractor,
nwbfile=nwbfile,
metadata=metadata,
photon_series_type=photon_series_type,
)

0 comments on commit 31f8ef5

Please sign in to comment.