diff --git a/benchmarks/decomposition_methods.ipynb b/benchmarks/decomposition_methods.ipynb new file mode 100644 index 0000000000..b76117593e --- /dev/null +++ b/benchmarks/decomposition_methods.ipynb @@ -0,0 +1,236 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 119, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "The autoreload extension is already loaded. To reload it, use:\n", + " %reload_ext autoreload\n" + ] + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "from __future__ import annotations\n", + "\n", + "import time\n", + "\n", + "import matplotlib.pyplot as plt\n", + "import numpy as np\n", + "import pandas as pd\n", + "from tqdm import tqdm\n", + "\n", + "from river.decomposition import (\n", + " OnlineDMD,\n", + " OnlinePCA,\n", + " OnlineSVD,\n", + " OnlineSVDZhang,\n", + ")\n", + "from river.preprocessing import Hankelizer\n", + "\n", + "%load_ext autoreload\n", + "%autoreload 2\n", + "\n", + "# Set the random seed for reproducibility\n", + "seed = 42\n", + "np.random.seed(seed)\n", + "\n", + "# Step 1: Generate Gaussian noise with mean 0 and variance 1\n", + "gaussian_noise = np.random.normal(0, 1, (10000, 1))\n", + "\n", + "# Step 2: Generate exponentially increasing X from 0 to 10\n", + "steps = np.logspace(0, 1, 10)\n", + "X = np.concatenate([np.full(1000, exp_val) for exp_val in steps])[\n", + " :, np.newaxis\n", + "]\n", + "\n", + "# Step 3: Combine the Gaussian noise with the exponential increments\n", + "X = gaussian_noise + X\n", + "\n", + "# Display the result array\n", + "plt.plot(X)\n", + "plt.grid(True)" + ] + }, + { + "cell_type": "code", + "execution_count": 71, + "metadata": {}, + "outputs": [], + "source": [ + "models = [\n", + " OnlineDMD(r=2, seed=seed),\n", + " OnlinePCA(n_components=2, seed=seed),\n", + " OnlineSVD(n_components=2, seed=seed),\n", + " OnlineSVDZhang(n_components=2, seed=seed),\n", + "]\n", + "n_feats_range = range(2, 20)\n", + "repeat = 5\n", + "iterations = len(models) * len(n_feats_range) * repeat * len(X)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "times_per_model_np = {model.__class__.__name__: [] for model in models}\n", + "times_per_model_pd = {model.__class__.__name__: [] for model in models}\n", + "\n", + "with tqdm(total=iterations, mininterval=10) as pbar:\n", + " for model in models:\n", + " for n_features in n_feats_range:\n", + " for X_iter, times_per_model_ in zip(\n", + " [X, pd.DataFrame(X).to_dict(orient=\"records\")],\n", + " [times_per_model_np, times_per_model_pd]\n", + " ):\n", + " pipeline = Hankelizer(n_features) | model.clone()\n", + " times = np.zeros(repeat)\n", + " for rep in range(repeat):\n", + " tic = time.time()\n", + " for x in X_iter:\n", + " pipeline.transform_one(x)\n", + " pipeline.learn_one(x)\n", + " pbar.update(1)\n", + " times[rep] = time.time() - tic\n", + " times_per_model_[model.__class__.__name__].append(times)\n", + "\n", + "df_times_per_model_np = pd.DataFrame(times_per_model_np, index=n_feats_range)\n", + "df_times_per_model_pd = pd.DataFrame(times_per_model_pd, index=n_feats_range)" + ] + }, + { + "cell_type": "code", + "execution_count": 131, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# [donotremove]\n", + "df_samples_per_second_np = df_times_per_model_np.map(\n", + " lambda x: len(X) / np.mean(x)\n", + ")\n", + "df_samples_per_second_pd = df_times_per_model_pd.map(\n", + " lambda x: len(X) / np.mean(x)\n", + ")\n", + "\n", + "ax = df_samples_per_second_np.add_prefix(\"np: \").plot(\n", + " grid=True,\n", + " logy=True,\n", + ")\n", + "ax.set_prop_cycle(None) # type: ignore\n", + "df_samples_per_second_pd.add_prefix(\"pd: \").plot(\n", + " grid=True,\n", + " logy=True,\n", + " style=\"--\",\n", + " ax=ax,\n", + ")\n", + "ax.set_xlabel(\"no. features\")\n", + "ax.set_ylabel(\"samples/second\")\n", + "_ = ax.set_title(\"Performance of 'array' vs. 'df' for varying no. features\")" + ] + }, + { + "cell_type": "code", + "execution_count": 121, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "OnlineDMD (np - pd) [samples/second] 241.793371\n", + "OnlinePCA (np - pd) [samples/second] 1444.972470\n", + "OnlineSVD (np - pd) [samples/second] -58.670012\n", + "OnlineSVDZhang (np - pd) [samples/second] 309.198108\n", + "dtype: float64" + ] + }, + "execution_count": 121, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# [donotremove]\n", + "# Regarding whole dataset, how much slower is pd comparing to np in abs\n", + "(df_samples_per_second_np.mean() - df_samples_per_second_pd.mean()).add_suffix(\n", + " \" (np - pd) [samples/second]\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 118, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "OnlineDMD (np - pd) [%] 0.024179\n", + "OnlinePCA (np - pd) [%] 0.144497\n", + "OnlineSVD (np - pd) [%] -0.005867\n", + "OnlineSVDZhang (np - pd) [%] 0.030920\n", + "dtype: float64" + ] + }, + "execution_count": 118, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# [donotremove]\n", + "# Regarding whole dataset, how much slower is pd comparing to np in %\n", + "(df_samples_per_second_np.mean() - df_samples_per_second_pd.mean()).add_suffix(\n", + " \" (np - pd) [%]\"\n", + ") / len(X)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.2" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/docs/unreleased.md b/docs/unreleased.md new file mode 100644 index 0000000000..529739242e --- /dev/null +++ b/docs/unreleased.md @@ -0,0 +1,21 @@ +# Unreleased + +## drift + +- Added `FHDDM` drift detector. +- Added a `iter_polars` function to iterate over the rows of a polars DataFrame. + +## neighbors + +- Simplified `neighbors.SWINN` to avoid recursion limit and pickling issues. + +## decomposition + +- Added `decomposition.OnlineSVD` class to perform Singular Value Decomposition. +- Added `decomposition.OnlinePCA` class to perform Principal Component Analysis. +- Added `decomposition.OnlineDMD` class to perform Dynamic Mode Decomposition. +- Added `decomposition.OnlineDMDwC` class to perform Dynamic Mode Decomposition with Control. + +## preprocessing + +- Added `preprocessing.Hankelizer` class to perform Hankelization of data stream. diff --git a/river/compose/pipeline.py b/river/compose/pipeline.py index 2c894ede04..023723c73c 100644 --- a/river/compose/pipeline.py +++ b/river/compose/pipeline.py @@ -471,11 +471,11 @@ def learn_one(self, x: dict, y=None, **params): # Here the step is not a transformer, and it's supervised, such as a LinearRegression. # This is usually the last step of the pipeline. elif step._supervised: - step.learn_one(x=x, y=y) + step.learn_one(x=x, y=y, **params) # Here the step is not a transformer, and it's unsupervised, such as a KMeans. This # is also usually the last step of the pipeline. else: - step.learn_one(x=x) + step.learn_one(x=x, **params) def _transform_one(self, x: dict): """This methods takes care of applying the first n - 1 steps of the pipeline, which are diff --git a/river/decomposition/__init__.py b/river/decomposition/__init__.py new file mode 100644 index 0000000000..fd87840687 --- /dev/null +++ b/river/decomposition/__init__.py @@ -0,0 +1,16 @@ +"""Decomposition. + +""" +from __future__ import annotations + +from .odmd import OnlineDMD, OnlineDMDwC +from .opca import OnlinePCA +from .osvd import OnlineSVD, OnlineSVDZhang + +__all__ = [ + "OnlineSVD", + "OnlineSVDZhang", + "OnlineDMD", + "OnlineDMDwC", + "OnlinePCA", +] diff --git a/river/decomposition/odmd.py b/river/decomposition/odmd.py new file mode 100644 index 0000000000..5a10e283cc --- /dev/null +++ b/river/decomposition/odmd.py @@ -0,0 +1,1275 @@ +"""Online Dynamic Mode Decomposition (DMD) in [River API](riverml.xyz). + +This module contains the implementation of the Online DMD, Weighted Online DMD, +and DMD with Control algorithms. It is based on the paper by Zhang et al. [^1] +and implementation of authors available at +[GitHub](https://github.com/haozhg/odmd). However, this implementation provides +a more flexible interface aligned with River API covers and separates update +and revert methods to operate with Rolling and TimeRolling wrapers. + +TODO: + + - [x] Compute amlitudes of the singular values of the input matrix. + - [x] Benchmark on performance with np vs pd input + - [ ] Update prediction computation for continuous time + x(t) = Phi exp(diag(ln(Lambda) / dt) * t) Phi^+ x(0) (MIT lecture) + continuous time eigenvalues exp(Lambda * dt) (Zhang et al. 2019) + - [ ] Figure out how to use as both MiniBatchRegressor and MiniBatchTransformer + - [ ] Find out why some values of A change sign between consecutive updates + - [ ] Fix inconsistency in xi (amplitudes) computation + +References: + [^1]: Zhang, H., Clarence Worth Rowley, Deem, E.A. and Cattafesta, L.N. + (2019). Online Dynamic Mode Decomposition for Time-Varying Systems. Siam + Journal on Applied Dynamical Systems, 18(3), pp.1586-1609. + doi:[10.1137/18m1192329](https://doi.org/10.1137/18m1192329). +""" + +from __future__ import annotations + +from typing import Literal + +import numpy as np +import pandas as pd +import scipy as sp + +from river.base import MiniBatchRegressor, MiniBatchTransformer + +from .osvd import OnlineSVDZhang as OnlineSVD + +__all__ = [ + "OnlineDMD", + "OnlineDMDwC", +] + + +class OnlineDMD(MiniBatchRegressor, MiniBatchTransformer): + """Online Dynamic Mode Decomposition (DMD). + + This regressor is a class that implements online dynamic mode decomposition + The time complexity (multiply-add operation for one iteration) is O(4n^2), + and space complexity is O(2n^2), where n is the state dimension. + + This estimator supports learning with mini-batches with same time and space + complexity as the online learning. It can be used as Rolling or TimeRolling + estimator. + + OnlineDMD implements `transform_one` and `transform_many` methods like + unsupervised MiniBatchTransformer. In such case, we may use `learn_one` + without `y` and `learn_many` without `Y` to learn the model. + In that case OnlineDMD preserves previous snapshot and uses it as x while + current snapshot is used as y, therefore, being delayed by one sample. + + At time step t, define two matrices X(t) = [x(1),x(2),...,x(t)], + Y(t) = [y(1),y(2),...,y(t)], that contain all the past snapshot pairs, + where x(t), y(t) are the n dimensional state vector, y(t) = f(x(t)) is + the image of x(t), f() is the dynamics. + + Here, if the (discrete-time) dynamics are given by z(t) = f(z(t-1)), + then x(t), y(t) should be measurements correponding to consecutive + states z(t-1) and z(t). + + An exponential weighting factor can be used to place more weight on + recent data. + + Args: + r: Number of modes to keep. If 0 (default), all modes are kept. + w: Weighting factor in (0,1]. Smaller value allows more adpative + learning, but too small weighting may result in model identification + instability (relies only on limited recent snapshots). + initialize: number of snapshot pairs to initialize the model with. If 0 + the model will be initialized with random matrix A and P = \alpha I + where \alpha is a large positive scalar. If initialize is smaller + than the state dimension, it will be set to the state dimension and + raise a warning. Defaults to 1. + exponential_weighting: Whether to use exponential weighting in revert + seed: Random seed for reproducibility (initialize A with random values) + + Attributes: + m: state dimension x(t) as in z(t) = f(z(t-1)) or y(t) = f(t, x(t)) + n_seen: number of seen samples (read-only), reverted if windowed + feature_names_in_: list of feature names. Used for dict inputs. + A: DMD matrix, size n by n (non-Hermitian) + _P: inverse of covariance matrix of X (symmetric) + + Examples: + >>> import numpy as np + >>> import pandas as pd + >>> n = 101; freq = 2.; tspan = np.linspace(0, 10, n); dt = 0.1 + >>> a1 = 1; a2 = 1; phase1 = -np.pi; phase2 = np.pi / 2 + >>> w1 = np.cos(np.pi * freq * tspan) + >>> w2 = -np.sin(np.pi * freq * tspan) + >>> df = pd.DataFrame({'w1': w1[:-1], 'w2': w2[:-1]}) + + >>> model = OnlineDMD(r=2, w=0.1, initialize=0) + >>> X, Y = df.iloc[:-1], df.shift(-1).iloc[:-1] + + >>> for (_, x), (_, y) in zip(X.iterrows(), Y.iterrows()): + ... x, y = x.to_dict(), y.to_dict() + ... model.learn_one(x, y) + >>> eig, _ = np.log(model.eig[0]) / dt + >>> r, i = eig.real, eig.imag + >>> np.isclose(eig.real, 0.) + True + >>> np.isclose(eig.imag, np.pi * freq) + True + + >>> model.xi # TODO: verify the result + array([0.54244, 0.54244]) + + >>> from river.utils import Rolling + >>> model = Rolling(OnlineDMD(r=2, w=1.), 10) + >>> X, Y = df.iloc[:-1], df.shift(-1).iloc[:-1] + + >>> for (_, x), (_, y) in zip(X.iterrows(), Y.iterrows()): + ... x, y = x.to_dict(), y.to_dict() + ... model.update(x, y) + + >>> eig, _ = np.log(model.eig[0]) / dt + >>> r, i = eig.real, eig.imag + >>> np.isclose(eig.real, 0.) + True + >>> np.isclose(eig.imag, np.pi * freq) + True + + >>> np.isclose(model.truncation_error(X.values, Y.values), 0) + True + + >>> w_pred = model.predict_one(np.array([w1[-2], w2[-2]])) + >>> np.allclose(w_pred, [w1[-1], w2[-1]]) + True + + >>> w_pred = model.predict_many(np.array([1, 0]), 10) + >>> np.allclose(w_pred.T, [w1[1:11], w2[1:11]]) + True + + References: + [^1]: Zhang, H., Clarence Worth Rowley, Deem, E.A. and Cattafesta, L.N. + (2019). Online Dynamic Mode Decomposition for Time-Varying Systems. + Siam Journal on Applied Dynamical Systems, 18(3), pp.1586-1609. + doi:[10.1137/18m1192329](https://doi.org/10.1137/18m1192329). + """ + + def __init__( + self, + r: int = 0, + w: float = 1.0, + initialize: int = 1, + exponential_weighting: bool = False, + eig_rtol: float | None = None, + seed: int | None = None, + ) -> None: + self.r = int(r) + if self.r != 0: + # Forcing orthogonality makes the results more unstable + self._svd = OnlineSVD( + n_components=self.r, + seed=seed, + ) + self.w = float(w) + assert self.w > 0 and self.w <= 1 + self.initialize = int(initialize) + self.exponential_weighting = exponential_weighting + self.eig_rtol = eig_rtol + assert self.eig_rtol is None or 0.0 <= self.eig_rtol < 1.0 + self.seed = seed # used with sparse SVD, otherwise its deterministic + + np.random.seed(self.seed) + + self.m: int + self.n_seen: int = 0 + self.feature_names_in_: list[str] + self.A: np.ndarray + self._P: np.ndarray + self._Y: np.ndarray # for xi and modes computation + + self._A_last: np.ndarray + self._A_allclose: bool = False + self._n_cached: int = 0 # TODO: remove before merge + self._n_computed: int = 0 # TODO: remove before merge + + # Properties to be reset at each update + self._eig: tuple[np.ndarray, np.ndarray] | None = None + self._modes: np.ndarray | None = None + self._xi: np.ndarray | None = None + + @property + def eig(self) -> tuple[np.ndarray, np.ndarray]: + """Compute and return DMD eigenvalues and DMD modes at current step""" + if self._eig is None: + # TODO: need to check if SVD is initialized in case r < m. Otherwise, transformation will fail. + # TODO: explore faster ways to compute eig + # TODO: find out whether Phi should have imaginary part + Lambda, Phi = sp.linalg.eig(self.A, check_finite=False) + + sort_idx = np.argsort(Lambda)[::-1] + if not np.array_equal(sort_idx, range(len(Lambda))): + Lambda = Lambda[sort_idx] + Phi = Phi[:, sort_idx] + self._eig = Lambda, Phi + self._n_computed += 1 + return self._eig + + @property + def modes(self) -> np.ndarray: + """Reconstruct high dimensional discrete-time DMD modes""" + if self._modes is None: + _, Phi_comp = self.eig + if self.r < self.m: + # Exact DMD modes (Tu et al. (2016)) + # self._Y.T @ self._svd._Vt.T is increasingly more computationally expensive without rolling + # self._modes = ( + # self._Y.T + # @ self._svd._Vt.T # sign may change if sparse SVD is used + # @ np.diag(1 / self._svd._S) + # @ Phi_comp # sign may change if sparse EIG is used + # ) + + # Projected DMD modes (Schmid (2010)) - faster, not guaranteed + # self._modes = self._svd._U @ Phi_comp + # This regularization works much better than the above + # if high variance in svs of X + self._modes = ( + self._svd._U @ np.diag(1 / self._svd._S) @ Phi_comp + ) + else: + self._modes = Phi_comp + return self._modes + + @property + def xi(self) -> np.ndarray: + """Amlitudes of the singular values of the input matrix.""" + if self._xi is None: + Lambda, Phi = self.eig + # Compute Discrete temporal dynamics matrix (Vandermonde matrix). + C = np.vander(Lambda, self.n_seen, increasing=True) + # xi = self.Phi.conj().T @ self._Y @ np.linalg.pinv(self.C) + + from scipy.optimize import minimize + + def objective_function(x): + return np.linalg.norm( + self._Y[:, : self.r].T - Phi @ np.diag(x) @ C, "fro" + ) + 0.5 * np.linalg.norm(x, 1) + + # Minimize the objective function + xi = minimize(objective_function, np.ones(self.r)).x + self._xi = xi + return self._xi + + @property + def A_allclose(self) -> bool: + """Check if A has changed since last update of eigenvalues""" + if self.eig_rtol is None: + return False + return np.allclose( + np.abs(self._A_last[: self.A.shape[0], : self.A.shape[1]]), + np.abs(self.A), + rtol=self.eig_rtol, + ) + + def _init_update(self) -> None: + if self.r == 0: + self.r = self.m + if self.initialize > 0 and self.initialize < self.r: + self.initialize = self.r + + # Zhang (2019) suggests to initialize A with random values + self.A = np.eye(self.r) + self._A_last = self.A.copy() + self._X_init = np.empty((self.initialize, self.m)) + self._Y_init = np.empty((self.initialize, self.m)) + self._Y = np.empty((0, self.m)) + + def _truncate_w_svd( + self, + x: np.ndarray, + y: np.ndarray, + svd_modify: Literal["update", "revert"] | None = None, + ): + U_prev = self._svd._U + # We can update svd on x now without leaking new sample which is in y + # try: + if svd_modify == "update": + self._svd.update(x) + elif svd_modify == "revert": + self._svd.revert(x) + _U = self._svd._U + _UU = _U.T @ U_prev + x = x @ _U + # p != self.m and p == self.A.shape[0] in case of DMDwC + p = self.A.shape[0] + y = y @ _U[: y.shape[1], :p] + # Check if A is square + if self.A.shape[0] == self.A.shape[1]: + self.A = _UU @ self.A @ _UU.T + # If A is not square, it is called by DMDwC + else: + _UUp = _UU[:p, :p] + # _UUq = _UU[p:, p:] + # self.A = np.column_stack( + # (_UUp @ self.A[:, :p] @ _UUp.T, _UUp @ self.A[:, p:] @ _UUq.T) + # ) + self.A = _UUp @ self.A @ _UU.T + # Understand why we divide by w + self._P = np.linalg.inv(_UU @ np.linalg.inv(self._P) @ _UU.T) / self.w + + return x, y + + def _update_A_P( + self, X: np.ndarray, Y: np.ndarray, W: float | np.ndarray + ) -> None: + Xt = X.T + AX = self.A.dot(Xt) + PX = self._P.dot(Xt) + PXt = PX.T + Gamma = np.linalg.inv(W + X.dot(PX)) + # update A on new data + self.A += (Y.T - AX).dot(Gamma).dot(PXt) + # update P, group Px*Px' to ensure positive definite + self._P = (self._P - PX.dot(Gamma).dot(PXt)) / self.w + # TODO: understand why is this needed (tests fail when commented out) + # Any matrix congruent to a symmetric matrix is again symmetric: if X + # is a symmetric matrix, then so is A@X@A.T for any matrix A. + self._P = (self._P + self._P.T) / 2 + + # Reset properties + # TODO: explore what revert does with reseting properties + if not self.A_allclose: + self._eig = None + self._A_last = self.A.copy() + else: + self._n_cached += 1 + + self._modes = None + + def update( + self, + x: dict | np.ndarray, + y: dict | np.ndarray | None = None, + ) -> None: + """Update the DMD computation with a new pair of snapshots (x, y) + + Here, if the (discrete-time) dynamics are given by z(t) = f(z(t-1)), + then (x,y) should be measurements correponding to consecutive states + z(t-1) and z(t). + + Args: + x: 1D array, shape (m, ), x(t) as in y(t) = f(t, x(t)) + y: 1D array, shape (m, ), y(t) as in y(t) = f(t, x(t)) + """ + # If Hankelizer is used, we need to use DMD without y + if y is None: + if not hasattr(self, "_x_last"): + self._x_last = x + return + else: + y = x + x = self._x_last + self._x_last = y + + if isinstance(x, dict): + self.feature_names_in_ = list(x.keys()) + x = np.array(list(x.values()), ndmin=2) + x_ = x.reshape(1, -1) + if isinstance(y, dict): + assert self.feature_names_in_ == list(y.keys()) + y = np.array(list(y.values()), ndmin=2) + y_ = y.reshape(1, -1) + + # Initialize properties which depend on the shape of x + if self.n_seen == 0: + self.m = x_.shape[1] + self._init_update() + + # Collect buffer of past snapshots to compute modes and xi + if self._Y.shape[0] <= self.n_seen + 1: + self._Y = np.row_stack([self._Y, y_]) + if self._Y.shape[0] > self.n_seen + 1: + self._Y = self._Y[-(self.n_seen + 1) :, :] + + # Initialize A and P with first self.initialize snapshot pairs + if bool(self.initialize) and self.n_seen < self.initialize: + self._X_init[self.n_seen, :] = x_ + self._Y_init[self.n_seen, :] = y_ + if self.n_seen == self.initialize - 1: + self.learn_many(self._X_init, self._Y_init) + # revert the number of seen samples to avoid doubling + self.n_seen -= self._X_init.shape[0] + del self._X_init, self._Y_init + # Update incrementally if initialized + else: + if self.n_seen == 0: + epsilon = 1e-15 + alpha = 1.0 / epsilon + self._P = alpha * np.identity(self.r) + if self.r < self.m: + x_, y_ = self._truncate_w_svd(x_, y_, svd_modify="update") + + self._update_A_P(x_, y_, 1.0) + + self.n_seen += 1 + + def learn_one( + self, + x: dict | np.ndarray, + y: dict | np.ndarray | None = None, + ) -> None: + """Allias for update method.""" + self.update(x, y) + + def revert( + self, + x: dict | np.ndarray, + y: dict | np.ndarray | None = None, + ) -> None: + """Gradually forget the older snapshots and revert the DMD computation. + + Compatible with Rolling and TimeRolling wrappers. + + Args: + x: 1D array, shape (1, m), x(t) as in y(t) = f(t, x(t)) + y: 1D array, shape (1, m), y(t) as in y(t) = f(t, x(t)) + + TODO: + - [ ] it seems like this does not work as expected + """ + if self.n_seen < self.initialize: + raise RuntimeError( + f"Cannot revert {self.__class__.__name__} before " + "initialization. If used with Rolling or TimeRolling, window " + f"size should be increased to {self.initialize + 1 if y is None else 0}." + ) + if y is None: + if not hasattr(self, "_x_first"): + self._x_first = x + return + else: + y = x + x = self._x_first + self._x_first = y + + if isinstance(x, dict): + x = np.array(list(x.values())) + x_ = x.reshape(1, -1) + if isinstance(y, dict): + y = np.array(list(y.values())) + y_ = y.reshape(1, -1) + + if self.r < self.m: + x_, y_ = self._truncate_w_svd(x_, y_, svd_modify="revert") + + # Apply exponential weighting factor + if self.exponential_weighting: + weight = 1.0 / -(self.w**self.n_seen) + else: + weight = -1.0 + + self._update_A_P(x_, y_, weight) + + self.n_seen -= 1 + + def _update_many( + self, + X: np.ndarray | pd.DataFrame, + Y: np.ndarray | pd.DataFrame, + ) -> None: + """Update the DMD computation with a new batch of snapshots (X,Y). + + This method brings no change in theoretical time and space complexity. + However, it allows parallel computing by vectorizing update in loop. + + Args: + X: The input snapshot matrix of shape (p, m), where p is the number of snapshots and m is the number of features. + Y: The output snapshot matrix of shape (p, m), where p is the number of snapshots and m is the number of features. + + TODO: + - [ ] find out why not equal to for loop update implementation + when weights are used + + """ + p = X.shape[0] + if self.exponential_weighting: + weights = np.sqrt(self.w) ** np.arange(p - 1, -1, -1) + else: + weights = np.ones(p) + # Zhang (2019): Gamma = (C^{-1} U^T P U )^{−1} ) + C_inv = np.diag(np.reciprocal(weights)) + + if isinstance(X, pd.DataFrame): + X_ = X.values + else: + X_ = X + if isinstance(Y, pd.DataFrame): + Y_ = Y.values + else: + Y_ = Y + if self.r < self.m: + X_, Y_ = self._truncate_w_svd(X_, Y_, svd_modify="update") + self._update_A_P(X_, Y_, C_inv) + + def update_many( + self, + X: np.ndarray | pd.DataFrame, + Y: np.ndarray | pd.DataFrame | None = None, + ) -> None: + """Learn the OnlineDMD model using multiple snapshot pairs. + + Useful for initializing the model with a batch of snapshot pairs. + Otherwise, it is equivalent to calling update method in a loop. + + Args: + X: The input snapshot matrix of shape (p, m), where p is the number of snapshots and m is the number of features. + Y: The output snapshot matrix of shape (p, m), where p is the number of snapshots and m is the number of features. + """ + if Y is None: + if isinstance(X, pd.DataFrame): + Y = X.shift(-1).iloc[:-1] + X = X.iloc[:-1] + elif isinstance(X, np.ndarray): + Y = np.roll(X, -1)[:-1] + X = X[:-1] + + if isinstance(X, pd.DataFrame): + X = X.values + if isinstance(Y, pd.DataFrame): + Y = Y.values + + # necessary condition for over-constrained initialization + n = X.shape[0] + # Exponential weighting factor - older snapshots are weighted less + if self.exponential_weighting: + weights = (np.sqrt(self.w) ** np.arange(n - 1, -1, -1))[ + :, np.newaxis + ] + else: + weights = np.ones((n, 1)) + Xqhat, Yqhat = weights * X, weights * Y + + self.n_seen += n + + # Initialize A and P with first p snapshot pairs + if not hasattr(self, "_P"): + self.m = X.shape[1] + if self.r == 0: + self.r = self.m + + _rank_X = np.linalg.matrix_rank(X) + if not _rank_X >= self.r: + raise ValueError( + f"Failed rank(X) [{_rank_X}] >= n_modes [{self.r}].\n" + "Increase the number of snapshots (increase initialize " + f"[{self.initialize}] if learn_many was not called " + "directly) or reduce the number of modes." + ) + XX = Xqhat.T @ Xqhat + # TODO: think about using correlation matrix to avoid scaling issues + # https://stats.stackexchange.com/questions/12200/normalizing-variables-for-svd-pca + # std = np.sqrt(np.diag(XX)) + # XX = XX / np.outer(std, std) + # Perform truncated DMD + if self.r < self.m: + self._svd.learn_many(Xqhat) + _U, _S, _V = self._svd._U, self._svd._S, self._svd._Vt + + _m = Yqhat.shape[1] + _l = self.m - _m + + # DMDwC, A = U.T @ K @ U; B = U.T @ K [Proctor (2016)] + if _l != 0: + _UU = _U.T @ np.row_stack([_U[:_m], np.eye(_l, self.r)]) + # DMD, A = U.T @ K @ U + else: + _UU = np.eye(self.r) + + # TODO: Verify if equivalent to Proctor (2016). They compute U_hat from SVD(Y), we select the first r columns of U + self.A = ( + _U.T[:, : Yqhat.shape[1]] + @ Yqhat.T + @ _V.T + @ np.diag(1 / _S) + ) @ _UU + self._P = np.linalg.inv(_U.T @ XX @ _U) / self.w + # Perform exact DMD + else: + self.A = Yqhat.T.dot(np.linalg.pinv(Xqhat.T)) + self._P = np.linalg.inv(XX) / self.w + + self._A_last = self.A.copy() + # Store the last p snapshots for xi computation + self._Y = Yqhat + self.initialize = 0 + # Update incrementally if initialized + # Zhang (2019): "single rank-s update is roughly the same as applying + # the rank-1 formula s times" + else: + self._update_many(Xqhat, Yqhat) + if self._Y.shape[0] <= self.n_seen: + self._Y = np.row_stack([self._Y, Yqhat]) + if self._Y.shape[0] > self.n_seen: + self._Y = self._Y[-(self.n_seen) :, :] + + def learn_many( + self, + X: np.ndarray | pd.DataFrame, + Y: np.ndarray | pd.DataFrame | None = None, + ) -> None: + """Allias for update_many method.""" + self.update_many(X, Y) + + def predict_one(self, x: dict | np.ndarray) -> np.ndarray: + """ + Predicts the next state given the current state. + + Parameters: + x: The current state. + + Returns: + np.ndarray: The predicted next state. + """ + # Map A back to original space + if self.r < self.m: + A = self._svd._U @ self.A @ self._svd._U.T + else: + A = self.A + mat = np.zeros((2, self.m)) + mat[0, :] = x if isinstance(x, np.ndarray) else list(x.values()) + for s in range(1, 2): + mat[s, :] = (A @ mat[s - 1, :]).real + return mat[-1, :] + + def predict_many(self, x: dict | np.ndarray, horizon: int) -> np.ndarray: + """ + Predicts multiple future values based on the given initial value. + + Args: + x: The initial value. + horizon (int): The number of future values to predict. + + Returns: + np.ndarray: An array containing the predicted future values. + + TODO: + - [ ] Align predict_many with river API + """ + # Map A back to original space + if self.r < self.m: + A = self._svd._U @ self.A @ self._svd._U.T + else: + A = self.A + mat = np.zeros((horizon + 1, self.m)) + mat[0, :] = x if isinstance(x, np.ndarray) else list(x.values()) + for s in range(1, horizon + 1): + mat[s, :] = (A @ mat[s - 1, :]).real + return mat[1:, :] + + def forecast(self, horizon: int, xs: list[dict] | None = None) -> list: + x = self._x_last + if not hasattr(self, "m"): + self.m = len(x) + # Map A back to original space + if self.r < self.m: + if hasattr(self._svd, "_U"): + A = self._svd._U @ self.A @ self._svd._U.T + else: + return np.zeros((horizon, 1)).flatten().tolist() + else: + A = self.A + mat = np.zeros((horizon + 1, self.m)) + mat[0, :] = x if isinstance(x, np.ndarray) else list(x.values()) + for s in range(1, horizon + 1): + mat[s, :] = (A @ mat[s - 1, :]).real + return mat[1:, -1].flatten().tolist() + + def truncation_error( + self, + X: np.ndarray | pd.DataFrame, + Y: np.ndarray | pd.DataFrame, + ) -> float: + """Compute the truncation error of the DMD model on the given data. + + Since this implementation computes exact DMD, the truncation error is relevant only for initialization. + + Args: + X: 2D array, shape (p, m), matrix [x(1),x(2),...x(p)] + Y: 2D array, shape (p, m), matrix [y(1),y(2),...y(p)] + + Returns: + float: Truncation error of the DMD model + """ + Y_hat = self.A @ X.T + return float(np.linalg.norm(Y - Y_hat.T) / np.linalg.norm(Y)) + + def transform_one(self, x: dict | np.ndarray) -> dict: + """ + Transforms the given input sample. + + Args: + x: The input to transform. + + Returns: + np.ndarray: The transformed input. + """ + if isinstance(x, dict): + x = np.array(list(x.values())) + if not hasattr(self, "A") or ( + hasattr(self, "_svd") and not hasattr(self._svd, "_U") + ): + return dict( + zip( + range(self.r), + np.zeros(self.r), + ) + ) + return dict(zip(range(self.r), x @ self.modes)) + + def transform_many( + self, X: np.ndarray | pd.DataFrame + ) -> np.ndarray | pd.DataFrame: + """ + Transforms the given input sequence. + + Args: + x: The input to transform. + + Returns: + np.ndarray: The transformed input. + """ + M = self.modes + return X @ M + + +class OnlineDMDwC(OnlineDMD): + """Online Dynamic Mode Decomposition (DMD) with Control. + + This regressor is a class that implements online dynamic mode decomposition + The time complexity (multiply-add operation for one iteration) is O(4n^2), + and space complexity is O(2n^2), where n is the state dimension. + + This estimator supports learning with mini-batches with same time and space + complexity as the online learning. + + At time step t, define three matrices X(t) = [x(1),x(2),...,x(t)], + Y(t) = [y(1),y(2),...,y(t)], U(t) = [U(1),U(2),...,U(t)] that contain all + the past snapshot pairs, where x(t), y(t) are the n dimensional state + vectors, and u(t) is m dimensional control input vector, given by + y(t) = f(x(t), u(t)). + + x(t), y(t) should be measurements correponding to consecutive states z(t-1) + and z(t). + + An exponential weighting factor can be used to place more weight on + recent data. + + Args: + B: control matrix, size n by m. If None, the control matrix will be + identified from the snapshots. Defaults to None. + p: truncation of states. If 0 (default), compute exact DMD. + q: truncation of control. If 0 (default), compute exact DMD. + w: weighting factor in (0,1]. Smaller value allows more adpative + learning, but too small weighting may result in model identification + instability (relies only on limited recent snapshots). + initialize: number of snapshot pairs to initialize the model with. If 0 + the model will be initialized with random matrix A and P = \alpha I + where \alpha is a large positive scalar. If initialize is smaller + than the state dimension, it will be set to the state dimension and + raise a warning. Defaults to 1. + exponential_weighting: whether to use exponential weighting in revert + seed: random seed for reproducibility (initialize A with random values) + + Attributes: + m: augumented state dimension. if B is None, m = x.shape[1], else m = x.shape[1] + u.shape[1] + n_seen: number of seen samples (read-only), reverted if windowed + A: DMD matrix, size n by n + _P: inverse of covariance matrix of X + + Examples: + >>> import numpy as np + >>> import pandas as pd + + >>> n = 101 + >>> freq = 2.0 + >>> tspan = np.linspace(0, 10, n) + >>> dt = 0.1 + >>> a1 = 1 + >>> a2 = 1 + >>> phase1 = -np.pi + >>> phase2 = np.pi / 2 + >>> w1 = np.cos(np.pi * freq * tspan) + >>> w2 = -np.sin(np.pi * freq * tspan) + >>> u_ = np.ones(n) + >>> u_[tspan > 5] *= 2 + >>> w1[tspan > 5] *= 2 + >>> w2[tspan > 5] *= 2 + >>> df = pd.DataFrame({"w1": w1[:-1], "w2": w2[:-1]}) + >>> U = pd.DataFrame({"u": u_[:-2]}) + + >>> model = OnlineDMDwC(p=2, q=1, w=0.1, initialize=4) + >>> X, Y = df.iloc[:-1], df.shift(-1).iloc[:-1] + + >>> for (_, x), (_, y), (_, u) in zip(X.iterrows(), Y.iterrows(), U.iterrows()): + ... x, y, u = x.to_dict(), y.to_dict(), u.to_dict() + ... model.learn_one(x, y, u) + >>> eig, _ = np.log(model.eig[0]) / dt + >>> r, i = eig.real, eig.imag + >>> np.isclose(eig.real, 0.0) + True + >>> np.isclose(eig.imag, np.pi * freq) + True + + Supports mini-batch learning: + >>> from river.utils import Rolling + + >>> model = Rolling(OnlineDMDwC(p=2, q=1, w=1.0), 10) + >>> X, Y = df.iloc[:-1], df.shift(-1).iloc[:-1] + + >>> for (_, x), (_, y), (_, u) in zip(X.iterrows(), Y.iterrows(), U.iterrows()): + ... x, y, u = x.to_dict(), y.to_dict(), u.to_dict() + ... model.update(x, y, u) + + >>> eig, _ = np.log(model.eig[0]) / dt + >>> r, i = eig.real, eig.imag + >>> np.isclose(eig.real, 0.0) + True + >>> np.isclose(eig.imag, np.pi * freq) + True + + # TODO: find out why not passing + # >>> np.isclose(model.truncation_error(X.values, Y.values, U.values), 0) + # True + + >>> w_pred = model.predict_one( + ... np.array([w1[-2], w2[-2]]), + ... np.array([u_[-2]]), + ... ) + >>> np.allclose(w_pred, [w1[-1], w2[-1]]) + True + + >>> w_pred = model.predict_one( + ... np.array([w1[-2], w2[-2]]), + ... np.array([u_[-2]]), + ... ) + >>> np.allclose(w_pred, [w1[-1], w2[-1]]) + True + + >>> w_pred = model.predict_many(np.array([1, 0]), np.ones((10, 1)), 10) + >>> np.allclose(w_pred.T, [w1[1:11], w2[1:11]]) + True + + References: + [^1]: Zhang, H., Clarence Worth Rowley, Deem, E.A. and Cattafesta, L.N. + (2019). Online Dynamic Mode Decomposition for Time-Varying Systems. + Siam Journal on Applied Dynamical Systems, 18(3), pp.1586-1609. + doi:[10.1137/18m1192329](https://doi.org/10.1137/18m1192329). + """ + + def __init__( + self, + B: np.ndarray | None = None, + p: int = 0, + q: int = 0, # TODO: fix case when q is 0 + w: float = 1.0, + initialize: int = 1, + exponential_weighting: bool = False, + eig_rtol: float | None = None, + seed: int | None = None, + ) -> None: + super().__init__( + p + q, + w, + initialize, + exponential_weighting, + eig_rtol, + seed, + ) + self.p = p + self.q = q + self.B = B + self.known_B = B is not None + self.l: int + + @property + def modes(self) -> np.ndarray: + """Reconstruct high dimensional DMD modes""" + if self._modes is None: + _, Phi = self.eig + if self.r < self.m: + # Sign of eigenvectors and singular vectors may change based on underlying algorithm initialization + # Proctor (2016) + # self._Y.T @ self._svd._Vt.T is increasingly more computationally expensive without rolling + self._modes = ( + self._Y.T + @ self._svd._Vt.T[:, : self.p] + @ np.diag(1 / self._svd._S[: self.p]) + @ Phi + ) + # Following has similar results to our modification + # self._modes = (self._Y.T @ self._svd._Vt.T @ np.diag(1/self._svd._S))[:, :self.p] @ Phi + + # This is faster but significantly alter results for OnlineDMDwC. + self._modes = (self._svd._U @ np.diag(1 / self._svd._S))[ + : self.m - self.l, : self.p + ] @ Phi + else: + self._modes = Phi + return self._modes + + @property + def xi(self) -> np.ndarray: + """Amlitudes of the singular values of the input matrix.""" + return np.linalg.pinv(self.modes) @ np.array( + list(self._x_first.values()) + ) + + def _init_update(self) -> None: + if not hasattr(self, "l"): + super()._init_update() + return + if self.p == 0: + self.p = self.m + if self.q == 0: + self.q = self.l + if self.known_B: + self.r = self.p + else: + self.r = self.p + self.q + # TODO: if p or q == 0 in __init__, we need to reinitialize SVD + self._svd = OnlineSVD( + n_components=self.r, + seed=self.seed, + ) + if self.initialize < self.r: + self.initialize = self.r + + self.A = np.eye(self.p) + self._A_last = self.A.copy() + if not self.known_B: + self.B = np.eye(self.p, self.q) + self._A_last = np.column_stack((self.A, self.B)) + self._U_init = np.zeros((self.initialize, self.l)) + self._X_init = np.empty((self.initialize, self.m)) + self._Y_init = np.empty((self.initialize, self.m)) + self._Y = np.empty((0, self.m)) + + def _reconstruct_AB(self): + # self.m stores augumented state dimension + _m = self.m - self.l if not self.known_B else self.m + if self.r < self.m: + A = ( + self._svd._U[:_m, : self.p] + @ self.A + @ self._svd._U[:_m, : self.p].T + ) + B = ( + self._svd._U[:_m, : self.p] + @ self.B + @ self._svd._U[-self.q :, -self.l :] + ) + else: + A = self.A + B = self.B + return A, B + + def update( + self, + x: dict | np.ndarray, + y: dict | np.ndarray | None = None, + u: dict | np.ndarray | None = None, + ) -> None: + """Update the DMD computation with a new pair of snapshots (x, y) + + Here, if the (discrete-time) dynamics are given by z(t) = f(z(t-1)), + then (x,y) should be measurements correponding to consecutive states + z(t-1) and z(t). + + Args: + x: 1D array, shape (n, ), x(t) as in y(t) = f(t, x(t)) + y: 1D array, shape (n, ), y(t) as in y(t) = f(t, x(t)) + u: 1D array, shape (m, ), u(t) as in y(t) = f(t, x(t), u(t)) + """ + if y is None: + if not hasattr(self, "_x_last"): + self._x_last = x + self._u_last = u + return + else: + y = x + x = self._x_last + self._x_last = y + _u_hold = u + u = self._u_last + self._u_last = _u_hold + + if isinstance(x, dict): + x = np.array(list(x.values())) + x = x.reshape(1, -1) + if isinstance(y, dict): + y = np.array(list(y.values())) + y = y.reshape(1, -1) + if isinstance(u, dict): + u = np.array(list(u.values())) + if isinstance(u, np.ndarray): + u = u.reshape(1, -1) + # Needed in case of recursive call from learn_many within parent class + if u is None: + super().update(x, y) + else: + if self.n_seen == 0: + self.m = x.shape[1] + self.l = u.shape[1] + self._init_update() + self.m += 0 if self.known_B else u.shape[1] + + if self.initialize and self.n_seen <= self.initialize - 1: + # Accumulate buffer of past snapshots for initialization + self._X_init[self.n_seen, :] = x + self._Y_init[self.n_seen, :] = y + self._U_init[self.n_seen, :] = u + # Run the initialization after collecting enough snapshots + if self.n_seen == self.initialize - 1: + self.learn_many(self._X_init, self._Y_init, self._U_init) + # Subtract the number of seen samples to avoid doubling + self.n_seen -= self._X_init.shape[0] + self.n_seen += 1 + + else: + if self.known_B and self.B is not None: + y = y - u @ self.B.T + else: + x = np.column_stack((x, u)) + if self.B is not None: # For correct type hinting + self.A = np.column_stack((self.A, self.B)) + super().update(x, y) + + # In case that learn_many was called, A is already square + if self.A.shape[0] < self.A.shape[1]: + self.B = self.A[: self.p, -self.q :] + self.A = self.A[: self.p, : -self.q] + + def learn_one( + self, + x: dict | np.ndarray, + y: dict | np.ndarray | None = None, + u: dict | np.ndarray | None = None, + ) -> None: + """Allias for OnlineDMDwC.update method.""" + return self.update(x, y, u) + + def revert( + self, + x: dict | np.ndarray, + y: dict | np.ndarray | None = None, + u: dict | np.ndarray | None = None, + ) -> None: + """Gradually forget the older snapshots and revert the DMD computation. + + Compatible with Rolling and TimeRolling wrappers. + + Args: + x: 1D array, shape (n, ), x(t) + y: 1D array, shape (n, ), y(t) + u: 1D array, shape (m, ), u(t) + """ + if u is None: + super().revert(x, y) + return + + if y is None: + if not hasattr(self, "_x_first"): + self._x_first = x + self._u_first = u + return + else: + y = x + x = self._x_first + self._x_first = y + _u_hold = u + u = self._u_first + self._u_first = _u_hold + + if isinstance(x, dict): + x = np.array(list(x.values())) + x = x.reshape(1, -1) + if isinstance(y, dict): + y = np.array(list(y.values())) + y = y.reshape(1, -1) + if isinstance(u, dict): + u = np.array(list(u.values())) + u = u.reshape(1, -1) + if self.known_B and self.B is not None: + y = y - u @ self.B.T + else: + x = np.column_stack((x, u)) + if self.B is not None: + self.A = np.column_stack((self.A, self.B)) + + super().revert(x, y) + + if not self.known_B: + self.B = self.A[: self.p, -self.q :] + self.A = self.A[: self.p, : -self.q] + + def _update_many( + self, + X: np.ndarray | pd.DataFrame, + Y: np.ndarray | pd.DataFrame, + U: np.ndarray | pd.DataFrame | None = None, + ) -> None: + """Update the DMD computation with a new batch of snapshots (X,Y). + + This method brings no change in theoretical time and space complexity. + However, it allows parallel computing by vectorizing update in loop. + + Args: + X: The input snapshot matrix of shape (p, m), where p is the number of snapshots and m is the number of features. + Y: The output snapshot matrix of shape (p, m), where p is the number of snapshots and m is the number of features. + U: The control input snapshot matrix of shape (p, l), where p is the number of snapshots and p is the number of control inputs. + """ + if U is None: + super()._update_many(X, Y) + else: + if self.known_B: + Y = Y - self.B @ U + else: + X = np.column_stack((X, U)) + if self.n_seen == 0: + self.m = X.shape[1] + self.l = U.shape[1] + self._init_update() + if not self.known_B and self.B is not None: + self.A = np.column_stack((self.A, self.B)) + self.l = U.shape[1] + super()._update_many(X, Y) + + if not self.known_B: + self.B = self.A[:, -self.q :] + self.A = self.A[:, : -self.q] + + def learn_many( + self, + X: np.ndarray | pd.DataFrame, + Y: np.ndarray | pd.DataFrame | None = None, + U: np.ndarray | pd.DataFrame | None = None, + ) -> None: + """Learn the OnlineDMDwC model using multiple snapshot pairs. + + Useful for initializing the model with a batch of snapshot pairs. + Otherwise, it is equivalent to calling update method in a loop. + + Args: + X: The input snapshot matrix of shape (p, m), where p is the number of snapshots and m is the number of features. + Y: The output snapshot matrix of shape (p, m), where p is the number of snapshots and m is the number of features. + U: The output snapshot matrix of shape (p, l), where p is the number of snapshots and l is the number of control inputs. + """ + if U is None: + super().learn_many(X, Y) + return + + if isinstance(X, pd.DataFrame): + X = X.values + if isinstance(Y, pd.DataFrame): + Y = Y.values + if isinstance(U, pd.DataFrame): + U = U.values + + if Y is None: + Y = np.roll(X, -1)[:-1] + X = X[:-1] + U = U[:-1] + + if self.known_B and self.B is not None: + Y = Y - U @ self.B.T + else: + X = np.column_stack((X, U)) + if self.B is not None: # If learn_many is not called first + self.A = np.column_stack((self.A, self.B)) + + self.l = U.shape[1] + super().learn_many(X, Y) + + if self.p == 0: + self.p = self.m + if self.q == 0: + self.q = self.l + if not self.known_B: + self.B = self.A[: self.p, -self.q :] + self.A = self.A[: self.p, : -self.q] + + def predict_one( + self, x: dict | np.ndarray, u: dict | np.ndarray + ) -> np.ndarray: + """ + Predicts the next state given the current state. + + Parameters: + x: The current state. + u: The control input. + + Returns: + np.ndarray: The predicted next state. + """ + if isinstance(u, dict): + u = np.array(list(u.values())) + _m = len(x) + A, B = self._reconstruct_AB() + + mat = np.zeros((2, _m)) + mat[0, :] = x if isinstance(x, np.ndarray) else list(x.values()) + for s in range(1, 2): + action = (B @ u).real + # TODO: map A back to original space + mat[s, :] = (A @ mat[s - 1, :]).real + action + return mat[-1, :] + + def predict_many( + self, + x: dict | np.ndarray, + U: np.ndarray | pd.DataFrame, + horizon: int, + ) -> np.ndarray: + """ + Predicts multiple future values based on the given initial value. + + Args: + x: The initial value. + U: The control input matrix of shape (horizon, l), where l is the number of control inputs. + horizon (int): The number of future values to predict. + + Returns: + np.ndarray: An array containing the predicted future values. + + TODO: + - [ ] Align predict_many with river API + """ + if isinstance(U, pd.DataFrame): + U = U.values + _m = len(x) + A, B = self._reconstruct_AB() + + mat = np.zeros((horizon + 1, _m)) + mat[0, :] = x if isinstance(x, np.ndarray) else list(x.values()) + for s in range(1, horizon + 1): + action = (B @ U[s - 1, :]).real + mat[s, :] = (A @ mat[s - 1, :]).real + action + return mat[1:, :] + + def truncation_error( + self, + X: np.ndarray | pd.DataFrame, + Y: np.ndarray | pd.DataFrame, + U: np.ndarray | pd.DataFrame, + ) -> float: + """Compute the truncation error of the DMD model on the given data. + + Args: + X: 2D array, shape (n, m), matrix [x(1),x(2),...x(n)] + Y: 2D array, shape (n, m), matrix [y(1),y(2),...y(n)] + U: 2D array, shape (n, l), matrix [u(1),u(2),...u(n)] + + Returns: + float: Truncation error of the DMD model + """ + + A, B = self._reconstruct_AB() + Y_hat = A @ X.T + B @ U.T + return float(np.linalg.norm(Y - Y_hat.T) / np.linalg.norm(Y)) diff --git a/river/decomposition/opca.py b/river/decomposition/opca.py new file mode 100644 index 0000000000..9acdfd6904 --- /dev/null +++ b/river/decomposition/opca.py @@ -0,0 +1,200 @@ +"""Online Principal Component Analysis (PCA) in [River API](riverml.xyz). + +This module contains the implementation of the Online PCA algorithm. +It is based on the paper by Eftekhari et al. [^1] + +References: + [^1]: Eftekhari, A., Ongie, G., Balzano, L., Wakin, M. B. (2019). Streaming Principal Component Analysis From Incomplete Data. Journal of Machine Learning Research, 20(86), pp.1-62. url:http://jmlr.org/papers/v20/16-627.html. +""" +from __future__ import annotations + +from collections import deque + +import numpy as np + +from river.base import Transformer + +__all__ = [ + "OnlinePCA", +] + + +class OnlinePCA(Transformer): + """_summary_ + + Args: + n_components: Desired dimensionality of output data. The default value is useful for visualisation. + b: size of the blocks. Must be greater than or equal to n_components. + lambda_: tuning parameter + sigma: reject threshold + tau: reject threshold + + Attributes: + feature_names_in_: List of input features. + n_seen: Number of samples seen. + Y_k: Block of received data of size (n_features_in_, b). + S_hat: R-dimensional subspace with orthonormal basis (n_features_in_, n_components) + + Examples: + >>> import pandas as pd + >>> np.random.seed(0) + >>> m = 20 + >>> n = 80 + >>> mean = [5, 10, 15] + >>> covariance_matrix = [[1, 0.5, 0.3], + ... [0.5, 1, 0.2], + ... [0.3, 0.2, 1]] + >>> num_samples = 100 + >>> X = np.random.multivariate_normal(mean, covariance_matrix, num_samples) + >>> n_nans = 2 + >>> nan_indices = np.random.choice(range(X.shape[0]), size=n_nans, replace=False) + >>> X[nan_indices] = np.nan + >>> pca = OnlinePCA(n_components=2) + >>> for x in X[:50]: + ... pca.learn_one(x) + >>> pca.transform_one(X[-1, :]) + {0: -17.9652, 1: -0.8711} + + >>> pca = OnlinePCA(n_components=2, b=4) + >>> X = pd.DataFrame(X) + >>> for _, x in X.iloc[:50].iterrows(): + ... pca.learn_one(x.to_dict()) + >>> pca.transform_one(X.iloc[-1, :].to_dict()) + {0: -17.9470, 1: -1.0941} + + """ + + def __init__( + self, + n_components: int = 2, + b: int | None = None, + lambda_: float = 0.0, + sigma: float = 0.0, + tau: float = 0.0, + seed: int | None = None, + ): + self.n_components = int(n_components) + # Default maximizes the efficiency [Eftekhari, et al. (2019)] + if not b: + b = self.n_components + else: + b = int(b) + self.b = b + assert lambda_ >= 0 + self.lambda_ = lambda_ + assert sigma >= 0 + self.sigma = sigma + assert tau >= 0 + self.tau = tau + + self.feature_names_in_: list[str] + self.n_features_in_: int # n [Eftekhari, et al. (2019)] + self.n_seen: int = 0 # k [Eftekhari, et al. (2019)] + self.Y_k: deque + self.P_omega_k: deque + self.S_hat: np.ndarray + self.seed = seed + np.random.seed(self.seed) + + def learn_one(self, x: dict | np.ndarray): + """_summary_ + + Args: + x: Incomplete observation of data matrix. Accepts NaNs (n_features_in_,) + """ + if isinstance(x, dict): + if self.n_seen == 0: + self.feature_names_in_ = list(x.keys()) + else: + assert not set(self.feature_names_in_).difference( + set(x.keys()) + ) + x = np.array(list(x.values())) + # TODO: align with OnlineSVD + # x = x.reshape(1, -1) + + if self.n_seen == 0: + self.n_features_in_ = x.shape[0] + if self.n_components == 0: + self.n_components = self.n_features_in_ + # Make b feasible if not set and learn_one is called first + if not self.b: + self.b = self.n_components + self.Y_k = deque(maxlen=self.b) + self.P_omega_k = deque(maxlen=self.b) + # Initialize S_hat with random orthonormal matrix for transform_one + r_mat = np.random.randn(self.n_features_in_, self.n_components) + self.S_hat, _ = np.linalg.qr(r_mat) + + # Random index set over which s_t is observed + omega_t = ~np.isnan(x) # (n_features_in_,) + # TODO: find out whether correct + x = np.nan_to_num(x, nan=0.0) + # Projection onto coordinate set. Diagonal entry corresponding to the index set omega_t (n_features_in_, n_features_in_) + P_omega_t = np.diag(omega_t).astype(int) + self.Y_k.append(x) + self.P_omega_k.append(P_omega_t) + + if len(self.Y_k) == self.b: + # Reinitialize S_hat now when deque is full + if self.n_seen == self.b - 1: + # Let S_hat \in \mathbb{R}^{n \times b} be the + _, _, V = np.linalg.svd( + np.array(self.Y_k), full_matrices=False + ) + self.S_hat = V.T[:, : self.n_components] + else: + R_k = np.empty((self.n_features_in_, self.b)) + # range((self.n_seen - 1) * self.b + 1, self.n_seen * self.b) [Eftekhari, et al. (2019)] + for k, (y_t, P_omega_t) in enumerate( + zip(self.Y_k, self.P_omega_k) + ): + P_omega_t_comp = ( + np.identity(self.n_features_in_) - P_omega_t + ) + + I_r = np.identity(self.n_components) + S_hat_t = self.S_hat.T + R_k[:, k] = ( + y_t + + P_omega_t_comp + @ self.S_hat + @ np.linalg.pinv( + S_hat_t @ P_omega_t @ self.S_hat + + self.lambda_ * I_r + ) + @ S_hat_t + @ y_t + ) + U_r, sigma_r, _ = np.linalg.svd(R_k) + _sigma_below_thresh = ( + sigma_r[self.n_components - 1] < self.sigma + ) + if self.b > self.n_components: + _sigma_ratio_below_thresh = ( + sigma_r[self.n_components] + <= (1 + self.tau) * sigma_r[1] + ) + else: + _sigma_ratio_below_thresh = True + if ~(_sigma_below_thresh or _sigma_ratio_below_thresh): + self.S_hat = U_r[:, : self.n_components] + + self.Y_k.clear() # Non overlapping blocks + + self.n_seen += 1 + + def transform_one(self, x: dict | np.ndarray) -> dict: + if isinstance(x, dict): + x = np.array(list(x.values())) + # If transform one is called before any learning has been done + # TODO: consider raising an runtime error + if not hasattr(self, "S_hat"): + return dict( + zip( + range(self.n_components), + np.zeros(self.n_components), + ) + ) + x = x @ self.S_hat + return dict(zip(range(self.n_components), x)) diff --git a/river/decomposition/osvd.py b/river/decomposition/osvd.py new file mode 100644 index 0000000000..7e1e2eef78 --- /dev/null +++ b/river/decomposition/osvd.py @@ -0,0 +1,798 @@ +"""Online Singular Value Decomposition (SVD) in [River API](riverml.xyz). + +This module contains the implementation of the Online SVD algorithm. +It is based on the paper by Brand et al. [^1] + +References: + [^1]: Brand, M. (2006). Fast low-rank modifications of the thin singular value decomposition. Linear Algebra and its Applications, 415(1), pp.20-30. doi:[10.1016/j.laa.2005.07.021](https://doi.org/10.1016/j.laa.2005.07.021). + [^2]: Zhang, Y. (2022). An answer to an open question in the incremental SVD. doi:[10.48550/arXiv.2204.05398](https://doi.org/10.48550/arXiv.2204.05398). + [^3]: Zhang, Y. (2022). A note on incremental SVD: reorthogonalization. doi:[10.48550/arXiv.2204.05398](https://doi.org/10.48550/arXiv.2204.05398). +""" + +from __future__ import annotations + +import numpy as np +import pandas as pd +import scipy as sp + +from river.base import MiniBatchTransformer + +__all__ = [ + "OnlineSVD", + "OnlineSVDZhang", +] + + +def test_orthonormality(vectors, tol=1e-12): # pragma: no cover + """ + Test orthonormality of a set of vectors. + + Parameters: + vectors : numpy.ndarray + Matrix where each column represents a vector + tol : float, optional + Tolerance for checking orthogonality and unit length + + Returns: + is_orthonormal : bool + True if vectors are orthonormal, False otherwise + """ + # Check unit length + norms = np.linalg.norm(vectors, axis=0) + is_unit_length = np.allclose(norms, 1, atol=tol) + + # Check orthogonality + inner_products = np.dot(vectors.T, vectors) + off_diagonal = inner_products - np.diag(np.diag(inner_products)) + is_orthogonal = np.allclose(off_diagonal, 0, atol=tol) + + # Check if both conditions are satisfied + is_orthonormal = is_unit_length and is_orthogonal + + return is_orthonormal + + +def _orthogonalize(U, S, Vt, tol=1e-12, solver="arpack", random_state=None): + """Orthogonalize the singular value decomposition. + + This function orthogonalizes the singular value decomposition by performing + a QR decomposition on the left and right singular vectors. + + TODO: verify if this is the correct way to orthogonalize the SVD. + [^3]: Zhang, Y. (2022). A note on incremental SVD: reorthogonalization. doi:[10.48550/arXiv.2204.05398](https://doi.org/10.48550/arXiv.2204.05398). + """ + n_components = S.shape[0] + # In house implementation of full reorthogonalization + # UQ, UR = np.linalg.qr(U, mode="complete") + # VQ, VR = np.linalg.qr(Vt, mode="complete") + # A = UR @ np.diag(S) @ VR + # tU, tS, tV = _svd(A, 0, None, solver, random_state) + # return UQ @ tU_, tSigma_, VQ @ tV_ + + # Zhang, Y. (2022) + if (U[:, -1].T @ U[:, 0] > tol).any(): + for i in range(n_components): + alpha = U[:, i : i + 1] # m x 1 + for j in range(i - 1): + beta = U[:, j] # m x 1 + U[:, i] = U[:, i] - (alpha.T @ beta) * beta + norm = np.linalg.norm(U[:, i]) + U[:, i] = U[:, i] / norm + return U, S, Vt + + +def _sort_svd(U, S, Vt): + """Sort the singular value decomposition in descending order. + + As sparse SVD does not guarantee the order of the singular values, we + need to sort the singular value decomposition in descending order. + """ + sort_idx = np.argsort(S)[::-1] + if not np.array_equal(sort_idx, range(len(S))): + S = S[sort_idx] + U = U[:, sort_idx] + Vt = Vt[sort_idx, :] + return U, S, Vt + + +def _truncate_svd(U, S, Vt, n_components): + """Truncate the singular value decomposition to the n components. + + Full SVD returns the full matrices U, S, and V in correct order. If the + result acqisition is faster than sparse SVD, we combine the results of + full SVD with truncation. + """ + U = U[:, :n_components] + S = S[:n_components] + Vt = Vt[:n_components, :] + return U, S, Vt + + +def _svd(A, n_components, tol=0.0, v0=None, solver=None, random_state=None): + """Compute the singular value decomposition of a matrix. + + This function computes the singular value decomposition of a matrix A. + If n_components < min(A.shape), the function uses sparse SVD for speed up. + """ + # TODO: sparse is slow if not n_components << min(A.shape) + # analyze performance benefits for various differencec between + # n_components and min(A.shape) + if 0 < n_components and n_components < min(A.shape): + U, S, Vt = sp.sparse.linalg.svds( + A, + k=n_components, + tol=tol, + v0=v0, + solver=solver, + random_state=random_state, + ) + U, S, Vt = _sort_svd(U, S, Vt) + else: + U, S, Vt = np.linalg.svd(A, full_matrices=False) + # # TODO: implement Optimal truncation if n_components is not set + # # Gavish, M., & Donoho, D. L. (2014). The optimal hard threshold for singular values is 4/sqrt(3). + # beta = A.shape[0] / A.shape[1] + # omega = 0.56 * beta**3 - 0.95 * beta**2 + 1.82 * beta + 1.43 + # n_components = sum(S > omega) + U, S, Vt = _truncate_svd(U, S, Vt, n_components) + return U, S, Vt + + +class OnlineSVD(MiniBatchTransformer): + """Online Singular Value Decomposition (SVD). + + Args: + n_components: Desired dimensionality of output data. The default value is useful for visualisation. + initialize: Number of initial samples to use for the initialization of the algorithm. The value must be greater than `n_components`. + force_orth: If True, the algorithm will force the singular vectors to be orthogonal. *Note*: Significantly increases the computational cost. + seed: Random seed. + + Attributes: + n_components: Desired dimensionality of output data. + initialize: Number of initial samples to use for the initialization of the algorithm. The value must be greater than `n_components`. + feature_names_in_: List of input features. + _U: Left singular vectors (n_features_in_, n_components). + _S: Singular values (n_components,). + _Vt: Right singular vectors (transposed) (n_components, n_seen). + + Examples: + >>> np.random.seed(0) + >>> r = 3 + >>> m = 4 + >>> n = 80 + >>> X = pd.DataFrame(np.linalg.qr(np.random.rand(n, m))[0]) + >>> svd = OnlineSVD(n_components=r, force_orth=False) + >>> svd.learn_many(X.iloc[: r * 2]) + >>> svd._U.shape == (m, r), svd._Vt.shape == (r, r * 2) + (True, True) + + >>> svd.transform_one(X.iloc[10].to_dict()) + {0: ...0.0494..., 1: ...0.0030..., 2: ...0.0111...} + + >>> for _, x in X.iloc[10:-1].iterrows(): + ... svd.learn_one(x.values.reshape(1, -1)) + >>> svd.transform_one(X.iloc[0].to_dict()) + {0: ...0.0488..., 1: ...0.0613..., 2: ...0.1150...} + + >>> svd.update(X.iloc[-1].to_dict()) + >>> svd.transform_one(X.iloc[0].to_dict()) + {0: ...0.0409..., 1: ...0.0336..., 2: ...0.1287...} + + For higher dimensional data and forced orthogonality, revert may not return us to the original state. + >>> svd.revert(X.iloc[-1].to_dict(), idx=-1) + >>> svd.transform_one(X.iloc[0].to_dict()) + {0: ...0.0488..., 1: ...0.0613..., 2: ...0.1150...} + + >>> svd = OnlineSVD(n_components=0, initialize=3, force_orth=True) + >>> svd.learn_many(X.iloc[:30]) + + >>> svd.learn_many(X.iloc[30:60]) + >>> svd.transform_many(X.iloc[60:62]).abs() + 0 1 2 3 + 60 0.103403 0.134656 0.108399 0.125872 + 61 0.063485 0.023943 0.120235 0.088502 + + References: + [^1]: Brand, M. (2006). Fast low-rank modifications of the thin singular value decomposition. Linear Algebra and its Applications, 415(1), pp.20-30. doi:[10.1016/j.laa.2005.07.021](https://doi.org/10.1016/j.laa.2005.07.021). + """ + + def __init__( + self, + n_components: int = 2, + initialize: int = 0, + force_orth: bool = True, + solver="arpack", + seed: int | None = None, + ): + self.n_components = n_components + self.initialize = initialize + self.force_orth = force_orth + self.solver = solver + self.seed = seed + + np.random.seed(self.seed) + + self.n_features_in_: int + self.feature_names_in_: list + self.n_seen: int = 0 + + self._U: np.ndarray + self._S: np.ndarray + self._Vt: np.ndarray + + @classmethod + def _from_state( + cls: type[OnlineSVD], + U: np.ndarray, + S: np.ndarray, + Vt: np.ndarray, + force_orth: bool = True, + seed: int | None = None, + ): + new = cls( + n_components=S.shape[0], + initialize=0, + force_orth=force_orth, + seed=seed, + ) + new.n_features_in_ = U.shape[0] + new.n_seen = Vt.shape[1] + + new._U = U + new._S = S + new._Vt = Vt + + return new + + def _init_first_pass(self, x): + self.n_features_in_ = x.shape[1] + if self.n_components == 0: + self.n_components = self.n_features_in_ + self._X_init = np.empty((0, self.n_features_in_)) + if x.shape[0] == 1: + # Make initialize feasible if not set and learn_one is called first + if not self.initialize: + self.initialize = self.n_components + # Initialize _U with random orthonormal matrix for transform_one + r_mat = np.random.randn(self.n_features_in_, self.n_components) + self._U, _ = np.linalg.qr(r_mat) + + def update(self, x: dict | np.ndarray): + if isinstance(x, dict): + self.feature_names_in_ = list(x.keys()) + x = np.array(list(x.values()), ndmin=2) + if len(x.shape) == 1: + x = x.reshape(1, -1) # 1 x m + + if self.n_seen == 0: + self._init_first_pass(x) + + # Initialize if called without learn_many + if bool(self.initialize) and self.n_seen < self.initialize: + self._X_init = np.row_stack((self._X_init, x)) + if len(self._X_init) == self.initialize: + self.learn_many(self._X_init) + # learn many updated seen, we need to revert last sample which + # will be accounted for again at the end of update + self.n_seen -= x.shape[0] + else: + A = x.T # m x c + c = A.shape[1] + + Ut = self._U.T # r x m + M = Ut @ A # r x c + P = A - self._U @ M # m x c + # Results seems to be the same for non rank-increasing updates. + Po = np.linalg.qr(P)[0] + Pot = Po.T # c x m or m x m if m < c + R_A = Pot @ P # c x c + + # pad V with zeros to create place for new singular vector + # (could be omitted to preserve size of V) + _Vt = np.pad(self._Vt, ((0, 0), (0, c))) # r x n + c + nc = _Vt.shape[1] + B = np.zeros((nc, c)) # n + c x c + B[-c:, :] = 1.0 + N = _Vt @ B # r x c + V = _Vt.T # n + c x r + # Might be less numerically stable + # VVT = V @ _Vt # n + c x n + c + # Q = (np.eye(nc) - VVT) @ B # n + c x c + Q = B - V @ N # n + c x c + Qot = np.linalg.qr(Q)[0].T # c x n + c + # R_B = Q.T @ Q # c x c + + Z = np.zeros((c, self.n_components)) # c x r + K = np.block([[np.diag(self._S), M], [Z, R_A]]) # r + c x r + c + + U_, S_, Vt_ = _svd( + K, + self.n_components, + # v0=np.column_stack((self._U, Pot.T))[0,:], # N > M + v0=np.row_stack((_Vt, Qot))[:, 0], # N <= M + solver=self.solver, + random_state=self.seed, + ) # r + c x r; ...; r x r + c + + U_ = np.column_stack((self._U, Po)) @ U_ # m x r + Vt_ = Vt_ @ np.row_stack((_Vt, Qot)) # r x n + c + + if self.force_orth: + U_, S_, Vt_ = _orthogonalize(U_, S_, Vt_) + + self._U, self._S, self._Vt = U_, S_, Vt_ + + self.n_seen += x.shape[0] + + def revert(self, x: dict | np.ndarray, idx: int = 0): + c = 1 if isinstance(x, dict) else x.shape[0] + nc = self._Vt.shape[1] + B = np.zeros((nc, c)) # n + c x c + B[-c:] = np.identity(c) + # Schmid takes first c columns of Vt + # N = _Vt @ B # r x c + if idx >= 0: + N = self._Vt[:, idx : idx + c] # r x c + elif idx == -1: + N = self._Vt[:, -c:] # r x c + else: + N = self._Vt[:, -c + idx + 1 : idx + 1] # r x c + V = self._Vt.T # n + c x r + Q = B - V @ N # n + c x c + Qot = np.linalg.qr(Q)[ + 0 + ].T # c x n + c; Orthonormal basis of column space of q + + S_ = np.pad(np.diag(self._S), ((0, c), (0, c))) # r + c x r + c + # For full-rank SVD, this results in nn == 1. + NtN = N.T @ N # c x c + norm_n = np.sqrt(1.0 - NtN) # c x c + norm_n[np.isnan(norm_n)] = 0.0 + K = S_ @ ( + np.identity(S_.shape[0]) + - np.row_stack((N, np.zeros((c, c)))) @ np.row_stack((N, norm_n)).T + ) # r + c x r + c + U_, S_, Vt_ = _svd( + K, + self.n_components, + # Seems like this converges to different results + v0=np.row_stack((self._Vt, Qot))[:, 0], + solver=self.solver, + random_state=self.seed, + ) # r + c x r; ...; r x r + c + + # Since the update is not rank-increasing, we can skip computation of P + # otherwise we do U_ = np.column_stack((self._U, P)) @ U_ + U_ = self._U @ U_[: self.n_components, :] # m x r + + Vt_ = Vt_ @ np.row_stack((self._Vt, Qot))[:, :-c] # r x n + # Vt_ = Vt_[:, : self.n_components] @ self._Vt[:, :-c] + + if self.force_orth: # and not test_orthonormality(U_): + U_, S_, Vt_ = _orthogonalize(U_, S_, Vt_) + + self._U, self._S, self._Vt = U_, S_, Vt_ + self.n_seen -= c + + def learn_one(self, x: dict | np.ndarray): + """Allias for update method.""" + self.update(x) + + def learn_many(self, X: np.ndarray | pd.DataFrame): + if isinstance(X, pd.DataFrame): + self.feature_names_in_ = list(X.columns) + X = X.values + else: + self.feature_names_in_ = [str(i) for i in range(X.shape[0])] + + if self.n_seen == 0: + self._init_first_pass(X) + + if ( + hasattr(self, "_U") + and hasattr(self, "_S") + and hasattr(self, "_Vt") + ): + if X.shape[0] <= self.n_features_in_: + self.learn_one(X) + else: + for X_part in [ + X[i : i + self.n_features_in_] + for i in range(0, X.shape[0], self.n_features_in_) + ]: + self.learn_one(X_part) + + else: + assert np.linalg.matrix_rank(X.T) >= self.n_components + self._U, self._S, self._Vt = _svd( + X.T, + self.n_components, + solver=self.solver, + random_state=self.seed, + ) + + self.n_seen = X.shape[0] + + def transform_one(self, x: dict | np.ndarray) -> dict: + if isinstance(x, dict): + x = np.array(list(x.values())) + + # If transform one is called before any learning has been done + # TODO: consider raising an runtime error + if not hasattr(self, "_U"): + return dict( + zip( + range(self.n_components), + np.zeros(self.n_components), + ) + ) + + return dict(zip(range(self.n_components), x @ self._U)) + + def transform_many(self, X: np.ndarray | pd.DataFrame) -> pd.DataFrame: + # If transform one is called before any learning has been done + # TODO: consider raising an runtime error + if not hasattr(self, "_U"): + return pd.DataFrame( + np.zeros((X.shape[0], self.n_components)), + index=range(self.n_components), + ) + assert X.shape[1] == self.n_features_in_ + + X_ = X @ self._U + return pd.DataFrame(X_) + + +class OnlineSVDZhang(OnlineSVD): + """Online Singular Value Decomposition (SVD) using Zhang Algorithm. + + This OnlineSVD implementation handles reorthogonalization and rank-increasing updates automatically. + + Args: + n_components: Desired dimensionality of output data. The default value is useful for visualisation. + initialize: Number of initial samples to use for the initialization of the algorithm. The value must be greater than `n_components`. + rank_updates: If True, the algorithm will allow rank-increasing updates. *Note*: Significantly increases the computational cost. + seed: Random seed. + + Attributes: + n_components: Desired dimensionality of output data. + initialize: Number of initial samples to use for the initialization of the algorithm. The value must be greater than `n_components`. + feature_names_in_: List of input features. + _U: Left singular vectors (n_features_in_, n_components). + _S: Singular values (n_components,). + _Vt: Right singular vectors (transposed) (n_components, n_seen). + + Examples: + >>> np.random.seed(0) + >>> r = 3 + >>> m = 4 + >>> n = 80 + >>> X = pd.DataFrame(np.linalg.qr(np.random.rand(n, m))[0]) + >>> svd = OnlineSVDZhang(n_components=r, rank_updates=False) + >>> svd.learn_many(X.iloc[: r * 2]) + >>> svd._U.shape == (m, r), svd._Vt.shape == (r, r * 2) + (True, True) + + >>> svd.transform_one(X.iloc[10].to_dict()) + {0: ...0.0494..., 1: ...0.0030..., 2: ...0.0111...} + + >>> for _, x in X.iloc[10:-1].iterrows(): + ... svd.learn_one(x.values.reshape(1, -1)) + >>> svd.transform_one(X.iloc[0].to_dict()) + {0: ...0.0488..., 1: ...0.0613..., 2: ...0.1150...} + + >>> svd.update(X.iloc[-1].to_dict()) + >>> svd.transform_one(X.iloc[0].to_dict()) + {0: ...0.0409..., 1: ...0.0336..., 2: ...0.1287...} + + For higher dimensional data and forced orthogonality, revert may not return us to the original state. + >>> svd.revert(X.iloc[-1].to_dict(), idx=-1) + >>> svd.transform_one(X.iloc[0].to_dict()) + {0: ...0.0488..., 1: ...0.0613..., 2: ...0.1150...} + + >>> svd = OnlineSVDZhang(n_components=0, initialize=3, rank_updates=False) + >>> svd.learn_many(X.iloc[:30]) + + >>> svd.learn_many(X.iloc[30:60]) + + # TODO: Fix the problem related to ongoing batch updates + >>> svd.transform_many(X.iloc[60:62]).abs() + 0 1 2 3 + 60 0.103403 0.134656 0.108399 0.125872 + 61 0.063485 0.023943 0.120235 0.088502 + + References: + [^2]: Zhang, Y. (2022). An answer to an open question in the incremental SVD. doi:[10.48550/arXiv.2204.05398](https://doi.org/10.48550/arXiv.2204.05398). + """ + + def __init__( + self, + n_components: int = 2, + initialize: int = 0, + tol: float = 1e-12, + rank_updates: bool = False, + seed: int | None = None, + ): + super().__init__( + n_components=n_components, + initialize=initialize, + force_orth=False, + seed=seed, + ) + self.tol: float = tol + self.rank_updates = rank_updates + + self._V_buff: np.ndarray + self._U0: np.ndarray + self._q_u: int = 0 + self._q_r: int = 0 + self.W: np.ndarray + + @classmethod + def _from_state( + cls: type[OnlineSVDZhang], + U: np.ndarray, + S: np.ndarray, + V: np.ndarray, + rank_updates: bool = False, + seed: int | None = None, + ): + new = cls( + n_components=S.shape[0], + initialize=0, + rank_updates=rank_updates, + seed=seed, + ) + new.n_features_in_ = U.shape[0] + new.n_seen = V.shape[1] + + new._U = U + new._S = S + new._Vt = V + + new._V_buff = np.empty((new.n_components, 0)) + new._U0 = np.identity(new.n_components) + new.W = np.identity(new.n_features_in_) + + return new + + def _init_first_pass(self, x): + super()._init_first_pass(x) + self._V_buff = np.empty((self.n_components, 0)) + self._U0 = np.identity(self.n_components) + # TODO: Allow weighting specified by user + self.W = np.identity(self.n_features_in_) + + def update(self, x: dict | np.ndarray): + if isinstance(x, dict): + self.feature_names_in_ = list(x.keys()) + x = np.array(list(x.values()), ndmin=2) + if len(x.shape) == 1: + x = x.reshape(1, -1) + + if self.n_seen == 0: + self._init_first_pass(x) + + c = x.shape[0] + + # Initialize if called without learn_many + if bool(self.initialize) and self.n_seen < self.initialize: + self._X_init = np.row_stack((self._X_init, x)) + if len(self._X_init) == self.initialize: + self.learn_many(self._X_init) + # learn many updated seen, we need to revert last sample which + # will be accounted for again at the end of update + self.n_seen -= c + else: + if c > 1: + from warnings import warn + + warn( + "Calling update/learn_many with batches provides different results than incrementing one sample at the time." + ) + r = self.n_components + A = x.T # m x c + _U, _S, _V = self._U, self._S, self._Vt.T # m x r, r x 1, n x r + _Ut = _U.T # r x m + # Step 1: Calculate d, e, p + M = _Ut @ (self.W @ A) # r x c + P = A - _U @ M # m x c + PtP = P.T @ self.W @ P # c x c + PtP_cond = (PtP < 0.0).any() + if PtP_cond: + # Approx. 2x slower more stable solution for batched updates + Po = np.linalg.qr(P)[0] + Pot = Po.T # c x m + Ra = Pot @ P # c x c + else: + Ra = np.sqrt(PtP) # c x c + # Step 2: Check tolerance + if (Ra < self.tol).all(): # n_incr += c + self._q_u += c # 1 x 1 + self._V_buff = np.column_stack((self._V_buff, M)) # r x n_incr + else: + if self._q_u > 0: + # Step 7: Construct Y + Y = np.column_stack( + (np.diag(_S), self._V_buff) + ) # r x r + n_incr + # Step 8: Perform SVD on Y + UY, SY, VYt = np.linalg.svd( + Y, full_matrices=False + ) # r x r, r x 1, r x r + n_incr + VY = VYt.T # r + n_incr x r + # Step 9: Update U0, _S, _V + self._U0 = self._U0 @ UY # r x r + _S = SY # r x 1 + _V1 = VY[:r, :-1] # r x r + n_incr - 1 + _V2 = VY[r, :-1] # 1 x r + n_incr - 1 + _V = np.row_stack( + (_V @ _V1, _V2) + ) # n + 1 x r + n_incr - 1 + # Step 11: Calculate d + M = UY.T @ M # r x c + # Step 13: Normalize e + if not PtP_cond: + Po = P @ np.linalg.inv(Ra) # m x c + Pot = Po.T # c x m + # Step 14: Reorthogonalize if |e>W*_U(:, 1)| > tol + if (np.abs(Pot @ (self.W @ _U[:, 0])) > self.tol).any(): + Po = Po - _U @ (_Ut @ (self.W @ Po)) # m x c + Po = np.linalg.qr(Po)[0] + # Step 17: Construct Y + Y = np.block( + [ + [np.diag(_S), M], + [np.zeros((c, r)), Ra], + ] + ) # r + c x r + c + # Not using sp.sparse.linalg.svds for non-rank increasing + # updates as it is slower than np.linalg.svd + UY, SY, VYt = np.linalg.svd( + Y + ) # r + c x r + c, r + c x 1, r + c x r + c + VY = VYt.T # r + c x r + c + # Step 20: Update U0 + self._U0 = ( + np.block( + [ + [ + self._U0, + np.zeros((self._U0.shape[0], c)), + ], + [ + np.zeros((c, self._U0.shape[1])), + np.eye(c, c), + ], + ] + ) + @ UY + ) # r + c x r + c + _Ue = np.column_stack((_U, Po)) # m x k + c + # Step 19: Check if rank increasing + if self.rank_updates and SY[r] > self.tol: + # Step 20 - 21: Update _U, _S, _V + _U = _Ue @ self._U0 # m x r + c + _S = SY # r + c x c + _V1 = VY[:r, :] # r x r + c + _V2 = VY[r, :] # 1 x r + c + _V = np.row_stack((_V @ _V1, _V2)) # n + 1 x r + 1 + self._U0 = np.eye(r + 1) # r + 1 x r + 1 + else: + # Step 23 - 24: Update _U, _S, _V + _U = _Ue @ self._U0[:, :r] # m x r + _S = SY[:r] # r x 1 + V_1pad = VY.shape[1] - _V.shape[1] + _V = ( + np.block( + [ + [_V, np.zeros((_V.shape[0], V_1pad))], + [ + np.zeros((c, _V.shape[1])), + np.eye(c, V_1pad), + ], + ] + ) + @ VY[:, :r] + ) # n + 1 x r + self._U0 = np.eye(r) # r x r + + # Alg. 11 + # We note that the output of Algorithm 7 (11), V , may be not empty. This implies that the output of Algorithm 7 (11) is not the SVD of U. Hence we have to update the SVD for the vectors in V + # This step adds rows to _V to account for the ones buffered in V + if self._q_u > 0 and self._V_buff.shape[1] > 0: + # Step 2: Construct Y + Y = np.column_stack( + (np.diag(_S), self._V_buff) + ) # r x r + v_cols + # Step 3: Perform SVD on Y + UY, SY, VYt = np.linalg.svd(Y, full_matrices=False) + VY = VYt.T # r + 1 x r + 1 + # Step 4: Update _U, _S, _V + _U = _U @ UY + _S = SY + _V1 = VY[:r, :] + _V2 = VY[r : r + self._q_u + c - 1, :] + _V = np.row_stack((_V @ _V1, _V2)) + + self.n_components = _S.shape[0] + self._V_buff = np.empty((self.n_components, 0)) + self._q_u = 0 + self._U, self._S, self._Vt = _U, _S, _V.T + + self.n_seen += c + + def revert(self, x: dict | np.ndarray, idx: int = 0): + _U, _S, _V = self._U, self._S, self._Vt.T # m x r, r x 1, n + c x r + + c = 1 if isinstance(x, dict) else x.shape[0] + nc = self._Vt.shape[1] + # Step 1: Calculate N, Q, Qot + B = np.zeros((nc, c)) # n + c x c + B[-c:] = np.identity(c) + + if idx >= 0: + N = self._Vt[:, idx : idx + c] # r x c + elif idx == -1: + N = self._Vt[:, -c:] # r x c + else: + N = self._Vt[:, -c + idx + 1 : idx + 1] # r x c + Q = B - _V @ N # n + c x c + QtQ = Q.T @ Q + QtQ_cond = (QtQ < 0.0).any() + if QtQ_cond: + Qot = np.linalg.qr(Q)[0].T # c x n + c + Ra = Qot @ Q # c x c + else: + Qot = None + Ra = np.sqrt(Q.T @ Q) # c x c + # TODO: not activated at all, check why + if Ra.size > 0 and (Ra < self.tol).all(): + self._q_r += c + else: + if self._q_r > 0: + c += self._q_r + B = np.zeros((nc, c)) # n + c x c + B[-c:] = np.identity(c) + if idx >= 0: + N = self._Vt[:, idx : idx + c] # r x c + elif idx == -1: + N = self._Vt[:, -c:] # r x c + else: + N = self._Vt[:, -c + idx + 1 : idx + 1] # r x c + Q = B - _V @ N # n + c x c + Qot = None + # Step 13: Normalize Q + if Qot is None: + Qot = np.linalg.qr(Q)[ + 0 + ].T # c x n + c; Orthonormal basis of column space of q + # We do not touch original U therefore we leave reorthogonalization to update method :) + + S_ = np.pad(np.diag(_S), ((0, c), (0, c))) # r + c x r + c + # For full-rank SVD, this results in nn == 1. + NtN = N.T @ N # c x c + # TODO: validate if correct for c > 1 + norm_n = np.sqrt(1.0 - NtN) # c x c + norm_n[np.isnan(norm_n)] = 0.0 + K = S_ @ ( + np.identity(S_.shape[0]) + - np.row_stack((N, np.zeros((c, c)))) + @ np.row_stack((N, norm_n)).T + ) # r + c x r + c + # TODO: Maybe we can truncate and use full_matrices=True to get sqared Vt + U_, S_, Vt_ = np.linalg.svd(K, full_matrices=False) + + if self.rank_updates and S_[-1] <= self.tol: + self.n_components -= 1 + U_ = _U @ U_[: self.n_components, : self.n_components] # m x r + S_ = S_[: self.n_components] + Vt_ = ( + Vt_[: self.n_components, :] + @ np.row_stack((self._Vt, Qot))[:, :-c] + ) # r x n + + self._q_r = 0 + self._U, self._S, self._Vt = U_, S_, Vt_ + + self.n_seen -= c diff --git a/river/decomposition/test_odmd.py b/river/decomposition/test_odmd.py new file mode 100644 index 0000000000..39b091ff7e --- /dev/null +++ b/river/decomposition/test_odmd.py @@ -0,0 +1,176 @@ +"""Test conversion from river to scikit-learn API and back. + +Requires two modifications to river code: +1. change line 49 in river.compat.river_to_sklearn to +`SKLEARN_INPUT_Y_PARAMS = {"multi_output": True, "y_numeric": False}` +2. change line 194 in river.compat.river_to_sklearn to +`y_pred = np.empty(shape=(len(X), X.shape[1]))` +""" +from __future__ import annotations + +import numpy as np +import pandas as pd +import pytest +from scipy.integrate import odeint + +from river.decomposition.odmd import OnlineDMD +from river.utils import Rolling + +epsilon = 1e-1 + + +def dyn(x, t): + x1, x2 = x + dxdt = [(1 + epsilon * t) * x2, -(1 + epsilon * t) * x1] + return dxdt + + +# integrate from initial condition [1,0] +samples = 101 +tspan = np.linspace(0, 10, samples) +dt = 0.1 +x0 = [1, 0] +xsol = odeint(dyn, x0, tspan).T +# extract snapshots +X, Y = xsol[:, :-1].T, xsol[:, 1:].T +t = tspan[1:] +n, m = X.shape +A = np.empty((n, m, m)) +eigvals = np.empty((n, m), dtype=complex) +for k in range(n): + A[k, :, :] = np.array( + [[0, (1 + epsilon * t[k])], [-(1 + epsilon * t[k]), 0]] + ) + eigvals[k, :] = np.linalg.eigvals(A[k, :, :]) + + +def test_input_types(): + n_init = round(samples / 2) + + odmd1 = OnlineDMD() + + odmd1.learn_many(X[:n_init, :], Y[:n_init, :]) + for x, y in zip(X[n_init:, :], Y[n_init:, :]): + odmd1.learn_one(x, y) + + X_, Y_ = pd.DataFrame(X), pd.DataFrame(Y) + + odmd2 = OnlineDMD() + + odmd2.learn_many(X_.iloc[:n_init], Y_.iloc[:n_init]) + for x, y in zip(X_.iloc[n_init:].values, Y_.iloc[n_init:].values): + odmd2.learn_one(x, y) + + assert np.allclose(odmd1.A, odmd2.A) + + +def test_one_many_close(): + n_init = round(samples / 2) + + odmd1 = OnlineDMD() + odmd2 = OnlineDMD() + + odmd1.learn_many(X[:n_init, :], Y[:n_init, :]) + odmd2.learn_many(X[:n_init, :], Y[:n_init, :]) + + eig_o1 = np.log(np.linalg.eigvals(odmd1.A)) / dt + eig_o2 = np.log(np.linalg.eigvals(odmd2.A)) / dt + assert np.allclose(eig_o1, eig_o2) + + for x, y in zip(X[n_init:, :], Y[n_init:, :]): + odmd1.learn_one(x, y) + + odmd2.learn_many(X[n_init:, :], Y[n_init:, :]) + eig_o1 = np.log(np.linalg.eigvals(odmd1.A)) / dt + eig_o2 = np.log(np.linalg.eigvals(odmd2.A)) / dt + print(eig_o1, eig_o2) + assert np.allclose(eig_o1, eig_o2) + + +def test_errors_raised(): + odmd = OnlineDMD() + + with pytest.raises(Exception): + odmd._update_many(X, Y) + + rodmd = Rolling(OnlineDMD(), window_size=1) # type: ignore + with pytest.raises(Exception): + for x, y in zip(X, Y): + rodmd.update(x, y) + + +def test_allclose_unsupervised_supervised(): + m_u = OnlineDMD(r=2, w=0.1, initialize=0) + m_s = OnlineDMD(r=2, w=0.1, initialize=0) + + for x, y in zip(X, Y): + m_u.learn_one(x) + m_s.learn_one(x, y) + eig_u, _ = np.log(m_u.eig[0]) / dt + eig_s, _ = np.log(m_u.eig[0]) / dt + + assert np.allclose(eig_u, eig_s) + + +# TODO: test various combinations of truncated and exact state and control parts of DMDwC + +# Proctor et al. (2016) "Dynamic Mode Decomposition with Control" suggests that +# the DMDwC where B is unknown requires a second SVD computation for output +# space of Y. As the computation and updates of SVDs are expensive, we want to +# avoid this if possible. This test checks if the SVD of augumented state + +# control space is at least as close to SVD of original space than the SVD of +# the output space to the SVD of the original space. +def test_one_svd_is_enough(): + import numpy as np + import pandas as pd + import scipy as sp + np.random.seed(0) + + n = 101 + freq = 2.0 + tspan = np.linspace(0, 10, n) + w1 = np.cos(np.pi * freq * tspan) + w2 = -np.sin(np.pi * freq * tspan) + w3 = np.sin(2 * np.pi * freq * tspan) + u_ = np.ones(n) + u_[tspan > 5] *= 2 + w1[tspan > 5] *= 2 + w2[tspan > 5] *= 2 + w3[tspan > 5] *= 2 + df = pd.DataFrame({"w1": w1[:-1], "w2": w2[:-1], "w3": w3[:-1]}) + X, Y = df.iloc[:-1], df.shift(-1).iloc[:-1] + U = pd.DataFrame({"u": u_[:-2]}) + X_ = X.copy() + X_["u"] = U + + u_orig, s_orig, _ = sp.sparse.linalg.svds( + X.values.T, k=2, return_singular_vectors="u" + ) + u_aug, s_aug, _ = sp.sparse.linalg.svds( + X_.values.T, k=3, return_singular_vectors="u" + ) + u_out, s_out, _ = sp.sparse.linalg.svds( + Y.values.T, k=2, return_singular_vectors="u" + ) + + assert (np.abs(u_orig - u_aug[:3, :2]) <= np.abs(u_orig - u_out)).all() + assert (np.abs(s_orig - s_aug[:2]) <= np.abs(s_orig - s_out)).all() + +# TODO: find out why this test fails +# def test_allclose_weighted_true(): +# n_init = round(samples / 2) +# odmd = OnlineDMD(w=0.1) +# odmd.learn_many(X[:n_init, :], Y[:n_init, :]) + +# eigvals_online_ = np.empty((n, m), dtype=complex) +# for i, (x, y) in enumerate(zip(X, Y)): +# odmd.learn_one(x, y) +# eigvals_online_[i, :] = np.log(np.linalg.eigvals(odmd.A)) / dt + +# slope_eig_true = np.diff(eigvals)[n_init:, 0].mean() +# slope_eig_online = np.diff(eigvals_online_)[n_init:, 0].mean() +# np.allclose( +# slope_eig_true, +# slope_eig_online, +# atol=1e-4, +# ) diff --git a/river/decomposition/test_odmdwc.py b/river/decomposition/test_odmdwc.py new file mode 100644 index 0000000000..031e25eb9f --- /dev/null +++ b/river/decomposition/test_odmdwc.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import numpy as np +import pandas as pd + +from river.decomposition.odmd import OnlineDMD, OnlineDMDwC +from river.utils import Rolling + +T = 10 +t_diff = 0.01 +samples = int(T / t_diff) - 1 +time_space = np.linspace(0, T, num=samples + 1) + + +def omega(t): + return 1 + 0.1 * t + + +def u_t(x): + return K_prop * x + + +X = np.zeros((samples + 1, 2)) +X[0, :] = np.array([4, 7]) + +K_prop = -1 + +B = np.array([1, 0]) +U = np.zeros((samples + 1, 1)) + +i = 1 +true_eigs_ = [] +for k in np.linspace(t_diff, T, num=samples): + A_t = np.array([[t_diff, -omega(k)], [omega(k), 0.1 * t_diff]]) + true_eigs_.append(np.imag(np.log(np.linalg.eig(A_t)[0]))) + + control_input = np.matmul(B, u_t(X[i - 1]).T) * t_diff + U[i, :] = control_input + autonomous_state = np.matmul(X[i - 1, :], A_t) * t_diff + X[i - 1, :] + X[i, :] = autonomous_state + control_input + i += 1 + +true_eigs = np.vstack(true_eigs_) + +X = X[:-1, :] +Y = X[1:, :] +U = U[:-1, :] + + +def test_input_types(): + n_init = round(samples / 2) + + odmd1 = OnlineDMDwC(initialize=n_init) + + for x, y, u in zip(X, Y, U): + odmd1.learn_one(x, y, u) + + X_, Y_, U_ = pd.DataFrame(X), pd.DataFrame(Y), pd.DataFrame(U) + + odmd2 = OnlineDMDwC(initialize=n_init) + + for x, y, u in zip( + X_.to_dict(orient="records"), + Y_.to_dict(orient="records"), + U_.to_dict(orient="records"), + ): + odmd2.learn_one(x, y, u) + + assert np.allclose(odmd1.A, odmd2.A) + + +def test_dmdwc_variations(): + odmd = OnlineDMD(initialize=10) + odmdc_weight = OnlineDMDwC( + initialize=10, w=0.995, exponential_weighting=True + ) + odmdc_b = OnlineDMDwC(initialize=10, B=B.reshape(-1, 1)) + odmdc_window = Rolling(OnlineDMDwC(initialize=10), window_size=100) + odmdc_b_window = Rolling( + OnlineDMDwC(initialize=10, B=B.reshape(-1, 1)), window_size=100 + ) + + for x_, y_, u_ in zip(X, Y, U): + odmd.learn_one(x_, y_) + odmdc_weight.learn_one(x_, y_, u_) + odmdc_b.learn_one(x_, y_, u_) + odmdc_window.learn_one(x_, y_, u_) + odmdc_b_window.learn_one(x_, y_, u_) + + atol = np.abs(get_ct_eigs(odmd.A) - true_eigs[-1]) * 1.5 + eig_weight = get_ct_eigs(odmdc_weight.A) + assert np.allclose(eig_weight, true_eigs[-1], atol=atol) + eig_b = get_ct_eigs(odmdc_b.A) + assert np.allclose(eig_b, true_eigs[-1], atol=atol) + eig_window = get_ct_eigs(odmdc_window.A) + assert np.allclose(eig_window, true_eigs[-1], atol=atol) + eig_b_window = get_ct_eigs(odmdc_b_window.A) + assert np.allclose(eig_b_window, true_eigs[-1], atol=atol) + +def get_ct_eigs(A): + return np.imag(np.log(np.linalg.eigvals(A))) / t_diff + + +def test_close_learn_one_learn_many(): + pass diff --git a/river/preprocessing/__init__.py b/river/preprocessing/__init__.py index 458def0676..4a7a00544b 100644 --- a/river/preprocessing/__init__.py +++ b/river/preprocessing/__init__.py @@ -9,6 +9,7 @@ from __future__ import annotations from .feature_hasher import FeatureHasher +from .hankel import Hankelizer from .impute import PreviousImputer, StatImputer from .lda import LDA from .one_hot import OneHotEncoder @@ -31,6 +32,7 @@ "Binarizer", "FeatureHasher", "GaussianRandomProjector", + "Hankelizer", "LDA", "MaxAbsScaler", "MinMaxScaler", diff --git a/river/preprocessing/hankel.py b/river/preprocessing/hankel.py new file mode 100644 index 0000000000..0875cd1cc9 --- /dev/null +++ b/river/preprocessing/hankel.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +from collections import deque +from typing import Literal + +from river.base import Transformer + +__all__ = ["Hankelizer"] + + +class Hankelizer(Transformer): + """Time Delay Embedding using Hankelization. + + Convert a time series into a time delay embedded Hankel vectors. + + Args: + w: The number of data snapshots to preserve + return_partial: Whether to return partial Hankel matrices when the + window is not full. Default "copy" fills missing with copies. + + Examples: + >>> h = Hankelizer(w=3) + >>> h.learn_one({"a": 1, "b": 2}) + >>> h.transform_one({"a": 1, "b": 2}) + {'a_0': 1, 'b_0': 2, 'a_1': 1, 'b_1': 2, 'a_2': 1, 'b_2': 2} + + >>> h = Hankelizer(w=3, return_partial=False) + >>> h.learn_one({"a": 1, "b": 2}) + >>> h.transform_one({"a": 1, "b": 2}) + Traceback (most recent call last): + ... + ValueError: The window is not full yet. Set `return_partial` to True ... + + >>> h = Hankelizer(w=3, return_partial=True) + >>> h.learn_one({"a": 1, "b": 2}) + >>> h.transform_one({"a": 1, "b": 2}) + {'a_0': nan, 'b_0': nan, 'a_1': nan, 'b_1': nan, 'a_2': 1, 'b_2': 2} + + Actually, transform_one does not care about the data as the learn should precede. + >>> h.learn_one({"a": 3, "b": 4}) + >>> h.transform_one({"a": 5, "b": 6}) + {'a_0': nan, 'b_0': nan, 'a_1': 1, 'b_1': 2, 'a_2': 3, 'b_2': 4} + >>> h._window + deque([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}], maxlen=3) + + Transform and learn in one go. + >>> h.learn_transform_one({"a": 5, "b": 6}) + {'a_0': 1, 'b_0': 2, 'a_1': 3, 'b_1': 4, 'a_2': 5, 'b_2': 6} + + TODO: + - [ ] Find out how to hankelize u while staying aligned with pipeline + """ + + def __init__( + self, w: int = 2, return_partial: bool | Literal["copy"] = "copy" + ): + self.w = w + self.return_partial = return_partial + + self._window: deque = deque(maxlen=self.w) + self.feature_names_in_: list[str] + self.n_features_in_: int + + def learn_one(self, x: dict): + if not hasattr(self, "feature_names_in_") and isinstance(x, dict): + self.feature_names_in_ = list(x.keys()) + self.n_features_in_ = len(x) + + self._window.append(x) + + def transform_one(self, x: dict): + if not isinstance(x, dict): + on_arrays = True + else: + on_arrays = False + # TODO: If called before learn_one, creates duplicate sample + _window = list(self._window) + w_past_current = len(_window) + if w_past_current == 0: + _window = [x] + # To avoid overflowing the window + w_past_current = 1 + if not self.return_partial and w_past_current < self.w: + raise ValueError( + "The window is not full yet. Set `return_partial` to True to return partial Hankel matrices." + ) + else: + n_missing = self.w - w_past_current + _window = [_window[0]] * (n_missing) + _window + if not self.return_partial == "copy": + for i in range(n_missing): + _window[i] = {k: float("nan") for k in _window[0]} + if on_arrays: + import numpy as np + + return np.array([v for d in _window for v in d]) + else: + return { + f"{k}_{i}": v + for i, d in enumerate(_window) + for k, v in d.items() + } + + def learn_transform_one(self, x: dict): + self.learn_one(x) + y = self.transform_one(x) + return y