Skip to content

LFPLoader Tutorial

This tutorial demonstrates how to use neuro_py.io.loading.LFPLoader to load LFP or DAT signals as a nelpy.AnalogSignalArray, including single-channel, multi-channel, and epoch-restricted usage.

%reload_ext autoreload
%autoreload 2
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

import neuro_py as npy

npy.plotting.set_plotting_defaults()

1) Set session path

Set basepath to a recording folder that contains your session files (.xml, .lfp or .dat, and optional event files).

basepath = r"U:\data\hpc_ctx_project\HP18\hp18_day85_20250713"

2) Inspect XML metadata

loadXML returns channel count and both LFP and DAT sampling rates.

# Read channel count, LFP/DAT sampling rates, and the shank -> channel mapping
# from the session's .xml metadata file.
nChannels, fs_lfp, fs_dat, shank_to_channel = npy.io.loadXML(basepath)
print(f"nChannels: {nChannels}")
print(f"fs_lfp:    {fs_lfp}")
print(f"fs_dat:    {fs_dat}")
print(f"n_shanks:  {len(shank_to_channel)}")
nChannels: 256
fs_lfp:    1250
fs_dat:    20000
n_shanks:  12

3) Load LFP/DAT with LFPLoader

LFPLoader returns an nelpy.AnalogSignalArray object.

When not specifying a channel, LFPLoader will load all channels to a memory-mapped array that is accessed on demand, so it can be used even if the LFP/DAT files are too large to fit in memory.

# Load full LFP file
# With no `channels` argument, all channels are exposed through a
# numpy.memmap, so nothing is read into RAM until the data are indexed.
lfp_obj = npy.io.LFPLoader(basepath, ext="lfp")
print(type(lfp_obj))
print(lfp_obj)
print("data shape:", lfp_obj.data.shape)
print("data dtype:", type(lfp_obj.data))
<class 'neuro_py.io.loading.LFPLoader'>
<LFPLoader at 0x2a7a1290fd0: 256 signals> for a total of 9:10:642 minutes
data shape: (256, 688304)
data dtype: <class 'numpy.memmap'>
# Load DAT as AnalogSignalArray using fs_dat from XML
# Same interface as the LFP load above, but at the raw DAT sampling rate.
dat_obj = npy.io.LFPLoader(basepath, ext="dat")
print(type(dat_obj))
print(dat_obj)
print("data shape:", dat_obj.data.shape)
print("data dtype:", type(dat_obj.data))
<class 'neuro_py.io.loading.LFPLoader'>
<LFPLoader at 0x2a786321f90: 256 signals> for a total of 9:10:643 minutes
data shape: (256, 11012864)
data dtype: <class 'numpy.memmap'>

4) Single and multi-channel loading

You can pass one channel (int) or many channels (list).

# Example channel IDs
# Pick representative channels by anatomical region from the session's
# brain-region annotations (CA1 stratum oriens / stratum radiatum).
brain_regions = npy.io.load_brain_regions(basepath, out_format="DataFrame")

ripple_channel = brain_regions.query("region=='CA1so'").channels.iloc[0]
sharp_wave_channel = brain_regions.query("region=='CA1sr'").channels.iloc[0]

# `channels` accepts a single int (one channel) or a list (several channels).
single_ch = npy.io.LFPLoader(basepath, channels=ripple_channel, ext="lfp")
multi_ch = npy.io.LFPLoader(
    basepath, channels=[ripple_channel, sharp_wave_channel], ext="lfp"
)

print("single channel shape:", single_ch.data.shape)
print("multi channel shape: ", multi_ch.data.shape)
single channel shape: (1, 688304)
multi channel shape:  (2, 688304)

5) Epoch-restricted loading

Restrict loaded data to a time interval using epoch.

epoch_df = npy.io.load_epoch(basepath)
epoch_df.head()
name startTime stopTime environment manipulation behavioralParadigm stimuli notes basepath
0 hp18_probe_250713_091741 0 550.6432 cheeseboard NaN NaN NaN NaN U:\data\hpc_ctx_project\HP18\hp18_day85_20250713
# Restrict to first 5 seconds of the first epoch
t0 = float(epoch_df.startTime.iloc[0])
restricted_epoch = np.array([t0, t0 + 5.0])  # [start, stop] in seconds

