Skip to content

intervals

_find_intersecting_intervals(set1, set2)

Find the amount of time two sets of intervals are intersecting each other for each interval in set1.

Parameters:

Name Type Description Default
set1 ndarray

An array of intervals represented as pairs of start and end times.

required
set2 ndarray

An array of intervals represented as pairs of start and end times.

required

Returns:

Type Description
list of float

A list of floats, where each float represents the amount of time the corresponding interval in set1 intersects with any interval in set2.

Source code in neuro_py/process/intervals.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
@jit(nopython=True)
def _find_intersecting_intervals(set1: np.ndarray, set2: np.ndarray) -> List[float]:
    """
    Find the amount of time two sets of intervals are intersecting each other for each interval in set1.

    Parameters
    ----------
    set1 : ndarray
        An array of intervals represented as pairs of start and end times.
    set2 : ndarray
        An array of intervals represented as pairs of start and end times.

    Returns
    -------
    list of float
        A list of floats, where each float represents the amount of time the
        corresponding interval in set1 intersects with any interval in set2.
    """
    intersecting_intervals = []
    for i, (start1, end1) in enumerate(set1):
        # Check if any of the intervals in set2 intersect with the current interval in set1
        for start2, end2 in set2:
            if start2 <= end1 and end2 >= start1:
                # Calculate the amount of intersection between the two intervals
                intersection = min(end1, end2) - max(start1, start2)
                intersecting_intervals.append(intersection)
                break
        else:
            intersecting_intervals.append(0)  # No intersection found

    return intersecting_intervals

find_intersecting_intervals(set1, set2, return_indices=True)

Find the amount of time two sets of intervals are intersecting each other for each intersection.

Parameters:

Name Type Description Default
set1 nelpy EpochArray

The first set of intervals to check for intersections.

required
set2 nelpy EpochArray

The second set of intervals to check for intersections.

required
return_indices bool

If True, return the indices of the intervals in set2 that intersect with each interval in set1. If False, return the amount of time each interval in set1 intersects with any interval in set2.

True

Returns:

Type Description
Union[ndarray, List[bool]]

If return_indices is True, returns a boolean array indicating whether each interval in set1 intersects with any interval in set2. If return_indices is False, returns a NumPy array with the amount of time each interval in set1 intersects with any interval in set2.

Examples:

>>> set1 = nel.EpochArray([(1, 3), (5, 7), (9, 10)])
>>> set2 = nel.EpochArray([(2, 4), (6, 8)])
>>> find_intersecting_intervals(set1, set2)
[True, True, False]
>>> find_intersecting_intervals(set1, set2, return_indices=False)
[1, 2, 0]
Source code in neuro_py/process/intervals.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def find_intersecting_intervals(
    set1: nel.EpochArray, set2: nel.EpochArray, return_indices: bool = True
) -> Union[np.ndarray, List[bool]]:
    """
    Find the amount of time two sets of intervals are intersecting each other for each intersection.

    Parameters
    ----------
    set1 : nelpy EpochArray
        The first set of intervals to check for intersections.
    set2 : nelpy EpochArray
        The second set of intervals to check for intersections.
    return_indices : bool, optional
        If True, return the indices of the intervals in set2 that intersect with each interval in set1.
        If False, return the amount of time each interval in set1 intersects with any interval in set2.

    Returns
    -------
    Union[np.ndarray, List[bool]]
        If return_indices is True, returns a boolean array indicating whether each interval in set1 intersects with any interval in set2.
        If return_indices is False, returns a NumPy array with the amount of time each interval in set1 intersects with any interval in set2.

    Examples
    --------
    >>> set1 = nel.EpochArray([(1, 3), (5, 7), (9, 10)])
    >>> set2 = nel.EpochArray([(2, 4), (6, 8)])
    >>> find_intersecting_intervals(set1, set2)
    [True, True, False]
    >>> find_intersecting_intervals(set1, set2, return_indices=False)
    [1, 2, 0]
    """
    if not isinstance(set1, core.IntervalArray) & isinstance(set2, core.IntervalArray):
        raise ValueError("only EpochArrays are supported")

    intersection = np.array(_find_intersecting_intervals(set1.data, set2.data))
    if return_indices:
        return intersection > 0
    return intersection

