Skip to content

Commit

Permalink
Merge pull request #310 from CAMBI-tech/device-recording-status
Browse files Browse the repository at this point in the history
Device Status
  • Loading branch information
lawhead authored Jan 10, 2024
2 parents a8856a0 + 937f90e commit afce143
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 55 deletions.
19 changes: 19 additions & 0 deletions bcipy/acquisition/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Within BciPy users must specify the details of the device they wish to use by pr

`DeviceSpec` channel data can be provided as a list of channel names or specified as a list of `ChannelSpec` entries. These will be validated against the metadata from the LSL data stream. If provided, `ChannelSpecs` allow users to customize the channel labels and override the values provided by the LSL metadata.

Devices can also have a status of either 'active' or 'passive'. Active devices are used for training models (after calibration) and for interactive queries during Copy Phrase tasks. Passive devices are used for recording data in the background for possible later analysis. Device status can be set in the `devices.json` file or through the parameters.

## Client

The `lsl_client` module provides the primary interface for interacting with device data and may be used to dynamically query streaming data in real-time. An instance of the `LslClient` class is parameterized with the `DeviceSpec` of interest, as well as the number of seconds of data that should be available for querying. If the `save_directory` and `filename` parameters are provided it also records the data to disk for later offline analysis. If no device is specified the client will attempt to connect to an EEG stream by default. A separate `LslClient` should be constructed for each device of interest.
Expand Down Expand Up @@ -70,3 +72,20 @@ Generators are Python functions that yield encoded data. They have a parameter f
#### Producer

A `Producer` is a class internal to a server that manages the generation of data at a specified frequency. It's purpose it to mimic the data rate that would be presented if an actual hardware device was used.

## Acquisition Parameters

The primary parameter for configuring the acquisition module for running an experiment is the `acq_mode` parameter. This parameter allows users to specify one or more devices from which to acquire data, as well as the status of each device. In its simplest form the parameter is a string specifying the content type of the LSL data stream of interest (for example. 'EEG'). The first detected device with this content type will be used. Alternatively, users can provide the device name ('EEG/DSI-24'), which specifies that we are interested in a specific device. The module will look for the corresponding data stream, and will use configuration for the device from the `devices.json` file. Finally, users can specify whether the provided device will be recording in active or passive mode ('EEG:passive/DSI-24'). One or more devices can be configured by delimiting the specs with a '+' ('EEG/DSI-24+Eyetracker:passive').

### Use Cases