# `epoch` limits both the file reads and the resulting time axis to the
# requested interval.
lfp_restricted = npy.io.LFPLoader(
    basepath,
    channels=[ripple_channel, sharp_wave_channel],
    ext="lfp",
    epoch=restricted_epoch,
)

print(lfp_restricted.data.shape)
print(lfp_restricted.abscissa_vals.min(), lfp_restricted.abscissa_vals.max())
(2, 6251)
0.0 5.0

6) Basic plotting

Plot two channels in a short time window.

# Load the two example channels at DAT resolution for plotting.
sig = npy.io.LFPLoader(
    basepath, channels=[ripple_channel, sharp_wave_channel], ext="dat"
)
ts = sig.abscissa_vals

# Boolean mask selecting a 2-second window (2 s to 4 s after recording start).
window = (ts >= ts[0] + 2.0) & (ts <= ts[0] + 4.0)

fig, ax = plt.subplots(
    2,
    1,
    figsize=npy.plotting.set_size("thesis", fraction=1.0, subplots=(1, 1.0)),
    sharex=True,
)
# One channel per row, sharing the time axis.
ax[0].plot(ts[window], sig.data[0, window], color="k", lw=0.7)
ax[1].plot(ts[window], sig.data[1, window], color="teal", lw=0.7)
ax[0].set_ylabel(f"ch {ripple_channel}")
ax[1].set_ylabel(f"ch {sharp_wave_channel}")
ax[1].set_xlabel("time (s)")
sns.despine()
plt.tight_layout()
plt.show()

png

7) More advanced plotting

Plotting raw data from every channel and every shank around a ripple event.

Note: the indexing below assumes a multi-shank probe. With a single shank, `plt.subplots` returns one Axes object rather than an array, so the `ax[i]` indexing must be adjusted.

# Load detected ripple events and the full-resolution DAT signal (all channels).
ripples = npy.io.load_ripples_events(basepath)
sig = npy.io.LFPLoader(basepath, ext="dat")

# Select one ripple and build a mask covering +/-150 ms around its peak time.
ripple_n = 28
ts = sig.abscissa_vals
window = (ts >= ripples.peaks[ripple_n] - 0.150) & (
    ts <= ripples.peaks[ripple_n] + 0.150
)

# One subplot per shank, sharing axes so traces are directly comparable.
fig, ax = plt.subplots(
    1,
    len(sig.shank_to_channel.keys()),
    figsize=npy.plotting.set_size("paper", fraction=2, subplots=(1, 1)),
    sharex=True,
    sharey=True,
    dpi=150,
)
for i, (shank, channel) in enumerate(sig.shank_to_channel.items()):
    # Vertical offsets (3000 units apart) stack the shank's channels.
    offsets = np.arange(len(channel)) * 3000
    ax[i].plot(
        ts[window], sig.data[:, window][channel, :].T - offsets, color="k", lw=0.7
    )
    ax[i].set_title(f"shank {shank}")
    ax[i].set_xlabel("time (s)")
    ax[i].axis("off")
# add scale bar
# NOTE: `i` and `offsets` below are the leaked loop variables from the last
# iteration, so the scale bars are drawn on the final shank's subplot.
ax[i].plot(
    [ts[window][0], ts[window][0] + 0.1],
    [-offsets[-1] - 3000, -offsets[-1] - 3000],
    color="red",
    lw=2,
    label="100 ms",
)
# add text label for scale bar
ax[i].text(
    ts[window][0] + 0.05,
    -offsets[-1] - 3500,
    "100 ms",
    color="red",
    ha="center",
    va="top",
    fontsize=8,
)

# add vertical amplitude scale bar next to the time scale bar
ax[i].plot(
    [ts[window][0], ts[window][0]],
    [-offsets[-1] - 3000, -offsets[-1]],
    color="blue",
    lw=2,
    label="3000 uV",
)
ax[i].text(
    ts[window][0] - 0.02,
    -offsets[-1] - 1500,
    "3000 uV",
    color="blue",
    ha="right",
    va="center",
    fontsize=8,
    rotation=90,
)
plt.show()

png

Plotting raw data from every channel from a particular shank around a ripple event.

# Load ripple events and the full-resolution DAT signal (all channels).
ripples = npy.io.load_ripples_events(basepath)
sig = npy.io.LFPLoader(basepath, ext="dat")


