"""Source code for vindy.layers.sindy_layer."""

import tensorflow as tf
import numpy as np
import scipy
# The code below uses scipy.interpolate.interp1d and scipy.integrate.solve_ivp.
# Accessing submodules through the bare ``scipy`` package is not guaranteed on
# all scipy versions, so import them explicitly.
import scipy.integrate
import scipy.interpolate
import inspect
from vindy.libraries import PolynomialLibrary, BaseLibrary
from sympy import symbols
import logging

# NOTE(review): configuring the root logger at import time affects the whole
# application; kept as-is for backward compatibility.
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)


class SindyLayer(tf.keras.layers.Layer):
    """
    Sparse Identification of Nonlinear Dynamics (SINDy) layer.

    This layer evaluates a library of candidate functions on latent inputs
    and performs sparse regression to recover governing coefficients.

    Parameters
    ----------
    state_dim : int
        Number of latent variables (dimension of the latent state z).
    param_dim : int, optional
        Number of parameters (mu). Default is 0.
    feature_libraries : list of BaseLibrary, optional
        Feature libraries applied to the latent variables
        (e.g. PolynomialLibrary).
    param_feature_libraries : list of BaseLibrary, optional
        Feature libraries applied to the parameters (mu).
    second_order : bool, optional
        If True, enforce second-order structure for the dynamics
        (include z_dot features).
    kernel_regularizer : tf.keras.regularizers.Regularizer, optional
        Regularizer applied to the learned coefficient kernel.
    x_mu_interaction : bool, optional
        If True, include interaction features between state and parameters.
    mask : array-like, optional
        Mask to fix or remove certain coefficients from training.
    fixed_coeffs : array-like, optional
        Values for coefficients that are fixed (applied after masking).
    dtype : str, optional
        Data type used by the layer (e.g. 'float32').
    **kwargs
        Additional keyword arguments passed to ``tf.keras.layers.Layer``.
    """
