1415 lines
50 KiB
Python
1415 lines
50 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Temporal segmentation
|
||
=====================
|
||
|
||
Recurrence and self-similarity
|
||
------------------------------
|
||
.. autosummary::
|
||
:toctree: generated/
|
||
|
||
cross_similarity
|
||
recurrence_matrix
|
||
recurrence_to_lag
|
||
lag_to_recurrence
|
||
timelag_filter
|
||
path_enhance
|
||
|
||
Temporal clustering
|
||
-------------------
|
||
.. autosummary::
|
||
:toctree: generated/
|
||
|
||
agglomerative
|
||
subsegment
|
||
"""
|
||
|
||
from decorator import decorator
|
||
|
||
import numpy as np
|
||
import scipy
|
||
import scipy.signal
|
||
import scipy.ndimage
|
||
|
||
import sklearn
|
||
import sklearn.cluster
|
||
import sklearn.feature_extraction
|
||
import sklearn.neighbors
|
||
|
||
from ._cache import cache
|
||
from . import util
|
||
from .filters import diagonal_filter
|
||
from .util.exceptions import ParameterError
|
||
from typing import Any, Callable, Optional, TypeVar, Union, overload
|
||
from typing_extensions import Literal
|
||
from ._typing import _WindowSpec, _FloatLike_co
|
||
|
||
# Public API of this module: similarity/recurrence construction, lag-domain
# conversions, and temporal clustering helpers.
__all__ = [
    "cross_similarity",
    "recurrence_matrix",
    "recurrence_to_lag",
    "lag_to_recurrence",
    "timelag_filter",
    "agglomerative",
    "subsegment",
    "path_enhance",
]
|
||
|
||
|
||
# Overload: dense output (sparse=False) -> np.ndarray.
# Note: all defaults in @overload stubs are placeholders (`...`) per PEP 484
# convention; the real defaults live on the implementation below. The original
# stub mixed `None`/`False` literals with `...` — normalized for consistency.
@overload
def cross_similarity(
    data: np.ndarray,
    data_ref: np.ndarray,
    *,
    k: Optional[int] = ...,
    metric: str = ...,
    sparse: Literal[False] = ...,
    mode: str = ...,
    bandwidth: Optional[Union[np.ndarray, _FloatLike_co, str]] = ...,
    full: bool = ...,
) -> np.ndarray:
    ...
|
||
|
||
|
||
# Overload: sparse output (sparse=True) -> scipy.sparse.csc_matrix.
# Stub defaults normalized to `...` per PEP 484 overload convention
# (the original mixed `None`/`False` literals with `...`).
@overload
def cross_similarity(
    data: np.ndarray,
    data_ref: np.ndarray,
    *,
    k: Optional[int] = ...,
    metric: str = ...,
    sparse: Literal[True] = ...,
    mode: str = ...,
    bandwidth: Optional[Union[np.ndarray, _FloatLike_co, str]] = ...,
    full: bool = ...,
) -> scipy.sparse.csc_matrix:
    ...
|
||
|
||
|
||
@cache(level=30)
def cross_similarity(
    data: np.ndarray,
    data_ref: np.ndarray,
    *,
    k: Optional[int] = None,
    metric: str = "euclidean",
    sparse: bool = False,
    mode: str = "connectivity",
    bandwidth: Optional[Union[np.ndarray, _FloatLike_co, str]] = None,
    full: bool = False,
) -> Union[np.ndarray, scipy.sparse.csc_matrix]:
    """Compute cross-similarity from one data sequence to a reference sequence.

    The output is a matrix ``xsim``, where ``xsim[i, j]`` is non-zero
    if ``data_ref[..., i]`` is a k-nearest neighbor of ``data[..., j]``.

    Parameters
    ----------
    data : np.ndarray [shape=(..., d, n)]
        A feature matrix for the comparison sequence.
        If the data has more than two dimensions (e.g., for multi-channel inputs),
        the leading dimensions are flattened prior to comparison.
        For example, a stereo input with shape `(2, d, n)` is
        automatically reshaped to `(2 * d, n)`.

    data_ref : np.ndarray [shape=(..., d, n_ref)]
        A feature matrix for the reference sequence
        If the data has more than two dimensions (e.g., for multi-channel inputs),
        the leading dimensions are flattened prior to comparison.
        For example, a stereo input with shape `(2, d, n_ref)` is
        automatically reshaped to `(2 * d, n_ref)`.

    k : int > 0 [scalar] or None
        the number of nearest-neighbors for each sample

        Default: ``k = 2 * ceil(sqrt(n_ref))``,
        or ``k = 2`` if ``n_ref <= 3``

    metric : str
        Distance metric to use for nearest-neighbor calculation.

        See `sklearn.neighbors.NearestNeighbors` for details.

    sparse : bool [scalar]
        if False, returns a dense type (ndarray)
        if True, returns a sparse type (scipy.sparse.csc_matrix)

    mode : str, {'connectivity', 'distance', 'affinity'}
        If 'connectivity', a binary connectivity matrix is produced.

        If 'distance', then a non-zero entry contains the distance between
        points.

        If 'affinity', then non-zero entries are mapped to
        ``exp( - distance(i, j) / bandwidth)`` where ``bandwidth`` is
        as specified below.

    bandwidth : None, float > 0, ndarray, or str
        str options include ``{'med_k_scalar', 'mean_k', 'gmean_k', 'mean_k_avg', 'gmean_k_avg', 'mean_k_avg_and_pair'}``

        If ndarray is supplied, use ndarray as bandwidth for each i,j pair.

        If using ``mode='affinity'``, this can be used to set the
        bandwidth on the affinity kernel.

        If no value is provided or ``None``, default to ``'med_k_scalar'``.

        If ``bandwidth='med_k_scalar'``, bandwidth is set automatically to the median
        distance to the k'th nearest neighbor of each ``data[:, i]``.

        If ``bandwidth='mean_k'``, bandwidth is estimated for each sample-pair (i, j) by taking the
        arithmetic mean between distances to the k-th nearest neighbor for sample i and sample j.

        If ``bandwidth='gmean_k'``, bandwidth is estimated for each sample-pair (i, j) by taking the
        geometric mean between distances to the k-th nearest neighbor for sample i and j [#z]_.

        If ``bandwidth='mean_k_avg'``, bandwidth is estimated for each sample-pair (i, j) by taking the
        arithmetic mean between the average distances to the first k-th nearest neighbors for
        sample i and sample j.
        This is similar to the approach in Wang et al. (2014) [#w]_ but does not include the distance
        between i and j.

        If ``bandwidth='gmean_k_avg'``, bandwidth is estimated for each sample-pair (i, j) by taking the
        geometric mean between the average distances to the first k-th nearest neighbors for
        sample i and sample j.

        If ``bandwidth='mean_k_avg_and_pair'``, bandwidth is estimated for each sample-pair (i, j) by
        taking the arithmetic mean between three terms: the average distances to the first
        k-th nearest neighbors for sample i and sample j respectively, as well as
        the distance between i and j.
        This is similar to the approach in Wang et al. (2014). [#w]_

        .. [#z] Zelnik-Manor, Lihi, and Pietro Perona. (2004).
            "Self-tuning spectral clustering." Advances in neural information processing systems 17.

        .. [#w] Wang, Bo, et al. (2014).
            "Similarity network fusion for aggregating data types on a genomic scale." Nat Methods 11, 333–337.
            https://doi.org/10.1038/nmeth.2810

    full : bool
        If using ``mode ='affinity'`` or ``mode='distance'``, this option can be used to compute
        the full affinity or distance matrix as opposed a sparse matrix with only none-zero terms
        for the first k-neighbors of each sample.
        This option has no effect when using ``mode='connectivity'``.

        When using ``mode='distance'``, setting ``full=True`` will ignore ``k`` and ``width``.
        When using ``mode='affinity'``, setting ``full=True`` will use ``k`` exclusively for
        bandwidth estimation, and ignore ``width``.

    Returns
    -------
    xsim : np.ndarray or scipy.sparse.csc_matrix, [shape=(n_ref, n)]
        Cross-similarity matrix

    See Also
    --------
    recurrence_matrix
    recurrence_to_lag
    librosa.feature.stack_memory
    sklearn.neighbors.NearestNeighbors
    scipy.spatial.distance.cdist

    Notes
    -----
    This function caches at level 30.

    Examples
    --------
    Find nearest neighbors in CQT space between two sequences

    >>> hop_length = 1024
    >>> y_ref, sr = librosa.load(librosa.ex('pistachio'))
    >>> y_comp, sr = librosa.load(librosa.ex('pistachio'), offset=10)
    >>> chroma_ref = librosa.feature.chroma_cqt(y=y_ref, sr=sr, hop_length=hop_length)
    >>> chroma_comp = librosa.feature.chroma_cqt(y=y_comp, sr=sr, hop_length=hop_length)
    >>> # Use time-delay embedding to get a cleaner recurrence matrix
    >>> x_ref = librosa.feature.stack_memory(chroma_ref, n_steps=10, delay=3)
    >>> x_comp = librosa.feature.stack_memory(chroma_comp, n_steps=10, delay=3)
    >>> xsim = librosa.segment.cross_similarity(x_comp, x_ref)

    Or fix the number of nearest neighbors to 5

    >>> xsim = librosa.segment.cross_similarity(x_comp, x_ref, k=5)

    Use cosine similarity instead of Euclidean distance

    >>> xsim = librosa.segment.cross_similarity(x_comp, x_ref, metric='cosine')

    Use an affinity matrix instead of binary connectivity

    >>> xsim_aff = librosa.segment.cross_similarity(x_comp, x_ref, metric='cosine', mode='affinity')

    Plot the feature and recurrence matrices

    >>> import matplotlib.pyplot as plt
    >>> fig, ax = plt.subplots(ncols=2, sharex=True, sharey=True)
    >>> imgsim = librosa.display.specshow(xsim, x_axis='s', y_axis='s',
    ...                                   hop_length=hop_length, ax=ax[0])
    >>> ax[0].set(title='Binary cross-similarity (symmetric)')
    >>> imgaff = librosa.display.specshow(xsim_aff, x_axis='s', y_axis='s',
    ...                                   cmap='magma_r', hop_length=hop_length, ax=ax[1])
    >>> ax[1].set(title='Cross-affinity')
    >>> ax[1].label_outer()
    >>> fig.colorbar(imgsim, ax=ax[0], orientation='horizontal', ticks=[0, 1])
    >>> fig.colorbar(imgaff, ax=ax[1], orientation='horizontal')
    """
    # Promote 1-D inputs to 2-D so indexing below is uniform
    data_ref = np.atleast_2d(data_ref)
    data = np.atleast_2d(data)

    # Both inputs must agree on every dimension except the trailing (time) axis
    if not np.allclose(data_ref.shape[:-1], data.shape[:-1]):
        raise ParameterError(
            f"data_ref.shape={data_ref.shape} and data.shape={data.shape} do not match on leading dimension(s)"
        )

    # swap data axes so the feature axis is last
    data_ref = np.swapaxes(data_ref, -1, 0)
    n_ref = data_ref.shape[0]
    # Use F-ordering for reshape to preserve leading axis
    data_ref = data_ref.reshape((n_ref, -1), order="F")

    data = np.swapaxes(data, -1, 0)
    n = data.shape[0]
    data = data.reshape((n, -1), order="F")

    if mode not in ["connectivity", "distance", "affinity"]:
        raise ParameterError(
            (
                f"Invalid mode='{mode}'. Must be one of "
                "['connectivity', 'distance', 'affinity']"
            )
        )
    if k is None:
        # Default neighbor count: 2*sqrt(n_ref), capped at n_ref
        k = min(n_ref, 2 * np.ceil(np.sqrt(n_ref)))

    k = int(k)

    # using k for bandwidth estimation also and decouple k for full mode
    bandwidth_k = k
    if full and (mode != "connectivity"):
        # In full mode, link every pair; bandwidth_k retains the user's k
        k = n

    # Build the neighbor search object
    # `auto` mode does not work with some choices of metric. Rather than special-case
    # those here, we instead use a fall-back to brute force if auto fails.
    try:
        knn = sklearn.neighbors.NearestNeighbors(
            n_neighbors=min(n_ref, k), metric=metric, algorithm="auto"
        )
    except ValueError:
        knn = sklearn.neighbors.NearestNeighbors(
            n_neighbors=min(n_ref, k), metric=metric, algorithm="brute"
        )

    knn.fit(data_ref)

    # Get the knn graph
    if mode == "affinity":
        # sklearn's nearest neighbor doesn't support affinity,
        # so we use distance here and then do the conversion post-hoc
        kng_mode = "distance"
    else:
        kng_mode = mode

    # LIL format allows efficient per-row edits in the pruning loop below
    xsim = knn.kneighbors_graph(X=data, mode=kng_mode).tolil()

    if not full:
        # Retain only the top-k links per point
        for i in range(n):
            # Get the links from point i
            links = xsim[i].nonzero()[1]

            # Order them ascending
            idx = links[np.argsort(xsim[i, links].toarray())][0]

            # Everything past the kth closest gets squashed
            xsim[i, idx[k:]] = 0

    # Convert to compressed sparse row (CSR) format
    xsim = xsim.tocsr()
    xsim.eliminate_zeros()

    if mode == "connectivity":
        xsim = xsim.astype(bool)
    elif mode == "affinity":
        # Convert stored distances to affinities: exp(-d / bandwidth)
        aff_bandwidth = __affinity_bandwidth(xsim, bandwidth, bandwidth_k)
        xsim.data[:] = np.exp(xsim.data / (-1 * aff_bandwidth))

    # Transpose to n_ref by n
    xsim = xsim.T

    if not sparse:
        xsim = xsim.toarray()

    return xsim
|
||
|
||
|
||
# Overload: sparse output (sparse=True) -> scipy.sparse.csc_matrix.
# Stub default for ``full`` normalized to `...` per PEP 484 overload
# convention, matching the other parameters (it was a `False` literal).
@overload
def recurrence_matrix(
    data: np.ndarray,
    *,
    k: Optional[int] = ...,
    width: int = ...,
    metric: str = ...,
    sym: bool = ...,
    sparse: Literal[True] = ...,
    mode: str = ...,
    bandwidth: Optional[Union[np.ndarray, _FloatLike_co, str]] = ...,
    self: bool = ...,
    axis: int = ...,
    full: bool = ...,
) -> scipy.sparse.csc_matrix:
    ...
|
||
|
||
|
||
# Overload: dense output (sparse=False) -> np.ndarray.
# Stub default for ``full`` normalized to `...` per PEP 484 overload
# convention, matching the other parameters (it was a `False` literal).
@overload
def recurrence_matrix(
    data: np.ndarray,
    *,
    k: Optional[int] = ...,
    width: int = ...,
    metric: str = ...,
    sym: bool = ...,
    sparse: Literal[False] = ...,
    mode: str = ...,
    bandwidth: Optional[Union[np.ndarray, _FloatLike_co, str]] = ...,
    self: bool = ...,
    axis: int = ...,
    full: bool = ...,
) -> np.ndarray:
    ...
|
||
|
||
|
||
@cache(level=30)
def recurrence_matrix(
    data: np.ndarray,
    *,
    k: Optional[int] = None,
    width: int = 1,
    metric: str = "euclidean",
    sym: bool = False,
    sparse: bool = False,
    mode: str = "connectivity",
    bandwidth: Optional[Union[np.ndarray, _FloatLike_co, str]] = None,
    self: bool = False,
    axis: int = -1,
    full: bool = False,
) -> Union[np.ndarray, scipy.sparse.csc_matrix]:
    """Compute a recurrence matrix from a data matrix.

    ``rec[i, j]`` is non-zero if ``data[..., i]`` is a k-nearest neighbor
    of ``data[..., j]`` and ``|i - j| >= width``

    The specific value of ``rec[i, j]`` can have several forms, governed
    by the ``mode`` parameter below:

        - Connectivity: ``rec[i, j] = 1 or 0`` indicates that frames ``i`` and ``j`` are repetitions

        - Affinity: ``rec[i, j] > 0`` measures how similar frames ``i`` and ``j`` are. This is also
          known as a (sparse) self-similarity matrix.

        - Distance: ``rec[i, j] > 0`` measures how distant frames ``i`` and ``j`` are. This is also
          known as a (sparse) self-distance matrix.

    The general term *recurrence matrix* can refer to any of the three forms above.

    Parameters
    ----------
    data : np.ndarray [shape=(..., d, n)]
        A feature matrix.
        If the data has more than two dimensions (e.g., for multi-channel inputs),
        the leading dimensions are flattened prior to comparison.
        For example, a stereo input with shape `(2, d, n)` is
        automatically reshaped to `(2 * d, n)`.

    k : int > 0 [scalar] or None
        the number of nearest-neighbors for each sample

        Default: ``k = 2 * ceil(sqrt(t - 2 * width + 1))``,
        or ``k = 2`` if ``t <= 2 * width + 1``

    width : int >= 1 [scalar]
        only link neighbors ``(data[..., i], data[..., j])``
        if ``|i - j| >= width``

        ``width`` cannot exceed the length of the data.

    metric : str
        Distance metric to use for nearest-neighbor calculation.

        See `sklearn.neighbors.NearestNeighbors` for details.

    sym : bool [scalar]
        set ``sym=True`` to only link mutual nearest-neighbors

    sparse : bool [scalar]
        if False, returns a dense type (ndarray)
        if True, returns a sparse type (scipy.sparse.csc_matrix)

    mode : str, {'connectivity', 'distance', 'affinity'}
        If 'connectivity', a binary connectivity matrix is produced.

        If 'distance', then a non-zero entry contains the distance between
        points.

        If 'affinity', then non-zero entries are mapped to
        ``exp( - distance(i, j) / bandwidth)`` where ``bandwidth`` is
        as specified below.

    bandwidth : None, float > 0, ndarray, or str
        str options include ``{'med_k_scalar', 'mean_k', 'gmean_k', 'mean_k_avg', 'gmean_k_avg', 'mean_k_avg_and_pair'}``

        If ndarray is supplied, use ndarray as bandwidth for each i,j pair.

        If using ``mode='affinity'``, the ``bandwidth`` option can be used to set the
        bandwidth on the affinity kernel.

        If no value is provided or ``None``, default to ``'med_k_scalar'``.

        If ``bandwidth='med_k_scalar'``, a scalar bandwidth is set to the median distance
        of the k-th nearest neighbor for all samples.

        If ``bandwidth='mean_k'``, bandwidth is estimated for each sample-pair (i, j) by taking the
        arithmetic mean between distances to the k-th nearest neighbor for sample i and sample j.

        If ``bandwidth='gmean_k'``, bandwidth is estimated for each sample-pair (i, j) by taking the
        geometric mean between distances to the k-th nearest neighbor for sample i and j [#z]_.

        If ``bandwidth='mean_k_avg'``, bandwidth is estimated for each sample-pair (i, j) by taking the
        arithmetic mean between the average distances to the first k-th nearest neighbors for
        sample i and sample j.
        This is similar to the approach in Wang et al. (2014) [#w]_ but does not include the distance
        between i and j.

        If ``bandwidth='gmean_k_avg'``, bandwidth is estimated for each sample-pair (i, j) by taking the
        geometric mean between the average distances to the first k-th nearest neighbors for
        sample i and sample j.

        If ``bandwidth='mean_k_avg_and_pair'``, bandwidth is estimated for each sample-pair (i, j) by
        taking the arithmetic mean between three terms: the average distances to the first
        k-th nearest neighbors for sample i and sample j respectively, as well as
        the distance between i and j.
        This is similar to the approach in Wang et al. (2014). [#w]_

        .. [#z] Zelnik-Manor, Lihi, and Pietro Perona. (2004).
            "Self-tuning spectral clustering." Advances in neural information processing systems 17.

        .. [#w] Wang, Bo, et al. (2014).
            "Similarity network fusion for aggregating data types on a genomic scale." Nat Methods 11, 333–337.
            https://doi.org/10.1038/nmeth.2810

    self : bool
        If ``True``, then the main diagonal is populated with self-links:
        0 if ``mode='distance'``, and 1 otherwise.

        If ``False``, the main diagonal is left empty.

    axis : int
        The axis along which to compute recurrence.
        By default, the last index (-1) is taken.

    full : bool
        If using ``mode ='affinity'`` or ``mode='distance'``, this option can be used to compute
        the full affinity or distance matrix as opposed a sparse matrix with only none-zero terms
        for the first k-neighbors of each sample.
        This option has no effect when using ``mode='connectivity'``.

        When using ``mode='distance'``, setting ``full=True`` will ignore ``k`` and ``width``.
        When using ``mode='affinity'``, setting ``full=True`` will use ``k`` exclusively for
        bandwidth estimation, and ignore ``width``.

    Returns
    -------
    rec : np.ndarray or scipy.sparse.csc_matrix, [shape=(t, t)]
        Recurrence matrix

    See Also
    --------
    sklearn.neighbors.NearestNeighbors
    scipy.spatial.distance.cdist
    librosa.feature.stack_memory
    recurrence_to_lag

    Notes
    -----
    This function caches at level 30.

    Examples
    --------
    Find nearest neighbors in CQT space

    >>> y, sr = librosa.load(librosa.ex('nutcracker'))
    >>> hop_length = 1024
    >>> chroma = librosa.feature.chroma_cqt(y=y, sr=sr, hop_length=hop_length)
    >>> # Use time-delay embedding to get a cleaner recurrence matrix
    >>> chroma_stack = librosa.feature.stack_memory(chroma, n_steps=10, delay=3)
    >>> R = librosa.segment.recurrence_matrix(chroma_stack)

    Or fix the number of nearest neighbors to 5

    >>> R = librosa.segment.recurrence_matrix(chroma_stack, k=5)

    Suppress neighbors within +- 7 frames

    >>> R = librosa.segment.recurrence_matrix(chroma_stack, width=7)

    Use cosine similarity instead of Euclidean distance

    >>> R = librosa.segment.recurrence_matrix(chroma_stack, metric='cosine')

    Require mutual nearest neighbors

    >>> R = librosa.segment.recurrence_matrix(chroma_stack, sym=True)

    Use an affinity matrix instead of binary connectivity

    >>> R_aff = librosa.segment.recurrence_matrix(chroma_stack, metric='cosine',
    ...                                           mode='affinity')

    Plot the feature and recurrence matrices

    >>> import matplotlib.pyplot as plt
    >>> fig, ax = plt.subplots(ncols=2, sharex=True, sharey=True)
    >>> imgsim = librosa.display.specshow(R, x_axis='s', y_axis='s',
    ...                                   hop_length=hop_length, ax=ax[0])
    >>> ax[0].set(title='Binary recurrence (symmetric)')
    >>> imgaff = librosa.display.specshow(R_aff, x_axis='s', y_axis='s',
    ...                                   hop_length=hop_length, cmap='magma_r', ax=ax[1])
    >>> ax[1].set(title='Affinity recurrence')
    >>> ax[1].label_outer()
    >>> fig.colorbar(imgsim, ax=ax[0], orientation='horizontal', ticks=[0, 1])
    >>> fig.colorbar(imgaff, ax=ax[1], orientation='horizontal')
    """
    # Promote 1-D input to 2-D so the axis bookkeeping below is uniform
    data = np.atleast_2d(data)

    # Swap observations to the first dimension and flatten the rest
    data = np.swapaxes(data, axis, 0)
    t = data.shape[0]
    # Use F-ordering here to preserve leading axis layout
    data = data.reshape((t, -1), order="F")

    # Width must leave at least one admissible neighbor on each side
    if width < 1 or width >= (t - 1) // 2:
        raise ParameterError(
            "width={} must be at least 1 and at most (data.shape[{}] - 1) // 2={}".format(
                width, axis, (t - 1) // 2
            )
        )

    if mode not in ["connectivity", "distance", "affinity"]:
        raise ParameterError(
            (
                f"Invalid mode='{mode}'. Must be one of "
                "['connectivity', 'distance', 'affinity']"
            )
        )
    if k is None:
        # Default neighbor count scales with sqrt of the admissible frame count
        k = 2 * np.ceil(np.sqrt(t - 2 * width + 1))

    k = int(k)

    # using k for bandwidth estimation also and decouple k for full mode
    bandwidth_k = k
    if full and (mode != "connectivity"):
        # In full mode, link every frame; bandwidth_k retains the user's k
        k = t

    # Build the neighbor search object
    # `auto` may reject some metrics; fall back to brute force in that case
    try:
        knn = sklearn.neighbors.NearestNeighbors(
            n_neighbors=min(t - 1, k + 2 * width), metric=metric, algorithm="auto"
        )
    except ValueError:
        knn = sklearn.neighbors.NearestNeighbors(
            n_neighbors=min(t - 1, k + 2 * width), metric=metric, algorithm="brute"
        )

    knn.fit(data)

    # Get the knn graph
    if mode == "affinity":
        # sklearn has no affinity mode; fetch distances and convert post-hoc
        kng_mode = "distance"
    else:
        kng_mode = mode

    # LIL format allows efficient per-row/diagonal edits below
    rec = knn.kneighbors_graph(mode=kng_mode).tolil()

    if not full:
        # Remove connections within width
        for diag in range(-width + 1, width):
            rec.setdiag(0, diag)

        # Retain only the top-k links per point
        for i in range(t):
            # Get the links from point i
            links = rec[i].nonzero()[1]

            # Order them ascending
            idx = links[np.argsort(rec[i, links].toarray())][0]

            # Everything past the kth closest gets squashed
            rec[i, idx[k:]] = 0

    if self:
        if mode == "connectivity":
            rec.setdiag(1)
        elif mode == "affinity":
            # we need to keep the self-loop in here, but not mess up the
            # bandwidth estimation
            #
            # using negative distances here preserves the structure without changing
            # the statistics of the data
            rec.setdiag(-1)
        else:
            rec.setdiag(0)

    # symmetrize
    if sym:
        # Note: this operation produces a CSR (compressed sparse row) matrix!
        # This is why we have to do it after filling the diagonal in self-mode
        rec = rec.minimum(rec.T)

    rec = rec.tocsr()
    rec.eliminate_zeros()

    if mode == "connectivity":
        rec = rec.astype(bool)
    elif mode == "affinity":
        # Set all the negatives back to 0
        # Negatives are temporarily inserted above to preserve the sparsity structure
        # of the matrix without corrupting the bandwidth calculations
        rec.data[rec.data < 0] = 0.0
        aff_bandwidth = __affinity_bandwidth(rec, bandwidth, bandwidth_k)
        # Convert stored distances to affinities: exp(-d / bandwidth)
        rec.data[:] = np.exp(rec.data / (-1 * aff_bandwidth))

    # Transpose to be column-major
    rec = rec.T

    if not sparse:
        rec = rec.toarray()

    return rec
|
||
|
||
|
||
# Generic type variable bounded to dense arrays or scipy sparse matrices.
# Used by the lag/recurrence converters so they are typed as returning the
# same container kind (dense or sparse) that they receive.
_ArrayOrSparseMatrix = TypeVar(
    "_ArrayOrSparseMatrix", bound=Union[np.ndarray, scipy.sparse.spmatrix]
)
|
||
|
||
|
||
def recurrence_to_lag(
    rec: _ArrayOrSparseMatrix, *, pad: bool = True, axis: int = -1
) -> _ArrayOrSparseMatrix:
    """Convert a recurrence matrix into a lag matrix.

    ``lag[i, j] == rec[i+j, j]``

    Diagonal structures in the recurrence matrix become horizontal
    structures in the lag matrix.  These horizontal structures can be
    used to infer changes in the repetition structure of a piece,
    e.g., the beginning of a new section, as done in [#]_.

    .. [#] Serra, J., Müller, M., Grosche, P., & Arcos, J. L. (2014).
        Unsupervised music structure annotation by time series structure
        features and segment similarity.
        IEEE Transactions on Multimedia, 16(5), 1229-1240.

    Parameters
    ----------
    rec : np.ndarray, or scipy.sparse.spmatrix [shape=(n, n)]
        A (binary) recurrence matrix, as returned by `recurrence_matrix`
    pad : bool
        If False, the ``lag`` matrix is square, which is equivalent to
        assuming that the signal repeats itself indefinitely.
        If True, ``lag`` is padded with ``n`` zeros, which eliminates
        the assumption of repetition.
    axis : int
        The axis to keep as the ``time`` axis.
        The alternate axis will be converted to lag coordinates.

    Returns
    -------
    lag : np.ndarray
        The recurrence matrix in (lag, time) (if ``axis=1``)
        or (time, lag) (if ``axis=0``) coordinates

    Raises
    ------
    ParameterError : if ``rec`` is non-square

    See Also
    --------
    recurrence_matrix
    lag_to_recurrence
    util.shear
    """
    # Since the matrix is 2-D, axes -1 and 1 are interchangeable
    axis = np.abs(axis)

    if rec.ndim != 2 or rec.shape[0] != rec.shape[1]:
        raise ParameterError(f"non-square recurrence matrix shape: {rec.shape}")

    is_sparse = scipy.sparse.issparse(rec)
    if is_sparse:
        # Remember the input format so we can restore it after shearing
        # (mypy does not track the sparse narrowing here)
        fmt = rec.format  # type: ignore

    t = rec.shape[axis]

    if pad:
        if is_sparse:
            # Append a zero block along the lag axis via a Kronecker product;
            # pick the sparse layout that keeps the kept axis contiguous
            pad_block = np.asarray([[1, 0]], dtype=rec.dtype).swapaxes(axis, 0)
            rec = scipy.sparse.kron(
                pad_block, rec, format="csr" if axis == 0 else "csc"
            )
        else:
            # Zero-pad the lag axis (the axis we are NOT keeping as time)
            pad_widths = np.array([(0, 0), (0, 0)])
            pad_widths[(1 - axis), :] = [0, t]
            # mypy cannot see that rec is dense on this branch
            rec = np.pad(rec, pad_widths, mode="constant")  # type: ignore

    # Shear turns diagonals into rows/columns: lag[i, j] == rec[i+j, j]
    lag: _ArrayOrSparseMatrix = util.shear(rec, factor=-1, axis=axis)

    if is_sparse:
        # Restore the caller's sparse format (mypy narrowing again)
        lag = lag.asformat(fmt)  # type: ignore

    return lag
|
||
|
||
|
||
def lag_to_recurrence(
    lag: _ArrayOrSparseMatrix, *, axis: int = -1
) -> _ArrayOrSparseMatrix:
    """Convert a lag matrix into a recurrence matrix.

    This inverts `recurrence_to_lag`: horizontal structures in the lag
    matrix become diagonal structures in the recurrence matrix.

    Parameters
    ----------
    lag : np.ndarray or scipy.sparse.spmatrix
        A lag matrix, as produced by ``recurrence_to_lag``
    axis : int
        The axis corresponding to the time dimension.
        The alternate axis will be interpreted in lag coordinates.

    Returns
    -------
    rec : np.ndarray or scipy.sparse.spmatrix [shape=(n, n)]
        A recurrence matrix in (time, time) coordinates
        For sparse matrices, format will match that of ``lag``.

    Raises
    ------
    ParameterError : if ``lag`` does not have the correct shape

    See Also
    --------
    recurrence_to_lag
    """
    if axis not in (0, 1, -1):
        raise ParameterError(f"Invalid target axis: {axis}")

    # The matrix is 2-D, so axes -1 and 1 coincide
    axis = np.abs(axis)

    # Accept either a square (unpadded) lag matrix or one whose lag axis
    # is twice the time axis (zero-padded)
    valid_shape = lag.ndim == 2 and (
        lag.shape[0] == lag.shape[1] or lag.shape[1 - axis] == 2 * lag.shape[axis]
    )
    if not valid_shape:
        raise ParameterError(f"Invalid lag matrix shape: {lag.shape}")

    # Since lag must be 2-dimensional, abs(axis) = axis
    t = lag.shape[axis]

    # Undo the shear applied by recurrence_to_lag
    rec = util.shear(lag, factor=+1, axis=axis)

    # Trim the lag axis back to t entries, keeping the time axis intact
    window = [slice(None)] * rec.ndim
    window[1 - axis] = slice(t)
    result: _ArrayOrSparseMatrix = rec[tuple(window)]
    return result
|
||
|
||
|
||
# Generic callable type variable: lets decorator factories (e.g. timelag_filter
# below) declare that they return a callable of the same type they were given.
_F = TypeVar("_F", bound=Callable[..., Any])
|
||
|
||
|
||
def timelag_filter(function: _F, pad: bool = True, index: int = 0) -> _F:
    """Apply a filter in the time-lag domain.

    This wraps an arbitrary image filter (e.g., `scipy.ndimage.median_filter`)
    so that it operates on the time-lag representation of a recurrence matrix
    rather than on the matrix directly.

    Calling the wrapped function is equivalent to the sequence:

    >>> data_tl = librosa.segment.recurrence_to_lag(data)
    >>> data_filtered_tl = function(data_tl)
    >>> data_filtered = librosa.segment.lag_to_recurrence(data_filtered_tl)

    Parameters
    ----------
    function : callable
        The filtering function to wrap, e.g., `scipy.ndimage.median_filter`
    pad : bool
        Whether to zero-pad the structure feature matrix
    index : int >= 0
        If ``function`` accepts input data as a positional argument, it should be
        indexed by ``index``

    Returns
    -------
    wrapped_function : callable
        A new filter function which applies in time-lag space rather than
        time-time space.

    Examples
    --------
    Apply a 31-bin median filter to the diagonal of a recurrence matrix.
    With default parameters, this corresponds to a time window of about
    0.72 seconds.

    >>> y, sr = librosa.load(librosa.ex('nutcracker'), duration=30)
    >>> chroma = librosa.feature.chroma_cqt(y=y, sr=sr)
    >>> chroma_stack = librosa.feature.stack_memory(chroma, n_steps=3, delay=3)
    >>> rec = librosa.segment.recurrence_matrix(chroma_stack)
    >>> from scipy.ndimage import median_filter
    >>> diagonal_median = librosa.segment.timelag_filter(median_filter)
    >>> rec_filtered = diagonal_median(rec, size=(1, 31), mode='mirror')

    Or with affinity weights

    >>> rec_aff = librosa.segment.recurrence_matrix(chroma_stack, mode='affinity')
    >>> rec_aff_fil = diagonal_median(rec_aff, size=(1, 31), mode='mirror')
    """

    def _filter_in_lag_space(wrapped_f, *args, **kwargs):
        """Run ``wrapped_f`` on lag-domain data and map the result back."""
        arg_list = list(args)

        # Project the target argument into time-lag space
        arg_list[index] = recurrence_to_lag(arg_list[index], pad=pad)

        # Filter in lag space, then convert the output back to time-time space
        return lag_to_recurrence(wrapped_f(*arg_list, **kwargs))

    # decorator() preserves the signature and metadata of ``function``
    return decorator(_filter_in_lag_space, function)  # type: ignore
|
||
|
||
|
||
@cache(level=30)
def subsegment(
    data: np.ndarray, frames: np.ndarray, *, n_segments: int = 4, axis: int = -1
) -> np.ndarray:
    """Sub-divide a segmentation by feature clustering.

    Each interval defined by successive entries of ``frames`` is split into
    (at most) ``n_segments`` sub-intervals by constrained agglomerative
    clustering of the corresponding slice of ``data``.

    .. note::
        If an interval spans fewer than ``n_segments`` frames, then each
        frame becomes a sub-segment.

    Parameters
    ----------
    data : np.ndarray
        Data matrix to use in clustering
    frames : np.ndarray [shape=(n_boundaries,)], dtype=int, non-negative]
        Array of beat or segment boundaries, as provided by
        `librosa.beat.beat_track`,
        `librosa.onset.onset_detect`,
        or `agglomerative`.
    n_segments : int > 0
        Maximum number of frames to sub-divide each interval.
    axis : int
        Axis along which to apply the segmentation.
        By default, the last index (-1) is taken.

    Returns
    -------
    boundaries : np.ndarray [shape=(n_subboundaries,)]
        List of sub-divided segment boundaries

    Raises
    ------
    ParameterError
        If ``n_segments`` is not a positive integer

    See Also
    --------
    agglomerative : Temporal segmentation
    librosa.onset.onset_detect : Onset detection
    librosa.beat.beat_track : Beat tracking

    Notes
    -----
    This function caches at level 30.

    Examples
    --------
    Load audio, detect beat frames, and subdivide in twos by CQT

    >>> y, sr = librosa.load(librosa.ex('choice'), duration=10)
    >>> tempo, beats = librosa.beat.beat_track(y=y, sr=sr, hop_length=512)
    >>> cqt = np.abs(librosa.cqt(y, sr=sr, hop_length=512))
    >>> subseg = librosa.segment.subsegment(cqt, beats, n_segments=2)
    """
    # Clamp boundaries into [0, data.shape[axis]] and ensure both endpoints exist
    frames = util.fix_frames(frames, x_min=0, x_max=data.shape[axis], pad=True)

    if n_segments < 1:
        raise ParameterError("n_segments must be a positive integer")

    selector = [slice(None)] * data.ndim
    sub_boundaries = []

    # Walk over consecutive (start, end) interval pairs
    for start, end in zip(frames[:-1], frames[1:]):
        selector[axis] = slice(start, end)

        # Cluster within this interval; the number of clusters is capped
        # by the interval length so short intervals degenerate gracefully
        local_bounds = agglomerative(
            data[tuple(selector)], min(end - start, n_segments), axis=axis
        )

        # Local boundaries are relative to the interval start; shift them back
        sub_boundaries.extend(start + local_bounds)

    return np.array(sub_boundaries)
|
||
|
||
|
||
def agglomerative(
    data: np.ndarray,
    k: int,
    *,
    clusterer: Optional[sklearn.cluster.AgglomerativeClustering] = None,
    axis: int = -1,
) -> np.ndarray:
    """Bottom-up temporal segmentation.

    Partition ``data`` into ``k`` temporally contiguous segments using a
    temporally-constrained agglomerative clustering routine.

    Parameters
    ----------
    data : np.ndarray
        data to cluster
    k : int > 0 [scalar]
        number of segments to produce
    clusterer : sklearn.cluster.AgglomerativeClustering, optional
        An optional AgglomerativeClustering object.
        If `None`, a constrained Ward object is instantiated.
    axis : int
        axis along which to cluster.
        By default, the last axis (-1) is chosen.

    Returns
    -------
    boundaries : np.ndarray [shape=(k,)]
        left-boundaries (frame numbers) of detected segments. This
        will always include `0` as the first left-boundary.

    See Also
    --------
    sklearn.cluster.AgglomerativeClustering

    Examples
    --------
    Cluster by chroma similarity, break into 20 segments

    >>> y, sr = librosa.load(librosa.ex('nutcracker'), duration=15)
    >>> chroma = librosa.feature.chroma_cqt(y=y, sr=sr)
    >>> bounds = librosa.segment.agglomerative(chroma, 20)
    >>> bound_times = librosa.frames_to_time(bounds, sr=sr)
    """
    # Ensure at least 2 dimensions, then move the clustering axis to the front
    features = np.swapaxes(np.atleast_2d(data), axis, 0)

    # Flatten all remaining dimensions into a feature vector per frame
    n_frames = features.shape[0]
    features = features.reshape((n_frames, -1), order="F")

    if clusterer is None:
        # Temporal connectivity constraint: each frame may only merge
        # with its immediate neighbors on the timeline
        connectivity = sklearn.feature_extraction.image.grid_to_graph(
            n_x=n_frames, n_y=1, n_z=1
        )

        clusterer = sklearn.cluster.AgglomerativeClustering(
            n_clusters=k, connectivity=connectivity, memory=cache.memory
        )

    clusterer.fit(features)

    # A segment boundary occurs wherever consecutive labels differ;
    # frame 0 always opens the first segment
    change_points = 1 + np.flatnonzero(np.diff(clusterer.labels_)).astype(int)
    return np.concatenate(([0], change_points))
|
||
|
||
|
||
def path_enhance(
    R: np.ndarray,
    n: int,
    *,
    window: _WindowSpec = "hann",
    max_ratio: float = 2.0,
    min_ratio: Optional[float] = None,
    n_filters: int = 7,
    zero_mean: bool = False,
    clip: bool = True,
    **kwargs: Any,
) -> np.ndarray:
    """Multi-angle path enhancement for self- and cross-similarity matrices.

    Convolve ``R`` with a bank of diagonal smoothing filters at several
    orientations (tempo ratios), and aggregate the responses by an
    element-wise maximum::

        R_smooth[i, j] = max_theta (R * filter_theta)[i, j]

    where `*` denotes 2-dimensional convolution and ``filter_theta`` is a
    smoothing filter at orientation theta.  This provides coherent temporal
    smoothing of similarity matrices when there are changes in tempo.

    Smoothing filters are generated at evenly spaced orientations between
    ``min_ratio`` and ``max_ratio``.  This function is inspired by the
    multi-angle path enhancement of [#]_, but differs by modeling tempo
    differences in the space of similarity matrices rather than re-sampling
    the underlying features prior to generating the self-similarity matrix.

    .. [#] Müller, Meinard and Frank Kurth.
        "Enhancing similarity matrices for music audio analysis."
        2006 IEEE International Conference on Acoustics Speech and Signal Processing Proceedings.
        Vol. 5. IEEE, 2006.

    .. note:: if using `recurrence_matrix` to construct the input similarity matrix,
        be sure to include the main diagonal by setting ``self=True``.  Otherwise, the
        diagonal will be suppressed, and this is likely to produce discontinuities
        which will pollute the smoothing filter response.

    Parameters
    ----------
    R : np.ndarray
        The self- or cross-similarity matrix to be smoothed.
        Note: sparse inputs are not supported.

        If the recurrence matrix is multi-dimensional, e.g. `shape=(c, n, n)`,
        then enhancement is conducted independently for each leading channel.
    n : int > 0
        The length of the smoothing filter
    window : window specification
        The type of smoothing filter to use.  See `filters.get_window` for more
        information on window specification formats.
    max_ratio : float > 0
        The maximum tempo ratio to support
    min_ratio : float > 0
        The minimum tempo ratio to support.
        If not provided, it will default to ``1/max_ratio``
    n_filters : int >= 1
        The number of different smoothing filters to use, evenly spaced
        between ``min_ratio`` and ``max_ratio``.

        If ``min_ratio = 1/max_ratio`` (the default), using an odd number
        of filters will ensure that the main diagonal (ratio=1) is included.
    zero_mean : bool
        By default, the smoothing filters are non-negative and sum to one
        (i.e. are averaging filters).

        If ``zero_mean=True``, then the smoothing filters are made to sum to zero
        by subtracting a constant value from the non-diagonal coordinates of the
        filter.  This is primarily useful for suppressing blocks while enhancing
        diagonals.
    clip : bool
        If True, the smoothed similarity matrix will be thresholded at 0, and
        will not contain negative entries.
    **kwargs : additional keyword arguments
        Additional arguments to pass to `scipy.ndimage.convolve`

    Returns
    -------
    R_smooth : np.ndarray, shape=R.shape
        The smoothed self- or cross-similarity matrix

    Raises
    ------
    ParameterError
        If ``min_ratio`` exceeds ``max_ratio``

    See Also
    --------
    librosa.filters.diagonal_filter
    recurrence_matrix
    """
    if min_ratio is None:
        min_ratio = 1.0 / max_ratio
    elif min_ratio > max_ratio:
        raise ParameterError(
            f"min_ratio={min_ratio} cannot exceed max_ratio={max_ratio}"
        )

    result: Optional[np.ndarray] = None

    # Filter orientations are log-spaced between min_ratio and max_ratio
    for slope in np.logspace(
        np.log2(min_ratio), np.log2(max_ratio), num=n_filters, base=2
    ):
        kern = diagonal_filter(window, n, slope=slope, zero_mean=zero_mean)

        # Prepend unit dimensions so the kernel broadcasts over any leading
        # channel dimensions of R, e.g. R: (c, n, n) -> kernel: (1, m, m)
        kern = kern.reshape([1] * (R.ndim - 2) + list(kern.shape))

        response = scipy.ndimage.convolve(R, kern, **kwargs)

        if result is None:
            result = response
        else:
            # Aggregate across orientations by element-wise maximum, in place
            np.maximum(result, response, out=result)

    if clip:
        # Threshold the output at zero, in place
        np.clip(result, 0, None, out=result)  # type: ignore

    return np.asanyarray(result)
|
||
|
||
|
||
def __affinity_bandwidth(
    rec: scipy.sparse.csr_matrix,
    bw_mode: Optional[Union[np.ndarray, _FloatLike_co, str]],
    k: int,
) -> Union[float, np.ndarray]:
    """Estimate bandwidth(s) for converting a distance-valued recurrence
    graph into affinities.

    Parameters
    ----------
    rec : scipy.sparse.csr_matrix
        Sparse distance-valued recurrence/similarity matrix.  Must be in CSR
        format: the per-row loops below index directly into ``indptr``/``indices``.
    bw_mode : np.ndarray, scalar, str, or None
        - ndarray: per-pair bandwidths, same shape as ``rec``
        - positive scalar: a single global bandwidth
        - str: one of the estimation strategies validated below
        - None: defaults to ``'med_k_scalar'``
    k : int
        Number of nearest neighbors used for the distance statistics.

    Returns
    -------
    bandwidth : float or np.ndarray
        Either a single scalar bandwidth, or one bandwidth value per stored
        (nonzero) entry of ``rec``, aligned with ``rec.data``.

    Raises
    ------
    ParameterError
        On malformed bandwidth inputs, unknown mode strings, or graphs that
        cannot support the requested estimation (see checks below).
    """
    # rec should be a csr_matrix

    # the api allows users to specify a scalar bandwidth directly, besides the string based options.
    if isinstance(bw_mode, np.ndarray):
        bandwidth = bw_mode
        # check if bw is the right size
        if bandwidth.shape != rec.shape:
            raise ParameterError(
                f"Invalid matrix bandwidth shape: {bandwidth.shape}."
                f"Should be {rec.shape}."
            )
        if (bandwidth <= 0).any():
            raise ParameterError(
                "Invalid bandwidth. All entries must be strictly positive."
            )
        # Select only the entries matching the stored (nonzero) positions of rec
        return np.array(bandwidth[rec.nonzero()])

    elif isinstance(bw_mode, (int, float)):
        scalar_bandwidth = float(bw_mode)
        if scalar_bandwidth <= 0:
            raise ParameterError(
                f"Invalid scalar bandwidth={scalar_bandwidth}. Must be strictly positive."
            )
        return scalar_bandwidth

    if bw_mode is None:
        bw_mode = "med_k_scalar"

    if bw_mode not in [
        "med_k_scalar",
        "mean_k",
        "gmean_k",
        "mean_k_avg",
        "gmean_k_avg",
        "mean_k_avg_and_pair",
    ]:
        raise ParameterError(
            f"Invalid bandwidth='{bw_mode}'. Must be either a positive scalar or one of "
            "['med_k_scalar', 'mean_k', 'gmean_k', 'mean_k_avg', 'gmean_k_avg', 'mean_k_avg_and_pair']"
        )

    # build a list of list that stores the distances to k nearest neighbors for all t points.
    t = rec.shape[0]
    knn_dists = []
    for i in range(t):
        # Get the links from point i
        links = rec[i].nonzero()[1]
        # catch empty dists lists in knn_dists
        if len(links) == 0:
            # Disconnected vertices are only a problem for point-wise bandwidth estimation
            if bw_mode not in ["med_k_scalar"]:
                raise ParameterError(f"The sample at time point {i} has no neighbors")
            else:
                # If we have no links, then there's no distance
                # shove a nan in here
                knn_dists.append(np.array([np.nan]))
        else:
            # Compute k nearest neighbors' distance and sort ascending
            knn_dist_row = np.sort(rec[i, links].toarray()[0])[:k]
            knn_dists.append(knn_dist_row)

    # take the last element of each list for the distance to kth neighbor
    # (rows with fewer than k neighbors contribute their farthest available neighbor)
    dist_to_k = np.asarray([dists[-1] for dists in knn_dists])
    # mean distance over (up to) the first k neighbors of each point
    avg_dist_to_first_ks = np.asarray([np.mean(dists) for dists in knn_dists])

    if bw_mode == "med_k_scalar":
        # nanmedian skips disconnected rows (nan); an all-nan input would
        # have no usable distances at all
        if not np.any(np.isfinite(dist_to_k)):
            raise ParameterError("Cannot estimate bandwidth from an empty graph")
        return float(np.nanmedian(dist_to_k))

    if bw_mode in ["mean_k", "gmean_k"]:
        # building bandwidth components (sigma) using sparse matrix structures and indices:
        # sigma_i holds the k-th neighbor distance of the row vertex for each stored
        # entry, sigma_j the same statistic for the column vertex
        sigma_i_data = np.empty_like(rec.data)
        sigma_j_data = np.empty_like(rec.data)
        for row in range(t):
            # indptr[row]:indptr[row+1] spans the stored entries of this row in rec.data
            sigma_i_data[rec.indptr[row] : rec.indptr[row + 1]] = dist_to_k[row]
            col_idx = rec.indices[rec.indptr[row] : rec.indptr[row + 1]]
            sigma_j_data[rec.indptr[row] : rec.indptr[row + 1]] = dist_to_k[col_idx]

        if bw_mode == "mean_k":
            # arithmetic mean of the two endpoint statistics
            out = np.array((sigma_i_data + sigma_j_data) / 2)
        elif bw_mode == "gmean_k":
            # geometric mean of the two endpoint statistics
            out = np.array((sigma_i_data * sigma_j_data) ** 0.5)

    if bw_mode in ["mean_k_avg", "gmean_k_avg", "mean_k_avg_and_pair"]:
        # Same per-entry construction as above, but using the average distance
        # to the first k neighbors instead of the k-th neighbor distance
        sigma_i_data = np.empty_like(rec.data)
        sigma_j_data = np.empty_like(rec.data)
        for row in range(t):
            sigma_i_data[rec.indptr[row] : rec.indptr[row + 1]] = avg_dist_to_first_ks[
                row
            ]
            col_idx = rec.indices[rec.indptr[row] : rec.indptr[row + 1]]
            sigma_j_data[rec.indptr[row] : rec.indptr[row + 1]] = avg_dist_to_first_ks[
                col_idx
            ]

        if bw_mode == "mean_k_avg":
            out = np.array((sigma_i_data + sigma_j_data) / 2)
        elif bw_mode == "gmean_k_avg":
            out = np.array((sigma_i_data * sigma_j_data) ** 0.5)
        elif bw_mode == "mean_k_avg_and_pair":
            # Also fold in the pairwise distance itself (rec.data) as a third term
            out = np.array((sigma_i_data + sigma_j_data + rec.data) / 3)

    return out
|