Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RandNet-Parareal implementation #604

Open
wants to merge 1 commit into
base: parareal_timestepping
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 251 additions & 2 deletions gusto/timestepping/parareal.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from firedrake import Function
from gusto.core.fields import Fields

import numpy as np
import scipy
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import Ridge

class PararealFields(object):

Check failure on line 9 in gusto/timestepping/parareal.py

View workflow job for this annotation

GitHub Actions / Run linter

E302

gusto/timestepping/parareal.py:9:1: E302 expected 2 blank lines, found 1

def __init__(self, equation, nlevels):
levels = [str(n) for n in range(nlevels+1)]
Expand All @@ -22,7 +26,7 @@
return getattr(self, str(n))


class Parareal(object):
class PararealAbstr(object):

def __init__(self, domain, coarse_scheme, fine_scheme, nG, nF,
n_intervals, max_its):
Expand Down Expand Up @@ -86,6 +90,11 @@
xFnp1 = self.xF(n+1)(self.name)
self.fine_scheme.apply(xFnp1, self.xFn)

self.update_dataset(init_cond=self.x,
F_init_cond=self.xF,
G_init_cond=self.xGkm1,
para_iters=range(k, self.n_intervals))

# compute correction
for n in range(k, self.n_intervals):
xn = self.x(n)(self.name)
Expand All @@ -95,7 +104,247 @@
xnp1 = self.x(n+1)(self.name)
xGkm1 = self.xGkm1(n+1)(self.name)
xFnp1 = self.xF(n+1)(self.name)
xnp1.assign(xGk - xGkm1 + xFnp1)
self.apply_update_rule(xnp1, xGk, xFnp1 - xGkm1, xn)
xGkm1.assign(xGk)

x_out.assign(xnp1)

def update_dataset(self, init_cond, F_init_cond, G_init_cond, para_iters):
'''
Update dataset using the initial initial conditions from the previous iteration,
and the application of F and G to them. Specifically, if U_i^k is the initial
condition at time t_i and iteration k, update with
U_{i}^{k-1}, F(U_{i}^{k-1}), and G(U_{i}^{k-1}).
'''
raise NotImplementedError("update_dataset method must be implemented in subclass")

def apply_update_rule(self, xnp1, xGk, correction, new_init_cond):
'''
Calculate the update rule for the next iteration.
'''
raise NotImplementedError("calc_update_rule method must be implemented in subclass")

Check failure on line 126 in gusto/timestepping/parareal.py

View workflow job for this annotation

GitHub Actions / Run linter

W293

gusto/timestepping/parareal.py:126:1: W293 blank line contains whitespace
class Parareal(PararealAbstr):

Check failure on line 127 in gusto/timestepping/parareal.py

View workflow job for this annotation

GitHub Actions / Run linter

E302

gusto/timestepping/parareal.py:127:1: E302 expected 2 blank lines, found 1
def update_dataset(self, *args, **kwargs):
# We don't need to store any data for Parareal
pass

def apply_update_rule(self, xnp1, xGk, correction, new_init_cond):
xnp1.assign(xGk + correction)

Check failure on line 134 in gusto/timestepping/parareal.py

View workflow job for this annotation

GitHub Actions / Run linter

W293

gusto/timestepping/parareal.py:134:1: W293 blank line contains whitespace


class Dataset(object):

Check failure on line 137 in gusto/timestepping/parareal.py

View workflow job for this annotation

GitHub Actions / Run linter

E303

gusto/timestepping/parareal.py:137:1: E303 too many blank lines (3)
def __init__(self):
self.dtset_x = None
self.dtset_y = None
self.tempx = []
self.tempy = []

def add_obsv(self, x, y):
if not isinstance(x, np.ndarray) or not isinstance(y, np.ndarray):
raise TypeError("x and y must be numpy arrays")
if x.ndim != 1 or y.ndim != 1:
raise ValueError("x and y must be 1D arrays")

Check failure on line 149 in gusto/timestepping/parareal.py

View workflow job for this annotation

GitHub Actions / Run linter

W293

gusto/timestepping/parareal.py:149:1: W293 blank line contains whitespace
self.tempx.append(x.copy())
self.tempy.append(y.copy())

def collect(self):
self.dtset_x = self._to_numpy_and_merge(self.dtset_x, self.tempx)
self.dtset_y = self._to_numpy_and_merge(self.dtset_y, self.tempy)
self.tempx = []
self.tempy = []

def get_data(self):
return self.dtset_x, self.dtset_y

@staticmethod
def _to_numpy_and_merge(dtset, temp):
if len(temp) == 0:
raise ValueError("No data has been added to the dataset")

Check failure on line 166 in gusto/timestepping/parareal.py

View workflow job for this annotation

GitHub Actions / Run linter

W293

gusto/timestepping/parareal.py:166:1: W293 blank line contains whitespace
if dtset is None:
return np.array(temp)
else:
return np.concatenate((dtset, np.array(temp)), axis=0)

Check failure on line 171 in gusto/timestepping/parareal.py

View workflow job for this annotation

GitHub Actions / Run linter

W293

gusto/timestepping/parareal.py:171:1: W293 blank line contains whitespace

class RandNet():
'''
A Random Neural Network for Local Regression with random polynomial-expanded features.

This class implements a random neural network that retrains on a subset

Check failure on line 177 in gusto/timestepping/parareal.py

View workflow job for this annotation

GitHub Actions / Run linter

W291

gusto/timestepping/parareal.py:177:76: W291 trailing whitespace
of the dataset for each prediction. The subset is dynamically selected

Check failure on line 178 in gusto/timestepping/parareal.py

View workflow job for this annotation

GitHub Actions / Run linter

W291

gusto/timestepping/parareal.py:178:75: W291 trailing whitespace
as the `m` nearest neighbors of the target point `new_x`.
'''

