import numpy as np
import logging
import tensorflow as tf
from .base_model import BaseModel
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
class AutoencoderSindy(BaseModel):
def __init__(
    self,
    sindy_layer,
    reduced_order,
    x,
    mu=None,
    scaling="individual",
    layer_sizes=None,
    activation="selu",
    second_order=True,
    l1: float = 0,
    l2: float = 0,
    l_rec: float = 1,
    l_dz: float = 1,
    l_dx: float = 1,
    l_int: float = 0,
    dt=0,
    dtype="float32",
    **kwargs,
):
    """
    Autoencoder with SINDy dynamics in the latent space.

    This model learns a reduced-order representation using an autoencoder
    and identifies latent dynamics using a provided SINDy layer.

    Parameters
    ----------
    sindy_layer : SindyLayer
        Instance of a SINDy-compatible layer that computes latent
        dynamics and associated losses.
    reduced_order : int
        Dimensionality of the latent space.
    x : array-like
        Example input data used to infer shapes and build the model.
    mu : array-like, optional
        Optional parameter/control inputs associated with the data.
    scaling : {'individual', ...}, optional
        Method used to scale inputs before encoding.
    layer_sizes : list of int, optional
        Hidden layer sizes for the encoder/decoder networks.
        Defaults to ``[10, 10, 10]``.
    activation : str or callable, optional
        Activation function for encoder/decoder hidden layers.
    second_order : bool, optional
        If True, the model treats dynamics as second-order.
    l1, l2 : float, optional
        Kernel regularization coefficients.
    l_rec, l_dz, l_dx, l_int : float, optional
        Weights for different loss components (reconstruction, latent
        derivative, state derivative, integration consistency).
    dt : float, optional
        Time-step used for finite-difference approximations.
    dtype : str, optional
        Floating point precision used by the Keras backend.
    **kwargs
        Additional keyword arguments forwarded to the base model.

    Raises
    ------
    ValueError
        If ``x`` is neither 2-D ``(n_samples, n_features)`` nor 3-D.
    """
    # set default layer sizes here to avoid a mutable default argument
    if layer_sizes is None:
        layer_sizes = [10, 10, 10]
    # assert that input arguments are valid
    self.assert_arguments(locals())
    # NOTE: this changes the GLOBAL Keras float precision, which also
    # affects any other Keras models built afterwards
    tf.keras.backend.set_floatx(dtype)
    self.dtype_ = dtype
    super(AutoencoderSindy, self).__init__(**kwargs)
    if not hasattr(self, "config"):
        self._init_to_config(locals())
    self.sindy_layer = sindy_layer
    self.layer_sizes = layer_sizes
    self.activation = tf.keras.activations.get(activation)
    self.reduced_order = reduced_order
    self.second_order = second_order
    self.scaling = scaling
    # weighting of the different losses
    self.l_rec, self.l_dz, self.l_dx, self.l_int = l_rec, l_dz, l_dx, l_int
    self.dt = dt
    # kernel regularization weights
    self.l1, self.l2 = l1, l2
    self.kernel_regularizer = tf.keras.regularizers.l1_l2(l1=l1, l2=l2)
    # create the model; choose (un)flatten strategy from the sample shape
    self.x_shape = x.shape[1:]
    if len(self.x_shape) == 1:
        self.flatten, self.unflatten = self.flatten_dummy, self.flatten_dummy
    elif len(self.x_shape) == 2:
        self.flatten, self.unflatten = self.flatten3d, self.unflatten3d
    else:
        # fail early with a clear message instead of a later AttributeError
        # when self.flatten is first accessed
        raise ValueError(
            f"x must have 2 or 3 dimensions, got shape {x.shape}"
        )
    x = self.flatten(x)
    # some subclasses initialize weights before building the model
    if hasattr(self, "init_weights"):
        self.init_weights()
    self.build_model(x, mu)
    # create loss trackers
    self.create_loss_trackers()
