Linear utility functions
::: {#cell-3 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
import numpy as np
from typing import List, Union, Tuple, Optional, Dict
:::
::: {#cell-4 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
def sort_eigvals_eigvecs(
# eigenvalues
eigvals: np.ndarray, # eigenvectors
eigvecs: np.ndarray, -> Tuple[np.ndarray, np.ndarray]: # (eigenvalues, eigenvectors)
) = np.argsort(np.abs(eigvals))[::-1]
idx = eigvals[idx]
eigvals = eigvecs[:, idx]
eigvecs return eigvals, eigvecs
def get_linear_model(
# data matrix (state_vars, time_steps)
X: np.ndarray, # shifted data matrix (state_vars, time_steps)
X_prime: np.ndarray, sorted: bool = True, # sort eigenvalues and eigenvectors
-> Tuple[np.ndarray, np.ndarray]: # (eigenvalues, eigenvectors)
) """
Returns the linear model A such that X' = AX
"""
= np.linalg.lstsq(X.T, X_prime.T, rcond=None)[0]
A_lstsq = A_lstsq.T
A_lstsq = np.linalg.eig(A_lstsq)
eigenvalues, eigenvectors
return (
sort_eigvals_eigvecs(eigenvalues, eigenvectors)if sorted
else (eigenvalues, eigenvectors)
)
:::
DMD Utils
::: {#cell-6 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
import jax.numpy as jnp
from pydmd.utils import pseudo_hankel_matrix
from pydmd import DMD
:::
::: {#cell-7 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
def hankelize(
u: jnp.ndarray,int = 2,
d:
):# slice the data and hankelize
= u
u_ref = pseudo_hankel_matrix(u_ref.T, d=d).T
u_ref return u_ref
def fit_dmd_to_sample(x: jnp.ndarray, r: int = 50) -> DMD: # (timesteps, gridpoints)
= DMD(svd_rank=r)
dmd
dmd.fit(x.squeeze().T)return dmd
def fast_predict(
# gridsize
y: jnp.ndarray,
inv_modes: jnp.ndarray,
fwd_modes: jnp.ndarray,
eigs: jnp.ndarray,int = None,
lenght:
):if lenght is None:
= y.shape[0]
lenght
# we need to predict the next 3999 timesteps
+= 1
lenght
= jnp.vander(eigs, lenght, increasing=True)
states = inv_modes @ y
x_0 = fwd_modes @ (states * x_0[..., None])
pred
# slice from the second timestep and convert to (time, gridsize)
return pred[:, 1:].T.real
:::
Utilities to replace weights
::: {#cell-9 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
from pydmd.utils import pseudo_hankel_matrix
from pydmd import DMD, BOPDMD
:::
::: {#cell-10 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
def get_linear_approximation(
# (time_steps, grid_size)
y: jnp.ndarray, int = 50,
r: str = "dmd", # "dmd", "full", "analytical"
method: -> Tuple[jnp.ndarray, jnp.ndarray]:
) # fit the model parameters using DMD before training
if method in ["dmd"]:
= DMD(svd_rank=r)
dmd
dmd.fit(y.T)= sort_eigvals_eigvecs(dmd.eigs, dmd.modes)
eigvals, eigvecs = jnp.log(eigvals) # make continuous
eigvals
elif method in ["analytical"]:
= 1 / 4000
dt = stiff_string_eigendecomposition(n_max_modes=r)
eigvals, eigvecs = eigvals * dt
eigvals = eigvecs.T
eigvecs return eigvals, eigvecs
def set_params_from_linear(
params: Dict,
single_eigvals: jnp.ndarray,
single_lstvecs: jnp.ndarray,str, # "lru" or "koopman"
model: -> Dict:
) if model == "lru":
= lambda x: jnp.where(x < 0, x + 2 * jnp.pi, x)
map_to_0_2pi
# nu_log = jnp.nan_to_num(jnp.log(-jnp.log(jnp.abs(single_eigvals))))
# theta_log = jnp.nan_to_num(jnp.log(map_to_0_2pi(jnp.angle(single_eigvals))))
= jnp.log(-single_eigvals.real)
nu_log = jnp.nan_to_num(jnp.log(map_to_0_2pi(single_eigvals.imag)))
theta_log
"first_layer"]["nu_log"] = nu_log
params["first_layer"]["theta_log"] = theta_log
params[# set eigenvectors
= jnp.linalg.pinv(single_lstvecs)
inv_lstvecs
"first_layer"]["B_re"] = inv_lstvecs.real
params["first_layer"]["B_im"] = inv_lstvecs.imag
params[
"first_layer"]["C_re"] = single_lstvecs.real
params["first_layer"]["C_im"] = single_lstvecs.imag
params[
elif model == "koopman":
################
# single_lstvecs = lstvecs[:, ::2]
# single_lstvecs = single_lstvecs[:single_lstvecs.shape[0] // 2, :single_lstvecs.shape[1] // 2]
= jnp.concatenate(
conj_eigenvalues
[+ 1j * jnp.abs(single_eigvals.imag),
single_eigvals.real - 1j * jnp.abs(single_eigvals.imag),
single_eigvals.real
]
)
= jnp.concatenate(
conj_eigenvecs
[+ 1j * 0,
single_lstvecs.real - 1j * 0,
single_lstvecs.real
],=1,
axis
)
= jnp.linalg.pinv(conj_eigenvecs)
inv_eigenvecs = jnp.concatenate(
inv_eigenvecs_as_real =0
[inv_eigenvecs.real, inv_eigenvecs.imag], axis
).T= jnp.concatenate(
fwd_eigenvecs_as_real =1
[conj_eigenvecs.real, conj_eigenvecs.imag], axis
).T
"params"]["batched_koopman"]["encoder"]["encoder"]["kernel"] = (
params[
inv_eigenvecs_as_real
)"params"]["batched_koopman"]["decoder"]["decoder"]["kernel"] = (
params[
fwd_eigenvecs_as_real
)
"params"]["batched_koopman"]["weight_real"] = conj_eigenvalues.real[
params[0] // 2
: conj_eigenvalues.shape[
]"params"]["batched_koopman"]["weight_imag"] = conj_eigenvalues.imag[
params[0] // 2
: conj_eigenvalues.shape[
]
return params
:::