Source code for vindy.utils.utils
import numpy as np
import os
import matplotlib.pyplot as plt
import imageio
[docs]
def add_lognormal_noise(trajectory, sigma):
    """
    Apply multiplicative lognormal noise to a trajectory.

    Parameters
    ----------
    trajectory : array-like
        Trajectory data that the noise is applied to.
    sigma : float
        Standard deviation parameter of the underlying normal distribution.

    Returns
    -------
    tuple of array-like
        (noisy_trajectory, noise), where noisy_trajectory = trajectory * noise.
    """
    # one multiplicative noise sample per trajectory entry
    multiplicative_noise = np.random.lognormal(
        mean=0, sigma=sigma, size=trajectory.shape
    )
    noisy_trajectory = trajectory * multiplicative_noise
    return noisy_trajectory, multiplicative_noise
[docs]
def coefficient_distributions_to_csv(sindy_layer, outdir, var_names=None, param_names=None):
    """
    Save the coefficient distributions of the SINDy layer to CSV files.

    Writes one CSV per state variable to ``outdir`` (file name pattern
    ``vindy_<var>_dot.csv``), each containing the feature names together with
    the mean and scale of the corresponding coefficient distributions.

    Parameters
    ----------
    sindy_layer : SindyLayer
        SINDy layer containing coefficient distributions.
    outdir : str
        Output directory for CSV files.
    var_names : list of str, optional
        Names of state variables; defaults to ["z1", "z2", ...] when omitted.
    param_names : list of str, optional
        Names of parameters; defaults to ["p_1", "p_2", ...] when omitted.
    """
    # None sentinels instead of mutable list defaults avoid shared-state bugs;
    # the falsy check still covers callers that pass an empty list explicitly.
    if not var_names:
        var_names = [f"z{i}" for i in range(1, sindy_layer.output_dim + 1)]
    if not param_names:
        param_names = [f"p_{i}" for i in range(1, sindy_layer.param_dim + 1)]
    # strip '*' so feature names are CSV/pgfplots friendly
    feature_names = [
        name_.replace("*", "")
        for name_ in sindy_layer.get_feature_names(var_names, param_names)
    ]
    n_vars = sindy_layer.state_dim
    n_features = len(feature_names)
    _, mean, log_scale = sindy_layer._coeffs
    # reverse log_scale (presumably returns a tensor, since .numpy() is
    # called on the result below — TODO confirm against priors.reverse_log)
    scale = sindy_layer.priors.reverse_log(log_scale.numpy())
    mean_values, scale_values = (
        mean.numpy().reshape(n_vars, n_features).T,
        scale.numpy().reshape(n_vars, n_features).T,
    )
    # minimum scale value for which Laplacian dist can be plotted in pgfplots is 1e-4
    scale_values = np.maximum(scale_values, 1e-4)
    for i in range(n_vars):
        # rows: feature names, means, scales — transposed so they become columns
        save_value = np.concatenate(
            [
                np.array(feature_names)[:, np.newaxis],
                mean_values[:, i : i + 1],
                scale_values[:, i : i + 1],
            ],
            axis=1,
        ).T
        np.savetxt(
            os.path.join(outdir, f"vindy_{var_names[i]}_dot.csv"),
            save_value,
            delimiter=",",
            fmt="%s",
            comments="",
            header=",".join(np.array(range(len(feature_names))).astype(str)),
        )
[docs]
def coefficient_distribution_gif(
    mean_over_epochs, scale_over_epochs, sindy_layer, outdir
):
    """
    Create a GIF showing how coefficient distributions evolve over epochs.

    Parameters
    ----------
    mean_over_epochs : list of array-like
        Mean values of coefficients at each epoch.
    scale_over_epochs : list of array-like
        Scale values of coefficients at each epoch.
    sindy_layer : SindyLayer
        SINDy layer instance with visualization methods.
    outdir : str
        Output directory for saving frames and the GIF.
    """
    frames_dir = os.path.join(outdir, "coefficients")
    os.makedirs(frames_dir, exist_ok=True)
    # cap the number of frames (at most 401: indices 0-400)
    max_frames = min(len(mean_over_epochs), len(scale_over_epochs), 401)
    half_range = 1.5
    # render one frame per epoch without showing the figures
    for epoch, (mu, sc) in enumerate(zip(mean_over_epochs, scale_over_epochs)):
        if epoch >= max_frames:
            break
        fig = sindy_layer._visualize_coefficients(
            mu, sc, x_range=[-half_range, half_range], y_range=[0, 6]
        )
        fig.suptitle(f"Epoch {epoch}")
        fig.savefig(os.path.join(frames_dir, f"coeffs_{epoch}.png"))
        plt.close(fig)
    # stitch the saved frames into a single animated GIF
    frames = [
        imageio.imread(os.path.join(frames_dir, f"coeffs_{i}.png"))
        for i in range(max_frames)
    ]
    imageio.mimsave(
        os.path.join(frames_dir, "coeffs.gif"),
        frames,
        duration=100,
    )