find_interval(logical)

Find consecutive intervals of True values in a list of boolean values.

Parameters:

Name Type Description Default
logical List[bool]

The list of boolean values.

required

Returns:

Type Description
List[Tuple[int, int]]

A list of tuples representing the start and end indices of each consecutive interval of True values in the logical list.

Examples:

>>> find_interval([0, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 1])
[(2, 4), (6, 7), (10, 11)]
>>> find_interval([1, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1])
[(0, 2), (4, 5), (9, 10)]
Source code in neuro_py/process/intervals.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def find_interval(logical: List[bool]) -> List[Tuple[int, int]]:
    """
    Find consecutive intervals of True values in a list of boolean values.

    Parameters
    ----------
    logical : List[bool]
        The list of boolean values.

    Returns
    -------
    List[Tuple[int, int]]
        A list of tuples representing the start and end indices of each consecutive interval of True values in the logical list.

    Examples
    --------
    >>> find_interval([0, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 1])
    [(2, 4), (6, 7), (10, 11)]
    >>> find_interval([1, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1])
    [(0, 2), (4, 5), (9, 10)]
    """
    intervals = []
    start = None
    for i, value in enumerate(logical):
        if value and start is None:
            start = i
        elif not value and start is not None:
            intervals.append((start, i - 1))
            start = None
    if start is not None:
        intervals.append((start, len(logical) - 1))
    return intervals

get_overlapping_intervals(start, stop, interval_width, slideby)

Generate overlapping intervals within a specified time range.

Parameters:

Name Type Description Default
start float

The start time of the time range.

required
stop float

The stop time of the time range.

required
interval_width float

The width of each interval in seconds.

required
slideby float

The amount to slide the interval by in seconds.

required

Returns:

Type Description
ndarray

A 2D array containing (start, stop) pairs for all overlapping intervals.

Examples:

>>> get_overlapping_intervals(0, 10, 2, 1)
array([[0, 2],
    [1, 3],
    [2, 4],
    [3, 5],
    [4, 6],
    [5, 7],
    [6, 8],
    [7, 9]])
Source code in neuro_py/process/intervals.py
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
def get_overlapping_intervals(
    start: float, stop: float, interval_width: float, slideby: float
) -> np.ndarray:
    """
    Generate overlapping intervals within a specified time range.

    Parameters
    ----------
    start : float
        The start time of the time range.
    stop : float
        The stop time of the time range.
    interval_width : float
        The width of each interval in seconds.
    slideby : float
        The amount to slide the interval by in seconds.

    Returns
    -------
    np.ndarray
        A 2D array containing (start, stop) pairs for all overlapping intervals.

    Examples
    --------
    >>> get_overlapping_intervals(0, 10, 2, 1)
    array([[0, 2],
        [1, 3],
        [2, 4],
        [3, 5],
        [4, 6],
        [5, 7],
        [6, 8],
        [7, 9]])
    """
    starts = np.arange(start, stop - interval_width, slideby)
    stops = starts + interval_width
    return np.column_stack((starts, stops))

in_intervals(timestamps, intervals, return_interval=False, shift=False)

Find which timestamps fall within the given intervals.

Parameters:

Name Type Description Default
timestamps ndarray

An array of timestamp values. Assumes sorted.

required
intervals ndarray

An array of time intervals, represented as pairs of start and end times.

required
return_interval (bool, optional(default=False))

If True, return the index of the interval to which each timestamp belongs.

False
shift (bool, optional(default=False))

If True, return the shifted timestamps

False

Returns:

Name Type Description
in_interval ndarray

A logical index indicating which timestamps fall within the intervals.

interval (ndarray, optional)

A ndarray indicating for each timestamps which interval it was within.

shifted_timestamps (ndarray, optional)

The shifted timestamps

Examples:

>>> timestamps = np.array([1, 2, 3, 4, 5, 6, 7, 8])
>>> intervals = np.array([[2, 4], [5, 7]])
>>> in_intervals(timestamps, intervals)
array([False,  True,  True,  True,  True,  True,  True, False])
>>> in_intervals(timestamps, intervals, return_interval=True)
(array([False,  True,  True,  True,  True,  True,  True, False]),
array([nan,  0.,  0.,  0.,  1.,  1.,  1., nan]))
>>> in_intervals(timestamps, intervals, shift=True)
(array([False,  True,  True,  True,  True,  True,  True, False]),
array([0, 1, 2, 2, 3, 4]))
>>> in_intervals(timestamps, intervals, return_interval=True, shift=True)
(array([False,  True,  True,  True,  True,  True,  True, False]),
array([0, 0, 0, 1, 1, 1]),
array([0, 1, 2, 2, 3, 4]))
Source code in neuro_py/process/intervals.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
def in_intervals(
    timestamps: np.ndarray,
    intervals: np.ndarray,
    return_interval: bool = False,
    shift: bool = False,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray, Optional[np.ndarray]]]:
    """
    Find which timestamps fall within the given intervals.

    Parameters
    ----------
    timestamps : ndarray
        An array of timestamp values. Assumes sorted.
    intervals : ndarray
        An array of time intervals, represented as pairs of start and end times.
    return_interval : bool, optional (default=False)
        If True, return the index of the interval to which each timestamp belongs.
    shift : bool, optional (default=False)
        If True, return the shifted timestamps

    Returns
    -------
    in_interval : ndarray
        A logical index indicating which timestamps fall within the intervals.
    interval : ndarray, optional
        A ndarray indicating for each timestamps which interval it was within.
    shifted_timestamps : ndarray, optional
        The shifted timestamps

    Examples
    --------
    >>> timestamps = np.array([1, 2, 3, 4, 5, 6, 7, 8])
    >>> intervals = np.array([[2, 4], [5, 7]])
    >>> in_intervals(timestamps, intervals)
    array([False,  True,  True,  True,  True,  True,  True, False])

    >>> in_intervals(timestamps, intervals, return_interval=True)
    (array([False,  True,  True,  True,  True,  True,  True, False]),
    array([nan,  0.,  0.,  0.,  1.,  1.,  1., nan]))

    >>> in_intervals(timestamps, intervals, shift=True)
    (array([False,  True,  True,  True,  True,  True,  True, False]),
    array([0, 1, 2, 2, 3, 4]))

    >>> in_intervals(timestamps, intervals, return_interval=True, shift=True)
    (array([False,  True,  True,  True,  True,  True,  True, False]),
    array([0, 0, 0, 1, 1, 1]),
    array([0, 1, 2, 2, 3, 4]))
    """
    in_interval = np.zeros(timestamps.shape, dtype=np.bool_)
    interval = np.full(timestamps.shape, np.nan)

    for i, (start, end) in enumerate(intervals):
        # Find the leftmost index of a timestamp that is >= start
        left = np.searchsorted(timestamps, start, side="left")
        if left == len(timestamps):
            # If start is greater than all timestamps, skip this interval
            continue
        # Find the rightmost index of a timestamp that is <= end
        right = np.searchsorted(timestamps, end, side="right")
        if right == left:
            # If there are no timestamps in the interval, skip it
            continue
        # Mark the timestamps in the interval
        in_interval[left:right] = True
        interval[left:right] = i

    if shift:
        # Restrict to the timestamps that fall within the intervals
        interval = interval[in_interval].astype(int)

        # Calculate shifts based on intervals
        shifts = np.insert(np.cumsum(intervals[1:, 0] - intervals[:-1, 1]), 0, 0)[
            interval
        ]

        # Apply shifts to timestamps
        shifted_timestamps = timestamps[in_interval] - shifts - intervals[0, 0]

    if return_interval and shift:
        return in_interval, interval, shifted_timestamps

    if return_interval:
        return in_interval, interval

    if shift:
        return in_interval, shifted_timestamps

    return in_interval

in_intervals_interval(timestamps, intervals)

for each timestamps value, the index of the interval to which it belongs (nan = none)

Parameters:

Name Type Description Default
timestamps ndarray

An array of timestamp values. assumes sorted

required
intervals ndarray

An array of time intervals, represented as pairs of start and end times.

required

Returns:

Name Type Description
ndarray

A ndarray indicating for each timestamps which interval it was within.

Note produces same result as in_intervals with return_interval=True

Examples:

>>> timestamps = np.array([1, 2, 3, 4, 5, 6, 7, 8])
>>> intervals = np.array([[2, 4], [5, 7]])
>>> in_intervals_interval(timestamps, intervals)
array([nan,  0,  0,  0,  1,  1,  1, nan])
Source code in neuro_py/process/intervals.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
@jit(nopython=True, parallel=True)
def in_intervals_interval(timestamps: np.ndarray, intervals: np.ndarray) -> np.ndarray:
    """
    for each timestamps value, the index of the interval to which it belongs (nan = none)

    Parameters
    ----------
    timestamps : ndarray
        An array of timestamp values. assumes sorted
    intervals : ndarray
        An array of time intervals, represented as pairs of start and end times.

    Returns
    -------
    ndarray
        A ndarray indicating for each timestamps which interval it was within.

    Note: produces same result as in_intervals with return_interval=True

    Examples
    --------
    >>> timestamps = np.array([1, 2, 3, 4, 5, 6, 7, 8])
    >>> intervals = np.array([[2, 4], [5, 7]])
    >>> in_intervals_interval(timestamps, intervals)
    array([nan,  0,  0,  0,  1,  1,  1, nan])
    """
    in_interval = np.full(timestamps.shape, np.nan)
    for i in numba.prange(intervals.shape[0]):
        start, end = intervals[i]
        mask = (timestamps >= start) & (timestamps <= end)
        in_interval[mask] = i

    return in_interval

overlap_intersect(epoch, interval, return_indices=True)

Returns the epochs with overlap with the given interval.

Parameters:

Name Type Description Default
epoch EpochArray

The epochs to check.

required
interval IntervalArray

The interval to check for overlap.

required
return_indices bool

If True, returns the indices of the overlapping epochs. Default is True.

True

Returns:

Type Description
EpochArray

The epochs with overlap with the interval.

(Tuple[EpochArray, ndarray], optional)

If return_indices is True, also returns the indices of the overlapping epochs.

Source code in neuro_py/process/intervals.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def overlap_intersect(
    epoch: nel.EpochArray, interval: nel.IntervalArray, return_indices: bool = True
) -> Union[nel.EpochArray, Tuple[nel.EpochArray, np.ndarray]]:
    """
    Returns the epochs with overlap with the given interval.

    Parameters
    ----------
    epoch : nelpy.EpochArray
        The epochs to check.
    interval : nelpy.IntervalArray
        The interval to check for overlap.
    return_indices : bool, optional
        If True, returns the indices of the overlapping epochs. Default is True.

    Returns
    -------
    nelpy.EpochArray
        The epochs with overlap with the interval.
    Tuple[nelpy.EpochArray, np.ndarray], optional
        If `return_indices` is True, also returns the indices of the overlapping epochs.
    """
    new_intervals = []
    indices = []
    for epa in epoch:
        if any((interval.starts < epa.stop) & (interval.stops > epa.start)):
            new_intervals.append([epa.start, epa.stop])
            cand_ep_idx = np.where(
                (interval.starts < epa.stop) & (interval.stops > epa.start)
            )
            indices.append(cand_ep_idx[0][0])
    out = type(epoch)(new_intervals)
    out._domain = epoch.domain
    if return_indices:
        return out, indices
    return out

randomize_epochs(epoch, randomize_each=True, start_stop=None)

Randomly shifts the epochs of a EpochArray object and wraps them around the original time boundaries.

This method takes a EpochArray object as input, and can either randomly shift each epoch by a different amount (if randomize_each is True) or shift all the epochs by the same amount (if randomize_each is False). In either case, the method wraps the shifted epochs around the original time boundaries to make sure they remain within the original time range. It then returns the modified EpochArray object.

Parameters:

Name Type Description Default
epoch EpochArray

The EpochArray object whose epochs should be shifted and wrapped.

required
randomize_each bool

If True, each epoch will be shifted by a different random amount. If False, all the epochs will be shifted by the same random amount. Defaults to True.

True
start_stop array

If not None, time support will be taken from start_stop

None

Returns:

Name Type Description
new_epochs EpochArray

The modified EpochArray object with the shifted and wrapped epochs.

