LFPLoader Tutorial¶
This tutorial demonstrates how to use neuro_py.io.loading.LFPLoader to load LFP or DAT signals as a nelpy.AnalogSignalArray, including single-channel, multi-channel, and epoch-restricted usage.
# IPython setup: auto-reload edited modules and render inline figures at retina resolution.
%reload_ext autoreload
%autoreload 2
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import neuro_py as npy

# Apply neuro_py's default matplotlib styling (fonts, sizes, spine settings).
npy.plotting.set_plotting_defaults()
1) Set session path¶
Set basepath to a recording folder that contains your session files (.xml, .lfp or .dat, and optional event files).
basepath = r"U:\data\hpc_ctx_project\HP18\hp18_day85_20250713"
2) Inspect XML metadata¶
loadXML returns the channel count, the LFP and DAT sampling rates, and the shank-to-channel mapping.
# Read recording metadata from the session XML: channel count, LFP and DAT
# sampling rates, and the shank-to-channel mapping.
nChannels, fs_lfp, fs_dat, shank_to_channel = npy.io.loadXML(basepath)
for label, value in (
    ("nChannels", nChannels),
    ("fs_lfp", fs_lfp),
    ("fs_dat", fs_dat),
    ("n_shanks", len(shank_to_channel)),
):
    print(f"{label}: {value}")
nChannels: 256
fs_lfp: 1250
fs_dat: 20000
n_shanks: 12
3) Load LFP/DAT with LFPLoader¶
LFPLoader returns an nelpy.AnalogSignalArray object.
When not specifying a channel, LFPLoader will load all channels to a memory-mapped array that is accessed on demand, so it can be used even if the LFP/DAT files are too large to fit in memory.
# Load the full .lfp file. Without a channel selection the data is a
# memory-mapped array, read from disk only on access.
lfp_obj = npy.io.LFPLoader(basepath, ext="lfp")
print(type(lfp_obj))
print(lfp_obj)
for label, value in (
    ("data shape:", lfp_obj.data.shape),
    ("data dtype:", type(lfp_obj.data)),
):
    print(label, value)
<class 'neuro_py.io.loading.LFPLoader'>
<LFPLoader at 0x2a7a1290fd0: 256 signals> for a total of 9:10:642 minutes
data shape: (256, 688304)
data dtype: <class 'numpy.memmap'>
# Load the wideband .dat as an AnalogSignalArray; the sampling rate (fs_dat)
# is taken from the session XML automatically.
dat_obj = npy.io.LFPLoader(basepath, ext="dat")
print(type(dat_obj))
print(dat_obj)
for label, value in (
    ("data shape:", dat_obj.data.shape),
    ("data dtype:", type(dat_obj.data)),
):
    print(label, value)
<class 'neuro_py.io.loading.LFPLoader'>
<LFPLoader at 0x2a786321f90: 256 signals> for a total of 9:10:643 minutes
data shape: (256, 11012864)
data dtype: <class 'numpy.memmap'>
4) Single and multi-channel loading¶
You can pass one channel (int) or many channels (list).
# Pick example channel IDs from the brain-region annotations.
brain_regions = npy.io.load_brain_regions(basepath, out_format="DataFrame")
ca1so_rows = brain_regions.query("region=='CA1so'")
ca1sr_rows = brain_regions.query("region=='CA1sr'")
ripple_channel = ca1so_rows.channels.iloc[0]
sharp_wave_channel = ca1sr_rows.channels.iloc[0]

# A single channel may be passed as an int, several channels as a list.
single_ch = npy.io.LFPLoader(basepath, channels=ripple_channel, ext="lfp")
multi_ch = npy.io.LFPLoader(
    basepath, channels=[ripple_channel, sharp_wave_channel], ext="lfp"
)

print("single channel shape:", single_ch.data.shape)
print("multi channel shape: ", multi_ch.data.shape)
single channel shape: (1, 688304)
multi channel shape: (2, 688304)
5) Epoch-restricted loading¶
Restrict loaded data to a time interval using epoch.
# Load the session epoch table (one row per sub-recording / behavioral epoch).
epoch_df = npy.io.load_epoch(basepath)
epoch_df.head()
| name | startTime | stopTime | environment | manipulation | behavioralParadigm | stimuli | notes | basepath | |
|---|---|---|---|---|---|---|---|---|---|
| 0 | hp18_probe_250713_091741 | 0 | 550.6432 | cheeseboard | NaN | NaN | NaN | NaN | U:\data\hpc_ctx_project\HP18\hp18_day85_20250713 |
# Restrict loading to the first 5 seconds of the first epoch.
epoch_start = float(epoch_df.startTime.iloc[0])
restricted_epoch = np.array([epoch_start, epoch_start + 5.0])

