# Source code for brainaccess_board.database

from typing import Any, Optional
import re
import numpy as np
import mne
from .utils import _get_utils_dict, _convert_to_mne_
from .sq import (
    get_handle,
    get_data_after,
    get_range_data,
    get_devices,
    get_data,
    get_last_seconds_data,
    get_metadata,
    get_first_timestamp,
    close_db,
)


class ReadDB:
    """Read-only accessor for a recording database file.

    The handle is opened on demand and closed again after every public
    operation, so an instance does not keep the database open between calls.
    """

    def __init__(self, filename: str = "current") -> None:
        # Database file to read; the literal "current" is resolved through
        # _get_utils_dict() every time a connection is opened.
        self.name = filename
        # Open once so that a bad file fails early and self.devices is
        # populated, then release the handle straight away.
        self._connect()
        self._close()

    def _get_current(self) -> str:
        """Return the file name reported as the current save file.

        Raises:
            RuntimeError: if _get_utils_dict() returns nothing.
        """
        utils = _get_utils_dict()
        if not utils:
            raise RuntimeError("No current file found")
        return utils.current_save_file

    def _connect(self) -> None:
        """Open the database handle and refresh the list of devices."""
        if self.name == "current":
            self.filename = self._get_current()
        else:
            self.filename = self.name
        self.handle = get_handle(self.filename, uri=True)
        try:
            self.devices = get_devices(self.handle)
        except Exception:
            # Do not leave a half-initialised connection open.
            self._close()
            raise

    def _close(self) -> None:
        """Close the database handle opened by _connect()."""
        close_db(handle=self.handle)

    @staticmethod
    def _stack_rows(rows: Any, device: str) -> dict[str, Any]:
        """Join query rows into one dict of arrays; empty input gives {}.

        Each row is indexed as (data, time, local_time).  Rows are reversed
        before being joined with np.block, exactly as the callers did before
        this helper was factored out.
        """
        if not rows:
            return {}
        ordered = rows[::-1]
        return {
            "data": np.block([row[0] for row in ordered]),
            "time": np.block([row[1] for row in ordered]),
            "local_time": np.block([row[2] for row in ordered]),
            "id": device,
        }

    def _get_data(
        self,
        device: str,
        duration: Optional[int] = None,
    ) -> dict[str, Any]:
        """Fetch the rows of one device.

        Args:
            device (str): device identifier
            duration (int, optional): if truthy, only the last ``duration``
                seconds are requested; otherwise all rows are requested.

        Returns:
            dict with keys "data", "time", "local_time" and "id", or an empty
            dict when the query returned nothing.
        """
        if duration:
            rows = get_last_seconds_data(
                self.handle, duration=duration, device=device
            )
        else:
            rows = get_data(self.handle, device=device, direction="all")
        return self._stack_rows(rows, device)

    def _get_info(self, device: str) -> dict[str, Any]:
        """Return the metadata of one device, or {} if none is stored.

        The channel fields are stored as comma separated strings and are
        split into lists here.
        """
        _info = get_metadata(self.handle, device=device)
        first_timestamp = get_first_timestamp(self.handle, device=device)
        if not _info:
            return {}
        row = _info[0]
        return {
            "channels": row[0].split(","),
            "channels_type": row[1].split(","),
            "channels_unit": row[2].split(","),
            "srate": row[3],
            "id": row[4],
            "first_timestamp": first_timestamp,
        }

    def _list_devices(self, only_lsl: bool = False) -> dict[str, Any]:
        """Split the devices in the database into data and marker devices.

        A device with a positive sampling rate is a data device; every other
        device that has metadata is treated as a marker device.

        Args:
            only_lsl (bool): if True, keep only data devices whose identifier
                contains exactly four hyphens (presumably a UUID-style LSL
                uid -- confirm against the recorder).

        Returns:
            dict with two entries, "data" and "markers", each mapping a
            device identifier to its metadata.
        """
        self.devices = get_devices(self.handle)
        markers = {}
        data_devices = {}
        for device in self.devices:
            info = self._get_info(device)
            if not info:
                continue
            if info["srate"] > 0:
                if only_lsl and device.count("-") != 4:
                    continue
                data_devices[device] = info
            else:
                markers[device] = info
        return {"data": data_devices, "markers": markers}

    def _convert_to_mne(self, data: dict, markers: dict, meta: dict) -> "mne.io.Raw":
        """Convert data to mne format

        Args:
            data (dict): data to convert; ``meta`` is stored in it under the
                key "meta" before conversion
            markers (dict): markers to convert
            meta (dict): metadata to convert

        Returns:
            mne.raw converted data
        """
        data["meta"] = meta
        return _convert_to_mne_(data, markers)

    def _get_marker_data(self, time: float, column_name: str, device: str) -> dict:
        """Fetch the rows of one device after ``time`` in ``column_name``.

        Returns the same dict layout as _get_data(), or {} when the query
        returned nothing.
        """
        rows = get_data_after(
            self.handle, start=time, column=column_name, device=device
        )
        return self._stack_rows(rows, device)

    def _collect_markers(
        self, available: Any, include: Optional[list[str]]
    ) -> dict[str, Any]:
        """Load the marker streams that get_mne() attaches to every device.

        With ``include`` given only those devices are read and a failing or
        empty one is reported and skipped; otherwise every device in
        ``available`` that has data is read.
        """
        markers = {}
        if include:
            for dev in include:
                if not dev:
                    continue
                try:
                    stream = self._get_data(device=dev)
                except Exception:
                    # Best effort: a bad marker device must not abort the read.
                    print(f"Device {dev} not found")
                    continue
                if stream:
                    markers[dev] = stream
                else:
                    # An empty stream is skipped here as well, matching how
                    # auto-discovered marker devices are handled below.
                    print(f"Device {dev} not found")
        else:
            for dev in available:
                stream = self._get_data(device=dev)
                if stream:
                    markers[dev] = stream
        return markers

    def get_mne(
        self,
        device: Optional[str] = None,
        duration: Optional[int] = None,
        time_range: Optional[tuple] = None,
        only_lsl: bool = True,
        marker_devices_include: Optional[list[str]] = None,
    ) -> dict[str, "mne.io.Raw"]:
        """Get data from the database as mne objects

        If no parameters are given, dictionary with all devices full data are
        returned.

        Parameters
        ----------
        device : str, optional
            The device identifier. Usually the LSL uid. If empty all devices
            are returned.
        duration : int, Optional
            The duration in seconds from the end. If it is longer than the
            recording, the whole recording is returned.
        time_range : tuple, Optional
            The time range to get data from. If duration is set time_range is
            ignored.
        only_lsl : bool, Optional
            If True, only LSL devices are included. With default templates non
            LSL devices are not recorded.
        marker_devices_include : list, Optional
            List of marker devices to include. If left empty all marker
            devices are included.

        Returns
        -------
        dict
            Dictionary with mne objects.
        """
        self._connect()
        try:
            all_devices = self._list_devices(only_lsl=only_lsl)
            if device is None:
                data_devices = list(all_devices["data"])
            else:
                data_devices = [device]
            markers = self._collect_markers(
                all_devices["markers"], marker_devices_include
            )

            # Data is stored in chunks, so one extra second is requested and
            # the result is cropped back to the exact window below.  The
            # caller's ``duration`` is left untouched so that every device is
            # handled with the same window.
            fetch_duration = duration + 1 if duration else None

            mne_data = {}
            for dev in data_devices:
                data = self._get_data(device=dev, duration=fetch_duration)
                if not data:
                    continue
                meta = self._get_info(device=dev)
                raw = self._convert_to_mne(data, markers, meta)
                if duration:
                    t_end = raw.times[-1]
                    # Clamp at 0: crop() rejects a negative tmin when the
                    # recording is shorter than the requested duration.
                    raw = raw.crop(tmin=max(t_end - duration, 0.0), tmax=t_end)
                elif time_range:
                    raw = raw.crop(tmin=time_range[0], tmax=time_range[1])
                mne_data[dev] = raw
            return mne_data
        finally:
            # Release the handle even when a query or the conversion fails.
            self._close()