import numpy as np
import tensorflow as tf
import pysindy as ps
from vindy.layers.vindy_layer import VindyLayer
class SindyCallback(tf.keras.callbacks.Callback):
"""
Callback for the SINDy layer. It periodically re-identifies the coefficients of the SINDy layer by fitting a separate, constrained pySINDy model on the current latent trajectories.
"""
def __init__(
self,
x,
dxdt,
dxddt,
mu,
t,
freq=100,
train_end=False,
ensemble=False,
subset_size=0.5,
n_subsets=100,
threshold=0.1,
thresholder: str = "l0",
z_names=None,
mu_names=None,
print_precision=3,
**kwargs,
):
"""
Initialize the SINDy callback.
Parameters
----------
x : array-like
Training data.
dxdt : array-like
Time derivative of training data.
dxddt : array-like
Second-order time derivative of the training data.
mu : array-like
Parameters of the system.
t : array-like
Time vector, shared across all simulations.
freq : int, optional
Interval in epochs at which the SINDy update is performed (default is 100).
train_end : bool, optional
Whether to perform the update at the end of training (default is False).
ensemble : bool, optional
Whether to use the ensemble method for SINDy (default is False).
subset_size : float, optional
Fraction of the data used for each subset in the ensemble method (default is 0.5).
n_subsets : int, optional
Number of subsets for the ensemble method (default is 100).
threshold : float, optional
Sparsity threshold for the ConstrainedSR3 optimizer (default is 0.1).
thresholder : str, optional
Regularization function used by the optimizer (default is "l0").
z_names : list of str, optional
Names of the latent variables (default is None).
mu_names : list of str, optional
Names of the parameters (default is None).
print_precision : int, optional
Precision of the printed model equation (default is 3).
kwargs : dict
Additional keyword arguments passed to tf.keras.callbacks.Callback.
"""
assert (
isinstance(subset_size, float) and subset_size > 0 and subset_size <= 1
), "subset_size must be a float in (0, 1]"
assert (
isinstance(n_subsets, int) and n_subsets > 0
), "n_subsets must be a positive integer"
self.threshold = threshold
self.freq = freq
self.train_end = train_end
self.ensemble = ensemble
self.subset_size = subset_size
self.n_subsets = n_subsets
self.x = x
self.dxdt = dxdt
self.dxddt = dxddt
self.mu = mu
self.t = t
self.thresholder = thresholder
self.z_names = z_names
self.mu_names = mu_names
self.print_precision = print_precision
super().__init__(**kwargs)
def prepare_data_for_pysindy(self, t, z_feat, dzdt, dzddt=None):
"""
Prepare training data for the separate SINDy model.
Parameters
----------
t : array-like
Time.
z_feat : array-like
Latent variables and their features.
dzdt : array-like
Time derivative of latent variables.
dzddt : array-like, optional
Second time derivative of latent variables (default is None).
Returns
-------
tuple
Prepared latent variables, their time derivatives, and time.
"""
n_samples = z_feat.shape[0]
n_timesteps = t.shape[0]
n_sims = int(n_samples / n_timesteps)
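# this reshape assumes all simulations share the same time grid of length n_timesteps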
# pysindy requires multiple simulations to be passed as a list of trajectories
z_feat = list(z_feat.numpy().reshape([n_sims, n_timesteps, -1]))
if dzddt is None:
z_dot = list(dzdt.reshape([n_sims, n_timesteps, -1]))
else:
z_dot = list(
np.concatenate([dzdt, dzddt], axis=1).reshape([n_sims, n_timesteps, -1])
)
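# e.g. z_feat of shape (n_sims * n_timesteps, n_feats) becomes a list of
# n_sims arrays of shape (n_timesteps, n_feats)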
return z_feat, z_dot, t
def process_data_for_sindy(self):
"""
Process data for SINDy optimization.
Returns
-------
tuple
Processed latent variables, their time derivatives, time, fixed coefficients, feature IDs, and mask.
"""
# concatenate the input for the sindy layer
mu = self.mu
# calculate the time derivatives of the latent variables given the training data
if self.model.second_order:
z, dzdt, dzddt = self.model.calc_latent_time_derivatives(
self.x, self.dxdt, self.dxddt
)
z_sindy = self.model.concatenate_sindy_input(z, dzdt=dzdt, mu=mu)
else:
z, dzdt = self.model.calc_latent_time_derivatives(self.x, self.dxdt, None)
dzddt = None
z_sindy = self.model.concatenate_sindy_input(z, dzdt=None, mu=mu)
# get the mask which specifies which of the features are used in the SINDy model
mask = self.sindylayer.mask.numpy()
# the fixed coefficients are the coefficients which are not optimized but fixed to a specific value
fixed_coeffs = self.sindylayer.fixed_coeffs.numpy()
# %% for the separate SINDy optimization, we only train on the selected features,
# hence we create a selection matrix which extracts the selected features
# ids of the selected features
feat_ids = np.unique(
np.argwhere(np.concatenate([mask, fixed_coeffs], axis=0) != 0)[:, 1]
)
# create selection matrix
mask_selection = np.zeros([feat_ids.size, self.sindylayer.n_bases_functions])
mask_selection[range(feat_ids.size), feat_ids] = 1
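# e.g. feat_ids = [0, 2] with 4 basis functions yields
# mask_selection = [[1, 0, 0, 0], [0, 0, 1, 0]]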
# %% data preparation for the separate SINDy model
# apply the basis functions to the latent variables
z_feat = self.sindylayer.tfFeat(z_sindy)
# apply the selection matrix to the basis functions
z_feat = tf.matmul(z_feat, mask_selection.T)
# bring the data into the format expected by pysindy
z_feat, z_dot, times_train = self.prepare_data_for_pysindy(
self.t, z_feat, dzdt, dzddt
)
return z_feat, z_dot, times_train, fixed_coeffs, feat_ids, mask
def update_weights(self, weights, variance=None):
"""
Update the weights of the SINDy layer.
Parameters
----------
weights : array-like
Weights to update.
variance : array-like, optional
Variance of the weights (default is None).
"""
# %% set the weights of the original model to those of the separate SINDy model
# only extract the trainable weights
trainable_ids = list(self.sindylayer.dof_ids.numpy())
weights = np.array([weights[idx[0], idx[1]] for idx in trainable_ids])
# check if the weights contain nan or inf values (i.e. the optimization failed)
if np.isnan(weights).any() or np.isinf(weights).any():
tf.print("SINDy optimization failed. NaN values in weights.")
else:
# set weights
for weight_id, w in enumerate(weights):
self.sindylayer.kernel[weight_id].assign(w)
# update variance in case of variational sindy layer
if isinstance(self.sindylayer, VindyLayer):
# update variance
if variance is not None:
variance = np.array(
[variance[idx[0], idx[1]] for idx in trainable_ids]
)
# calculate log scale from variance
priors = self.sindylayer.priors
if isinstance(priors, list):
log_scale = np.array(
[
prior.variance_to_log_scale(v).numpy()
for prior, v in zip(priors, variance)
]
)
else:
log_scale = priors.variance_to_log_scale(variance).numpy()
# clamp -inf log scales (arising from zero variance) to -100 to keep the assignment finite
log_scale[log_scale == -np.inf] = -100
for weight_id, s in enumerate(log_scale):
self.sindylayer.kernel_scale[weight_id].assign(s)
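# note: only the coefficient values (and, for a VINDy layer, their log scales)
# are updated here; the sparsity mask itself is left unchanged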
def call_pySINDy(self, z_feat, z_dot, times_train, fixed_coeffs, feat_ids, mask):
"""
Fit a separate SINDy model using the provided data and constraints.
Parameters
----------
z_feat : list of array-like
Latent variables and their features.
z_dot : list of array-like
Time derivatives of latent variables.
times_train : array-like
Time.
fixed_coeffs : array-like
Fixed coefficients for the SINDy model.
feat_ids : array-like
IDs of the selected features.
mask : array-like
Mask specifying which features are used in the SINDy model.
Returns
-------
pysindy.SINDy
Fitted SINDy model.
"""
# %% define the constraints for the optimization of the separate SINDy model
# we want to apply the same mask and fixed coefficients as in the original model
# -> rewrite the constraints for pysindy
# get the fixed coefficients for the selected features
fixed_coeffs = fixed_coeffs[:, feat_ids]
# define the lhs constraints (n_constraints x n_features * n_outputs) and the rhs constraints (n_constraints, )
ids = np.argwhere(mask[:, feat_ids] == 0).squeeze()
constraint_lhs = np.zeros((ids.shape[0], fixed_coeffs.size))
constraint_lhs[range(ids.shape[0]), ids[:, 0] * feat_ids.size + ids[:, 1]] = 1
constraint_rhs = fixed_coeffs[ids[:, 0], ids[:, 1]]
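# each constraint row enforces coef.flatten()[output * n_feats + feat] == rhs,
# i.e. every masked-out coefficient is pinned to its fixed value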
optimizer = ps.ConstrainedSR3(
threshold=self.threshold,
constraint_rhs=constraint_rhs,
constraint_lhs=constraint_lhs,
thresholder=self.thresholder,
max_iter=10000,
)
# %% fit the separate SINDy model
# feature_names = np.array(self.sindylayer.get_feature_names(z=["z", "dz"]))[feat_ids]
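# the degree-1 polynomial library (no bias, no interactions) acts as an identity
# map here, since the candidate features were already evaluated by the layer's
# basis functions above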
sindy_model = ps.SINDy(
# feature_names=feature_names,
feature_library=ps.PolynomialLibrary(
degree=1, include_interaction=False, include_bias=False
),
optimizer=optimizer,
)
sindy_model.fit(
z_feat, t=times_train, u=None, multiple_trajectories=True, x_dot=z_dot
)
return sindy_model
def on_train_end(self, logs=None):
"""
Perform SINDy update at the end of training if specified.
Parameters
----------
logs : dict, optional
Dictionary of logs (default is None).
"""
if self.train_end:
self.sindylayer = self.model.sindy_layer
self.perform_update()
def on_epoch_end(self, epoch, logs=None):
"""
Perform SINDy update at the end of every specified number of epochs.
Parameters
----------
epoch : int
Current epoch number.
logs : dict, optional
Dictionary of logs (default is None).
"""
# only perform the SINDy update every freq epochs
if (epoch + 1) % self.freq == 0:
self.sindylayer = self.model.sindy_layer
tf.print("Epoch: ", epoch)
tf.print("Previous SINDy coefficients: ")
tf.print(
self.sindylayer.model_equation_to_str(
self.z_names, self.mu_names, self.print_precision
)
)
self.perform_update()
# logging
tf.print("Updated SINDy coefficients: ")
tf.print(
self.sindylayer.model_equation_to_str(
self.z_names, self.mu_names, self.print_precision
)
)