Plotting utilities
For better colors use
pip install SciencePlotsand then in your notebook
import scienceplots
plt.style.use(['science','ieee', 'no-latex'])::: {#cell-3 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
import numpy as np
import matplotlib.pyplot as plt:::
::: {#cell-4 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
def plot_poles_zeros(
p: np.ndarray, # (B, p) zeros
ax=None,
**kwargs,
):
B, _ = p.shape
fig, ax = plt.subplots(1, 1, figsize=(5, 5)) if ax is None else (None, ax)
ax.plot(
np.cos(np.linspace(0, 2 * np.pi)),
np.sin(np.linspace(0, 2 * np.pi)),
linestyle="dashed",
color="black",
alpha=0.3,
)
ax.set_xlim([-1.5, 1.5])
ax.set_ylim([-1.5, 1.5])
# draw lines from origin
ax.plot([-1.5, 1.5], [0, 0], linestyle="dashed", color="black", alpha=0.1)
ax.plot([0, 0], [-1.5, 1.5], linestyle="dashed", color="black", alpha=0.1)
ax.grid(True, which="both")
ax.scatter(p.real, p.imag, **kwargs):::
::: {#cell-5 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
def plot_solution(
gt: np.ndarray, # (time_steps, grid_size)
pred: np.ndarray, # (time_steps, grid_size)
ar_gt: np.ndarray = None, # (time_steps, grid_size)
ar_pred: np.ndarray = None, # (time_steps, grid_size)
) -> plt.Figure:
"""
Plot a comparison of the ground truth and predicted solution.
Args:
gt: A numpy array of shape `(time_steps, grid_size)` representing the ground truth values.
pred: A numpy array of shape `(time_steps, grid_size)` representing the predicted values.
ar_pred (optional): A numpy array of shape `(time_steps, grid_size)` representing the autoregressive predicted values.
Returns:
A matplotlib figure object containing the subplots of the ground truth, predicted, and autoregressive predicted solutions.
"""
vmin = np.min(gt)
vmax = np.max(gt)
time_steps = np.linspace(0, 1, gt.shape[0])
grid = np.linspace(0, 1, gt.shape[1])
fig, ax = plt.subplots(
1,
2 + 2 * int(ar_pred is not None),
figsize=(5 + 2 * int(ar_pred is not None) * 2, 6),
)
ax[0].pcolormesh(grid, time_steps, gt, vmin=vmin, vmax=vmax)
ax[0].set(title="gt u", xlabel="x", ylabel="t")
ax[1].pcolormesh(grid, time_steps, pred, vmin=vmin, vmax=vmax)
ax[1].set(title="pred u", xlabel="x", ylabel="t")
if ar_pred is not None:
T, G = ar_pred.shape
time_steps = np.linspace(0, 1, T)
grid = np.linspace(0, 1, G)
ax[2].pcolormesh(grid, time_steps, ar_gt, vmin=vmin, vmax=vmax)
ax[2].set(title="ar gt u", xlabel="x", ylabel="t")
ax[3].pcolormesh(grid, time_steps, ar_pred, vmin=vmin, vmax=vmax)
ax[3].set(title="ar pred u", xlabel="x", ylabel="t")
fig.tight_layout()
return fig:::
::: {#cell-6 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
def plot_solution_2d(
gt: np.ndarray, # (T, H, W)
pred: np.ndarray, # (T, H, W)
ar_pred: np.ndarray = None, # (T, H, W)
random_sample: bool = False, # whether to randomly sample a time step
) -> plt.Figure:
"""
Plot a comparison of the ground truth and predicted solution.
Args:
gt: A numpy array of shape `(time_steps, grid_size)` representing the ground truth values.
pred: A numpy array of shape `(time_steps, grid_size)` representing the predicted values.
ar_pred (optional): A numpy array of shape `(time_steps, grid_size)` representing the autoregressive predicted values.
Returns:
A matplotlib figure object containing the subplots of the ground truth, predicted, and autoregressive predicted solutions.
"""
vmin = np.min(gt)
vmax = np.max(gt)
fig, ax = plt.subplots(
1, 2 + int(ar_pred is not None), figsize=(6, 5 + int(ar_pred is not None) * 2)
)
if random_sample:
t = np.random.randint(gt.shape[0])
else:
t = gt.shape[0] // 2
ax[0].imshow(gt[t], vmin=vmin, vmax=vmax)
ax[0].set(title=f"gt at sample {t}", xlabel="x", ylabel="y")
ax[1].imshow(pred[t], vmin=vmin, vmax=vmax)
ax[1].set(title=f"pred at sample {t}", xlabel="x", ylabel="y")
if ar_pred is not None:
ax[2].imshow(ar_pred[t], vmin=vmin, vmax=vmax)
ax[2].set(title="ar pred", xlabel="x", ylabel="y")
fig.tight_layout()
return fig:::
::: {#cell-7 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
def plot_eigenvalues(
eigenvalues: np.ndarray, # (B, p)
sample_rate: int = None, # sample rate of the signal for radians otherwise we use normalized frequency
log: bool = False,
):
# filter only rhe eigenvalues from 0 to pi excluding -pi to 0
filtered_eigenvalues = np.diag(eigenvalues)[
(np.angle(np.diag(eigenvalues)) > 0) & (np.angle(np.diag(eigenvalues)) < np.pi)
]
# if sample rate is not given we assume that the eigenvalues are already normalized
frequency = (
np.angle(filtered_eigenvalues) * sample_rate / (2 * np.pi)
if sample_rate is not None
else np.angle(filtered_eigenvalues)
)
fig, ax = plt.subplots(1, 1, figsize=(5, 3))
ax.scatter(frequency, 20 * np.log10(np.abs(filtered_eigenvalues)), marker="x")
ax.set_xscale("log") if log else None
# Set ticks at each power of 10
# ticks = [10**i for i in range(int(np.log10(sample_rate / (2 * np.pi))))]
# ax.set_xticks(ticks)
# ax.set_ylim([0.990, 1.010])
ax.set_xlabel("Angle of the pole")
ax.set_ylabel("Magnitude of the pole in dB")
ax.grid(which="both")
fig.tight_layout()
return fig:::
::: {#cell-8 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
# from https://stackoverflow.com/a/52998713
def ewma_vectorized(data, alpha, offset=None, dtype=None, order="C", out=None):
"""
Calculates the exponential moving average over a vector.
Will fail for large inputs.
:param data: Input data
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param offset: optional
The offset for the moving average, scalar. Defaults to data[0].
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Defaults to 'C'.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the input. If not provided or `None`,
a freshly-allocated array is returned.
"""
data = np.array(data, copy=False)
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float64
else:
dtype = np.dtype(dtype)
if data.ndim > 1:
# flatten input
data = data.reshape(-1, order)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
if data.size < 1:
# empty input, return empty array
return out
if offset is None:
offset = data[0]
alpha = np.array(alpha, copy=False).astype(dtype, copy=False)
# scaling_factors -> 0 as len(data) gets large
# this leads to divide-by-zeros below
scaling_factors = np.power(
1.0 - alpha, np.arange(data.size + 1, dtype=dtype), dtype=dtype
)
# create cumulative sum array
np.multiply(
data, (alpha * scaling_factors[-2]) / scaling_factors[:-1], dtype=dtype, out=out
)
np.cumsum(out, dtype=dtype, out=out)
# cumsums / scaling
out /= scaling_factors[-2::-1]
if offset != 0:
offset = np.array(offset, copy=False).astype(dtype, copy=False)
# add offsets
out += offset * scaling_factors[1:]
return out
def ewma_vectorized_2d(
data, alpha, axis=None, offset=None, dtype=None, order="C", out=None
):
"""
Calculates the exponential moving average over a given axis.
:param data: Input data, must be 1D or 2D array.
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param axis: The axis to apply the moving average on.
If axis==None, the data is flattened.
:param offset: optional
The offset for the moving average. Must be scalar or a
vector with one element for each row of data. If set to None,
defaults to the first value of each row.
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Ignored if axis is not None.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the desired output. If not provided or `None`,
a freshly-allocated array is returned.
"""
data = np.array(data, copy=False)
assert data.ndim <= 2
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float64
else:
dtype = np.dtype(dtype)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
if data.size < 1:
# empty input, return empty array
return out
if axis is None or data.ndim < 2:
# use 1D version
if isinstance(offset, np.ndarray):
offset = offset[0]
return ewma_vectorized(data, alpha, offset, dtype=dtype, order=order, out=out)
assert -data.ndim <= axis < data.ndim
# create reshaped data views
out_view = out
if axis < 0:
axis = data.ndim - int(axis)
if axis == 0:
# transpose data views so columns are treated as rows
data = data.T
out_view = out_view.T
if offset is None:
# use the first element of each row as the offset
offset = np.copy(data[:, 0])
elif np.size(offset) == 1:
offset = np.reshape(offset, (1,))
alpha = np.array(alpha, copy=False).astype(dtype, copy=False)
# calculate the moving average
row_size = data.shape[1]
row_n = data.shape[0]
scaling_factors = np.power(
1.0 - alpha, np.arange(row_size + 1, dtype=dtype), dtype=dtype
)
# create a scaled cumulative sum array
np.multiply(
data,
np.multiply(
alpha * scaling_factors[-2], np.ones((row_n, 1), dtype=dtype), dtype=dtype
)
/ scaling_factors[np.newaxis, :-1],
dtype=dtype,
out=out_view,
)
np.cumsum(out_view, axis=1, dtype=dtype, out=out_view)
out_view /= scaling_factors[np.newaxis, -2::-1]
if not (np.size(offset) == 1 and offset == 0):
offset = offset.astype(dtype, copy=False)
# add the offsets to the scaled cumulative sums
out_view += offset[:, np.newaxis] * scaling_factors[np.newaxis, 1:]
return out
def ewma_vectorized_safe(data, alpha, row_size=None, dtype=None, order="C", out=None):
"""
Reshapes data before calculating EWMA, then iterates once over the rows
to calculate the offset without precision issues
:param data: Input data, will be flattened.
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param row_size: int, optional
The row size to use in the computation. High row sizes need higher precision,
low values will impact performance. The optimal value depends on the
platform and the alpha being used. Higher alpha values require lower
row size. Default depends on dtype.
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Defaults to 'C'.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the desired output. If not provided or `None`,
a freshly-allocated array is returned.
:return: The flattened result.
"""
data = np.array(data, copy=False)
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float
else:
dtype = np.dtype(dtype)
row_size = int(row_size) if row_size is not None else get_max_row_size(alpha, dtype)
if data.size <= row_size:
# The normal function can handle this input, use that
return ewma_vectorized(data, alpha, dtype=dtype, order=order, out=out)
if data.ndim > 1:
# flatten input
data = np.reshape(data, -1, order=order)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
row_n = int(data.size // row_size) # the number of rows to use
trailing_n = int(data.size % row_size) # the amount of data leftover
first_offset = data[0]
if trailing_n > 0:
# set temporary results to slice view of out parameter
out_main_view = np.reshape(out[:-trailing_n], (row_n, row_size))
data_main_view = np.reshape(data[:-trailing_n], (row_n, row_size))
else:
out_main_view = out
data_main_view = data
# get all the scaled cumulative sums with 0 offset
ewma_vectorized_2d(
data_main_view,
alpha,
axis=1,
offset=0,
dtype=dtype,
order="C",
out=out_main_view,
)
scaling_factors = (1 - alpha) ** np.arange(1, row_size + 1)
last_scaling_factor = scaling_factors[-1]
# create offset array
offsets = np.empty(out_main_view.shape[0], dtype=dtype)
offsets[0] = first_offset
# iteratively calculate offset for each row
for i in range(1, out_main_view.shape[0]):
offsets[i] = offsets[i - 1] * last_scaling_factor + out_main_view[i - 1, -1]
# add the offsets to the result
out_main_view += offsets[:, np.newaxis] * scaling_factors[np.newaxis, :]
if trailing_n > 0:
# process trailing data in the 2nd slice of the out parameter
ewma_vectorized(
data[-trailing_n:],
alpha,
offset=out_main_view[-1, -1],
dtype=dtype,
order="C",
out=out[-trailing_n:],
)
return out
def get_max_row_size(alpha, dtype=float):
assert 0.0 <= alpha < 1.0
# This will return the maximum row size possible on
# your platform for the given dtype. I can find no impact on accuracy
# at this value on my machine.
# Might not be the optimal value for speed, which is hard to predict
# due to numpy's optimizations
# Use np.finfo(dtype).eps if you are worried about accuracy
# and want to be extra safe.
epsilon = np.finfo(dtype).tiny
# If this produces an OverflowError, make epsilon larger
return int(np.log(epsilon) / np.log(1 - alpha)) + 1:::
::: {#cell-9 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
def plot_freqz(
h: np.ndarray, # The filter's frequency response.
ax=None, # The axes to plot on. If not provided, a new figure and axes will be created.
**kwargs, # Additional arguments to pass to the plot function.
) -> plt.Axes:
"""
Plots the frequency response of a filter.
Parameters:
- h: jtx.ArrayLike: The filter's frequency response.
- ax: matplotlib.axes.Axes, optional: The axes to plot on. If not provided, a new figure and axes will be created.
Returns:
- ax: matplotlib.axes.Axes: The plotted axes.
"""
if ax is None:
fig, ax = plt.subplots(1, 1, figsize=(8, 3))
ax.set_xlabel("Frequency [Hz]")
ax.set_ylabel("Magnitude [dB]")
ax.semilogx(20 * np.log10(np.abs(h)), **kwargs)
ax.grid(True, which="both", ls="-")
return ax:::