lfp_restricted = npy.io.LFPLoader(
    basepath,
    channels=[ripple_channel, sharp_wave_channel],
    ext="lfp",
    epoch=restricted_epoch,
)

print(lfp_restricted.data.shape)
print(lfp_restricted.abscissa_vals.min(), lfp_restricted.abscissa_vals.max())
(2, 6251)
0.0 5.0
6) Basic plotting¶
Plot two channels in a short time window.
# Plot the two selected channels over a 2-second window of the raw .dat signal.
sig = npy.io.LFPLoader(
    basepath, channels=[ripple_channel, sharp_wave_channel], ext="dat"
)
ts = sig.abscissa_vals
t_start = ts[0] + 2.0
t_stop = ts[0] + 4.0
window = (ts >= t_start) & (ts <= t_stop)

fig, ax = plt.subplots(
    2,
    1,
    figsize=npy.plotting.set_size("thesis", fraction=1.0, subplots=(1, 1.0)),
    sharex=True,
)
# One panel per channel, with its own trace color.
for row, (color, chan) in enumerate(
    zip(("k", "teal"), (ripple_channel, sharp_wave_channel))
):
    ax[row].plot(ts[window], sig.data[row, window], color=color, lw=0.7)
    ax[row].set_ylabel(f"ch {chan}")
ax[1].set_xlabel("time (s)")
sns.despine()
plt.tight_layout()
plt.show()

7) More advanced plotting¶
Plotting raw data from every channel and every shank around a ripple event.¶
Note: plt.subplots returns a bare Axes object rather than an array when there is only a single shank, so single-shank recordings need the axis handling adjusted accordingly.
# Plot raw wideband (.dat) traces from every channel, one subplot per shank,
# in a +/-150 ms window around a ripple event.
ripples = npy.io.load_ripples_events(basepath)
sig = npy.io.LFPLoader(basepath, ext="dat")
ripple_n = 28
ts = sig.abscissa_vals
window = (ts >= ripples.peaks[ripple_n] - 0.150) & (
    ts <= ripples.peaks[ripple_n] + 0.150
)
fig, ax = plt.subplots(
    1,
    len(sig.shank_to_channel.keys()),
    figsize=npy.plotting.set_size("paper", fraction=2, subplots=(1, 1)),
    sharex=True,
    sharey=True,
    dpi=150,
)
# Fix: plt.subplots returns a bare Axes (not an array) when there is only one
# shank; normalize so the ax[i] indexing below works in both cases.
ax = np.atleast_1d(ax)
for i, (shank, channel) in enumerate(sig.shank_to_channel.items()):
    # Vertical offsets stack the channels of a shank without overlap.
    offsets = np.arange(len(channel)) * 3000
    ax[i].plot(
        ts[window], sig.data[:, window][channel, :].T - offsets, color="k", lw=0.7
    )
    ax[i].set_title(f"shank {shank}")
    ax[i].set_xlabel("time (s)")
    ax[i].axis("off")
    # add scale bar
    ax[i].plot(
        [ts[window][0], ts[window][0] + 0.1],
        [-offsets[-1] - 3000, -offsets[-1] - 3000],
        color="red",
        lw=2,
        label="100 ms",
    )
    # add text label for scale bar
    ax[i].text(
        ts[window][0] + 0.05,
        -offsets[-1] - 3500,
        "100 ms",
        color="red",
        ha="center",
        va="top",
        fontsize=8,
    )
    # to scale bar add amplitude label
    ax[i].plot(
        [ts[window][0], ts[window][0]],
        [-offsets[-1] - 3000, -offsets[-1]],
        color="blue",
        lw=2,
        label="3000 uV",
    )
    ax[i].text(
        ts[window][0] - 0.02,
        -offsets[-1] - 1500,
        "3000 uV",
        color="blue",
        ha="right",
        va="center",
        fontsize=8,
        rotation=90,
    )