- I want to Calibrate using EEG and Eyetracker and train models for both.
- set parameter `'acq_mode': 'EEG+Eyetracker'`
- I want to Calibrate using EEG and Eyetracker and train models for EEG only.
- set parameter `'acq_mode': 'EEG+Eyetracker:passive'`
- I want to Calibrate using EEG and Eyetracker and train models for Eyetracker only.
- set parameter `'acq_mode': 'EEG:passive+Eyetracker'`
- I want to do a Copy Phrase task using EEG only. In my calibration directory I have trained models for both EEG and Eyetracker.
- set parameter `'acq_mode': 'EEG'`
- I want to do a Copy Phrase task using EEG only, but I want to record gaze data. In my calibration directory I have trained models for both EEG and Eyetracker.
- set parameter `'acq_mode': 'EEG+Eyetracker:passive'`
37 changes: 33 additions & 4 deletions bcipy/acquisition/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
devices."""
import json
import logging
from enum import Enum, auto
from pathlib import Path
from typing import Optional, Dict, List, NamedTuple, Union
from typing import Dict, List, NamedTuple, Optional, Union

from bcipy.config import DEFAULT_ENCODING, DEVICE_SPEC_PATH

Expand Down Expand Up @@ -50,6 +51,22 @@ def channel_spec(channel: Union[str, dict, ChannelSpec]) -> ChannelSpec:
raise Exception("Unexpected channel type")


class DeviceStatus(Enum):
"""Represents the recording status of a device during acquisition."""
ACTIVE = auto()
PASSIVE = auto()

def __str__(self) -> str:
"""String representation"""
return self.name.lower()

@classmethod
def from_str(cls, name: str) -> 'DeviceStatus':
"""Returns the DeviceStatus associated with the given string
representation."""
return cls[name.upper()]


class DeviceSpec:
"""Specification for a hardware device used in data acquisition.
Expand All @@ -67,6 +84,7 @@ class DeviceSpec:
data_type - data format of a channel; all channels must have the same type;
see https://labstreaminglayer.readthedocs.io/projects/liblsl/ref/enums.html
excluded_from_analysis - list of channels (label) to exclude from analysis.
status - recording status
"""

def __init__(self,
Expand All @@ -76,7 +94,8 @@ def __init__(self,
content_type: str = DEFAULT_DEVICE_TYPE,
description: Optional[str] = None,
excluded_from_analysis: Optional[List[str]] = None,
data_type='float32'):
data_type: str = 'float32',
status: DeviceStatus = DeviceStatus.ACTIVE):

assert sample_rate >= 0, "Sample rate can't be negative."
assert data_type in SUPPORTED_DATA_TYPES
Expand All @@ -89,6 +108,7 @@ def __init__(self,
self.data_type = data_type
self.excluded_from_analysis = excluded_from_analysis or []
self._validate_excluded_channels()
self.status = status

@property
def channel_count(self) -> int:
Expand Down Expand Up @@ -117,6 +137,12 @@ def analysis_channels(self) -> List[str]:
filter(lambda channel: channel not in self.excluded_from_analysis,
self.channels))

@property
def is_active(self) -> bool:
"""Returns a boolean indicating if the device is currently active
(recording status set to DeviceStatus.ACTIVE)."""
return self.status == DeviceStatus.ACTIVE

def to_dict(self) -> dict:
"""Converts the DeviceSpec to a dict."""
return {
Expand All @@ -125,7 +151,8 @@ def to_dict(self) -> dict:
'channels': [ch._asdict() for ch in self.channel_specs],
'sample_rate': self.sample_rate,
'description': self.description,
'excluded_from_analysis': self.excluded_from_analysis
'excluded_from_analysis': self.excluded_from_analysis,
'status': str(self.status)
}

def __str__(self):
Expand Down Expand Up @@ -153,13 +180,15 @@ def _validate_excluded_channels(self):
def make_device_spec(config: dict) -> DeviceSpec:
"""Constructs a DeviceSpec from a dict. Throws a KeyError if any fields
are missing."""
default_status = str(DeviceStatus.ACTIVE)
return DeviceSpec(name=config['name'],
content_type=config['content_type'],
channels=config['channels'],
sample_rate=config['sample_rate'],
description=config['description'],
excluded_from_analysis=config.get(
'excluded_from_analysis', []))
'excluded_from_analysis', []),
status=DeviceStatus.from_str(config.get('status', default_status)))


def load(config_path: Path = Path(DEFAULT_CONFIG), replace: bool = False) -> Dict[str, DeviceSpec]:
Expand Down
9 changes: 9 additions & 0 deletions bcipy/acquisition/multimodal.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ def device_content_types(self) -> List[ContentType]:
"""
return list(self._clients.keys())

@property
def active_device_content_types(self) -> List[ContentType]:
"""Returns a list of ContentTypes provided by the active configured
devices."""
return [
content_type for content_type, spec in self._clients.items()
if spec.is_active
]

@property
def default_client(self) -> Optional[LslAcquisitionClient]:
"""Returns the default client."""
Expand Down
1 change: 1 addition & 0 deletions bcipy/acquisition/protocols/lsl/lsl_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def stop_acquisition(self) -> None:
log.info("Closing LSL connection")
self.inlet.close_stream()
self.inlet = None
log.info("Inlet closed")

self.buffer = None

Expand Down
60 changes: 48 additions & 12 deletions bcipy/acquisition/tests/test_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from pathlib import Path

from bcipy.acquisition import devices

from bcipy.config import DEFAULT_ENCODING


