import logging
import tensorflow as tf
from .base_model import BaseModel
# Configure root logging so the library's INFO-level messages are visible.
# NOTE(review): this mutates the *root* logger at import time, which is a
# side effect on any importing application — a module-level
# `logging.getLogger(__name__)` would be less invasive; confirm before changing.
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
class IdentificationNetwork(BaseModel):
    """Model that identifies system dynamics directly with a SINDy layer."""

    def __init__(
        self,
        sindy_layer,
        x,
        mu=None,
        scaling="individual",
        second_order=True,
        l_dz: float = 1,
        l_int: float = 0,
        dt=0,
        dtype="float32",
        **kwargs,
    ):
        """
        Identification network using a SINDy layer.

        Parameters
        ----------
        sindy_layer : SindyLayer
            SINDy-compatible layer used to model system dynamics.
        x : array-like
            Example input data used to infer shapes (batch axis first).
        mu : array-like, optional
            Optional control/parameter inputs.
        scaling : str, optional
            Scaling strategy for inputs.
        second_order : bool, optional
            Whether the underlying system is second-order.
        l_dz : float, optional
            Weight for latent derivative loss.
        l_int : float, optional
            Weight for integration consistency loss.
        dt : float, optional
            Time-step for finite differences.
        dtype : str, optional
            Keras float dtype.
        **kwargs
            Forwarded to the base model.

        Raises
        ------
        ValueError
            If ``x`` is neither 2D (batch, features) nor 3D
            (batch, features, channels) data.
        """
        # assert that input arguments are valid
        self.assert_arguments(locals())
        # set the global keras float type so all layers agree on precision
        tf.keras.backend.set_floatx(dtype)
        self.dtype_ = dtype
        super().__init__(**kwargs)
        # some subclasses populate `config` themselves before reaching here
        if not hasattr(self, "config"):
            self._init_to_config(locals())
        self.sindy_layer = sindy_layer
        self.second_order = second_order
        # weighting of the different losses
        self.l_dz, self.l_int = l_dz, l_int
        self.dt = dt
        self.scaling = scaling
        # infer the per-sample feature shape from the example data
        self.x_shape = x.shape[1:]
        if len(self.x_shape) == 1:
            # already flat: flatten/unflatten are identity operations
            self.flatten, self.unflatten = self.flatten_dummy, self.flatten_dummy
        elif len(self.x_shape) == 2:
            self.flatten, self.unflatten = self.flatten3d, self.unflatten3d
        else:
            # fail fast: previously an unsupported rank left self.flatten
            # undefined and surfaced later as an opaque AttributeError
            raise ValueError(
                f"Unsupported input rank: expected 2D or 3D data, got shape {x.shape}"
            )
        x = self.flatten(x)
        # some subclasses initialize weights before building the model
        if hasattr(self, "init_weights"):
            self.init_weights()
        self.build_model(x, mu)
        # create loss trackers
        self.create_loss_trackers()
