BrainAccess Core Usage Examples

Minimal data acquisition demo

"""EEG measurement example

Example how to get measurements using brainaccess library

Change Bluetooth device name to your device name (line 57)
"""

import numpy as np
import time
import threading
import matplotlib.pyplot as plt
from scipy.signal import butter, sosfiltfilt
from brainaccess import core
from brainaccess.core.eeg_manager import EEGManager
import brainaccess.core.eeg_channel as eeg_channel
from brainaccess.core.gain_mode import (
    GainMode,
)


def butter_bandpass(lowcut, highcut, fs, order=2):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    sos = butter(order, [low, high], analog=False, btype="bandpass", output="sos")
    return sos


def butter_bandpass_filter(data, lowcut, highcut, fs, order=2):
    sos = butter_bandpass(lowcut, highcut, fs, order=order)
    y = sosfiltfilt(sos, data)
    return y


def _acq_closure(ch_number: int = 1, buffer_length: int = 1000):
    data = np.zeros((ch_number, buffer_length))
    mutex = threading.Lock()

    def _acq_callback(chunk, chunk_size):
        nonlocal data
        nonlocal mutex
        with mutex:
            data = np.roll(data, -chunk_size)
            data[:, -chunk_size:] = chunk

    def get_data():
        nonlocal data
        with mutex:
            return data.copy()

    return _acq_callback, get_data


if __name__ == "__main__":
    # Change to your device name
    # Device name can be found on the back of the device
    device_name = "BA HALO 001"

    # init the core
    core.init()

    # scan for devices
    core.scan(0)  # adapter number (on windows always 0)
    count = core.get_device_count()
    port = 0
    # find the defined one
    print("Found devices:", count)
    for i in range(count):
        name = core.get_device_name(i)
        if device_name in name:
            port = i
            break
    else:
        raise Exception(f"Device {device_name} not found")

    # connect to the device
    with EEGManager() as mgr:
        print("Connecting to device:", core.get_device_name(port))
        _status = mgr.connect(port)
        if _status == 1:
            raise Exception("Connection failed")
        elif _status == 2:
            raise Exception("Stream is incompatible. Update the firmware.")

        # battery info
        print(f"battery level: {mgr.get_battery_info().level} %")

        # Get electrode count
        device_features = mgr.get_device_features()
        eeg_channels_number = device_features.electrode_count()
        print(f"Device has {eeg_channels_number} EEG channels")

        # set the channels
        ch_nr = 0
        # for i in range(eeg_channels_number):
        for i in range(3, eeg_channels_number):
            mgr.set_channel_enabled(eeg_channel.ELECTRODE_MEASUREMENT + i, True)
            ch_nr += 1
            mgr.set_channel_gain(eeg_channel.ELECTRODE_MEASUREMENT + i, GainMode.X8)
        mgr.set_channel_bias(eeg_channel.ELECTRODE_MEASUREMENT + i, True)

        # check if the device has accelerometer
        has_accel = device_features.has_accel()
        if has_accel:
            print("Setting the accelerometer")
            mgr.set_channel_enabled(eeg_channel.ACCELEROMETER, True)
            ch_nr += 1
            mgr.set_channel_enabled(eeg_channel.ACCELEROMETER + 1, True)
            ch_nr += 1
            mgr.set_channel_enabled(eeg_channel.ACCELEROMETER + 2, True)
            ch_nr += 1

        mgr.set_channel_enabled(eeg_channel.SAMPLE_NUMBER, True)
        ch_nr += 1

        # set the streaming channel, shows 0 if Bluetooth connection
        # was lost and 0 was added to the data
        mgr.set_channel_enabled(eeg_channel.STREAMING, True)
        ch_nr += 1

        # get the sample rate
        sr = mgr.get_sample_frequency()

        # define the callback for the acquisition
        duration = 10
        buffer_time = int(sr * duration)  # seconds
        _acq_callback, get_data = _acq_closure(
            ch_number=ch_nr, buffer_length=buffer_time
        )
        mgr.set_callback_chunk(_acq_callback)

        # load defined configuration
        mgr.load_config()

        # start the stream
        mgr.start_stream()
        print("Stream started")

        # collect data
        time.sleep(4)
        for i in range(duration):
            time.sleep(1)
            print(f"Collecting data {i + 1}/{duration}")

        # get the data
        dat = get_data()

        # stop the stream
        mgr.stop_stream()
        print("Stream stopped")
        time.sleep(1)

        # The EEGManager destructor calls mgr.disconnect()
        # so we don't need to call it here

    print("Disconnected from the device")
    time.sleep(1)

    # close the core
    core.close()
    print("Core closed")

    # plot the data
    print("Plotting the data")

    # Create channel labels
    ch = ["sample"]
    ch.extend([f"ch_{i}" for i in range(eeg_channels_number)])
    if has_accel:
        ch.extend(["accel_x", "accel_y", "accel_z"])
    ch.extend(["streaming"])

    # Apply bandpass filter to EEG data
    eeg_data = dat[1 : eeg_channels_number + 1, :]
    eeg_data = butter_bandpass_filter(eeg_data, 1, 40, sr)

    # Normalize EEG data
    eps = 1e-20  # To avoid division by zero in this example
    eeg_data = (eeg_data - np.mean(eeg_data, axis=0)) / (np.std(eeg_data, axis=0) + eps)
    # Add offsets for visualization
    eeg_data = eeg_data + np.arange(eeg_channels_number)[:, np.newaxis]

    # Create subplots
    fig, axs = plt.subplots(4, 1, figsize=(10, 10))

    # Plot the data
    axs[0].plot(eeg_data.T)
    axs[0].set_ylabel("EEG Channels")

    if has_accel:
        axs[1].plot(dat[len(ch) - 4 : -1, :].T)
        axs[1].set_ylabel("Accelerometer")
    else:
        axs[1].axis("off")  # Hide the unused subplot

    axs[2].plot(dat[0, :])
    axs[2].set_ylabel("Sample")

    axs[3].plot(dat[-1, :])
    axs[3].set_ylabel("Streaming")

    plt.show()