plt.show()

Plotting raw data from every channel from a particular shank around a ripple event.¶
# Plot all channels of one shank from the raw .dat in a +/-500 ms window
# around a ripple event.
ripples = npy.io.load_ripples_events(basepath)
sig = npy.io.LFPLoader(basepath, ext="dat")
ripple_n = 28
ts = sig.abscissa_vals
ripple_peak = ripples.peaks[ripple_n]
window = (ts >= ripple_peak - 0.5) & (ts <= ripple_peak + 0.5)

fig, ax = plt.subplots(
    1,
    1,
    figsize=npy.plotting.set_size("paper", fraction=2, subplots=(1, 1)),
    sharex=True,
    sharey=True,
    dpi=150,
)
shank = 10  # example shank number
channel = sig.shank_to_channel.get(shank)
# Vertical offsets stack the shank's channels without overlap.
offsets = np.arange(len(channel)) * 3000
ax.plot(ts[window], sig.data[:, window][channel, :].T - offsets, color="k", lw=0.7)
ax.set_title(f"shank {shank}")
ax.set_xlabel("time (s)")
ax.axis("off")

# Scale bars: horizontal red bar = 100 ms, vertical blue bar = 3000 uV.
t_left = ts[window][0]
y_bottom = -offsets[-1] - 3000
ax.plot([t_left, t_left + 0.1], [y_bottom, y_bottom], color="red", lw=2, label="100 ms")
ax.text(
    t_left + 0.05, y_bottom - 500, "100 ms",
    color="red", ha="center", va="top", fontsize=8,
)
ax.plot([t_left, t_left], [y_bottom, -offsets[-1]], color="blue", lw=2, label="3000 uV")
ax.text(
    t_left - 0.02, -offsets[-1] - 1500, "3000 uV",
    color="blue", ha="right", va="center", fontsize=8, rotation=90,
)
plt.show()

Same plot as above, but with LFP sampled at 1250 Hz instead of 20000 Hz¶
# Same shank plot as above, but reading the downsampled .lfp file (1250 Hz)
# instead of the wideband .dat (20000 Hz).
ripples = npy.io.load_ripples_events(basepath)
sig = npy.io.LFPLoader(basepath, ext="lfp")
ripple_n = 28
ts = sig.abscissa_vals
ripple_peak = ripples.peaks[ripple_n]
window = (ts >= ripple_peak - 0.5) & (ts <= ripple_peak + 0.5)

fig, ax = plt.subplots(
    1,
    1,
    figsize=npy.plotting.set_size("paper", fraction=2, subplots=(1, 1)),
    sharex=True,
    sharey=True,
    dpi=150,
)
shank = 10  # example shank number
channel = sig.shank_to_channel.get(shank)
# Vertical offsets stack the shank's channels without overlap.
offsets = np.arange(len(channel)) * 3000
ax.plot(ts[window], sig.data[:, window][channel, :].T - offsets, color="k", lw=0.7)
ax.set_title(f"shank {shank}")
ax.set_xlabel("time (s)")
ax.axis("off")

# Scale bars: horizontal red bar = 100 ms, vertical blue bar = 3000 uV.
t_left = ts[window][0]
y_bottom = -offsets[-1] - 3000
ax.plot([t_left, t_left + 0.1], [y_bottom, y_bottom], color="red", lw=2, label="100 ms")
ax.text(
    t_left + 0.05, y_bottom - 500, "100 ms",
    color="red", ha="center", va="top", fontsize=8,
)
ax.plot([t_left, t_left], [y_bottom, -offsets[-1]], color="blue", lw=2, label="3000 uV")
ax.text(
    t_left - 0.02, -offsets[-1] - 1500, "3000 uV",
    color="blue", ha="right", va="center", fontsize=8, rotation=90,
)
plt.show()

8) Frequency-domain helpers¶
LFPLoader provides convenience methods for phase and instantaneous frequency estimates.
# Filter one channel in the theta band and extract phase, amplitude, and
# instantaneous frequency via LFPLoader's convenience methods.
theta_sig = npy.io.LFPLoader(basepath, channels=ripple_channel, ext="lfp")
theta_band = [6, 12]
phase = theta_sig.get_phase(band2filter=theta_band, ford=3)
filt_sig, phase2, amp, amp_filt, inst_freq = theta_sig.get_freq_phase_amp(
    band2filter=theta_band,
    ford=3,
    kernel_size=13,
)
print("phase shape: ", phase.shape)
print("filtered shape: ", filt_sig.shape)
print("inst freq shape: ", inst_freq.shape)
phase shape: (1, 688304)
filtered shape: (1, 688304)
inst freq shape: (1, 688304)
from scipy.signal import find_peaks