def assert_arguments(self, arguments):
    """
    Validate initialization arguments.

    Parameters
    ----------
    arguments : dict
        Mapping of argument names to values (``locals()`` from the initializer).
    """
    # run the base-class checks first
    super(AutoencoderSindy, self).assert_arguments(arguments)
    # latent dimension must be an integer
    assert isinstance(
        arguments["reduced_order"], int
    ), "reduced_order must be an integer"
    # scaling method must be one of the supported options
    assert arguments["scaling"] in self._scaling_methods, (
        f"scaling must be one of " f"{self._scaling_methods}"
    )
    # network architecture: a list of integer layer widths
    layer_sizes = arguments["layer_sizes"]
    assert isinstance(layer_sizes, list), "layer_sizes must be a list of integers"
    for size in layer_sizes:
        assert isinstance(size, int), "layer_sizes must be a list of integers"
    # loss and regularization weights must be plain numbers
    # (type(...) rather than isinstance keeps bools from slipping through)
    for name in ["l1", "l2", "l_rec", "l_dz", "l_dx", "l_int"]:
        assert type(arguments[name]) in (
            float,
            int,
        ), f"{name} must be of type int/float"
def create_loss_trackers(self):
    """
    Initialize Keras ``Mean`` metrics used to log losses during training.

    Trackers for the optional loss components ('dz', 'dx', 'int') are only
    created when the corresponding loss weight is non-zero; the SINDy
    layer's own trackers are included whenever its losses are active.
    """
    trackers = dict()
    trackers["loss"] = tf.keras.metrics.Mean(name="loss")
    trackers["rec"] = tf.keras.metrics.Mean(name="rec")
    if self.l_dz > 0:
        trackers["dz"] = tf.keras.metrics.Mean(name="dz")
    if self.l_dx > 0:
        trackers["dx"] = tf.keras.metrics.Mean(name="dx")
    if self.l_int > 0:
        trackers["int"] = tf.keras.metrics.Mean(name="int")
    trackers["reg"] = tf.keras.metrics.Mean(name="reg")
    # merge in the SINDy layer's loss trackers when dynamics losses are used
    if self.l_dx > 0 or self.l_dz > 0:
        trackers.update(self.sindy_layer.loss_trackers)
    self.loss_trackers = trackers
def compile(
    self,
    optimizer=None,
    loss=None,
    sindy_optimizer=None,
    **kwargs,
):
    """
    Compile the model and optionally configure a separate optimizer for the SINDy part.

    Parameters
    ----------
    optimizer : tf.keras.optimizers.Optimizer or compatible, optional
        Optimizer for the autoencoder parameters. Defaults to
        ``Adam(learning_rate=1e-3)``. A fresh instance is created per call:
        the previous default argument was evaluated once at class-definition
        time, so a single optimizer object (and its slot state) was silently
        shared between every model instance.
    loss : tf.keras.losses.Loss or callable, optional
        Loss function for reconstruction. Defaults to ``BinaryCrossentropy()``
        (same per-call-instantiation fix as ``optimizer``).
    sindy_optimizer : tf.keras.optimizers.Optimizer or compatible, optional
        Optimizer for the SINDy parameters. If None, a SINDy optimizer is
        built from the main optimizer's configuration.
    """
    # create fresh default objects per call instead of sharing one
    # class-definition-time instance across all models
    if optimizer is None:
        optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
    if loss is None:
        loss = tf.keras.losses.BinaryCrossentropy()
    super(AutoencoderSindy, self).compile(optimizer=optimizer, loss=loss, **kwargs)
    if sindy_optimizer is None:
        self.sindy_optimizer = tf.keras.optimizers.get(optimizer)
        # the optimizer is called on subsets of the variables later, so it
        # must be built against the full variable list up front
        trainable_weights = self.get_trainable_weights()
        self.sindy_optimizer.build(trainable_weights)
    else:
        self.sindy_optimizer = tf.keras.optimizers.get(sindy_optimizer)