Source code in neuro_py/process/intervals.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def randomize_epochs(
    epoch: EpochArray,
    randomize_each: bool = True,
    start_stop: Optional[np.ndarray] = None,
) -> EpochArray:
    """
    Randomly shifts the epochs of a EpochArray object and wraps them around the original time boundaries.

    This method takes a EpochArray object as input, and can either randomly shift each epoch by a different amount
    (if `randomize_each` is True) or shift all the epochs by the same amount (if `randomize_each` is False).
    In either case, the method wraps the shifted epochs around the original time boundaries to make sure they remain
    within the original time range. It then returns the modified EpochArray object.

    Parameters
    ----------
    epoch : EpochArray
        The EpochArray object whose epochs should be shifted and wrapped.
    randomize_each : bool, optional
        If True, each epoch will be shifted by a different random amount.
        If False, all the epochs will be shifted by the same random amount. Defaults to True.
    start_stop : array, optional
        If not None, time support will be taken from start_stop

    Returns
    -------
    new_epochs : EpochArray
        The modified EpochArray object with the shifted and wrapped epochs.
    """

    def wrap_intervals(intervals, start, stop):
        idx = np.any(intervals > stop, axis=1)
        intervals[idx] = intervals[idx] - stop + start

        idx = np.any(intervals < start, axis=1)
        intervals[idx] = intervals[idx] - start + stop
        return intervals

    new_epochs = epoch.copy()

    if start_stop is None:
        start = new_epochs.start
        stop = new_epochs.stop
    else:
        start, stop = start_stop

    ts_range = stop - start

    if randomize_each:
        # Randomly shift each epoch by a different amount
        random_order = random.sample(
            range(-int(ts_range), int(ts_range)), new_epochs.n_intervals
        )

        new_intervals = new_epochs.data + np.expand_dims(random_order, axis=1)
        new_epochs._data = wrap_intervals(new_intervals, start, stop)
    else:
        # Shift all the epochs by the same amount
        random_shift = random.randint(-int(ts_range), int(ts_range))
        new_epochs._data = wrap_intervals((new_epochs.data + random_shift), start, stop)

    if not new_epochs.isempty:
        if np.any(new_epochs.data[:, 1] - new_epochs.data[:, 0] < 0):
            raise ValueError("start must be less than or equal to stop")

    new_epochs._sort()

    return new_epochs

shift_epoch_array(epoch, epoch_shift)

Shift an EpochArray by another EpochArray.

Shifting means that intervals in 'epoch' will be relative to intervals in 'epoch_shift' as if 'epoch_shift' intervals were without gaps.

Parameters:

Name Type Description Default
epoch EpochArray

The intervals to shift.

required
epoch_shift EpochArray

The intervals to shift by.

required

Returns:

Type Description
EpochArray

The shifted EpochArray.

Notes

This function restricts 'epoch' to those within 'epoch_shift' as epochs between 'epoch_shift' intervals would result in a duration of 0.

Visual representation: inputs: epoch = [ ] [ ][ ] [] epoch_shift = [ ][ ] [ ] becomes: epoch = [ ] [ ] [] epoch_shift = [ ][ ][ ]

