Source code for brainaccess_board.utils
import json
import socket
import numpy as np
import mne
import pathlib
import appdirs
from contextlib import closing
from collections import defaultdict
from pydantic import ValidationError, BaseModel
# Per-user log directory for the "baboard" application, as resolved by appdirs.
_user_log_dir: pathlib.Path = pathlib.Path(
    appdirs.user_log_dir(appauthor="Neurotechnology", appname="baboard")
)
# JSON file inside that directory; _get_utils_dict reads it.
_user_log_utils: pathlib.Path = _user_log_dir / "utils.json"
class _RunningSessionOptions(BaseModel):
    """Schema for the running-session options parsed by ``_get_utils_dict``.

    Both fields are required; validation fails if either is missing or cannot
    be coerced to the declared type.
    """

    # NOTE(review): presumably the path of the file the running session is
    # currently saving to -- confirm against the code that writes utils.json.
    current_save_file: str
    # NOTE(review): presumably the port the running session's socket listens
    # on -- confirm against the code that writes utils.json.
    socket_port: int
def _get_utils_dict() -> _RunningSessionOptions | None:
    """Load the running-session options from the user log directory.

    Reads the JSON file at ``_user_log_utils`` and validates its content
    against ``_RunningSessionOptions``.

    Returns:
        _RunningSessionOptions | None: The validated options, or ``None`` when
        the file is missing or unreadable, does not contain valid JSON, or
        does not match the expected schema. The reason is printed.
    """
    try:
        with open(_user_log_utils, "r", encoding="utf-8") as f:
            raw_options = json.load(f)
        return _RunningSessionOptions.parse_obj(raw_options)
    except (
        OSError,
        UnicodeDecodeError,
        json.JSONDecodeError,
        ValidationError,
    ) as e:
        # A missing/unreadable or corrupt file is reported the same way as a
        # schema mismatch: callers already have to handle the None return, so
        # "no usable options" should not surface as an unrelated exception.
        print(e)
        return None
def _convert_to_mne_(
    data: dict,
    markers: dict,
) -> mne.io.RawArray:
    """Convert data to MNE RawArray.

    The samples in ``data["data"]`` are multiplied by the per-channel factors
    returned by ``_get_units_conversion`` and wrapped in a RawArray whose info
    comes from ``_create_info``. The ``data`` dict itself is not modified, so
    converting the same dict more than once yields the same result.

    When ``markers`` is non-empty, every marker entry contributes annotations
    with zero duration: onsets are ``markers[name]["time"]`` relative to
    ``data["time"][0]`` and descriptions are ``markers[name]["data"][0]``.
    A marker entry that cannot be read, or whose number of onsets differs from
    its number of descriptions, is reported with ``print`` and skipped as a
    whole so the remaining markers stay aligned.

    Args:
        data (dict): data to convert
        markers (dict): markers to add to the data

    Returns:
        mne.io.RawArray: converted data
    """
    info = _create_info(data)
    data_units = _get_units_conversion(data)
    # Scale into a new array instead of writing back into data["data"];
    # mutating the caller's dict would apply the factor again on every
    # subsequent conversion of the same data.
    scaled = data["data"] * data_units
    raw_data = mne.io.RawArray(scaled, info)

    if markers:
        data_time0 = data["time"][0]
        onset_parts = []
        description = []
        for marker in markers:
            # Read onsets AND labels before storing either, so a failure
            # cannot leave the two sequences with different lengths.
            try:
                entry = markers[marker]
                marker_onset = np.ravel(np.asarray(entry["time"]) - data_time0)
                marker_labels = list(entry["data"][0])
            except Exception as e:  # best effort: skip the malformed marker
                print(f"Error in marker {marker} {e}")
                continue
            if len(marker_onset) != len(marker_labels):
                print(
                    f"Error in marker {marker} "
                    f"{len(marker_onset)} onsets but "
                    f"{len(marker_labels)} descriptions"
                )
                continue
            onset_parts.append(marker_onset)
            description.extend(marker_labels)

        onset = np.concatenate(onset_parts) if onset_parts else np.array([])
        annot = mne.Annotations(
            onset=onset,
            duration=np.zeros(len(onset)),
            description=description,
        )
        raw_data.set_annotations(annot)

    raw_data.set_montage("standard_1005", on_missing="warn")
    return raw_data
def _create_info(data: dict) -> mne.Info:
    """Build the MNE Info object described by ``data["meta"]``.

    Channels whose declared type is exactly "EEG" become MNE "eeg" channels;
    every other channel is registered as "misc". The "standard_1005" montage
    is applied, with missing channel positions reported as warnings.

    Args:
        data (dict): Data whose "meta" entry provides "channels_type",
            "channels" and "srate".

    Returns:
        mne.Info: MNE Info object.
    """
    meta = data["meta"]
    mne_types = []
    for declared_type in meta["channels_type"]:
        mne_types.append("eeg" if declared_type == "EEG" else "misc")

    mne_info = mne.create_info(
        ch_names=meta["channels"],
        ch_types=mne_types,
        sfreq=meta["srate"],
    )
    mne_info.set_montage("standard_1005", on_missing="warn")
    return mne_info
def _get_units_conversion(data: dict) -> np.ndarray:
"""Get conversion factors for data units.
Args:
data (dict): Data to extract units from.
Returns:
np.array: Conversion factors for each channel.
"""
conversions: defaultdict = defaultdict(lambda: 1)
conversions.update(
{
"microvolts": 1e-6,
"volts": 1,
"mV": 1e-3,
"uV": 1e-6,
"millivolts": 1e-3,
"V": 1,
}
)
result = []
for data_type, unit in zip(
data["meta"]["channels_type"], data["meta"]["channels_unit"]
):
result.append(conversions[unit])
return np.array(result).reshape(-1, 1)
[docs]
def find_free_port() -> int:
    """Find a free port on the localhost

    A TCP socket is bound to port 0 on "localhost" so that the operating
    system assigns a port; that port number is read back and the socket is
    closed before returning.

    Returns:
        int: Port number
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.bind(("localhost", 0))
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _host, port_number = probe.getsockname()
    return port_number