
Commit

Removed hengenlab metadata reader. This functionality will be moved to a workflow. The neuraltoolkit dependency was awkward and causing other bugs. (#72)

* added AtomicGetSetEphysMetadata

* Corrected bug and added force_release function

* Added documentation commented on by Kate

* Moved AtomicGetSetEphysMetadata to common_utils

* Switched to using common_utils.checkin|checkout methods

* minor update

* Changing smart_open import

* Added assert and docs to use "r" or "rb" mode.

* clean up comments

* Removed hengenlab metadata reader. This functionality will be moved to a workflow. The neuraltoolkit dependency was awkward and causing other bugs.

* Updated unittests

* missed a hengenlab reference

---------

Co-authored-by: Ash <[email protected]>
Co-authored-by: Lon Blauvelt <[email protected]>
3 people authored Mar 5, 2024
1 parent dc02a99 commit fedc18f
Showing 7 changed files with 108 additions and 183 deletions.
4 changes: 0 additions & 4 deletions pyproject.toml
@@ -62,16 +62,12 @@ local_scheme = "no-local-version"
[project.optional-dependencies]
all = [
    'braingeneers[ml]',
    'braingeneers[hengenlab]',
    'braingeneers[dev]',
]
ml = [
    'torch',
    'scikit-learn',
]
hengenlab = [
    'neuraltoolkit==0.3.1',  # channel mapping information
]
dev = [
    "pytest >=6",
    "pytest-cov >=3",
131 changes: 0 additions & 131 deletions src/braingeneers/data/datasets_electrophysiology.py
@@ -28,10 +28,6 @@
import re
from types import ModuleType
import bisect
try:
    import neuraltoolkit as ntk  # optional import
except ImportError:
    pass


VALID_LOAD_DATA_DTYPES = [np.int16, np.float16, np.float32, np.float64]
@@ -847,133 +843,6 @@ def _read_hengenlab_ecube_timestamp(filepath: str) -> int:
        return int(np.frombuffer(f.read(8), dtype=np.uint64))


def generate_metadata_hengenlab(batch_uuid: str,
                                dataset_name: str,
                                experiment_name: Union[List[str], str] = 'experiment1',
                                fs: int = 25000,
                                n_threads: int = 32,
                                save: bool = False):
    """
    Generates a metadata json and experiment1...experimentN section for a hengenlab dataset upload.
    File locations in S3 for hengenlab neural data files:
        s3://braingeneers/ephys/YYYY-MM-DD-e-${DATASET_NAME}/original/data/*.bin
    Contiguous recording periods

    :param batch_uuid: location on braingeneers storage (S3)
    :param dataset_name: the dataset name as defined in `neuraltoolkit`, for example "CAF26".
        Metadata will be pulled from `neuraltoolkit`.
    :param experiment_name: name of the experiment section to generate, defaults to 'experiment1'
    :param fs: sampling rate, defaults to 25,000
    :param n_threads: number of threads to use for reading ecube timestamps (default: 32)
    :param save: (default False) option to save the metadata.json back to S3
        (or the current braingeneers.default_endpoint)
    :return: metadata.json
    """
    # hengenlab's (current) source of record for experiment metadata is stored in a repo which can't be
    # imported due to unacceptable dependencies. Instead, the source code is downloaded and the relevant
    # static functions are parsed out explicitly. This is a hacky approach, but this data shouldn't be
    # stored in a repo and is expected to be replaced with a proper database in the future.
    # All current solutions to this problem are bad; this is the least objectionable one.
    crit_utils_src = requests.get('https://raw.githubusercontent.com/hengenlab/sahara_work/master/crit_utils.py').text

    src_get_birthday = re.search(r'(def get_birthday\(animal, returnall=False\):.+?)\ndef ', crit_utils_src, flags=re.S).group(1)
    src_get_regions = re.search(r'(def get_regions\(animal\):.+?)\ndef ', crit_utils_src, flags=re.S).group(1)
    src_get_sex = re.search(r'(def get_sex\(animal\):.+?)\ndef ', crit_utils_src, flags=re.S).group(1)
    src_get_genotype = re.search(r'(def get_genotype\(animal\):.+?)\ndef ', crit_utils_src, flags=re.S).group(1)
    src_get_hstype = re.search(r'(def get_hstype\(animal\):.+?)\ndef ', crit_utils_src, flags=re.S).group(1)

    module = ModuleType('tempmodule')
    module.dt = datetime  # the only import necessary to run these static functions
    exec(compile(src_get_birthday, '', 'exec'), module.__dict__)
    exec(compile(src_get_regions, '', 'exec'), module.__dict__)
    exec(compile(src_get_sex, '', 'exec'), module.__dict__)
    exec(compile(src_get_genotype, '', 'exec'), module.__dict__)
    exec(compile(src_get_hstype, '', 'exec'), module.__dict__)

    headstage_types = module.get_hstype(dataset_name.lower())

    # list neural data files on S3
    s3_path = f's3://braingeneers/ephys/{batch_uuid}/original/{experiment_name}/'
    neural_data_files = common_utils.file_list(s3_path)
    assert len(neural_data_files) > 0, f'No neural data files found at: {s3_path}'

    args = [s3_path + ndf[0] for ndf in neural_data_files]

    # get ecube times for each file
    ecube_timestamps = common_utils.map2(
        _read_hengenlab_ecube_timestamp,
        args=args,
        parallelism=n_threads,
        use_multithreading=True,
    )

    # sort data files by ecube timestamps
    neural_data_files = [(*ndf, et) for ndf, et in zip(neural_data_files, ecube_timestamps)]
    neural_data_files.sort(key=lambda ndf: ndf[3])

    # parse n_channels from file name
    channels_match = re.search(r'.*Headstages_(\d+)_Channels.*', neural_data_files[0][0])
    assert channels_match is not None, f'Unable to parse n_channels from filename: {neural_data_files[0][0]}'
    n_channels = int(channels_match.group(1))

    # parse timestamp from first file name
    timestamp_match = re.search(r'.*_Channels_int16_(.+)\.bin', neural_data_files[0][0])
    assert timestamp_match is not None, f'Unable to parse timestamp from filename: {neural_data_files[0][0]}'
    timestamp = datetime.strptime(timestamp_match.group(1), '%Y-%m-%d_%H-%M-%S')

    channels_per_probe = n_channels // len(headstage_types)
    channel_map = list(itertools.chain(*[
        (ntk.find_channel_map(hstype, number_of_channels=channels_per_probe) + i * channels_per_probe).tolist()
        for i, hstype in enumerate(headstage_types)
    ]))

    metadata = dict(
        uuid=batch_uuid,
        timestamp=timestamp.isoformat(),
        issue='',
        channel_map=channel_map,
        headstage_types=headstage_types,
        notes=dict(
            purpose_of_experiment='',
            comments='',
            biology=dict(
                sample_type='mouse',
                dataset_name=dataset_name,
                birthday=module.get_birthday(dataset_name.lower()).isoformat(),
                gender=module.get_sex(dataset_name.lower()),
                genotype=module.get_genotype(dataset_name.lower()),
            ),
        ),
        ephys_experiments=[dict(
            name=experiment_name,
            hardware='Hengenlab',
            num_channels=n_channels,
            sample_rate=fs,
            voltage_scaling_factor=0.19073486328125,
            timestamp=timestamp.isoformat(),
            units='\u00b5V',
            version='1.0.0',
            blocks=[
                {
                    'num_frames': (size - 8) // 2 // n_channels,
                    'path': f'original/{experiment_name}/{neural_data_file.split("/")[-1]}',
                    'timestamp': datetime.strptime(
                        re.search(r'.*_Channels_int16_(.+)\.bin', neural_data_file).group(1),
                        '%Y-%m-%d_%H-%M-%S',
                    ).isoformat(),
                    'ecube_time': ecube_time,
                }
                for neural_data_file, last_modified_timestamp, size, ecube_time in neural_data_files
            ],
        )],
    )

    if save is True:
        save_path = f's3://braingeneers/ephys/{batch_uuid}/metadata.json'
        with smart_open.open(save_path, 'w') as f:
            f.write(json.dumps(metadata, indent=2))

    return metadata


# --- AXION READER -----------------------------
def from_uint64(all_values):
"""
Expand Down
41 changes: 0 additions & 41 deletions src/braingeneers/data/datasets_electrophysiology_test.py
@@ -413,47 +413,6 @@ def test_online_load_data_hengenlab_float32(self):
        self.assertEqual((192, 4), data.shape)
        self.assertEqual(np.float32, data.dtype)

    @skip_unittest_if_offline
    def test_online_generate_metadata(self):
        metadata = ephys.generate_metadata_hengenlab(
            batch_uuid=self.batch_uuid,
            dataset_name='CAF26',
            save=False,
        )

        # top level items
        self.assertEqual(metadata['uuid'], '2020-04-12-e-hengenlab-caf26')
        self.assertEqual(metadata['timestamp'], '2020-08-07T14:00:15')
        self.assertEqual(metadata['issue'], '')
        self.assertEqual(metadata['headstage_types'], ['EAB50chmap_00', 'APT_PCB', 'APT_PCB'])

        # notes
        self.assertEqual(metadata['notes']['biology']['sample_type'], 'mouse')
        self.assertEqual(metadata['notes']['biology']['dataset_name'], 'CAF26')
        self.assertEqual(metadata['notes']['biology']['birthday'], '2020-02-20T07:30:00')
        self.assertEqual(metadata['notes']['biology']['genotype'], 'wt')

        # ephys_experiments
        self.assertEqual(len(metadata['ephys_experiments']), 1)
        self.assertTrue(isinstance(metadata['ephys_experiments'], list))

        experiment = metadata['ephys_experiments'][0]
        self.assertEqual(experiment['name'], 'experiment1')
        self.assertEqual(experiment['hardware'], 'Hengenlab')
        self.assertEqual(experiment['num_channels'], 192)
        self.assertEqual(experiment['sample_rate'], 25000)
        self.assertEqual(experiment['voltage_scaling_factor'], 0.19073486328125)
        self.assertEqual(experiment['timestamp'], '2020-08-07T14:00:15')
        self.assertEqual(experiment['units'], '\u00b5V')
        self.assertEqual(experiment['version'], '1.0.0')
        self.assertEqual(len(experiment['blocks']), 324)

        block1 = metadata['ephys_experiments'][0]['blocks'][1]
        self.assertEqual(block1['num_frames'], 7500000)
        self.assertEqual(block1['path'], 'original/experiment1/Headstages_192_Channels_int16_2020-08-07_14-05-16.bin')
        self.assertEqual(block1['timestamp'], '2020-08-07T14:05:16')
        self.assertEqual(block1['ecube_time'], 301061600050)


if __name__ == '__main__':
    unittest.main()
1 change: 0 additions & 1 deletion src/braingeneers/iot/messaging.py
@@ -816,4 +816,3 @@ def __exit__(self, exc_type, exc_val, exc_tb):
def _mqtt_topic_regex(topic: str) -> str:
""" Converts a topic string with wildcards to a regex string """
return "^" + topic.replace("+", "[^/]+").replace("#", ".*").replace("$", "\\$") + "$"

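To make the wildcard translation concrete, here is a minimal sketch of what `_mqtt_topic_regex` produces; the topic strings below are hypothetical examples, not values from the repository:

    import re
    from braingeneers.iot.messaging import _mqtt_topic_regex

    pattern = _mqtt_topic_regex('devices/+/status')  # -> '^devices/[^/]+/status$'
    assert re.match(pattern, 'devices/dev01/status')    # '+' matches exactly one topic level
    assert not re.match(pattern, 'devices/a/b/status')  # ...so it cannot span two levels
    # '#' matches any number of trailing levels
    assert re.match(_mqtt_topic_regex('telemetry/#'), 'telemetry/ephys/raw')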
1 change: 1 addition & 0 deletions src/braingeneers/iot/messaging_test.py
@@ -303,5 +303,6 @@ def test_acquire_release(self):
        lock.release()



if __name__ == '__main__':
    unittest.main()
94 changes: 89 additions & 5 deletions src/braingeneers/utils/common_utils.py
@@ -1,19 +1,24 @@
""" Common utility functions """
import io
import urllib
import boto3
from botocore.exceptions import ClientError
import os
import braingeneers
import braingeneers.utils.smart_open_braingeneers as smart_open
from typing import Callable, Iterable, Union, List, Tuple, Dict, Any
import functools
import inspect
import multiprocessing
import posixpath
import itertools
import pathlib

import json
import hashlib

_s3_client = None # S3 client for boto3, lazy initialization performed in _lazy_init_s3_client()
_message_broker = None # Lazy initialization of the message broker
_named_locks = {} # Named locks for checkout and checkin


def _lazy_init_s3_client():
@@ -199,20 +204,99 @@ def f(x, y):
    return list(result_iterator)


def checkout(s3_file: str, mode: str = 'r') -> io.IOBase:
    """
    Check out a file from S3 for reading or writing; use checkin to release the file.
    Any subsequent call to checkout will block until the file is returned with checkin(s3_file).

    Example usage:
        f = checkout('s3://braingeneersdev/test/test_file.bin', mode='rb')
        new_bytes = do_something(f.read())
        checkin('s3://braingeneersdev/test/test_file.bin', new_bytes)

    Example usage to update metadata:
        f = checkout('s3://braingeneersdev/test/metadata.json')
        metadata_dict = json.loads(f.read())
        metadata_dict['new_key'] = 'new_value'
        metadata_updated_str = json.dumps(metadata_dict, indent=2)
        checkin('s3://braingeneersdev/test/metadata.json', metadata_updated_str)

    :param s3_file: The S3 file path to check out.
    :param mode: The mode to open the file in, 'r' (text mode) or 'rb' (binary mode),
        analogous to the built-in open(filename, mode).
    """
    # Avoid circular import
    from braingeneers.iot.messaging import MessageBroker

    assert mode in ('r', 'rb'), 'Use "r" (text) or "rb" (binary) mode only. File changes are applied at checkin(...)'

    global _message_broker, _named_locks
    if _message_broker is None:
        print('creating message broker')
        _message_broker = MessageBroker()
    mb = _message_broker

    lock_str = f'common-utils-checkout-{s3_file}'
    named_lock = mb.get_lock(lock_str)
    named_lock.acquire()
    _named_locks[s3_file] = named_lock
    f = smart_open.open(s3_file, mode)
    return f


def checkin(s3_file: str, file: Union[str, bytes, io.IOBase]):
    """
    Releases a file that was checked out with checkout.

    :param s3_file: The S3 file path; must match the path passed to checkout.
    :param file: The string, bytes, or file object to write back to S3.
    """
    assert isinstance(file, (str, bytes, io.IOBase)), 'file must be a string, bytes, or file object.'

    with smart_open.open(s3_file, 'wb') as f:
        if isinstance(file, str):
            f.write(file.encode())
        elif isinstance(file, bytes):
            f.write(file)
        else:
            file.seek(0)
            data = file.read()
            f.write(data if isinstance(data, bytes) else data.encode())

    global _named_locks
    named_lock = _named_locks[s3_file]
    named_lock.release()


def force_release_checkout(s3_file: str):
    """
    Force release the lock on a file that was checked out with checkout.
    """
    # Avoid circular import
    from braingeneers.iot.messaging import MessageBroker

    global _message_broker
    if _message_broker is None:
        _message_broker = MessageBroker()
    mb = _message_broker

    lock_str = f'common-utils-checkout-{s3_file}'
    mb.delete_lock(lock_str)


def pretty_print(data, n=10, indent=0):
    """
    Custom pretty print function that uniformly truncates any collection (list or dictionary)
    longer than `n` values, showing the first `n` values and a summary of omitted items.
    Ensures mapping sections and similar are displayed compactly.

    Example usage (to display metadata.json):
        from braingeneers.utils.common_utils import pretty_print
        from braingeneers.data import datasets_electrophysiology as de

        metadata = de.load_metadata('2023-04-17-e-connectoid16235_CCH')
        pretty_print(metadata)

    Parameters:
    - data: The data to pretty print, either a list or a dictionary.
    - n: Maximum number of elements or items to display before truncation.
@@ -227,7 +311,7 @@ def pretty_print(data, n=10, indent=0):
    else:
        truncated_keys = keys
        omitted_keys = None

    print('{')
    for key in truncated_keys:
        value = data[key]
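Taken together, the new checkout/checkin API in common_utils implements a named-lock-guarded read-modify-write cycle against S3. Below is a minimal usage sketch, assuming valid S3 credentials and a reachable message broker; the path and the edit shown are hypothetical:

    import json
    from braingeneers.utils.common_utils import checkout, checkin, force_release_checkout

    s3_path = 's3://braingeneersdev/example/metadata.json'  # hypothetical path

    # checkout acquires the named lock and opens the file; other callers block here
    f = checkout(s3_path)  # text mode; pass mode='rb' for binary files
    metadata = json.loads(f.read())
    metadata['notes'] = 'updated under lock'  # hypothetical edit

    # checkin writes the new contents back to S3 and releases the lock
    checkin(s3_path, json.dumps(metadata, indent=2))

    # if a crashed process left the lock held, it can be cleared manually
    force_release_checkout(s3_path)

Note that changes are applied at checkin rather than through the handle returned by checkout, which is why checkout only accepts the read modes 'r' and 'rb'.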
19 changes: 18 additions & 1 deletion src/braingeneers/utils/common_utils_test.py
@@ -1,9 +1,11 @@
import unittest
from unittest.mock import patch, MagicMock
from common_utils import checkout, checkin, force_release_checkout, map2
from braingeneers.iot import messaging
import common_utils
from common_utils import map2
import os
import tempfile
import braingeneers.utils.smart_open_braingeneers as smart_open


def multiply(x, y):
@@ -50,6 +52,21 @@ def test_local_no_files(self):
        self.assertEqual(result, [])


class TestCheckingCheckout(unittest.TestCase):
    def setUp(self) -> None:
        self.text_value = 'unittest1'
        self.filepath = 's3://braingeneersdev/unittest/test.txt'
        force_release_checkout(self.filepath)

        with smart_open.open(self.filepath, 'w') as f:
            f.write(self.text_value)

    def test_checkout_checkin(self):
        f = checkout(self.filepath)
        self.assertEqual(f.read(), self.text_value)
        checkin(self.filepath, f)


class TestMap2(unittest.TestCase):
    def test_basic_functionality(self):
        """Test map2 with a simple function, no fixed values, no parallelism."""