def plot_train_history(history, outdir, validation: bool = True):
    """
    Plot the training history on a log-scaled loss axis.

    Parameters
    ----------
    history : dict or keras.callbacks.History
        Training history object or dictionary.
    outdir : str
        Output directory path for saving plots.
    validation : bool, default=True
        Whether to plot validation metrics.
    """
    os.makedirs(outdir, exist_ok=True)
    fig, ax = plt.subplots(1, 1, figsize=(6, 4))
    for term, values in history.items():
        # coefficient logs are plotted elsewhere
        if "coeffs" in term:
            continue
        # training ("val_"-free) terms when validation is True, "val_" terms otherwise
        if ("val_" in term) != validation:
            ax.plot(values, label=term)
    ax.set_yscale("log")
    ax.set_xlabel("Epoch")
    ax.set_ylabel("Loss")
    ax.legend()
    plt.show()
    fig.savefig(os.path.join(outdir, "training_history.png"))
    plt.close(fig)
def plot_coefficients_train_history(history, outdir):
    """
    Plot the coefficient training history (means and scales over epochs).

    Parameters
    ----------
    history : dict
        Training history containing 'coeffs_mean' and 'coeffs_scale'.
    outdir : str
        Output directory path for saving plots.
    """
    means = np.array(history["coeffs_mean"]).squeeze()
    scales = np.array(history["coeffs_scale"]).squeeze()
    os.makedirs(outdir, exist_ok=True)
    # top axis: coefficient means, bottom axis: coefficient scales
    fig, (ax_mean, ax_scale) = plt.subplots(2, 1, figsize=(6, 4))
    ax_mean.plot(means)
    ax_mean.set_xlabel("Epoch")
    ax_mean.set_ylabel("Coefficient mean")
    ax_scale.plot(scales)
    ax_scale.set_xlabel("Epoch")
    ax_scale.set_ylabel("Coefficient scale")
    plt.show()
    fig.savefig(os.path.join(outdir, "coefficients_history.png"))
    plt.close(fig)
[docs]
def _resolve_spatial_shape(spatial_shape):
    """
    Normalize a user-supplied spatial_shape to (Nx, Ny, channels).

    Accepts (Nx, Ny, channels) directly, or (N, channels) where N is assumed
    to describe a square grid (Nx = Ny = sqrt(N)).

    Raises
    ------
    ValueError
        If N is not a perfect square or spatial_shape has the wrong length.
    """
    if len(spatial_shape) == 3:
        Nx, Ny, channels = spatial_shape
        return Nx, Ny, channels
    if len(spatial_shape) == 2:
        N, channels = spatial_shape
        Nx = int(np.sqrt(N))
        if Nx * Nx != N:
            raise ValueError(
                "Cannot infer Nx,Ny from N; provide (Nx,Ny,channels)"
            )
        return Nx, Nx, channels
    raise ValueError("spatial_shape must be length 2 or 3")


def _check_feature_size(features, Nx, Ny, channels, spatial_shape):
    """Raise ValueError if the flat feature count does not equal Nx * Ny * channels."""
    if features != Nx * Ny * channels:
        raise ValueError(
            f"Feature size {features} does not match provided spatial_shape {spatial_shape}"
        )