# Stack raw signal, filtered trace, phase, amplitude, and instantaneous
# frequency for a 5-second window.
window = (theta_sig.abscissa_vals >= 5) & (theta_sig.abscissa_vals <= 10)
t_win = theta_sig.abscissa_vals[window]

fig, ax = plt.subplots(
    5,
    1,
    figsize=npy.plotting.set_size("paper", fraction=1.3, subplots=(1, 1)),
    sharex=True,
    dpi=150,
)
panels = (
    (theta_sig.data[0, window], f"ch {ripple_channel}"),
    (filt_sig[0, window], "filtered (6-12 Hz)"),
    (phase[0, window], "phase (rad)"),
    (amp[0, window], "amplitude"),
    (inst_freq[0, window], "inst freq (Hz)"),
)
for axis, (trace, label) in zip(ax, panels):
    axis.plot(t_win, trace, color="k", lw=0.7)
    axis.set_ylabel(label)
ax[2].set_ylim(-np.pi, np.pi)
ax[4].axhspan(6, 12, color="blue", alpha=0.1, label="theta band")
ax[4].legend()
ax[4].set_xlabel("time (s)")

# Mark theta-phase peaks with dashed lines across all panels.
peaks, _ = find_peaks(phase[0, window], height=0)
for axis in ax:
    for peak in peaks:
        axis.axvline(t_win[peak], color="k", lw=0.25, ls="--")
sns.despine()

9) Loading from unconcatenated files¶
To save space, the concatenated .dat file is often deleted after spike sorting. With `LFPLoader`, you can load the individual amplifier.dat files as if they were concatenated, without needing to actually concatenate them. Just pass the folder path and the channel number(s) to load.
demonstrate loading from unconcatenated files¶
import os

basepath = r"R:\data\latentseq\RM10\rm10_day4_20250808"

# Check whether a concatenated session-level .dat exists (here it does not).
session_name = os.path.basename(basepath)
dat_file = os.path.join(basepath, f"{session_name}.dat")
print(f"{dat_file} exists? {os.path.exists(dat_file)}")

# Find the per-recording amplifier.dat files in the subfolders of basepath.
for root, _dirs, files in os.walk(basepath):
    if "amplifier.dat" in files:
        print(os.path.join(root, "amplifier.dat"))
R:\data\latentseq\RM10\rm10_day4_20250808\rm10_day4_20250808.dat exists? False
R:\data\latentseq\RM10\rm10_day4_20250808\rm10_cheeseboard1_250808_120146\amplifier.dat
R:\data\latentseq\RM10\rm10_day4_20250808\rm10_cheeseboard2_250808_150603\amplifier.dat
R:\data\latentseq\RM10\rm10_day4_20250808\rm10_postsleep1_250808_125600\amplifier.dat
R:\data\latentseq\RM10\rm10_day4_20250808\rm10_postsleep2_250808_160051\amplifier.dat
R:\data\latentseq\RM10\rm10_day4_20250808\rm10_presleep_250808_102239\amplifier.dat
Load from unconcatenated files and plot a short segment¶
# Load the session; with no concatenated file present, LFPLoader virtually
# concatenates the per-recording amplifier.dat files.
sig = npy.io.LFPLoader(basepath, ext="dat")
print(sig)
for label, value in (
    ("data shape:", sig.data.shape),
    ("data dtype:", type(sig.data)),
):
    print(label, value)
