From 97e6cfaeda39be96198057fe66a6508fa450d486 Mon Sep 17 00:00:00 2001 From: Tony Bagnall Date: Mon, 17 Jun 2024 16:36:51 +0100 Subject: [PATCH 01/14] reimannian --- aeon_neuro/_wip/__init__.py | 0 aeon_neuro/_wip/distances/__init__.py | 0 aeon_neuro/_wip/distances/_reimannian.py | 17 +++++++++ aeon_neuro/_wip/transformations/__init__.py | 0 .../_wip/transformations/series/__init__.py | 0 .../transformations/series/_power_spectrum.py | 36 +++++++++++++++++++ 6 files changed, 53 insertions(+) create mode 100644 aeon_neuro/_wip/__init__.py create mode 100644 aeon_neuro/_wip/distances/__init__.py create mode 100644 aeon_neuro/_wip/distances/_reimannian.py create mode 100644 aeon_neuro/_wip/transformations/__init__.py create mode 100644 aeon_neuro/_wip/transformations/series/__init__.py create mode 100644 aeon_neuro/_wip/transformations/series/_power_spectrum.py diff --git a/aeon_neuro/_wip/__init__.py b/aeon_neuro/_wip/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aeon_neuro/_wip/distances/__init__.py b/aeon_neuro/_wip/distances/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aeon_neuro/_wip/distances/_reimannian.py b/aeon_neuro/_wip/distances/_reimannian.py new file mode 100644 index 0000000..533370d --- /dev/null +++ b/aeon_neuro/_wip/distances/_reimannian.py @@ -0,0 +1,17 @@ +"""Implement as a distance function here.""" +def reimannian_distance( + x: np.ndarray, + y: np.ndarray, +) -> float: + r"""Compute the Reimannian distance between two time series. + + Parameters + ---------- + x : np.ndarray + First time series, either univariate, shape ``(n_timepoints,)``, or + multivariate, shape ``(n_channels, n_timepoints)``. + y : np.ndarray + Second time series, either univariate, shape ``(n_timepoints,)``, or + multivariate, shape ``(n_channels, n_timepoints)``. + """ + return x+y diff --git a/aeon_neuro/_wip/transformations/__init__.py b/aeon_neuro/_wip/transformations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aeon_neuro/_wip/transformations/series/__init__.py b/aeon_neuro/_wip/transformations/series/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aeon_neuro/_wip/transformations/series/_power_spectrum.py b/aeon_neuro/_wip/transformations/series/_power_spectrum.py new file mode 100644 index 0000000..217a10d --- /dev/null +++ b/aeon_neuro/_wip/transformations/series/_power_spectrum.py @@ -0,0 +1,36 @@ +"""Power spectrum matrix transform, may call it something else.""" + +from aeon.transformations.series.base import BaseSeriesTransformer + + + +class PowerSpectrumMatrix(BaseSeriesTransformer): + """Implement here.""" + _tags = { + "X_inner_type": "np.ndarray", + "capability:multivariate": True, + "fit_is_empty": True, + } + def __init__( + self, + n_lags=None, + ): + self.n_lags = n_lags + super().__init__(axis=1) + def _transform(self, X, y=None): + """Transform X and return a transformed version. + + private _transform containing the core logic, called from transform + + Parameters + ---------- + X : np.ndarray + Data to be transformed, shape (n_channels, n_timepoints) + y : ignored argument for interface compatibility + Additional data, e.g., labels for transformation + + Returns + ------- + transformed version of X + """ + return X From 1b11da2aab2d109d2fb144a6c9985b46b5cf1f1c Mon Sep 17 00:00:00 2001 From: Tony Bagnall Date: Mon, 17 Jun 2024 16:46:09 +0100 Subject: [PATCH 02/14] riemannian --- aeon_neuro/_wip/distances/{_reimannian.py => _riemannian.py} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename aeon_neuro/_wip/distances/{_reimannian.py => _riemannian.py} (95%) diff --git a/aeon_neuro/_wip/distances/_reimannian.py b/aeon_neuro/_wip/distances/_riemannian.py similarity index 95% rename from aeon_neuro/_wip/distances/_reimannian.py rename to aeon_neuro/_wip/distances/_riemannian.py index 533370d..0318a44 100644 --- a/aeon_neuro/_wip/distances/_reimannian.py +++ b/aeon_neuro/_wip/distances/_riemannian.py @@ -1,5 +1,5 @@ """Implement as a distance function here.""" -def reimannian_distance( +def riemannian_distance( x: np.ndarray, y: np.ndarray, ) -> float: From 4634e015b52c210f6476913c949aab4cda697bfa Mon Sep 17 00:00:00 2001 From: AidenRushbrooke <72034940+AidenRushbrooke@users.noreply.github.com> Date: Fri, 28 Jun 2024 01:20:43 +0100 Subject: [PATCH 03/14] Added 3 versions of Riemannian distance --- aeon_neuro/_wip/distances/_riemannian.py | 61 +++++++++++++++++++++++- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/aeon_neuro/_wip/distances/_riemannian.py b/aeon_neuro/_wip/distances/_riemannian.py index 0318a44..f635e42 100644 --- a/aeon_neuro/_wip/distances/_riemannian.py +++ b/aeon_neuro/_wip/distances/_riemannian.py @@ -1,5 +1,58 @@ + +import numpy as np +from scipy.linalg import logm,eig +import math """Implement as a distance function here.""" -def riemannian_distance( +def riemannian_distance_a( + x: np.ndarray, + y: np.ndarray, +) -> float: + r"""Compute the Reimannian distance between two time series. + + Parameters + ---------- + x : np.ndarray + First time series, either univariate, shape ``(n_timepoints,)``, or + multivariate, shape ``(n_channels, n_timepoints)``. + y : np.ndarray + Second time series, either univariate, shape ``(n_timepoints,)``, or + multivariate, shape ``(n_channels, n_timepoints)``. + """ + x_cov = np.cov(x) + y_cov = np.cov(y) + x_trace = np.trace(x_cov) + y_trace = np.trace(y_cov) + x_root = logm(x_cov) + c_temp = logm(np.matmul(x_root,np.matmul(y_cov,x_root))) + c = 2*np.trace(c_temp) + return math.sqrt(x_trace+y_trace-c) + + +def riemannian_distance_b( + x: np.ndarray, + y: np.ndarray, +) -> float: + r"""Compute the Reimannian distance between two time series. + https://hal.science/hal-00820475/document + https://hal.science/hal-00602700/document + Parameters + ---------- + x : np.ndarray + First time series, either univariate, shape ``(n_timepoints,)``, or + multivariate, shape ``(n_channels, n_timepoints)``. + y : np.ndarray + Second time series, either univariate, shape ``(n_timepoints,)``, or + multivariate, shape ``(n_channels, n_timepoints)``. + """ + x_cov = np.cov(x) + y_cov = np.cov(y) + eigenvalues = eig(x_cov,y_cov) + distance=0 + for i in eigenvalues[0]: + distance+=math.log(np.real(i))**2 + return math.sqrt(distance) + +def riemannian_distance_c( x: np.ndarray, y: np.ndarray, ) -> float: @@ -14,4 +67,8 @@ def riemannian_distance( Second time series, either univariate, shape ``(n_timepoints,)``, or multivariate, shape ``(n_channels, n_timepoints)``. """ - return x+y + x_cov = np.cov(x) + y_cov = np.cov(y) + x_inv_root = logm(np.linalg.inv(x_cov)) + distance = logm(np.matmul(x_inv_root,np.matmul(y_cov,x_inv_root))) + return np.linalg.norm(distance) From d0e8e46f8719ecd836c8de448f97399f03aff85d Mon Sep 17 00:00:00 2001 From: Futuer <57895730+futuer-szd@users.noreply.github.com> Date: Mon, 8 Jul 2024 10:24:49 +0100 Subject: [PATCH 04/14] [ENH] add riemannian distance functions between matrices (#58) * add riemannian distance functions between matrices add tests for riemannian distance functions between matrices add example notebook for riemannian distance functions between matrices * add references for examples --------- Co-authored-by: futuer --- .../_wip/distances/_riemannian_matrix.py | 296 ++++++++++++++++++ .../distances/tests/test_riemannian_matrix.py | 74 +++++ .../_wip/distances/riemannian_matrix.ipynb | 227 ++++++++++++++ 3 files changed, 597 insertions(+) create mode 100644 aeon_neuro/_wip/distances/_riemannian_matrix.py create mode 100644 aeon_neuro/_wip/distances/tests/test_riemannian_matrix.py create mode 100644 examples/_wip/distances/riemannian_matrix.ipynb diff --git a/aeon_neuro/_wip/distances/_riemannian_matrix.py b/aeon_neuro/_wip/distances/_riemannian_matrix.py new file mode 100644 index 0000000..b1c96d6 --- /dev/null +++ b/aeon_neuro/_wip/distances/_riemannian_matrix.py @@ -0,0 +1,296 @@ +"""Riemannian distances between SPD/HPD matrices.""" + +import numpy as np +from pyriemann.utils.distance import distance_riemann +from scipy.linalg import sqrtm + + +def _is_hpd(matrix): + r"""Check if a matrix is Hermitian Positive Definite (HPD).""" + if not np.allclose(matrix, matrix.conj().T): + return False + try: + np.linalg.cholesky(matrix) + return True + except np.linalg.LinAlgError: + return False + + +def _check_inputs(A, B, W=None): + if W is None: + if not A.shape == B.shape: + raise ValueError("Inputs must have equal dimensions") + if A.ndim != 2 or B.ndim != 2: + raise ValueError("Inputs must be 2D ndarrays") + if not _is_hpd(A): + raise ValueError("Matrix A must be Hermitian Positive Definite (HPD)") + if not _is_hpd(B): + raise ValueError("Matrix B must be Hermitian Positive Definite (HPD)") + else: + if not A.shape == B.shape == W.shape: + raise ValueError("Inputs must have equal dimensions") + if A.ndim != 2 or B.ndim != 2 or W.ndim != 2: + raise ValueError("Inputs must be 2D ndarrays") + if not _is_hpd(A): + raise ValueError("Matrix A must be Hermitian Positive Definite (HPD)") + if not _is_hpd(B): + raise ValueError("Matrix B must be Hermitian Positive Definite (HPD)") + if not _is_hpd(W): + raise ValueError("Matrix W must be Hermitian Positive Definite (HPD)") + + +def riemannian_distance_1( + A: np.ndarray, + B: np.ndarray, +) -> float: + r"""Compute the Reimannian distance between two SPD/HPD matrices. + + SPD: Symmetric Positive Definite + HPD: Hermitian Positive Definite + + The first type of Riemannian distance between two SPD/HPD matrices + :math:`\mathbf{A}` and :math:`\mathbf{B}` is [1]_: + + .. math:: + d_{R1}(\mathbf{A}, \mathbf{B}) = + \sqrt{\text{tr} \mathbf{A} + + \text{tr} \mathbf{B} - + 2 \text{tr} \left[(\mathbf{A} \mathbf{B})^{1/2}\right]} + + Parameters + ---------- + A : np.ndarray + First SPD/HPD matrices, 2D ndarray. + y : np.ndarray + Second SPD/HPD matrices, same dimensions as A. + + Returns + ------- + d : float + Riemannian distance between A and B. + + References + ---------- + .. [1] `Riemannian Distances for Signal Classification by Power Spectral Density + `_ + Li, Y., & Wong, K.M. IEEE Journal of Selected Topics in Signal Processing, + 2013, 7, pp. 655-669 + """ + _check_inputs(A, B) + same_data = B is A + if same_data: + return 0 + + d = np.sqrt(np.trace(A) + np.trace(B) - 2 * np.trace(sqrtm(np.dot(A, B)))) + + return d + + +def riemannian_distance_2( + A: np.ndarray, + B: np.ndarray, +) -> float: + r"""Compute the Reimannian distance between two SPD/HPD matrices. + + SPD: Symmetric Positive Definite + HPD: Hermitian Positive Definite + + The second type of Riemannian distance between two SPD/HPD matrices + :math:`\mathbf{A}` and :math:`\mathbf{B}` is [1]_: + + .. math:: + d_{R2}(\mathbf{A}, \mathbf{B}) = + \sqrt{\text{tr} \mathbf{A} + + \text{tr} \mathbf{B} - + 2 \text{tr} \left[\mathbf{A}^{1/2} \mathbf{B}^{1/2}\right]} + + Parameters + ---------- + A : np.ndarray + First SPD/HPD matrices, 2D ndarray. + B : np.ndarray + Second SPD/HPD matrices, same dimensions as A. + + Returns + ------- + d : float + Riemannian distance between A and B. + + References + ---------- + .. [1] `Riemannian Distances for Signal Classification by Power Spectral Density + `_ + Li, Y., & Wong, K.M. IEEE Journal of Selected Topics in Signal Processing, + 2013, 7, pp. 655-669 + """ + _check_inputs(A, B) + same_data = B is A + if same_data: + return 0 + + d = np.sqrt(np.trace(A) + np.trace(B) - 2 * np.trace(np.dot(sqrtm(A), sqrtm(B)))) + + return d + + +def riemannian_distance_3( + A: np.ndarray, + B: np.ndarray, + squared: bool = False, +) -> float: + r"""Affine-invariant Riemannian distance between SPD/HPD matrices. + + A direct call to method ``pyriemann.utils.distance.distance_riemann`` [1]_. + + The affine-invariant Riemannian distance between two SPD/HPD matrices + :math:`\mathbf{A}` and :math:`\mathbf{B}` is [2]_: + + .. math:: + d(\mathbf{A},\mathbf{B}) = + {\left( \sum_i \log(\lambda_i)^2 \right)}^{1/2} + + where :math:`\lambda_i` are the joint eigenvalues of :math:`\mathbf{A}` and + :math:`\mathbf{B}`. + + Parameters + ---------- + A : ndarray, shape (..., n, n) + First SPD/HPD matrices, 2D ndarray. + B : ndarray, shape (..., n, n) + Second SPD/HPD matrices, same dimensions as A. + squared : bool, default False + Return squared distance. + + Returns + ------- + d : float or ndarray, shape (...,) + Affine-invariant Riemannian distance between A and B. + + References + ---------- + # noqa: E501 + .. [1] https://pyriemann.readthedocs.io/en/latest/generated/pyriemann.utils.distance.distance_riemann.html + .. [2] `A differential geometric approach to the geometric mean of + symmetric positive-definite matrices + `_ + M. Moakher. SIAM J Matrix Anal Appl, 2005, 26 (3), pp. 735-747 + """ + return distance_riemann(A, B, squared) + + +def weighted_riemannian_distance_1( + A: np.ndarray, + B: np.ndarray, + W: np.ndarray, +) -> float: + r"""Compute the weighted Reimannian distance between two SPD/HPD matrices. + + SPD: Symmetric Positive Definite + HPD: Hermitian Positive Definite + + The first type of weighted Riemannian distance between two SPD/HPD matrices + :math:`\mathbf{A}` and :math:`\mathbf{B}` is [1]_: + + .. math:: + d_{W1}(\mathbf{A}, \mathbf{B}, \mathbf{W}) = + \sqrt{\text{tr} \mathbf{W} \mathbf{A} + + \text{tr} \mathbf{W} \mathbf{B} - + 2 \text{tr} \mathbf{A}^{1/2} \mathbf{W} \mathbf{B} \mathbf{W} \mathbf{A}^{1/2}} + + where :math:`\mathbf{W}` is the weight matrix of :math:`\mathbf{A}` and + :math:`\mathbf{B}`. + + Parameters + ---------- + A : np.ndarray + First SPD/HPD matrices, 2D ndarray. + B : np.ndarray + Second SPD/HPD matrices, same dimensions as A. + W : np.ndarray + Weight matrix. + + Returns + ------- + d : float + Weighted Riemannian distance between A and B. + + References + ---------- + .. [1] `Riemannian Distances for Signal Classification by Power Spectral Density + `_ + Li, Y., & Wong, K.M. IEEE Journal of Selected Topics in Signal Processing, + 2013, 7, pp. 655-669 + """ + _check_inputs(A, B, W) + same_data = B is A + if same_data: + return 0 + + B_sqrt = sqrtm(B) + term1 = np.trace(np.dot(W, A)) + term2 = np.trace(np.dot(W, B)) + term3 = np.dot(np.dot(B_sqrt, W), np.dot(A, np.dot(W, B_sqrt))) + term3_ = np.trace(sqrtm(term3)) + d_W = np.sqrt(term1 + term2 - 2 * term3_) + + return d_W + + +def weighted_riemannian_distance_2( + A: np.ndarray, + B: np.ndarray, + W: np.ndarray, +) -> float: + r"""Compute the weighted Reimannian distance between two SPD/HPD matrices. + + SPD: Symmetric Positive Definite + HPD: Hermitian Positive Definite + + The second type of weighted Riemannian distance between two SPD/HPD matrices + :math:`\mathbf{A}` and :math:`\mathbf{B}` is [1]_: + + .. math:: + d_{W2}(\mathbf{A}, \mathbf{B}, \mathbf{W}) = + \sqrt{\text{tr} \mathbf{W} \mathbf{A} + + \text{tr} \mathbf{W} \mathbf{B} - + \text{tr} \mathbf{W} \mathbf{A}^{1/2} \mathbf{B}^{1/2} - + \text{tr} \mathbf{W} \mathbf{B}^{1/2} \mathbf{A}^{1/2}} + + where :math:`\mathbf{W}` is the weight matrix of :math:`\mathbf{A}` and + :math:`\mathbf{B}`. + + Parameters + ---------- + A : np.ndarray + First SPD/HPD matrices, 2D ndarray. + B : np.ndarray + Second SPD/HPD matrices, same dimensions as A. + W : np.ndarray + Weight matrix. + + Returns + ------- + d : float + Weighted Riemannian distance between A and B. + + References + ---------- + .. [1] `Riemannian Distances for Signal Classification by Power Spectral Density + `_ + Li, Y., & Wong, K.M. IEEE Journal of Selected Topics in Signal Processing, + 2013, 7, pp. 655-669 + """ + _check_inputs(A, B, W) + same_data = B is A + if same_data: + return 0 + + B_sqrt = sqrtm(B) + A_sqrt = sqrtm(A) + term1 = np.trace(np.dot(W, A)) + term2 = np.trace(np.dot(W, B)) + term3 = np.trace(np.dot(np.dot(W, A_sqrt), B_sqrt)) + term4 = np.trace(np.dot(np.dot(W, B_sqrt), A_sqrt)) + d_W = np.sqrt(term1 + term2 - term3 - term4) + + return d_W diff --git a/aeon_neuro/_wip/distances/tests/test_riemannian_matrix.py b/aeon_neuro/_wip/distances/tests/test_riemannian_matrix.py new file mode 100644 index 0000000..e45fc2d --- /dev/null +++ b/aeon_neuro/_wip/distances/tests/test_riemannian_matrix.py @@ -0,0 +1,74 @@ +import numpy as np +import pytest + +from aeon_neuro._wip.distances._riemannian_matrix import ( + _check_inputs, + _is_hpd, + riemannian_distance_1, + riemannian_distance_2, + riemannian_distance_3, + weighted_riemannian_distance_1, + weighted_riemannian_distance_2, +) + +A = np.array([[2, -1j], [1j, 2]]) + +B = np.array([[3, 0], [0, 3]]) + +W = np.array([[1, 0], [0, 1]]) + +C = np.array([[1, 2], [3, 4]]) + + +def test_is_hpd(): + assert _is_hpd(A) is True + assert _is_hpd(B) is True + assert _is_hpd(C) is False + + +def test_check_inputs(): + with pytest.raises(ValueError): + _check_inputs(A, C) + with pytest.raises(ValueError): + _check_inputs(A, np.array([1, 2])) + with pytest.raises(ValueError): + _check_inputs(A, np.array([[1, 2]])) + with pytest.raises(ValueError): + _check_inputs(A, B, np.array([1, 2])) + with pytest.raises(ValueError): + _check_inputs(A, B, np.array([[1, 2]])) + with pytest.raises(ValueError): + _check_inputs(A, B, C) + + _check_inputs(A, B) + _check_inputs(A, B, W) + + +def test_riemannian_distance_1(): + d = riemannian_distance_1(A, B) + assert d >= 0 + assert np.isclose(d, 0.732, atol=0.001) + + +def test_riemannian_distance_2(): + d = riemannian_distance_2(A, B) + assert d >= 0 + assert np.isclose(d, 0.732, atol=0.001) + + +def test_riemannian_distance_3(): + d = riemannian_distance_3(A, B) + assert d >= 0 + assert np.isclose(d, 1.099, atol=0.001) + + +def test_weighted_riemannian_distance_1(): + d_W = weighted_riemannian_distance_1(A, B, W) + assert d_W >= 0 + assert np.isclose(d_W, 0.732, atol=0.001) + + +def test_weighted_riemannian_distance_2(): + d_W = weighted_riemannian_distance_2(A, B, W) + assert d_W >= 0 + assert np.isclose(d_W, 0.732, atol=0.001) diff --git a/examples/_wip/distances/riemannian_matrix.ipynb b/examples/_wip/distances/riemannian_matrix.ipynb new file mode 100644 index 0000000..73defeb --- /dev/null +++ b/examples/_wip/distances/riemannian_matrix.ipynb @@ -0,0 +1,227 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Riemannian Distance between matrices in aeon-neuro\n", + "\n", + "The measurement of the distance between matrices is a crucial step \n", + "in analyzing the disparity between spectrograms, \n", + "and the Riemannian distance effectively reflects this difference. \n", + "\n", + "We have implemented two different settings for calculating the Riemannian distance, \n", + "along with their respective weighted versions according to [1], \n", + "and wrapped a Affine-invariant Riemannian distance calculation from PyRiemannian." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Conditions for the input matrix\n", + "\n", + "The input matrices should be SPD (Symmetric Positive Definite) or HPD (Hermitian Positive Definite).\n", + "\n", + "### SPD: Symmetric Positive Definite\n", + "\n", + "1. **Symmetric**: A matrix $A$ is symmetric if it is equal to its transpose, i.e., $A = A^T$. This means that the matrix is mirrored along its diagonal, so $A[i, j] = A[j, i]$ for all $i$ and $j$.\n", + "\n", + "2. **Positive Definite**: A matrix $A$ is positive definite if for any non-zero vector $x$, the quadratic form $x^T A x$ is positive, i.e., $x^T A x > 0$.\n", + "\n", + "A Symmetric Positive Definite (SPD) matrix is a real square matrix that is both symmetric and positive definite.\n", + "\n", + "### HPD: Hermitian Positive Definite\n", + "\n", + "1. **Hermitian**: A matrix $A$ is Hermitian if it is equal to its conjugate transpose, i.e., $A = A^*$. For a complex matrix, this means $A[i, j] = \\overline{A[j, i]}$, where $\\overline{A[j, i]}$ denotes the complex conjugate of $A[j, i]$. For real matrices, Hermitian matrices are simply symmetric matrices.\n", + "\n", + "2. **Positive Definite**: Similar to the SPD case, a matrix $A$ is positive definite if for any non-zero vector $x$, the quadratic form $x^* A x$ is positive, i.e., $x^* A x > 0$, where $x^*$ is the conjugate transpose of $x$.\n", + "\n", + "A Hermitian Positive Definite (HPD) matrix is a complex square matrix that is both Hermitian and positive definite." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## The first type of Riemannian distance\n", + "\n", + "$$\n", + "d_{R1}(\\mathbf{A}, \\mathbf{B}) =\n", + " \\sqrt{\\text{tr} \\mathbf{A} +\n", + " \\text{tr} \\mathbf{B} -\n", + " 2 \\text{tr} \\left[(\\mathbf{A} \\mathbf{B})^{1/2}\\right]}\n", + "$$\n", + "weighted version:\n", + "\n", + "$$\n", + "d_{W1}(\\mathbf{A}, \\mathbf{B}, \\mathbf{W}) =\n", + " \\sqrt{\\text{tr} \\mathbf{W} \\mathbf{A} +\n", + " \\text{tr} \\mathbf{W} \\mathbf{B} -\n", + " 2 \\text{tr} \\mathbf{A}^{1/2} \\mathbf{W} \\mathbf{B} \\mathbf{W} \\mathbf{A}^{1/2}}\n", + "$$" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " Reimannian distance = (0.7320508075688744+0j)\n", + " weighted Reimannian distance = (0.7320508075688829+0j)\n" + ] + } + ], + "source": [ + "import numpy as np\n", + "\n", + "from aeon_neuro._wip.distances._riemannian_matrix import (\n", + " riemannian_distance_1,\n", + " weighted_riemannian_distance_1,\n", + ")\n", + "\n", + "A = np.array([[2, -1j], [1j, 2]])\n", + "B = np.array([[3, 0], [0, 3]])\n", + "W = np.array([[1, 0], [0, 1]])\n", + "\n", + "dR1 = riemannian_distance_1(A, B)\n", + "dW1 = weighted_riemannian_distance_1(A, B, W)\n", + "\n", + "print(f\" Reimannian distance = {dR1}\\n weighted Reimannian distance = {dW1}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## The second type of Riemannian distance\n", + "\n", + "$$\n", + "d_{R2}(\\mathbf{A}, \\mathbf{B}) =\n", + " \\sqrt{\\text{tr} \\mathbf{A} +\n", + " \\text{tr} \\mathbf{B} -\n", + " 2 \\text{tr} \\left[\\mathbf{A}^{1/2} \\mathbf{B}^{1/2}\\right]}\n", + "$$\n", + "weighted version:\n", + "\n", + "$$\n", + "d_{W2}(\\mathbf{A}, \\mathbf{B}, \\mathbf{W}) =\n", + " \\sqrt{\\text{tr} \\mathbf{W} \\mathbf{A} +\n", + " \\text{tr} \\mathbf{W} \\mathbf{B} -\n", + " \\text{tr} \\mathbf{W} \\mathbf{A}^{1/2} \\mathbf{B}^{1/2} -\n", + " \\text{tr} \\mathbf{W} \\mathbf{B}^{1/2} \\mathbf{A}^{1/2}}\n", + "$$" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " Reimannian distance = (0.7320508075688816+0j)\n", + " weighted Reimannian distance = (0.7320508075688816+0j)\n" + ] + } + ], + "source": [ + "import numpy as np\n", + "\n", + "from aeon_neuro._wip.distances._riemannian_matrix import (\n", + " riemannian_distance_2,\n", + " weighted_riemannian_distance_2,\n", + ")\n", + "\n", + "A = np.array([[2, -1j], [1j, 2]])\n", + "B = np.array([[3, 0], [0, 3]])\n", + "W = np.array([[1, 0], [0, 1]])\n", + "\n", + "dR2 = riemannian_distance_2(A, B)\n", + "dW2 = weighted_riemannian_distance_2(A, B, W)\n", + "\n", + "print(f\" Reimannian distance = {dR2}\\n weighted Reimannian distance = {dW2}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Affine-invariant Riemannian distance\n", + "\n", + "A direct call to method ``pyriemann.utils.distance.distance_riemann``.\n", + "$$\n", + " d(\\mathbf{A},\\mathbf{B}) =\n", + " {\\left( \\sum_i \\log(\\lambda_i)^2 \\right)}^{1/2}\n", + "$$\n", + "\n", + "See [pyriemann document](https://pyriemann.readthedocs.io/en/latest/generated/pyriemann.utils.distance.distance_riemann.html) for more usage details." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " Affine-invariant Reimannian distance = 1.0986122886681096\n" + ] + } + ], + "source": [ + "import numpy as np\n", + "\n", + "from aeon_neuro._wip.distances._riemannian_matrix import riemannian_distance_3\n", + "\n", + "A = np.array([[2, -1j], [1j, 2]])\n", + "B = np.array([[3, 0], [0, 3]])\n", + "W = np.array([[1, 0], [0, 1]])\n", + "\n", + "dR3 = riemannian_distance_3(A, B)\n", + "\n", + "print(f\" Affine-invariant Reimannian distance = {dR3}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# References\n", + "\n", + "[1] Li, Y., & Wong, K.M. [Riemannian Distances for Signal Classification by Power Spectral Density](https://ieeexplore.ieee.org/document/6509394), IEEE Journal of Selected Topics in Signal Processing, 7, pp. 655-669, 2013.\n", + "\n", + "[2] https://pyriemann.readthedocs.io/en/latest/generated/pyriemann.utils.distance.distance_riemann.html" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "aeon_neuro", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.19" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From a94ee22bec53917d12228fd5dd9ee59526a0f7cc Mon Sep 17 00:00:00 2001 From: Gabriel Riegner Date: Wed, 21 Aug 2024 10:56:41 -0700 Subject: [PATCH 05/14] fix pre-commit errors --- aeon_neuro/_wip/distances/_riemannian.py | 28 +++++++++++-------- aeon_neuro/_wip/distances/tests/__init__.py | 0 .../transformations/series/_power_spectrum.py | 4 ++- 3 files changed, 20 insertions(+), 12 deletions(-) create mode 100644 aeon_neuro/_wip/distances/tests/__init__.py diff --git a/aeon_neuro/_wip/distances/_riemannian.py b/aeon_neuro/_wip/distances/_riemannian.py index f635e42..8d56733 100644 --- a/aeon_neuro/_wip/distances/_riemannian.py +++ b/aeon_neuro/_wip/distances/_riemannian.py @@ -1,8 +1,11 @@ +"""Implement as a distance function here.""" -import numpy as np -from scipy.linalg import logm,eig import math -"""Implement as a distance function here.""" + +import numpy as np +from scipy.linalg import eig, logm + + def riemannian_distance_a( x: np.ndarray, y: np.ndarray, @@ -23,18 +26,20 @@ def riemannian_distance_a( x_trace = np.trace(x_cov) y_trace = np.trace(y_cov) x_root = logm(x_cov) - c_temp = logm(np.matmul(x_root,np.matmul(y_cov,x_root))) - c = 2*np.trace(c_temp) - return math.sqrt(x_trace+y_trace-c) - + c_temp = logm(np.matmul(x_root, np.matmul(y_cov, x_root))) + c = 2 * np.trace(c_temp) + return math.sqrt(x_trace + y_trace - c) + def riemannian_distance_b( x: np.ndarray, y: np.ndarray, ) -> float: r"""Compute the Reimannian distance between two time series. + https://hal.science/hal-00820475/document https://hal.science/hal-00602700/document + Parameters ---------- x : np.ndarray @@ -46,12 +51,13 @@ def riemannian_distance_b( """ x_cov = np.cov(x) y_cov = np.cov(y) - eigenvalues = eig(x_cov,y_cov) - distance=0 + eigenvalues = eig(x_cov, y_cov) + distance = 0 for i in eigenvalues[0]: - distance+=math.log(np.real(i))**2 + distance += math.log(np.real(i)) ** 2 return math.sqrt(distance) + def riemannian_distance_c( x: np.ndarray, y: np.ndarray, @@ -70,5 +76,5 @@ def riemannian_distance_c( x_cov = np.cov(x) y_cov = np.cov(y) x_inv_root = logm(np.linalg.inv(x_cov)) - distance = logm(np.matmul(x_inv_root,np.matmul(y_cov,x_inv_root))) + distance = logm(np.matmul(x_inv_root, np.matmul(y_cov, x_inv_root))) return np.linalg.norm(distance) diff --git a/aeon_neuro/_wip/distances/tests/__init__.py b/aeon_neuro/_wip/distances/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aeon_neuro/_wip/transformations/series/_power_spectrum.py b/aeon_neuro/_wip/transformations/series/_power_spectrum.py index 217a10d..a5c5d73 100644 --- a/aeon_neuro/_wip/transformations/series/_power_spectrum.py +++ b/aeon_neuro/_wip/transformations/series/_power_spectrum.py @@ -3,20 +3,22 @@ from aeon.transformations.series.base import BaseSeriesTransformer - class PowerSpectrumMatrix(BaseSeriesTransformer): """Implement here.""" + _tags = { "X_inner_type": "np.ndarray", "capability:multivariate": True, "fit_is_empty": True, } + def __init__( self, n_lags=None, ): self.n_lags = n_lags super().__init__(axis=1) + def _transform(self, X, y=None): """Transform X and return a transformed version. From 8a5ef17ee804a46573e22459ad9bc070693a82b9 Mon Sep 17 00:00:00 2001 From: Gabriel Riegner Date: Wed, 21 Aug 2024 11:10:49 -0700 Subject: [PATCH 06/14] fix run-notebook-examples error, add pyriemann as optional dependency --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index a70d95e..39adf3c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ dependencies = [ [project.optional-dependencies] all_extras = [ "aeon[all_extras]", + "pyriemann>=0.6" ] dev = [ "aeon[dev]", From 715ee9a115c872299d644f0f590a383ab3954052 Mon Sep 17 00:00:00 2001 From: Gabriel Riegner Date: Tue, 10 Sep 2024 09:03:08 -0700 Subject: [PATCH 07/14] add CrossSpectralMatrix class and tests --- .../transformations/series/_power_spectrum.py | 68 ++++++++++++++++--- .../transformations/series/tests/__init__.py | 0 .../series/tests/test_power_spectrum.py | 31 +++++++++ 3 files changed, 89 insertions(+), 10 deletions(-) create mode 100644 aeon_neuro/_wip/transformations/series/tests/__init__.py create mode 100644 aeon_neuro/_wip/transformations/series/tests/test_power_spectrum.py diff --git a/aeon_neuro/_wip/transformations/series/_power_spectrum.py b/aeon_neuro/_wip/transformations/series/_power_spectrum.py index a5c5d73..d1983c8 100644 --- a/aeon_neuro/_wip/transformations/series/_power_spectrum.py +++ b/aeon_neuro/_wip/transformations/series/_power_spectrum.py @@ -1,10 +1,39 @@ -"""Power spectrum matrix transform, may call it something else.""" +"""Cross spectral matrix transformer.""" from aeon.transformations.series.base import BaseSeriesTransformer +from mne.time_frequency.csd import _csd_fourier, _vector_to_sym_mat +from scipy.fft import rfftfreq -class PowerSpectrumMatrix(BaseSeriesTransformer): - """Implement here.""" +class CrossSpectralMatrix(BaseSeriesTransformer): + """Cross spectral (density) matrix transformer. + + A matrix representing the pairwise covariances between channels + in the frequency domain -- i.e. between power spectral densities. + Matrices are estimated per frequency using the short-time fourier algorithm, + then averages over the specified frequency range (by default 0-60Hz for EEG). + + Parameters + ---------- + sfreq : int or float, optional + Sampling frequency in Hz, by default 120. + fmin : int or float, optional + Minimum frequency of interest in Hz, by default 0. + fmax : int or float, optional + Maximum frequency of interest in Hz, by default 60. + + Examples + -------- + >>> from aeon_neuro._wip.transformations.series._power_spectrum import ( + ... CrossSpectralMatrix) + >>> import numpy as np + >>> n_channels, n_timepoints = 5, 360 + >>> X = np.random.standard_normal(size=(n_channels, n_timepoints)) + >>> transformer = CrossSpectralMatrix() + >>> X_transformed = transformer.fit_transform(X) + >>> X_transformed.shape == (n_channels, n_channels) + True + """ _tags = { "X_inner_type": "np.ndarray", @@ -12,12 +41,11 @@ class PowerSpectrumMatrix(BaseSeriesTransformer): "fit_is_empty": True, } - def __init__( - self, - n_lags=None, - ): - self.n_lags = n_lags - super().__init__(axis=1) + def __init__(self, sfreq=120, fmin=0, fmax=60): + self.sfreq = sfreq + self.fmin = fmin + self.fmax = fmax + super().__init__(axis=1) # (n_channels, n_timepoints) def _transform(self, X, y=None): """Transform X and return a transformed version. @@ -35,4 +63,24 @@ def _transform(self, X, y=None): ------- transformed version of X """ - return X + # frequency mask + n_timepoints = X.shape[-1] + n_fft = n_timepoints + freq_fft = rfftfreq(n_fft, 1.0 / self.sfreq) + freq_mask = (freq_fft > 0) & (freq_fft >= self.fmin) & (freq_fft <= self.fmax) + + if freq_mask.sum() == 0: + raise ValueError( + f"FFT cannot be computed for sfreq={self.sfreq} \ + within the frequency range={self.fmin}-{self.fmax}. \ + Please increase the frequency range." + ) + + # CSD matrix by frequency (stored as vectors) + X_vector = _csd_fourier( + X, sfreq=self.sfreq, n_times=n_timepoints, freq_mask=freq_mask, n_fft=n_fft + ) + # average over frequencies + # convert vector to symmetric matrix of shape (n_channels, n_channels) + X_matrix = _vector_to_sym_mat(X_vector.mean(axis=1)) + return X_matrix diff --git a/aeon_neuro/_wip/transformations/series/tests/__init__.py b/aeon_neuro/_wip/transformations/series/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aeon_neuro/_wip/transformations/series/tests/test_power_spectrum.py b/aeon_neuro/_wip/transformations/series/tests/test_power_spectrum.py new file mode 100644 index 0000000..e7150fe --- /dev/null +++ b/aeon_neuro/_wip/transformations/series/tests/test_power_spectrum.py @@ -0,0 +1,31 @@ +"""Tests for cross spectral matrix transform.""" + +import numpy as np +import pytest + +from aeon_neuro._wip.distances._riemannian_matrix import _is_hpd +from aeon_neuro._wip.transformations.series._power_spectrum import CrossSpectralMatrix + +n_channels, n_timepoints = 7, 12000 +rng = np.random.default_rng(seed=0) + + +def test_transform(): + """Test cross spectral matrix transformer.""" + transformer = CrossSpectralMatrix() + X = rng.standard_normal(size=(n_channels, n_timepoints)) + X_transformed = transformer.fit_transform(X) + + assert X_transformed.shape == (n_channels, n_channels) # expected shape + assert _is_hpd(X_transformed) # Hermitian Positive Definite + + +def test_value_errors(): + """Test cross spectral matrix errors.""" + transformer = CrossSpectralMatrix() + X = rng.standard_normal(size=(n_channels, n_timepoints)) + transformer.fit_transform(X) + + with pytest.raises(ValueError, match="FFT cannot be computed for sfreq=.*"): + transformer.set_params(fmin=100, fmax=110) + transformer.fit_transform(X) From db6d64c5b4e16827b912133943314e7784c31434 Mon Sep 17 00:00:00 2001 From: Gabriel Riegner Date: Thu, 12 Sep 2024 17:34:48 -0700 Subject: [PATCH 08/14] edit docstring --- .../_wip/transformations/series/_power_spectrum.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/aeon_neuro/_wip/transformations/series/_power_spectrum.py b/aeon_neuro/_wip/transformations/series/_power_spectrum.py index d1983c8..b55277e 100644 --- a/aeon_neuro/_wip/transformations/series/_power_spectrum.py +++ b/aeon_neuro/_wip/transformations/series/_power_spectrum.py @@ -8,10 +8,12 @@ class CrossSpectralMatrix(BaseSeriesTransformer): """Cross spectral (density) matrix transformer. - A matrix representing the pairwise covariances between channels - in the frequency domain -- i.e. between power spectral densities. - Matrices are estimated per frequency using the short-time fourier algorithm, - then averages over the specified frequency range (by default 0-60Hz for EEG). + Estimate the pairwise cross spectral matrix between channels in the frequency + domain -- i.e., between power spectral densities. The result is a hermitian + positive definite (HPD) complex-valued matrix of shape (n_channels, n_channels). + Matrices are computed as a function of frequency using the short-time fourier + algorithm, then averaged over the specified frequency range + (by default 0-60Hz for EEG). Parameters ---------- @@ -33,6 +35,8 @@ class CrossSpectralMatrix(BaseSeriesTransformer): >>> X_transformed = transformer.fit_transform(X) >>> X_transformed.shape == (n_channels, n_channels) True + >>> np.iscomplexobj(X_transformed) + True """ _tags = { From 49df46bcb970ca75b418e2aeb4e616c59aa24a4c Mon Sep 17 00:00:00 2001 From: Gabriel Riegner Date: Thu, 12 Sep 2024 17:35:07 -0700 Subject: [PATCH 09/14] add CovarianceMatrix class and tests --- .../transformations/series/_covariance.py | 64 +++++++++++++++++++ .../series/tests/test_covariance.py | 27 ++++++++ 2 files changed, 91 insertions(+) create mode 100644 aeon_neuro/_wip/transformations/series/_covariance.py create mode 100644 aeon_neuro/_wip/transformations/series/tests/test_covariance.py diff --git a/aeon_neuro/_wip/transformations/series/_covariance.py b/aeon_neuro/_wip/transformations/series/_covariance.py new file mode 100644 index 0000000..787e783 --- /dev/null +++ b/aeon_neuro/_wip/transformations/series/_covariance.py @@ -0,0 +1,64 @@ +"""Covariance matrix transformer.""" + +from aeon.transformations.series.base import BaseSeriesTransformer + + +class CovarianceMatrix(BaseSeriesTransformer): + """Covariance matrix transformer. + + Estimate the (unbiased) pairwise covariance matrix between channels in the + time domain. The result is a positive semidefinite (PSD) real-valued matrix + of shape (n_channels, n_channels). + + Parameters + ---------- + correlation : bool, optional + Whether to estimate the pairwise correlation (standardized covariance) matrix, + by default False + + Examples + -------- + >>> from aeon_neuro._wip.transformations.series._covariance import ( + ... CovarianceMatrix) + >>> import numpy as np + >>> n_channels, n_timepoints = 5, 360 + >>> X = np.random.standard_normal(size=(n_channels, n_timepoints)) + >>> transformer = CovarianceMatrix() + >>> X_transformed = transformer.fit_transform(X) + >>> X_transformed.shape == (n_channels, n_channels) + True + >>> np.iscomplexobj(X_transformed) + False + """ + + _tags = { + "X_inner_type": "np.ndarray", + "capability:multivariate": True, + "fit_is_empty": True, + } + + def __init__(self, correlation=False): + self.correlation = correlation + super().__init__(axis=1) # (n_channels, n_timepoints) + + def _transform(self, X, y=None): + """Transform X and return a transformed version. + + private _transform containing the core logic, called from transform + + Parameters + ---------- + X : np.ndarray + Data to be transformed, shape (n_channels, n_timepoints) + y : ignored argument for interface compatibility + Additional data, e.g., labels for transformation + + Returns + ------- + transformed version of X + """ + n_timepoints = X.shape[-1] + X_centered = X - X.mean(axis=1, keepdims=True) + if self.correlation: + X_centered /= X.std(axis=1, keepdims=True) + return (X_centered @ X_centered.T) / (n_timepoints - 1) diff --git a/aeon_neuro/_wip/transformations/series/tests/test_covariance.py b/aeon_neuro/_wip/transformations/series/tests/test_covariance.py new file mode 100644 index 0000000..e7dc1a8 --- /dev/null +++ b/aeon_neuro/_wip/transformations/series/tests/test_covariance.py @@ -0,0 +1,27 @@ +"""Tests for covariance matrix transform.""" + +import numpy as np + +from aeon_neuro._wip.transformations.series._covariance import CovarianceMatrix + +n_channels, n_timepoints = 7, 12000 +rng = np.random.default_rng(seed=0) + + +def test_transform(): + """Test covariance matrix transformer.""" + transformer = CovarianceMatrix() + X = rng.normal(scale=2, size=(n_channels, n_timepoints)) + X_transformed = transformer.fit_transform(X) + + # test expected shape + assert X_transformed.shape == (n_channels, n_channels) + + # test positive semi-definite + evals = np.linalg.eigvals(X_transformed) + assert np.all(evals >= 0) + + # test correlation matrix = identity matrix + transformer.set_params(correlation=True) + X_transformed = transformer.fit_transform(X) + np.testing.assert_allclose(X_transformed, np.eye(n_channels), atol=0.02) From 6d043077cca7a9822c856da4af2980ef5a3ca98a Mon Sep 17 00:00:00 2001 From: Gabriel Riegner Date: Fri, 13 Sep 2024 14:25:55 -0700 Subject: [PATCH 10/14] add argument to return the magnitude of the cross spectra --- .../transformations/series/_power_spectrum.py | 17 +++++++++++++++-- .../series/tests/test_power_spectrum.py | 5 +++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/aeon_neuro/_wip/transformations/series/_power_spectrum.py b/aeon_neuro/_wip/transformations/series/_power_spectrum.py index b55277e..6548a85 100644 --- a/aeon_neuro/_wip/transformations/series/_power_spectrum.py +++ b/aeon_neuro/_wip/transformations/series/_power_spectrum.py @@ -1,5 +1,6 @@ """Cross spectral matrix transformer.""" +import numpy as np from aeon.transformations.series.base import BaseSeriesTransformer from mne.time_frequency.csd import _csd_fourier, _vector_to_sym_mat from scipy.fft import rfftfreq @@ -11,6 +12,7 @@ class CrossSpectralMatrix(BaseSeriesTransformer): Estimate the pairwise cross spectral matrix between channels in the frequency domain -- i.e., between power spectral densities. The result is a hermitian positive definite (HPD) complex-valued matrix of shape (n_channels, n_channels). + The magnitude of the HPD matrix is real-valued. Matrices are computed as a function of frequency using the short-time fourier algorithm, then averaged over the specified frequency range (by default 0-60Hz for EEG). @@ -23,6 +25,9 @@ class CrossSpectralMatrix(BaseSeriesTransformer): Minimum frequency of interest in Hz, by default 0. fmax : int or float, optional Maximum frequency of interest in Hz, by default 60. + magnitude : bool, optional + If True, return the magnitude of the cross spectral matrix (real-valued). + If False, return the complex-valued matrix, by default False. Examples -------- @@ -37,6 +42,9 @@ class CrossSpectralMatrix(BaseSeriesTransformer): True >>> np.iscomplexobj(X_transformed) True + >>> X_transformed = transformer.set_params(magnitude=True).fit_transform(X) + >>> np.iscomplexobj(X_transformed) + False """ _tags = { @@ -45,10 +53,11 @@ class CrossSpectralMatrix(BaseSeriesTransformer): "fit_is_empty": True, } - def __init__(self, sfreq=120, fmin=0, fmax=60): + def __init__(self, sfreq=120, fmin=0, fmax=60, magnitude=False): self.sfreq = sfreq self.fmin = fmin self.fmax = fmax + self.magnitude = magnitude super().__init__(axis=1) # (n_channels, n_timepoints) def _transform(self, X, y=None): @@ -87,4 +96,8 @@ def _transform(self, X, y=None): # average over frequencies # convert vector to symmetric matrix of shape (n_channels, n_channels) X_matrix = _vector_to_sym_mat(X_vector.mean(axis=1)) - return X_matrix + + if self.magnitude: + return np.abs(X_matrix) + else: + return X_matrix diff --git a/aeon_neuro/_wip/transformations/series/tests/test_power_spectrum.py b/aeon_neuro/_wip/transformations/series/tests/test_power_spectrum.py index e7150fe..d065667 100644 --- a/aeon_neuro/_wip/transformations/series/tests/test_power_spectrum.py +++ b/aeon_neuro/_wip/transformations/series/tests/test_power_spectrum.py @@ -19,6 +19,11 @@ def test_transform(): assert X_transformed.shape == (n_channels, n_channels) # expected shape assert _is_hpd(X_transformed) # Hermitian Positive Definite + # test positive semi-definite + X_transformed = transformer.set_params(magnitude=True).fit_transform(X) + evals = np.linalg.eigvals(X_transformed) + assert np.all(evals >= 0) + def test_value_errors(): """Test cross spectral matrix errors.""" From c3b3e6fdb932f6c00f0f4c4ed9087204bea501bc Mon Sep 17 00:00:00 2001 From: Gabriel Riegner Date: Fri, 13 Sep 2024 14:27:18 -0700 Subject: [PATCH 11/14] use biased estimator, so diagonals of correlation = 1 --- aeon_neuro/_wip/transformations/series/_covariance.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aeon_neuro/_wip/transformations/series/_covariance.py b/aeon_neuro/_wip/transformations/series/_covariance.py index 787e783..e2e4357 100644 --- a/aeon_neuro/_wip/transformations/series/_covariance.py +++ b/aeon_neuro/_wip/transformations/series/_covariance.py @@ -6,7 +6,7 @@ class CovarianceMatrix(BaseSeriesTransformer): """Covariance matrix transformer. - Estimate the (unbiased) pairwise covariance matrix between channels in the + Estimate the pairwise covariance matrix between channels in the time domain. The result is a positive semidefinite (PSD) real-valued matrix of shape (n_channels, n_channels). @@ -61,4 +61,4 @@ def _transform(self, X, y=None): X_centered = X - X.mean(axis=1, keepdims=True) if self.correlation: X_centered /= X.std(axis=1, keepdims=True) - return (X_centered @ X_centered.T) / (n_timepoints - 1) + return (X_centered @ X_centered.T) / (n_timepoints) From 07093626989abd96a8dc430b1a7de1f6ed66c9b5 Mon Sep 17 00:00:00 2001 From: Gabriel Riegner Date: Sat, 14 Sep 2024 15:36:56 -0700 Subject: [PATCH 12/14] add examples of riemannian classifiers --- .../distances/riemannian_classification.ipynb | 203 ++++++++++++++++++ 1 file changed, 203 insertions(+) create mode 100644 examples/_wip/distances/riemannian_classification.ipynb diff --git a/examples/_wip/distances/riemannian_classification.ipynb b/examples/_wip/distances/riemannian_classification.ipynb new file mode 100644 index 0000000..c78ef5b --- /dev/null +++ b/examples/_wip/distances/riemannian_classification.ipynb @@ -0,0 +1,203 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Comparison of Riemannian Classifiers\n", + "\n", + "Riemannian geometry offers methods for extracting features for classification from multichannel EEG signals, by treating functional connectivity matrices as points on a Riemannian manifold - a smooth, curved space where distances between points are defined by the geometry of the manifold rather than the linear geometry of Euclidean space. We demonstrate how to estimate pairwise connectivity matrices between EEG channels in both the time (covariance) and frequency (cross-spectral density) domains. Importantly, both forms are symmetric/hermitian positive definite and reside on a Riemannian manifold. Using Riemannian distances between matrices, we compare a number of classification algorithms from the [pyRiemann](https://pyriemann.readthedocs.io/en/latest/index.html) package." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/griegner/git-repositories/aeon-neuro/.venv/lib/python3.12/site-packages/aeon/base/__init__.py:24: FutureWarning: The aeon package will soon be releasing v1.0.0 with the removal of legacy modules and interfaces such as BaseTransformer and BaseForecaster. This will contain breaking changes. See aeon-toolkit.org for more information. Set aeon.AEON_DEPRECATION_WARNING or the AEON_DEPRECATION_WARNING environmental variable to 'False' to disable this warning.\n", + " warnings.warn(\n", + "OMP: Info #276: omp_set_nested routine deprecated, please use omp_set_max_active_levels instead.\n" + ] + } + ], + "source": [ + "import numpy as np\n", + "from aeon.datasets import load_classification\n", + "from aeon.transformations.collection.base import BaseCollectionTransformer\n", + "from pyriemann.classification import (\n", + " MDM,\n", + " SVC,\n", + " FgMDM,\n", + " KNearestNeighbor,\n", + " MeanField,\n", + " TSclassifier,\n", + ")\n", + "from sklearn.pipeline import make_pipeline\n", + "\n", + "from aeon_neuro._wip.transformations.series._covariance import CovarianceMatrix\n", + "from aeon_neuro._wip.transformations.series._power_spectrum import CrossSpectralMatrix" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Load EEG Classification Dataset\n", + "\n", + "The KDD dataset segments an EEG series from a single subject into a collection of epochs with corresponding labels indicating whether the subject's hand is resting on a table (\"rest\") or raised (\"task\"). Some processing steps have been pre-applied: bandpass filtering between 0.5Hz-100Hz, downsampling to 100 timepoints, and applying a channel selection algorithm (see the [data loading example](../../data_loading.ipynb)). Both the training and testing sets have shape '(40_epochs, 4_channels, 100_timepoints)' with 20 epochs of each class." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "X shape: (40, 4, 100)\ty shape: (40,)\n" + ] + } + ], + "source": [ + "X_train, y_train = load_classification(\n", + " name=\"KDD_MTSC\", split=\"TRAIN\", extract_path=\"../aeon_neuro/data/KDD_Examples\"\n", + ")\n", + "\n", + "X_test, y_test = load_classification(\n", + " name=\"KDD_MTSC\", split=\"TEST\", extract_path=\"../aeon_neuro/data/KDD_Examples\"\n", + ")\n", + "\n", + "print(f\"X shape: {X_train.shape}\\ty shape: {y_train.shape}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Pipeline of Transformers and Classifiers\n", + "\n", + "The transformers take a collection of multivariate series of '(n_epochs, n_channels, n_timepoints)' and output functional connectivity matrices for each epoch, '(n_epochs, n_channels, n_channels)'. Specifically, we estimate covariance and cross-spectral density matrices. Since some of the classifiers used are only defined on real-valued domains, so we compute the magnitude of the cross-spectral densities, which takes the absolute value of the real part of the complex entries of the matrix. The pipeline first applies these transformers, then classifies the resulting matrices using Riemannian means and distances between matrices. The following classifiers are compared (see [pyRiemann documentation](https://pyriemann.readthedocs.io/en/latest/index.html)):\n", + "\n", + "1. minimum distance to mean\n", + "2. minimum distance to mean with geodesic smoothing\n", + "3. tangent space embedding\n", + "4. k-nearest neighbors\n", + "5. support-vector machine\n", + "6. minimum distance to mean field" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "class SeriesToCollectionWrapper(BaseCollectionTransformer):\n", + " \"\"\"Treat a SeriesTransformer as a CollectionTransformer.\n", + "\n", + " Parameters\n", + " ----------\n", + " transformer : SeriesTransformer\n", + " The transformer to wrap.\n", + " \"\"\"\n", + "\n", + " _tags = {\"capability:multivariate\": True}\n", + "\n", + " def __init__(self, transformer):\n", + " self.transformer = transformer\n", + " super().__init__()\n", + "\n", + " def _transform(self, X, y=None):\n", + " return np.array([self.transformer.fit_transform(x) for x in X])\n", + "\n", + "\n", + "# initialize transformers and classifiers\n", + "# use magnitude of the CSD bc many of the classifiers handle real-valued matrices\n", + "cov_transformer = SeriesToCollectionWrapper(CovarianceMatrix())\n", + "csd_transformer = SeriesToCollectionWrapper(\n", + " CrossSpectralMatrix(sfreq=112.5, magnitude=True)\n", + ")\n", + "\n", + "# all classifiers compare SPD matrices by a Riemannian distance measure\n", + "classifiers = {\n", + " \"minimum distance to mean\": MDM(),\n", + " \"geodesic filtering\": FgMDM(),\n", + " \"tangent space embedding\": TSclassifier(),\n", + " \"k-nearest neighbors\": KNearestNeighbor(),\n", + " \"support-vector machine\": SVC(),\n", + " \"minimum distance to mean field\": MeanField(),\n", + "}\n", + "\n", + "pipelines = {}\n", + "for name, clf in classifiers.items():\n", + " pipelines[f\"cov matrix + {name}\"] = make_pipeline(cov_transformer, clf)\n", + " pipelines[f\"csd matrix + {name}\"] = make_pipeline(csd_transformer, clf)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compare Classification Accuracies" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "cov matrix + minimum distance to mean accuracy: 0.675\n", + "csd matrix + minimum distance to mean accuracy: 0.8\n", + "cov matrix + geodesic filtering accuracy: 0.8\n", + "csd matrix + geodesic filtering accuracy: 0.875\n", + "cov matrix + tangent space embedding accuracy: 0.7\n", + "csd matrix + tangent space embedding accuracy: 0.825\n", + "cov matrix + k-nearest neighbors accuracy: 0.875\n", + "csd matrix + k-nearest neighbors accuracy: 0.925\n", + "cov matrix + support-vector machine accuracy: 0.7\n", + "csd matrix + support-vector machine accuracy: 0.85\n", + "cov matrix + minimum distance to mean field accuracy: 0.675\n", + "csd matrix + minimum distance to mean field accuracy: 0.725\n" + ] + } + ], + "source": [ + "for name, pipeline in pipelines.items():\n", + " pipeline.fit(X_train, y_train)\n", + " y_pred = pipeline.predict(X_test)\n", + " print(f\"{name: <45} accuracy: {np.mean(y_pred == y_test)}\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.6" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From bb402d36a6ce7e009bba1d1da2d7f462901da942 Mon Sep 17 00:00:00 2001 From: Futuer <57895730+futuer-szd@users.noreply.github.com> Date: Fri, 4 Oct 2024 18:31:08 +0800 Subject: [PATCH 13/14] 1. delete `input check`, store in local folder (#93) 2. fix formula fault in examples and \_riemannian\_matrix.py 3. add pairwise distance function Co-authored-by: futuer --- .../_wip/distances/_riemannian_matrix.py | 142 +++++++++++------- .../_wip/distances/riemannian_matrix.ipynb | 57 ++++--- 2 files changed, 121 insertions(+), 78 deletions(-) diff --git a/aeon_neuro/_wip/distances/_riemannian_matrix.py b/aeon_neuro/_wip/distances/_riemannian_matrix.py index b1c96d6..d6a8708 100644 --- a/aeon_neuro/_wip/distances/_riemannian_matrix.py +++ b/aeon_neuro/_wip/distances/_riemannian_matrix.py @@ -5,40 +5,6 @@ from scipy.linalg import sqrtm -def _is_hpd(matrix): - r"""Check if a matrix is Hermitian Positive Definite (HPD).""" - if not np.allclose(matrix, matrix.conj().T): - return False - try: - np.linalg.cholesky(matrix) - return True - except np.linalg.LinAlgError: - return False - - -def _check_inputs(A, B, W=None): - if W is None: - if not A.shape == B.shape: - raise ValueError("Inputs must have equal dimensions") - if A.ndim != 2 or B.ndim != 2: - raise ValueError("Inputs must be 2D ndarrays") - if not _is_hpd(A): - raise ValueError("Matrix A must be Hermitian Positive Definite (HPD)") - if not _is_hpd(B): - raise ValueError("Matrix B must be Hermitian Positive Definite (HPD)") - else: - if not A.shape == B.shape == W.shape: - raise ValueError("Inputs must have equal dimensions") - if A.ndim != 2 or B.ndim != 2 or W.ndim != 2: - raise ValueError("Inputs must be 2D ndarrays") - if not _is_hpd(A): - raise ValueError("Matrix A must be Hermitian Positive Definite (HPD)") - if not _is_hpd(B): - raise ValueError("Matrix B must be Hermitian Positive Definite (HPD)") - if not _is_hpd(W): - raise ValueError("Matrix W must be Hermitian Positive Definite (HPD)") - - def riemannian_distance_1( A: np.ndarray, B: np.ndarray, @@ -52,7 +18,7 @@ def riemannian_distance_1( :math:`\mathbf{A}` and :math:`\mathbf{B}` is [1]_: .. math:: - d_{R1}(\mathbf{A}, \mathbf{B}) = + d_{R_1}(\mathbf{A}, \mathbf{B}) = \sqrt{\text{tr} \mathbf{A} + \text{tr} \mathbf{B} - 2 \text{tr} \left[(\mathbf{A} \mathbf{B})^{1/2}\right]} @@ -76,7 +42,6 @@ def riemannian_distance_1( Li, Y., & Wong, K.M. IEEE Journal of Selected Topics in Signal Processing, 2013, 7, pp. 655-669 """ - _check_inputs(A, B) same_data = B is A if same_data: return 0 @@ -99,11 +64,11 @@ def riemannian_distance_2( :math:`\mathbf{A}` and :math:`\mathbf{B}` is [1]_: .. math:: - d_{R2}(\mathbf{A}, \mathbf{B}) = - \sqrt{\text{tr} \mathbf{A} + - \text{tr} \mathbf{B} - - 2 \text{tr} \left[\mathbf{A}^{1/2} \mathbf{B}^{1/2}\right]} - + d_{R_2}(\mathbf{A}, \mathbf{B}) = + \sqrt{\text{tr} \mathbf{A} + + \text{tr} \mathbf{B} - + 2 \text{tr} \left[\mathbf{A}^{1/2} \mathbf{B}^{1/2}\right]} + Parameters ---------- A : np.ndarray @@ -123,7 +88,6 @@ def riemannian_distance_2( Li, Y., & Wong, K.M. IEEE Journal of Selected Topics in Signal Processing, 2013, 7, pp. 655-669 """ - _check_inputs(A, B) same_data = B is A if same_data: return 0 @@ -192,10 +156,10 @@ def weighted_riemannian_distance_1( :math:`\mathbf{A}` and :math:`\mathbf{B}` is [1]_: .. math:: - d_{W1}(\mathbf{A}, \mathbf{B}, \mathbf{W}) = - \sqrt{\text{tr} \mathbf{W} \mathbf{A} + - \text{tr} \mathbf{W} \mathbf{B} - - 2 \text{tr} \mathbf{A}^{1/2} \mathbf{W} \mathbf{B} \mathbf{W} \mathbf{A}^{1/2}} + d_{R_1w}(\mathbf{A}, \mathbf{B}, \mathbf{W_1}) = + \sqrt{\text{tr}\left[ \mathbf{W_1} \mathbf{A}\right] + + \text{tr}\left[ \mathbf{W_1} \mathbf{B}\right] - + 2 \text{tr}\left[ (\mathbf{B}^{1/2} \mathbf{W_1} \mathbf{A} \mathbf{W_1} \mathbf{B}^{1/2})^{1/2}\right]} where :math:`\mathbf{W}` is the weight matrix of :math:`\mathbf{A}` and :math:`\mathbf{B}`. @@ -221,7 +185,6 @@ def weighted_riemannian_distance_1( Li, Y., & Wong, K.M. IEEE Journal of Selected Topics in Signal Processing, 2013, 7, pp. 655-669 """ - _check_inputs(A, B, W) same_data = B is A if same_data: return 0 @@ -250,11 +213,11 @@ def weighted_riemannian_distance_2( :math:`\mathbf{A}` and :math:`\mathbf{B}` is [1]_: .. math:: - d_{W2}(\mathbf{A}, \mathbf{B}, \mathbf{W}) = - \sqrt{\text{tr} \mathbf{W} \mathbf{A} + - \text{tr} \mathbf{W} \mathbf{B} - - \text{tr} \mathbf{W} \mathbf{A}^{1/2} \mathbf{B}^{1/2} - - \text{tr} \mathbf{W} \mathbf{B}^{1/2} \mathbf{A}^{1/2}} + d_{R_2w}(\mathbf{A}, \mathbf{B}, \mathbf{W_2}) = + \sqrt{\text{tr}\left[ \mathbf{W_2} \mathbf{A}\right] + + \text{tr}\left[ \mathbf{W_2} \mathbf{B}\right] - + \text{tr}\left[ \mathbf{W_2} \mathbf{A}^{1/2} \mathbf{B}^{1/2}\right] - + \text{tr}\left[ \mathbf{W_2} \mathbf{B}^{1/2} \mathbf{A}^{1/2}\right]} where :math:`\mathbf{W}` is the weight matrix of :math:`\mathbf{A}` and :math:`\mathbf{B}`. @@ -280,7 +243,6 @@ def weighted_riemannian_distance_2( Li, Y., & Wong, K.M. IEEE Journal of Selected Topics in Signal Processing, 2013, 7, pp. 655-669 """ - _check_inputs(A, B, W) same_data = B is A if same_data: return 0 @@ -294,3 +256,77 @@ def weighted_riemannian_distance_2( d_W = np.sqrt(term1 + term2 - term3 - term4) return d_W + + +def pairwise_weighted_riemannian_distance_1( + X: np.ndarray, + W: np.ndarray, + Y: np.ndarray = None, +) -> np.ndarray: + r"""Compute the pairwise weighted Reimannian distance between two sets of SPD/HPD matrices. + + The first type of pairwise weighted Riemannian distance + + Parameters + ---------- + X : np.ndarray + First set of SPD/HPD matrices, 3D ndarray. + W : np.ndarray + Weight matrix. + Y : np.ndarray, default None + + Returns + ------- + distances : np.ndarray + Pairwise weighted Riemannian distances between X and Y or X itself. + """ + if Y is None: + Y = X + + n_samples_X, n_samples_Y = X.shape[0], Y.shape[0] + distances = np.zeros((n_samples_X, n_samples_Y)) + for i in range(n_samples_X): + for j in range(n_samples_Y): + d = 0 + for k in range(X.shape[1]): + d += weighted_riemannian_distance_1(X[i, k], Y[j, k], W) + distances[i, j] = d + + return distances + + +def pairwise_weighted_riemannian_distance_2( + X: np.ndarray, + W: np.ndarray, + Y: np.ndarray = None, +) -> np.ndarray: + r"""Compute the pairwise weighted Reimannian distance between two sets of SPD/HPD matrices. + + The second type of pairwise weighted Riemannian distance + + Parameters + ---------- + X : np.ndarray + First set of SPD/HPD matrices, 3D ndarray. + W : np.ndarray + Weight matrix. + Y : np.ndarray, default None + + Returns + ------- + distances : np.ndarray + Pairwise weighted Riemannian distances between X and Y or X itself. + """ + if Y is None: + Y = X + + n_samples_X, n_samples_Y = X.shape[0], Y.shape[0] + distances = np.zeros((n_samples_X, n_samples_Y)) + for i in range(n_samples_X): + for j in range(n_samples_Y): + d = 0 + for k in range(X.shape[1]): + d += weighted_riemannian_distance_2(X[i, k], Y[j, k], W) + distances[i, j] = d + + return distances \ No newline at end of file diff --git a/examples/_wip/distances/riemannian_matrix.ipynb b/examples/_wip/distances/riemannian_matrix.ipynb index 73defeb..afae4ec 100644 --- a/examples/_wip/distances/riemannian_matrix.ipynb +++ b/examples/_wip/distances/riemannian_matrix.ipynb @@ -47,7 +47,7 @@ "## The first type of Riemannian distance\n", "\n", "$$\n", - "d_{R1}(\\mathbf{A}, \\mathbf{B}) =\n", + "d_{R_1}(\\mathbf{A}, \\mathbf{B}) =\n", " \\sqrt{\\text{tr} \\mathbf{A} +\n", " \\text{tr} \\mathbf{B} -\n", " 2 \\text{tr} \\left[(\\mathbf{A} \\mathbf{B})^{1/2}\\right]}\n", @@ -55,16 +55,16 @@ "weighted version:\n", "\n", "$$\n", - "d_{W1}(\\mathbf{A}, \\mathbf{B}, \\mathbf{W}) =\n", - " \\sqrt{\\text{tr} \\mathbf{W} \\mathbf{A} +\n", - " \\text{tr} \\mathbf{W} \\mathbf{B} -\n", - " 2 \\text{tr} \\mathbf{A}^{1/2} \\mathbf{W} \\mathbf{B} \\mathbf{W} \\mathbf{A}^{1/2}}\n", + "d_{R_1w}(\\mathbf{A}, \\mathbf{B}, \\mathbf{W_1}) =\n", + " \\sqrt{\\text{tr}\\left[ \\mathbf{W_1} \\mathbf{A}\\right] +\n", + " \\text{tr}\\left[ \\mathbf{W_1} \\mathbf{B}\\right] -\n", + " 2 \\text{tr}\\left[ (\\mathbf{B}^{1/2} \\mathbf{W_1} \\mathbf{A} \\mathbf{W_1} \\mathbf{B}^{1/2})^{1/2}\\right]}\n", "$$" ] }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 1, "metadata": {}, "outputs": [ { @@ -72,7 +72,7 @@ "output_type": "stream", "text": [ " Reimannian distance = (0.7320508075688744+0j)\n", - " weighted Reimannian distance = (0.7320508075688829+0j)\n" + " weighted Reimannian distance = (0.7320509235880549+0j)\n" ] } ], @@ -89,9 +89,9 @@ "W = np.array([[1, 0], [0, 1]])\n", "\n", "dR1 = riemannian_distance_1(A, B)\n", - "dW1 = weighted_riemannian_distance_1(A, B, W)\n", + "dR1w = weighted_riemannian_distance_1(A, B, W)\n", "\n", - "print(f\" Reimannian distance = {dR1}\\n weighted Reimannian distance = {dW1}\")" + "print(f\" Reimannian distance = {dR1}\\n weighted Reimannian distance = {dR1w}\")" ] }, { @@ -101,33 +101,33 @@ "## The second type of Riemannian distance\n", "\n", "$$\n", - "d_{R2}(\\mathbf{A}, \\mathbf{B}) =\n", - " \\sqrt{\\text{tr} \\mathbf{A} +\n", - " \\text{tr} \\mathbf{B} -\n", - " 2 \\text{tr} \\left[\\mathbf{A}^{1/2} \\mathbf{B}^{1/2}\\right]}\n", + "d_{R_2}(\\mathbf{A}, \\mathbf{B}) =\n", + " \\sqrt{\\text{tr} \\mathbf{A} +\n", + " \\text{tr} \\mathbf{B} -\n", + " 2 \\text{tr} \\left[\\mathbf{A}^{1/2} \\mathbf{B}^{1/2}\\right]}\n", "$$\n", "weighted version:\n", "\n", "$$\n", - "d_{W2}(\\mathbf{A}, \\mathbf{B}, \\mathbf{W}) =\n", - " \\sqrt{\\text{tr} \\mathbf{W} \\mathbf{A} +\n", - " \\text{tr} \\mathbf{W} \\mathbf{B} -\n", - " \\text{tr} \\mathbf{W} \\mathbf{A}^{1/2} \\mathbf{B}^{1/2} -\n", - " \\text{tr} \\mathbf{W} \\mathbf{B}^{1/2} \\mathbf{A}^{1/2}}\n", + "d_{R_2w}(\\mathbf{A}, \\mathbf{B}, \\mathbf{W_2}) =\n", + " \\sqrt{\\text{tr}\\left[ \\mathbf{W_2} \\mathbf{A}\\right] +\n", + " \\text{tr}\\left[ \\mathbf{W_2} \\mathbf{B}\\right] -\n", + " \\text{tr}\\left[ \\mathbf{W_2} \\mathbf{A}^{1/2} \\mathbf{B}^{1/2}\\right] -\n", + " \\text{tr}\\left[ \\mathbf{W_2} \\mathbf{B}^{1/2} \\mathbf{A}^{1/2}\\right]}\n", "$$" ] }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - " Reimannian distance = (0.7320508075688816+0j)\n", - " weighted Reimannian distance = (0.7320508075688816+0j)\n" + " Reimannian distance = (0.7320509235880633+0j)\n", + " weighted Reimannian distance = (0.7320509235880633+0j)\n" ] } ], @@ -144,9 +144,9 @@ "W = np.array([[1, 0], [0, 1]])\n", "\n", "dR2 = riemannian_distance_2(A, B)\n", - "dW2 = weighted_riemannian_distance_2(A, B, W)\n", + "dR2W = weighted_riemannian_distance_2(A, B, W)\n", "\n", - "print(f\" Reimannian distance = {dR2}\\n weighted Reimannian distance = {dW2}\")" + "print(f\" Reimannian distance = {dR2}\\n weighted Reimannian distance = {dR2W}\")" ] }, { @@ -166,14 +166,14 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - " Affine-invariant Reimannian distance = 1.0986122886681096\n" + " Affine-invariant Reimannian distance = 1.0986122886681098\n" ] } ], @@ -191,6 +191,13 @@ "print(f\" Affine-invariant Reimannian distance = {dR3}\")" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We have also implemented pairwise distance methods ``pairwise_weighted_riemannian_distance_1`` and ``pairwise_weighted_riemannian_distance_2`` to compute pairwise distances between a series of matrices. The input is a 3D matrix composed of a series of 2D matrices, and the output is a 2D square matrix. This method is useful for certain specific computational scenarios, such as the distance matrix in customized k-NN." + ] + }, { "cell_type": "markdown", "metadata": {}, From 288267164ccc72642808c44f5c9c764bc7a95239 Mon Sep 17 00:00:00 2001 From: Futuer <57895730+futuer-szd@users.noreply.github.com> Date: Thu, 21 Nov 2024 18:53:23 +0800 Subject: [PATCH 14/14] 1. add Burg-typeNuttallStrand in _nuttall_strand.py (#97) this is a power spectral density matrices transformation 2. add a simple test for _nuttall_strand.py above 3. add function to calculate optimal weights for riemannian density matrices in _get_optimal_weights.py 4. add normal pairewise riemannian distance function in _riemannian_matrix.py 5. get _check_inputs and _is_hpd function back to test_riemannian_matrix.py 6. change a link import for _is_hpd function in test_power_spectrum.py 7. add a special knn in _nuttall_knn.py for the 3D input which get from NuttallStrand transform Co-authored-by: futuer --- .../_wip/classification/_nuttall_knn.py | 95 ++++++++ .../_wip/distances/_get_optimal_weights.py | 160 +++++++++++++ .../_wip/distances/_riemannian_matrix.py | 68 ++++++ .../distances/tests/test_riemannian_matrix.py | 38 ++- .../transformations/series/_nuttall_strand.py | 221 ++++++++++++++++++ .../series/tests/test_nuttall_strand.py | 24 ++ .../series/tests/test_power_spectrum.py | 2 +- 7 files changed, 605 insertions(+), 3 deletions(-) create mode 100644 aeon_neuro/_wip/classification/_nuttall_knn.py create mode 100644 aeon_neuro/_wip/distances/_get_optimal_weights.py create mode 100644 aeon_neuro/_wip/transformations/series/_nuttall_strand.py create mode 100644 aeon_neuro/_wip/transformations/series/tests/test_nuttall_strand.py diff --git a/aeon_neuro/_wip/classification/_nuttall_knn.py b/aeon_neuro/_wip/classification/_nuttall_knn.py new file mode 100644 index 0000000..e4082cd --- /dev/null +++ b/aeon_neuro/_wip/classification/_nuttall_knn.py @@ -0,0 +1,95 @@ +"""Cumstomed K-nearest neighbors classifier for Burg-type Nuttall-Strand algorithm.""" + +from sklearn.neighbors import KNeighborsClassifier +from aeon_neuro._wip.distances._get_optimal_weights import get_optimal_weight1, get_optimal_weight2 +from aeon_neuro._wip.distances._riemannian_matrix import ( + pairwise_riemannian_distance_1, + pairwise_riemannian_distance_2, + pairwise_weighted_riemannian_distance_1, + pairwise_weighted_riemannian_distance_2, +) + + +class NuttallKNN(KNeighborsClassifier): + """Custom K-nearest neighbors classifier for Burg-type Nuttall-Strand algorithm. + + Parameters + ---------- + mode : int 1 or 2 + Choose a Riemannian distance function to be used, by default 1. + weighted : bool + If True, use optimal weighted Riemannian distance, by default False. + n_neighbors : int, optional + Number of neighbors to use, by default 3. + n_jobs : int, optional + Number of jobs to run in parallel, by default 1. + + Examples + -------- + >>> from aeon_neuro._wip.classification._nuttall_knn import NuttallKNN + >>> from aeon_neuro._wip.transformations.series._nuttall_strand import ( + ... NuttallStrand) + >>> from sklearn.metrics import accuracy_score + >>> from aeon.datasets import load_classification + >>> import numpy as np + >>> import warnings + >>> warnings.filterwarnings("ignore") + >>> n_freqs=25 + >>> transformer = NuttallStrand(model_order=5, n_freqs=n_freqs, fmax=60) + >>> X, y= load_classification("EyesOpenShut") + >>> X_train = X[:56] + >>> X_test = X[56:58] + >>> y_train = y[:56] + >>> y_test = y[56:58] + >>> X_train_psd = [] + >>> X_test_psd = [] + >>> for i in range(X_train.shape[0]): + ... X_train_psd.append(transformer.fit_transform(X_train[i])) + >>> for i in range(X_test.shape[0]): + ... X_test_psd.append(transformer.fit_transform(X_test[i])) + >>> X_train_psd = np.array(X_train_psd) + >>> X_test_psd = np.array(X_test_psd) + >>> knn = NuttallKNN(mode=1, weighted=True, n_neighbors=1, n_jobs=-1) + >>> knn.fit(X_train_psd, y_train) + >>> y_pred = knn.predict(X_test_psd) + >>> accuracy_score(y_test, y_pred) + 0.5 + """ + + def __init__(self, mode: int = 1, weighted: bool = False, n_neighbors=3, n_jobs=-1): + assert mode in [1, 2], "Mode must be 1 or 2" + self.mode = mode + self.weighted = weighted + super().__init__(metric='precomputed', n_neighbors=n_neighbors, n_jobs=n_jobs) + + def fit(self, X, y): + self.X_train_ = X + distances = None + if self.mode == 1: + if self.weighted is False: + distances = pairwise_riemannian_distance_1(X) + else: + self.W_ = get_optimal_weight1(X, y) + distances = pairwise_weighted_riemannian_distance_1(X, self.W_) + else: + if self.weighted is False: + distances = pairwise_riemannian_distance_2(X) + else: + self.W_ = get_optimal_weight2(X, y) + distances = pairwise_weighted_riemannian_distance_2(X, self.W_) + super().fit(distances, y) + + + def predict(self, X): + distances = None + if self.mode == 1: + if self.weighted is False: + distances = pairwise_riemannian_distance_1(X, self.X_train_) + else: + distances = pairwise_weighted_riemannian_distance_1(X, self.W_, self.X_train_) + else: + if self.weighted is False: + distances = pairwise_riemannian_distance_2(X, self.X_train_) + else: + distances = pairwise_weighted_riemannian_distance_2(X, self.W_, self.X_train_) + return super().predict(distances) \ No newline at end of file diff --git a/aeon_neuro/_wip/distances/_get_optimal_weights.py b/aeon_neuro/_wip/distances/_get_optimal_weights.py new file mode 100644 index 0000000..c3d0931 --- /dev/null +++ b/aeon_neuro/_wip/distances/_get_optimal_weights.py @@ -0,0 +1,160 @@ +"""Calculate optimal weights for weighted Riemannian distance.""" + +import numpy as np +from scipy.linalg import sqrtm +from numpy.linalg import eig + + +def _get_inv(A, rcond_threshold=1e-10, cond_threshold=1e10): + r"""Calculate the inverse of a matrix.""" + try: + # Calculate the condition number of the matrix + cond_A = np.linalg.cond(A) + if cond_A > cond_threshold: + # If the condition number is too large, use the pseudo-inverse + # print(f"Condition number is {cond_A:.2e}, using pinv instead of inv") + inv_A = np.linalg.pinv(A, rcond=rcond_threshold) + else: + # If the condition number is appropriate, calculate the inverse + inv_A = np.linalg.inv(A) + except np.linalg.LinAlgError: + # If the matrix is singular, use the pseudo-inverse + # print("Matrix is singular, using pinv instead of inv") + inv_A = np.linalg.pinv(A, rcond=rcond_threshold) + + return inv_A + + +def _svd_w1(P_ik, P_jk): + P_half_ik = sqrtm(P_ik) + P_half_jk = sqrtm(P_jk) + + # Singular Value Decomposition (SVD) + product_matrix = P_half_jk @ P_half_ik + # if product_matrix.dtype == np.complex256: + # product_matrix = product_matrix.astype(np.complex128) + V_ij1, Sigma_ij, V_ij2_H = np.linalg.svd(product_matrix) + + # Construct unitary matrices + U_ik = V_ij1 + U_jk = V_ij2_H.T.conj() + + P_tilde_ik = P_half_ik @ U_ik + P_tilde_jk = P_half_jk @ U_jk + + return P_tilde_ik, P_tilde_jk + + +def _compute_summation_matrices_w1(X_train_psd, y_train): + n_cases, k, M, M = X_train_psd.shape + + M_S1 = np.zeros((M, M), dtype=np.complex128) + M_D1 = np.zeros((M, M), dtype=np.complex128) + + # Compute the summation matrices + for i in range(n_cases): + for j in range(i+1, n_cases): + for m in range(k): + P_ik = X_train_psd[i, m] + P_jk = X_train_psd[j, m] + + P_tilde_ik, P_tilde_jk = _svd_w1(P_ik, P_jk) + + diff = P_tilde_ik - P_tilde_jk + diff_H = diff.conj().T + + if y_train[i] == y_train[j]: + M_S1 += diff @ diff_H + else: + M_D1 += diff @ diff_H + + return M_S1, M_D1 + + +def _compute_summation_matrices_w2(X_train_psd, y_train): + n_cases, k, M, M = X_train_psd.shape + + M_S1 = np.zeros((M, M), dtype=np.complex128) + M_D1 = np.zeros((M, M), dtype=np.complex128) + + # Compute the summation matrices + for i in range(n_cases): + for j in range(i+1, n_cases): + for m in range(k): + P_ik = X_train_psd[i, m] + P_jk = X_train_psd[j, m] + + P_half_ik = sqrtm(P_ik) + P_half_jk = sqrtm(P_jk) + + diff = P_half_ik - P_half_jk + diff_H = diff.conj().T + + if y_train[i] == y_train[j]: + M_S1 += diff @ diff_H + else: + M_D1 += diff @ diff_H + + return M_S1, M_D1 + + +def _eigenvector_decomposition(M_S1, M_D1, PCA_num): + # Perform eigenvalue decomposition on the matrix M_S1^-1 * M_D1 + eigenvalues, eigenvectors = eig(_get_inv(M_S1) @ M_D1) + + # Sort the eigenvectors based on the eigenvalues in descending order + idx = np.argsort(eigenvalues)[::-1] + top_eigenvectors = eigenvectors[:, idx[:PCA_num]] + + return top_eigenvectors + +def _compute_weighting_matrix(top_eigenvectors): + Omega = top_eigenvectors + W = Omega @ Omega.conj().T + return W + + +def get_optimal_weight1(X_train_psd, y_train): + """Calculate optimal weights for weighted Riemannian distance 1. + + Parameters + ---------- + X_train_psd : np.ndarray + Power spectral density matrices of shape (n_cases, n_freqs, n_channels, n_channels). + Get from Burg-type Nuttall-Strand transformation. + y_train : np.ndarray + Labels of shape (n_cases,). + + Returns + ------- + W1 : np.ndarray + Optimal weighting matrix of shape (n_channels, n_channels). + """ + PCA_num = X_train_psd.shape[-1] + M_S1, M_D1 = _compute_summation_matrices_w1(X_train_psd, y_train) + top_eigenvectors = _eigenvector_decomposition(M_S1, M_D1, PCA_num) + W1 = _compute_weighting_matrix(top_eigenvectors) + return W1 + + +def get_optimal_weight2(X_train_psd, y_train): + """Calculate optimal weights for weighted Riemannian distance 2. + + Parameters + ---------- + X_train_psd : np.ndarray + Power spectral density matrices of shape (n_cases, n_freqs, n_channels, n_channels). + Get from Burg-type Nuttall-Strand transformation. + y_train : np.ndarray + Labels of shape (n_cases,). + + Returns + ------- + W2 : np.ndarray + Optimal weighting matrix of shape (n_channels, n_channels). + """ + PCA_num = X_train_psd.shape[-1] + M_S2, M_D2 = _compute_summation_matrices_w2(X_train_psd, y_train) + top_eigenvectors = _eigenvector_decomposition(M_S2, M_D2, PCA_num) + W2 = _compute_weighting_matrix(top_eigenvectors) + return W2 \ No newline at end of file diff --git a/aeon_neuro/_wip/distances/_riemannian_matrix.py b/aeon_neuro/_wip/distances/_riemannian_matrix.py index d6a8708..4336132 100644 --- a/aeon_neuro/_wip/distances/_riemannian_matrix.py +++ b/aeon_neuro/_wip/distances/_riemannian_matrix.py @@ -258,6 +258,74 @@ def weighted_riemannian_distance_2( return d_W +def pairwise_riemannian_distance_1( + X: np.ndarray, + Y: np.ndarray = None, +) -> np.ndarray: + r"""Compute the pairwise Reimannian distance between two sets of SPD/HPD matrices. + + The first type of pairwise Riemannian distance + + Parameters + ---------- + X : np.ndarray + First set of SPD/HPD matrices, 3D ndarray. + Y : np.ndarray, default None + + Returns + ------- + distances : np.ndarray + Pairwise Riemannian distances between X and Y or X itself. + """ + if Y is None: + Y = X + + n_samples_X, n_samples_Y = X.shape[0], Y.shape[0] + distances = np.zeros((n_samples_X, n_samples_Y)) + for i in range(n_samples_X): + for j in range(n_samples_Y): + d = 0 + for k in range(X.shape[1]): + d += weighted_riemannian_distance_1(X[i, k], Y[j, k]) + distances[i, j] = d + + return distances + + +def pairwise_riemannian_distance_2( + X: np.ndarray, + Y: np.ndarray = None, +) -> np.ndarray: + r"""Compute the pairwise Reimannian distance between two sets of SPD/HPD matrices. + + The second type of pairwise Riemannian distance + + Parameters + ---------- + X : np.ndarray + First set of SPD/HPD matrices, 3D ndarray. + Y : np.ndarray, default None + + Returns + ------- + distances : np.ndarray + Pairwise Riemannian distances between X and Y or X itself. + """ + if Y is None: + Y = X + + n_samples_X, n_samples_Y = X.shape[0], Y.shape[0] + distances = np.zeros((n_samples_X, n_samples_Y)) + for i in range(n_samples_X): + for j in range(n_samples_Y): + d = 0 + for k in range(X.shape[1]): + d += weighted_riemannian_distance_2(X[i, k], Y[j, k]) + distances[i, j] = d + + return distances + + def pairwise_weighted_riemannian_distance_1( X: np.ndarray, W: np.ndarray, diff --git a/aeon_neuro/_wip/distances/tests/test_riemannian_matrix.py b/aeon_neuro/_wip/distances/tests/test_riemannian_matrix.py index e45fc2d..cf752cc 100644 --- a/aeon_neuro/_wip/distances/tests/test_riemannian_matrix.py +++ b/aeon_neuro/_wip/distances/tests/test_riemannian_matrix.py @@ -1,9 +1,9 @@ +"""Tests for the Riemannian distance matrix.""" + import numpy as np import pytest from aeon_neuro._wip.distances._riemannian_matrix import ( - _check_inputs, - _is_hpd, riemannian_distance_1, riemannian_distance_2, riemannian_distance_3, @@ -20,6 +20,40 @@ C = np.array([[1, 2], [3, 4]]) +def _is_hpd(matrix): + r"""Check if a matrix is Hermitian Positive Definite (HPD).""" + if not np.allclose(matrix, matrix.conj().T): + return False + try: + np.linalg.cholesky(matrix) + return True + except np.linalg.LinAlgError: + return False + + +def _check_inputs(A, B, W=None): + if W is None: + if not A.shape == B.shape: + raise ValueError("Inputs must have equal dimensions") + if A.ndim != 2 or B.ndim != 2: + raise ValueError("Inputs must be 2D ndarrays") + if not _is_hpd(A): + raise ValueError("Matrix A must be Hermitian Positive Definite (HPD)") + if not _is_hpd(B): + raise ValueError("Matrix B must be Hermitian Positive Definite (HPD)") + else: + if not A.shape == B.shape == W.shape: + raise ValueError("Inputs must have equal dimensions") + if A.ndim != 2 or B.ndim != 2 or W.ndim != 2: + raise ValueError("Inputs must be 2D ndarrays") + if not _is_hpd(A): + raise ValueError("Matrix A must be Hermitian Positive Definite (HPD)") + if not _is_hpd(B): + raise ValueError("Matrix B must be Hermitian Positive Definite (HPD)") + if not _is_hpd(W): + raise ValueError("Matrix W must be Hermitian Positive Definite (HPD)") + + def test_is_hpd(): assert _is_hpd(A) is True assert _is_hpd(B) is True diff --git a/aeon_neuro/_wip/transformations/series/_nuttall_strand.py b/aeon_neuro/_wip/transformations/series/_nuttall_strand.py new file mode 100644 index 0000000..3efc652 --- /dev/null +++ b/aeon_neuro/_wip/transformations/series/_nuttall_strand.py @@ -0,0 +1,221 @@ +"""Burg-type Nuttall-Strand algorithm.""" + +import numpy as np +from scipy.linalg import solve +from aeon.transformations.series.base import BaseSeriesTransformer + + +class NuttallStrand(BaseSeriesTransformer): + """Power spectral density matrix transformer. + + Estimate the power spectral density matrix between channels in the frequency + domain using Burg-type Nuttall-Strand algorithm, a.k.a. multivariate Burg. + + The result is a set of Hermitian positive definite (HPD) + complex-valued matrices of shape (n_channels, n_channels). + + Parameters + ---------- + model_order : int + The order of the model used for spectral estimation. + Better to be less than 15. + If the data is complex, the model_order should be smaller. + n_freqs : int, optional + Number of power spectral density matrices in the output for one sample. + Number of frequency points in the curve representation of X after transformation, + by default 10. + fmax : float, optional + Maximum frequency of interest in Hz, by default 30 * 2 * pi. + + References + ---------- + .. [1] O. Strand, Multichannel complex maximum entropy (autoregressive) spectral analysis, + in IEEE Transactions on Automatic Control, vol. 22, no. 4, pp. 634-640, + August 1977, doi: 10.1109/TAC.1977.1101545. + .. [2] Nuttall, Albert H.., Multivariate Linear Predictive Spectral Analysis Employing + Weighted Forward and Backward Averaging: A Generalization of Burg's Algorithm.” (1976). + + Examples + -------- + >>> from aeon_neuro._wip.transformations.series._nuttall_strand import ( + ... NuttallStrand) + >>> from aeon_neuro._wip.distances.tests.test_riemannian_matrix import _is_hpd + >>> import numpy as np + >>> n_channels, n_timepoints = 5, 360 + >>> n_freqs=5 + >>> X = np.random.standard_normal(size=(n_channels, n_timepoints)) + >>> transformer = NuttallStrand(model_order=3, n_freqs=n_freqs) + >>> X_transformed = transformer.fit_transform(X) + >>> X_transformed.shape == (n_freqs, n_channels, n_channels) + True + >>> for i in range(n_freqs): + ... _is_hpd(X_transformed[i]) + True + True + True + True + True + """ + + _tags = { + "X_inner_type": "np.ndarray", + "capability:multivariate": True, + "fit_is_empty": True, + } + + def __init__(self, model_order, n_freqs=10, fmax=30*2*np.pi): + self.model_order = model_order + self.n_freqs = n_freqs + self.fmax = fmax + super().__init__(axis=1) # (n_channels, n_timepoints) + + + def _get_inv(self, A, rcond_threshold=1e-10, cond_threshold=1e10): + r"""Calculate the inverse of a matrix.""" + try: + # Calculate the condition number of the matrix + cond_A = np.linalg.cond(A) + if cond_A > cond_threshold: + # If the condition number is too large, use the pseudo-inverse + # print(f"Condition number is {cond_A:.2e}, using pinv instead of inv") + inv_A = np.linalg.pinv(A, rcond=rcond_threshold) + else: + # If the condition number is appropriate, calculate the inverse + inv_A = np.linalg.inv(A) + except np.linalg.LinAlgError: + # If the matrix is singular, use the pseudo-inverse + # print("Matrix is singular, using pinv instead of inv") + inv_A = np.linalg.pinv(A, rcond=rcond_threshold) + + return inv_A + + def _solve_Cfw(self, E, G, B, P_fw, P_bw): + r"""Solve a vector equation.""" + P_fw_inv = self._get_inv(P_fw) + n = B.shape[0] + # Construct Kronecker product and vectorization + I = np.eye(n) + K1 = np.kron(I, B) + K2 = np.kron((P_fw_inv @ E).T, P_bw) + # Construct the full coefficient matrix + K = K1 + K2 + # Vectorize -2G + vec_neg_2G = (-2 * G).flatten().reshape(-1, 1) + # Solve for the vectorized C_N + vec_C_N = solve(K, vec_neg_2G) + + # Reshape C_N back to matrix form + C_N = vec_C_N.reshape(n, n).T + return C_N + + def _update_filter_coefficients(self, F, B, C_fw, C_bw): + r"""Update filter coefficients.""" + B_bw = B + n_columns = F.shape[1] + zero_rows = np.zeros((n_columns, n_columns)) + + F_q = np.vstack((F, zero_rows)) + np.vstack((zero_rows, B_bw)) @ C_fw + B_q = np.vstack((F, zero_rows)) @ C_bw + np.vstack((zero_rows, B_bw)) + + return F_q, B_q + + def _compute_psd_matrix(self, F, P_fw, n_channels): + r"""Calculate power spectral density matrix.""" + psd_matrix = np.zeros((self.n_freqs, n_channels, n_channels), dtype=complex) + freqs = np.linspace(0, self.fmax, self.n_freqs) + + for i, omega in enumerate(freqs): + A_w = np.eye(n_channels, dtype=complex) + A_neg_w = np.eye(n_channels, dtype=complex) + + for q in range(1, self.model_order+1): + assert len(F) == self.model_order + 1 + A_w += F[self.model_order][q*n_channels: (q+1)*n_channels] * np.exp(-1j * omega * q) + A_neg_w += F[self.model_order][q*n_channels: (q+1)*n_channels] * np.exp(-1j * -omega * q) + + psd_matrix[i] = self._get_inv(A_neg_w) @ P_fw[self.model_order] @ self._get_inv(A_w).T + + return psd_matrix + + + def _nuttall_strand_algorithm(self, X, y=None): + """Transform X and return a transformed version. + + private _transform containing the core logic, called from transform + + Parameters + ---------- + X : np.ndarray + Data to be transformed, shape (n_channels, n_timepoints) + y : ignored argument for interface compatibility + + Returns + ------- + transformed version of X + a set of power spectral density matrices of shape (n_freqs, n_channels, n_channels) + """ + # time_series [n_channels, n_timepoints] + n_channels, n_timepoints = X.shape + T = n_timepoints + # forward and backward power matrices + P_fw = np.zeros((self.model_order+1, n_channels, n_channels)) + P_bw = np.zeros((self.model_order+1, n_channels, n_channels)) + covariance_matrix = (X @ X.T) / T + P_fw[0] = covariance_matrix + P_bw[0] = covariance_matrix + + e = np.zeros((self.model_order+1, n_channels, T)) + b = np.zeros((self.model_order+1, n_channels, T)) + + C_fw = np.zeros((self.model_order+1, n_channels, n_channels)) + C_bw = np.zeros((self.model_order+1, n_channels, n_channels)) + + F = [] + B_bw = [] + F.append(np.eye(n_channels)) + B_bw.append(np.eye(n_channels)) + + for q in range(1, self.model_order + 1): + for k in range(0, (T - q - 1)): + if q == 1: + e[1, :, k] = X[:, k+1] + b[1, :, k] = X[:, k] + else: + e[q, :, k] = e[q-1, :, k+1] + C_fw[q-1].T @ b[q-1, :, k+1] + b[q, :, k] = b[q-1, :, k] + C_bw[q-1].T @ e[q-1, :, k] + + E = (e[q] @ e[q].T) / (T-q) + G = (b[q] @ e[q].T) / (T-q) + Bi = (b[q] @ b[q].T) / (T-q) + + C_fw[q] = self._solve_Cfw(E, G, Bi, P_fw[q-1], P_bw[q-1]) + C_bw[q] = self._get_inv(P_fw[q-1]) @ C_fw[q].T @ P_bw[q-1] + + P_fw[q] = P_fw[q-1] - C_fw[q].T @ P_bw[q-1] @ C_fw[q] + P_bw[q] = P_bw[q-1] - C_bw[q].T @ P_fw[q-1] @ C_bw[q] + + F_q, B_q = self._update_filter_coefficients(F[q-1], B_bw[q-1], C_fw[q], C_bw[q]) + F.append(F_q) + B_bw.append(B_q) + + return self._compute_psd_matrix(F, P_fw, n_channels) + + + def _transform(self, X, y=None): + """Transform X and return a transformed version. + + private _transform containing the core logic, called from transform + + Parameters + ---------- + X : np.ndarray + Data to be transformed, shape (n_channels, n_timepoints) + y : ignored argument for interface compatibility + + Returns + ------- + transformed version of X + a set of power spectral density matrices of shape (n_freqs, n_channels, n_channels) + """ + + return self._nuttall_strand_algorithm(X) \ No newline at end of file diff --git a/aeon_neuro/_wip/transformations/series/tests/test_nuttall_strand.py b/aeon_neuro/_wip/transformations/series/tests/test_nuttall_strand.py new file mode 100644 index 0000000..f7f6567 --- /dev/null +++ b/aeon_neuro/_wip/transformations/series/tests/test_nuttall_strand.py @@ -0,0 +1,24 @@ +"""Tests for the Nuttall-Strand transformer.""" + +import numpy as np +from aeon_neuro._wip.transformations.series._nuttall_strand import NuttallStrand +from aeon_neuro._wip.distances.tests.test_riemannian_matrix import _is_hpd + +n_channels, n_timepoints = 7, 12000 +rng = np.random.default_rng(seed=0) +n_freqs=10 + + +def test_transform(): + """Test Nuttall-Strand power spectral density matrices transformer.""" + transformer = NuttallStrand(model_order=3, n_freqs=n_freqs) + X = rng.normal(scale=2, size=(n_channels, n_timepoints)) + X_transformed = transformer.fit_transform(X) + + # test expected shape + assert X_transformed.shape == (n_freqs, n_channels, n_channels) + + # test positive semi-definite + for i in range(n_freqs): + assert _is_hpd(X_transformed[i]) + \ No newline at end of file diff --git a/aeon_neuro/_wip/transformations/series/tests/test_power_spectrum.py b/aeon_neuro/_wip/transformations/series/tests/test_power_spectrum.py index d065667..0c4bef1 100644 --- a/aeon_neuro/_wip/transformations/series/tests/test_power_spectrum.py +++ b/aeon_neuro/_wip/transformations/series/tests/test_power_spectrum.py @@ -3,7 +3,7 @@ import numpy as np import pytest -from aeon_neuro._wip.distances._riemannian_matrix import _is_hpd +from aeon_neuro._wip.distances.tests.test_riemannian_matrix import _is_hpd from aeon_neuro._wip.transformations.series._power_spectrum import CrossSpectralMatrix n_channels, n_timepoints = 7, 12000