@staticmethod
def reconstruction_loss(x, x_pred):
    """
    Return the reconstruction loss (mean squared error).

    Parameters
    ----------
    x : array-like
        Ground-truth inputs.
    x_pred : array-like
        Reconstructed inputs.

    Returns
    -------
    tf.Tensor
        Scalar mean squared error between ``x`` and ``x_pred``.
    """
    squared_error = tf.square(x - x_pred)
    return tf.reduce_mean(squared_error)
def get_trainable_weights(self):
    """
    Return trainable variables for optimizer updates.

    Returns
    -------
    list
        Trainable TensorFlow variables of the encoder, decoder and SINDy
        sub-models, in that order (callers rely on this ordering to split
        gradients between the two optimizers).
    """
    weights = list(self.encoder.trainable_weights)
    weights += self.decoder.trainable_weights
    weights += self.sindy.trainable_weights
    return weights
def build_model(self, x, mu):
    """
    Assemble the encoder, decoder and SINDy sub-models.

    Parameters
    ----------
    x : array-like
        Example (already flattened) input used to determine shapes.
    mu : array-like, optional
        Parameter inputs for the SINDy layer.
    """
    x = tf.cast(x, dtype=self.dtype_)
    # symbolic forward passes to wire up the three sub-models
    x_input, z = self.build_encoder(x)
    z_sindy, z_dot = self.build_sindy(z, mu)
    x_rec = self.build_decoder(z)
    # wrap the symbolic graphs into Keras models
    self.encoder = tf.keras.Model(inputs=x_input, outputs=z, name="encoder")
    self.decoder = tf.keras.Model(inputs=z, outputs=x_rec, name="decoder")
    self.sindy = tf.keras.Model(inputs=z_sindy, outputs=z_dot, name="sindy")
def build_encoder(self, x):
    """
    Build a fully connected encoder with layers of the configured sizes.

    Parameters
    ----------
    x : tf.Tensor or array-like
        Flattened input to the autoencoder; only its feature width is used.

    Returns
    -------
    tuple of tf.Tensor
        ``(x_input, z)`` — the symbolic input layer and the latent output.
    """
    x_input = tf.keras.Input(shape=(x.shape[1],), dtype=self.dtype_)
    hidden = x_input
    # stack of regularized hidden layers
    for width in self.layer_sizes:
        dense = tf.keras.layers.Dense(
            width,
            activation=self.activation,
            kernel_regularizer=self.kernel_regularizer,
        )
        hidden = dense(hidden)
    # linear bottleneck producing the latent coordinates
    z = tf.keras.layers.Dense(
        self.reduced_order,
        activation="linear",
        kernel_regularizer=self.kernel_regularizer,
    )(hidden)
    return x_input, z
def build_decoder(self, z):
    """
    Build a fully connected decoder mirroring the encoder (reversed widths).

    Parameters
    ----------
    z : tf.Tensor
        Latent representation.

    Returns
    -------
    tf.Tensor
        Reconstructed (flattened) output.
    """
    hidden = z
    # mirror the encoder's hidden layers in reverse order
    for width in reversed(self.layer_sizes):
        dense = tf.keras.layers.Dense(
            width,
            activation=self.activation,
            kernel_regularizer=self.kernel_regularizer,
        )
        hidden = dense(hidden)
    # linear output layer
    # NOTE(review): the output width is x_shape[0]; for 3-D inputs that were
    # flattened via flatten3d this presumably still matches the flattened
    # feature width — confirm against flatten3d's behavior
    reconstruction = tf.keras.layers.Dense(
        self.x_shape[0],
        activation="linear",
        kernel_regularizer=self.kernel_regularizer,
    )(hidden)
    return reconstruction