# Mask covering +/-500 ms around the peak of one example ripple.
ripple_n = 28
ts = sig.abscissa_vals
window = (ts >= ripples.peaks[ripple_n] - 0.5) & (ts <= ripples.peaks[ripple_n] + 0.5)

fig, ax = plt.subplots(
    1,
    1,
    figsize=npy.plotting.set_size("paper", fraction=2, subplots=(1, 1)),
    sharex=True,
    sharey=True,
    dpi=150,
)

shank = 10  # example shank number

# Channels on the chosen shank, stacked with 3000-unit vertical offsets.
channel = sig.shank_to_channel.get(shank)
offsets = np.arange(len(channel)) * 3000
ax.plot(ts[window], sig.data[:, window][channel, :].T - offsets, color="k", lw=0.7)
ax.set_title(f"shank {shank}")
ax.set_xlabel("time (s)")
ax.axis("off")
# add scale bar
ax.plot(
    [ts[window][0], ts[window][0] + 0.1],
    [-offsets[-1] - 3000, -offsets[-1] - 3000],
    color="red",
    lw=2,
    label="100 ms",
)
# add text label for scale bar
ax.text(
    ts[window][0] + 0.05,
    -offsets[-1] - 3500,
    "100 ms",
    color="red",
    ha="center",
    va="top",
    fontsize=8,
)

# add vertical amplitude scale bar next to the time scale bar
ax.plot(
    [ts[window][0], ts[window][0]],
    [-offsets[-1] - 3000, -offsets[-1]],
    color="blue",
    lw=2,
    label="3000 uV",
)
ax.text(
    ts[window][0] - 0.02,
    -offsets[-1] - 1500,
    "3000 uV",
    color="blue",
    ha="right",
    va="center",
    fontsize=8,
    rotation=90,
)
plt.show()

png

Same plot as above, but using the LFP file sampled at 1250 Hz instead of the 20000 Hz DAT file.

# Same figure as above, but reading the downsampled .lfp file instead of .dat.
ripples = npy.io.load_ripples_events(basepath)
sig = npy.io.LFPLoader(basepath, ext="lfp")


# Mask covering +/-500 ms around the peak of one example ripple.
ripple_n = 28
ts = sig.abscissa_vals
window = (ts >= ripples.peaks[ripple_n] - 0.5) & (ts <= ripples.peaks[ripple_n] + 0.5)

fig, ax = plt.subplots(
    1,
    1,
    figsize=npy.plotting.set_size("paper", fraction=2, subplots=(1, 1)),
    sharex=True,
    sharey=True,
    dpi=150,
)

shank = 10  # example shank number

# Channels on the chosen shank, stacked with 3000-unit vertical offsets.
channel = sig.shank_to_channel.get(shank)
offsets = np.arange(len(channel)) * 3000
ax.plot(ts[window], sig.data[:, window][channel, :].T - offsets, color="k", lw=0.7)
ax.set_title(f"shank {shank}")
ax.set_xlabel("time (s)")
ax.axis("off")
# add scale bar
ax.plot(
    [ts[window][0], ts[window][0] + 0.1],
    [-offsets[-1] - 3000, -offsets[-1] - 3000],
    color="red",
    lw=2,
    label="100 ms",
)
# add text label for scale bar
ax.text(
    ts[window][0] + 0.05,
    -offsets[-1] - 3500,
    "100 ms",
    color="red",
    ha="center",
    va="top",
    fontsize=8,
)

# add vertical amplitude scale bar next to the time scale bar
ax.plot(
    [ts[window][0], ts[window][0]],
    [-offsets[-1] - 3000, -offsets[-1]],
    color="blue",
    lw=2,
    label="3000 uV",
)
ax.text(
    ts[window][0] - 0.02,
    -offsets[-1] - 1500,
    "3000 uV",
    color="blue",
    ha="right",
    va="center",
    fontsize=8,
    rotation=90,
)
plt.show()

png

8) Frequency-domain helpers

LFPLoader provides convenience methods for phase and instantaneous frequency estimates.

# Band-pass filter one channel in the theta band (6-12 Hz) and derive phase,
# amplitude, and instantaneous frequency with the loader's helper methods.
theta_sig = npy.io.LFPLoader(basepath, channels=ripple_channel, ext="lfp")
phase = theta_sig.get_phase(band2filter=[6, 12], ford=3)
filt_sig, phase2, amp, amp_filt, inst_freq = theta_sig.get_freq_phase_amp(
    band2filter=[6, 12],
    ford=3,
    kernel_size=13,
)