Source code in neuro_py/process/intervals.py
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
def shift_epoch_array(
    epoch: nel.EpochArray, epoch_shift: nel.EpochArray
) -> nel.EpochArray:
    """
    Shift an EpochArray by another EpochArray.

    Shifting means that intervals in 'epoch' will be relative to
    intervals in 'epoch_shift' as if 'epoch_shift' intervals were without gaps.

    Parameters
    ----------
    epoch : nel.EpochArray
        The intervals to shift.
    epoch_shift : nel.EpochArray
        The intervals to shift by.

    Returns
    -------
    nel.EpochArray
        The shifted EpochArray.

    Notes
    -----
    This function restricts 'epoch' to those within 'epoch_shift' as
    epochs between 'epoch_shift' intervals would result in a duration of 0.

    Visual representation:
    inputs:
        epoch       =   [  ]   [  ] [  ]  []
        epoch_shift =   [    ] [    ]   [    ]
    becomes:
        epoch       =   [  ]  [  ]    []
        epoch_shift =   [    ][    ][    ]
    """
    # input validation
    if not isinstance(epoch, nel.EpochArray):
        raise TypeError("epoch must be a nelpy EpochArray")
    if not isinstance(epoch_shift, nel.EpochArray):
        raise TypeError("epoch_shift must be a nelpy EpochArray")

    # restrict epoch to epoch_shift and extract starts and stops
    epoch_starts, epoch_stops = epoch[epoch_shift].data.T

    # shift starts and stops by epoch_shift
    _, epoch_starts_shifted = in_intervals(epoch_starts, epoch_shift.data, shift=True)
    _, epoch_stops_shifted = in_intervals(epoch_stops, epoch_shift.data, shift=True)

    # shift time support as well, if one exists
    support_starts_shifted, support_stops_shifted = -np.inf, np.inf
    if epoch.domain.start != -np.inf:
        _, support_starts_shifted = in_intervals(
            epoch.domain.start, epoch_shift.data, shift=True
        )
    if epoch.domain.stop != np.inf:
        _, support_stops_shifted = in_intervals(
            epoch.domain.stop, epoch_shift.data, shift=True
        )

    session_domain = nel.EpochArray([support_starts_shifted, support_stops_shifted])

    # package shifted intervals into epoch array with shifted time support
    return nel.EpochArray(
        np.array([epoch_starts_shifted, epoch_stops_shifted]).T, domain=session_domain
    )

split_epoch_by_width(intervals, bin_width=0.001)

Generate combined intervals (start, stop) at a specified width within given intervals.

Parameters:

Name Type Description Default
intervals List[Tuple[float, float]]

A list of (start, end) tuples representing intervals.

required
bin_width float

The width of each bin in seconds. Default is 0.001 (1 ms).

0.001

Returns:

Type Description
ndarray

A 2D array containing (start, stop) pairs for all bins across intervals.

Source code in neuro_py/process/intervals.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def split_epoch_by_width(
    intervals: List[Tuple[float, float]], bin_width: float = 0.001
) -> np.ndarray:
    """
    Generate combined intervals (start, stop) at a specified width within given intervals.

    Parameters
    ----------
    intervals : List[Tuple[float, float]]
        A list of (start, end) tuples representing intervals.
    bin_width : float
        The width of each bin in seconds. Default is 0.001 (1 ms).

    Returns
    -------
    np.ndarray
        A 2D array containing (start, stop) pairs for all bins across intervals.
    """
    bin_intervals = []
    for start, end in intervals:
        # Generate bin edges
        edges = np.arange(start, end, bin_width)
        edges = np.append(edges, end)  # Ensure the final end is included
        # Generate intervals (start, stop) for each bin
        intervals = np.stack((edges[:-1], edges[1:]), axis=1)
        bin_intervals.append(intervals)
    return np.vstack(bin_intervals)

split_epoch_equal_parts(intervals, n_parts, return_epoch_array=True)

Split multiple intervals into equal parts.

Parameters:

Name Type Description Default
intervals (array - like, shape(n_intervals, 2))

The intervals to split.

required
n_parts int

The number of parts to split each interval into.

required
return_epoch_array bool

If True, returns the intervals as a nelpy.EpochArray object. Defaults to True.

True

Returns:

Name Type Description
split_intervals (array - like, shape(n_intervals * n_parts, 2) or EpochArray)

The split intervals.

Source code in neuro_py/process/intervals.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def split_epoch_equal_parts(
    intervals: np.ndarray, n_parts: int, return_epoch_array: bool = True
) -> Union[np.ndarray, nel.EpochArray]:
    """
    Split multiple intervals into equal parts.

    Parameters
    ----------
    intervals : array-like, shape (n_intervals, 2)
        The intervals to split.
    n_parts : int
        The number of parts to split each interval into.
    return_epoch_array : bool, optional
        If True, returns the intervals as a nelpy.EpochArray object. Defaults to True.

    Returns
    -------
    split_intervals : array-like, shape (n_intervals * n_parts, 2) or nelpy.EpochArray
        The split intervals.
    """
    # Ensure intervals is a numpy array
    intervals = np.asarray(intervals)

    # Number of intervals
    n_intervals = intervals.shape[0]

    # Preallocate the output array
    split_intervals = np.zeros((n_intervals * n_parts, 2))

    for i, interval in enumerate(intervals):
        start, end = interval
        epoch_parts = np.linspace(start, end, n_parts + 1)
        epoch_parts = np.vstack((epoch_parts[:-1], epoch_parts[1:])).T
        split_intervals[i * n_parts : (i + 1) * n_parts] = epoch_parts

    if return_epoch_array:
        return nel.EpochArray(split_intervals)
    return split_intervals