def switch_data_format(
    data, n_sims, n_timesteps, spatial_shape=None, target_format="auto"
):
    """
    Convert between vectorized (2D), simulation-wise flattened (3D), and full spatial (5D) data formats.

    Parameters
    ----------
    data : np.ndarray or None
        One of:
        * 2D: (n_sims * n_timesteps, features)
        * 3D: (n_sims, n_timesteps, features)
        * 5D: (n_sims, n_timesteps, Nx, Ny, channels)
        None passes straight through.
    n_sims : int
        Number of simulations.
    n_timesteps : int
        Timesteps per simulation.
    spatial_shape : tuple, optional
        Spatial dims: (Nx, Ny, channels) or (N, channels). Required for 5D
        targets unless it can be inferred unambiguously from the feature size.
    target_format : {'auto', '2d', '3d', '5d'}
        When 'auto', the function chooses a sensible target based on input
        (2D and 5D inputs are returned unchanged; 3D inputs are returned
        unchanged unless a spatial_shape requests a 5D reshape).

    Returns
    -------
    np.ndarray or None
        Converted array in the requested format.

    Raises
    ------
    ValueError
        For an unknown target_format, an incompatible data shape, or a
        spatial_shape that does not match the feature size.
    """
    if data is None:
        return None
    if target_format not in ("auto", "2d", "3d", "5d"):
        raise ValueError("target_format must be one of 'auto','2d','3d','5d'")
    # Input is vectorized 2D: (n_sims * n_timesteps, features)
    if data.ndim == 2 and data.shape[0] == n_sims * n_timesteps:
        if target_format in ("2d", "auto"):
            return data
        if target_format == "5d":
            if spatial_shape is None:
                raise ValueError(
                    "spatial_shape (Nx,Ny,channels) is required to reshape to 5D"
                )
            Nx, Ny, channels = _resolve_spatial_shape(spatial_shape)
            _check_feature_size(data.shape[1], Nx, Ny, channels, spatial_shape)
            return data.reshape(n_sims, n_timesteps, Nx, Ny, channels)
        # target_format == "3d": split into simulation-wise trajectories
        return data.reshape(n_sims, n_timesteps, -1)
    # Input is simulation-wise flattened 3D: (n_sims, n_timesteps, features)
    if data.ndim == 3 and data.shape[0] == n_sims and data.shape[1] == n_timesteps:
        if target_format == "3d" or (
            target_format == "auto" and spatial_shape is None
        ):
            return data
        if target_format == "2d":
            return data.reshape(-1, data.shape[-1])
        # convert to 5D
        features = data.shape[2]
        if spatial_shape is None:
            # try to infer a square grid with a single channel
            Nx = int(np.sqrt(features))
            if Nx * Nx != features:
                raise ValueError("spatial_shape required to reshape 3D to 5D")
            Ny, channels = Nx, 1
        else:
            Nx, Ny, channels = _resolve_spatial_shape(spatial_shape)
            _check_feature_size(features, Nx, Ny, channels, spatial_shape)
        return data.reshape(n_sims, n_timesteps, Nx, Ny, channels)
    # Input is full spatial 5D: (n_sims, n_timesteps, Nx, Ny, channels)
    if data.ndim == 5 and data.shape[0] == n_sims and data.shape[1] == n_timesteps:
        if target_format in ("5d", "auto"):
            return data
        flat3 = data.reshape(n_sims, n_timesteps, -1)
        if target_format == "3d":
            return flat3
        return flat3.reshape(-1, flat3.shape[-1])
    # If none matched, raise
    raise ValueError(
        f'Data shape {getattr(data, "shape", None)} not compatible with n_sims={n_sims}, n_timesteps={n_timesteps}'
    )