# All outputs keep the (n_channels, n_samples) shape of the source signal.
print("phase shape:       ", phase.shape)
print("filtered shape:    ", filt_sig.shape)
print("inst freq shape:   ", inst_freq.shape)
phase shape:        (1, 688304)
filtered shape:     (1, 688304)
inst freq shape:    (1, 688304)
from scipy.signal import find_peaks

# 5-second window (5 s to 10 s) for visualizing the theta decomposition.
window = (theta_sig.abscissa_vals >= 5) & (theta_sig.abscissa_vals <= 10)

# Five stacked rows: raw, filtered, phase, amplitude, instantaneous frequency.
fig, ax = plt.subplots(
    5,
    1,
    figsize=npy.plotting.set_size("paper", fraction=1.3, subplots=(1, 1)),
    sharex=True,
    dpi=150,
)

ax[0].plot(
    theta_sig.abscissa_vals[window], theta_sig.data[0, window], color="k", lw=0.7
)
ax[0].set_ylabel(f"ch {ripple_channel}")
ax[1].plot(theta_sig.abscissa_vals[window], filt_sig[0, window], color="k", lw=0.7)
ax[1].set_ylabel("filtered (6-12 Hz)")
ax[2].plot(theta_sig.abscissa_vals[window], phase[0, window], color="k", lw=0.7)
ax[3].plot(theta_sig.abscissa_vals[window], amp[0, window], color="k", lw=0.7)
ax[3].set_ylabel("amplitude")

ax[4].plot(theta_sig.abscissa_vals[window], inst_freq[0, window], color="k", lw=0.7)
ax[4].axhspan(6, 12, color="blue", alpha=0.1, label="theta band")
ax[4].legend()
ax[2].set_ylabel("phase (rad)")
ax[4].set_ylabel("inst freq (Hz)")
ax[4].set_xlabel("time (s)")
ax[2].set_ylim(-np.pi, np.pi)
# Mark peaks of the phase trace with dashed lines across every row so the
# theta cycles can be followed through all signals.
peaks, _ = find_peaks(phase[0, window], height=0)
for ax_ in ax:
    for peak in peaks:
        ax_.axvline(theta_sig.abscissa_vals[window][peak], color="k", lw=0.25, ls="--")
sns.despine()

png

9) Loading from unconcatenated files

To save space, the concatenated .dat file is often deleted after spike sorting. With LFPLoader, you can load the individual amplifier.dat files as if they were concatenated, without needing to actually concatenate them. Just pass the folder path and the channel number(s) to load.

Demonstrate loading from unconcatenated files:

import os

basepath = r"R:\data\latentseq\RM10\rm10_day4_20250808"

# Confirm that no concatenated <session>.dat exists for this session.
dat_file = os.path.join(basepath, f"{os.path.basename(basepath)}.dat")
print(f"{dat_file} exists? {os.path.exists(dat_file)}")

# find files named amplifier.dat in subfolders of basepath
for root, dirs, files in os.walk(basepath):
    for file in files:
        if file == "amplifier.dat":
            print(os.path.join(root, file))
R:\data\latentseq\RM10\rm10_day4_20250808\rm10_day4_20250808.dat exists? False
R:\data\latentseq\RM10\rm10_day4_20250808\rm10_cheeseboard1_250808_120146\amplifier.dat
R:\data\latentseq\RM10\rm10_day4_20250808\rm10_cheeseboard2_250808_150603\amplifier.dat
R:\data\latentseq\RM10\rm10_day4_20250808\rm10_postsleep1_250808_125600\amplifier.dat
R:\data\latentseq\RM10\rm10_day4_20250808\rm10_postsleep2_250808_160051\amplifier.dat
R:\data\latentseq\RM10\rm10_day4_20250808\rm10_presleep_250808_102239\amplifier.dat

Load from unconcatenated files and plot a short segment

