import numpy as np
import logging
import tensorflow as tf
import matplotlib.pyplot as plt
from .sindy_layer import SindyLayer
from vindy.distributions import Gaussian, BaseDistribution
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
class VindyLayer(SindyLayer):
    def __init__(self, beta=1, priors=None, **kwargs):
        """
        Layer for variational identification of nonlinear dynamics (VINDy).

        Approximates the time derivative of the latent variable. Feature
        libraries are applied to the latent variables and a (sparse)
        variational inference is performed to obtain the coefficients.

        Parameters
        ----------
        beta : float or int, default=1
            Scaling factor for the KL divergence.
        priors : BaseDistribution or list of BaseDistribution, optional
            Prior distribution(s) for the coefficients. Defaults to a
            standard Gaussian(0.0, 1.0).
        **kwargs
            Additional keyword arguments, see SindyLayer.
        """
        super(VindyLayer, self).__init__(**kwargs)
        # Avoid a shared mutable default argument: a Gaussian placed in the
        # signature would be instantiated once at definition time and shared
        # by every layer instance.
        if priors is None:
            priors = Gaussian(0.0, 1.0)
        self.assert_additional_args(beta, priors)
        self.priors = priors
        self.beta = beta
        # tracks the (beta-scaled) KL divergence contribution of this layer
        self.kl_loss_tracker = tf.keras.metrics.Mean(name="kl_sindy")

    def assert_additional_args(self, beta, priors):
        """
        Validate that the additional arguments are correct.

        Parameters
        ----------
        beta : float or int
            Scaling factor for the KL divergence.
        priors : BaseDistribution or list of BaseDistribution
            Prior distribution(s) for the coefficients.
        """
        # assert that input arguments are valid that are not checked in the
        # super class
        assert isinstance(beta, (float, int)), "beta must be a float"
        # priors: either one distribution per dof or a single shared one
        if isinstance(priors, list):
            assert (
                len(priors) == self.n_dofs
            ), f"Number of priors must match the number of dofs ({self.n_dofs})"
            for prior in priors:
                assert isinstance(prior, BaseDistribution), (
                    "All priors must be an instance inheriting from "
                    "BaseDistribution"
                )
        else:
            assert isinstance(
                priors, BaseDistribution
            ), "priors must be a class inheriting from BaseDistribution"

    def init_weigths(self, kernel_regularizer):
        """
        Initialize the layer weights: the mean kernel via the super class and
        an additional log-scale kernel for the variational coefficients.

        NOTE(review): the spelling "weigths" is kept on purpose — it must
        match the superclass method name for the override to take effect.

        Parameters
        ----------
        kernel_regularizer : tf.keras.regularizers.Regularizer
            Regularizer whose ``l1``/``l2`` factors are reused for the
            log-scale kernel.
        """
        super(VindyLayer, self).init_weigths(kernel_regularizer)
        # initialize the log variance of the coefficients
        init = tf.random_uniform_initializer(minval=-1, maxval=1)
        # reuse the same l1/l2 factors, but penalize the std dev exp(0.5*x)
        l1, l2 = kernel_regularizer.l1, kernel_regularizer.l2
        scale_regularizer = LogVarL1L2Regularizer(l1=l1, l2=l2, dtype=self.dtype_)
        self.kernel_scale = self.add_weight(
            name="SINDy_log_scale",
            initializer=init,
            shape=self.kernel_shape,
            dtype=self.dtype_,
            regularizer=scale_regularizer,
        )

    @property
    def loss_trackers(self):
        """dict: Mapping of loss name to its Keras metric tracker."""
        return dict(kl_sindy=self.kl_loss_tracker)

    @property
    def _coeffs(self):
        """
        Get the coefficients of the SINDy layer sampled from the defined
        distribution.

        Returns the coefficients sampled from the defined distribution
        parametrized by the layer's kernel (weights).

        Returns
        -------
        tuple of tf.Tensor
            Tuple containing (coeffs, coeffs_mean, coeffs_log_scale).
        """
        # split the kernel into mean and log variance
        coeffs_mean, coeffs_log_scale = self.kernel, self.kernel_scale
        # draw samples from the distribution(s)
        if isinstance(self.priors, list):
            # one distribution per dof: sample each row separately
            trainable_coeffs = [
                prior([coeffs_mean[i : i + 1], coeffs_log_scale[i : i + 1]])
                for i, prior in enumerate(self.priors)
            ]
            trainable_coeffs = tf.concat(trainable_coeffs, axis=0)
        else:
            trainable_coeffs = self.priors([coeffs_mean, coeffs_log_scale])
        # fill the coefficient matrix with the trainable coefficients
        coeffs = self.fill_coefficient_matrix(trainable_coeffs)
        return coeffs, coeffs_mean, coeffs_log_scale

    def kl_loss(self, mean, scale):
        """
        Compute the KL divergence between the priors and coefficient
        distributions.

        Parameters
        ----------
        mean : tf.Tensor
            Mean of the coefficient distributions.
        scale : tf.Tensor
            Scale (log variance) of the coefficient distributions.

        Returns
        -------
        tf.Tensor
            KL divergence loss scaled by ``self.beta``.
        """
        if isinstance(self.priors, list):
            kl_loss = tf.cast(0, self.dtype_)
            for prior in self.priors:
                # NOTE(review): every prior receives the full mean/scale
                # tensors rather than only its own row — confirm intended
                kl_loss += prior.KL_divergence(mean, scale)
        else:
            kl_loss = self.priors.KL_divergence(mean, scale)
        return self.beta * tf.reduce_sum(kl_loss)

    def get_sindy_coeffs(self):
        """
        Return the deterministic SINDy coefficient matrix built from the
        distribution means.

        Returns
        -------
        numpy.ndarray
            Coefficient matrix (requires eager execution for ``.numpy()``).
        """
        _, coeffs_mean, _ = self._coeffs
        coeffs = self.fill_coefficient_matrix(coeffs_mean)
        return coeffs.numpy()

    @tf.function
    def call(self, inputs, training=False):
        """
        Apply the VINDy layer to the inputs.

        Applies the feature libraries to the inputs, samples the coefficients
        from a normal distribution parametrized by the layer's kernel
        (weights), and computes the dot product of the features and the
        coefficients.

        Parameters
        ----------
        inputs : tf.Tensor
            Input tensor.
        training : bool, default=False
            Whether the model is in training mode.

        Returns
        -------
        tf.Tensor or list of tf.Tensor
            If training: [z_dot, coeffs_mean, coeffs_log_var].
            If not training: z_dot.
        """
        # todo: think about whether we want to have deterministic coefficients
        #  during inference (after training) or not
        z_features = self.features(inputs)
        coeffs, coeffs_mean, coeffs_log_var = self._coeffs
        if training:
            # stochastic coefficients; mean/log-var returned for the KL loss
            z_dot = z_features @ tf.transpose(coeffs)
            return [z_dot, coeffs_mean, coeffs_log_var]
        else:
            # in case of evaluation, we use the mean of the coefficients
            z_dot = z_features @ tf.transpose(self.fill_coefficient_matrix(coeffs_mean))
            return z_dot

    def visualize_coefficients(self, x_range=None, z=None, mu=None):
        """
        Visualize the coefficients of the SINDy layer as distributions.

        Parameters
        ----------
        x_range : tuple, optional
            Range for x-axis.
        z : array-like, optional
            Latent state variable names.
        mu : array-like, optional
            Parameter variable names.
        """
        # get coefficient parameterization
        _, mean, log_scale = self._coeffs
        _ = self._visualize_coefficients(
            mean.numpy(), log_scale.numpy(), x_range=x_range, z=z, mu=mu
        )
        plt.show()

    def _visualize_coefficients(
        self,
        mean,
        log_scale,
        x_range=None,
        y_range=None,
        z=None,
        mu=None,
        figsize=None,
        y_ticks=True,
    ):
        """
        Plot one subplot per coefficient showing its distribution.

        Parameters
        ----------
        mean, log_scale : numpy.ndarray
            Flattened mean and log-scale parameterization of the coefficients.
        x_range, y_range : tuple, optional
            Axis limits shared by all subplots.
        z, mu : array-like, optional
            Variable names forwarded to ``get_feature_names``.
        figsize : tuple, optional
            Figure size; defaults to ``(n_variables * 10, 10)``.
        y_ticks : bool, default=True
            Whether to keep y-axis tick marks.

        Returns
        -------
        matplotlib.figure.Figure
            The created figure.
        """
        # get name of the corresponding features
        feature_names = self.get_feature_names(z, mu)
        n_variables = self.state_dim
        n_plots = int(self.n_dofs / n_variables)
        # reshape so that row i / column j addresses feature i of variable j
        mean = mean.reshape(n_variables, n_plots).T
        log_scale = log_scale.reshape(n_variables, n_plots).T
        # create a plot with one subplot for each (trainable) coefficient
        if figsize is None:
            figsize = (n_variables * 10, 10)
        fig, axs = plt.subplots(n_plots, n_variables, figsize=figsize, sharex=True)
        # in case of a one-dimensional system, we append a dimension to axs
        if n_variables == 1:
            axs = axs[:, np.newaxis]
        for j in range(n_variables):
            for i in range(n_plots):
                # draw a vertical line at 0
                axs[i][j].axvline(x=0, color="gray", linestyle="-")
                # plot the distribution of the coefficients
                if isinstance(self.priors, list):
                    distribution = self.priors[i]
                else:
                    distribution = self.priors
                scale = distribution.reverse_log(log_scale[i, j])
                distribution.plot(mean[i, j], scale, ax=axs[i][j])
                # put feature name as ylabel
                if j == 0:
                    axs[i][j].set_ylabel(
                        f"${feature_names[i]}$", rotation=90, labelpad=10
                    )
                # set x range
                if x_range is not None:
                    axs[i][j].set_xlim(x_range)
                # set y range
                if y_range is not None:
                    axs[i][j].set_ylim(y_range)
                if not y_ticks:
                    axs[i][j].set_yticks([])
        plt.tight_layout()
        # ensure that ylabels don't overlap with axis ticks
        plt.subplots_adjust(left=0.1)
        return fig

    def pdf_thresholding(self, threshold: float = 1.0):
        """
        Cancel coefficients based on their probability density function at
        zero.

        Cancels the coefficients of the SINDy layer if their corresponding
        probability density function at zero is above the threshold, i.e., if
        pdf(0) > threshold.

        Parameters
        ----------
        threshold : float, default=1.0
            Threshold value for cancelling coefficients.
        """
        # get current coefficient parameterization (kernel variables)
        _, loc, log_scale = self._coeffs
        feature_names = np.array([self.get_feature_names()] * self.state_dim).flatten()
        # cancel coefficients
        # NOTE(review): the [:-1] slice skips the last coefficient — confirm
        # this exclusion is intentional
        for i, (loc_, log_scale_) in enumerate(zip(loc[:-1], log_scale[:-1])):
            if isinstance(self.priors, list):
                distribution = self.priors[i]
            else:
                distribution = self.priors
            scale = distribution.reverse_log(log_scale_)
            zero_density = distribution.prob_density_fcn(x=0, loc=loc_, scale=scale)
            if zero_density > threshold:
                # cancel the coefficient: zero mean, (almost) zero variance
                loc[i].assign(0)
                log_scale[i].assign(-10)
                logging.info(
                    f"Canceling coefficient {feature_names[i]} with pdf(0)={zero_density[0]}"
                )
        self.print()

    def integrate_uq(self, z0, t, mu=None, method="RK45"):
        """
        Integrate the identified dynamics with one sampled set of
        coefficients (for uncertainty quantification).

        Parameters
        ----------
        z0 : array-like
            Initial latent state.
        t : array-like
            Time points for the integration.
        mu : array-like, optional
            Parameters forwarded to ``integrate``.
        method : str, default="RK45"
            Integration method forwarded to ``integrate``.

        Returns
        -------
        tuple
            (integration result, the sampled coefficient matrix used).
        """
        # sample new set of coefficients
        sampled_coeffs, _, _ = self._coeffs

        def sindy_fcn(t_, inputs):
            # fix the sampled coefficients for the whole trajectory
            return self.call_uq(inputs, coeffs=sampled_coeffs)

        return (
            self.integrate(z0, t, mu=mu, method=method, sindy_fcn=sindy_fcn),
            sampled_coeffs,
        )

    @tf.function
    def call_uq(self, inputs, coeffs):
        """
        Apply the VINDy layer with given coefficients for uncertainty
        quantification.

        Applies the VINDy layer for given coefficients so that not the mean
        coefficients of the distribution are taken.

        Parameters
        ----------
        inputs : tf.Tensor
            Input tensor.
        coeffs : tf.Tensor
            Coefficients to use for the computation.

        Returns
        -------
        tf.Tensor
            Time derivative z_dot.
        """
        # promote a single sample to a batch of one
        if len(inputs.shape) == 1:
            inputs = tf.expand_dims(inputs, 0)
        inputs = tf.cast(inputs, dtype=self.dtype_)
        features = self.features(inputs)
        z_dot = features @ tf.transpose(coeffs)
        return z_dot