Continuous data acquisition demo

""" EEG measurement example

Example how to get measurements and
save to fif format
using acquisition class from brainaccess.utils

Change Bluetooth device name (line 23)
"""

import matplotlib.pyplot as plt
import matplotlib
import time

from brainaccess.utils import acquisition
from brainaccess.core.eeg_manager import EEGManager

matplotlib.use("TKAgg", force=True)

eeg = acquisition.EEG()

# define electrode locations
cap: dict = {
    0: "Fp1",
    1: "Fp2",
    2: "O1",
    3: "O2",
}

# define device name
device_name = "BA HALO 001"

# start EEG acquisition setup
with EEGManager() as mgr:
    eeg.setup(mgr, device_name=device_name, cap=cap)

    # Start acquiring data
    eeg.start_acquisition()
    print("Acquisition started")
    time.sleep(3)

    start_time = time.time()
    annotation = 1
    while time.time() - start_time < 10:
        time.sleep(1)
        # send annotation to the device
        print(f"Sending annotation {annotation} to the device")
        eeg.annotate(str(annotation))
        annotation += 1

    print("Preparing to plot data")
    time.sleep(2)

    # get all eeg data and stop acquisition
    eeg.get_mne()
    eeg.stop_acquisition()
    mgr.disconnect()

# save EEG data to MNE fif format
eeg.data.save(f'{time.strftime("%Y%m%d_%H%M")}-raw.fif')
# Close brainaccess library
eeg.close()
# Show recorded data
eeg.data.mne_raw.filter(1, 40).plot(scalings="auto", verbose=False)
plt.show()

Impedance measurement

""" Impedance measurement example
Example how to get impedance measurements
using acquisition class from brainaccess.utils

Change Bluetooth device name to your device name (line 18)
"""

import time

from brainaccess.utils import acquisition
from brainaccess.core.eeg_manager import EEGManager

eeg = acquisition.EEG()

cap: dict = {
  0: "Fp1",
  1: "Fp2",
  2: "O1",
  3: "O2",
}

# define device name
device_name = "BA HALO 001"

with EEGManager() as mgr:

    eeg.setup(mgr, device_name=device_name, cap=cap)
    # Start measuring impedance
    eeg.start_impedance_measurement()
    # Print impedances
    start_time = time.time()
    while time.time()-start_time < 20:
        time.sleep(1)
        imp = eeg.calc_impedances()
        print(imp)

    # Stop measuring impedance
    eeg.stop_impedance_measurement()
    mgr.disconnect()

eeg.close()