BrainAccess Core Usage Examples

Minimal data acquisition demo

""" EEG measurement example

Example how to get measurements using brainaccess library

Change Bluetooth device name to your device name (line 57)
"""

import numpy as np
import time
import warnings
import threading
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from scipy.signal import butter, sosfiltfilt
from brainaccess import core
from brainaccess.core.eeg_manager import EEGManager
import brainaccess.core.eeg_channel as eeg_channel
from brainaccess.core.gain_mode import (
    GainMode,
)


def butter_bandpass(lowcut, highcut, fs, order=1):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    sos = butter(order, [low, high], analog=False, btype="bandpass", output="sos")
    return sos


def butter_bandpass_filter(data, lowcut, highcut, fs, order=1):
    sos = butter_bandpass(lowcut, highcut, fs, order=order)
    y = sosfiltfilt(sos, data)
    return y


def _acq_closure(ch_number: int = 1, buffer_length: int = 1000):
    data = np.zeros((ch_number, buffer_length))
    mutex = threading.Lock()

    def _acq_callback(chunk, chunk_size):
        nonlocal data
        nonlocal mutex
        with mutex:
            data = np.roll(data, -chunk_size)
            data[:, -chunk_size:] = chunk

    def get_data():
        nonlocal data
        with mutex:
            return data.copy()

    return _acq_callback, get_data


if __name__ == "__main__":
    device_name = "BA MINI 001"  # change to your device name
    core.init()

    # scan for devices find the defined one
    core.scan(0)  # adapter number (on windows always 0)
    count = core.get_device_count()
    port = 0

    print("Found devices:", count)
    for i in range(count):
        name = core.get_device_name(i)
        if device_name in name:
            port = i

    # connect to the device
    with EEGManager() as mgr:
        print("Connecting to device:", core.get_device_name(port))
        _status = mgr.connect(port)
        if _status == 1:
            raise Exception("Connection failed")
        elif _status == 2:
            warnings.warn("Stream is incompatible. Update the firmware.")

        # battery info
        print("battery level:", mgr.get_battery_info().level)

        # set the channels
        print("Setting the channels")
        ch_nr = 0
        eeg_channels_number = 8
        for i in range(eeg_channels_number):
            mgr.set_channel_enabled(eeg_channel.ELECTRODE_MEASUREMENT + i, True)
            ch_nr += 1
            mgr.set_channel_gain(eeg_channel.ELECTRODE_MEASUREMENT, GainMode.X8)
        mgr.set_channel_bias(eeg_channel.ELECTRODE_MEASUREMENT + i, True)

        # check if the device has accelerometer
        has_accel = mgr.get_device_features().has_accel()
        if has_accel:
            print("Setting the accelerometer")
            mgr.set_channel_enabled(eeg_channel.ACCELEROMETER, True)
            ch_nr += 1
            mgr.set_channel_enabled(eeg_channel.ACCELEROMETER + 1, True)
            ch_nr += 1
            mgr.set_channel_enabled(eeg_channel.ACCELEROMETER + 2, True)
            ch_nr += 1

        mgr.set_channel_enabled(eeg_channel.SAMPLE_NUMBER, True)
        ch_nr += 1

        mgr.set_channel_enabled(eeg_channel.STREAMING, True)
        ch_nr += 1

        # set the sample rate
        sr = mgr.get_sample_frequency()

        # define the callback for the acquisition
        duration = 20
        buffer_time = int(sr * duration)  # seconds
        _acq_callback, get_data = _acq_closure(
            ch_number=ch_nr, buffer_length=buffer_time
        )
        mgr.set_callback_chunk(_acq_callback)

        # load defined configuration
        mgr.load_config()

        # start the stream
        mgr.start_stream()
        print("Stream started")

        # collect data
        time.sleep(4)
        for i in range(duration):
            time.sleep(1)
            print(f"Collecting data {i+1}/{duration}")

        # get the data
        dat = get_data()

        # stop the stream
        mgr.stop_stream()

    # close the core
    core.close()

    # plot the data
    df = pd.DataFrame(dat.T)
    if has_accel:
        ch = []
        ch.append("sample")
        ch.extend([f"ch_{i}" for i in range(eeg_channels_number)])
        ch.extend(["accel_x", "accel_y", "accel_z"])
    else:
        ch = []
        ch.append("sample")
        ch.extend([f"ch_{i}" for i in range(eeg_channels_number)])
    ch.extend(["streaming"])
    df.columns = ch
    df.iloc[:, 1:eeg_channels_number+1] = butter_bandpass_filter(df.iloc[:, 1:eeg_channels_number+1].T, 1, 40, sr).T
    fig, axs = plt.subplots(4, 1, figsize=(10, 10))
    eeg_data = df.iloc[:, 1:eeg_channels_number+1].values
    eeg_data = (eeg_data - eeg_data.mean(axis=0)) / eeg_data.std(axis=0)
    eeg_data = eeg_data + np.arange(eeg_channels_number)
    sns.lineplot(data=eeg_data, ax=axs[0])
    sns.lineplot(data=df.iloc[:, len(ch)-4:-1], ax=axs[1])
    sns.lineplot(data=df.iloc[:, 0], ax=axs[2])
    sns.lineplot(data=df.iloc[:, -1], ax=axs[3])
    plt.show()