"""
Shared utility functions for examples.
This module contains common functions used across different example scripts
(MEMS, reaction_diffusion, etc.) to avoid code duplication.
"""
import os
import random
import logging
import datetime
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
[docs]
def get_config():
    """
    Import and return the config module.

    Handles the import of the examples config module with fallbacks so the
    examples can be run both when `examples` is a package and when it is just
    a directory with `config.py` next to the example scripts.

    Returns
    -------
    module
        The config module object.

    Raises
    ------
    ImportError
        If config.py doesn't exist or can't be imported.
    """
    # 1) Preferred: examples is a package (examples/__init__.py exists)
    try:
        import examples.config as config
        return config
    except Exception:
        pass
    # 2) Running from inside the examples folder: a top-level `import config`
    #    may work (e.g. python MEMS.py when cwd is examples/MEMS)
    try:
        import config
        return config
    except Exception:
        pass
    # 3) Fallback: locate examples/config.py on disk and import it by path
    import importlib.util
    from pathlib import Path
    candidates = []
    # a) examples/config.py relative to project root (assume repo layout)
    try:
        candidates.append(
            Path(__file__).resolve().parents[2] / "examples" / "config.py"
        )
    except Exception:
        pass
    # b) relative to cwd, c) next to this utils file, d) directly in cwd
    candidates.append(Path.cwd() / "examples" / "config.py")
    candidates.append(
        Path(__file__).resolve().parent.parent / "examples" / "config.py"
    )
    candidates.append(Path.cwd() / "config.py")
    for candidate in candidates:
        try:
            candidate = candidate.resolve()
        except Exception:
            continue
        if not (candidate.exists() and candidate.is_file()):
            continue
        try:
            spec = importlib.util.spec_from_file_location(
                "examples_config", str(candidate)
            )
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
        except Exception:
            # try next candidate
            continue
        return module
    # Nothing worked: raise a helpful error
    raise ImportError(
        "Could not import examples config. Please ensure that there is a file named 'config.py' in the examples/ folder. "
        "You can copy examples/config.py.template to examples/config.py and customize it with the correct data paths and parameters for your setup. "
        "Checked locations: {}".format(
            ", ".join(str(p) for p in candidates)
        )
    )
[docs]
def set_seed(seed: int):
    """
    Seed Python's random module, NumPy, and TensorFlow for reproducibility.

    Parameters
    ----------
    seed : int
        The seed value applied to all three random number generators.
    """
    # the three seeding calls are independent of each other
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)
[docs]
def validate_data_path(data_path: str, zenodo_doi: str = "10.5281/zenodo.18313843"):
    """
    Validate that a data file exists and provide a helpful error message if not.

    Parameters
    ----------
    data_path : str
        Path to the data file.
    zenodo_doi : str, optional
        Zenodo DOI for downloading the data (default is "10.5281/zenodo.18313843").

    Raises
    ------
    FileNotFoundError
        If the data file does not exist.
    """
    # guard clause: a present file needs no further action
    if os.path.isfile(data_path):
        return
    raise FileNotFoundError(
        f"Data file {data_path} not found. "
        f"Please download the file from Zenodo (http://doi.org/{zenodo_doi}) and "
        f"specify the correct path in the examples/config.py file."
    )
[docs]
def plot_train_history(trainhist, result_dir, validation=True):
    """
    Plot training history including loss curves.

    Failures are logged as warnings instead of raised, so a broken plot never
    aborts a training run.

    Parameters
    ----------
    trainhist : dict
        Training history dictionary containing loss values.
    result_dir : str
        Directory to save the plot.
    validation : bool, optional
        Whether to include validation loss in the plot (default is True).
    """
    try:
        os.makedirs(result_dir, exist_ok=True)
        fig, ax = plt.subplots(figsize=(10, 6))
        # draw whichever loss curves the history actually contains
        if "loss" in trainhist:
            ax.plot(trainhist["loss"], label="Training Loss", linewidth=2)
        if validation and "val_loss" in trainhist:
            ax.plot(trainhist["val_loss"], label="Validation Loss", linewidth=2)
        ax.set_xlabel("Epoch", fontsize=12)
        ax.set_ylabel("Loss", fontsize=12)
        ax.set_title("Training History", fontsize=14, fontweight="bold")
        ax.legend()
        ax.grid(True, alpha=0.3)
        ax.set_yscale("log")
        plt.tight_layout()
        plt.show()
        # persist the figure; suffix records whether validation curves are included
        suffix = "_val" if validation else "_train"
        target = os.path.join(result_dir, f"training_history{suffix}.png")
        plt.savefig(target, dpi=300, bbox_inches="tight")
        logging.info(f"Saved training history plot to {target}")
        plt.close(fig)
    except Exception as e:
        logging.warning(f"Failed to plot training history: {e}")