# With no concatenated .dat present, the loader virtually concatenates the
# per-subsession amplifier.dat files found above.
sig = npy.io.LFPLoader(basepath, ext="dat")
print(sig)
print("data shape:", sig.data.shape)
print("data dtype:", type(sig.data))
<LFPLoader at 0x2a97675e890: 384 signals> for a total of 7:21:12:223 hours
data shape: (384, 529444480)
data dtype: <class 'neuro_py.io.loading.VirtualConcatenatedDatView'>
epoch_df = npy.io.load_epoch(basepath)
display(epoch_df)
name startTime stopTime environment manipulation behavioralParadigm stimuli notes basepath
0 rm10_presleep_250808_102239 0.0000 5588.59515 sleep NaN NaN NaN NaN R:\data\latentseq\RM10\rm10_day4_20250808
1 rm10_cheeseboard1_250808_120146 5588.5952 8674.43835 cheeseboard NaN NaN NaN NaN R:\data\latentseq\RM10\rm10_day4_20250808
2 rm10_postsleep1_250808_125600 8674.4384 16276.18555 sleep NaN NaN NaN NaN R:\data\latentseq\RM10\rm10_day4_20250808
3 rm10_cheeseboard2_250808_150603 16276.1856 19401.32475 cheeseboard NaN NaN NaN NaN R:\data\latentseq\RM10\rm10_day4_20250808
4 rm10_postsleep2_250808_160051 19401.3248 26472.22395 sleep NaN NaN NaN NaN R:\data\latentseq\RM10\rm10_day4_20250808

Plotting raw data across an epoch boundary to demonstrate that LFPLoader seamlessly handles loading from multiple files when the concatenated file is not present.

# Mask spanning 1 s on either side of the first epoch boundary, i.e. across
# the seam between two underlying amplifier.dat files.
ts = sig.abscissa_vals
window = [epoch_df.iloc[0].stopTime - 1, epoch_df.iloc[1].startTime + 1]
window = (ts >= window[0]) & (ts <= window[1])

fig, ax = plt.subplots(
    1,
    1,
    figsize=npy.plotting.set_size("paper", fraction=2, subplots=(1, 1)),
    sharex=True,
    sharey=True,
    dpi=150,
)

shank = 10  # example shank number

# Channels on the chosen shank, stacked with 3000-unit vertical offsets.
channel = sig.shank_to_channel.get(shank)
offsets = np.arange(len(channel)) * 3000
ax.plot(ts[window], sig.data[:, window][channel, :].T - offsets, color="k", lw=0.7)
ax.set_title(f"shank {shank}")
ax.set_xlabel("time (s)")
ax.axis("off")
# add scale bar
ax.plot(
    [ts[window][0], ts[window][0] + 0.1],
    [-offsets[-1] - 3000, -offsets[-1] - 3000],
    color="red",
    lw=2,
)
# add text label for scale bar
ax.text(
    ts[window][0] + 0.05,
    -offsets[-1] - 3500,
    "100 ms",
    color="red",
    ha="center",
    va="top",
    fontsize=8,
)

# add vertical amplitude scale bar next to the time scale bar
ax.plot(
    [ts[window][0], ts[window][0]],
    [-offsets[-1] - 3000, -offsets[-1]],
    color="blue",
    lw=2,
)
ax.text(
    ts[window][0] - 0.02,
    -offsets[-1] - 1500,
    "3000 uV",
    color="blue",
    ha="right",
    va="center",
    fontsize=8,
    rotation=90,
)
# Dashed line marking where one file ends and the next begins.
plt.axvline(epoch_df.iloc[0].stopTime, color="r", lw=1, ls="--", label="epoch boundary")
# add axvline text label
ax.text(
    epoch_df.iloc[0].stopTime + 0.01,
    2000,
    f"epoch boundary at {epoch_df.iloc[0].stopTime:.2f} s",
    color="r",
    ha="left",
    va="bottom",
    fontsize=12,
)

plt.show()

png

Virtual concatenation of multiple data formats [Intan and OpenEphys]

# Session containing both OpenEphys (continuous.dat) and Intan (amplifier.dat)
# recordings; the loader virtually concatenates across both formats.
basepath = r"S:\data\HPC_LS\HS3\day7"
sig = npy.io.LFPLoader(basepath, ext="dat")
print(sig)
print("data shape:", sig.data.shape)
print("data dtype:", type(sig.data))
<LFPLoader at 0x2aace34fdd0: 192 signals> for a total of 6:28:44:038 hours
data shape: (192, 466480763)
data dtype: <class 'numpy.memmap'>

