Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Device Status #310

Merged
merged 3 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions bcipy/acquisition/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
devices."""
import json
import logging
from enum import Enum, auto
from pathlib import Path
from typing import Optional, Dict, List, NamedTuple, Union
from typing import Dict, List, NamedTuple, Optional, Union

from bcipy.config import DEFAULT_ENCODING, DEVICE_SPEC_PATH

Expand Down Expand Up @@ -50,6 +51,22 @@ def channel_spec(channel: Union[str, dict, ChannelSpec]) -> ChannelSpec:
raise Exception("Unexpected channel type")


class DeviceStatus(Enum):
"""Represents the recording status of a device during acquisition."""
ACTIVE = auto()
PASSIVE = auto()

def __str__(self) -> str:
"""String representation"""
return self.name.lower()

@classmethod
def from_str(cls, name: str) -> 'DeviceStatus':
"""Returns the DeviceStatus associated with the given string
representation."""
return cls[name.upper()]


class DeviceSpec:
"""Specification for a hardware device used in data acquisition.

Expand All @@ -67,6 +84,7 @@ class DeviceSpec:
data_type - data format of a channel; all channels must have the same type;
see https://labstreaminglayer.readthedocs.io/projects/liblsl/ref/enums.html
excluded_from_analysis - list of channels (label) to exclude from analysis.
status - recording status
"""

def __init__(self,
Expand All @@ -76,7 +94,8 @@ def __init__(self,
content_type: str = DEFAULT_DEVICE_TYPE,
description: Optional[str] = None,
excluded_from_analysis: Optional[List[str]] = None,
data_type='float32'):
data_type: str = 'float32',
status: DeviceStatus = DeviceStatus.ACTIVE):

assert sample_rate >= 0, "Sample rate can't be negative."
assert data_type in SUPPORTED_DATA_TYPES
Expand All @@ -89,6 +108,7 @@ def __init__(self,
self.data_type = data_type
self.excluded_from_analysis = excluded_from_analysis or []
self._validate_excluded_channels()
self.status = status

@property
def channel_count(self) -> int:
Expand Down Expand Up @@ -117,6 +137,12 @@ def analysis_channels(self) -> List[str]:
filter(lambda channel: channel not in self.excluded_from_analysis,
self.channels))

@property
def is_active(self) -> bool:
"""Returns a boolean indicating if the device is currently active
(recording status set to DeviceStatus.ACTIVE)."""
return self.status == DeviceStatus.ACTIVE

def to_dict(self) -> dict:
"""Converts the DeviceSpec to a dict."""
return {
Expand All @@ -125,7 +151,8 @@ def to_dict(self) -> dict:
'channels': [ch._asdict() for ch in self.channel_specs],
'sample_rate': self.sample_rate,
'description': self.description,
'excluded_from_analysis': self.excluded_from_analysis
'excluded_from_analysis': self.excluded_from_analysis,
'status': str(self.status)
}

def __str__(self):
Expand Down Expand Up @@ -153,13 +180,15 @@ def _validate_excluded_channels(self):
def make_device_spec(config: dict) -> DeviceSpec:
"""Constructs a DeviceSpec from a dict. Throws a KeyError if any fields
are missing."""
default_status = str(DeviceStatus.ACTIVE)
return DeviceSpec(name=config['name'],
content_type=config['content_type'],
channels=config['channels'],
sample_rate=config['sample_rate'],
description=config['description'],
excluded_from_analysis=config.get(
'excluded_from_analysis', []))
'excluded_from_analysis', []),
status=DeviceStatus.from_str(config.get('status', default_status)))


def load(config_path: Path = Path(DEFAULT_CONFIG), replace: bool = False) -> Dict[str, DeviceSpec]:
Expand Down
9 changes: 9 additions & 0 deletions bcipy/acquisition/multimodal.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ def device_content_types(self) -> List[ContentType]:
"""
return list(self._clients.keys())

@property
def active_device_content_types(self) -> List[ContentType]:
"""Returns a list of ContentTypes provided by the active configured
devices."""
return [
content_type for content_type, spec in self._clients.items()
if spec.is_active
]

@property
def default_client(self) -> Optional[LslAcquisitionClient]:
"""Returns the default client."""
Expand Down
1 change: 1 addition & 0 deletions bcipy/acquisition/protocols/lsl/lsl_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def stop_acquisition(self) -> None:
log.info("Closing LSL connection")
self.inlet.close_stream()
self.inlet = None
log.info("Inlet closed")

self.buffer = None

Expand Down
60 changes: 48 additions & 12 deletions bcipy/acquisition/tests/test_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from pathlib import Path

from bcipy.acquisition import devices

from bcipy.config import DEFAULT_ENCODING


Expand All @@ -21,8 +20,8 @@ def test_default_supported_devices(self):
"""List of supported devices should include generic values for
backwards compatibility."""
supported = devices.preconfigured_devices()
self.assertTrue(len(supported.keys()) > 0)
self.assertTrue('DSI-24' in supported.keys())
self.assertTrue(len(supported) > 0)
self.assertTrue('DSI-24' in supported)

