Plotting utilities
For better colors use
pip install SciencePlots
and then in your notebook
import scienceplots
'science','ieee', 'no-latex']) plt.style.use([
::: {#cell-3 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
import numpy as np
import matplotlib.pyplot as plt
:::
::: {#cell-4 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
def plot_poles_zeros(
# (B, p) zeros
p: np.ndarray, =None,
ax**kwargs,
):= p.shape
B, _
= plt.subplots(1, 1, figsize=(5, 5)) if ax is None else (None, ax)
fig, ax
ax.plot(0, 2 * np.pi)),
np.cos(np.linspace(0, 2 * np.pi)),
np.sin(np.linspace(="dashed",
linestyle="black",
color=0.3,
alpha
)-1.5, 1.5])
ax.set_xlim([-1.5, 1.5])
ax.set_ylim([# draw lines from origin
-1.5, 1.5], [0, 0], linestyle="dashed", color="black", alpha=0.1)
ax.plot([0, 0], [-1.5, 1.5], linestyle="dashed", color="black", alpha=0.1)
ax.plot([True, which="both")
ax.grid(
**kwargs) ax.scatter(p.real, p.imag,
:::
::: {#cell-5 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
def plot_solution(
# (time_steps, grid_size)
gt: np.ndarray, # (time_steps, grid_size)
pred: np.ndarray, = None, # (time_steps, grid_size)
ar_gt: np.ndarray = None, # (time_steps, grid_size)
ar_pred: np.ndarray -> plt.Figure:
) """
Plot a comparison of the ground truth and predicted solution.
Args:
gt: A numpy array of shape `(time_steps, grid_size)` representing the ground truth values.
pred: A numpy array of shape `(time_steps, grid_size)` representing the predicted values.
ar_pred (optional): A numpy array of shape `(time_steps, grid_size)` representing the autoregressive predicted values.
Returns:
A matplotlib figure object containing the subplots of the ground truth, predicted, and autoregressive predicted solutions.
"""
= np.min(gt)
vmin = np.max(gt)
vmax = np.linspace(0, 1, gt.shape[0])
time_steps = np.linspace(0, 1, gt.shape[1])
grid
= plt.subplots(
fig, ax 1,
2 + 2 * int(ar_pred is not None),
=(5 + 2 * int(ar_pred is not None) * 2, 6),
figsize
)
0].pcolormesh(grid, time_steps, gt, vmin=vmin, vmax=vmax)
ax[0].set(title="gt u", xlabel="x", ylabel="t")
ax[
1].pcolormesh(grid, time_steps, pred, vmin=vmin, vmax=vmax)
ax[1].set(title="pred u", xlabel="x", ylabel="t")
ax[
if ar_pred is not None:
= ar_pred.shape
T, G = np.linspace(0, 1, T)
time_steps = np.linspace(0, 1, G)
grid 2].pcolormesh(grid, time_steps, ar_gt, vmin=vmin, vmax=vmax)
ax[2].set(title="ar gt u", xlabel="x", ylabel="t")
ax[3].pcolormesh(grid, time_steps, ar_pred, vmin=vmin, vmax=vmax)
ax[3].set(title="ar pred u", xlabel="x", ylabel="t")
ax[
fig.tight_layout()return fig
:::
::: {#cell-6 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
def plot_solution_2d(
# (T, H, W)
gt: np.ndarray, # (T, H, W)
pred: np.ndarray, = None, # (T, H, W)
ar_pred: np.ndarray bool = False, # whether to randomly sample a time step
random_sample: -> plt.Figure:
) """
Plot a comparison of the ground truth and predicted solution.
Args:
gt: A numpy array of shape `(time_steps, grid_size)` representing the ground truth values.
pred: A numpy array of shape `(time_steps, grid_size)` representing the predicted values.
ar_pred (optional): A numpy array of shape `(time_steps, grid_size)` representing the autoregressive predicted values.
Returns:
A matplotlib figure object containing the subplots of the ground truth, predicted, and autoregressive predicted solutions.
"""
= np.min(gt)
vmin = np.max(gt)
vmax
= plt.subplots(
fig, ax 1, 2 + int(ar_pred is not None), figsize=(6, 5 + int(ar_pred is not None) * 2)
)
if random_sample:
= np.random.randint(gt.shape[0])
t else:
= gt.shape[0] // 2
t
0].imshow(gt[t], vmin=vmin, vmax=vmax)
ax[0].set(title=f"gt at sample {t}", xlabel="x", ylabel="y")
ax[
1].imshow(pred[t], vmin=vmin, vmax=vmax)
ax[1].set(title=f"pred at sample {t}", xlabel="x", ylabel="y")
ax[
if ar_pred is not None:
2].imshow(ar_pred[t], vmin=vmin, vmax=vmax)
ax[2].set(title="ar pred", xlabel="x", ylabel="y")
ax[
fig.tight_layout()return fig
:::
::: {#cell-7 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
def plot_eigenvalues(
# (B, p)
eigenvalues: np.ndarray, int = None, # sample rate of the signal for radians otherwise we use normalized frequency
sample_rate: bool = False,
log:
):# filter only rhe eigenvalues from 0 to pi excluding -pi to 0
= np.diag(eigenvalues)[
filtered_eigenvalues > 0) & (np.angle(np.diag(eigenvalues)) < np.pi)
(np.angle(np.diag(eigenvalues))
]
# if sample rate is not given we assume that the eigenvalues are already normalized
= (
frequency * sample_rate / (2 * np.pi)
np.angle(filtered_eigenvalues) if sample_rate is not None
else np.angle(filtered_eigenvalues)
)
= plt.subplots(1, 1, figsize=(5, 3))
fig, ax 20 * np.log10(np.abs(filtered_eigenvalues)), marker="x")
ax.scatter(frequency,
"log") if log else None
ax.set_xscale(# Set ticks at each power of 10
# ticks = [10**i for i in range(int(np.log10(sample_rate / (2 * np.pi))))]
# ax.set_xticks(ticks)
# ax.set_ylim([0.990, 1.010])
"Angle of the pole")
ax.set_xlabel("Magnitude of the pole in dB")
ax.set_ylabel(="both")
ax.grid(which
fig.tight_layout()return fig
:::
::: {#cell-8 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
# from https://stackoverflow.com/a/52998713
def ewma_vectorized(data, alpha, offset=None, dtype=None, order="C", out=None):
"""
Calculates the exponential moving average over a vector.
Will fail for large inputs.
:param data: Input data
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param offset: optional
The offset for the moving average, scalar. Defaults to data[0].
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Defaults to 'C'.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the input. If not provided or `None`,
a freshly-allocated array is returned.
"""
= np.array(data, copy=False)
data
if dtype is None:
if data.dtype == np.float32:
= np.float32
dtype else:
= np.float64
dtype else:
= np.dtype(dtype)
dtype
if data.ndim > 1:
# flatten input
= data.reshape(-1, order)
data
if out is None:
= np.empty_like(data, dtype=dtype)
out else:
assert out.shape == data.shape
assert out.dtype == dtype
if data.size < 1:
# empty input, return empty array
return out
if offset is None:
= data[0]
offset
= np.array(alpha, copy=False).astype(dtype, copy=False)
alpha
# scaling_factors -> 0 as len(data) gets large
# this leads to divide-by-zeros below
= np.power(
scaling_factors 1.0 - alpha, np.arange(data.size + 1, dtype=dtype), dtype=dtype
)# create cumulative sum array
np.multiply(* scaling_factors[-2]) / scaling_factors[:-1], dtype=dtype, out=out
data, (alpha
)=dtype, out=out)
np.cumsum(out, dtype
# cumsums / scaling
/= scaling_factors[-2::-1]
out
if offset != 0:
= np.array(offset, copy=False).astype(dtype, copy=False)
offset # add offsets
+= offset * scaling_factors[1:]
out
return out
def ewma_vectorized_2d(
=None, offset=None, dtype=None, order="C", out=None
data, alpha, axis
):"""
Calculates the exponential moving average over a given axis.
:param data: Input data, must be 1D or 2D array.
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param axis: The axis to apply the moving average on.
If axis==None, the data is flattened.
:param offset: optional
The offset for the moving average. Must be scalar or a
vector with one element for each row of data. If set to None,
defaults to the first value of each row.
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Ignored if axis is not None.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the desired output. If not provided or `None`,
a freshly-allocated array is returned.
"""
= np.array(data, copy=False)
data
assert data.ndim <= 2
if dtype is None:
if data.dtype == np.float32:
= np.float32
dtype else:
= np.float64
dtype else:
= np.dtype(dtype)
dtype
if out is None:
= np.empty_like(data, dtype=dtype)
out else:
assert out.shape == data.shape
assert out.dtype == dtype
if data.size < 1:
# empty input, return empty array
return out
if axis is None or data.ndim < 2:
# use 1D version
if isinstance(offset, np.ndarray):
= offset[0]
offset return ewma_vectorized(data, alpha, offset, dtype=dtype, order=order, out=out)
assert -data.ndim <= axis < data.ndim
# create reshaped data views
= out
out_view if axis < 0:
= data.ndim - int(axis)
axis
if axis == 0:
# transpose data views so columns are treated as rows
= data.T
data = out_view.T
out_view
if offset is None:
# use the first element of each row as the offset
= np.copy(data[:, 0])
offset elif np.size(offset) == 1:
= np.reshape(offset, (1,))
offset
= np.array(alpha, copy=False).astype(dtype, copy=False)
alpha
# calculate the moving average
= data.shape[1]
row_size = data.shape[0]
row_n = np.power(
scaling_factors 1.0 - alpha, np.arange(row_size + 1, dtype=dtype), dtype=dtype
)# create a scaled cumulative sum array
np.multiply(
data,
np.multiply(* scaling_factors[-2], np.ones((row_n, 1), dtype=dtype), dtype=dtype
alpha
)/ scaling_factors[np.newaxis, :-1],
=dtype,
dtype=out_view,
out
)=1, dtype=dtype, out=out_view)
np.cumsum(out_view, axis/= scaling_factors[np.newaxis, -2::-1]
out_view
if not (np.size(offset) == 1 and offset == 0):
= offset.astype(dtype, copy=False)
offset # add the offsets to the scaled cumulative sums
+= offset[:, np.newaxis] * scaling_factors[np.newaxis, 1:]
out_view
return out
def ewma_vectorized_safe(data, alpha, row_size=None, dtype=None, order="C", out=None):
"""
Reshapes data before calculating EWMA, then iterates once over the rows
to calculate the offset without precision issues
:param data: Input data, will be flattened.
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param row_size: int, optional
The row size to use in the computation. High row sizes need higher precision,
low values will impact performance. The optimal value depends on the
platform and the alpha being used. Higher alpha values require lower
row size. Default depends on dtype.
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Defaults to 'C'.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the desired output. If not provided or `None`,
a freshly-allocated array is returned.
:return: The flattened result.
"""
= np.array(data, copy=False)
data
if dtype is None:
if data.dtype == np.float32:
= np.float32
dtype else:
= np.float
dtype else:
= np.dtype(dtype)
dtype
= int(row_size) if row_size is not None else get_max_row_size(alpha, dtype)
row_size
if data.size <= row_size:
# The normal function can handle this input, use that
return ewma_vectorized(data, alpha, dtype=dtype, order=order, out=out)
if data.ndim > 1:
# flatten input
= np.reshape(data, -1, order=order)
data
if out is None:
= np.empty_like(data, dtype=dtype)
out else:
assert out.shape == data.shape
assert out.dtype == dtype
= int(data.size // row_size) # the number of rows to use
row_n = int(data.size % row_size) # the amount of data leftover
trailing_n = data[0]
first_offset
if trailing_n > 0:
# set temporary results to slice view of out parameter
= np.reshape(out[:-trailing_n], (row_n, row_size))
out_main_view = np.reshape(data[:-trailing_n], (row_n, row_size))
data_main_view else:
= out
out_main_view = data
data_main_view
# get all the scaled cumulative sums with 0 offset
ewma_vectorized_2d(
data_main_view,
alpha,=1,
axis=0,
offset=dtype,
dtype="C",
order=out_main_view,
out
)
= (1 - alpha) ** np.arange(1, row_size + 1)
scaling_factors = scaling_factors[-1]
last_scaling_factor
# create offset array
= np.empty(out_main_view.shape[0], dtype=dtype)
offsets 0] = first_offset
offsets[# iteratively calculate offset for each row
for i in range(1, out_main_view.shape[0]):
= offsets[i - 1] * last_scaling_factor + out_main_view[i - 1, -1]
offsets[i]
# add the offsets to the result
+= offsets[:, np.newaxis] * scaling_factors[np.newaxis, :]
out_main_view
if trailing_n > 0:
# process trailing data in the 2nd slice of the out parameter
ewma_vectorized(-trailing_n:],
data[
alpha,=out_main_view[-1, -1],
offset=dtype,
dtype="C",
order=out[-trailing_n:],
out
)return out
def get_max_row_size(alpha, dtype=float):
assert 0.0 <= alpha < 1.0
# This will return the maximum row size possible on
# your platform for the given dtype. I can find no impact on accuracy
# at this value on my machine.
# Might not be the optimal value for speed, which is hard to predict
# due to numpy's optimizations
# Use np.finfo(dtype).eps if you are worried about accuracy
# and want to be extra safe.
= np.finfo(dtype).tiny
epsilon # If this produces an OverflowError, make epsilon larger
return int(np.log(epsilon) / np.log(1 - alpha)) + 1
:::
::: {#cell-9 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}
def plot_freqz(
# The filter's frequency response.
h: np.ndarray, =None, # The axes to plot on. If not provided, a new figure and axes will be created.
ax**kwargs, # Additional arguments to pass to the plot function.
-> plt.Axes:
) """
Plots the frequency response of a filter.
Parameters:
- h: jtx.ArrayLike: The filter's frequency response.
- ax: matplotlib.axes.Axes, optional: The axes to plot on. If not provided, a new figure and axes will be created.
Returns:
- ax: matplotlib.axes.Axes: The plotted axes.
"""
if ax is None:
= plt.subplots(1, 1, figsize=(8, 3))
fig, ax "Frequency [Hz]")
ax.set_xlabel("Magnitude [dB]")
ax.set_ylabel(20 * np.log10(np.abs(h)), **kwargs)
ax.semilogx(True, which="both", ls="-")
ax.grid(return ax
:::