<LFPLoader at 0x2a97675e890: 384 signals> for a total of 7:21:12:223 hours
data shape: (384, 529444480)
data dtype: <class 'neuro_py.io.loading.VirtualConcatenatedDatView'>
# Load and display the epoch table (display renders an HTML table in Jupyter).
epoch_df = npy.io.load_epoch(basepath)
display(epoch_df)
| name | startTime | stopTime | environment | manipulation | behavioralParadigm | stimuli | notes | basepath | |
|---|---|---|---|---|---|---|---|---|---|
| 0 | rm10_presleep_250808_102239 | 0.0000 | 5588.59515 | sleep | NaN | NaN | NaN | NaN | R:\data\latentseq\RM10\rm10_day4_20250808 |
| 1 | rm10_cheeseboard1_250808_120146 | 5588.5952 | 8674.43835 | cheeseboard | NaN | NaN | NaN | NaN | R:\data\latentseq\RM10\rm10_day4_20250808 |
| 2 | rm10_postsleep1_250808_125600 | 8674.4384 | 16276.18555 | sleep | NaN | NaN | NaN | NaN | R:\data\latentseq\RM10\rm10_day4_20250808 |
| 3 | rm10_cheeseboard2_250808_150603 | 16276.1856 | 19401.32475 | cheeseboard | NaN | NaN | NaN | NaN | R:\data\latentseq\RM10\rm10_day4_20250808 |
| 4 | rm10_postsleep2_250808_160051 | 19401.3248 | 26472.22395 | sleep | NaN | NaN | NaN | NaN | R:\data\latentseq\RM10\rm10_day4_20250808 |
Plotting raw data across an epoch boundary to demonstrate that LFPLoader seamlessly handles loading from multiple files when the concatenated file is not present.¶
# Plot one shank's raw traces across the first epoch boundary to show that
# LFPLoader stitches the per-epoch files together seamlessly.
ts = sig.abscissa_vals
window = [epoch_df.iloc[0].stopTime - 1, epoch_df.iloc[1].startTime + 1]
window = (ts >= window[0]) & (ts <= window[1])
fig, ax = plt.subplots(
    1,
    1,
    figsize=npy.plotting.set_size("paper", fraction=2, subplots=(1, 1)),
    sharex=True,
    sharey=True,
    dpi=150,
)
shank = 10  # example shank number
channel = sig.shank_to_channel.get(shank)
# Vertical offsets stack the shank's channels without overlap.
offsets = np.arange(len(channel)) * 3000
ax.plot(ts[window], sig.data[:, window][channel, :].T - offsets, color="k", lw=0.7)
ax.set_title(f"shank {shank}")
ax.set_xlabel("time (s)")
ax.axis("off")
# add scale bar
ax.plot(
    [ts[window][0], ts[window][0] + 0.1],
    [-offsets[-1] - 3000, -offsets[-1] - 3000],
    color="red",
    lw=2,
)
# add text label for scale bar
ax.text(
    ts[window][0] + 0.05,
    -offsets[-1] - 3500,
    "100 ms",
    color="red",
    ha="center",
    va="top",
    fontsize=8,
)
# to scale bar add amplitude label
ax.plot(
    [ts[window][0], ts[window][0]],
    [-offsets[-1] - 3000, -offsets[-1]],
    color="blue",
    lw=2,
)
ax.text(
    ts[window][0] - 0.02,
    -offsets[-1] - 1500,
    "3000 uV",
    color="blue",
    ha="right",
    va="center",
    fontsize=8,
    rotation=90,
)
# Fix: use the axes object directly (not plt.axvline) so the marker always
# lands on this figure's axes, consistent with the rest of the cell.
ax.axvline(epoch_df.iloc[0].stopTime, color="r", lw=1, ls="--", label="epoch boundary")
# add axvline text label
ax.text(
    epoch_df.iloc[0].stopTime + 0.01,
    2000,
    f"epoch boundary at {epoch_df.iloc[0].stopTime:.2f} s",
    color="r",
    ha="left",
    va="bottom",
    fontsize=12,
)
plt.show()

Virtual concatenation of multiple data formats [Intan and OpenEphys]¶
# Mixed-format session: OpenEphys and Intan recordings virtually concatenated.
basepath = r"S:\data\HPC_LS\HS3\day7"
sig = npy.io.LFPLoader(basepath, ext="dat")
print(sig)
for label, value in (
    ("data shape:", sig.data.shape),
    ("data dtype:", type(sig.data)),
):
    print(label, value)