dsi = supported['DSI-24']
self.assertEqual('EEG', dsi.content_type)
Expand All @@ -49,13 +48,15 @@ def test_load_from_config(self):

devices.load(config_path, replace=True)
supported = devices.preconfigured_devices()
self.assertEqual(1, len(supported.keys()))
self.assertTrue('DSI-VR300' in supported.keys())
self.assertEqual(1, len(supported))
self.assertTrue('DSI-VR300' in supported)

spec = supported['DSI-VR300']
self.assertEqual('EEG', spec.content_type)
self.assertEqual(300.0, spec.sample_rate)
self.assertEqual(channels, spec.channels)
self.assertEqual(devices.DeviceStatus.ACTIVE, spec.status)
self.assertTrue(spec.is_active)

self.assertEqual(spec, devices.preconfigured_device('DSI-VR300'))
shutil.rmtree(temp_dir)
Expand Down Expand Up @@ -89,14 +90,14 @@ def test_load_channel_specs_from_config(self):
with open(config_path, 'w', encoding=DEFAULT_ENCODING) as config_file:
json.dump(my_devices, config_file)

prior_device_count = len(devices.preconfigured_devices().keys())
prior_device_count = len(devices.preconfigured_devices())
devices.load(config_path)
supported = devices.preconfigured_devices()

spec = supported["Custom-Device"]
self.assertEqual(spec.channels, ['Fz', 'Pz', 'F7'])
self.assertEqual(spec.channel_names, ['ch1', 'ch2', 'ch3'])
self.assertEqual(len(supported.keys()), prior_device_count + 1)
self.assertEqual(len(supported), prior_device_count + 1)
shutil.rmtree(temp_dir)

def test_load_missing_config(self):
Expand All @@ -120,16 +121,16 @@ def test_device_registration(self):
sample_rate=300.0,
description="Custom built device")
supported = devices.preconfigured_devices()
device_count = len(supported.keys())
device_count = len(supported)
self.assertTrue(device_count > 0)
self.assertTrue('my-device' not in supported.keys())
self.assertTrue('my-device' not in supported)

spec = devices.make_device_spec(data)
devices.register(spec)

supported = devices.preconfigured_devices()
self.assertEqual(device_count + 1, len(supported.keys()))
self.assertTrue('my-device' in supported.keys())
self.assertEqual(device_count + 1, len(supported))
self.assertTrue('my-device' in supported)

def test_search_by_name(self):
"""Should be able to find a supported device by name."""
Expand All @@ -143,6 +144,7 @@ def test_device_spec_defaults(self):
sample_rate=256.0)
self.assertEqual(3, spec.channel_count)
self.assertEqual('EEG', spec.content_type)
self.assertEqual(devices.DeviceStatus.ACTIVE, spec.status)

def test_device_spec_analysis_channels(self):
"""DeviceSpec should have a list of channels used for analysis."""
Expand Down Expand Up @@ -228,12 +230,46 @@ def test_device_spec_to_dict(self):
spec = devices.DeviceSpec(name=device_name,
channels=channels,
sample_rate=sample_rate,
content_type=content_type)
content_type=content_type,
status=devices.DeviceStatus.PASSIVE)
spec_dict = spec.to_dict()
self.assertEqual(device_name, spec_dict['name'])
self.assertEqual(content_type, spec_dict['content_type'])
self.assertEqual(expected_channel_output, spec_dict['channels'])
self.assertEqual(sample_rate, spec_dict['sample_rate'])
self.assertEqual('passive', spec_dict['status'])

def test_load_status(self):
"""Should be able to load a list of supported devices from a
configuration file."""

# create a config file in a temp location.
temp_dir = tempfile.mkdtemp()
my_devices = [
dict(name="MyDevice",
content_type="EEG",
description="My Device",
channels=["a", "b", "c"],
sample_rate=100.0,
status=str(devices.DeviceStatus.PASSIVE))
]
config_path = Path(temp_dir, 'my_devices.json')
with open(config_path, 'w', encoding=DEFAULT_ENCODING) as config_file:
json.dump(my_devices, config_file)

devices.load(config_path, replace=True)
supported = devices.preconfigured_devices()
self.assertEqual(devices.DeviceStatus.PASSIVE, supported['MyDevice'].status)
shutil.rmtree(temp_dir)

def test_device_status(self):
"""Test DeviceStatus enum"""
self.assertEqual('active', str(devices.DeviceStatus.ACTIVE))
self.assertEqual(devices.DeviceStatus.ACTIVE,
devices.DeviceStatus.from_str('active'))
self.assertEqual(
devices.DeviceStatus.PASSIVE,
devices.DeviceStatus.from_str(str(devices.DeviceStatus.PASSIVE)))


if __name__ == '__main__':
Expand Down
44 changes: 33 additions & 11 deletions bcipy/helpers/acquisition.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
import logging
import subprocess
import time
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, NamedTuple, Optional, Tuple

import numpy as np

