import tensorflow as tf
import numpy as np
import inspect
from vindy.libraries import PolynomialLibrary, BaseLibrary
from sympy import symbols
[docs]
class SindyLayer(tf.keras.layers.Layer):
"""
Layer for SINDy approximation of the time derivative of the latent variable.
Feature libraries are applied to the latent variable and its time derivative, and a sparse regression is performed.
Parameters
----------
state_dim : int
Number of latent variables.
param_dim : int, optional
Number of parameters (default is 0).
feature_libraries : list, optional
List of feature libraries for the latent variables (default is [PolynomialLibrary(degree=3)]).
param_feature_libraries : list, optional
List of feature libraries for the parameters (default is []).
second_order : bool, optional
If True, enforce 2nd order structure (default is True).
kernel_regularizer : tf.keras.regularizers.Regularizer, optional
Regularizer for the kernel (default is tf.keras.regularizers.L1L2(l1=1e-3, l2=0)).
x_mu_interaction : bool, optional
If True, interaction between latent variables and parameters (default is True).
mask : array-like, optional
If required, certain coefficients of the latent governing equations can be fixed and are consequently masked out for training (default is None).
fixed_coeffs : array-like, optional
Values for the coefficients that are masked out during training (default is None).
dtype : str, optional
Data type of the layer (default is "float32").
kwargs : dict
Additional arguments for the TensorFlow layer class.
"""
def __init__(
self,
state_dim,
param_dim=0,
feature_libraries: list = [],
param_feature_libraries: list = [],
second_order=True,
kernel_regularizer=tf.keras.regularizers.L1L2(l1=1e-3, l2=0),
x_mu_interaction=True,
mask=None,
fixed_coeffs=None,
dtype="float32",
**kwargs,
):
"""
Layer for SINDy approximation of the time derivative of the latent variable
feature libraries are applied to the latent variable and its time derivative and a sparse regression is performed
:param state_dim: (int) number of latent variables
:param param_dim: (int) number of parameters
:param feature_libraries: (list) list of feature libraries for the latent variables
:param param_feature_libraries: (list) list of feature libraries for the parameters
:param second_order: (bool) if True, enforce 2nd order structure,
i.e. d/dt [z, z_d] = [z_dot, Theta(z, z_dot)@Xi] where Theta(z, z_dot) is a feature library and
Xi are the coefficients to be identified
:param kernel_regularizer: (tf.keras.regularizers.Regularizer) regularizer for the kernel
:param x_mu_interaction: (bool) if True, interaction between latent variables and parameters
:param mask: (array-like) If required certain coefficients of the latent governing equations can be fixed
and are consequently masked out for training
:param fixed_coeffs: (array-like) values for the coefficients that are masked out during training
:param dtype: (str) data type of the layer
:param kwargs: additional arguments for the tensorflow layer class
"""
super(SindyLayer, self).__init__(**kwargs)
# assert that input arguments are valid
self.assert_arguments(locals())
self.dtype_ = dtype
# default library
if len(feature_libraries) == 0:
feature_libraries = [PolynomialLibrary(degree=3)]
self.feature_libraries = feature_libraries
self.param_feature_libraries = param_feature_libraries
self.state_dim = state_dim
self.param_dim = param_dim
self.x_mu_interaction = x_mu_interaction
# get feature dimension
if second_order:
self.output_dim = 2 * state_dim
else:
self.output_dim = state_dim
self.n_bases_functions = self.tfFeat(
tf.ones((1, self.output_dim + param_dim))
).shape[1]
# set certain values of kernel
self.mask, self.fixed_coeffs = self.set_mask(mask, fixed_coeffs)
self.second_order = second_order
if second_order:
# enforcing the structure of the 2nd order model
# the feature library will look like this [1, z1, ..., zn, z1_dot, ..., zn_dot, ...]
# consequently we enforce 2nd order structure [0_(n x n+1) I_(n x n) 0_(n x ...)]
zero_matrix = tf.zeros(
shape=[self.state_dim, self.state_dim + 1], dtype=self.dtype_
)
eye_matrix = tf.eye(self.state_dim, dtype=self.dtype_)
zero_matrix2 = tf.zeros(
shape=[self.state_dim, self.n_bases_functions - int(state_dim * 2) - 1],
dtype=self.dtype_,
)
# apply the structure of the 2nd order model
fixed_kernel = tf.concat([zero_matrix, eye_matrix, zero_matrix2], axis=1)
self.mask = tf.concat(
[tf.zeros(fixed_kernel.shape, dtype=self.dtype_), self.mask], axis=0
)
self.fixed_coeffs = tf.concat([fixed_kernel, self.fixed_coeffs], axis=0)
# initialize sindy coefficients
self.init_weigths(kernel_regularizer)
@property
def loss_trackers(self):
"""
Returns the loss trackers of the layer if any. The standard sindy layer has no loss trackers.
Returns
-------
dict
Loss trackers.
"""
return dict()
def _init_to_config(self, init_locals):
"""
Save the parameters with which the model was initialized except for the data itself.
Parameters
----------
init_locals : dict
Local variables from the __init__ function.
"""
sig = inspect.signature(self.__init__)
keys = [param.name for param in sig.parameters.values()]
values = [init_locals[name] for name in keys]
init_dict = dict(zip(keys, values))
self.config = init_dict
[docs]
def assert_arguments(self, arguments):
"""
Assert that the arguments passed to the layer are valid.
Parameters
----------
arguments : dict
Arguments passed to the layer.
"""
assert arguments["dtype"] in [
"float32",
"float64",
tf.float32,
tf.float64,
], "dtype must be either float32 or float64"
assert isinstance(arguments["state_dim"], int), "state_dim must be an integer"
assert isinstance(arguments["param_dim"], int), "param_dim must be an integer"
# assert that mask and fixed_coeffs have the right shape
assert (
isinstance(arguments["mask"], np.ndarray)
or isinstance(arguments["mask"], tf.Tensor)
or arguments["mask"] is None
), "mask must be either None, a numpy array or a tensorflow tensor"
assert (
isinstance(arguments["fixed_coeffs"], np.ndarray)
or isinstance(arguments["fixed_coeffs"], tf.Tensor)
or arguments["fixed_coeffs"] is None
), "fixed_coeffs must be either None, a numpy array or a tensorflow tensor"
if arguments["mask"] is not None:
assert (
arguments["mask"].shape[0] == arguments["state_dim"]
), "mask must have shape (state_dim, x)"
if arguments["fixed_coeffs"] is not None:
assert (
arguments["fixed_coeffs"].shape[0] == arguments["state_dim"]
), "fixed_coeffs must have shape (state_dim, x)"
# assert that feature_libraries is a list of aesindy.libraries objects
assert isinstance(
arguments["feature_libraries"], list
), "feature_libraries must be a list"
for lib in arguments["feature_libraries"]:
assert isinstance(
lib, BaseLibrary
), "feature_libraries must be a list of aesindy.libraries objects"
# assert that param_feature_libraries is a list of aesindy.libraries objects
assert isinstance(
arguments["param_feature_libraries"], list
), "param_feature_libraries must be a list"
for lib in arguments["param_feature_libraries"]:
assert isinstance(
lib, BaseLibrary
), "param_feature_libraries must be a list of aesindy.libraries objects"
# assert that second_order and x_mu_interaction are booleans
assert isinstance(
arguments["second_order"], bool
), "second_order must be a boolean"
assert isinstance(
arguments["x_mu_interaction"], bool
), "x_mu_interaction must be a boolean"
assert isinstance(
arguments["kernel_regularizer"], tf.keras.regularizers.Regularizer
), "kernel_regularizer must be a tf.keras.regularizers.Regularizer object"
@property
def coefficient_matrix_shape(self):
"""
Returns the shape of the coefficient matrix.
Returns
-------
tuple
Shape of the coefficient matrix.
"""
return (self.output_dim, self.n_bases_functions)
@property
def kernel_shape(self):
"""
Returns the dimension of the kernel (weights) of the SINDy layer.
Returns
-------
tuple
Shape of the kernel.
"""
return (self.n_dofs, 1)
[docs]
def init_weigths(self, kernel_regularizer):
"""
Initialize the weights of the SINDy layer.
Parameters
----------
kernel_regularizer : tf.keras.regularizers.Regularizer
Regularizer for the kernel.
"""
# get amount of dofs (equals the number of ones in the mask)
self.n_dofs = int(tf.reduce_sum(self.mask))
# get ids of dofs
self.dof_ids = tf.where(tf.equal(self.mask, 1))
init = tf.random_uniform_initializer(minval=-1, maxval=1)
self.kernel = self.add_weight(
name="SINDy_coefficents",
initializer=init,
shape=self.kernel_shape,
dtype=self.dtype_,
regularizer=kernel_regularizer,
)
[docs]
def set_mask(self, mask, fixed_coeffs=None):
"""
Set the mask and fixed coefficients for the SINDy layer to mask out certain coefficients and set their values.
Parameters
----------
mask : array-like or None
Mask for the coefficients. If None, a mask of ones is used.
fixed_coeffs : array-like or None, optional
Fixed coefficients for the masked values. If None, a matrix of zeros is used.
Returns
-------
tuple
A tuple containing the mask and fixed coefficients.
"""
if mask is None:
mask = tf.ones([self.state_dim, self.n_bases_functions])
if fixed_coeffs is None:
fixed_coeffs = tf.zeros([self.state_dim, self.n_bases_functions])
mask = tf.cast(mask, dtype=self.dtype_)
if mask.shape != self.coefficient_matrix_shape:
# bring mask to the right shape by padding ones
mask = tf.pad(
mask,
[[0, 0], [0, self.coefficient_matrix_shape[1] - mask.shape[1]]],
constant_values=1,
)
fixed_coeffs = tf.cast(fixed_coeffs, dtype=self.dtype_)
if fixed_coeffs.shape != self.coefficient_matrix_shape:
# bring mask to the right shape by padding zeros
fixed_coeffs = tf.pad(
fixed_coeffs,
[[0, 0], [0, self.coefficient_matrix_shape[1] - fixed_coeffs.shape[1]]],
constant_values=0,
)
return mask, fixed_coeffs
@property
def _coeffs(self):
"""
Get the coefficients of the SINDy layer as a matrix.
Returns
-------
tf.Tensor
Coefficient matrix.
"""
# fill the coefficient matrix with the trainable coefficients
coeffs = self.fill_coefficient_matrix(self.kernel)
return coeffs
[docs]
def get_sindy_coeffs(self):
"""
Get the SINDy coefficients as a numpy array.
Returns
-------
np.ndarray
SINDy coefficients.
"""
return self._coeffs.numpy()
[docs]
def get_prunable_weights(self):
"""
Get the prunable weights of the SINDy layer.
Returns
-------
list
List of prunable weights.
"""
# Prune bias also, though that usually harms model accuracy too much.
return [self.kernel]
[docs]
def prune_weights(self, threshold=0.01, training=False):
"""
Prune the weights of the SINDy layer by setting values below a threshold to zero.
Parameters
----------
threshold : float, optional
Threshold for pruning (default is 0.01).
training : bool, optional
Whether the layer is in training mode (default is False).
Returns
-------
None
"""
mask = tf.math.greater(tf.math.abs(self.kernel), threshold * tf.ones_like(self.kernel, dtype=self.kernel.dtype))
mask = tf.cast(mask, dtype=self.kernel.dtype)
self.kernel.assign(tf.multiply(self.kernel, mask))
[docs]
def fill_coefficient_matrix(self, trainable_coeffs):
"""
Fill the coefficient matrix with the trainable coefficients.
Parameters
----------
trainable_coeffs : array-like
Trainable coefficients.
Returns
-------
tf.Tensor
Coefficient matrix filled with the trainable coefficients.
"""
# create a zero matrix for the coefficients with the correct shape
coeffs = tf.zeros(self.coefficient_matrix_shape)
# put the coefficients into the coefficient matrix Xi at the correct positions
coeffs = tf.tensor_scatter_nd_update(
coeffs, self.dof_ids, trainable_coeffs[:, 0]
)
# apply the mask7
if self.fixed_coeffs is not None:
coeffs += self.fixed_coeffs
return coeffs
[docs]
@tf.function
def call(self, inputs, training=False):
"""
Perform the forward pass of the SINDy layer.
Parameters
----------
inputs : tf.Tensor
Input tensor.
training : bool, optional
Whether the layer is in training mode (default is False).
Returns
-------
tf.Tensor
Output tensor after applying the SINDy layer.
"""
z_features = self.tfFeat(inputs)
z_dot = z_features @ tf.transpose(self._coeffs)
return z_dot
[docs]
@tf.function
def tfFeat(self, inputs):
"""
Combine all features for the SINDy layer.
Parameters
----------
inputs : tf.Tensor
Input tensor.
Returns
-------
tf.Tensor
Combined features.
"""
# in case we want interaction between parameters and states
if self.x_mu_interaction:
z_feat = self.concat_features(inputs, self.feature_libraries)
return z_feat
# if we want to apply separate features to parameter and states
else:
z_feat = self.concat_features(
inputs[:, : self.output_dim], self.feature_libraries
)
if len(self.param_feature_libraries) > 0:
param_feat = self.concat_features(
inputs[:, self.output_dim :], self.param_feature_libraries
)
return tf.concat([z_feat, param_feat], axis=1)
return z_feat
[docs]
@tf.function
def concat_features(self, z, libraries):
"""
Concatenate features from different libraries.
Parameters
----------
z : tf.Tensor
Input tensor.
libraries : list
List of feature libraries.
Returns
-------
tf.Tensor
Concatenated features.
"""
features = [library(z) for library in libraries]
z_feat = tf.concat(features, axis=1)
return z_feat
[docs]
def get_feature_names(self, z=None, mu=None):
"""
Construct feature names for states and parameters.
Parameters
----------
z : list of str, optional
Names of the states, e.g., \['z1', 'z2', ...\] (default is None).
mu : list of str, optional
Names of the parameters, e.g., \['mu1', 'mu2', ...\] (default is None).
Returns
-------
list of str
List of feature names.
"""
if z is None:
z = [f"z_{i}" for i in range(self.output_dim)]
if mu is None:
mu = [f"\u03BC_{i}" for i in range(self.param_dim)]
z = [symbols(z_) for z_ in z]
mu = [symbols(mu_) for mu_ in mu]
# in case we want interaction between parameters and states
if self.x_mu_interaction:
features = [library.get_names(z + mu) for library in self.feature_libraries]
# combine lists to one list
features = [item for sublist in features for item in sublist]
# if we want to apply separate features to parameter and states
else:
z_feat = [library.get_names(z) for library in self.feature_libraries]
# combine lists to one list
z_feat = [item for sublist in z_feat for item in sublist]
param_feat = [
library.get_names(mu) for library in self.param_feature_libraries
]
# combine lists to one list
param_feat = [item for sublist in param_feat for item in sublist]
features = z_feat + param_feat
return features
[docs]
def print(self, z=None, mu=None, precision: int = 3):
"""
Print the model equation.
Parameters
----------
z : list of str, optional
Names of the states, e.g., \['z1', 'z2', ...\] (default is None).
mu : list of str, optional
Names of the parameters, e.g., \['mu1', 'mu2', ...\] (default is None).
precision : int, optional
Number of decimal places (default is 3).
Returns
-------
None
"""
print(self.model_equation_to_str(z, mu, precision))
[docs]
def model_equation_to_str(self, z=None, mu=None, precision: int = 3):
"""
Convert coefficients and feature names into a readable equation.
Parameters
----------
z : list of str, optional
Names of the states, e.g., \['z1', 'z2', ...\] (default is None).
mu : list of str, optional
Names of the parameters, e.g., \['mu1', 'mu2', ...\] (default is None).
precision : int, optional
Number of decimal places (default is 3).
Returns
-------
str
Model equation as a string.
"""
if z is None:
z = [f"z{i}" for i in range(self.output_dim)]
if mu is None:
mu = [f"\u03BC{i}" for i in range(self.param_dim)]
if len(z) != self.output_dim:
raise ValueError(f"arguments should have length {self.output_dim}")
if len(mu) != self.param_dim:
raise ValueError(f"mu should have length {self.param_dim}")
# in case we want interaction between parameters and states
features = self.get_feature_names(z, mu)
coeffs = self.get_sindy_coeffs()
str = ""
for i, c_ in enumerate(coeffs):
str += f"d{z[i]} = "
for j in range(len(c_)):
if np.round(c_[j], precision) != 0:
if c_[j] > 0:
str += f"+ {np.abs(c_[j]):.{precision}f}*{features[j]} "
else:
str += f"- {np.abs(c_[j]):.{precision}f}*{features[j]} "
# if j < len(c_) - 1:
# print(' + ', end='')
str += "\n"
return str