import math
import numpy as np
import random
import os
import fnmatch
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import datetime as dt
import copy
from scipy.fft import next_fast_len
from sklearn.model_selection import train_test_split
from kneed import KneeLocator
from ..plotting.hyperparameters import plot_hp
class TrainZeroLossPeak:
def __init__(self,
spectra,
eaxis,
cluster_centroids=None,
display_step=1000,
training_report_step=1,
n_batch_of_replica=1,
n_batches=1,
n_replica=100,
n_epochs=1000,
shift_de1=1.,
shift_de2=1.,
regularisation_constant=10.,
path_to_models='./models/',
remove_temp_files=True,
**kwargs
):
r"""
The TrainZeroLossPeak class provides the tools to train the ZLP models for the spectra.
Parameters
----------
spectra : list of numpy.ndarray
List with one array of spectra per cluster, each of shape (n_spectra, n_ebins).
eaxis : numpy.ndarray
Energy-loss axis shared by all spectra.
cluster_centroids : numpy.ndarray, optional
Centroids of the clusters, stored together with the hyperparameters.
path_to_models : str, optional
Directory in which the trained models and hyperparameters are stored.
display_step : int, optional
Number of epochs between progress messages. Set to None to disable printing.
training_report_step : int, optional
Write a training report every ``training_report_step`` replicas. Set to 0 to disable.
n_batch_of_replica : int, optional
Index (starting at 1) of the current batch of replicas, used to number the trained replicas.
n_batches : int, optional
Total number of batches of replicas to be trained.
n_replica : int, optional
Number of replicas to train in this batch.
n_epochs : int, optional
Maximum number of training epochs per replica.
shift_de1 : float, optional
Multiplicative shift applied to the dE1 hyperparameter.
shift_de2 : float, optional
Multiplicative shift applied to the dE2 hyperparameter.
regularisation_constant : float
Constant that weighs the regularisation term against the fit accuracy in the loss function.
remove_temp_files : bool, optional
Whether the temporary per-batch files are removed after they have been merged.
kwargs
Additional keyword arguments.
"""
self.spectra = spectra
self.eaxis = eaxis
self.deltaE = np.abs(self.eaxis[1] - self.eaxis[0])
if cluster_centroids is None:
self.cluster_centroids = np.zeros(1)
else:
self.cluster_centroids = cluster_centroids
self.n_ebins = len(self.eaxis)
self.n_clusters = len(self.spectra)
self.path_to_models = path_to_models
self.display_step = display_step
self.training_report_step = training_report_step
self.n_batch_of_replica = n_batch_of_replica
self.n_replica = n_replica
self.n_epochs = n_epochs
self.n_batches = n_batches
self.shift_de1 = shift_de1
self.shift_de2 = shift_de2
self.regularisation_constant = regularisation_constant
self.remove_temp_files = remove_temp_files
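# Illustrative usage sketch (not part of the original source): `spectra` is assumed to be a
# list with one (n_spectra, n_ebins) array per cluster and `eaxis` the shared energy-loss axis.
#
#     trainer = TrainZeroLossPeak(spectra, eaxis, n_replica=100, n_epochs=1000,
#                                 path_to_models='./models/')
#     trainer.train_zlp_models_scaled(lr=1e-3)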
def train_zlp_models_scaled(self, lr=1e-3):
r"""
Train the ZLP models. This function calls the other methods step by step to complete the whole training process.
Refer to each method for details of what it performs.
Parameters
----------
lr : float
Learning rate of the neural network
"""
# Set how often (in epochs) the training status is printed to the console / log files.
if self.display_step is None:
self.print_progress = False
self.display_step = 1E6
else:
self.print_progress = True
if not os.path.exists(self.path_to_models):
os.makedirs(self.path_to_models)
print("preparing hyperparameters!")
self.set_minimum_intensities()
self.set_y_data()
self.set_dydx_data()
self.set_sigma()
self.find_fwhm_idx()
self.find_local_min_idx()
self.find_kneedle_idx()
self.de1, self.de2, self.mde1, self.mde2 = self.calculate_hyperparameters()
if self.print_progress:
print("dE1:", np.round(self.de1, 3))
print("dE2:", np.round(self.de2, 3))
self.scale_eaxis()
self.calc_scale_var_log_int_i()
self.save_scale_var_log_int_i()
self.save_hyperparameters()
print("Hyperparameters prepared!")
self.loss_test_reps = np.zeros(self.n_replica)
self.loss_train_reps = np.zeros(self.n_replica)
for i in range(self.n_replica):
j = i + self.n_replica * self.n_batch_of_replica - self.n_replica + 1
path_nn_replica = os.path.join(self.path_to_models, 'nn_rep_{}'.format(j))
if self.print_progress:
print("Started training on replica number {}".format(j) + ", at time ", dt.datetime.now())
self.data_y = np.empty((0, 1))
self.data_x = np.empty((0, 2))
self.data_sigma = np.empty((0, 1))
# Use a list of tensors, because data from the different spectra in the replica will not
# have equal shapes generally and Torch/Numpy cannot handle non-rectangular arrays.
self.data_x_for_derivative = []
for cluster_label in range(self.n_clusters):
# Initialize the data of the replica by taking a spectra from each cluster
self.initialize_x_y_sigma_input(cluster_label)
# Split the replica into a train set and a test set
self.train_test = train_test_split(self.data_x, self.data_y, self.data_sigma, test_size=0.25)
self.train_x_for_derivative, self.test_x_for_derivative = train_test_split(self.data_x_for_derivative,
test_size=0.25)
self.set_train_x_y_sigma()
self.set_test_x_y_sigma()
# Train the replica/model
self.model = MultilayerPerceptron(num_inputs=2, num_outputs=1)
self.model.apply(weight_reset)
self.train_and_evaluate_model(i, j, lr=lr)
# save model state when max number of allowed epochs has been reached
torch.save(self.model.state_dict(), path_nn_replica)
# make a training report for every replica
if self.training_report_step > 0:
if j % self.training_report_step == 0 or j == 1:
self.set_path_for_training_report(j)
self.write_txt_of_loss(self.training_report_path, 'train_loss', self.loss_train_n)
self.write_txt_of_loss(self.training_report_path, 'test_loss', self.loss_test_n)
self.plot_training_report()
self.write_txt_of_loss(self.path_to_models, f'costs_train_{self.n_batch_of_replica}', self.loss_train_reps)
self.write_txt_of_loss(self.path_to_models, f'costs_test_{self.n_batch_of_replica}', self.loss_test_reps)
# Check if training procedure is finished by counting the number of nn_rep files.
# If the required number of files is present, the output is cleaned up.
num_files = len(fnmatch.filter(os.listdir(self.path_to_models), 'nn_rep_*'))
self.required_num_files = self.n_replica * self.n_batches
if num_files == self.required_num_files:
self.cleanup_files()
def set_minimum_intensities(self):
r"""
Set all spectral intensities smaller than 1 to 1.
"""
for i in range(self.n_clusters):  # The clusters do not contain equal numbers of spectra, so we loop over them
self.spectra[i][self.spectra[i] < 1] = 1
def set_y_data(self):
r"""
Smooths all the spectra per cluster and takes the median per cluster.
"""
y_raw = self.spectra
self.y_smooth = np.zeros(self.n_clusters, dtype=object)
self.y_smooth_log = np.zeros(self.n_clusters, dtype=object)
self.y_smooth_median = np.zeros(self.n_clusters, dtype=object)
self.y_smooth_median_log = np.zeros(self.n_clusters, dtype=object)
for i in range(self.n_clusters):
self.y_smooth[i] = smooth_signals_per_cluster(y_raw[i])
self.y_smooth_log[i] = np.log(self.y_smooth[i])
if len(y_raw[i]) == 1:
self.y_smooth_median[i] = self.y_smooth[i][0]
self.y_smooth_median_log[i] = self.y_smooth_log[i][0]
else:
self.y_smooth_median[i] = np.nanpercentile(self.y_smooth[i], 50, axis=0)
self.y_smooth_median_log[i] = np.nanpercentile(self.y_smooth_log[i], 50, axis=0)
def set_dydx_data(self):
r"""
Determines the slope of all spectra per cluster, smooths the slope and takes the median per cluster.
"""
dydx = np.zeros(self.n_clusters, dtype=object)
dydx_log = np.zeros(self.n_clusters, dtype=object)
self.dydx_median = np.zeros(self.n_clusters, dtype=object)
self.dydx_median_log = np.zeros(self.n_clusters, dtype=object)
self.dydx_smooth = np.zeros(self.n_clusters, dtype=object)
for i in range(self.n_clusters):
dydx[i] = (self.y_smooth[i][:, 1:] - self.y_smooth[i][:, :-1]) / self.deltaE
dydx_log[i] = (self.y_smooth_log[i][:, 1:] - self.y_smooth_log[i][:, :-1]) / self.deltaE
if len(dydx[i]) == 1:
self.dydx_median[i] = smooth_signals_per_cluster(dydx[i])[0]
self.dydx_median_log[i] = smooth_signals_per_cluster(dydx_log[i])[0]
else:
self.dydx_median[i] = np.nanpercentile(dydx[i], 50, axis=0)
self.dydx_median_log[i] = np.nanpercentile(dydx_log[i], 50, axis=0)
self.dydx_smooth[i] = smooth_signals_per_cluster(dydx[i])
def set_sigma(self):
r"""
Determine sigma, the spread of the spectra within each cluster.
"""
if self.n_clusters == 1 and len(self.spectra[0]) == 1:
print("Single spectrum detected, sigma will be determined by bootstrap parameterization")
self.sigma = None
else:
self.sigma = np.zeros((self.n_clusters, self.n_ebins))
for i in range(self.n_clusters):
if len(self.spectra[i]) == 1:
print("Warning! Only a single spectrum in the cluster, sigma will be set to 0.")
self.sigma[i, :] = np.zeros(self.n_ebins)
else:
ci_low = np.nanpercentile(np.log(self.spectra[i]), 16, axis=0)
ci_high = np.nanpercentile(np.log(self.spectra[i]), 84, axis=0)
self.sigma[i, :] = np.absolute(ci_high - ci_low)
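# Worked sketch with assumed values: for log intensities [4.0, 4.1, 4.3, 4.5, 4.6] in a single
# energy bin, the 16th and 84th percentiles are roughly 4.06 and 4.54, so sigma ~ 0.47 for that
# bin, i.e. the width of the central 68% band of the cluster.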
def find_fwhm_idx(self):
r"""
Determine the FWHM indices per cluster (Full Width at Half Maximum):
- indices of the left and right side of the ZLP
- indices of the left and right side of the log of the ZLP
These are all determined from the locations of the maximum and minimum of dy/dx.
"""
self.fwhm_gain_idx = np.zeros(self.n_clusters, dtype=int)
self.fwhm_loss_idx = np.zeros(self.n_clusters, dtype=int)
self.fwhm_gain_log_idx = np.zeros(self.n_clusters, dtype=int)
self.fwhm_loss_log_idx = np.zeros(self.n_clusters, dtype=int)
for i in range(self.n_clusters):
self.fwhm_gain_idx[i] = np.argmax(self.dydx_median[i]) - 1
self.fwhm_loss_idx[i] = np.argmin(self.dydx_median[i]) + 1
self.fwhm_gain_log_idx[i] = np.argmax(self.dydx_median_log[i]) - 1
self.fwhm_loss_log_idx[i] = np.argmin(self.dydx_median_log[i]) + 1
# Values of the FWHMs
self.fwhm = (self.eaxis[self.fwhm_loss_idx] - self.eaxis[self.fwhm_gain_idx])
self.fwhm_log = (self.eaxis[self.fwhm_loss_log_idx] - self.eaxis[self.fwhm_gain_log_idx])
def find_local_min_idx(self):
r"""
Determine the first local minimum index of the signals per cluster by setting it to the point where the
derivative crosses zero.
"""
self.local_min_loss_idx = np.zeros(self.n_clusters, dtype=int)
self.local_min_gain_idx = np.zeros(self.n_clusters, dtype=int)
for i in range(self.n_clusters):
crossing_loss = (self.dydx_median[i][self.fwhm_loss_log_idx[i]:] > 0)
if not crossing_loss.any():
print("No crossing found in loss region of cluster " + str(i) + ", taking the minimum of the absolute dy/dx instead")
self.local_min_loss_idx[i] = np.argmin(
np.absolute(self.dydx_median[i])[self.fwhm_loss_log_idx[i]:]) + self.fwhm_loss_log_idx[i]
else:
self.local_min_loss_idx[i] = np.argwhere(
self.dydx_median[i][self.fwhm_loss_log_idx[i]:] > 0).flatten()[0] + self.fwhm_loss_log_idx[i]
crossing_gain = (self.dydx_median[i][:self.fwhm_gain_log_idx[i]] < 0)
if not crossing_gain.any():
print("No crossing found in gain region of cluster " + str(i) + ", taking the minimum of the absolute dy/dx instead")
self.local_min_gain_idx[i] = np.argmin(
np.absolute(self.dydx_median[i])[:self.fwhm_gain_log_idx[i]])
else:
self.local_min_gain_idx[i] = np.argwhere(
self.dydx_median[i][:self.fwhm_gain_log_idx[i]] < 0).flatten()[-1]
def find_kneedle_idx(self):
r"""
Find the kneedle index per cluster.
The kneedle algorithm is used to find the point of highest curvature in a concave or convex data set.
"""
self.kneedle_loss_idx = np.zeros(self.n_clusters, dtype=int)
self.kneedle_gain_idx = np.zeros(self.n_clusters, dtype=int)
for i in range(self.n_clusters):
x_loss_range = self.eaxis[self.fwhm_loss_log_idx[i]:self.local_min_loss_idx[i]]
y_loss_range = self.y_smooth_median_log[i][self.fwhm_loss_log_idx[i]:self.local_min_loss_idx[i]]
kneedle_loss = KneeLocator(x=x_loss_range, y=y_loss_range, curve='convex', direction='decreasing')
self.kneedle_loss_idx[i] = np.argwhere(self.eaxis > kneedle_loss.knee).flatten()[0]
x_gain_range = self.eaxis[self.local_min_gain_idx[i]:self.fwhm_gain_log_idx[i]]
y_gain_range = self.y_smooth_median_log[i][self.local_min_gain_idx[i]:self.fwhm_gain_log_idx[i]]
kneedle_gain = KneeLocator(x=x_gain_range, y=y_gain_range, curve='convex')
self.kneedle_gain_idx[i] = np.argwhere(self.eaxis < kneedle_gain.knee).flatten()[-1]
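# Minimal sketch of the kneedle step with assumed toy data; KneeLocator returns the x value of
# maximum curvature, which is then mapped back onto an eaxis index:
#
#     x = np.linspace(1.0, 3.0, 50)
#     y = np.exp(-3.0 * x)  # convex, decreasing, like the loss-side tail of the ZLP
#     knee_x = KneeLocator(x=x, y=y, curve='convex', direction='decreasing').knee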
def calculate_hyperparameters(self):
r"""
Calculate the values of the hyperparameters in the gain and loss regions. dE1 and mdE1 are calculated by taking
the locations of the kneedles on each side of the ZLP and shifting them by the given shift value.
dE2 and mdE2 are calculated by taking the value of the eaxis where a fit of the log10 function intersects
a single count. If no such intersection is found, the end point of the signal is taken as the location of dE2.
"""
def _find_log10_fit(x, y, idx1, idx2):
r"""
Calculate a log10 fit through the points (x[idx1], y[idx1]) and (x[idx2], y[idx2]).
Parameters
----------
x
Energy axis.
y
Signal values.
idx1
Index of the first point the fit passes through.
idx2
Index of the second point the fit passes through.
Returns
-------
The fitted log10 function evaluated over the full ``x`` range.
"""
slope = (np.log10(y[idx2]) - np.log10(y[idx1])) / (x[idx2] - x[idx1])
factor = 10 ** (np.log10(y[idx2]) - slope * x[idx2])
return log10_fit(x, slope, factor)
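# Worked example with assumed values: with y = [1e4, 1e2] at x = [1.0, 3.0] eV, the slope is
# (2 - 4) / (3 - 1) = -1 per eV and the factor is 10**(2 + 3) = 1e5, so the fit is
# 1e5 * 10**(-x), which drops to a single count at x = 5 eV (a candidate location for dE2).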
de1 = np.zeros(self.n_clusters)
de2 = np.zeros(self.n_clusters)
mde1 = np.zeros(self.n_clusters)
mde2 = np.zeros(self.n_clusters)
self.intersect_loss_idx = np.zeros(self.n_clusters, dtype=int)
self.intersect_gain_idx = np.zeros(self.n_clusters, dtype=int)
for i in range(self.n_clusters):
# loss
de1[i] = self.eaxis[self.kneedle_loss_idx[i]] * self.shift_de1
log10_loss = _find_log10_fit(x=self.eaxis, y=self.y_smooth_median[i],
idx1=self.kneedle_loss_idx[i], idx2=self.local_min_loss_idx[i])
intersect_loss_single_count = (log10_loss < 1)
if not intersect_loss_single_count.any():
print("Log10 fit cluster", i,
"does not cross a single count, setting intersect index to the second to last eaxis index")
self.intersect_loss_idx[i] = int(len(self.eaxis) - 2)
else:
self.intersect_loss_idx[i] = np.argwhere(log10_loss < 1).flatten()[0]
de2[i] = self.eaxis[self.intersect_loss_idx[i]] * self.shift_de2
if de2[i] > self.eaxis[-2]:
print(de2[i], " is shifted beyond the eaxis, setting de2 to the second last value of the eaxis")
de2[i] = self.eaxis[-2]
# gain
mde1[i] = self.eaxis[self.kneedle_gain_idx[i]] * self.shift_de1
log10_gain = _find_log10_fit(x=self.eaxis, y=self.y_smooth_median[i],
idx1=self.local_min_gain_idx[i], idx2=self.kneedle_gain_idx[i])
intersect_gain_single_count = (log10_gain < 1)
if not intersect_gain_single_count.any():
print("Log10 fit cluster", i,
"does not cross a single count, setting intersect index to the second eaxis index")
# Fall back to the start of the gain side (the second eaxis index), as stated in the message above.
self.intersect_gain_idx[i] = int(1)
else:
self.intersect_gain_idx[i] = np.argwhere(log10_gain < 1).flatten()[-1]
mde2[i] = self.eaxis[self.intersect_gain_idx[i]] * self.shift_de2
if mde2[i] < self.eaxis[1]:
print(mde2[i], " is shifted beyond the eaxis, setting mde2 to the second value of the eaxis")
mde2[i] = self.eaxis[1]
return de1, de2, mde1, mde2
def scale_eaxis(self):
r"""
Scale the energy axis to lie within [0.1, 0.9]. This is done to speed up the training
of the neural network.
"""
scale_var_eaxis = find_scale_var(self.eaxis)
self.eaxis_scaled = scale(self.eaxis, scale_var_eaxis)
def calc_scale_var_log_int_i(self, based_on='log_zlp'):
r"""
Calculate the scale variables of the log of the integrated intensity of the spectra, where the integrated
intensity is taken over the three highest bins of the Zero Loss Peak (``based_on='log_zlp'``, the default)
or over the full spectrum otherwise.
"""
# Collect all spectra from the data set into a single array
all_spectra = np.empty((0, self.n_ebins))
if self.n_clusters == 1 and len(self.spectra[0]) == 1:
for i in range(len(self.spectra)):
all_spectra = np.append(all_spectra, self.spectra[i] + self.spectra[i] / 2, axis=0)
all_spectra = np.append(all_spectra, self.spectra[i] / 2, axis=0)
else:
for i in range(len(self.spectra)):
all_spectra = np.append(all_spectra, self.spectra[i], axis=0)
if based_on == 'log_zlp':
log_int_i = np.zeros(len(all_spectra))
for i in range(len(all_spectra)):
max_idx = np.argmax(all_spectra[i])
log_int_i[i] = np.log(np.sum(all_spectra[i][max_idx - 1:max_idx + 2]))
else:
log_int_i = np.log(np.sum(all_spectra, axis=1)).flatten()
self.scale_var_log_int_i = find_scale_var(log_int_i)
del all_spectra
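# Sketch with assumed values: for a spectrum whose three highest ZLP bins are [900, 1000, 950],
# the integrated intensity is 2850 counts and log_int_i = ln(2850) ~ 7.96; find_scale_var then
# maps these values linearly onto [0.1, 0.9].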
def save_scale_var_log_int_i(self):
r"""
Save the scale variables of the log of the total integrated intensity of the spectra, denoted ``I``.
"""
path_scale_var = os.path.join(self.path_to_models, 'scale_var.txt')
if not os.path.exists(path_scale_var):
np.savetxt(path_scale_var, self.scale_var_log_int_i)
def save_hyperparameters(self):
r"""
Save the hyperparameters in hyperparameters.txt. These are:
- cluster centroids; note whether they were determined from the raw data or from the log of the data
- dE1 for all clusters
- dE2 for all clusters
- FWHM for all clusters
"""
print("I'm saving hyperparameters.txt so hang on...")
print(f"clusters centroids={self.cluster_centroids} size {self.cluster_centroids.shape}")
print(f"dE1={self.de1} has size {self.de1.shape}")
print(f"dE2={self.de2} has size {self.de2.shape}")
print(f"FWHM={self.fwhm} has size {self.fwhm.shape}")
p1 = os.path.join(self.path_to_models, "hyperparameters.txt")
np.savetxt(p1, np.vstack((self.cluster_centroids, self.de1, self.de2, self.fwhm)))
print("Saved hyperparameters.txt!")
def set_train_x_y_sigma(self):
r"""
Take the x, y and sigma data for the train set and reshape them for neural network input
"""
self.train_x = self.train_test[0]
self.train_y = self.train_test[2]
self.train_sigma = self.train_test[4]
n_train = len(self.train_x)
self.train_x = self.train_x.reshape(n_train, 2)
self.train_y = self.train_y.reshape(n_train, 1)
self.train_sigma = self.train_sigma.reshape(n_train, 1)
self.train_x = torch.from_numpy(self.train_x)
self.train_y = torch.from_numpy(self.train_y)
self.train_sigma = torch.from_numpy(self.train_sigma)
def set_test_x_y_sigma(self):
r"""
Take the x, y and sigma data for the test set and reshape them for neural network input
"""
self.test_x = self.train_test[1]
self.test_y = self.train_test[3]
self.test_sigma = self.train_test[5]
n_test = len(self.test_x)
self.test_x = self.test_x.reshape(n_test, 2)
self.test_y = self.test_y.reshape(n_test, 1)
self.test_sigma = self.test_sigma.reshape(n_test, 1)
self.test_x = torch.from_numpy(self.test_x)
self.test_y = torch.from_numpy(self.test_y)
self.test_sigma = torch.from_numpy(self.test_sigma)
def train_and_evaluate_model(self, i, j, lr):
r"""
Train and evaluate the model. Also saves the values of cost_train and cost_test per epoch.
Parameters
----------
i : int
Index of the replica within the current batch.
j : int
Global replica index, used for numbering the saved models and training reports.
lr : float
Learning rate of the neural network
"""
# store the test and train loss per epoch
self.loss_test_n = []
self.loss_train_n = []
optimizer = optim.Adam(self.model.parameters(), lr=lr)
for epoch in range(1, self.n_epochs + 1):
# Set model to training mode
self.model.train()
output_train = self.model(self.train_x.float())
# Because self.train_x_for_derivative is a list of tensors
# we need to explicitly loop over the entries to compute the outputs
output_for_derivative_train = []
for input_tensor in self.train_x_for_derivative:
output_for_derivative_train.append(self.model(input_tensor.float()))
loss_train = self.loss_function(output_train, output_for_derivative_train, self.train_y, self.train_sigma)
self.loss_train_n.append(loss_train.item())
# update weights
optimizer.zero_grad()
loss_train.backward()
optimizer.step()
# Set model to evaluation mode
self.model.eval()
with torch.no_grad():
output_test = self.model(self.test_x.float())
output_for_derivative_test = []
for input_tensor in self.test_x_for_derivative:
output_for_derivative_test.append(self.model(input_tensor.float()))
loss_test = self.loss_function(output_test, output_for_derivative_test, self.test_y, self.test_sigma)
self.loss_test_n.append(loss_test.item())
if epoch % self.display_step == 0 and self.print_progress:
training_loss = round(loss_train.item(), 3)
testing_loss = round(self.loss_test_n[epoch - 1], 3)
print('----------------------')
print(f'Rep {j}, Epoch {epoch}')
print(f'Training loss {training_loss}')
print(f'Testing loss {testing_loss}')
# update the test and train loss of the replica
self.loss_test_reps[i] = loss_test.item()
self.loss_train_reps[i] = loss_train.item()
def loss_function(self, output, output_for_derivative, target, error):
r"""
The loss function to train the ZLP takes the model ``output``, the raw spectrum ``target`` and the associated
``error``. The latter corresponds to the one sigma spread within a given cluster at fixed :math:`\Delta E`.
It returns the cost function :math:`C_{\mathrm{ZLP}}^{(m)}` associated with the replica :math:`m` as
.. math::
    :label: eq:lossfunction

    C_{\mathrm{ZLP}}^{(m)} = \frac{1}{n_{E}} \sum_{k=1}^K \sum_{\ell_k=1}^{n_E^{(k)}} \frac{\left[I^{(i_{m,k}, j_{m,k})}(E_{\ell_k}) - I_{\rm ZLP}^{({\mathrm{NN}})(m)} \left(E_{\ell_k},\ln \left( N_{\mathrm{tot}}^{(i_{m,k},j_{m,k})} \right) \right) \right]^2}{\sigma^2_k \left(E_{\ell_k} \right)}.

In addition, a regularisation term that penalises positive derivatives of the model output between dE1 and dE2,
weighted by ``regularisation_constant``, is added to this cost.
Parameters
----------
output: torch.tensor
Neural Network output
output_for_derivative : list of torch.tensor
Each entry in the list should correspond to the neural
network output between de1 and de2 of a single spectrum
in the replica
target: torch.tensor
Raw EELS spectrum
error: torch.tensor
Uncertainty on :math:`\log I_{\mathrm{EELS}}(\Delta E)`.
Returns
-------
loss: torch.tensor
Loss associated with the model ``output``.
"""
cost = 0
for output_tensor in output_for_derivative:
derivative = torch.diff(output_tensor.flatten())
# Penalise for bins where derivative is positive
cost += torch.sum(derivative[derivative > 0])
return torch.mean(torch.square((output - target) / error)) + self.regularisation_constant * cost
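# Sketch of the regularisation term with assumed values: for a model output of
# [2.0, 1.9, 2.1, 1.8] between dE1 and dE2, torch.diff gives [-0.1, 0.2, -0.3]; only the
# positive jump 0.2 is summed into `cost`, so a monotonically decreasing output in that
# window incurs no penalty.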
def set_path_for_training_report(self, j):
r"""
Set the save directory for the training report of the replica being trained on.
Parameters
----------
j : int
Index of the replica being trained on.
"""
self.training_report_path = os.path.join(
self.path_to_models, 'training_reports/rep_{}'.format(j))
if not os.path.exists(self.training_report_path):
os.makedirs(self.training_report_path)
def plot_training_report(self):
r"""
Create the training report plot: evolution of the training and validation
loss per epoch.
"""
fig, ax = plt.subplots()
ax.plot(self.loss_train_n, label="Training Loss")
ax.plot(self.loss_test_n, label="Validation Loss")
ax.set_xlabel("epochs")
ax.set_ylabel("Loss")
ax.set_yscale('log')
path = os.path.join(self.training_report_path, 'loss.pdf')
fig.savefig(path)
def write_txt_of_loss(self, base_path, filename, loss):
r"""
Write train/test loss to a .txt file.
Parameters
----------
base_path : str
Directory to store the report in.
filename : str
Filename of .txt file to store report in.
loss : list
List containing loss value per epoch.
"""
path = os.path.join(
base_path, f'{filename}.txt')
with open(path, 'w') as text_file:
for item in loss:
text_file.write(f'{item}\n')
def cleanup_files(self):
r"""
Cleans up the files generated by train_zlp_models_scaled.
costs_train_*, costs_test_*, and nn_rep_* files are merged into the single files
costs_train.txt, costs_test.txt, and nn_replicas respectively.
"""
# Output file names
train_cost_path = os.path.join(
self.path_to_models, 'costs_train.txt')
test_cost_path = os.path.join(
self.path_to_models, 'costs_test.txt')
nn_replicas_path = os.path.join(
self.path_to_models, 'nn_replicas')
# Loop over all costs files, which are indexed starting from index 1.
with open(train_cost_path, 'w') as text_file_train, open(test_cost_path, 'w') as text_file_test:
for i in range(1, self.n_batches + 1):
filename_train = os.path.join(
self.path_to_models, f'costs_train_{i}.txt')
with open(filename_train) as f_train:
for line in f_train:
text_file_train.write(f'{float(line.strip())}\n')
if self.remove_temp_files:
os.remove(filename_train)
filename_test = os.path.join(
self.path_to_models, f'costs_test_{i}.txt')
with open(filename_test) as f_test:
for line in f_test:
text_file_test.write(f'{float(line.strip())}\n')
if self.remove_temp_files:
os.remove(filename_test)
# Dictionary to store models in
model_dict = dict()
model = MultilayerPerceptron(num_inputs=2, num_outputs=1)
# Loop over all nn_rep files, which are indexed starting from index 1.
for i in range(1, self.required_num_files + 1):
path_to_model = os.path.join(
self.path_to_models, f'nn_rep_{i}')
model.load_state_dict(torch.load(path_to_model))
model_dict[f'model_{i}'] = copy.deepcopy(model.state_dict())
if self.remove_temp_files:
os.remove(path_to_model)
torch.save(model_dict, nn_replicas_path)
def save_figplot(self, fig, title="no_title.pdf"):
r"""
Save the given figure in the models directory under the given filename and print the storage path.
Parameters
----------
fig : matplotlib.Figure
Figure to be saved.
title : str
Filename to store the plot in.
"""
path = os.path.join(self.path_to_models, title)
fig.savefig(path)
print(f"Plot {title} stored in {path}")
def plot_hp_cluster_slope(self, **kwargs):
r"""
Create a plot of the hyperparameters plotted on top of the slopes of the spectra per cluster.
Parameters
----------
kwargs: dict, optional
Additional keyword arguments.
"""
fig = plot_hp(eaxis=self.eaxis, clusters_data=self.dydx_smooth, de1=self.de1, de2=self.de2, **kwargs)
return fig
def plot_hp_cluster(self, **kwargs):
r"""
Create a plot of the hyperparameters plotted on top of the spectra per cluster.
Parameters
----------
kwargs: dict, optional
Additional keyword arguments.
"""
fig = plot_hp(eaxis=self.eaxis, clusters_data=self.y_smooth, de1=self.de1, de2=self.de2, **kwargs)
return fig
class MultilayerPerceptron(nn.Module):
r"""
Multilayer Perceptron (MLP) class. It uses the following architecture
.. math::
[n_i, 10, 15, 5, n_f],
where :math:`n_i` and :math:`n_f` denote the number of input features and
output target values respectively.
Parameters
----------
num_inputs: int
number of input features
num_outputs: int
dimension of the target output.
"""
def __init__(self, num_inputs, num_outputs):
super().__init__()
# Initialize the modules we need to build the network
self.linear1 = nn.Linear(num_inputs, 10)
self.linear2 = nn.Linear(10, 15)
self.linear3 = nn.Linear(15, 5)
self.output = nn.Linear(5, num_outputs)
self.sigmoid = nn.Sigmoid()
self.relu = nn.ReLU()
def forward(self, x):
"""
Propagates the input features ``x`` through the MLP.
Parameters
----------
x: torch.tensor
input features
Returns
-------
x: torch.tensor
MLP outcome
"""
# Perform the calculation of the model to determine the prediction
x = self.linear1(x)
x = self.sigmoid(x)
x = self.linear2(x)
x = self.sigmoid(x)
x = self.linear3(x)
x = self.relu(x)
x = self.output(x)
return x
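# Minimal usage sketch (assumed feature values): the two inputs are the scaled energy loss and
# the scaled log of the integrated ZLP intensity, and the single output is the network's ZLP
# prediction for that input:
#
#     mlp = MultilayerPerceptron(num_inputs=2, num_outputs=1)
#     prediction = mlp(torch.tensor([[0.5, 0.3]]))  # shape (1, 1)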
# STATIC FUNCTIONS
def scale(inp, ab):
r"""
Rescale the training data to lie between 0.1 and 0.9. Rescaling the features helps speed up the neural network
training process. The value range [0.1, 0.9] ensures the neuron activation states will typically lie close to the
linear region of the sigmoid activation function.
Parameters
----------
inp: numpy.ndarray, shape=(M,)
training data to be rescaled, e.g. :math:`\Delta E`
ab: list of float, length 2
scaling parameters, which can be found with
:py:meth:`find_scale_var() <find_scale_var>`.
Returns
-------
Rescaled training data
"""
return inp * ab[0] + ab[1]
def find_scale_var(inp, min_out=0.1, max_out=0.9):
r"""
Computes the scaling parameters needed to rescale the training data to lie
between ``min_out`` and ``max_out``. For our neural network the value range [0.1, 0.9] ensures the
neuron activation states will typically lie close to the linear region of the sigmoid activation function.
Parameters
----------
inp: numpy.ndarray, shape=(M,)
training data to be rescaled
min_out: float
lower limit. Set to 0.1 by default.
max_out: float
upper limit. Set to 0.9 by default
Returns
-------
a, b: list
list of rescaling parameters
"""
a = (max_out - min_out) / (inp.max() - inp.min())
b = min_out - a * inp.min()
return [a, b]
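# Worked sketch: find_scale_var and scale together map an array linearly onto [0.1, 0.9].
#
#     >>> axis = np.linspace(-1.0, 5.0, 101)
#     >>> ab = find_scale_var(axis)  # a = 0.8 / 6, b = 0.1 - a * axis.min()
#     >>> scaled = scale(axis, ab)
#     >>> round(float(scaled.min()), 3), round(float(scaled.max()), 3)
#     (0.1, 0.9)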
def weight_reset(m):
r"""
Reset the weights and biases associated with the model ``m``.
Parameters
----------
m: torch.nn.Module
Module whose parameters are reset; only ``nn.Linear`` and ``nn.Conv2d`` layers are affected.
"""
if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
m.reset_parameters()
def smooth_signals_per_cluster(signals, window_length=51, window_type='hanning'):
r"""
Smooth all signals in a cluster using a window of length ``window_length`` and type ``window_type``.
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the beginning and end part of the output signal.
Parameters
----------
signals: numpy.ndarray, shape=(N, M)
The input data, with one signal per row
window_length: int, optional
The dimension of the smoothing window; should be an odd integer. Default is 51
window_type: str, optional
the type of window from ``"flat"``, ``"hanning"``, ``"hamming"``, ``"bartlett"``,
``"blackman"`` and ``"kaiser"``. A ``"flat"`` window will produce a moving-average smoothing.
Default is ``"hanning"``
Returns
-------
signals_smooth: numpy.ndarray, shape=(N, M)
The smoothed signals
"""
# Force the window length to be an odd number
window_length += (window_length + 1) % 2
# extend the signal with the window length on both ends
signals_padded = np.r_['-1', signals[:, window_length - 1:0:-1], signals, signals[:, -2:-window_length - 1:-1]]
# Pick the window type
if window_type == 'flat': # moving average
window = np.ones(window_length, 'd')
else:
window = getattr(np, window_type)(window_length)
# Determine the smoothed signal and throw away the padded ends
surplus_data = int((window_length - 1) * 0.5)
def window_convolve(data):
return np.convolve(data, window / window.sum(), mode='valid')
signals_smooth = np.apply_along_axis(window_convolve, axis=1, arr=signals_padded)[:, surplus_data:-surplus_data]
return signals_smooth
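# Minimal sketch (assumed shapes): smoothing a cluster of two noisy spectra with the default
# 51-point Hanning window; the output has the same shape as the input.
#
#     rng = np.random.default_rng(0)
#     signals = rng.poisson(1000, size=(2, 500)).astype(float)
#     smoothed = smooth_signals_per_cluster(signals)  # shape (2, 500)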
def log10_fit(x, a, b, order=1):
return b * 10 ** (a * (x ** (1 / order)))
def power_fit(x, a, r):
return a * x ** r