Source code for vindy.networks.autoencoder_sindy

import numpy as np
import logging
import tensorflow as tf
from .base_model import BaseModel

logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)


class AutoencoderSindy(BaseModel):
    def __init__(
        self,
        sindy_layer,
        reduced_order,
        x,
        mu=None,
        scaling="individual",
        layer_sizes=[10, 10, 10],
        activation="selu",
        second_order=True,
        l1: float = 0,
        l2: float = 0,
        l_rec: float = 1,
        l_dz: float = 1,
        l_dx: float = 1,
        l_int: float = 0,
        dt=0,
        dtype="float32",
        **kwargs,
    ):
        """
        Initialize the AutoencoderSindy model.

        Parameters
        ----------
        sindy_layer : SindyLayer
            Layer to identify the governing equations of the latent dynamics.
        reduced_order : int
            Order of the reduced model.
        x : array-like
            Input data.
        mu : array-like, optional
            Parameter data (default is None).
        scaling : str, optional
            Scaling method (default is "individual").
        layer_sizes : list of int, optional
            Sizes of the layers in the encoder and decoder (default is [10, 10, 10]).
        activation : str, optional
            Activation function (default is "selu").
        second_order : bool, optional
            Whether the system is second order (default is True).
        l1 : float, optional
            L1 regularization factor for the autoencoder layers (default is 0).
        l2 : float, optional
            L2 regularization factor for the autoencoder layers (default is 0).
        l_rec : float, optional
            Weight of the reconstruction loss (default is 1).
        l_dz : float, optional
            Weight of the derivative loss in the latent space (default is 1).
        l_dx : float, optional
            Weight of the derivative loss in the full state space (default is 1).
        l_int : float, optional
            Weight of the integration loss (default is 0).
        dt : float, optional
            Time step (default is 0).
        dtype : str, optional
            Data type (default is "float32").
        kwargs : dict
            Additional arguments.
        """
        # assert that input arguments are valid
        self.assert_arguments(locals())

        tf.keras.backend.set_floatx(dtype)
        self.dtype_ = dtype

        super(AutoencoderSindy, self).__init__(**kwargs)
        if not hasattr(self, "config"):
            self._init_to_config(locals())

        self.sindy_layer = sindy_layer
        self.layer_sizes = layer_sizes
        self.activation = tf.keras.activations.get(activation)
        self.reduced_order = reduced_order
        self.second_order = second_order
        self.scaling = scaling

        # weighting of the different losses
        self.l_rec, self.l_dz, self.l_dx, self.l_int = l_rec, l_dz, l_dx, l_int
        self.dt = dt

        # kernel regularization weights
        self.l1, self.l2 = l1, l2
        self.kernel_regularizer = tf.keras.regularizers.l1_l2(l1=l1, l2=l2)

        # create the model
        self.x_shape = x.shape[1:]
        if len(self.x_shape) == 1:
            self.flatten, self.unflatten = self.flatten_dummy, self.flatten_dummy
        elif len(self.x_shape) == 2:
            self.flatten, self.unflatten = self.flatten3d, self.unflatten3d
        x = self.flatten(x)

        # some subclasses initialize weights before building the model
        if hasattr(self, "init_weights"):
            self.init_weights()
        self.build_model(x, mu)

        # create loss trackers
        self.create_loss_trackers()

    @property
    def _scaling_methods(self):
        return ["individual", "global", "individual_sqrt", "none"]
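    # Usage sketch (illustrative only; assumes a configured `SindyLayer` instance
    # `sindy_layer` and training snapshots `x_train` of shape
    # (n_samples, n_features) -- variable names are placeholders):
    #
    #   model = AutoencoderSindy(
    #       sindy_layer=sindy_layer,
    #       reduced_order=2,
    #       x=x_train,
    #       layer_sizes=[32, 16],
    #       second_order=False,
    #   )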
    def assert_arguments(self, arguments):
        """
        Assert that the arguments passed to the model are valid.

        Parameters
        ----------
        arguments : dict
            All arguments passed to the model.
        """
        # base class asserts
        super(AutoencoderSindy, self).assert_arguments(arguments)

        # additional asserts for the autoencoder
        assert isinstance(
            arguments["reduced_order"], int
        ), "reduced_order must be an integer"
        assert arguments["scaling"] in self._scaling_methods, (
            f"scaling must be one of " f"{self._scaling_methods}"
        )

        # network architecture
        assert isinstance(
            arguments["layer_sizes"], list
        ), "layer_sizes must be a list of integers"
        for layer_size in arguments["layer_sizes"]:
            assert isinstance(layer_size, int), "layer_sizes must be a list of integers"

        # loss weights
        for scale_factor in ["l1", "l2", "l_rec", "l_dz", "l_dx", "l_int"]:
            assert type(arguments[scale_factor]) in (
                float,
                int,
            ), f"{scale_factor} must be of type int/float"
    def create_loss_trackers(self):
        """
        Create loss trackers for the model.
        """
        self.loss_trackers = dict()
        self.loss_trackers["loss"] = tf.keras.metrics.Mean(name="loss")
        self.loss_trackers["rec"] = tf.keras.metrics.Mean(name="rec")
        if self.l_dz > 0:
            self.loss_trackers["dz"] = tf.keras.metrics.Mean(name="dz")
        if self.l_dx > 0:
            self.loss_trackers["dx"] = tf.keras.metrics.Mean(name="dx")
        if self.l_int > 0:
            self.loss_trackers["int"] = tf.keras.metrics.Mean(name="int")
        self.loss_trackers["reg"] = tf.keras.metrics.Mean(name="reg")

        # update dict with sindy layer loss trackers
        if self.l_dx > 0 or self.l_dz > 0:
            self.loss_trackers.update(self.sindy_layer.loss_trackers)
    def compile(
        self,
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
        loss=tf.keras.losses.BinaryCrossentropy(),
        sindy_optimizer=None,
        **kwargs,
    ):
        """
        Wrapper for the compile function of the keras model to enable the use of
        different optimizers for different parts of the model.

        Parameters
        ----------
        optimizer : tf.keras.optimizers.Optimizer, optional
            Optimizer for the autoencoder (default is Adam with learning rate 1e-3).
        loss : tf.keras.losses.Loss, optional
            Loss function for the autoencoder (default is BinaryCrossentropy).
        sindy_optimizer : tf.keras.optimizers.Optimizer, optional
            Optimizer for the SINDy part of the model (default is None).
        kwargs : dict
            Additional arguments for the compile function.
        """
        super(AutoencoderSindy, self).compile(optimizer=optimizer, loss=loss, **kwargs)
        if sindy_optimizer is None:
            self.sindy_optimizer = tf.keras.optimizers.get(optimizer)
            # in case we call the optimizer to update different parts of the model
            # separately we need to build it first
            trainable_weights = self.get_trainable_weights()
            self.sindy_optimizer.build(trainable_weights)
        else:
            self.sindy_optimizer = tf.keras.optimizers.get(sindy_optimizer)
        # self.reconstruction_loss = self.compiled_loss
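    # Example (hedged sketch): a smaller learning rate for the autoencoder and a
    # larger one for the SINDy coefficients can be set via `sindy_optimizer`
    # (the optimizer and loss objects shown are ordinary Keras objects, not part
    # of this module):
    #
    #   model.compile(
    #       optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    #       loss=tf.keras.losses.MeanSquaredError(),
    #       sindy_optimizer=tf.keras.optimizers.Adam(learning_rate=1e-2),
    #   )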
    @staticmethod
    def reconstruction_loss(x, x_pred):
        """
        Calculate the reconstruction loss.

        Parameters
        ----------
        x : array-like of shape (n_samples, n_features)
            Original input data.
        x_pred : array-like of shape (n_samples, n_features)
            Reconstructed input data.

        Returns
        -------
        tf.Tensor
            Reconstruction loss.
        """
        return tf.reduce_mean(tf.square(x - x_pred))
    def get_trainable_weights(self):
        """
        Return the trainable weights of the model.

        Returns
        -------
        list
            List of trainable weights.
        """
        return (
            self.encoder.trainable_weights
            + self.decoder.trainable_weights
            + self.sindy.trainable_weights
        )
    def build_model(self, x, mu):
        """
        Build the model.

        Parameters
        ----------
        x : array-like
            Full state.
        mu : array-like
            Parameters.
        """
        x = tf.cast(x, dtype=self.dtype_)

        # encoder
        x_input, z = self.build_encoder(x)

        # sindy
        z_sindy, z_dot = self.build_sindy(z, mu)

        # build the decoder
        x = self.build_decoder(z)

        # build the models
        self.encoder = tf.keras.Model(inputs=x_input, outputs=z, name="encoder")
        self.decoder = tf.keras.Model(inputs=z, outputs=x, name="decoder")
        self.sindy = tf.keras.Model(inputs=z_sindy, outputs=z_dot, name="sindy")
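    # Note: `build_sindy` (used above) and helpers such as `split_inputs`,
    # `evaluate_sindy_layer`, `get_int_loss`, `flatten_dummy`, and `flatten3d`
    # are not defined in this module; they are expected to be provided by the
    # base class or related mixins.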
    def define_scaling(self, x):
        """
        Define the scaling factor for given training data.

        Parameters
        ----------
        x : array-like
            Training data.
        """
        # scale the data if requested
        if self.scaling == "individual":
            # scale by the reciprocal of the individual (per-feature) max value
            self.scale_factor = 1 / (tf.reduce_max(tf.abs(x), axis=0))
            # self.model_noise_factor = 1 / (tf.reduce_max(tf.abs(x), axis=0))
            # replace inf with ones to avoid division by zero
            self.scale_factor = tf.where(
                tf.math.is_inf(self.scale_factor),
                tf.ones_like(self.scale_factor),
                self.scale_factor,
            )
        elif self.scaling == "individual_sqrt":
            # scale by the reciprocal square root of the individual max value
            self.scale_factor = 1 / tf.sqrt(tf.reduce_max(tf.abs(x), axis=0))
            # self.model_noise_factor = 1 / (tf.reduce_max(tf.abs(x), axis=0))
            # replace inf with ones to avoid division by zero
            self.scale_factor = tf.where(
                tf.math.is_inf(self.scale_factor),
                tf.ones_like(self.scale_factor),
                self.scale_factor,
            )
        elif self.scaling == "global":
            # scale all features by the global max value
            self.scale_factor = 1.0 / tf.reduce_max(tf.abs(x))
        else:
            self.scale_factor = 1.0
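    # Illustration (hypothetical numbers): for x = [[1., -4.], [2., 2.]],
    #   "individual"      -> scale_factor = [1/2, 1/4]
    #   "individual_sqrt" -> scale_factor = [1/sqrt(2), 1/2]
    #   "global"          -> scale_factor = 1/4
    #   anything else     -> scale_factor = 1.0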
    def scale(self, x):
        """
        Scale the data.

        Parameters
        ----------
        x : array-like
            Input data.

        Returns
        -------
        array-like
            Scaled data.
        """
        # scale the data
        x = x * self.scale_factor
        return x
    def rescale(self, x):
        """
        Rescale the data.

        Parameters
        ----------
        x : array-like
            Scaled data.

        Returns
        -------
        array-like
            Rescaled data.
        """
        # rescale the data
        x = x / self.scale_factor
        return x
    def build_encoder(self, x):
        """
        Build a fully connected encoder with layers of size layer_sizes.

        Parameters
        ----------
        x : array-like
            Input to the autoencoder.

        Returns
        -------
        x_input : tf.keras.Input
            Input tensor.
        z : tf.Tensor
            Latent variable.
        """
        x_input = tf.keras.Input(shape=(x.shape[1],), dtype=self.dtype_)
        z = x_input
        for n_neurons in self.layer_sizes:
            z = tf.keras.layers.Dense(
                n_neurons,
                activation=self.activation,
                kernel_regularizer=self.kernel_regularizer,
            )(z)
        z = tf.keras.layers.Dense(
            self.reduced_order,
            activation="linear",
            kernel_regularizer=self.kernel_regularizer,
        )(z)
        return x_input, z
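    # With the default layer_sizes=[10, 10, 10] and reduced order r, the encoder
    # maps n_features -> 10 -> 10 -> 10 -> r, using the chosen activation for the
    # hidden layers and a linear bottleneck layer.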
    def build_decoder(self, z):
        """
        Build a fully connected decoder with layers of reversed sizes in layer_sizes.

        Parameters
        ----------
        z : array-like
            Latent variable.

        Returns
        -------
        tf.Tensor
            Reconstructed full state.
        """
        # new decoder
        x_ = z
        for n_neurons in reversed(self.layer_sizes):
            x_ = tf.keras.layers.Dense(
                n_neurons,
                activation=self.activation,
                kernel_regularizer=self.kernel_regularizer,
            )(x_)
        x = tf.keras.layers.Dense(
            self.x_shape[0],
            activation="linear",
            kernel_regularizer=self.kernel_regularizer,
        )(x_)
        return x
    @tf.function
    def build_loss(self, inputs):
        """
        Split the input into the state, its derivative, and the parameters, perform
        the forward pass, calculate the loss, and update the weights.

        Parameters
        ----------
        inputs : list
            List of array-like objects.

        Returns
        -------
        dict
            Dictionary of losses.
        """
        # for second order systems: dx_ddt = f(x, dx_dt, mu)
        x, dx_dt, dx_ddt, x_int, dx_int, mu, mu_int = self.split_inputs(inputs)

        # forward pass
        with tf.GradientTape() as tape:
            # only perform reconstruction if no identification loss is used
            if self.l_dx == 0 and self.l_dz == 0:
                losses = self.get_loss_rec(x)
            # calculate loss for second order systems (includes two time derivatives)
            elif self.second_order:
                losses = self.get_loss_2nd(x, dx_dt, dx_ddt, mu, x_int, dx_int, mu_int)
            # calculate loss for first order systems
            else:
                losses = self.get_loss(x, dx_dt, mu, x_int, mu_int)

        # split trainable variables for autoencoder and dynamics so that separate
        # optimizers can be used
        trainable_weights = self.get_trainable_weights()
        # grads = tape.gradient(losses["loss"], trainable_weights)
        # self.optimizer.apply_gradients(zip(grads, trainable_weights))
        n_ae_weights = len(self.encoder.trainable_weights) + len(
            self.decoder.trainable_weights
        )
        grads = tape.gradient(losses["loss"], trainable_weights)
        grads_autoencoder = grads[:n_ae_weights]
        grads_sindy = grads[n_ae_weights:]

        # adjust weights for autoencoder
        self.optimizer.apply_gradients(
            zip(grads_autoencoder, trainable_weights[:n_ae_weights])
        )

        # in case of only reconstructing the data without dynamics there won't be
        # gradients for the dynamics
        if self.l_dx > 0 or self.l_dz > 0:
            # adjust sindy weights with separate optimizer
            self.sindy_optimizer.apply_gradients(
                zip(grads_sindy, trainable_weights[n_ae_weights:])
            )

        return losses
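    # The gradient list returned by `tape.gradient` follows the ordering of
    # `get_trainable_weights` (encoder weights, then decoder weights, then SINDy
    # weights), which is why the first `n_ae_weights` entries can be routed to
    # `self.optimizer` and the remainder to `self.sindy_optimizer`.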
    def calc_latent_time_derivatives(self, x, dx_dt, dx_ddt=None):
        """
        Calculate time derivatives of the latent variables given the time derivatives
        of the input variables (used for comparison with SINDy).

        Parameters
        ----------
        x : array-like
            Full state.
        dx_dt : array-like
            Time derivative of state.
        dx_ddt : array-like, optional
            Second time derivative of state (default is None).

        Returns
        -------
        tuple
            Latent variables and their time derivatives.
        """
        # in case the variables are not vectorized but given in their physical
        # geometrical description, flatten them (casting and expansion happen below)
        if len(x.shape) > 2:
            if dx_ddt is not None:
                x, dx_dt, dx_ddt = [self.flatten(v) for v in [x, dx_dt, dx_ddt]]
            else:
                x, dx_dt = [self.flatten(v) for v in [x, dx_dt]]

        x = tf.cast(x, self.dtype_)
        dx_dt = tf.expand_dims(tf.cast(dx_dt, dtype=self.dtype_), axis=-1)
        if dx_ddt is not None:
            dx_ddt = tf.expand_dims(tf.cast(dx_ddt, dtype=self.dtype_), axis=-1)

        # forward pass of encoder and time derivative of latent variable
        if dx_ddt is not None:
            with tf.GradientTape() as t11:
                with tf.GradientTape() as t12:
                    t12.watch(x)
                    z = self.encode(x)
                dz_dx = t12.batch_jacobian(z, x)
            dz_ddx = t11.batch_jacobian(dz_dx, x)
        else:
            with tf.GradientTape() as t12:
                t12.watch(x)
                z = self.encode(x)
            dz_dx = t12.batch_jacobian(z, x)

        # calculate first time derivative of the latent variable by application of
        # the chain rule:
        #   dz_dt = dz_dx @ dx_dt
        dz_dt = dz_dx @ dx_dt
        dz_dt = tf.squeeze(dz_dt, axis=2)

        # calculate second time derivative of the latent variable:
        #   dz_ddt = (dz_ddx @ dx_dt) @ dx_dt + dz_dx @ dx_ddt
        if dx_ddt is not None:
            dz_ddt = tf.squeeze(
                dz_ddx @ tf.expand_dims(dx_dt, axis=1), axis=3
            ) @ dx_dt + dz_dx @ tf.expand_dims(tf.squeeze(dx_ddt, axis=-1), axis=-1)
            dz_ddt = tf.squeeze(dz_ddt, axis=2)
            return z.numpy(), dz_dt.numpy(), dz_ddt.numpy()
        else:
            return z.numpy(), dz_dt.numpy()
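    # Usage sketch (variable names are placeholders): for second order data,
    #   z, dz_dt, dz_ddt = model.calc_latent_time_derivatives(x, dx_dt, dx_ddt)
    # and for first order data,
    #   z, dz_dt = model.calc_latent_time_derivatives(x, dx_dt)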
    def _training_encoding(self, x, losses):
        """
        For compatibility with subclasses whose encoder also returns a mean and log
        variance, we need a method that returns only the latent variable. In those
        subclasses the mean and log variance are stored as class attributes so that
        they can be accessed by the get_loss method.

        Parameters
        ----------
        x : array-like
            Full state.
        losses : dict
            Dictionary of losses.

        Returns
        -------
        tf.Tensor
            Latent variable.
        """
        z = self.encoder(x)
        return z, losses
    @tf.function
    def get_loss_rec(self, x):
        """
        Calculate the reconstruction loss.

        Parameters
        ----------
        x : array-like
            Full state.

        Returns
        -------
        dict
            Dictionary of losses.
        """
        losses = dict(loss=0)
        z, losses = self._training_encoding(x, losses)
        x_pred = self.decoder(z)

        # calculate losses
        rec_loss = self.l_rec * self.reconstruction_loss(x, x_pred)  # reconstruction
        losses["rec"] = rec_loss
        reg_loss = tf.reduce_sum(self.losses)  # regularization loss
        losses["reg"] = reg_loss
        losses["loss"] += rec_loss + reg_loss  # total loss
        return losses
    @tf.function
    def get_loss(self, x, dx_dt, mu, x_int=None, mu_int=None):
        """
        Calculate the loss for a first order system.

        Parameters
        ----------
        x : array-like
            Full state.
        dx_dt : array-like
            Time derivative of state.
        mu : array-like
            Control input.
        x_int : array-like, optional
            Full state at future time steps (default is None).
        mu_int : array-like, optional
            Control input at future time steps (default is None).

        Returns
        -------
        dict
            Dictionary of losses.
        """
        losses = dict(loss=0)
        x = tf.cast(x, self.dtype_)
        dx_dt = tf.expand_dims(tf.cast(dx_dt, dtype=self.dtype_), axis=-1)

        # forward pass of encoder and time derivative of latent variable
        with tf.GradientTape() as t12:
            t12.watch(x)
            z, losses = self._training_encoding(x, losses)
        dz_dx = t12.batch_jacobian(z, x)

        # calculate first time derivative of the latent variable by application of
        # the chain rule: dz_dt = dz_dx @ dx_dt
        dz_dt = dz_dx @ dx_dt

        # sindy approximation of the time derivative of the latent variable
        sindy_pred, sindy_mean, sindy_log_var = self.evaluate_sindy_layer(z, None, mu)
        dz_dt_sindy = tf.expand_dims(sindy_pred, -1)

        # forward pass of decoder and time derivative of reconstructed variable
        with tf.GradientTape() as t22:
            t22.watch(z)
            x_ = self.decoder(z)

        if self.l_dx > 0:
            dx_dz = t22.batch_jacobian(x_, z)
            # calculate first time derivative of the reconstructed state by
            # application of the chain rule
            dxf_dt = dx_dz @ dz_dt_sindy
            dx_loss = self.l_dx * self.compiled_loss(
                tf.concat([dxf_dt], axis=1), tf.concat([dx_dt], axis=1)
            )
            losses["dx"] = dx_loss
            losses["loss"] += dx_loss

        # SINDy consistency loss
        if self.l_int:
            int_loss = self.get_int_loss([x_int, mu_int])
            losses["int"] = int_loss
            losses["loss"] += int_loss

        # calculate losses
        reg_loss = tf.reduce_sum(self.losses)
        rec_loss = self.l_rec * self.reconstruction_loss(x, x_)
        # dz_loss = self.l_dz * self.compiled_loss(tf.concat([dz_dt], axis=1),
        #                                          tf.concat([dz_dt_sindy], axis=1))
        dz_loss = tf.math.log(
            2 * np.pi * tf.reduce_mean(tf.keras.losses.mse(dz_dt, dz_dt_sindy)) + 1
        )
        losses["loss"] += rec_loss + dz_loss + reg_loss

        # calculate kl divergence for variational sindy
        if sindy_mean is not None:
            kl_loss_sindy = self.sindy_layer.kl_loss(sindy_mean, sindy_log_var)
            losses["kl_sindy"] = kl_loss_sindy
            losses["loss"] += kl_loss_sindy

        losses["reg"] = reg_loss
        losses["rec"] = rec_loss
        losses["dz"] = dz_loss
        return losses
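    # For the first order case, the total loss assembled above is
    #   loss = l_rec * rec + dz + reg  (+ l_dx * dx, + int, + kl_sindy when active),
    # where the dz term uses the log-MSE formulation shown in the code rather than
    # the commented-out l_dz-weighted compiled loss.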
    @tf.function
    def get_loss_2nd(
        self, x, dx_dt, dx_ddt, mu, x_int=None, dx_dt_int=None, mu_int=None
    ):
        """
        Calculate the loss for a second order system.

        Parameters
        ----------
        x : array-like
            Full state.
        dx_dt : array-like
            Time derivative of state.
        dx_ddt : array-like
            Second time derivative of state.
        mu : array-like
            Control input.
        x_int : array-like, optional
            Full state at future time steps (default is None).
        dx_dt_int : array-like, optional
            Time derivative of state at future time steps (default is None).
        mu_int : array-like, optional
            Control input at future time steps (default is None).

        Returns
        -------
        dict
            Dictionary of losses.
        """
        losses = dict(loss=0)
        x = tf.cast(x, self.dtype_)
        dx_dt = tf.expand_dims(tf.cast(dx_dt, dtype=self.dtype_), axis=-1)
        dx_ddt = tf.expand_dims(tf.cast(dx_ddt, dtype=self.dtype_), axis=-1)

        # forward pass of encoder and time derivative of latent variable
        with tf.GradientTape() as t11:
            with tf.GradientTape() as t12:
                t12.watch(x)
                z, losses = self._training_encoding(x, losses)
            t11.watch(x)
            dz_dx = t12.batch_jacobian(z, x)
        dz_ddx = t11.batch_jacobian(dz_dx, x)

        # calculate first time derivative of the latent variable by application of
        # the chain rule: dz_dt = dz_dx @ dx_dt
        dz_dt = dz_dx @ dx_dt

        # calculate second time derivative of the latent variable:
        #   dz_ddt = (dz_ddx @ dx_dt) @ dx_dt + dz_dx @ dx_ddt
        dz_ddt = (
            tf.squeeze(dz_ddx @ tf.expand_dims(dx_dt, axis=1), axis=3) @ dx_dt
            + dz_dx @ dx_ddt
        )

        # sindy approximation of the time derivative of the latent variable
        sindy_pred, sindy_mean, sindy_log_var = self.evaluate_sindy_layer(z, dz_dt, mu)
        dz_dt_sindy = tf.expand_dims(sindy_pred[:, : self.reduced_order], -1)
        dz_ddt_sindy = tf.expand_dims(sindy_pred[:, self.reduced_order :], -1)

        # forward pass of decoder and time derivative of reconstructed variable
        with tf.GradientTape() as t21:
            t21.watch(z)
            with tf.GradientTape() as t22:
                t22.watch(z)
                x_ = self.decoder(z)
            if self.l_dx > 0:
                dx_dz = t22.batch_jacobian(x_, z)
        if self.l_dx > 0:
            dx_ddz = t21.batch_jacobian(dx_dz, z)

            # calculate first time derivative of the reconstructed state by
            # application of the chain rule
            dxf_dt = dx_dz @ dz_dt_sindy

            # calculate second time derivative of the reconstructed state by
            # application of the chain rule
            dxf_ddt = (
                tf.squeeze((dx_ddz @ tf.expand_dims(dz_dt_sindy, axis=1)), axis=3)
                @ dz_dt_sindy
            ) + dx_dz @ dz_ddt_sindy

            dx_loss = self.l_dx * self.compiled_loss(
                tf.concat([dxf_dt, dxf_ddt], axis=1),
                tf.concat([dx_dt, dx_ddt], axis=1),
            )
            losses["dx"] = dx_loss
            losses["loss"] += dx_loss

        # SINDy consistency loss
        if self.l_int:
            int_loss = self.get_int_loss([x_int, dx_dt_int, mu_int])
            losses["int"] = int_loss
            losses["loss"] += int_loss

        # calculate kl divergence for variational sindy
        if sindy_mean is not None:
            kl_loss_sindy = self.sindy_layer.kl_loss(sindy_mean, sindy_log_var)
            losses["kl_sindy"] = kl_loss_sindy
            losses["loss"] += kl_loss_sindy

        # calculate losses
        reg_loss = tf.reduce_sum(self.losses)
        rec_loss = self.l_rec * self.reconstruction_loss(x, x_)
        dz_loss = self.l_dz * self.compiled_loss(
            tf.concat([dz_dt, dz_ddt], axis=1),
            tf.concat([dz_dt_sindy, dz_ddt_sindy], axis=1),
        )
        losses["loss"] += rec_loss + dz_loss + reg_loss

        losses["reg"] = reg_loss
        losses["rec"] = rec_loss
        losses["dz"] = dz_loss
        return losses
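    # The latent derivatives above follow from the chain rule:
    #   dz_dt  = (dz/dx) @ dx_dt
    #   dz_ddt = ((d2z/dx2) @ dx_dt) @ dx_dt + (dz/dx) @ dx_ddt
    # and the reconstructed-state derivatives dxf_dt / dxf_ddt are obtained
    # analogously with the decoder Jacobians dx_dz and dx_ddz.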
    # @tf.function
    def encode(self, x):
        """
        Encode the full state.

        Parameters
        ----------
        x : array-like of shape (n_samples, n_features, n_dof_per_feature)
            Full state.

        Returns
        -------
        array-like of shape (n_samples, reduced_order)
            Latent variable.
        """
        x = self.flatten(x)
        z = self.encoder(x)
        return z
    # @tf.function
    def decode(self, z):
        """
        Decode the latent variable.

        Parameters
        ----------
        z : array-like of shape (n_samples, reduced_order)
            Latent variable.

        Returns
        -------
        array-like of shape (n_samples, n_features, n_dof_per_feature)
            Full state.
        """
        x_rec = self.decoder(z)
        return self.unflatten(x_rec)
    @tf.function
    def reconstruct(self, x, _=None):
        """
        Reconstruct the full state.

        Parameters
        ----------
        x : array-like of shape (n_samples, n_features, n_dof_per_feature)
            Full state.

        Returns
        -------
        array-like of shape (n_samples, n_features, n_dof_per_feature)
            Reconstructed full state.
        """
        z = self.encode(x)
        x_rec = self.decode(z)
        return x_rec
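# End-to-end usage sketch (hedged; variable names and the exact training-data
# layout are placeholders -- the expected input ordering is defined by
# `split_inputs` in the base class, which is not shown here):
#
#   model = AutoencoderSindy(sindy_layer=sindy_layer, reduced_order=2, x=x_train)
#   model.compile(sindy_optimizer=tf.keras.optimizers.Adam(1e-2))
#   # ... model.fit(...) with data arranged as expected by `split_inputs` ...
#   x_rec = model.reconstruct(x_test)
#   z = model.encode(x_test)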