<LFPLoader at 0x2aace34fdd0: 192 signals> for a total of 6:28:44:038 hours
data shape: (192, 466480763)
data dtype: <class 'numpy.memmap'>
Here, epoch 0 is OpenEphys and epochs 1 and 2 are Intan
# Load and display the epoch table for the mixed-format session.
epoch_df = npy.io.load_epoch(basepath)
display(epoch_df)
| name | startTime | stopTime | environment | manipulation | behavioralParadigm | stimuli | notes | basepath | |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 2025-07-28_09-17-02 | 0.00000 | 10130.04775 | NaN | NaN | NaN | NaN | NaN | S:\data\HPC_LS\HS3\day7 |
| 1 | circle_1_CCW_250728_121210 | 10130.04775 | 12007.34695 | NaN | NaN | NaN | NaN | NaN | S:\data\HPC_LS\HS3\day7 |
| 2 | postsleep_250728_124714 | 12007.34695 | 23324.03815 | NaN | NaN | NaN | NaN | NaN | S:\data\HPC_LS\HS3\day7 |
# `import glob as glob` was a redundant self-alias; plain import is idiomatic.
import glob

# List the raw recording files: OpenEphys stores continuous.dat, Intan stores
# amplifier.dat.
print(glob.glob(os.path.join(basepath, "**", "continuous.dat"), recursive=True))
print(glob.glob(os.path.join(basepath, "**", "amplifier.dat"), recursive=True))
['S:\\data\\HPC_LS\\HS3\\day7\\2025-07-28_09-17-02\\Record Node 101\\experiment1\\recording1\\continuous\\Acquisition_Board-100.Rhythm Data\\continuous.dat']
['S:\\data\\HPC_LS\\HS3\\day7\\circle_1_CCW_250728_121210\\amplifier.dat', 'S:\\data\\HPC_LS\\HS3\\day7\\postsleep_250728_124714\\amplifier.dat']
# Plot one shank's raw traces across the OpenEphys -> Intan epoch boundary to
# show that virtual concatenation also works across data formats.
ts = sig.abscissa_vals
window = [epoch_df.iloc[0].stopTime - 1, epoch_df.iloc[1].startTime + 1]
window = (ts >= window[0]) & (ts <= window[1])
fig, ax = plt.subplots(
    1,
    1,
    figsize=npy.plotting.set_size("paper", fraction=2, subplots=(1, 1)),
    sharex=True,
    sharey=True,
    dpi=150,
)
shank = 1  # example shank number
channel = sig.shank_to_channel.get(shank)
# Vertical offsets stack the shank's channels without overlap.
offsets = np.arange(len(channel)) * 3000
ax.plot(ts[window], sig.data[:, window][channel, :].T - offsets, color="k", lw=0.7)
ax.set_title(f"shank {shank}")
ax.set_xlabel("time (s)")
ax.axis("off")
# add scale bar
ax.plot(
    [ts[window][0], ts[window][0] + 0.1],
    [-offsets[-1] - 3000, -offsets[-1] - 3000],
    color="red",
    lw=2,
)
# add text label for scale bar
ax.text(
    ts[window][0] + 0.05,
    -offsets[-1] - 3500,
    "100 ms",
    color="red",
    ha="center",
    va="top",
    fontsize=8,
)
# to scale bar add amplitude label
ax.plot(
    [ts[window][0], ts[window][0]],
    [-offsets[-1] - 3000, -offsets[-1]],
    color="blue",
    lw=2,
)
ax.text(
    ts[window][0] - 0.02,
    -offsets[-1] - 1500,
    "3000 uV",
    color="blue",
    ha="right",
    va="center",
    fontsize=8,
    rotation=90,
)
# Fix: use the axes object directly (not plt.axvline) so the marker always
# lands on this figure's axes, consistent with the rest of the cell.
ax.axvline(epoch_df.iloc[0].stopTime, color="r", lw=1, ls="--", label="epoch boundary")
# add axvline text label
ax.text(
    epoch_df.iloc[0].stopTime + 0.01,
    1200,
    f"epoch boundary at {epoch_df.iloc[0].stopTime:.2f} s",
    color="r",
    ha="left",
    va="bottom",
    fontsize=12,
)
# Label which side of the boundary comes from which acquisition system.
ax.text(
    epoch_df.iloc[0].stopTime - 0.5,
    2000,
    "OpenEphys",
    color="k",
    ha="center",
    va="bottom",
    fontsize=12,
)
ax.text(
    epoch_df.iloc[0].stopTime + 0.5,
    2000,
    "Intan",
    color="k",
    ha="center",
    va="bottom",
    fontsize=12,
)
plt.show()

