# Source code for brainaccess_board.utils

import json
import socket
import numpy as np
import mne
import pathlib
import appdirs
from contextlib import closing

from collections import defaultdict
from pydantic import ValidationError, BaseModel


# Per-user log directory that appdirs resolves for the "baboard" application.
_user_log_dir: pathlib.Path = pathlib.Path(
    appdirs.user_log_dir(appname="baboard", appauthor="Neurotechnology")
)
# JSON file inside that directory; `_get_utils_dict` reads it.
_user_log_utils: pathlib.Path = _user_log_dir / "utils.json"


class _RunningSessionOptions(BaseModel):
    """Running session options

    Pydantic model that ``_get_utils_dict`` uses to validate the JSON object
    loaded from ``_user_log_utils``. Both fields are required.
    """

    # NOTE(review): presumably the path of the file the running session is
    # saving to -- confirm against whatever writes utils.json.
    current_save_file: str
    # NOTE(review): presumably the port of the running session's socket --
    # confirm against whatever writes utils.json.
    socket_port: int


def _get_utils_dict() -> _RunningSessionOptions | None:
    """Load the running-session options stored in ``_user_log_utils``.

    The file is read as UTF-8 JSON and validated against
    ``_RunningSessionOptions``. Every failure mode is reported the same way:
    the error is printed and None is returned.

    Returns:
        _RunningSessionOptions | None: The validated options, or None when
        the file is missing or unreadable, is not valid JSON, or does not
        match the model.
    """
    try:
        with open(_user_log_utils, "r", encoding="utf-8") as f:
            raw_options = json.load(f)
    except (OSError, json.JSONDecodeError, UnicodeDecodeError) as e:
        # A missing or corrupt file is handled like invalid content, so
        # callers only ever have to check for None.
        print(e)
        return None

    try:
        return _RunningSessionOptions.parse_obj(raw_options)
    except ValidationError as e:
        print(e)
        return None


def _convert_to_mne_(
    data: dict,
    markers: dict,
) -> mne.io.RawArray:
    """Convert data to MNE RawArray

    The samples in ``data["data"]`` are multiplied by the per-channel factors
    from ``_get_units_conversion`` and wrapped in a RawArray built with the
    Info from ``_create_info``. Marker times are turned into zero-duration
    annotations whose onsets are relative to ``data["time"][0]``. The
    "standard_1005" montage is applied with ``on_missing="warn"``.

    The input ``data`` dict is not modified.

    Args:
        data (dict): data to convert. Reads ``data["data"]``,
            ``data["meta"]`` and, when markers are given, ``data["time"]``.
        markers (dict): markers to add to the data. For each entry,
            ``entry["time"]`` supplies the onsets and ``entry["data"][0]``
            supplies the descriptions. May be empty or None.

    Returns:
        mne.io.RawArray: converted data

    """
    info = _create_info(data)
    # Scale into a new array rather than writing back into data["data"], so
    # the caller's dict is left untouched and repeated calls on the same dict
    # do not apply the unit factors more than once.
    scaled = data["data"] * _get_units_conversion(data)
    raw_data = mne.io.RawArray(scaled, info)

    onset = np.array([])
    description = []
    if markers:
        data_time0 = data["time"][0]
        for name, stream in markers.items():
            try:
                onset = np.append(onset, np.array(stream["time"]) - data_time0)
            except Exception as e:
                # Best effort: a marker stream whose times cannot be
                # processed is reported and skipped entirely.
                print(f"Error in marker {name} {e}")
                continue
            description.extend(stream["data"][0])
    # Every annotation is an instantaneous event.
    duration = [0] * len(onset)

    annot = mne.Annotations(
        onset=onset,
        duration=duration,
        description=description,
    )
    raw_data.set_annotations(annot)
    raw_data.set_montage("standard_1005", on_missing="warn")
    return raw_data


def _create_info(data: dict) -> mne.Info:
    """Build an MNE Info object from the metadata stored in ``data``.

    A channel whose type is exactly "EEG" is registered as "eeg"; every
    other type is registered as "misc". The "standard_1005" montage is then
    applied with ``on_missing="warn"``.

    Args:
        data (dict): Source dict; ``data["meta"]`` must provide
            "channels_type", "channels" and "srate".

    Returns:
        mne.Info: Info with channel names, channel types, sampling rate and
        montage set.
    """
    meta = data["meta"]

    ch_types = []
    for kind in meta["channels_type"]:
        ch_types.append("eeg" if kind == "EEG" else "misc")

    mne_info = mne.create_info(
        ch_names=meta["channels"],
        ch_types=ch_types,
        sfreq=meta["srate"],
    )
    mne_info.set_montage("standard_1005", on_missing="warn")
    return mne_info


def _get_units_conversion(data: dict) -> np.ndarray:
    """Get conversion factors for data units.

    Args:
        data (dict): Data to extract units from.

    Returns:
        np.array: Conversion factors for each channel.
    """
    conversions: defaultdict = defaultdict(lambda: 1)
    conversions.update(
        {
            "microvolts": 1e-6,
            "volts": 1,
            "mV": 1e-3,
            "uV": 1e-6,
            "millivolts": 1e-3,
            "V": 1,
        }
    )
    result = []
    for data_type, unit in zip(
        data["meta"]["channels_type"], data["meta"]["channels_unit"]
    ):
        result.append(conversions[unit])
    return np.array(result).reshape(-1, 1)


def find_free_port() -> int:
    """Find a free port on the localhost

    A temporary TCP socket is bound to port 0 on "localhost" so that the
    operating system assigns a port, and the assigned number is read back
    with ``getsockname()``. The socket is closed before returning, so the
    port is not reserved: another process may claim it before the caller
    binds to it.

    Returns:
        int: Port number
    """
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as free_socket:
        free_socket.bind(("localhost", 0))
        # NOTE(review): SO_REUSEADDR is set after bind(); the order is kept
        # as is -- confirm whether it was meant to precede the bind.
        free_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        port_number = free_socket.getsockname()[1]
    return port_number