Expand All @@ -21,8 +20,8 @@ def test_default_supported_devices(self):
"""List of supported devices should include generic values for
backwards compatibility."""
supported = devices.preconfigured_devices()
self.assertTrue(len(supported.keys()) > 0)
self.assertTrue('DSI-24' in supported.keys())
self.assertTrue(len(supported) > 0)
self.assertTrue('DSI-24' in supported)

dsi = supported['DSI-24']
self.assertEqual('EEG', dsi.content_type)
Expand All @@ -49,13 +48,15 @@ def test_load_from_config(self):

devices.load(config_path, replace=True)
supported = devices.preconfigured_devices()
self.assertEqual(1, len(supported.keys()))
self.assertTrue('DSI-VR300' in supported.keys())
self.assertEqual(1, len(supported))
self.assertTrue('DSI-VR300' in supported)

spec = supported['DSI-VR300']
self.assertEqual('EEG', spec.content_type)
self.assertEqual(300.0, spec.sample_rate)
self.assertEqual(channels, spec.channels)
self.assertEqual(devices.DeviceStatus.ACTIVE, spec.status)
self.assertTrue(spec.is_active)

self.assertEqual(spec, devices.preconfigured_device('DSI-VR300'))
shutil.rmtree(temp_dir)
Expand Down Expand Up @@ -89,14 +90,14 @@ def test_load_channel_specs_from_config(self):
with open(config_path, 'w', encoding=DEFAULT_ENCODING) as config_file:
json.dump(my_devices, config_file)

prior_device_count = len(devices.preconfigured_devices().keys())
prior_device_count = len(devices.preconfigured_devices())
devices.load(config_path)
supported = devices.preconfigured_devices()

spec = supported["Custom-Device"]
self.assertEqual(spec.channels, ['Fz', 'Pz', 'F7'])
self.assertEqual(spec.channel_names, ['ch1', 'ch2', 'ch3'])
self.assertEqual(len(supported.keys()), prior_device_count + 1)
self.assertEqual(len(supported), prior_device_count + 1)
shutil.rmtree(temp_dir)

def test_load_missing_config(self):
Expand All @@ -120,16 +121,16 @@ def test_device_registration(self):
sample_rate=300.0,
description="Custom built device")
supported = devices.preconfigured_devices()
device_count = len(supported.keys())
device_count = len(supported)
self.assertTrue(device_count > 0)
self.assertTrue('my-device' not in supported.keys())
self.assertTrue('my-device' not in supported)

spec = devices.make_device_spec(data)
devices.register(spec)

supported = devices.preconfigured_devices()
self.assertEqual(device_count + 1, len(supported.keys()))
self.assertTrue('my-device' in supported.keys())
self.assertEqual(device_count + 1, len(supported))
self.assertTrue('my-device' in supported)

def test_search_by_name(self):
"""Should be able to find a supported device by name."""
Expand All @@ -143,6 +144,7 @@ def test_device_spec_defaults(self):
sample_rate=256.0)
self.assertEqual(3, spec.channel_count)
self.assertEqual('EEG', spec.content_type)
self.assertEqual(devices.DeviceStatus.ACTIVE, spec.status)

def test_device_spec_analysis_channels(self):
"""DeviceSpec should have a list of channels used for analysis."""
Expand Down Expand Up @@ -228,12 +230,46 @@ def test_device_spec_to_dict(self):
spec = devices.DeviceSpec(name=device_name,
channels=channels,
sample_rate=sample_rate,
content_type=content_type)
content_type=content_type,
status=devices.DeviceStatus.PASSIVE)
spec_dict = spec.to_dict()
self.assertEqual(device_name, spec_dict['name'])
self.assertEqual(content_type, spec_dict['content_type'])
self.assertEqual(expected_channel_output, spec_dict['channels'])
self.assertEqual(sample_rate, spec_dict['sample_rate'])
self.assertEqual('passive', spec_dict['status'])

def test_load_status(self):
"""Should be able to load a list of supported devices from a
configuration file."""

# create a config file in a temp location.
temp_dir = tempfile.mkdtemp()
my_devices = [
dict(name="MyDevice",
content_type="EEG",
description="My Device",
channels=["a", "b", "c"],
sample_rate=100.0,
status=str(devices.DeviceStatus.PASSIVE))
]
config_path = Path(temp_dir, 'my_devices.json')
with open(config_path, 'w', encoding=DEFAULT_ENCODING) as config_file:
json.dump(my_devices, config_file)

