"""Source code for vindy.layers.vindy_layer."""

import numpy as np
import logging
import tensorflow as tf
import matplotlib.pyplot as plt
from .sindy_layer import SindyLayer
from vindy.distributions import Gaussian, BaseDistribution

logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)


class VindyLayer(SindyLayer):
    """
    Layer for variational identification of nonlinear dynamics (VINDy).

    Approximates the time derivative of the latent variable. Feature libraries
    are applied to the latent variables and a (sparse) variational inference is
    performed to obtain the coefficients: the layer's kernel parametrizes the
    mean of each coefficient and a second weight (``kernel_scale``) its log
    variance.
    """

    def __init__(self, beta=1, priors=Gaussian(0.0, 1.0), **kwargs):
        """
        Parameters
        ----------
        beta : float or int, default=1
            Scaling factor for the KL divergence.
        priors : BaseDistribution or list of BaseDistribution, default=Gaussian(0.0, 1.0)
            Prior distribution(s) for the coefficients.
        **kwargs
            Additional keyword arguments, see SindyLayer.
        """
        # NOTE(review): the default prior is a single Gaussian instance created
        # once at definition time and shared by every layer that relies on the
        # default — harmless only as long as distributions are stateless;
        # confirm before adding mutable state to BaseDistribution subclasses.
        super(VindyLayer, self).__init__(**kwargs)
        self.assert_additional_args(beta, priors)
        self.priors = priors
        self.beta = beta
        self.kl_loss_tracker = tf.keras.metrics.Mean(name="kl_sindy")

    def assert_additional_args(self, beta, priors):
        """
        Validate that the additional arguments are correct.

        Parameters
        ----------
        beta : float or int
            Scaling factor for the KL divergence.
        priors : BaseDistribution or list of BaseDistribution
            Prior distribution(s) for the coefficients.
        """
        # assert that input arguments are valid that are not checked in the super class
        assert isinstance(beta, float) or isinstance(beta, int), "beta must be a float"
        # priors: either one distribution shared by all dofs, or one per dof
        if isinstance(priors, list):
            assert (
                len(priors) == self.n_dofs
            ), f"Number of priors must match the number of dofs ({self.n_dofs})"
            for prior in priors:
                assert isinstance(prior, BaseDistribution), (
                    "All priors must be an instance inheriting from "
                    "BaseDistribution"
                )
        else:
            assert isinstance(
                priors, BaseDistribution
            ), "priors must be a class inheriting from BaseDistribution"

    def init_weigths(self, kernel_regularizer):
        """
        Initialize the layer weights: the coefficient means (in the super
        class) plus a log-variance weight of the same shape.

        NOTE: the misspelled name ("weigths") is kept on purpose — it overrides
        the identically named method called via ``super()`` in SindyLayer;
        renaming it here would silently break the override.
        """
        super(VindyLayer, self).init_weigths(kernel_regularizer)
        # initialize the log variance of the coefficients
        init = tf.random_uniform_initializer(minval=-1, maxval=1)
        # reuse the (l1, l2) factors of the mean-kernel regularizer, but apply
        # them to the standard deviation implied by the log variance
        l1, l2 = kernel_regularizer.l1, kernel_regularizer.l2
        scale_regularizer = LogVarL1L2Regularizer(l1=l1, l2=l2, dtype=self.dtype_)
        self.kernel_scale = self.add_weight(
            name="SINDy_log_scale",
            initializer=init,
            shape=self.kernel_shape,
            dtype=self.dtype_,
            regularizer=scale_regularizer,
        )

    @property
    def loss_trackers(self):
        """dict: name -> Keras metric tracking the KL contribution of this layer."""
        return dict(kl_sindy=self.kl_loss_tracker)

    @property
    def _coeffs(self):
        """
        Get the coefficients of the SINDy layer sampled from the defined
        distribution.

        Returns the coefficients sampled from the defined distribution
        parametrized by the layer's kernel (weights).

        Returns
        -------
        tuple of tf.Tensor
            Tuple containing (coeffs, coeffs_mean, coeffs_log_scale).
        """
        # the kernel holds the mean, kernel_scale the log variance
        coeffs_mean, coeffs_log_scale = self.kernel, self.kernel_scale
        # draw samples from the distribution(s); with a list of priors, each
        # prior samples only its own dof's slice
        if isinstance(self.priors, list):
            trainable_coeffs = []
            for i, prior in enumerate(self.priors):
                trainable_coeffs.append(
                    prior([coeffs_mean[i : i + 1], coeffs_log_scale[i : i + 1]])
                )
            trainable_coeffs = tf.concat(trainable_coeffs, axis=0)
        else:
            trainable_coeffs = self.priors([coeffs_mean, coeffs_log_scale])
        # fill the coefficient matrix with the trainable coefficients
        coeffs = self.fill_coefficient_matrix(trainable_coeffs)
        return coeffs, coeffs_mean, coeffs_log_scale

    def kl_loss(self, mean, scale):
        """
        Compute the KL divergence between the priors and coefficient
        distributions.

        Parameters
        ----------
        mean : tf.Tensor
            Mean of the coefficient distributions.
        scale : tf.Tensor
            Scale (log variance) of the coefficient distributions.

        Returns
        -------
        tf.Tensor
            Scaled KL divergence loss.
        """
        if isinstance(self.priors, list):
            kl_loss = tf.cast(0, self.dtype_)
            # FIX: each prior governs exactly one dof, so it must be compared
            # against the corresponding slice of (mean, scale) — mirroring the
            # per-dof slicing in `_coeffs`. Previously every prior received the
            # full tensors, over-counting the divergence and pairing priors
            # with the wrong coefficients.
            for i, prior in enumerate(self.priors):
                kl_loss += prior.KL_divergence(mean[i : i + 1], scale[i : i + 1])
        else:
            kl_loss = self.priors.KL_divergence(mean, scale)
        return self.beta * tf.reduce_sum(kl_loss)

    def get_sindy_coeffs(self):
        """Return the (mean) coefficient matrix as a numpy array."""
        _, coeffs_mean, _ = self._coeffs
        coeffs = self.fill_coefficient_matrix(coeffs_mean)
        return coeffs.numpy()

    @tf.function
    def call(self, inputs, training=False):
        """
        Apply the VINDy layer to the inputs.

        Applies the feature libraries to the inputs, samples the coefficients
        from a normal distribution parametrized by the layer's kernel
        (weights), and computes the dot product of the features and the
        coefficients.

        Parameters
        ----------
        inputs : tf.Tensor
            Input tensor.
        training : bool, default=False
            Whether the model is in training mode.

        Returns
        -------
        tf.Tensor or list of tf.Tensor
            If training: [z_dot, coeffs_mean, coeffs_log_var]
            If not training: z_dot
        """
        # todo: think about whether we want to have deterministic coefficients during inference (after training) or not
        z_features = self.features(inputs)
        coeffs, coeffs_mean, coeffs_log_var = self._coeffs
        if training:
            z_dot = z_features @ tf.transpose(coeffs)
            return [z_dot, coeffs_mean, coeffs_log_var]
        else:
            # in case of evaluation, we use the mean of the coefficients
            z_dot = z_features @ tf.transpose(self.fill_coefficient_matrix(coeffs_mean))
            return z_dot

    def visualize_coefficients(self, x_range=None, z=None, mu=None):
        """
        Visualize the coefficients of the SINDy layer as distributions.

        Parameters
        ----------
        x_range : tuple, optional
            Range for x-axis.
        z : array-like, optional
            Latent state variable names.
        mu : array-like, optional
            Parameter variable names.
        """
        # get coefficient parameterization
        _, mean, log_scale = self._coeffs
        _ = self._visualize_coefficients(
            mean.numpy(), log_scale.numpy(), x_range=x_range, z=z, mu=mu
        )
        plt.show()

    def _visualize_coefficients(
        self,
        mean,
        log_scale,
        x_range=None,
        y_range=None,
        z=None,
        mu=None,
        figsize=None,
        y_ticks=True,
    ):
        """
        Plot one subplot per trainable coefficient showing its distribution.

        Returns
        -------
        matplotlib.figure.Figure
        """
        # get name of the corresponding features
        feature_names = self.get_feature_names(z, mu)
        n_variables = self.state_dim
        n_plots = int(self.n_dofs / n_variables)
        # reshape flat parameter vectors into (n_plots, n_variables) grids
        mean = mean.reshape(n_variables, n_plots).T
        log_scale = log_scale.reshape(n_variables, n_plots).T
        if figsize is None:
            figsize = (n_variables * 10, 10)
        fig, axs = plt.subplots(n_plots, n_variables, figsize=figsize, sharex=True)
        # in case of a one-dimensional system, we append a dimension to axs
        if n_variables == 1:
            axs = axs[:, np.newaxis]
        for j in range(n_variables):
            for i in range(n_plots):
                # draw a vertical line at 0
                axs[i][j].axvline(x=0, color="gray", linestyle="-")
                # plot the distribution of the coefficients
                if isinstance(self.priors, list):
                    distribution = self.priors[i]
                else:
                    distribution = self.priors
                scale = distribution.reverse_log(log_scale[i, j])
                distribution.plot(mean[i, j], scale, ax=axs[i][j])
                # put feature name as ylabel
                if j == 0:
                    axs[i][j].set_ylabel(
                        f"${feature_names[i]}$", rotation=90, labelpad=10
                    )
                # set x range
                if x_range is not None:
                    axs[i][j].set_xlim(x_range)
                # set y range
                if y_range is not None:
                    axs[i][j].set_ylim(y_range)
                if not y_ticks:
                    axs[i][j].set_yticks([])
        plt.tight_layout()
        # ensure that ylabel don't overlap with axis ticks
        plt.subplots_adjust(left=0.1)
        return fig

    def pdf_thresholding(self, threshold: float = 1.0):
        """
        Cancel coefficients based on their probability density function at
        zero.

        Cancels the coefficients of the SINDy layer if their corresponding
        probability density function at zero is above the threshold, i.e., if
        pdf(0) > threshold.

        Parameters
        ----------
        threshold : float, default=1.0
            Threshold value for cancelling coefficients.
        """
        # get current parameterization (loc = mean kernel, log_scale = log variance)
        _, loc, log_scale = self._coeffs
        feature_names = np.array([self.get_feature_names()] * self.state_dim).flatten()
        # cancel coefficients
        # NOTE(review): the last entry is excluded by [:-1] — presumably a
        # coefficient that must never be cancelled; confirm against SindyLayer.
        for i, (loc_, log_scale_) in enumerate(zip(loc[:-1], log_scale[:-1])):
            # pick the prior matching this dof
            if isinstance(self.priors, list):
                distribution = self.priors[i]
            else:
                distribution = self.priors
            scale = distribution.reverse_log(log_scale_)
            zero_density = distribution.prob_density_fcn(x=0, loc=loc_, scale=scale)
            if zero_density > threshold:
                # cancel the coefficient: zero mean and (near-)zero variance
                loc[i].assign(0)
                log_scale[i].assign(-10)
                logging.info(
                    f"Canceling coefficient {feature_names[i]} with pdf(0)={zero_density[0]}"
                )
        self.print()

    def integrate_uq(self, z0, t, mu=None, method="RK45"):
        """
        Integrate the dynamics with one fixed sample of the coefficient
        distribution (for uncertainty quantification).

        Returns
        -------
        tuple
            (integration result, sampled coefficient matrix).
        """
        # sample new set of coefficients
        sampled_coeffs, _, _ = self._coeffs

        def sindy_fcn(t_, inputs):
            return self.call_uq(inputs, coeffs=sampled_coeffs)

        return (
            self.integrate(z0, t, mu=mu, method=method, sindy_fcn=sindy_fcn),
            sampled_coeffs,
        )

    @tf.function
    def call_uq(self, inputs, coeffs):
        """
        Apply the VINDy layer with given coefficients for uncertainty
        quantification.

        Applies the VINDy layer for given coefficients so that not the mean
        coefficients of the distribution are taken.

        Parameters
        ----------
        inputs : tf.Tensor
            Input tensor.
        coeffs : tf.Tensor
            Coefficients to use for the computation.

        Returns
        -------
        tf.Tensor
            Time derivative z_dot.
        """
        # promote a single sample to a batch of one
        if len(inputs.shape) == 1:
            inputs = tf.expand_dims(inputs, 0)
        inputs = tf.cast(inputs, dtype=self.dtype_)
        features = self.features(inputs)
        z_dot = features @ tf.transpose(coeffs)
        return z_dot