[docs]
def create_loss_trackers(self):
"""
Initialize loss trackers used during training.
"""
self.loss_trackers = dict()
self.loss_trackers["loss"] = tf.keras.metrics.Mean(name="loss")
self.loss_trackers["dz"] = tf.keras.metrics.Mean(name="dz")
if self.l_int > 0:
self.loss_trackers["int"] = tf.keras.metrics.Mean(name="int")
self.loss_trackers["reg"] = tf.keras.metrics.Mean(name="reg")
# update dict with sindy layer loss trackers
self.loss_trackers.update(self.sindy_layer.loss_trackers)
[docs]
def get_trainable_weights(self):
"""
Return trainable variables for the identification-only model.
Returns
-------
list
List of trainable TensorFlow variables (SINDy weights).
"""
return self.sindy.trainable_weights
[docs]
def build_model(self, z, mu):
"""
Build the SINDy model mapping latent variables to their derivatives.
Parameters
----------
z : array-like
Example latent input used to infer shapes.
mu : array-like, optional
Parameter/control inputs for SINDy.
"""
z = tf.keras.Input(shape=(z.shape[1],), dtype=self.dtype_)
# sindy
z_sindy, z_dot = self.build_sindy(z, mu)
# build the models
self.sindy = tf.keras.Model(inputs=z_sindy, outputs=z_dot, name="sindy")
[docs]
def build_loss(self, inputs):
"""
Compute training loss from inputs and apply optimizer steps.
Parameters
----------
inputs : list
List containing state, derivatives, and optional parameter/integration data.
Returns
-------
dict
Dictionary with individual loss components and total loss under key 'loss'.
"""
# second order systems dx_ddt = f(x, dx_dt, mu)
x, dx_dt, dx_ddt, x_int, dx_int, mu, mu_int = self.split_inputs(inputs)
# forward pass
with tf.GradientTape() as tape:
# calculate loss for second order systems (includes two time derivatives)
if self.second_order:
losses = self.get_loss_2nd(x, dx_dt, dx_ddt, mu, x_int, dx_int, mu_int)
# calculate loss for first order systems
else:
losses = self.get_loss(x, dx_dt, mu, x_int, mu_int)
# split trainable variables for autoencoder and dynamics so that you can use separate optimizers
trainable_weights = self.get_trainable_weights()
grads = tape.gradient(losses["loss"], trainable_weights)
# adjust weights for autoencoder
self.optimizer.apply_gradients(zip(grads, trainable_weights))
return losses
[docs]
def get_loss(self, z, dz_dt, mu, z_int=None, mu_int=None):
"""
Calculate loss for first order system.
Parameters
----------
z : array-like of shape (n_samples, n_features)
Full state.
dz_dt : array-like of shape (n_samples, n_features)
Time derivative of state.
mu : array-like of shape (n_samples, n_features)
Control input.
z_int : array-like of shape (n_samples, n_features, n_integrationsteps), optional
Full state at {t+1,...,t+n_integrationsteps}.
mu_int : array-like of shape (n_samples, n_param, n_integrationsteps), optional
Control input at {t+1,...,t+n_integrationsteps}.
Returns
-------
dict
Dictionary of individual losses including 'loss', 'dz', 'int', 'reg'.
"""
losses = dict(loss=0)
z = tf.cast(z, self.dtype_)
dz_dt = tf.expand_dims(tf.cast(dz_dt, dtype=self.dtype_), axis=-1)
# sindy approximation of the time derivative of the latent variable
sindy_pred, sindy_mean, sindy_log_var = self.evaluate_sindy_layer(z, None, mu)
dz_dt_sindy = tf.expand_dims(sindy_pred, -1)
# SINDy consistency loss
if self.l_int:
int_loss = self.get_int_loss([z_int, mu_int])
losses["int"] = int_loss
losses["loss"] += int_loss
# calculate losses
reg_loss = tf.reduce_sum(self.losses)
dz_loss = self.l_dz * self.compute_loss(
None, tf.concat([dz_dt], axis=1), tf.concat([dz_dt_sindy], axis=1)
)
losses["reg"] = reg_loss
losses["dz"] = dz_loss
losses["loss"] += dz_loss + reg_loss
# calculate kl divergence for variational sindy
if sindy_mean is not None:
kl_loss_sindy = self.sindy_layer.kl_loss(sindy_mean, sindy_log_var)
losses["kl_sindy"] = kl_loss_sindy
losses["loss"] += kl_loss_sindy
return losses
[docs]
def get_loss_2nd(
self, z, dz_dt, dz_ddt, mu, z_int=None, dz_dt_int=None, mu_int=None
):
"""
Calculate loss for second order system.
Parameters
----------
z : array-like of shape (n_samples, n_features)
Full state.
dz_dt : array-like of shape (n_samples, n_features)
Time derivative of state.
dz_ddt : array-like of shape (n_samples, n_features)
Second time derivative of state.
mu : array-like of shape (n_samples, n_param)
Control input.
z_int : array-like of shape (n_samples, n_features, n_integrationsteps), optional
Full state at {t+1,...,t+n_integrationsteps}.
dz_dt_int : array-like of shape (n_samples, n_features, n_integrationsteps), optional
Time derivative of state at {t+1,...,t+n_integrationsteps}.
mu_int : array-like of shape (n_samples, n_param, n_integrationsteps), optional
Control input at {t+1,...,t+n_integrationsteps}.
Returns
-------
dict
Dictionary of individual losses including 'loss', 'dz', 'int', 'reg'.
"""
losses = dict(loss=0)
z = tf.cast(z, self.dtype_)
dz_dt = tf.expand_dims(tf.cast(dz_dt, dtype=self.dtype_), axis=-1)
dz_ddt = tf.expand_dims(tf.cast(dz_ddt, dtype=self.dtype_), axis=-1)
if dz_dt_int is not None:
dz_dt_int = tf.reshape(dz_dt_int, shape=(-1, dz_dt_int.shape[-1]))
dz_dt_int = tf.expand_dims(tf.cast(dz_dt_int, dtype=self.dtype_), axis=-1)
# sindy approximation of the time derivative of the latent variable
sindy_pred, sindy_mean, sindy_log_var = self.evaluate_sindy_layer(z, dz_dt, mu)
dz_dt_sindy = tf.expand_dims(sindy_pred[:, : self.reduced_order], -1)
dz_ddt_sindy = tf.expand_dims(sindy_pred[:, self.reduced_order :], -1)
# SINDy consistency loss
if self.l_int:
int_loss = self.get_int_loss([z_int, dz_dt_int, dz_dt_int, mu_int])
losses["int"] = int_loss
losses["loss"] += int_loss
# calculate kl divergence for variational sindy
if sindy_mean is not None:
kl_loss_sindy = self.sindy_layer.kl_loss(sindy_mean, sindy_log_var)
losses["kl_sindy"] = kl_loss_sindy
losses["loss"] += kl_loss_sindy
# calculate losses
reg_loss = tf.reduce_sum(self.losses)
dz_loss = self.l_dz * self.compute_loss(
None,
tf.concat([dz_dt, dz_ddt], axis=1),
tf.concat([dz_dt_sindy, dz_ddt_sindy], axis=1),
)
losses["loss"] += dz_loss + reg_loss
losses["reg"] = reg_loss
losses["dz"] = dz_loss
return losses