def build_loss(self, inputs):
    """
    Compute the training loss and apply one optimization step.

    Splits the inputs into state, derivatives and parameters, performs the
    forward pass under a gradient tape, then updates the autoencoder and
    SINDy weights with their respective optimizers.

    Parameters
    ----------
    inputs : list of array-like
        Input arrays containing states, derivatives, and parameters
        (see ``split_inputs``).

    Returns
    -------
    dict
        Dictionary of computed losses.
    """
    # second order systems: dx_ddt = f(x, dx_dt, mu)
    x, dx_dt, dx_ddt, x_int, dx_int, mu, mu_int = self.split_inputs(inputs)
    # forward pass recorded on the tape
    with tf.GradientTape() as tape:
        if self.l_dx == 0 and self.l_dz == 0:
            # pure reconstruction when no identification loss is active
            losses = self.get_loss_rec(x)
        elif self.second_order:
            # second-order systems use two time derivatives
            losses = self.get_loss_2nd(x, dx_dt, dx_ddt, mu, x_int, dx_int, mu_int)
        else:
            losses = self.get_loss(x, dx_dt, mu, x_int, mu_int)
    # the variable list is ordered encoder + decoder first, then SINDy,
    # so the gradient list can be split at n_ae_weights
    trainable_weights = self.get_trainable_weights()
    n_ae_weights = len(self.encoder.trainable_weights) + len(
        self.decoder.trainable_weights
    )
    grads = tape.gradient(losses["loss"], trainable_weights)
    grads_autoencoder = grads[:n_ae_weights]
    grads_sindy = grads[n_ae_weights:]
    # update the autoencoder weights with the main optimizer
    self.optimizer.apply_gradients(
        zip(grads_autoencoder, trainable_weights[:n_ae_weights])
    )
    # when only reconstructing (no dynamics losses) there are no gradients
    # for the SINDy variables, so skip that optimizer entirely
    if self.l_dx > 0 or self.l_dz > 0:
        self.sindy_optimizer.apply_gradients(
            zip(grads_sindy, trainable_weights[n_ae_weights:])
        )
    return losses
def calc_latent_time_derivatives(
    self, x, dx_dt, dx_ddt=None, mean_or_sample="mean"
):
    """
    Calculate time derivatives of latent variables given time derivatives of the inputs.

    Parameters
    ----------
    x : array-like
        Full state of shape ``(n_samples, n_features, ...)``.
    dx_dt : array-like
        First time derivative of the full state.
    dx_ddt : array-like, optional
        Second time derivative of the full state, if available.
    mean_or_sample : {'mean', 'sample'}, optional
        Whether to use the mean or a sample from the encoder distribution
        (relevant for variational subclasses).

    Returns
    -------
    tuple
        ``(z, dz_dt[, dz_ddt])`` as numpy arrays; the last item is returned
        only if ``dx_ddt`` is provided.
    """
    # flatten physical/geometric descriptions to plain vectors if necessary
    if len(x.shape) > 2:
        if dx_ddt is not None:
            x, dx_dt, dx_ddt = [self.flatten(v) for v in [x, dx_dt, dx_ddt]]
        else:
            x, dx_dt = [self.flatten(v) for v in [x, dx_dt]]
    x = tf.cast(x, self.dtype_)
    dx_dt = tf.expand_dims(tf.cast(dx_dt, dtype=self.dtype_), axis=-1)
    if dx_ddt is not None:
        # single expand_dims only: the original additionally expanded dx_ddt
        # on the flattened (rank>2) path, yielding a rank-4 tensor that broke
        # the chain-rule matmul below
        dx_ddt = tf.expand_dims(tf.cast(dx_ddt, dtype=self.dtype_), axis=-1)
    # forward pass of the encoder with tapes for the latent jacobians
    if dx_ddt is not None:
        with tf.GradientTape() as t11:
            # the outer tape must also watch x (as in get_loss_2nd); without
            # this the second jacobian has nothing recorded for constant inputs
            t11.watch(x)
            with tf.GradientTape() as t12:
                t12.watch(x)
                z = self.encode(x, mean_or_sample=mean_or_sample)
            dz_dx = t12.batch_jacobian(z, x)
        dz_ddx = t11.batch_jacobian(dz_dx, x)
    else:
        with tf.GradientTape() as t12:
            t12.watch(x)
            z = self.encode(x, mean_or_sample=mean_or_sample)
        dz_dx = t12.batch_jacobian(z, x)
    # first time derivative of the latent variable via the chain rule:
    # dz_dt = dz_dx @ dx_dt
    dz_dt = dz_dx @ dx_dt
    dz_dt = tf.squeeze(dz_dt, axis=2)
    if dx_ddt is None:
        return z.numpy(), dz_dt.numpy()
    # second time derivative via the chain rule:
    # dz_ddt = (dz_ddx @ dx_dt) @ dx_dt + dz_dx @ dx_ddt
    dz_ddt = (
        tf.squeeze(dz_ddx @ tf.expand_dims(dx_dt, axis=1), axis=3) @ dx_dt
        + dz_dx @ dx_ddt
    )
    dz_ddt = tf.squeeze(dz_ddt, axis=2)
    return z.numpy(), dz_dt.numpy(), dz_ddt.numpy()