[docs] def __init__( self, state_dim, param_dim=0, feature_libraries=None, param_feature_libraries=None, second_order=True, kernel_regularizer=tf.keras.regularizers.L1L2(l1=1e-3, l2=0), x_mu_interaction=True, mask=None, fixed_coeffs=None, dtype="float32", **kwargs, ): """ Layer for SINDy approximation of the time derivative of the latent variable. Feature libraries are applied to the latent variable and its time derivative, and a sparse regression is performed to obtain the governing coefficients. Notes ----- This initializer validates arguments, constructs default feature libraries when none are provided and prepares internal bookkeeping variables used by the layer (masks, coefficient shapes, etc.). """ super(SindyLayer, self).__init__(**kwargs) # default libraries if feature_libraries is None: feature_libraries = [PolynomialLibrary(degree=3)] if param_feature_libraries is None: param_feature_libraries = [] # assert that input arguments are valid self.assert_arguments(locals()) self.dtype_ = dtype # default library if len(feature_libraries) == 0: feature_libraries = [PolynomialLibrary(degree=3)] self.feature_libraries = feature_libraries self.param_feature_libraries = param_feature_libraries self.state_dim = state_dim self.param_dim = param_dim self.x_mu_interaction = x_mu_interaction # get feature dimension if second_order: self.output_dim = 2 * state_dim else: self.output_dim = state_dim self.n_bases_functions = self.features( tf.ones((1, self.output_dim + param_dim)) ).shape[1] # set certain values of kernel self.mask, self.fixed_coeffs = self.set_mask(mask, fixed_coeffs) self.second_order = second_order if second_order: # enforcing the structure of the 2nd order model # the feature library will look like this [1, z1, ..., zn, z1_dot, ..., zn_dot, ...] 
# consequently we enforce 2nd order structure [0_(n x n+1) I_(n x n) 0_(n x ...)] zero_matrix = tf.zeros( shape=[self.state_dim, self.state_dim + 1], dtype=self.dtype_ ) eye_matrix = tf.eye(self.state_dim, dtype=self.dtype_) zero_matrix2 = tf.zeros( shape=[self.state_dim, self.n_bases_functions - int(state_dim * 2) - 1], dtype=self.dtype_, ) # apply the structure of the 2nd order model fixed_kernel = tf.concat([zero_matrix, eye_matrix, zero_matrix2], axis=1) self.mask = tf.concat( [tf.zeros(fixed_kernel.shape, dtype=self.dtype_), self.mask], axis=0 ) self.fixed_coeffs = tf.concat([fixed_kernel, self.fixed_coeffs], axis=0) # initialize sindy coefficients self.init_weigths(kernel_regularizer)
@property def loss_trackers(self): """ Return loss trackers used by the layer. Returns ------- dict Mapping of loss names to Keras Metric objects. By default the SINDy layer does not add separate loss trackers and returns an empty dict. """ return dict() def _init_to_config(self, init_locals): """ Save initializer arguments into ``self.config`` for serialization. Parameters ---------- init_locals : dict The locals() mapping from the initializer; used to persist init args. """ sig = inspect.signature(self.__init__) keys = [param.name for param in sig.parameters.values()] values = [init_locals[name] for name in keys] init_dict = dict(zip(keys, values)) self.config = init_dict
[docs] def assert_arguments(self, arguments): """ Validate initializer arguments and raise informative assertions. Parameters ---------- arguments : dict Mapping from argument name to value (typically ``locals()`` from __init__). """ assert arguments["dtype"] in [ "float32", "float64", tf.float32, tf.float64, ], "dtype must be either float32 or float64" assert isinstance(arguments["state_dim"], int), "state_dim must be an integer" assert isinstance(arguments["param_dim"], int), "param_dim must be an integer" # assert that mask and fixed_coeffs have the right shape assert ( isinstance(arguments["mask"], np.ndarray) or isinstance(arguments["mask"], tf.Tensor) or arguments["mask"] is None ), "mask must be either None, a numpy array or a tensorflow tensor" assert ( isinstance(arguments["fixed_coeffs"], np.ndarray) or isinstance(arguments["fixed_coeffs"], tf.Tensor) or arguments["fixed_coeffs"] is None ), "fixed_coeffs must be either None, a numpy array or a tensorflow tensor" if arguments["mask"] is not None: assert ( arguments["mask"].shape[0] == arguments["state_dim"] ), "mask must have shape (state_dim, x)" if arguments["fixed_coeffs"] is not None: assert ( arguments["fixed_coeffs"].shape[0] == arguments["state_dim"] ), "fixed_coeffs must have shape (state_dim, x)" # assert that feature_libraries is a list of veni.libraries objects assert isinstance( arguments["feature_libraries"], list ), "feature_libraries must be a list" for lib in arguments["feature_libraries"]: assert isinstance( lib, BaseLibrary ), "feature_libraries must be a list of veni.libraries objects" # assert that param_feature_libraries is a list of veni.libraries objects assert isinstance( arguments["param_feature_libraries"], list ), "param_feature_libraries must be a list" for lib in arguments["param_feature_libraries"]: assert isinstance( lib, BaseLibrary ), "param_feature_libraries must be a list of veni.libraries objects" # assert that second_order and x_mu_interaction are booleans assert 
isinstance( arguments["second_order"], bool ), "second_order must be a boolean" assert isinstance( arguments["x_mu_interaction"], bool ), "x_mu_interaction must be a boolean" assert isinstance( arguments["kernel_regularizer"], tf.keras.regularizers.Regularizer ), "kernel_regularizer must be a tf.keras.regularizers.Regularizer object"
    @property
    def coefficient_matrix_shape(self):
        """
        Return the shape of the full coefficient matrix Xi.

        Returns
        -------
        tuple
            ``(output_dim, n_bases_functions)``.
        """
        return (self.output_dim, self.n_bases_functions)

    @property
    def kernel_shape(self):
        """
        Return the shape of the internal kernel (trainable coefficients).

        Returns
        -------
        tuple
            Kernel shape as (n_dofs, 1).
        """
        return (self.n_dofs, 1)
[docs] def init_weigths(self, kernel_regularizer): # get amount of dofs (equals the number of ones in the mask) self.n_dofs = int(tf.reduce_sum(self.mask)) # get ids of dofs self.dof_ids = tf.where(tf.equal(self.mask, 1)) init = tf.random_uniform_initializer(minval=-1, maxval=1) self.kernel = self.add_weight( name="SINDy_coefficents", initializer=init, shape=self.kernel_shape, dtype=self.dtype_, regularizer=kernel_regularizer, )
[docs] def set_mask(self, mask, fixed_coeffs=None): """ Normalize and pad mask and fixed coefficient arrays to the expected shape. Parameters ---------- mask : array-like or None Mask specifying which coefficients are trainable (1) or disabled (0). fixed_coeffs : array-like or None Fixed coefficient values to be applied for masked entries. Returns ------- tuple ``(mask, fixed_coeffs)`` both cast to the layer dtype and padded to the proper coefficient matrix shape. """ if mask is None: mask = tf.ones([self.state_dim, self.n_bases_functions]) if fixed_coeffs is None: fixed_coeffs = tf.zeros([self.state_dim, self.n_bases_functions]) mask = tf.cast(mask, dtype=self.dtype_) if mask.shape != self.coefficient_matrix_shape: # bring mask to the right shape by padding ones mask = tf.pad( mask, [[0, 0], [0, self.coefficient_matrix_shape[1] - mask.shape[1]]], constant_values=1, ) fixed_coeffs = tf.cast(fixed_coeffs, dtype=self.dtype_) if fixed_coeffs.shape != self.coefficient_matrix_shape: # bring mask to the right shape by padding zeros fixed_coeffs = tf.pad( fixed_coeffs, [[0, 0], [0, self.coefficient_matrix_shape[1] - fixed_coeffs.shape[1]]], constant_values=0, ) return mask, fixed_coeffs
@property def _coeffs(self): """ Get the coefficients of the SINDy layer as a matrix. Returns ------- tf.Tensor Coefficient matrix with shape (output_dim, n_bases_functions). """ # fill the coefficient matrix with the trainable coefficients coeffs = self.fill_coefficient_matrix(self.kernel) return coeffs
[docs] def get_sindy_coeffs(self): return self._coeffs.numpy()
    def get_prunable_weights(self):
        """
        Return the weights eligible for pruning.

        Returns
        -------
        list
            The trainable coefficient kernel.
        """
        # only the coefficient kernel exists; this layer has no bias term
        return [self.kernel]