from bcipy.acquisition import (ClientManager, LslAcquisitionClient,
LslDataServer, await_start,
discover_device_spec)
from bcipy.acquisition.devices import (DeviceSpec, preconfigured_device,
with_content_type)
from bcipy.acquisition.devices import (DeviceSpec, DeviceStatus,
preconfigured_device, with_content_type)
from bcipy.config import BCIPY_ROOT
from bcipy.config import DEFAULT_DEVICE_SPEC_FILENAME as spec_name
from bcipy.config import RAW_DATA_FILENAME
Expand Down Expand Up @@ -52,7 +52,7 @@ def init_eeg_acquisition(
manager = ClientManager()

for stream_type in stream_types(parameters['acq_mode']):
content_type, device_name = parse_stream_type(stream_type)
content_type, device_name, status = parse_stream_type(stream_type)

if server:
server_device_spec = server_spec(content_type, device_name)
Expand All @@ -64,6 +64,8 @@ def init_eeg_acquisition(
await_start(dataserver)

device_spec = init_device(content_type, device_name)
if status:
device_spec.status = status
raw_data_name = raw_data_filename(device_spec)

client = init_lsl_client(parameters, device_spec, save_folder,
Expand Down Expand Up @@ -138,24 +140,44 @@ def server_spec(content_type: str,
return devices[0]


class StreamType(NamedTuple):
"""Specifies a stream."""
content_type: str
device_name: Optional[str] = None
status: Optional[DeviceStatus] = None


def parse_stream_type(stream_type: str,
delimiter: str = "/") -> Tuple[str, Optional[str]]:
"""Parses the stream type into a tuple of (content_type, device_name).
name_delim: str = "/",
status_delim: str = ":") -> StreamType:
"""Parses the stream type into a tuple of (content_type, device_name, status).

Parameters
----------
stream_type - LSL content type (EEG, Gaze, etc). If you know the name
of the preconfigured device it can be added 'EEG/DSI-24'

name_delim - optionally used after the content_type (and status)
for specifying the device name.
status_delim - optionally used after the content_type to specify the
status for the device ('active' or 'passive')
>>> parse_stream_type('EEG/DSI-24')
('EEG', 'DSI-24')
>>> parse_stream_type('Gaze')
('Gaze', None)
>>> parse_stream_type('Gaze:passive')
('Gaze', None, DeviceStatus.PASSIVE)
"""
if delimiter in stream_type:
content_type, device_name = stream_type.split(delimiter)[0:2]
return (content_type, device_name)
return (stream_type, None)
device_name = None
status = None

if name_delim in stream_type:
content_type, device_name = stream_type.split(name_delim)[0:2]
else:
content_type = stream_type
if status_delim in content_type:
content_type, status_str = content_type.split(status_delim)[0:2]
status = DeviceStatus.from_str(status_str)
return StreamType(content_type, device_name, status)


def init_lsl_client(parameters: dict,
Expand Down
16 changes: 11 additions & 5 deletions bcipy/helpers/tests/test_acquisition.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pathlib import Path
from unittest.mock import Mock, patch

from bcipy.acquisition.devices import DeviceSpec
from bcipy.acquisition.devices import DeviceSpec, DeviceStatus
from bcipy.config import DEFAULT_PARAMETERS_PATH
from bcipy.helpers.acquisition import (RAW_DATA_FILENAME, init_device,
init_eeg_acquisition,
Expand Down Expand Up @@ -39,7 +39,7 @@ def test_init_acquisition(self):
"""Test init_eeg_acquisition with LSL client."""

params = self.parameters
params['acq_mode'] = 'EEG/DSI-24'
params['acq_mode'] = 'EEG:passive/DSI-24'

client, servers = init_eeg_acquisition(params, self.save, server=True)

Expand All @@ -50,6 +50,7 @@ def test_init_acquisition(self):

self.assertEqual(1, len(servers))
self.assertEqual(client.device_spec.name, 'DSI-24')
self.assertFalse(client.device_spec.is_active)

self.assertTrue(Path(self.save, 'devices.json').is_file())

Expand Down Expand Up @@ -145,9 +146,14 @@ def test_init_device_with_named_device(self, preconfigured_device_mock,
self.assertEqual(device_spec, device_mock)

def test_parse_stream_type(self):
"""Test function to split the stream type into content_type, name"""
self.assertEqual(('EEG', 'DSI-24'), parse_stream_type('EEG/DSI-24'))
self.assertEqual(('Gaze', None), parse_stream_type('Gaze'))
"""Test function to split the stream type into content_type, name,
and status"""
self.assertEqual(('EEG', 'DSI-24', None), parse_stream_type('EEG/DSI-24'))
self.assertEqual(('Gaze', None, None), parse_stream_type('Gaze'))
self.assertEqual(('Gaze', None, DeviceStatus.PASSIVE),
parse_stream_type('Gaze:passive'))
self.assertEqual(('EEG', 'DSI-24', DeviceStatus.ACTIVE),
parse_stream_type('EEG:active/DSI-24'))

@patch('bcipy.helpers.acquisition.preconfigured_device')
def test_server_spec_with_named_device(self, preconfigured_device_mock):
Expand Down
Loading
Loading