[docs]
def plot_coefficients_train_history(trainhist, result_dir):
    """
    Plot the evolution of SINDy coefficients during training.

    Parameters
    ----------
    trainhist : dict
        Training history dictionary.
    result_dir : str
        Directory to save the plot.
    """
    os.makedirs(result_dir, exist_ok=True)
    # nothing to do when no coefficient history was recorded
    if "coeffs_mean" not in trainhist:
        logging.warning("No SINDy coefficients found in training history")
        return
    coeff_hist = np.array(trainhist["coeffs_mean"])
    fig, ax = plt.subplots(figsize=(12, 6))
    # one curve per coefficient; a 1-D history is a single coefficient
    if coeff_hist.ndim > 1:
        for idx in range(coeff_hist.shape[1]):
            ax.plot(coeff_hist[:, idx], label=f"Coeff {idx}", linewidth=1.5)
    else:
        ax.plot(coeff_hist, label="Coeff 0", linewidth=1.5)
    ax.set_xlabel("Epoch", fontsize=12)
    ax.set_ylabel("Coefficient Value", fontsize=12)
    ax.set_title("SINDy Coefficient Evolution", fontsize=14, fontweight="bold")
    ax.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    out_png = os.path.join(result_dir, "coefficients_history.png")
    plt.savefig(out_png, dpi=300, bbox_inches="tight")
    logging.info(f"Saved coefficient history plot to {out_png}")
    plt.close(fig)
[docs]
def create_result_directory(base_dir: str, model_name: str) -> str:
    """
    Create (if needed) and return a result directory for model outputs.

    Parameters
    ----------
    base_dir : str
        Base directory for results.
    model_name : str
        Name of the model/experiment.

    Returns
    -------
    str
        Path to the created result directory.
    """
    path = os.path.join(base_dir, model_name)
    # idempotent: an already-existing directory is fine
    os.makedirs(path, exist_ok=True)
    logging.info(f"Result directory: {path}")
    return path
[docs]
def log_model_summary(veni, result_dir: str = None):
    """
    Log a summary of the VENI model architecture.

    Any failure is downgraded to a warning so logging never aborts a run.

    Parameters
    ----------
    veni : VENI
        The VENI model instance.
    result_dir : str, optional
        Directory to save the summary text file (default is None).
    """
    separator = "=" * 60
    try:
        logging.info(separator)
        logging.info("Model Summary:")
        logging.info(separator)
        # Log key parameters
        logging.info(f"Reduced order: {veni.reduced_order}")
        logging.info(f"Second order: {veni.second_order}")
        logging.info(f"Scaling method: {veni.scaling}")
        # capture the model summary line by line instead of printing it
        captured = []
        veni.summary(print_fn=captured.append)
        for row in captured:
            logging.info(row)
        # Save to file if a directory was provided
        if result_dir:
            os.makedirs(result_dir, exist_ok=True)
            summary_path = os.path.join(result_dir, "model_summary.txt")
            with open(summary_path, "w") as f:
                f.write("\n".join(captured))
            logging.info(f"Model summary saved to {summary_path}")
        logging.info(separator)
    except Exception as e:
        logging.warning(f"Failed to log model summary: {e}")
[docs]
def get_latent_initial_conditions(veni, x, dxdt, dxddt, mean_or_sample):
    """
    Compute the initial conditions in latent space for integration.

    For second-order models the initial condition stacks the latent state and
    its first derivative; for first-order models it is the latent state alone.

    Parameters
    ----------
    veni : VENI
        The VENI model instance.
    x : array-like
        The state data array.
    dxdt : array-like
        The first time derivative of the state data array.
    dxddt : array-like or None
        The second time derivative of the state data array (can be None if not available).
    mean_or_sample : str
        Whether to compute the mean initial condition or sample from the
        distribution ("mean" or "sample").

    Returns
    -------
    array-like
        Initial conditions in latent space (batch dimension removed).
    """
    if not veni.second_order:
        z0, _ = veni.calc_latent_time_derivatives(
            x, dxdt, None, mean_or_sample=mean_or_sample
        )
        # remove batch dimension
        return z0[0]
    z0, dz0, _ = veni.calc_latent_time_derivatives(
        x, dxdt, dxddt, mean_or_sample=mean_or_sample
    )
    # remove batch dimension and stack state with its derivative
    return np.concatenate([z0[0], dz0[0]], axis=-1)