def _training_encoding(self, x, losses):
    """
    Encode input to its latent representation during training.

    For compatibility with variational subclasses this returns the latent
    variable together with the (unmodified) losses dict; subclasses may add
    encoder-specific loss terms here.

    Parameters
    ----------
    x : tf.Tensor or array-like
        Input data.
    losses : dict
        Dictionary accumulating loss components.

    Returns
    -------
    tuple
        ``(z, losses)`` where ``z`` is the latent representation.
    """
    latent = self.encoder(x)
    return latent, losses
def get_loss_rec(self, x):
    """
    Calculate the pure reconstruction loss of the autoencoder.

    Parameters
    ----------
    x : array-like of shape (n_samples, n_features)
        Full state.

    Returns
    -------
    dict
        Losses with keys 'rec', 'reg', and 'loss'.
    """
    losses = dict(loss=0)
    z, losses = self._training_encoding(x, losses)
    x_pred = self.decoder(z)
    # weighted reconstruction error
    losses["rec"] = self.l_rec * self.reconstruction_loss(x, x_pred)
    # kernel-regularization penalties collected by Keras
    losses["reg"] = tf.reduce_sum(self.losses)
    losses["loss"] += losses["rec"] + losses["reg"]
    return losses
def get_loss(self, x, dx_dt, mu, x_int=None, mu_int=None):
    """
    Calculate loss for a first order system.

    Parameters
    ----------
    x : array-like of shape (n_samples, n_features)
        Full state.
    dx_dt : array-like of shape (n_samples, n_features)
        Time derivative of state.
    mu : array-like of shape (n_samples, n_features)
        Control input.
    x_int : array-like of shape (n_samples, n_features, n_integrationsteps), optional
        Full state at {t+1,...,t+n_integrationsteps}.
    mu_int : array-like of shape (n_samples, n_param, n_integrationsteps), optional
        Control input at {t+1,...,t+n_integrationsteps}.

    Returns
    -------
    dict
        Dictionary of individual losses (rec, dz, dx, int, reg, loss).
    """
    losses = dict(loss=0)
    x = tf.cast(x, self.dtype_)
    # add trailing singleton dim so the chain-rule matmuls below line up
    dx_dt = tf.expand_dims(tf.cast(dx_dt, dtype=self.dtype_), axis=-1)
    # forward pass of encoder while taping x to obtain dz/dx
    with tf.GradientTape() as t12:
        t12.watch(x)
        z, losses = self._training_encoding(x, losses)
    dz_dx = t12.batch_jacobian(z, x)
    # first time derivative of the latent variable via the chain rule:
    # dz_dt = dz_dx @ dx_dt
    dz_dt = dz_dx @ dx_dt
    # SINDy approximation of the latent time derivative
    sindy_pred, sindy_mean, sindy_log_var = self.evaluate_sindy_layer(z, None, mu)
    dz_dt_sindy = tf.expand_dims(sindy_pred, -1)
    # forward pass of decoder while taping z to obtain dx/dz
    with tf.GradientTape() as t22:
        t22.watch(z)
        x_ = self.decoder(z)
    if self.l_dx > 0:
        dx_dz = t22.batch_jacobian(x_, z)
        # time derivative of the reconstructed state via the chain rule
        dxf_dt = dx_dz @ dz_dt_sindy
        dx_loss = self.l_dx * self.compute_loss(
            None, tf.concat([dxf_dt], axis=1), tf.concat([dx_dt], axis=1)
        )
        losses["dx"] = dx_loss
        losses["loss"] += dx_loss
    # SINDy consistency (integration) loss
    if self.l_int:
        int_loss = self.get_int_loss([x_int, mu_int])
        losses["int"] = int_loss
        losses["loss"] += int_loss
    # regularization and reconstruction losses
    reg_loss = tf.reduce_sum(self.losses)
    rec_loss = self.l_rec * self.reconstruction_loss(x, x_)
    # dz_loss = self.l_dz * self.compute_loss(None, tf.concat([dz_dt], axis=1),
    #                                         tf.concat([dz_dt_sindy], axis=1))
    # NOTE(review): this log-form latent-derivative loss does NOT apply the
    # l_dz weight, unlike get_loss_2nd (and unlike the commented-out variant
    # above) — confirm this asymmetry is intentional
    dz_loss = tf.math.log(
        2 * np.pi * tf.reduce_mean(tf.keras.losses.mse(dz_dt, dz_dt_sindy)) + 1
    )
    losses["loss"] += rec_loss + dz_loss + reg_loss
    # KL divergence term for variational SINDy layers
    if sindy_mean is not None:
        kl_loss_sindy = self.sindy_layer.kl_loss(sindy_mean, sindy_log_var)
        losses["kl_sindy"] = kl_loss_sindy
        losses["loss"] += kl_loss_sindy
    losses["reg"] = reg_loss
    losses["rec"] = rec_loss
    losses["dz"] = dz_loss
    return losses