class LogVarL1L2Regularizer(tf.keras.regularizers.Regularizer):
    """
    L1/L2 regularizer for the log variance of the coefficients in the VINDy
    layer.

    The regularized weight ``x`` is interpreted as a log variance; the
    penalties are applied to the implied standard deviation ``exp(0.5 * x)``
    rather than to ``x`` itself.

    Parameters
    ----------
    l1 : float, default=0.0
        L1 penalty factor (``None`` is treated as 0.0).
    l2 : float, default=0.0
        L2 penalty factor (``None`` is treated as 0.0).
    dtype : tf.DType, default=tf.float32
        Dtype used for the penalty computation.
    """

    def __init__(self, l1=0.0, l2=0.0, dtype=tf.float32):
        # The default value for l1 and l2 are different from the value in l1_l2
        # for backward compatibility reason. Eg, L1L2(l2=0.1) will only have l2
        # and no l1 penalty.
        l1 = 0.0 if l1 is None else l1
        l2 = 0.0 if l2 is None else l2
        self.dtype_ = dtype
        self.l1 = tf.convert_to_tensor(l1, dtype=dtype)
        self.l2 = tf.convert_to_tensor(l2, dtype=dtype)

    def __call__(self, x):
        regularization = tf.convert_to_tensor(0, dtype=self.dtype_)
        # NOTE: truthiness of the scalar tensors relies on eager execution,
        # same as Keras' own L1L2 implementation.
        if self.l1:
            # tf.abs is redundant (exp > 0) but kept for symmetry with L1L2
            regularization += self.l1 * tf.reduce_sum(tf.abs(tf.exp(0.5 * x)))
        if self.l2:
            # equivalent to "self.l2 * tf.reduce_sum(tf.square(x))"
            regularization += 2.0 * self.l2 * tf.nn.l2_loss(tf.exp(0.5 * x))
        return regularization

    def get_config(self):
        # FIX: without get_config, saving/serializing a model that uses this
        # regularizer fails to round-trip its (l1, l2) configuration.
        return {"l1": float(self.l1.numpy()), "l2": float(self.l2.numpy())}