Here, epoch 0 is OpenEphys and epochs 1 and 2 are Intan

epoch_df = npy.io.load_epoch(basepath)
display(epoch_df)
name startTime stopTime environment manipulation behavioralParadigm stimuli notes basepath
0 2025-07-28_09-17-02 0.00000 10130.04775 NaN NaN NaN NaN NaN S:\data\HPC_LS\HS3\day7
1 circle_1_CCW_250728_121210 10130.04775 12007.34695 NaN NaN NaN NaN NaN S:\data\HPC_LS\HS3\day7
2 postsleep_250728_124714 12007.34695 23324.03815 NaN NaN NaN NaN NaN S:\data\HPC_LS\HS3\day7
import glob as glob

# List the raw files of each format: OpenEphys continuous.dat vs Intan amplifier.dat.
print(glob.glob(os.path.join(basepath, "**", "continuous.dat"), recursive=True))

print(glob.glob(os.path.join(basepath, "**", "amplifier.dat"), recursive=True))
['S:\\data\\HPC_LS\\HS3\\day7\\2025-07-28_09-17-02\\Record Node 101\\experiment1\\recording1\\continuous\\Acquisition_Board-100.Rhythm Data\\continuous.dat']
['S:\\data\\HPC_LS\\HS3\\day7\\circle_1_CCW_250728_121210\\amplifier.dat', 'S:\\data\\HPC_LS\\HS3\\day7\\postsleep_250728_124714\\amplifier.dat']
# Mask spanning 1 s on either side of the OpenEphys -> Intan boundary.
ts = sig.abscissa_vals
window = [epoch_df.iloc[0].stopTime - 1, epoch_df.iloc[1].startTime + 1]
window = (ts >= window[0]) & (ts <= window[1])

fig, ax = plt.subplots(
    1,
    1,
    figsize=npy.plotting.set_size("paper", fraction=2, subplots=(1, 1)),
    sharex=True,
    sharey=True,
    dpi=150,
)

shank = 1  # example shank number

# Channels on the chosen shank, stacked with 3000-unit vertical offsets.
channel = sig.shank_to_channel.get(shank)
offsets = np.arange(len(channel)) * 3000
ax.plot(ts[window], sig.data[:, window][channel, :].T - offsets, color="k", lw=0.7)
ax.set_title(f"shank {shank}")
ax.set_xlabel("time (s)")
ax.axis("off")
# add scale bar
ax.plot(
    [ts[window][0], ts[window][0] + 0.1],
    [-offsets[-1] - 3000, -offsets[-1] - 3000],
    color="red",
    lw=2,
)
# add text label for scale bar
ax.text(
    ts[window][0] + 0.05,
    -offsets[-1] - 3500,
    "100 ms",
    color="red",
    ha="center",
    va="top",
    fontsize=8,
)

# add vertical amplitude scale bar next to the time scale bar
ax.plot(
    [ts[window][0], ts[window][0]],
    [-offsets[-1] - 3000, -offsets[-1]],
    color="blue",
    lw=2,
)
ax.text(
    ts[window][0] - 0.02,
    -offsets[-1] - 1500,
    "3000 uV",
    color="blue",
    ha="right",
    va="center",
    fontsize=8,
    rotation=90,
)
# Dashed line marking the seam between the two recording systems.
plt.axvline(epoch_df.iloc[0].stopTime, color="r", lw=1, ls="--", label="epoch boundary")
# add axvline text label
ax.text(
    epoch_df.iloc[0].stopTime + 0.01,
    1200,
    f"epoch boundary at {epoch_df.iloc[0].stopTime:.2f} s",
    color="r",
    ha="left",
    va="bottom",
    fontsize=12,
)

# add text labels above signal to specify the first section is OpenEphys and the second section is Intan
ax.text(
    epoch_df.iloc[0].stopTime - 0.5,
    2000,
    "OpenEphys",
    color="k",
    ha="center",
    va="bottom",
    fontsize=12,
)
ax.text(
    epoch_df.iloc[0].stopTime + 0.5,
    2000,
    "Intan",
    color="k",
    ha="center",
    va="bottom",
    fontsize=12,
)
plt.show()

png

10) Quick benchmark: choose the smaller first-pass read