def get_loss_2nd(
    self, x, dx_dt, dx_ddt, mu, x_int=None, dx_dt_int=None, mu_int=None
):
    """
    Calculate loss for a second order system.

    Parameters
    ----------
    x : array-like of shape (n_samples, n_features)
        Full state.
    dx_dt : array-like of shape (n_samples, n_features)
        Time derivative of state.
    dx_ddt : array-like of shape (n_samples, n_features)
        Second time derivative of state.
    mu : array-like of shape (n_samples, n_param)
        Control input.
    x_int : array-like of shape (n_samples, n_features, n_integrationsteps), optional
        Full state at {t+1,...,t+n_integrationsteps}.
    dx_dt_int : array-like of shape (n_samples, n_features, n_integrationsteps), optional
        Time derivative of state at {t+1,...,t+n_integrationsteps}.
    mu_int : array-like of shape (n_samples, n_param, n_integrationsteps), optional
        Control input at {t+1,...,t+n_integrationsteps}.

    Returns
    -------
    dict
        Dictionary of individual losses (rec, dz, dx, int, reg, loss).
    """
    losses = dict(loss=0)
    x = tf.cast(x, self.dtype_)
    dx_dt = tf.expand_dims(tf.cast(dx_dt, dtype=self.dtype_), axis=-1)
    dx_ddt = tf.expand_dims(tf.cast(dx_ddt, dtype=self.dtype_), axis=-1)
    # encoder forward pass with nested tapes for first and second jacobians
    with tf.GradientTape() as t11:
        t11.watch(x)
        with tf.GradientTape() as t12:
            t12.watch(x)
            z, losses = self._training_encoding(x, losses)
        dz_dx = t12.batch_jacobian(z, x)
    dz_ddx = t11.batch_jacobian(dz_dx, x)
    # first latent time derivative via the chain rule:
    # dz_dt = dz_dx @ dx_dt
    dz_dt = dz_dx @ dx_dt
    # second latent time derivative:
    # dz_ddt = (dz_ddx @ dx_dt) @ dx_dt + dz_dx @ dx_ddt
    dz_ddt = (
        tf.squeeze(dz_ddx @ tf.expand_dims(dx_dt, axis=1), axis=3) @ dx_dt
        + dz_dx @ dx_ddt
    )
    # SINDy approximation: first half of the output is dz_dt, second half dz_ddt
    sindy_pred, sindy_mean, sindy_log_var = self.evaluate_sindy_layer(z, dz_dt, mu)
    dz_dt_sindy = tf.expand_dims(sindy_pred[:, : self.reduced_order], -1)
    dz_ddt_sindy = tf.expand_dims(sindy_pred[:, self.reduced_order :], -1)
    # decoder forward pass with nested tapes for dx/dz and d2x/dz2
    with tf.GradientTape() as t21:
        t21.watch(z)
        with tf.GradientTape() as t22:
            t22.watch(z)
            x_ = self.decoder(z)
        if self.l_dx > 0:
            dx_dz = t22.batch_jacobian(x_, z)
    if self.l_dx > 0:
        dx_ddz = t21.batch_jacobian(dx_dz, z)
        # first time derivative of the reconstructed state (chain rule)
        dxf_dt = dx_dz @ dz_dt_sindy
        # second time derivative of the reconstructed state (chain rule);
        # this expression was corrupted by a stray token in the extracted
        # source and is restored here
        dxf_ddt = (
            tf.squeeze(dx_ddz @ tf.expand_dims(dz_dt_sindy, axis=1), axis=3)
            @ dz_dt_sindy
        ) + dx_dz @ dz_ddt_sindy
        dx_loss = self.l_dx * self.compute_loss(
            None,
            tf.concat([dxf_dt, dxf_ddt], axis=1),
            tf.concat([dx_dt, dx_ddt], axis=1),
        )
        losses["dx"] = dx_loss
        losses["loss"] += dx_loss
    # SINDy consistency (integration) loss
    if self.l_int:
        int_loss = self.get_int_loss([x_int, dx_dt_int, mu_int])
        losses["int"] = int_loss
        losses["loss"] += int_loss
    # KL divergence term for variational SINDy layers
    if sindy_mean is not None:
        kl_loss_sindy = self.sindy_layer.kl_loss(sindy_mean, sindy_log_var)
        losses["kl_sindy"] = kl_loss_sindy
        losses["loss"] += kl_loss_sindy
    # regularization, reconstruction and weighted latent-derivative losses
    reg_loss = tf.reduce_sum(self.losses)
    rec_loss = self.l_rec * self.reconstruction_loss(x, x_)
    dz_loss = self.l_dz * self.compute_loss(
        None,
        tf.concat([dz_dt, dz_ddt], axis=1),
        tf.concat([dz_dt_sindy, dz_ddt_sindy], axis=1),
    )
    losses["loss"] += rec_loss + dz_loss + reg_loss
    losses["reg"] = reg_loss
    losses["rec"] = rec_loss
    losses["dz"] = dz_loss
    return losses