Continuous data acquisition demo

""" EEG measurement example

Example how to get measurements and
save to fif format
using acquisition class from brainaccess.utils

Change Bluetooth device name (line 23)
"""

import matplotlib.pyplot as plt
import matplotlib
import time

from brainaccess.utils import acquisition
from brainaccess.core.eeg_manager import EEGManager

matplotlib.use("TKAgg", force=True)

eeg = acquisition.EEG()

# define electrode locations
cap: dict = {
    0: "Fp1",
    1: "Fp2",
    2: "O1",
    3: "O2",
}

# define device name
device_name = "BA HALO 001"

# start EEG acquisition setup
with EEGManager() as mgr:
    eeg.setup(mgr, device_name=device_name, cap=cap)

    # Start acquiring data
    eeg.start_acquisition()
    time.sleep(3)

    start_time = time.time()
    annotation = 1
    while time.time() - start_time < 10:
        time.sleep(1)
        # send annotation to the device
        print(f"Sending annotation {annotation} to the device")
        eeg.annotate(str(annotation))
        annotation += 1

    # get all eeg data and stop acquisition
    eeg.get_mne()
    eeg.stop_acquisition()
    mgr.disconnect()

# save EEG data to MNE fif format
eeg.data.save(f'{time.strftime("%Y%m%d_%H%M")}-raw.fif')
# Close brainaccess library
eeg.close()
# Show recorded data
eeg.data.mne_raw.filter(1, 40).plot(scalings="auto", verbose=False)
plt.show()

Impedance measurement

""" Impedance measurement example
Example how to get impedance measurements
using acquisition class from brainaccess.utils

Change Bluetooth device name to your device name (line 18)
"""

import time

from brainaccess.utils import acquisition
from brainaccess.core.eeg_manager import EEGManager

eeg = acquisition.EEG()

cap: dict = {
  0: "Fp1",
  1: "Fp2",
  2: "O1",
  3: "O2",
}

# define device name
device_name = "BA HALO 001"

with EEGManager() as mgr:

    eeg.setup(mgr, device_name=device_name, cap=cap)
    # Start measuring impedance
    eeg.start_impedance_measurement()
    # Print impedances
    start_time = time.time()
    while time.time()-start_time < 20:
        time.sleep(1)
        imp = eeg.calc_impedances()
        print(imp)

    # Stop measuring impedance
    eeg.stop_impedance_measurement()
    mgr.disconnect()

eeg.close()