[docs]
def perform_inference(
    veni,
    sim_ids,
    n_sims,
    n_timesteps,
    t,
    x,
    dxdt=None,
    params=None,
):
    """
    Perform inference on test trajectories by integrating the latent dynamics.
    Parameters
    ----------
    veni : VENI
        The trained VENI model.
    sim_ids : list of int
        List of test trajectory indices.
    n_sims : int
        Number of simulations.
    n_timesteps : int
        Number of timesteps in each test trajectory.
    t : array-like
        Test time steps.
    x : array-like
        Scaled test data.
    dxdt : array-like, optional
        Scaled test data derivatives (default is None).
    params : array-like, optional
        Test parameters (default is None).
    Returns
    -------
    tuple
        Tuple containing (Z, z_preds, t_preds): the encoded reference latent
        trajectories in simulation-wise format, the predicted latent
        trajectories, and their corresponding time steps.
    """
    # Reshape data into simulation-wise format: (n_sims, n_timesteps, features)
    T = switch_data_format(t, n_sims, n_timesteps, target_format="3d")
    # encode the full dataset into latent coordinates and their time derivatives
    z, dzdt = veni.calc_latent_time_derivatives(x, dxdt)
    Z = switch_data_format(z, n_sims, n_timesteps, target_format="3d")
    DZDT = switch_data_format(dzdt, n_sims, n_timesteps, target_format="3d")
    Params = switch_data_format(params, n_sims, n_timesteps, target_format="3d")
    z_preds = []
    t_preds = []
    start_time = datetime.datetime.now()
    for i, j in enumerate(sim_ids):
        logging.info(f"Processing trajectory {i+1}/{len(sim_ids)}")
        # Perform integration; second-order models take [z0, dz0] as initial state
        ic = np.concatenate([Z[j, 0], DZDT[j, 0]]) if veni.second_order else Z[j, 0]
        sol = veni.integrate(
            ic,
            T[j].squeeze(),
            mu=Params[j] if params is not None else None,
        )
        z_preds.append(sol.y)
        t_preds.append(sol.t)
    end_time = datetime.datetime.now()
    logging.info(
        f"Inference time: {(end_time - start_time).total_seconds()/len(sim_ids):.2f} seconds per trajectory"
    )
    # Convert predictions to arrays
    z_preds = np.array(z_preds)
    t_preds = np.array(t_preds)
    return Z, z_preds, t_preds
[docs]
def plot_inference_results(t_preds, z_preds, T, Z, sim_ids, state_id=0):
    """
    Plot inference results for test trajectories.

    Parameters
    ----------
    t_preds : list of array-like
        Predicted time steps for each trajectory.
    z_preds : list of array-like
        Predicted latent states for each trajectory; indexed as
        z_preds[i][state_id] (states along the first axis).
    T : array-like
        True time steps.
    Z : array-like
        True latent states, indexed as Z[j][:, state_id].
    sim_ids : list of int
        List of simulation indices to plot.
    state_id : int, optional
        Index of the state variable to plot (default is 0).
    """
    fig, axs = plt.subplots(len(sim_ids), 1, figsize=(12, 12), sharex=True)
    # plt.subplots returns a bare Axes (not an array) when len(sim_ids) == 1;
    # normalize so the indexing below also works for a single trajectory
    axs = np.atleast_1d(axs)
    fig.suptitle("Inference of Test Trajectories")
    for i, j in enumerate(sim_ids):
        axs[i].set_title(f"Test Trajectory {j}")
        axs[i].plot(T[j], Z[j][:, state_id], color="blue", label="True")
        axs[i].plot(
            t_preds[i],
            z_preds[i][state_id],
            color="red",
            linestyle="--",
            label="Predicted",
        )
        axs[i].set_xlabel("$t$")
        axs[i].set_ylabel("$z$")
        axs[i].legend()
    plt.tight_layout()
    plt.show()