10) Quick benchmark: choose the smaller first-pass read¶
When both channels and epoch are specified, reading the smaller slice first usually minimizes I/O.
This synthetic benchmark compares two indexing orders on a memmapped array.
import tempfile
import time
from pathlib import Path
def benchmark_index_order(
    n_channels=64,
    fs=1250,
    recording_hours=5,
    load_channels=2,
    time_interval=2,
    dtype=np.int16,
    repeats=5,
    warmup=1,
    seed=0,
    benchmark_samples=250_000,
):
    """Compare the two index orders for slicing a memmapped (time, channels) array.

    "channels first" (``mm[:, ch_idx][:stop, :]``) touches the selected channels
    across the whole file, while "time first" (``mm[:stop, :][:, ch_idx]``) only
    touches the requested time window. Prints the theoretical sample counts and
    the measured per-call timings, and returns the timings.

    Parameters
    ----------
    n_channels : int
        Number of channels in the synthetic recording.
    fs : int
        Sampling rate (Hz) used to size the requested time window.
    recording_hours : float
        Nominal recording length, used only for the reported sample counts.
    load_channels : int
        Number of channels selected by the benchmark slice.
    time_interval : float
        Length (s) of the requested time window.
    dtype : numpy integer dtype
        Sample dtype of the synthetic file.
    repeats, warmup : int
        Timed iterations and untimed warm-up iterations per strategy.
    seed : int
        RNG seed for the synthetic data.
    benchmark_samples : int
        Lower bound on the synthetic file length, keeping the benchmark light.

    Returns
    -------
    tuple[float, float]
        ``(channels_first_seconds, time_first_seconds)`` mean time per call.
    """
    rng = np.random.default_rng(seed)

    full_recording_samples = int(fs * 60 * 60 * recording_hours)
    requested_time_samples = int(time_interval * fs)
    # Theoretical sample counts each strategy would read on a real recording.
    channels_first_samples = full_recording_samples * load_channels
    time_first_samples = requested_time_samples * n_channels
    print(f"channels first loading {channels_first_samples} samples")
    print(f"time interval first loading {time_first_samples} samples")

    # Keep notebook benchmark lightweight while preserving the same indexing pattern.
    n_samples = max(requested_time_samples * 4, int(benchmark_samples))
    sample_stop = min(requested_time_samples, n_samples)

    with tempfile.TemporaryDirectory() as td:
        path = Path(td) / "bench.dat"
        mmw = np.memmap(path, dtype=dtype, mode="w+", shape=(n_samples, n_channels))
        mmw[:] = rng.integers(
            low=np.iinfo(dtype).min,
            high=np.iinfo(dtype).max,
            size=(n_samples, n_channels),
            dtype=dtype,
        )
        mmw.flush()
        del mmw

        mm = np.memmap(path, dtype=dtype, mode="r", shape=(n_samples, n_channels))
        ch_idx = np.arange(load_channels)

        def _time_call(fn):
            # Mean wall-clock seconds per call over `repeats`, after `warmup`
            # untimed calls to absorb first-touch page-cache effects.
            for _ in range(warmup):
                fn()
            t0 = time.perf_counter()
            for _ in range(repeats):
                fn()
            return (time.perf_counter() - t0) / repeats

        try:
            t_channels_first = _time_call(
                lambda: np.array(mm[:, ch_idx][:sample_stop, :], copy=True)
            )
            t_time_first = _time_call(
                lambda: np.array(mm[:sample_stop, :][:, ch_idx], copy=True)
            )
        finally:
            # Close the underlying mmap even on error so the temporary
            # directory can be removed (Windows refuses to delete open files).
            if getattr(mm, "_mmap", None) is not None:
                mm._mmap.close()
            del mm

    print(f"channels→time: {t_channels_first:.6f} s")
    print(f"time→channels: {t_time_first:.6f} s")
    faster = "time→channels" if t_time_first <= t_channels_first else "channels→time"
    print(f"faster in this run: {faster}")
    # Return the measurements so callers can use them programmatically
    # (backward compatible: previous version returned None, which callers ignored).
    return t_channels_first, t_time_first
benchmark_index_order()
channels first loading 45000000 samples
time interval first loading 160000 samples
channels→time: 0.003933 s
time→channels: 0.000009 s
faster in this run: time→channels