devices.load(config_path, replace=True)
supported = devices.preconfigured_devices()
self.assertEqual(devices.DeviceStatus.PASSIVE, supported['MyDevice'].status)
shutil.rmtree(temp_dir)

def test_device_status(self):
"""Test DeviceStatus enum"""
self.assertEqual('active', str(devices.DeviceStatus.ACTIVE))
self.assertEqual(devices.DeviceStatus.ACTIVE,
devices.DeviceStatus.from_str('active'))
self.assertEqual(
devices.DeviceStatus.PASSIVE,
devices.DeviceStatus.from_str(str(devices.DeviceStatus.PASSIVE)))


if __name__ == '__main__':
Expand Down
44 changes: 33 additions & 11 deletions bcipy/helpers/acquisition.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
import logging
import subprocess
import time
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, NamedTuple, Optional, Tuple

import numpy as np

from bcipy.acquisition import (ClientManager, LslAcquisitionClient,
LslDataServer, await_start,
discover_device_spec)
from bcipy.acquisition.devices import (DeviceSpec, preconfigured_device,
with_content_type)
from bcipy.acquisition.devices import (DeviceSpec, DeviceStatus,
preconfigured_device, with_content_type)
from bcipy.config import BCIPY_ROOT
from bcipy.config import DEFAULT_DEVICE_SPEC_FILENAME as spec_name
from bcipy.config import RAW_DATA_FILENAME
Expand Down Expand Up @@ -52,7 +52,7 @@ def init_eeg_acquisition(
manager = ClientManager()

for stream_type in stream_types(parameters['acq_mode']):
content_type, device_name = parse_stream_type(stream_type)
content_type, device_name, status = parse_stream_type(stream_type)

if server:
server_device_spec = server_spec(content_type, device_name)
Expand All @@ -64,6 +64,8 @@ def init_eeg_acquisition(
await_start(dataserver)

device_spec = init_device(content_type, device_name)
if status:
device_spec.status = status
raw_data_name = raw_data_filename(device_spec)

client = init_lsl_client(parameters, device_spec, save_folder,
Expand Down Expand Up @@ -138,24 +140,44 @@ def server_spec(content_type: str,
return devices[0]


class StreamType(NamedTuple):
"""Specifies a stream."""
content_type: str
device_name: Optional[str] = None
status: Optional[DeviceStatus] = None


def parse_stream_type(stream_type: str,
delimiter: str = "/") -> Tuple[str, Optional[str]]:
"""Parses the stream type into a tuple of (content_type, device_name).
name_delim: str = "/",
status_delim: str = ":") -> StreamType:
"""Parses the stream type into a tuple of (content_type, device_name, status).
Parameters
----------
stream_type - LSL content type (EEG, Gaze, etc). If you know the name
of the preconfigured device it can be added 'EEG/DSI-24'
name_delim - optionally used after the content_type (and status)
for specifying the device name.
status_delim - optionally used after the content_type to specify the
status for the device ('active' or 'passive')
>>> parse_stream_type('EEG/DSI-24')
('EEG', 'DSI-24')
>>> parse_stream_type('Gaze')
('Gaze', None)
>>> parse_stream_type('Gaze:passive')
('Gaze', None, DeviceStatus.PASSIVE)
"""
if delimiter in stream_type:
content_type, device_name = stream_type.split(delimiter)[0:2]
return (content_type, device_name)
return (stream_type, None)
device_name = None
status = None

if name_delim in stream_type:
content_type, device_name = stream_type.split(name_delim)[0:2]
else:
content_type = stream_type
if status_delim in content_type:
content_type, status_str = content_type.split(status_delim)[0:2]
status = DeviceStatus.from_str(status_str)
return StreamType(content_type, device_name, status)


def init_lsl_client(parameters: dict,
Expand Down
Loading

0 comments on commit afce143

Please sign in to comment.