[docs]
def perform_forward_uq(
    veni,
    sim_ids,
    n_traj,
    n_sims,
    n_timesteps,
    t,
    x,
    dxdt,
    dxddt=None,
    params=None,
    sigma=3,
):
    """
    Perform forward uncertainty quantification by sampling trajectories.
    Samples ``n_traj`` latent trajectories per simulation from the SINDy model
    plus one nominal trajectory integrated from the mean initial condition,
    and returns summary statistics. The function is flexible with optional
    `dxddt` and `params` (pass None if not available).
    Parameters
    ----------
    veni : VENI
        The VENI model instance.
    sim_ids : list of int
        List of simulation indices to process.
    n_traj : int
        Number of trajectories to sample for each simulation.
    n_sims : int
        Total number of simulations in the dataset.
    n_timesteps : int
        Number of timesteps in each simulation.
    t : array-like
        Time vector.
    x : array-like
        State data.
    dxdt : array-like
        First time derivative of state data.
    dxddt : array-like, optional
        Second time derivative of state data (default is None).
    params : array-like, optional
        Additional parameters for integration (default is None).
    sigma : float, optional
        Number of standard deviations for confidence intervals (default is 3).
    Returns
    -------
    dict
        Dictionary with keys: sampled_times, latent_trajectories_samples,
        mean_latent_samples, std_latent_samples, mean_latent, lower_bound_latent,
        upper_bound_latent, z, dzdt.
    Notes
    -----
    The returned bounds are ``mean_latent -/+ sigma * std_latent_samples``,
    i.e. the spread of the sampled trajectories is applied around the nominal
    (mean-initial-condition) trajectory.
    """
    second_order = veni.second_order
    # Accept tensors or numpy arrays
    def _to_numpy(a):
        """Convert a tensor-like (anything exposing .numpy()) or array-like to ndarray; None passes through."""
        if a is None:
            return None
        try:
            return a.numpy()
        except Exception:
            return np.asarray(a)
    x = _to_numpy(x)
    dxdt = _to_numpy(dxdt)
    dxddt = _to_numpy(dxddt)
    params = _to_numpy(params)
    t = _to_numpy(t)
    # Switch to simulation-wise 3D arrays (n_sims, n_timesteps, features) if needed
    X = switch_data_format(x, n_sims, n_timesteps, target_format="3d")
    DXDT = switch_data_format(dxdt, n_sims, n_timesteps, target_format="3d")
    # second derivatives are only needed for second-order models
    DXDDT = (
        switch_data_format(dxddt, n_sims, n_timesteps, target_format="3d")
        if second_order
        else None
    )
    T = switch_data_format(t, n_sims, n_timesteps, target_format="3d")
    Params = (
        switch_data_format(params, n_sims, n_timesteps, target_format="3d")
        if params is not None
        else None
    )
    # compute latent derivatives from the provided (possibly vectorized) arrays
    if second_order:
        z, dzdt, _ = veni.calc_latent_time_derivatives(x, dxdt, dxddt)
    else:
        z, dzdt = veni.calc_latent_time_derivatives(x, dxdt, None)
    # ensure z and dzdt have simulation-wise shapes (n_sims, n_timesteps, n_states)
    Z = switch_data_format(_to_numpy(z), n_sims, n_timesteps, target_format="3d")
    DZDT = switch_data_format(_to_numpy(dzdt), n_sims, n_timesteps, target_format="3d")
    # Save kernel state so the nominal integration inside the loop runs with the
    # original coefficients; presumably integrate_uq perturbs the kernel while
    # sampling — TODO confirm against SindyLayer.integrate_uq
    kernel_orig, kernel_scale_orig = (
        veni.sindy_layer.kernel,
        veni.sindy_layer.kernel_scale,
    )
    sampled_times = []
    latent_trajectories_samples = []
    mean_latent_trajectories = []
    for i in sim_ids:
        # NOTE(review): i indexes all simulations while the "%d/%d" total is
        # len(sim_ids), so this message can read e.g. "5/3" for sparse sim_ids
        logging.info("Processing trajectory %d/%d", i + 1, len(sim_ids))
        # mu parameter for SINDy integration
        mu = Params[i] if params is not None else None
        # time vector for integration
        tvec = T[i].squeeze()
        # Get initial conditions in physical space for the first timestep of each simulation
        x0, dx0dt0, dx0ddt0 = (
            X[i, 0:1],
            DXDT[i, 0:1],
            DXDDT[i, 0:1] if second_order else None,
        )
        # sampling: draw n_traj trajectories, each with a freshly sampled initial condition
        traj_samples = []
        traj_times = []
        for traj in range(n_traj):
            logging.info("\tSampling model %d/%d", traj + 1, n_traj)
            # sample initial condition
            ic = get_latent_initial_conditions(
                veni, x0, dx0dt0, dx0ddt0, mean_or_sample="sample"
            )
            # integrate under uncertainty; the returned coeffs are unused here
            sol, coeffs = veni.sindy_layer.integrate_uq(ic, tvec, mu=mu)
            traj_samples.append(np.asarray(sol.y))
            traj_times.append(np.asarray(sol.t))
        sampled_times.append(traj_times)
        latent_trajectories_samples.append(traj_samples)
        # Mean / nominal integration (using original kernel)
        veni.sindy_layer.kernel, veni.sindy_layer.kernel_scale = (
            kernel_orig,
            kernel_scale_orig,
        )
        # mean initial condition (using mean prediction from encoder)
        ic = get_latent_initial_conditions(
            veni, x0, dx0dt0, dx0ddt0, mean_or_sample="mean"
        )
        sol_mean = veni.integrate(ic, tvec, mu=mu)
        mean_latent_trajectories.append(np.asarray(sol_mean.y))
    # convert lists to arrays
    latent_trajectories_samples = np.array(latent_trajectories_samples)
    # expected shape: (ns, n_traj, n_states, n_timesteps) -> transpose to (ns, n_traj, n_timesteps, n_states)
    latent_trajectories_samples = np.transpose(
        latent_trajectories_samples, (0, 1, 3, 2)
    )
    # statistics across sampled trajectories: mean/std over axis=1 (samples)
    mean_latent_samples = np.mean(latent_trajectories_samples, axis=1)
    std_latent_samples = np.std(latent_trajectories_samples, axis=1)
    # mean trajectories: convert list to array and ensure shape (ns, n_timesteps, n_states)
    mean_latent = np.array(mean_latent_trajectories)
    mean_latent = np.transpose(mean_latent, (0, 2, 1))
    lower_bound_latent = mean_latent - sigma * std_latent_samples
    upper_bound_latent = mean_latent + sigma * std_latent_samples
    return {
        "sampled_times": sampled_times,
        "latent_trajectories_samples": latent_trajectories_samples,
        "mean_latent_samples": mean_latent_samples,
        "std_latent_samples": std_latent_samples,
        "mean_latent": mean_latent,
        "lower_bound_latent": lower_bound_latent,
        "upper_bound_latent": upper_bound_latent,
        "z": Z,
        "dzdt": DZDT,
    }