[docs] def prune_weights(self, threshold=0.01, training=False): mask = tf.math.greater( tf.math.abs(self.kernel), threshold * tf.ones_like(self.kernel, dtype=self.kernel.dtype), ) mask = tf.cast(mask, dtype=self.kernel.dtype) self.kernel.assign(tf.multiply(self.kernel, mask))
[docs] def fill_coefficient_matrix(self, trainable_coeffs): """ Insert the trainable coefficients into the full coefficient matrix. Parameters ---------- trainable_coeffs : array-like Trainable coefficients arranged to match the active DOFs. Returns ------- tf.Tensor Full coefficient matrix with fixed coefficients applied. """ # create a zero matrix for the coefficients with the correct shape coeffs = tf.zeros(self.coefficient_matrix_shape) # put the coefficients into the coefficient matrix Xi at the correct positions coeffs = tf.tensor_scatter_nd_update( coeffs, self.dof_ids, trainable_coeffs[:, 0] ) # apply the mask7 if self.fixed_coeffs is not None: coeffs += self.fixed_coeffs return coeffs
@tf.function def call(self, inputs, training=False): """ Forward pass of the SINDy layer: evaluate features and compute prediction. Parameters ---------- inputs : tf.Tensor Latent variables, shape ``(batch_size, latent_dim)``. training : bool, optional Whether the call is in training mode. Returns ------- tf.Tensor Predicted derivatives with shape ``(batch_size, output_dim)``. """ z_features = self.features(inputs) z_dot = z_features @ tf.transpose(self._coeffs) return z_dot
[docs] def features(self, inputs): """ Compute concatenated features from configured libraries. Parameters ---------- inputs : tf.Tensor Input tensor that contains state and (optionally) parameter values. Returns ------- tf.Tensor Concatenated feature matrix for the SINDy regression. """ # in case we want interaction between parameters and states if self.x_mu_interaction: z_feat = self.concat_features(inputs, self.feature_libraries) return z_feat # if we want to apply separate features to parameter and states else: z_feat = self.concat_features( inputs[:, : self.output_dim], self.feature_libraries ) if len(self.param_feature_libraries) > 0: param_feat = self.concat_features( inputs[:, self.output_dim :], self.param_feature_libraries ) return tf.concat([z_feat, param_feat], axis=1) return z_feat
[docs] def concat_features(self, z, libraries): """ Concatenate outputs of several feature libraries. Parameters ---------- z : tf.Tensor Input to the feature libraries. libraries : list Iterable of library objects that are callable on ``z``. Returns ------- tf.Tensor Concatenated feature outputs along the last axis. """ features = [library(z) for library in libraries] z_feat = tf.concat(features, axis=-1) return z_feat
[docs] def get_feature_names(self, z=None, mu=None): """ Construct human-readable feature names for states and parameters. Parameters ---------- z : list of str, optional Names for the state variables. If None, default names are generated. mu : list of str, optional Names for parameter variables. If None, default names are generated. Returns ------- list of sympy.Symbol or str Feature names in the order produced by ``features``. """ if z is None: z = [f"z_{i}" for i in range(self.output_dim)] if mu is None: mu = [f"\u03bc_{i}" for i in range(self.param_dim)] z = [symbols(z_) for z_ in z] mu = [symbols(mu_) for mu_ in mu] # in case we want interaction between parameters and states if self.x_mu_interaction: features = [library.get_names(z + mu) for library in self.feature_libraries] # combine lists to one list features = [item for sublist in features for item in sublist] # if we want to apply separate features to parameter and states else: z_feat = [library.get_names(z) for library in self.feature_libraries] # combine lists to one list z_feat = [item for sublist in z_feat for item in sublist] param_feat = [ library.get_names(mu) for library in self.param_feature_libraries ] # combine lists to one list param_feat = [item for sublist in param_feat for item in sublist] features = z_feat + param_feat return features
[docs] def print(self, z=None, mu=None, precision: int = 3): """ Print the discovered SINDy equations to stdout. Parameters ---------- z : list of str, optional Variable names for states. mu : list of str, optional Variable names for parameters. precision : int, optional Number of decimal places when formatting coefficients. """ print(self.model_equation_to_str(z, mu, precision))
[docs] def model_equation_to_str(self, z=None, mu=None, precision: int = 3): """ Convert coefficients and feature names into a human-readable equation string. Parameters ---------- z : list of str, optional Names of the state variables. mu : list of str, optional Names of the parameter variables. precision : int, optional Decimal precision for printing coefficients. Returns ------- str Multi-line string with one equation per latent state. """ if z is None: z = [f"z{i}" for i in range(self.output_dim)] if mu is None: mu = [f"\u03bc{i}" for i in range(self.param_dim)] if len(z) != self.output_dim: raise ValueError(f"arguments should have length {self.output_dim}") if len(mu) != self.param_dim: raise ValueError(f"mu should have length {self.param_dim}") # in case we want interaction between parameters and states features = self.get_feature_names(z, mu) coeffs = self.get_sindy_coeffs() str = "" for i, c_ in enumerate(coeffs): str += f"d{z[i]} = " for j in range(len(c_)): if np.round(c_[j], precision) != 0: if c_[j] > 0: str += f"+ {np.abs(c_[j]):.{precision}f}*{features[j]} " else: str += f"- {np.abs(c_[j]):.{precision}f}*{features[j]} " # if j < len(c_) - 1: # print(' + ', end='') str += "\n" return str
    def integrate(self, z0, t, mu=None, method="RK45", sindy_fcn=None):
        """
        Integrate the SINDy model forward in time using
        scipy.integrate.solve_ivp.

        Parameters
        ----------
        z0 : array-like
            Initial state for integration.
        t : array-like
            Time points at which to evaluate the solution.
        mu : array-like or callable, optional
            Parameter trajectory (or callable) to pass to the model. If a
            non-callable trajectory is given, it is interpolated over ``t``
            and the last time point is dropped (see warning below).
        method : str, optional
            Integration method for solve_ivp (e.g. 'RK45').
        sindy_fcn : callable, optional
            Callable implementing the right-hand side. If None, uses
            ``self.rhs_``.

        Returns
        -------
        OdeResult
            The object returned by scipy.integrate.solve_ivp.
        """
        # cast inputs to tensors in the layer dtype
        z0 = tf.cast(z0, dtype=self.dtype_)
        t = tf.cast(t, dtype=self.dtype_)
        if sindy_fcn is None:
            sindy_fcn = self.rhs_
        if mu is not None:
            if not callable(mu):
                # build a cubic interpolant for the parameter trajectory; the
                # interpolant must be constructed on the full ``t`` BEFORE the
                # last time point is removed below
                mu = tf.cast(mu, dtype=self.dtype_)
                mu_fun = scipy.interpolate.interp1d(
                    t, mu, axis=0, kind="cubic", fill_value="extrapolate"
                )
                t = t[:-1]
                logging.warning(
                    "Last time point dropped in simulation because "
                    "interpolation of control input was used. To avoid "
                    "this, pass in a callable for u."
                )
            else:
                mu_fun = mu

            # RHS with parameters: state and interpolated mu are concatenated
            # before being passed to the SINDy function
            def rhs(t, x):
                return sindy_fcn(t, np.concatenate([x, mu_fun(t)], axis=0))[0]

        else:

            # autonomous RHS: the state alone is passed through
            def rhs(t, x):
                return sindy_fcn(t, x)[0]

        # NOTE(review): z0/t are tf tensors here; solve_ivp appears to rely on
        # their implicit numpy conversion — confirm for non-eager contexts.
        sol = scipy.integrate.solve_ivp(
            rhs,
            t_span=[t[0], t[-1]],
            t_eval=t,
            y0=z0,
            method=method,
            # rtol=1e-6
        )
        return sol
[docs] def rhs_(self, t, inputs): """ Evaluate the right-hand side z'(t) = f(z, mu) for provided inputs. Parameters ---------- t : float Current time (unused by default but present for compatibility). inputs : array-like Flattened inputs (state or state+param) for the RHS function. Returns ------- tf.Tensor or ndarray Time derivative evaluated at the given inputs. """ if len(inputs.shape) == 1: inputs = tf.expand_dims(inputs, 0) return self(inputs)