# @tf.function
def encode(self, x, training=False, mean_or_sample="mean"):
    """
    Encode the full state to latent variables.

    Parameters
    ----------
    x : array-like
        Full state input of shape ``(n_samples, n_features, ...)``.
    training : bool, optional
        Training-mode flag.
    mean_or_sample : {'mean', 'sample'}, optional
        Return either the posterior mean or a sampled latent vector.

    Returns
    -------
    tf.Tensor
        Latent representation with shape ``(n_samples, reduced_order)``.

    Notes
    -----
    ``training`` and ``mean_or_sample`` are not used by this deterministic
    encoder; presumably they exist for API compatibility with variational
    subclasses.
    """
    flat = self.flatten(x)
    return self.encoder(flat)
# @tf.function
def decode(self, z):
    """
    Decode latent variables back to the full state.

    Parameters
    ----------
    z : array-like of shape (n_samples, reduced_order)
        Latent variable.

    Returns
    -------
    array-like
        Reconstructed full state, unflattened to the original layout.
    """
    reconstruction = self.decoder(z)
    return self.unflatten(reconstruction)
def reconstruct(self, x, _=None):
    """
    Reconstruct the full state by an encode-decode round trip.

    Parameters
    ----------
    x : array-like
        Full state input of shape ``(n_samples, n_features, ...)``.
    _ : optional
        Unused placeholder kept for API compatibility.

    Returns
    -------
    array-like
        Reconstructed full state with the original shape.
    """
    return self.decode(self.encode(x))