Source code for brainaccess.core.eeg_manager

import ctypes
import warnings
import threading
import numpy as np
import copy
from multimethod import multimethod
from typing import Callable, Union, Optional

from brainaccess.utils.exceptions import _callback, _handle_error, BrainAccessException
from brainaccess.core import _dll
from brainaccess.core.battery_info import BatteryInfo
from brainaccess.core.device_info import DeviceInfo
from brainaccess.core.gain_mode import GainMode
from brainaccess.core.annotation import Annotation
from brainaccess.core.polarity import Polarity
from brainaccess.core.impedance_measurement_mode import ImpedanceMeasurementMode  # noqa
from brainaccess.core.device_features import DeviceFeatures


# ctypes
# new_eeg_manager
_dll.ba_eeg_manager_new.argtypes = []
_dll.ba_eeg_manager_new.restype = ctypes.c_void_p
# destructor
_dll.ba_eeg_manager_free.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_free.restype = None
# connect(port)
_dll.ba_eeg_manager_connect.argtypes = [
    ctypes.c_void_p,
    ctypes.c_int,
    ctypes.CFUNCTYPE(None, ctypes.c_bool, ctypes.c_void_p),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_connect.restype = ctypes.c_uint8
# is_connected()
_dll.ba_eeg_manager_is_connected.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_is_connected.restype = ctypes.c_bool
# disconnect()
_dll.ba_eeg_manager_disconnect.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_disconnect.restype = None
# start_stream()
_dll.ba_eeg_manager_start_stream.argtypes = [
    ctypes.c_void_p,
    ctypes.CFUNCTYPE(None, ctypes.c_void_p),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_start_stream.restype = ctypes.c_uint8
# stop_stream()
_dll.ba_eeg_manager_stop_stream.argtypes = [
    ctypes.c_void_p,
    ctypes.CFUNCTYPE(None, ctypes.c_void_p),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_stop_stream.restype = ctypes.c_uint8
# is_streaming()
_dll.ba_eeg_manager_is_streaming.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_is_streaming.restype = ctypes.c_bool
# load_config()
_dll.ba_eeg_manager_load_config.argtypes = [
    ctypes.c_void_p,
    ctypes.CFUNCTYPE(None, ctypes.c_void_p),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_load_config.restype = ctypes.c_uint8
# get_battery_info()
_dll.ba_eeg_manager_get_battery_info.argtypes = [
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_get_battery_info.restype = BatteryInfo
# set_channel_enabled()
_dll.ba_eeg_manager_set_channel_enabled.argtypes = [
    ctypes.c_void_p,
    ctypes.c_uint16,
    ctypes.c_bool,
]
_dll.ba_eeg_manager_set_channel_enabled.restype = None
# set_channel_gain()
_dll.ba_eeg_manager_set_channel_gain.argtypes = [
    ctypes.c_void_p,
    ctypes.c_uint16,
    ctypes.c_uint8,
]
_dll.ba_eeg_manager_set_channel_gain.restype = None
# set_channel_bias()
_dll.ba_eeg_manager_set_channel_bias.argtypes = [
    ctypes.c_void_p,
    ctypes.c_uint16,
    ctypes.c_uint8,
]
_dll.ba_eeg_manager_set_channel_bias.restype = None
# set_impedance_mode()
_dll.ba_eeg_manager_set_impedance_mode.argtypes = [
    ctypes.c_void_p,
    ctypes.c_uint8,
]
_dll.ba_eeg_manager_set_impedance_mode.restype = None
# get_device_info()
_dll.ba_eeg_manager_get_device_info.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_get_device_info.restype = ctypes.POINTER(DeviceInfo)
# get_channel_index()
_dll.ba_eeg_manager_get_channel_index.argtypes = [ctypes.c_void_p, ctypes.c_uint16]
_dll.ba_eeg_manager_get_channel_index.restype = ctypes.c_size_t
# get_sample_frequency()
_dll.ba_eeg_manager_get_sample_frequency.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_get_sample_frequency.restype = ctypes.c_uint16
# set_callback_chunk()
_dll.ba_eeg_manager_set_callback_chunk.argtypes = [
    ctypes.c_void_p,
    ctypes.CFUNCTYPE(
        None,
        ctypes.POINTER(ctypes.c_void_p),
        ctypes.c_size_t,
        ctypes.c_void_p,
    ),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_set_callback_chunk.restype = None
# set_callback_battery()
_dll.ba_eeg_manager_set_callback_battery.argtypes = [
    ctypes.c_void_p,
    ctypes.CFUNCTYPE(None, ctypes.POINTER(BatteryInfo), ctypes.c_void_p),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_set_callback_battery.restype = None
# set_callback_disconnect()
_dll.ba_eeg_manager_set_callback_disconnect.argtypes = [
    ctypes.c_void_p,
    ctypes.CFUNCTYPE(None, ctypes.c_void_p),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_set_callback_disconnect.restype = None
# update firmware
_dll.ba_eeg_manager_start_update.argtypes = [
    ctypes.c_void_p,
    ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_size_t, ctypes.c_size_t),
    ctypes.c_void_p,
]
_dll.ba_eeg_manager_start_update.restype = ctypes.c_uint8
# annotate()
_dll.ba_eeg_manager_annotate.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
_dll.ba_eeg_manager_annotate.restype = ctypes.c_uint8
# get_annotations()
_dll.ba_eeg_manager_get_annotations.argtypes = [
    ctypes.c_void_p,
    ctypes.POINTER(ctypes.POINTER(Annotation)),
    ctypes.POINTER(ctypes.c_size_t),
]
_dll.ba_eeg_manager_get_annotations.restype = None
# clear_annotations()
_dll.ba_eeg_manager_clear_annotations.argtypes = [ctypes.c_void_p]
_dll.ba_eeg_manager_clear_annotations.restype = None

# Stream size type info super secret function thingy
_dll.ba_eeg_manager_get_stream_channel_data_types.argtypes = [
    ctypes.c_void_p,
    ctypes.POINTER(ctypes.POINTER(ctypes.c_uint8)),
    ctypes.POINTER(ctypes.c_size_t),
]
_dll.ba_eeg_manager_get_stream_channel_data_types.restype = None


_managers_mtx = threading.Lock()
_managers: dict = dict()

_types_map = [
    ctypes.c_float,  # 0
    ctypes.c_uint8,  # 1
    ctypes.c_size_t,  # 2
    ctypes.c_double,  # 3
]


@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def _callback_stop_stream(data: ctypes.c_void_p) -> None:
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr is not None:
            with mgr._callback_stop_stream_mix:
                cbk = mgr._callback_stop_stream
                if cbk is not None:
                    cbk()


@ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_size_t, ctypes.c_size_t)
def _callback_ota_update(data: ctypes.c_void_p, progress: int, total: int) -> None:
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr is not None:
            with mgr._callback_ota_update_mtx:
                cbk = mgr._callback_ota_update
                if cbk is not None:
                    cbk(progress, total)


@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def _callback_start_stream(data: ctypes.c_void_p) -> None:
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr is not None:
            with mgr._callback_start_stream_mix:
                cbk = mgr._callback_start_stream
                if cbk is not None:
                    cbk()


@ctypes.CFUNCTYPE(
    None, ctypes.POINTER(ctypes.c_void_p), ctypes.c_size_t, ctypes.c_void_p
)
def _callback_chunk(chunk_data, chunk_size, data) -> None:
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr is not None:
            with mgr._callback_chunk_mtx:
                cbk = mgr._callback_chunk
                if cbk is not None:
                    # Get channel sizes and type information
                    types_ptr = ctypes.POINTER(ctypes.c_uint8)()
                    types_size = ctypes.c_size_t()
                    _dll.ba_eeg_manager_get_stream_channel_data_types(
                        data, ctypes.byref(types_ptr), ctypes.byref(types_size)
                    )
                    types = [_types_map[types_ptr[i]] for i in range(types_size.value)]

                    chunk_arrays = []
                    for i, ctype in enumerate(types):
                        data_pointer = ctypes.cast(
                            chunk_data[i], ctypes.POINTER(ctype * chunk_size)
                        )
                        np_array = np.ctypeslib.as_array(
                            data_pointer.contents, shape=(chunk_size,)
                        )
                        chunk_arrays.append(np_array)

                    cbk(chunk_arrays, chunk_size)


@ctypes.CFUNCTYPE(None, ctypes.POINTER(BatteryInfo), ctypes.c_void_p)
def _callback_battery(b_info, data) -> None:
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr is not None:
            with mgr._callback_battery_mtx:
                cbk = mgr._callback_battery
                if cbk is not None:
                    cbk(copy.copy(b_info[0]))


@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def _callback_load_config(data) -> None:
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr is not None:
            with mgr._callback_load_config_mtx:
                cbk = mgr._callback_load_config
                if cbk is not None:
                    cbk()


@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def _callback_disconnect(data) -> None:
    with _managers_mtx:
        mgr = _managers.get(data)
        if mgr is not None:
            with mgr._callback_disconnect_mtx:
                cbk = mgr._callback_disconnect
                if cbk is not None:
                    cbk()


[docs] class EEGManager: """The EEG manager is the primary tool for communicating with the BrainAccess device. """ def __init__(self) -> None: """Creates an EEG Manager. Warning --------- Make sure the core library has been initialized first! """ self.conenction_success: int = 0 self._callback_chunk_mtx = threading.Lock() self._callback_battery_mtx = threading.Lock() self._callback_disconnect_mtx = threading.Lock() self._callback_start_stream_mtx = threading.Lock() self._callback_stop_stream_mtx = threading.Lock() self._callback_load_config_mtx = threading.Lock() self._callback_ota_update_mtx = threading.Lock() self._manager = _dll.ba_eeg_manager_new() with _managers_mtx: _managers[self._manager] = self self._callback_disconnect = lambda: None _dll.ba_eeg_manager_set_callback_disconnect( self._manager, _callback_disconnect, self._manager ) def __enter__(self) -> "EEGManager": return self def __exit__(self, exc_type, exc_value, traceback) -> None: self.destroy()
[docs] def destroy(self) -> None: """Destroys an EEG manager instance. Warning --------- Must be called exactly once, after the manager is no longer needed """ self.disconnect() # prevent callback deadlock by disconnecting first. with _managers_mtx: _dll.ba_eeg_manager_free(self._manager) del _managers[self._manager]
[docs] def disconnect(self) -> None: """Disconnects the EEGManager from the EEG device, if connected""" _dll.ba_eeg_manager_disconnect(self._manager)
[docs] def connect(self, bt_device_index: int = 0) -> int: """Connects the EEGManager to an EEG device Parameters ---------- bt_device_index : int index of the device to connect in the scan list Returns ------- int value: 0 if the connection was successful, 1 if connection failed, 2 if connection is established but data stream is not compatible """ cbk, _ = _callback() self.connection_success = _dll.ba_eeg_manager_connect(self._manager, bt_device_index, cbk, None) if self.connection_success == 2: warnings.warn("Stream is incompatible. Update the firmware.") return self.connection_success
[docs] def is_connected(self) -> bool: """Checks if the EEGManager is currently connected to an EEG device Returns ------- bool True if connected, False otherwise """ return _dll.ba_eeg_manager_is_connected(self._manager)
[docs] def start_stream(self, callback: Union[Callable, None] = None) -> bool: """Starts streaming data from the device Returns ------- bool value: True if the stream was started successfully Raises ------- BrainAccessException if the stream is already running or if the stream could not be started """ if self.connection_success == 2: raise BrainAccessException("Stream is incompatible. Update the firmware.") if callback is not None: with self._callback_start_stream_mtx: self._callback_start_stream = callback else: with self._callback_start_stream_mtx: self._callback_start_stream = lambda: None if self.is_streaming(): raise BrainAccessException("Stream already running") return _handle_error( _dll.ba_eeg_manager_start_stream( self._manager, _callback_start_stream, self._manager ) )
[docs] def stop_stream(self, callback: Union[Callable, None] = None) -> bool: """Stops streaming data from the device Returns ------- bool value: True if the stream was stopped successfully Raises ------- BrainAccessException if the stream is not running or if the stream could not be stopped """ if callback is not None: with self._callback_stop_stream_mtx: self._callback_stop_stream = callback else: with self._callback_stop_stream_mtx: self._callback_stop_stream = lambda: None if not self.is_streaming(): raise BrainAccessException("Stream not running") return _handle_error( _dll.ba_eeg_manager_stop_stream(self._manager, _callback_stop_stream, None) )
[docs] def is_streaming(self) -> bool: """Checks if the device is streaming Returns ------- bool True if the stream is active, False otherwise """ return _dll.ba_eeg_manager_is_streaming(self._manager)
[docs] def load_config(self, callback: Union[Callable, None] = None) -> None: """Loads the configuration of channel and other settings to the device""" if callback is not None: with self._callback_load_config_mtx: self._callback_load_config = callback else: with self._callback_load_config_mtx: self._callback_load_config = lambda: None _handle_error( _dll.ba_eeg_manager_load_config( self._manager, _callback_load_config, self._manager ) )
[docs] def get_battery_info(self) -> BatteryInfo: """Returns a structure containing standard battery information from the device Returns ------- BatteryInfo Battery information from the EEG device """ return _dll.ba_eeg_manager_get_battery_info(self._manager)
[docs] def set_channel_enabled(self, channel: int, state: bool) -> None: """Enables or disables the channel on the device Warning --------- Enabled channels are reset by stream stop. Must be called with the appropriate arguments before every stream start Parameters ------------- channel: int Channel ID (brainaccess.core.eeg_channel) to enable/disable. state: bool True to enable channel, False to disable. Raises ------- BrainAccessException if device is streaming """ if self.is_streaming(): raise BrainAccessException("Cannot change channel state while streaming") _dll.ba_eeg_manager_set_channel_enabled( self._manager, ctypes.c_uint16(channel), ctypes.c_bool(state) )
[docs] def set_channel_gain(self, channel: int, gain: GainMode) -> None: """Changes gain mode for a channel on the device. Setting gain values to lower will increase the measured voltage range, but would decrease the amplitude resolution, 12 is the optimum in most cases. Warning ------ This function takes effect on stream start, and its effects are reset by stream stop. Therefore, it must be called with the appropriate arguments before every stream start. This only affects channels that support it. For example, it affects the electrode measurement channels but not sample number or digital input. Parameters ----------- channel: int Channel ID (brainaccess.core.eeg_channel) whose gain to modify. gain: GainMode Gain mode. Default X12 Raises ------- BrainAccessException if device is streaming or if the channel number is invalid """ if channel < 0 or channel > 33: raise BrainAccessException("Invalid channel number") if self.is_streaming(): raise BrainAccessException("Cannot change channel gain while streaming") _dll.ba_eeg_manager_set_channel_gain( self._manager, ctypes.c_uint16(channel), ctypes.c_uint8(gain.value) )
@multimethod def set_channel_bias(self, channel: int, bias: bool) -> None: """ DEPRECATED: use the version with Polarity instead. Set an electrode channel as a bias electrode Essentially the signals of these channels are inverted and injected into the bias channel/electrode. This helps in reducing common mode noise such as noise coming from the mains. Only select channels for bias feedback that have good contact with a skin. Typically one channel is sufficient for bias feedback to work effectively. Warning -------- This function takes effect on stream start, and its effects are reset by stream stop. Therefore, it must be called with the appropriate arguments before every stream start. Parameters ------------ channel: int Channel ID (brainaccess.core.eeg_channel) to set/unset as bias channel bias: bool True to enable channel, False to disable. Raises ------- BrainAccessException if device is streaming """ warnings.warn( "This function is deprecated, use the version with Polarity instead.", DeprecationWarning, ) if self.is_streaming(): raise BrainAccessException("Cannot change channel bias while streaming") self.set_channel_bias(channel, Polarity.BOTH if bias else Polarity.NONE)
[docs] @multimethod def set_channel_bias(self, channel: int, p: Polarity) -> None: """Set an electrode channel as a bias electrode Essentially the signals of these channels are inverted and injected into the bias channel/electrode. This helps in reducing common mode noise such as noise coming from the mains. Only select channels for bias feedback that have good contact with a skin. Typically one channel is sufficient for bias feedback to work effectively. Warning -------- This function takes effect on stream start, and its effects are reset by stream stop. Therefore, it must be called with the appropriate arguments before every stream start. Parameters ------------ channel: int Channel ID (brainaccess.core.eeg_channel) to set/unset as bias channel p: Polarity Which side of the electrode to use (if device is not bipolar, use BOTH) Raises ------- BrainAccessException if device is streaming """ if self.is_streaming(): raise BrainAccessException("Cannot change channel bias while streaming") _dll.ba_eeg_manager_set_channel_bias( self._manager, ctypes.c_uint16(channel), ctypes.c_uint8(p.value) )
[docs] def set_impedance_mode(self, mode: ImpedanceMeasurementMode): """Sets impedance measurement mode This function setups device for electrode impedance measurement. It injects a 7nA certain frequency current through the bias electrodes to measurement electrodes. Voltage recordings from each channel can then be used to calculate the impedance for each electrode: Impedance = Vpp/7nA Warning --------- This function takes effect on stream start, and its effects are reset by stream stop. Therefore, it must be called with the appropriate arguments before every stream start. Parameters ----------- mode: ImpedanceMeasurementMode Impedance mode to set Raises ------- BrainAccessException if device is streaming """ if self.is_streaming(): raise BrainAccessException("Cannot change impedance mode while streaming") _dll.ba_eeg_manager_set_impedance_mode( self._manager, ctypes.c_uint8(mode.value) )
[docs] def get_device_info(self) -> DeviceInfo: """Get device information Warning ---------- Must not be called unless device connection is successful Returns ------- DeviceInfo device model, version, firmware version and buffer size """ return _dll.ba_eeg_manager_get_device_info(self._manager).contents
[docs] def get_channel_index(self, channel: int) -> int: """Gets the index of a channel's data into the chunk Get the index into the array provided by the chunk callback that contains the data of the channel number specified Parameters ------------ channel: int The number of the channel whose index to get Returns --------- int Index into chunk representing a channel """ val = _dll.ba_eeg_manager_get_channel_index( self._manager, ctypes.c_uint16(channel) ) if val == ctypes.c_size_t(-1).value: raise BrainAccessException( "Channel does not exist or is not currently streaming" ) return val
[docs] def get_sample_frequency(self) -> int: """Get device sampling frequency Returns ------- int Sample frequency (Hz) """ return _dll.ba_eeg_manager_get_sample_frequency(self._manager)
[docs] def set_callback_chunk(self, f: Callable) -> None: """Sets a callback to be called every time a chunk is available Warning ------- The callback may or may not run in the reader thread, and as such, synchronization must be used to avoid race conditions, and the callback itself must be as short as possible to avoid blocking communication with the device. Parameters ------------ f callback Function to be called every time a chunk is available Set to null to disable. """ with self._callback_chunk_mtx: self._callback_chunk = f _dll.ba_eeg_manager_set_callback_chunk( self._manager, _callback_chunk if f is not None else None, self._manager )
[docs] def set_callback_battery(self, callback: Union[Callable, None] = None) -> None: """Sets a callback to be called every time the battery status is updated Warning --------- The callback may or may not run in the reader thread, and as such, synchronization must be used to avoid race conditions, and the callback itself must be as short as possible to avoid blocking communication with the device. Parameters ---------- callback: Union[Callable, None] pass callback Function to be called every time a battery update is available Set to null to disable. Raises ------- BrainAccessException if callback is None """ if callback is not None: with self._callback_battery_mtx: self._callback_battery = callback else: raise BrainAccessException("Callback cannot be null") _dll.ba_eeg_manager_set_callback_battery( self._manager, _callback_battery, self._manager, )
[docs] def set_callback_disconnect(self, callback: Optional[Callable] = None) -> None: """Sets a callback to be called every time the device disconnects Warning --------- The callback may or may not run in the reader thread, and as such, synchronization must be used to avoid race conditions, and the callback itself must be as short as possible to avoid blocking communication with the device. Parameters ---------- callback: Union[Callable, None] callback Function to be called every time the device disconnects. Set to null to disable. """ if callback is None: with self._callback_disconnect_mtx: self._callback_disconnect = lambda: None else: with self._callback_disconnect_mtx: self._callback_disconnect = callback _dll.ba_eeg_manager_set_callback_disconnect( self._manager, _callback_disconnect, self._manager )
[docs] def annotate(self, annotation: str) -> None: """Adds an annotation at the current time Warning --------- Annotations are cleared on disconnect Parameters ---------- annotation: str annotation text Raises ------- BrainAccessException if annotation is None or empty """ if annotation is None: raise BrainAccessException("Annotation cannot be None") if len(annotation) == 0: raise BrainAccessException("Annotation cannot be empty") _handle_error( _dll.ba_eeg_manager_annotate( self._manager, ctypes.c_char_p(annotation.encode("ascii")) ) )
[docs] def get_device_features(self) -> DeviceFeatures: """Get device features and capabilities Returns ------- DeviceFeatures object with methods to check the status of gyro, accelerometer, bipolarity, electrode count """ info = self.get_device_info() return DeviceFeatures(info)
[docs] def get_annotations(self) -> dict: """Retrieve all the accumulated annotations Warning --------- Annotations are cleared on disconnect Returns ------- dict annotations: List[str] timestamps: List[float] """ ae = ctypes.POINTER(Annotation)() size = ctypes.c_size_t() _dll.ba_eeg_manager_get_annotations( self._manager, ctypes.pointer(ae), ctypes.pointer(size) ) annotations = [ae[i] for i in range(size.value)] timestamps = [x.timestamp for x in annotations] annotations = [x.annotation for x in annotations] return {"annotations": annotations, "timestamps": timestamps}
[docs] def clear_annotations(self) -> None: """Clears annotations""" _dll.ba_eeg_manager_clear_annotations(self._manager)
[docs] def start_update(self, callback: Union[Callable, None] = None) -> None: """Starts a firmware update Parameters ---------- callback: Union[Callable, None] callback to be called every time the update progress changes Raises ------- BrainAccessException if unable to start update """ if callback is not None: with self._callback_ota_update_mtx: self._callback_ota_update = callback else: with self._callback_ota_update_mtx: self._callback_ota_update = lambda x, y: None _handle_error( _dll.ba_eeg_manager_start_update( self._manager, _callback_ota_update, self._manager ) )