def __init__(self, d, seed=47, res_size=100, degree=1, m=10):
'''
Initializes a RandNet instance.

Args:
d (int): The dimensionality of the input data.
seed (int, optional): Seed for the random number generator to ensure
reproducibility. Default is 47.
res_size (int, optional): The number of neurons in the hidden layer
(reservoir size). Default is 100.
degree (int, optional): The degree of the polynomial feature expansion.
Default is 1 (linear features).
m (int, optional): The number of nearest neighbors to use for retraining
during prediction. Default is 10.
'''
self.d = d
self.N = res_size
self.rng = np.random.default_rng(seed)
self.m = m

self.loss = lambda x: np.maximum(x, 0)
self.M = 1
self.R = 1
self.alpha = 0
self.poly = PolynomialFeatures(degree=degree)
self.poly.fit(np.zeros((1, d)))
self.degree = self.poly.n_output_features_

bias, C = self._init_obj()
self.bias, self.C = bias, C


def _init_obj(self):
N, rng = self.N, self.rng
bias = rng.uniform(-1, 1, (N, 1))
C = rng.uniform(-1, 1, (N, self.degree))
return bias, C

def _fit(self, x, y, bias, C):
x = self.poly.fit_transform(x)
X = self.loss(bias + C @ x.T) # activation
X = X.T #first col is intercept
mdl = Ridge(alpha=self.alpha)
mdl.fit(X, y)
return mdl


def fit(self, x, y):

# Normalize data between -1 and 1
mn = np.min(x, axis=0)
mx = np.max(x, axis=0)

self.x = 2*(x-mn)/(mx-mn)-1
self.y = 2*(y-mn)/(mx-mn)-1
self.norm_min = mn
self.norm_max = mx


def predict(self, new_x):
mn, mx = self.norm_min, self.norm_max
bias = self.M * self.R * self.bias
bias = self.bias
C = self.R * self.C

# normalize input
new_x = 2*(new_x-mn)/(mx-mn)-1

# Compute the nearest neighbors
s_idx = np.argsort(scipy.spatial.distance.cdist(new_x, self.x, metric='sqeuclidean')[0,:])
# print('>', s_idx[:self.m])
xm = self.x[s_idx[:self.m], :]
ym = self.y[s_idx[:self.m], :]

# Compute input features
new_X = self.poly.fit_transform(new_x)
_int = bias + C @ new_X.T
new_X = self.loss(_int)

# Fit the model and make predictions
mdl = self._fit(xm, ym, bias, C)
preds = np.squeeze(mdl.predict(new_X.T))

# Unnormalize the output
preds = (preds+1)/2 * (mx-mn) + mn

return preds



class RandNetParareal(Parareal):
'''
An implementation of: RandNet-Parareal: a time-parallel PDE solver using Random Neural Networks.

Citation:
@article{gattiglio2024randnet,
title={RandNet-Parareal: a time-parallel PDE solver using Random Neural Networks},
author={Gattiglio, Guglielmo and Grigoryeva, Lyudmila and Tamborrino, Massimiliano},
journal={Advances in neural information processing systems},
year={2024}
}

See https://arxiv.org/pdf/2411.06225v1 for details on the effect of the
parameters, specifically Appendix D.


'''
def __init__(self, *args, n_neurons=100, n_neighbors=10, poly_expansion_degree=1, **kwargs):
'''
See https://arxiv.org/pdf/2411.06225v1 for details on the effect of the parameters, specifically Appendix D.

Args:
*args: Positional arguments passed to the base `Parareal` class.
n_neurons (int, optional): Number of neurons in the hidden layer
of the `RandNet` (reservoir size). Default is 100.
n_neighbors (int, optional): Number of nearest neighbors used by
the `RandNet` during predictions. Default is 10.
poly_expansion_degree (int, optional): Degree of polynomial
expansion for input features in the `RandNet`. Default is 1.
**kwargs: Additional keyword arguments passed to the base `Parareal` class.
'''
super().__init__(*args, **kwargs)

# Dataset stores for RandNet-Parareal
self.dtset = Dataset()

self.n_neurons = n_neurons
self.n_neighbors = n_neighbors
self.poly_expansion_degree = poly_expansion_degree

def setup(self, equation, apply_bcs=True, *active_labels):

super().setup(equation, apply_bcs, *active_labels)
state_dimension = len(equation.X.dat.data[:])

# Random neural network
self.randnet = RandNet(d=state_dimension, res_size=self.n_neurons, degree=self.poly_expansion_degree, m=self.n_neighbors)



def update_dataset(self, init_cond, F_init_cond, G_init_cond, para_iters):

# store each observation
for n in para_iters:
dtset_x = init_cond(n)(self.name).dat.data
dtset_y = F_init_cond(n+1)(self.name).dat.data - G_init_cond(n+1)(self.name).dat.data
self.dtset.add_obsv(dtset_x, dtset_y)

# gather the dataset -> convert to numpy
self.dtset.collect()

# fit the RandNet on the data
self.randnet.fit(*self.dtset.get_data())

def apply_update_rule(self, xnp1, xGk, correction, new_init_cond):
new_init_cond = new_init_cond.dat.data

pred = self.randnet.predict(new_init_cond.reshape(1, -1))

xnp1.assign(correction)
xnp1.dat.data[:] = xGk.dat.data[:] + pred








Loading