Plotting utilities

For better colors use

pip install SciencePlots

and then in your notebook

import scienceplots
plt.style.use(['science','ieee', 'no-latex'])

::: {#cell-3 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}

import numpy as np
import matplotlib.pyplot as plt

:::

::: {#cell-4 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}

def plot_poles_zeros(
    p: np.ndarray,  # (B, p) zeros
    ax=None,
    **kwargs,
):
    B, _ = p.shape

    fig, ax = plt.subplots(1, 1, figsize=(5, 5)) if ax is None else (None, ax)
    ax.plot(
        np.cos(np.linspace(0, 2 * np.pi)),
        np.sin(np.linspace(0, 2 * np.pi)),
        linestyle="dashed",
        color="black",
        alpha=0.3,
    )
    ax.set_xlim([-1.5, 1.5])
    ax.set_ylim([-1.5, 1.5])
    # draw lines from origin
    ax.plot([-1.5, 1.5], [0, 0], linestyle="dashed", color="black", alpha=0.1)
    ax.plot([0, 0], [-1.5, 1.5], linestyle="dashed", color="black", alpha=0.1)
    ax.grid(True, which="both")

    ax.scatter(p.real, p.imag, **kwargs)

:::

::: {#cell-5 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}

def plot_solution(
    gt: np.ndarray,  # (time_steps, grid_size)
    pred: np.ndarray,  # (time_steps, grid_size)
    ar_gt: np.ndarray = None,  # (time_steps, grid_size)
    ar_pred: np.ndarray = None,  # (time_steps, grid_size)
) -> plt.Figure:
    """
    Plot a comparison of the ground truth and predicted solution.

    Args:
        gt: A numpy array of shape `(time_steps, grid_size)` representing the ground truth values.
        pred: A numpy array of shape `(time_steps, grid_size)` representing the predicted values.
        ar_pred (optional): A numpy array of shape `(time_steps, grid_size)` representing the autoregressive predicted values.

    Returns:
        A matplotlib figure object containing the subplots of the ground truth, predicted, and autoregressive predicted solutions.
    """
    vmin = np.min(gt)
    vmax = np.max(gt)
    time_steps = np.linspace(0, 1, gt.shape[0])
    grid = np.linspace(0, 1, gt.shape[1])

    fig, ax = plt.subplots(
        1,
        2 + 2 * int(ar_pred is not None),
        figsize=(5 + 2 * int(ar_pred is not None) * 2, 6),
    )

    ax[0].pcolormesh(grid, time_steps, gt, vmin=vmin, vmax=vmax)
    ax[0].set(title="gt u", xlabel="x", ylabel="t")

    ax[1].pcolormesh(grid, time_steps, pred, vmin=vmin, vmax=vmax)
    ax[1].set(title="pred u", xlabel="x", ylabel="t")

    if ar_pred is not None:
        T, G = ar_pred.shape
        time_steps = np.linspace(0, 1, T)
        grid = np.linspace(0, 1, G)
        ax[2].pcolormesh(grid, time_steps, ar_gt, vmin=vmin, vmax=vmax)
        ax[2].set(title="ar gt u", xlabel="x", ylabel="t")
        ax[3].pcolormesh(grid, time_steps, ar_pred, vmin=vmin, vmax=vmax)
        ax[3].set(title="ar pred u", xlabel="x", ylabel="t")
    fig.tight_layout()
    return fig

:::

::: {#cell-6 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}

def plot_solution_2d(
    gt: np.ndarray,  # (T, H, W)
    pred: np.ndarray,  # (T, H, W)
    ar_pred: np.ndarray = None,  # (T, H, W)
    random_sample: bool = False,  # whether to randomly sample a time step
) -> plt.Figure:
    """
    Plot a comparison of the ground truth and predicted solution.

    Args:
        gt: A numpy array of shape `(time_steps, grid_size)` representing the ground truth values.
        pred: A numpy array of shape `(time_steps, grid_size)` representing the predicted values.
        ar_pred (optional): A numpy array of shape `(time_steps, grid_size)` representing the autoregressive predicted values.

    Returns:
        A matplotlib figure object containing the subplots of the ground truth, predicted, and autoregressive predicted solutions.
    """
    vmin = np.min(gt)
    vmax = np.max(gt)

    fig, ax = plt.subplots(
        1, 2 + int(ar_pred is not None), figsize=(6, 5 + int(ar_pred is not None) * 2)
    )

    if random_sample:
        t = np.random.randint(gt.shape[0])
    else:
        t = gt.shape[0] // 2

    ax[0].imshow(gt[t], vmin=vmin, vmax=vmax)
    ax[0].set(title=f"gt at sample {t}", xlabel="x", ylabel="y")

    ax[1].imshow(pred[t], vmin=vmin, vmax=vmax)
    ax[1].set(title=f"pred at sample {t}", xlabel="x", ylabel="y")

    if ar_pred is not None:
        ax[2].imshow(ar_pred[t], vmin=vmin, vmax=vmax)
        ax[2].set(title="ar pred", xlabel="x", ylabel="y")

    fig.tight_layout()
    return fig

:::

::: {#cell-7 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}

def plot_eigenvalues(
    eigenvalues: np.ndarray,  # (B, p)
    sample_rate: int = None,  # sample rate of the signal for radians otherwise we use normalized frequency
    log: bool = False,
):
    # filter only rhe eigenvalues from 0 to pi excluding -pi to 0
    filtered_eigenvalues = np.diag(eigenvalues)[
        (np.angle(np.diag(eigenvalues)) > 0) & (np.angle(np.diag(eigenvalues)) < np.pi)
    ]

    # if sample rate is not given we assume that the eigenvalues are already normalized
    frequency = (
        np.angle(filtered_eigenvalues) * sample_rate / (2 * np.pi)
        if sample_rate is not None
        else np.angle(filtered_eigenvalues)
    )

    fig, ax = plt.subplots(1, 1, figsize=(5, 3))
    ax.scatter(frequency, 20 * np.log10(np.abs(filtered_eigenvalues)), marker="x")

    ax.set_xscale("log") if log else None
    # Set ticks at each power of 10
    # ticks = [10**i for i in range(int(np.log10(sample_rate / (2 * np.pi))))]
    # ax.set_xticks(ticks)
    # ax.set_ylim([0.990, 1.010])
    ax.set_xlabel("Angle of the pole")
    ax.set_ylabel("Magnitude of the pole in dB")
    ax.grid(which="both")
    fig.tight_layout()
    return fig

:::

::: {#cell-8 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}

# from https://stackoverflow.com/a/52998713
def ewma_vectorized(data, alpha, offset=None, dtype=None, order="C", out=None):
    """
    Calculates the exponential moving average over a vector.
    Will fail for large inputs.
    :param data: Input data
    :param alpha: scalar float in range (0,1)
        The alpha parameter for the moving average.
    :param offset: optional
        The offset for the moving average, scalar. Defaults to data[0].
    :param dtype: optional
        Data type used for calculations. Defaults to float64 unless
        data.dtype is float32, then it will use float32.
    :param order: {'C', 'F', 'A'}, optional
        Order to use when flattening the data. Defaults to 'C'.
    :param out: ndarray, or None, optional
        A location into which the result is stored. If provided, it must have
        the same shape as the input. If not provided or `None`,
        a freshly-allocated array is returned.
    """
    data = np.array(data, copy=False)

    if dtype is None:
        if data.dtype == np.float32:
            dtype = np.float32
        else:
            dtype = np.float64
    else:
        dtype = np.dtype(dtype)

    if data.ndim > 1:
        # flatten input
        data = data.reshape(-1, order)

    if out is None:
        out = np.empty_like(data, dtype=dtype)
    else:
        assert out.shape == data.shape
        assert out.dtype == dtype

    if data.size < 1:
        # empty input, return empty array
        return out

    if offset is None:
        offset = data[0]

    alpha = np.array(alpha, copy=False).astype(dtype, copy=False)

    # scaling_factors -> 0 as len(data) gets large
    # this leads to divide-by-zeros below
    scaling_factors = np.power(
        1.0 - alpha, np.arange(data.size + 1, dtype=dtype), dtype=dtype
    )
    # create cumulative sum array
    np.multiply(
        data, (alpha * scaling_factors[-2]) / scaling_factors[:-1], dtype=dtype, out=out
    )
    np.cumsum(out, dtype=dtype, out=out)

    # cumsums / scaling
    out /= scaling_factors[-2::-1]

    if offset != 0:
        offset = np.array(offset, copy=False).astype(dtype, copy=False)
        # add offsets
        out += offset * scaling_factors[1:]

    return out


def ewma_vectorized_2d(
    data, alpha, axis=None, offset=None, dtype=None, order="C", out=None
):
    """
    Calculates the exponential moving average over a given axis.
    :param data: Input data, must be 1D or 2D array.
    :param alpha: scalar float in range (0,1)
        The alpha parameter for the moving average.
    :param axis: The axis to apply the moving average on.
        If axis==None, the data is flattened.
    :param offset: optional
        The offset for the moving average. Must be scalar or a
        vector with one element for each row of data. If set to None,
        defaults to the first value of each row.
    :param dtype: optional
        Data type used for calculations. Defaults to float64 unless
        data.dtype is float32, then it will use float32.
    :param order: {'C', 'F', 'A'}, optional
        Order to use when flattening the data. Ignored if axis is not None.
    :param out: ndarray, or None, optional
        A location into which the result is stored. If provided, it must have
        the same shape as the desired output. If not provided or `None`,
        a freshly-allocated array is returned.
    """
    data = np.array(data, copy=False)

    assert data.ndim <= 2

    if dtype is None:
        if data.dtype == np.float32:
            dtype = np.float32
        else:
            dtype = np.float64
    else:
        dtype = np.dtype(dtype)

    if out is None:
        out = np.empty_like(data, dtype=dtype)
    else:
        assert out.shape == data.shape
        assert out.dtype == dtype

    if data.size < 1:
        # empty input, return empty array
        return out

    if axis is None or data.ndim < 2:
        # use 1D version
        if isinstance(offset, np.ndarray):
            offset = offset[0]
        return ewma_vectorized(data, alpha, offset, dtype=dtype, order=order, out=out)

    assert -data.ndim <= axis < data.ndim

    # create reshaped data views
    out_view = out
    if axis < 0:
        axis = data.ndim - int(axis)

    if axis == 0:
        # transpose data views so columns are treated as rows
        data = data.T
        out_view = out_view.T

    if offset is None:
        # use the first element of each row as the offset
        offset = np.copy(data[:, 0])
    elif np.size(offset) == 1:
        offset = np.reshape(offset, (1,))

    alpha = np.array(alpha, copy=False).astype(dtype, copy=False)

    # calculate the moving average
    row_size = data.shape[1]
    row_n = data.shape[0]
    scaling_factors = np.power(
        1.0 - alpha, np.arange(row_size + 1, dtype=dtype), dtype=dtype
    )
    # create a scaled cumulative sum array
    np.multiply(
        data,
        np.multiply(
            alpha * scaling_factors[-2], np.ones((row_n, 1), dtype=dtype), dtype=dtype
        )
        / scaling_factors[np.newaxis, :-1],
        dtype=dtype,
        out=out_view,
    )
    np.cumsum(out_view, axis=1, dtype=dtype, out=out_view)
    out_view /= scaling_factors[np.newaxis, -2::-1]

    if not (np.size(offset) == 1 and offset == 0):
        offset = offset.astype(dtype, copy=False)
        # add the offsets to the scaled cumulative sums
        out_view += offset[:, np.newaxis] * scaling_factors[np.newaxis, 1:]

    return out


def ewma_vectorized_safe(data, alpha, row_size=None, dtype=None, order="C", out=None):
    """
    Reshapes data before calculating EWMA, then iterates once over the rows
    to calculate the offset without precision issues
    :param data: Input data, will be flattened.
    :param alpha: scalar float in range (0,1)
        The alpha parameter for the moving average.
    :param row_size: int, optional
        The row size to use in the computation. High row sizes need higher precision,
        low values will impact performance. The optimal value depends on the
        platform and the alpha being used. Higher alpha values require lower
        row size. Default depends on dtype.
    :param dtype: optional
        Data type used for calculations. Defaults to float64 unless
        data.dtype is float32, then it will use float32.
    :param order: {'C', 'F', 'A'}, optional
        Order to use when flattening the data. Defaults to 'C'.
    :param out: ndarray, or None, optional
        A location into which the result is stored. If provided, it must have
        the same shape as the desired output. If not provided or `None`,
        a freshly-allocated array is returned.
    :return: The flattened result.
    """
    data = np.array(data, copy=False)

    if dtype is None:
        if data.dtype == np.float32:
            dtype = np.float32
        else:
            dtype = np.float
    else:
        dtype = np.dtype(dtype)

    row_size = int(row_size) if row_size is not None else get_max_row_size(alpha, dtype)

    if data.size <= row_size:
        # The normal function can handle this input, use that
        return ewma_vectorized(data, alpha, dtype=dtype, order=order, out=out)

    if data.ndim > 1:
        # flatten input
        data = np.reshape(data, -1, order=order)

    if out is None:
        out = np.empty_like(data, dtype=dtype)
    else:
        assert out.shape == data.shape
        assert out.dtype == dtype

    row_n = int(data.size // row_size)  # the number of rows to use
    trailing_n = int(data.size % row_size)  # the amount of data leftover
    first_offset = data[0]

    if trailing_n > 0:
        # set temporary results to slice view of out parameter
        out_main_view = np.reshape(out[:-trailing_n], (row_n, row_size))
        data_main_view = np.reshape(data[:-trailing_n], (row_n, row_size))
    else:
        out_main_view = out
        data_main_view = data

    # get all the scaled cumulative sums with 0 offset
    ewma_vectorized_2d(
        data_main_view,
        alpha,
        axis=1,
        offset=0,
        dtype=dtype,
        order="C",
        out=out_main_view,
    )

    scaling_factors = (1 - alpha) ** np.arange(1, row_size + 1)
    last_scaling_factor = scaling_factors[-1]

    # create offset array
    offsets = np.empty(out_main_view.shape[0], dtype=dtype)
    offsets[0] = first_offset
    # iteratively calculate offset for each row
    for i in range(1, out_main_view.shape[0]):
        offsets[i] = offsets[i - 1] * last_scaling_factor + out_main_view[i - 1, -1]

    # add the offsets to the result
    out_main_view += offsets[:, np.newaxis] * scaling_factors[np.newaxis, :]

    if trailing_n > 0:
        # process trailing data in the 2nd slice of the out parameter
        ewma_vectorized(
            data[-trailing_n:],
            alpha,
            offset=out_main_view[-1, -1],
            dtype=dtype,
            order="C",
            out=out[-trailing_n:],
        )
    return out


def get_max_row_size(alpha, dtype=float):
    assert 0.0 <= alpha < 1.0
    # This will return the maximum row size possible on
    # your platform for the given dtype. I can find no impact on accuracy
    # at this value on my machine.
    # Might not be the optimal value for speed, which is hard to predict
    # due to numpy's optimizations
    # Use np.finfo(dtype).eps if you  are worried about accuracy
    # and want to be extra safe.
    epsilon = np.finfo(dtype).tiny
    # If this produces an OverflowError, make epsilon larger
    return int(np.log(epsilon) / np.log(1 - alpha)) + 1

:::

::: {#cell-9 .cell 0=‘e’ 1=‘x’ 2=‘p’ 3=‘o’ 4=‘r’ 5=‘t’}

def plot_freqz(
    h: np.ndarray,  # The filter's frequency response.
    ax=None,  # The axes to plot on. If not provided, a new figure and axes will be created.
    **kwargs,  # Additional arguments to pass to the plot function.
) -> plt.Axes:
    """
    Plots the frequency response of a filter.

    Parameters:
    - h: jtx.ArrayLike: The filter's frequency response.
    - ax: matplotlib.axes.Axes, optional: The axes to plot on. If not provided, a new figure and axes will be created.

    Returns:
    - ax: matplotlib.axes.Axes: The plotted axes.
    """
    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=(8, 3))
    ax.set_xlabel("Frequency [Hz]")
    ax.set_ylabel("Magnitude [dB]")
    ax.semilogx(20 * np.log10(np.abs(h)), **kwargs)
    ax.grid(True, which="both", ls="-")
    return ax

:::