[docs]
def uq_plots(
    sampled_times,
    mean_latent,
    mean_latent_samples,
    std_latent_samples,
    t_test,
    z_test,
    test_ids,
    state_id=0,
):
    """
    Generate uncertainty quantification plots: reference trajectory, nominal
    prediction, and a +/- 3-standard-deviation band from the sampled runs.

    Parameters
    ----------
    sampled_times : list of array-like
        Time points for sampled UQ trajectories.
    mean_latent : array-like
        Mean trajectories from deterministic integration.
    mean_latent_samples : array-like
        Mean of sampled trajectories.
    std_latent_samples : array-like
        Standard deviation of sampled trajectories.
    t_test : array-like
        Test time steps.
    z_test : array-like
        Latent states for test data.
    test_ids : list of int
        List of test trajectory indices to plot.
    state_id : int, optional
        Index of the state variable to plot (default is 0).
    """
    n_test = len(test_ids)
    fig, axs = plt.subplots(n_test, 1, figsize=(12, 12), sharex=True)
    fig.suptitle(f"Integrated Test Trajectories")
    for row, sim in enumerate(test_ids):
        ax = axs[row]
        ax.set_title(f"Test Trajectory {sim + 1}")
        # reference trajectory in blue, nominal prediction dashed in red
        ax.plot(t_test[sim], z_test[sim][:, state_id], color="blue")
        pred_t = sampled_times[row][0]
        ax.plot(
            pred_t,
            mean_latent[row, :, state_id],
            color="red",
            linestyle="--",
        )
        # shaded band: sample mean +/- 3 sample standard deviations
        sample_mu = mean_latent_samples[row][:, state_id]
        sample_sd = std_latent_samples[row][:, state_id]
        ax.fill_between(
            pred_t,
            sample_mu - 3 * sample_sd,
            sample_mu + 3 * sample_sd,
            color="red",
            alpha=0.3,
        )
        ax.set_xlabel("$t$")
        ax.set_ylabel("$z$")
    plt.tight_layout()
    plt.show()