class LogVarL1L2Regularizer(tf.keras.regularizers.Regularizer):
    """
    Regularizer for the log variance of the coefficients in the VINDy layer.

    The regularized weight ``x`` stores a log variance, so the penalties are
    applied to the corresponding standard deviation ``exp(0.5 * x)`` rather
    than to ``x`` itself.
    """

    def __init__(self, l1=0.0, l2=0.0, dtype=tf.float32):
        """
        Parameters
        ----------
        l1 : float, default=0.0
            L1 penalty factor applied to ``exp(0.5 * x)``.
        l2 : float, default=0.0
            L2 penalty factor applied to ``exp(0.5 * x)``.
        dtype : tf.DType, default=tf.float32
            Dtype used for the penalty computation.
        """
        # The default value for l1 and l2 are different from the value in
        # l1_l2 for backward compatibility reason. Eg, L1L2(l2=0.1) will only
        # have l2 and no l1 penalty.
        l1 = 0.0 if l1 is None else l1
        l2 = 0.0 if l2 is None else l2
        self.dtype_ = dtype
        self.l1 = tf.convert_to_tensor(l1, dtype=dtype)
        self.l2 = tf.convert_to_tensor(l2, dtype=dtype)

    def __call__(self, x):
        """Return the combined L1/L2 penalty on the std dev ``exp(0.5 * x)``."""
        regularization = tf.convert_to_tensor(0, dtype=self.dtype_)
        # x is a log variance, so the standard deviation is exp(0.5 * x);
        # it is strictly positive, which makes tf.abs() redundant for L1
        std = tf.exp(0.5 * x)
        if self.l1:
            regularization += self.l1 * tf.reduce_sum(std)
        if self.l2:
            # 2 * l2_loss(std) == sum(std**2) == sum(exp(x))
            regularization += 2.0 * self.l2 * tf.nn.l2_loss(std)
        return regularization