truncate_epoch(epoch, time=3600)

Truncates an EpochArray to achieve a specified cumulative time duration.

This function takes an input EpochArray 'epoch' and a 'time' value representing the desired cumulative time duration in seconds. It returns a new EpochArray containing intervals that cumulatively match the specified time.

Parameters:

Name Type Description Default
epoch EpochArray

The input EpochArray containing intervals to be truncated.

required
time Union[int, float]

The desired cumulative time in seconds (default is 3600).

3600

Returns:

Type Description
EpochArray

A new EpochArray containing intervals that cumulatively match the specified time.

Algorithm
  1. Calculate the cumulative lengths of intervals in the 'epoch'.
  2. If the cumulative time of the 'epoch' is already less than or equal to 'time', return the original 'epoch'.
  3. Find the last interval that fits within the specified 'time' and create a new EpochArray 'truncated_intervals' with intervals up to that point.
  4. To achieve the desired cumulative time, calculate the remaining time needed to reach 'time'.
  5. Add portions of the next interval to 'truncated_intervals' until the desired 'time' is reached or all intervals are used.

Examples:

>>> epoch_data = [(0, 2), (3, 6), (8, 10)]
>>> epoch = nel.EpochArray(epoch_data)
>>> truncated_epoch = truncate_epoch(epoch, time=7)
Source code in neuro_py/process/intervals.py
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
def truncate_epoch(
    epoch: nel.EpochArray, time: Union[int, float] = 3600
) -> nel.EpochArray:
    """
    Truncates an EpochArray to achieve a specified cumulative time duration.

    This function takes an input EpochArray 'epoch' and a 'time' value representing
    the desired cumulative time duration in seconds. It returns a new EpochArray
    containing intervals that cumulatively match the specified time.

    Parameters
    ----------
    epoch : nel.EpochArray
        The input EpochArray containing intervals to be truncated.
    time : Union[int, float], optional
        The desired cumulative time in seconds (default is 3600).

    Returns
    -------
    nel.EpochArray
        A new EpochArray containing intervals that cumulatively match
        the specified time.

    Algorithm
    ---------
    1. Calculate the cumulative lengths of intervals in the 'epoch'.
    2. If the cumulative time of the 'epoch' is already less than or equal to 'time',
        return the original 'epoch'.
    3. Find the last interval that fits within the specified 'time' and create a new EpochArray
        'truncated_intervals' with intervals up to that point.
    4. To achieve the desired cumulative time, calculate the remaining time needed to reach 'time'.
    5. Add portions of the next interval to 'truncated_intervals' until the desired 'time' is reached
        or all intervals are used.

    Examples
    --------
    >>> epoch_data = [(0, 2), (3, 6), (8, 10)]
    >>> epoch = nel.EpochArray(epoch_data)
    >>> truncated_epoch = truncate_epoch(epoch, time=7)
    """

    if epoch.isempty:
        return epoch

    # calcuate cumulative lengths
    cumulative_lengths = epoch.lengths.cumsum()

    # No truncation needed
    if cumulative_lengths[-1] <= time:
        return epoch

    # Find the last interval that fits within the time and make new epoch
    idx = cumulative_lengths <= time
    truncated_intervals = nel.EpochArray(epoch.data[idx])

    # It's unlikely that the last interval will fit perfectly, so add the remainder from the next interval
    #   until the epoch is the desired length
    interval_i = 0
    while (time - truncated_intervals.duration) > 1e-10 or interval_i > len(epoch):
        # Add the last interval
        next_interval = int(np.where(cumulative_lengths >= time)[0][interval_i])

        remainder = (
            nel.EpochArray(
                [
                    epoch[next_interval].start,
                    epoch[next_interval].start + (time - truncated_intervals.duration),
                ]
            )
            & epoch[next_interval]
        )
        truncated_intervals = truncated_intervals | remainder
        interval_i += 1

    return truncated_intervals