"""This module houses the Emulator of the opacity mixing"""
import os.path
import h5py
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import numpy as np
from MITgcmutils import wrmds
from sklearn.metrics import (
mean_squared_error,
mean_squared_log_error,
r2_score,
)
from sklearn.model_selection import train_test_split
from tensorflow import keras
from opac_mixer.utils.callbacks import CustomCallback
from opac_mixer.utils.models import get_deepset
from opac_mixer.utils.scalings import (
default_input_scaling,
default_output_scaling,
default_inv_output_scaling,
)
from .mix import CombineOpacGrid
from .read import ReadOpac
DEFAULT_PRANGE = (1e-6, 1000, 30)
DEFAULT_TRANGE = (100, 10000, 30)
DEFAULT_MMR_RANGES = {
"CO": (1e-30, 0.005522337070205542),
"H2O": (3.509581940975492e-22, 0.0057565911404275204),
"HCN": (1e-30, 9.103077483740115e-05),
"C2H2,acetylene": (1e-30, 1.581540423097846e-05),
"CH4": (1e-30, 0.0031631031028604537),
"PH3": (1e-30, 6.401082202603451e-06),
"CO2": (1e-30, 0.00015319944152172055),
"NH3": (3.8119208513224578e-25, 0.00084362326521647),
"H2S": (2.0093762682408387e-18, 0.0003290905470710346),
"VO": (1e-30, 1.6153195092178982e-07),
"TiO": (1e-30, 3.925184850731112e-06),
"Na": (1e-30, 2.524986071526357e-05),
"K": (1e-30, 1.932224843084919e-06),
"SiO": (1e-30, 0.0010448970102509476),
"FeH": (1e-30, 0.000203477300968298),
}
class DataIO:
"""IO Class for storing the emulator"""
def __init__(self, filename):
"""
Setup the IO class
Parameters
----------
filename: str
The filename of the IO
"""
self.filename = filename
def load(self):
"""load data"""
if not os.path.exists(self.filename):
raise FileNotFoundError(
"we could not load the data, because it doesnt exist."
)
with h5py.File(self.filename, "r") as f:
input_data = np.asarray(f["input"])
mix = np.asarray(f["mix"])
split_seed = int(f["mix"].attrs["split_seed"])
test_size = float(f["mix"].attrs["test_size"])
return mix, input_data, split_seed, test_size
def write_out(self, mix, input_data, split_seed, test_size):
"""
write data
Parameters
----------
mix: array(batchsize, lg)
The mixed k-tables
input_data: array(batchsize, lg, ls)
The input data
split_seed: float
The random seed used in splitting training and test data
test_size: float
The size of the training set
"""
with h5py.File(self.filename, "w") as f:
inp_ds = f.create_dataset(
"input", input_data.shape, dtype=input_data.dtype
)
mix_ds = f.create_dataset("mix", mix.shape, dtype=input_data.dtype)
mix_ds.attrs["split_seed"] = split_seed
mix_ds.attrs["test_size"] = test_size
mix_ds[...] = mix
inp_ds[...] = input_data
[docs]
class Emulator:
"""
The supervisor that handels the training and evaluation of the opacity emulator.
"""
[docs]
def __init__(
self,
opac,
prange_opacset=DEFAULT_PRANGE,
trange_opacset=DEFAULT_TRANGE,
filename_data=None,
):
"""
Construct the emulator class.
Parameters
----------
opac: opac_mixer.read.ReadOpac
a list of input opacity readers. Can be setup, but does not need to. Will do the setup itself otherwise.
prange_opacset: array(3)
optional, the range to which the reader should interpolate the pressure grid to (lower, upper, num_points).
trange_opacset: array(3)
optional, the range to which the reader should interpolate the temperature grid to (lower, upper, num_points).
filename_data: str
A filename, used to save the training and testing data to
"""
if isinstance(opac, list):
assert all(isinstance(opac_i, ReadOpac) for opac_i in opac)
self.opac = opac
elif isinstance(opac, ReadOpac):
self.opac = [opac]
else:
raise ValueError(
"opac needs to be either a list of ReadOpac or a ReadOpac"
" instance"
)
for opac_i in self.opac:
if not opac_i.read_done:
opac_i.read_opac()
if not opac_i.interp_done:
opac_i.setup_temp_and_pres(
pres=np.logspace(
np.log10(prange_opacset[0]),
np.log10(prange_opacset[1]),
prange_opacset[2],
),
temp=np.linspace(*trange_opacset),
)
self.input_scaling = default_input_scaling
self.output_scaling = default_output_scaling
self.inv_output_scaling = default_inv_output_scaling
self.mixer = [CombineOpacGrid(opac) for opac in self.opac]
ls = [int(opac.ls) for opac in self.opac]
lg = [int(opac.lg[0]) for opac in self.opac]
if len(ls) > 1:
assert (
np.diff(ls) == 0.0
), "we need the same number of species for all ReadOpac instances"
assert (
np.diff(lg) == 0.0
), "we need the same number of g points for all ReadOpac instances"
self._lg = lg[0]
self._ls = ls[0]
self._input_dim = (self._lg, self._ls)
if filename_data is not None:
self._io = DataIO(filename=filename_data)
# initialisation of variables
self._has_input = False
self._has_mix = False
self._has_model = False
self._is_trained = False
self._batchsize_resh = []
self._batchsize = []
self.abus = []
self.verbose = False
self.X_train = None
self.X_test = None
self.y_train = None
self.y_test = None
self.model = None
self._model_filename = None
self.input_data = np.empty((0, *self._input_dim))
[docs]
def setup_scaling(
self, input_scaling=None, output_scaling=None, inv_output_scaling=None
):
"""
(optional) Change the callback functions for the scaling of in and output.
Defaults are given as opac_mixer.scalings.default_<name>.
See opac_mixer/utils/scalings.py for inspiration
Parameters
----------
input_scaling: function or None
The function to use for input scaling.
If None, use opac_mixer.scalings.default_input_scaling
output_scaling: function or None
The function to use for output scaling.
If None, use opac_mixer.scalings.default_output_scaling
inv_output_scaling: function or None
The function to use for output scaling.
If None, use opac_mixer.scalings.default_inv_output_scaling
"""
if input_scaling is not None:
self.input_scaling = input_scaling
if output_scaling is not None:
self.output_scaling = output_scaling
if inv_output_scaling is not None:
self.inv_output_scaling = inv_output_scaling
if hasattr(self, "X_train"):
np.testing.assert_allclose(
self.inv_output_scaling(
self.X_train,
self.output_scaling(self.X_train, self.y_train),
),
self.y_train,
)
if hasattr(self, "X_test"):
np.testing.assert_allclose(
self.inv_output_scaling(
self.X_test, self.output_scaling(self.X_test, self.y_test)
),
self.y_test,
)
[docs]
def setup_sampling_grid(
self, approx_batchsize=8e5, extra_abus=None, bounds=None
):
"""
Setup the sampling grid. Sampling along MMR and pressure is in logspace.
Sampling along temperature is in linspace.
Parameters
----------
approx_batchsize: int
Number of total sampling points. Needs to be a power of 2 for sobol sampling
bounds: dict or None
the lower and upper bounds for sampling. Shape: {'species':(lower, upper)}
The key can be either a species name in opac.spec or p and T for pressure and Temperature.
It will use opac_mixer.emulator.DEFAULT_MMR_RANGES for mmrs, opac_mixer.emulator.DEFAULT_PRANGE for pressure,
and opac_mixer.emulator.DEFAULT_TRANGE for temperautre for all missing values
extra_abus: array(num_sample, ls, lp, lt)
Extra abundancies (mmrs) used for the training data. Could be e.g., a grid of eq. chem abundancies
Returns
-------
input_data (array(batchsize, opac.lg, opac.ls)):
The sampled inputdata to train/test the emulator.
The input_data consists of kappas(g) for each species
"""
if bounds is None:
bounds = {}
self._batchsize_resh = []
self._batchsize = []
self.abus = []
self.input_data = np.empty((0, *self._input_dim))
for opac in self.opac:
if extra_abus is not None:
assert extra_abus.shape[1] == self._ls, (
"wrong shape in extra_abus second dimension (number of"
" species)"
)
assert extra_abus.shape[2] == opac.lp[0], (
"wrong shape in extra_abus second dimension (number of"
" pressure points)"
)
assert extra_abus.shape[3] == opac.lt[0], (
"wrong shape in extra_abus second dimension (number of"
" temperature points)"
)
extra_batchsize = int(extra_abus.shape[0])
else:
extra_batchsize = 0
batchsize = (
int(approx_batchsize) // opac.lp[0] // opac.lt[0] // opac.lf[0]
)
batchsize_resh = batchsize * opac.lp[0] * opac.lt[0] * opac.lf[0]
self._batchsize_resh.append(batchsize_resh)
self._batchsize.append(batchsize)
l_bounds = []
u_bounds = []
for sp in opac.spec:
if sp not in DEFAULT_MMR_RANGES and sp not in bounds:
raise ValueError(f"We miss the bounds for {sp}.")
default_l, default_u = DEFAULT_MMR_RANGES.get(sp)
l, u = bounds.get(sp, (default_l, default_u))
l_bounds.append(np.maximum(l, 1.0e-20))
u_bounds.append(u)
# Use a standard uniform distribution
sample = np.random.uniform(
low=0.0,
high=1.0,
size=(
batchsize - extra_batchsize,
self._ls,
opac.lp[0],
opac.lt[0],
),
)
# Scale the sampling to the actual bounds
# Note: We use loguniform like scaling for mmrs
abus = np.exp(
sample[:, :, :, :]
* (
np.log(u_bounds)[np.newaxis, :, np.newaxis, np.newaxis]
- np.log(l_bounds)[np.newaxis, :, np.newaxis, np.newaxis]
)
+ np.log(l_bounds)[np.newaxis, :, np.newaxis, np.newaxis]
)
if extra_abus is not None:
abus = np.concatenate((abus, extra_abus), axis=0)
weighted_kappas = (
abus[:, :, :, :, np.newaxis, np.newaxis]
* opac.kcoeff[np.newaxis, ...]
)
weighted_kappas = weighted_kappas.transpose((0, 2, 3, 4, 1, 5))
self.abus.append(abus)
self.input_data = np.concatenate(
(
self.input_data,
weighted_kappas.reshape(
batchsize_resh, opac.ls, opac.lg[0]
).transpose(0, 2, 1),
),
axis=0,
)
self._check_input_data(self.input_data)
self._has_input = True
return self.input_data
def _check_input_data(self, input_data):
"""
Checks the shape of the input data and raises an error if wrong
Parameters
----------
input_data: array(batchsize, opac.lg, opac.ls
The input data
"""
shape = input_data.shape
if len(shape) != 3 or shape[1:] != self._input_dim:
raise ValueError("input data does not match")
assert (input_data >= 0).all(), "We need positive input data!"
[docs]
def setup_mix(self, test_size=0.2, split_seed=None, do_parallel=True):
"""
Setup the mixer and generate the training and testdata.
Parameters
----------
test_size: float
fraction of data used for testing
split_seed: int
A seed to be used for shuffling training and test data before splitting
do_parallel bool
If you want to create the data in parallel or not
"""
if not self._has_input:
raise AttributeError(
"we do not have input yet. Run setup_sampling_grid first."
)
if split_seed is None:
split_seed = np.random.randint(2**32 - 1)
# make sure the filename comes without the npy suffix
mixes = np.empty((0, self._lg))
for mixer, abus, batchsize_resh in zip(
self.mixer, self.abus, self._batchsize_resh
):
if do_parallel:
mix = mixer.add_batch_parallel(abus)
else:
mix = mixer.add_batch(abus)
mixes = np.concatenate(
(mixes, mix.reshape(batchsize_resh, self._lg)), axis=0
)
self.X_train, self.X_test, self.y_train, self.y_test = self._do_split(
self.input_data, mixes, split_seed, test_size, use_split_seed=True
)
if hasattr(self, "_io"):
self._io.write_out(mixes, self.input_data, split_seed, test_size)
self._has_mix = True
return self.X_train, self.X_test, self.y_train, self.y_test
[docs]
def load_data(
self,
filename=None,
test_size=None,
split_seed=None,
use_split_seed=True,
):
"""
Load the training and test data from a h5 file.
Parameters
----------
filename: str
optional, can be set either here or in the constructor. Make sure the filename comes without the npy suffix
test_size: float
optional, use a different test size than the one loaded
split_seed: int
optional, use a different seed to shuffle data before spliting training and testing data
use_split_seed: bool
optional, if true, it will just use the provided or loaded split seed, else it will create a new random one
"""
if not hasattr(self, "_io") and filename is None:
raise ValueError(
"we have no clue where we could get the data from. Set a"
" filename either in this method or the constructor"
)
if filename is not None:
self._io = DataIO(filename=filename)
mix, input_data, split_seed_l, test_size_l = self._io.load()
self.input_data = input_data
self._check_input_data(self.input_data)
if test_size is None:
test_size = test_size_l
if split_seed is None:
split_seed = split_seed_l
self.X_train, self.X_test, self.y_train, self.y_test = self._do_split(
self.input_data, mix, split_seed, test_size, use_split_seed
)
self._has_input = True
self._has_mix = True
return self.X_train, self.X_test, self.y_train, self.y_test
@staticmethod
def _do_split(input_data, mix, split_seed, test_size, use_split_seed=True):
"""Do the split of training and testing data.
Parameters
----------
input_data: array(batchsize, opac.lg, opac.ls)
The input (X)
mix: array(batchsize, opac.lg)
The output (y)
split_seed: float
a specific random seed to use
test_size: float
The size of the test-set
use_split_seed: bool
If the split seed is to be used or not
"""
if (mix <= 0).any():
raise ValueError(
"We found negative crosssections. Something is wrong here."
)
return train_test_split(
input_data,
mix,
test_size=test_size,
random_state=split_seed if use_split_seed else None,
)
[docs]
def setup_model(
self,
model=None,
filename=None,
load=False,
learning_rate=1e-3,
hidden_units=None,
verbose=True,
**model_kwargs,
):
"""
Setup the emulator model and train it.
Note: This will reset all previously trained weights in keras models.
Parameters
----------
model: sklearn compatible model
(optional): a model to learn. Needs to be contructed already. Use DeepSet by default
filename: str or None
optional, A filename to save the model
load: bool
optional, load a -pretrained- model instead of constructing one
Parameters for DeepSet
----------------------
Check keras.compile docs for more arguments. Any extra argument is directly passed to keras.compile
learning_rate: float
optional, learning rate of optimizer (adam per default, change by setting optimizer=<name>)
hidden_units: int
optional, number of hidden units in the encoder (per default equals number of g-points)
(model_kwargs)
arguments to pass to keras.compile for construction (only when model=None is used)
"""
keras.backend.clear_session()
self.verbose = verbose
if load:
# Load model
self.model = keras.models.load_model(filename)
self._is_trained = True
else:
if not self._has_mix:
raise AttributeError(
"we do not have a mix to work with yet. Run"
" setup_sampling_grid and setup_mix first."
)
if model is None:
self.model = get_deepset(
ng=self._lg, hidden_units=hidden_units
)
elif model is not None:
print(
"WARNING: make sure your model is permutation invariant!"
)
# Use provided model (needs to be sklearn compatible)
self.model = model
if isinstance(self.model, keras.Model):
extra_model_kwargs = {
"optimizer": keras.optimizers.Adam(
learning_rate=learning_rate
),
"loss": "mean_squared_error",
}
extra_model_kwargs.update(model_kwargs)
self.model.compile(**extra_model_kwargs)
if self.verbose:
self.model.summary()
if filename is not None:
# Save filename for later use
self._model_filename = filename
self._has_model = True
[docs]
def fit(self, *args, **kwargs):
"""
Train the model.
Parameters
----------
args:
Whatever you want to pass to the model to fit
kwargs:
Whatever you want to pass to the model to fit
"""
fit_args = {}
if isinstance(self.model, keras.Model):
fit_args["epochs"] = 100
fit_args["batch_size"] = 32
fit_args["verbose"] = 0
fit_args.update(kwargs)
if not self._has_model:
raise AttributeError(
"we do not have a model yet. Run setup_sampling_grid,"
" setup_mix and setup_model first."
)
# fit the model on the training dataset
X_train = self.input_scaling(self.X_train)
y_train = self.output_scaling(self.X_train, self.y_train)
if isinstance(self.model, keras.Model):
callbacks = [
keras.callbacks.EarlyStopping(monitor="loss", patience=3)
]
if self.verbose:
callbacks.append(CustomCallback(self))
self.model.fit(X_train, y_train, callbacks=callbacks, **fit_args)
else:
self.model.fit(
X_train.reshape(len(X_train), -1), y_train, *args, **kwargs
)
if hasattr(self, "_model_filename") and callable(
getattr(self.model, "save", None)
):
print(f"Saving model to {self._model_filename}")
self.model.save(self._model_filename)
self._is_trained = True
[docs]
def predict(self, X, *args, **kwargs):
"""
Predict using the trained model.
Parameters
----------
X: array(num_samples, opac.lg, opac.ls
The values you want predictions for
args:
Whatever you want to pass to the model for prediction
kwargs:
Whatever you want to pass to the model for prediction
"""
self._check_trained()
self._check_input_data(X)
# fit the model on the training dataset
if isinstance(self.model, keras.Model):
verbose = 0 if not self.verbose else "auto"
return self.inv_output_scaling(
X,
self.model.predict(
self.input_scaling(X), verbose=verbose, *args, **kwargs
),
)
return self.inv_output_scaling(
X,
self.model.predict(
self.input_scaling(X).reshape(len(X), -1), *args, **kwargs
),
)
[docs]
def score(self, validation_set=None):
"""
Print some metrics for the training and test data.
Parameters
----------
validation_set: list(X_test, y_test)
validation set to be used instead of (self.X_test, self.y_test)
Note the dimensions of X_test: array(num_samples, opac.lg, opac.ls)
and y_test: array(num_samples, opac.lg)
"""
if validation_set is None:
X_test = self.X_test
y_test = self.y_test
else:
X_test = validation_set[0]
y_test = validation_set[1]
self._check_input_data(X_test)
self._check_trained()
y_p_test = self.predict(X_test)
y_p_train = self.predict(self.X_train)
train_mask = self.y_train > 1e-45
test_mask = y_test > 1e-45
log_err_out = (y_p_train[train_mask] > 0).all()
y_add_test = np.sum(X_test, axis=-1)
y_add_train = np.sum(self.X_train, axis=-1)
y_test_masked = y_test[test_mask]
y_train_masked = self.y_train[train_mask]
y_p_test_masked = y_p_test[test_mask]
y_p_train_masked = y_p_train[train_mask]
y_add_test_masked = y_add_test[test_mask]
y_add_train_masked = y_add_train[train_mask]
e_add_test = np.sqrt(
mean_squared_error(y_test_masked, y_add_test_masked)
)
e_add_train = np.sqrt(
mean_squared_error(y_train_masked, y_add_train_masked)
)
e_p_test = np.sqrt(mean_squared_error(y_test_masked, y_p_test_masked))
e_p_train = np.sqrt(
mean_squared_error(y_train_masked, y_p_train_masked)
)
if log_err_out:
e_log_add_test = np.sqrt(
mean_squared_log_error(y_test_masked, y_add_test_masked)
)
e_log_add_train = np.sqrt(
mean_squared_log_error(y_train_masked, y_add_train_masked)
)
e_log_p_test = np.sqrt(
mean_squared_log_error(y_test_masked, y_p_test_masked)
)
e_log_p_train = np.sqrt(
mean_squared_log_error(y_train_masked, y_p_train_masked)
)
r2_add_test = r2_score(y_test_masked, y_add_test_masked)
r2_add_train = r2_score(y_train_masked, y_add_train_masked)
r2_p_test = r2_score(y_test_masked, y_p_test_masked)
r2_p_train = r2_score(y_train_masked, y_p_train_masked)
if log_err_out:
r2_log_add_test = r2_score(
np.log(y_test_masked), np.log(y_add_test_masked)
)
r2_log_add_train = r2_score(
np.log(y_train_masked), np.log(y_add_train_masked)
)
r2_log_p_test = r2_score(
np.log(y_test_masked), np.log(y_p_test_masked)
)
r2_log_p_train = r2_score(
np.log(y_train_masked), np.log(y_p_train_masked)
)
print(
"test (add, model): {:.2e}, {:.2e}".format(
e_add_test, e_p_test
)
)
print(
"train (add, model): {:.2e}, {:.2e}".format(
e_add_train, e_p_train
)
)
if log_err_out:
print(
"log test (add, model): {:.2e}, {:.2e}".format(
e_log_add_test, e_log_p_test
)
)
print(
"log train (add, model): {:.2e}, {:.2e}".format(
e_log_add_train, e_log_p_train
)
)
print(
"r2 test (add, model): {:.2e}, {:.2e}".format(
r2_add_test, r2_p_test
)
)
print(
"r2 train (add, model): {:.2e}, {:.2e}".format(
r2_add_train, r2_p_train
)
)
if log_err_out:
print(
"log r2 test (add, model): {:.2e}, {:.2e}".format(
r2_log_add_test, r2_log_p_test
)
)
print(
"log r2 train (add, model): {:.2e}, {:.2e}".format(
r2_log_add_train, r2_log_p_train
)
)
def _check_trained(self):
"""Just a random check if the model has been trained or not."""
if not self._is_trained:
raise AttributeError(
"we do not have a trained model yet. Run setup_sampling_grid,"
" setup_mix and setup_model and fit first."
)
[docs]
def export(self, path, file_format="exorad"):
"""
Export the weights
Parameters
----------
path: str
path where the weights should be stored
file_format: str
the format in which the weights should be stored. Can be either exorad or numpy.
"""
self._check_trained()
if isinstance(self.model, keras.Model):
for i, weights in enumerate(self.model.weights):
if file_format in ("np", "numpy"):
np.save(f"{path}/ml_coeff_{i}.npy", weights.numpy())
elif file_format == "exorad":
wrmds(
f"{path}/ml_coeff_{i}",
weights.numpy().flatten(order="F"),
dataprec="float64",
)
else:
raise NotImplementedError("format is not supported yet.")
else:
raise NotImplementedError(
"not implemented for the type of model used"
)
[docs]
def plot_predictions(self, validation_set=None):
"""
Plot the predictions vs the true values
Parameters
----------
validation_set: list(X_test, y_test)
validation set to be used instead of (self.X_test, self.y_test)
Note the dimensions of X_test: array(num_samples, opac.lg, opac.ls)
and y_test: array(num_samples, opac.lg)
"""
if validation_set is None:
X_test = self.X_test
y_test = self.y_test
else:
X_test = validation_set[0]
self._check_input_data(X_test)
y_test = validation_set[1]
y_p_test = self.predict(X_test)
y_add_test = np.sum(X_test, axis=-1)
for index in range(y_p_test.shape[-1]):
fig, axes = plt.subplots(1, 2, sharex=True, sharey=True)
axes[0].plot(
y_add_test[:, index],
y_test[:, index],
"bo",
ms=0.01,
linestyle="None",
)
axes[1].plot(
y_p_test[:, index],
y_test[:, index],
"ro",
ms=0.01,
linestyle="None",
)
fig.suptitle(f"g index = {index}")
axes[0].set_title("simple sum")
axes[1].set_title("model predictions")
for ax in axes:
ax.plot(
[y_p_test[:, index].min(), y_p_test[:, index].max()],
[y_p_test[:, index].min(), y_p_test[:, index].max()],
color="gray",
ls="--",
)
ax.set_xscale("log")
ax.set_yscale("log")
ax.set_ylabel("true")
ax.set_xlabel("predicted")
ax.set_aspect("equal")
plt.show()
[docs]
def plot_weights(self, do_log=True):
"""
Plot the weights of the model
Parameters
----------
do_log: bool
do log scaling in the colorbar
"""
self._check_trained()
if isinstance(self.model, keras.Model):
for i, weights in enumerate(self.model.weights):
if do_log:
if (weights.numpy() < 0).any():
vmax = abs(weights.numpy()).max()
vmin = -vmax
linthr = abs(weights.numpy()).min()
# linthr = 1e-1
norm = mcolors.SymLogNorm(
linthr=linthr, vmin=vmin, vmax=vmax
)
cmap = "BrBG"
else:
norm = mcolors.LogNorm()
cmap = "viridis"
else:
norm = mcolors.Normalize()
cmap = "viridis"
img = plt.imshow(weights.numpy(), norm=norm, cmap=cmap)
plt.title(f"weight {i}")
plt.colorbar(img)
plt.show()