When both channels and epoch are specified, reading the smaller slice first usually minimizes I/O. This synthetic benchmark compares two indexing orders on a memmapped array.

import tempfile
import time
from pathlib import Path


def benchmark_index_order(
    n_channels=64,
    fs=1250,
    recording_hours=5,
    load_channels=2,
    time_interval=2,
    dtype=np.int16,
    repeats=5,
    warmup=1,
    seed=0,
    benchmark_samples=250_000,
):
    """Benchmark the two indexing orders on a memory-mapped (samples, channels) array.

    When both a channel subset and a time interval are requested, the data can
    be sliced channels-first (``mm[:, ch][:t, :]``) or time-first
    (``mm[:t, :][:, ch]``). The time-first slice is a contiguous view, so the
    subsequent fancy channel index copies only the requested window; the
    channels-first fancy index copies *all* rows before the time slice.

    Parameters
    ----------
    n_channels : int
        Number of channels in the synthetic file.
    fs : int
        Sampling rate (Hz), used with ``recording_hours`` only to report the
        sample counts each order would touch on a real recording.
    recording_hours : float
        Hypothetical recording duration for the reported counts.
    load_channels : int
        Number of channels requested in the benchmark slice.
    time_interval : float
        Requested time window in seconds.
    dtype : numpy integer dtype
        Element type of the synthetic data.
    repeats, warmup : int
        Timed and untimed iterations per indexing order.
    seed : int
        Seed for the synthetic data generator.
    benchmark_samples : int
        Lower bound on the synthetic file length; keeps the benchmark
        lightweight while preserving the indexing pattern.

    Returns
    -------
    tuple of (float, float)
        ``(t_channels_first, t_time_first)`` — mean seconds per call for each
        indexing order.
    """
    rng = np.random.default_rng(seed)
    full_recording_samples = int(fs * 60 * 60 * recording_hours)
    requested_time_samples = int(time_interval * fs)

    # Sample counts each order would touch on the full hypothetical recording.
    channels_first_samples = full_recording_samples * load_channels
    time_first_samples = requested_time_samples * n_channels
    print(f"channels first loading {channels_first_samples} samples")
    print(f"time interval first loading {time_first_samples} samples")

    # Keep notebook benchmark lightweight while preserving the same indexing pattern.
    n_samples = max(requested_time_samples * 4, int(benchmark_samples))
    sample_stop = min(requested_time_samples, n_samples)

    with tempfile.TemporaryDirectory() as td:
        path = Path(td) / "bench.dat"
        info = np.iinfo(dtype)
        mmw = np.memmap(path, dtype=dtype, mode="w+", shape=(n_samples, n_channels))
        mmw[:] = rng.integers(
            low=info.min,
            high=info.max,
            size=(n_samples, n_channels),
            dtype=dtype,
        )
        mmw.flush()
        del mmw

        mm = np.memmap(path, dtype=dtype, mode="r", shape=(n_samples, n_channels))
        ch_idx = np.arange(load_channels)

        def _time_call(fn):
            # Mean wall-clock time over `repeats` calls after `warmup` untimed calls.
            for _ in range(warmup):
                fn()
            t0 = time.perf_counter()
            for _ in range(repeats):
                fn()
            return (time.perf_counter() - t0) / repeats

        # Channels-first: fancy index copies every row for the chosen channels.
        t_channels_first = _time_call(
            lambda: np.array(mm[:, ch_idx][:sample_stop, :], copy=True)
        )
        # Time-first: cheap slice view first, then only the window is copied.
        t_time_first = _time_call(
            lambda: np.array(mm[:sample_stop, :][:, ch_idx], copy=True)
        )

        # Explicitly close the underlying mmap so the temp directory can be
        # removed on platforms (Windows) that refuse to delete open files.
        if hasattr(mm, "_mmap") and mm._mmap is not None:
            mm._mmap.close()
        del mm

    print(f"channels→time: {t_channels_first:.6f} s")
    print(f"time→channels: {t_time_first:.6f} s")
    faster = "time→channels" if t_time_first <= t_channels_first else "channels→time"
    print(f"faster in this run: {faster}")
    return t_channels_first, t_time_first


benchmark_index_order()
channels first loading 45000000 samples
time interval first loading 160000 samples
channels→time: 0.003933 s
time→channels: 0.000009 s
faster in this run: time→channels