import math
import numpy as np
import logging
import os
import copy
import warnings
import torch
import bz2
import pickle
import _pickle as cPickle
import matplotlib.pyplot as plt
from scipy.fft import next_fast_len, fft, ifft
from scipy.optimize import curve_fit
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA, NMF
from ncempy.io import dm
from .training import MultilayerPerceptron, TrainZeroLossPeak
from ..plotting.training_perf import plot_cost_dist
from ..plotting.zlp import plot_zlp_cluster_predictions
# Global pyplot style for the figures produced by this module
plt.rcParams.update({'font.size': 16})
_logger = logging.getLogger(__name__)
class ProcessSpectralImage:
def __init__(self, image):
r"""Load spectral image.
ARGUMENTS
---------
image (dm4): spectral image.
EXAMPLE
-------
processed_SI = ProcessSpectralImage(image=img)
processed_SI.save_spectra_in_3D_array()
processed_SI.center_ZLP_at_0eV()
"""
self.image = image
self.x_signal = image.eaxis
self.n_rows = image.shape[0]
self.n_cols = image.shape[1]
def save_3D_arrays_in_4D_array(self):
A = np.zeros(shape=(self.n_rows, self.n_cols, self.n_aisles, 3))
A[:, :, :, 0] = self.arr3D_deltaE
A[:, :, :, 1] = self.arr3D_deltaE_shifted
A[:, :, :, 2] = self.arr3D_signal
return A
def save_spectra_in_3D_array(self):
self.n_aisles = self.x_signal.shape[0]
# 3D array: (row, column, aisle)
self.arr3D_zeroes = np.zeros(shape=(self.n_rows, self.n_cols, self.n_aisles))
self.arr3D_deltaE = np.copy(self.arr3D_zeroes)
self.arr3D_signal = np.copy(self.arr3D_zeroes)
for i in range(self.n_rows):
for j in range(self.n_cols):
# save the deltaE in aisle of 3D array
self.arr3D_deltaE[i, j, :] = self.x_signal
# save the signal in aisle of 3D array
self.arr3D_signal[i, j, :] = self.image.get_pixel_signal(i=i, j=j)
def center_ZLP_at_0eV(self):
r"""Take maximum of ZLP, determine its median and shift ZLP to 0 eV."""
# get index of max value for each aisle
idx = self.arr3D_signal.argmax(axis=2)
a1, a2 = np.indices(idx.shape)
# get the shifted energy per aisle, i.e. per (row,col)
self.arr2D_shifted_E = self.arr3D_deltaE[a1, a2, idx]
# create 3D array of shifted energy
arr3D_shifted_E = np.copy(self.arr3D_zeroes)
for k in range(self.n_aisles):
# fill each layer with the same 2D array containing shifted energy
arr3D_shifted_E[:, :, k] = self.arr2D_shifted_E
# subtract each aisle by the corresponding shifted energy
self.arr3D_deltaE_shifted = self.arr3D_deltaE - arr3D_shifted_E
def check_if_ZLP_centered_at_0eV_for(self, row=0):
x0 = self.arr3D_deltaE
xS = self.arr3D_deltaE_shifted
y0 = self.arr3D_signal
# plot
fig, ax = plt.subplots()
ax.set_xlim(-4, 4)
for i in range(self.n_cols):
# plot each spectrum
ax.plot(x0[row, i, :], y0[row, i, :], 'r')
# plot each spectrum (shifted)
ax.plot(xS[row, i, :], y0[row, i, :], 'b')
return fig
def plot_spectral_image(self, arr, cmap='turbo_r', cbar_title=''):
fig, ax = plt.subplots()
p = ax.imshow(arr, cmap=cmap)
cbar = fig.colorbar(p, ax=ax)
cbar.ax.set_title(cbar_title) # intensity lower for thicker sample
return fig, ax
class SpectralImage:
# signal names
DIELECTRIC_FUNCTION_NAMES = ['dielectric_function', 'dielectricfunction', 'dielec_func',
'die_fun', 'df', 'epsilon']
EELS_NAMES = ['electron_energy_loss_spectrum', 'electron_energy_loss',
'EELS', 'EEL', 'energy_loss', 'data']
IEELS_NAMES = ['inelastic_scattering_energy_loss_spectrum', 'inelastic_scattering_energy_loss',
'inelastic_scattering', 'IEELS', 'IES', 'signal_scattering_distribution', 'ssd']
ZLP_NAMES = ['zero_loss_peak', 'zero_loss', 'ZLP', 'ZLPs', 'zlp', 'zlps']
THICKNESS_NAMES = ['t', 'thickness', 'thick', 'thin']
POOLED_ADDITION = ['pooled', 'pool', '_pooled', '_pool']
PCA_ADDITION = ['pca', 'PCA', '_pca', '_PCA']
NMF_ADDITION = ['nmf', 'NMF', '_nmf', '_NMF']
# meta data names
COLLECTION_ANGLE_NAMES = ["collection_angle", "col_angle", "beta"]
CONVERGENCE_ANGLE_NAMES = ["convergence_angle", "con_angle", "alpha"]
BEAM_ENERGY_NAMES = ["beam_energy", "beam_E", "E_beam", "E0", "E_0", "e0", "e_0"]
# Global physical constants
m_e = 5.1106E5 # [eV], Electron rest mass
e_c = 1.602176487E-19 # [C], Electron charge
a0 = 5.29E-11 # [m], Bohr radius
h_bar = 6.582119569E-16 # [eV*s], Reduced Planck constant
c = 2.99792458E8 # [m/s], Speed of light
def __init__(self, data, deltaE=None, pixel_size=None, beam_energy=None,
collection_angle=None, convergence_angle=None, name=None, **kwargs):
r"""
The spectral image class that provides several tools to analyse spectral images with the zero-loss peak
subtracted.
Parameters
----------
data: numpy.ndarray, shape=(M,N,L)
Array containing the 3-D spectral image. The axes correspond to the x-axis (M), y-axis (N) and energy-loss (L).
deltaE: float, optional
bin width in energy loss spectrum in eV
pixel_size: numpy.ndarray, shape=(2,), optional
width of pixels in nm
beam_energy: float, optional
Energy of electron beam in keV
collection_angle: float, optional
Collection angle of STEM in mrad
convergence_angle: float, optional
Convergence angle of STEM in mrad
name: str, optional
Name of the SI data file
Examples
--------
An example how to train and analyse a spectral image::
path_to_image = 'path to dm4 file'
im = SpectralImage.load_data(path_to_image)
im.train_zlp_models(n_clusters=8,
n_rep=100,
n_epochs=150000,
bs_rep_num=1,
path_to_models=path_to_models)
"""
# Image properties
self.data = data
self.data_subtracted = None
self.deltaE = deltaE # eV, energy bin width
self.set_eaxis() # Sets the shifted energy axis (E = 0 corresponds to peak of ZLP)
self.pixel_size = pixel_size # nm, real world size of the pixel
self.calc_axes() # Calculates the x and y-axis based on the pixel size.
self.beam_energy = beam_energy # KeV, incoming beam energy
self.collection_angle = collection_angle # mrad, collection semi-angle
self.convergence_angle = convergence_angle # mrad, convergence semi-angle
self.name = name # SI name
# ZLP models properties
self.cluster_centroids = None # Center values of the clusters
self.cluster_labels = None # 2d array where each pixel location has a corresponding cluster label
self.cluster_signals = None # All signals collected per cluster
self.zlp_models = None
self.scale_var_eaxis = find_scale_var(self.eaxis) # scale variables for the energy axis
self.scale_var_log_int_i = None # scale variables for the log of the integrated intensities
self.dE1 = None # dE1 hyperparameter for each cluster
self.dE2 = None # dE2 hyperparameter for each cluster
self.FWHM = None # Full Width at Half Maximum for each cluster
# Additional image properties for calculations and data modification
self.n = None # Refractive index, set per cluster
self.rho = None # Mass density, set per cluster
self.pooled = None # SI data is enhanced by pooling center pixel with surrounding pixels
self.pca = None # SI data is enhanced by performing PCA on the data
self.nmf = None # SI data is enhanced by performing NMF on the data
# Other
self.output_path = os.getcwd()
# PROPERTIES
@property
def shape(self):
r"""
Returns 3D-shape of
:py:meth:`spectral_image.SpectralImage <EELSFitter.core.spectral_image.SpectralImage>` object
"""
return self.data.shape
@property
def n_spectra(self):
r"""
Returns the number of spectra present in
:py:meth:`spectral_image.SpectralImage <EELSFitter.core.spectral_image.SpectralImage>` object.
"""
return self.shape[0]*self.shape[1]
@property
def n_clusters(self):
r"""
Returns the number of clusters in the
:py:meth:`spectral_image.SpectralImage <EELSFitter.core.spectral_image.SpectralImage>` object.
"""
return len(self.cluster_centroids)
# METHODS ON SAVING AND LOADING DATA
@classmethod
def load_data(cls, path_to_dmfile, load_survey_data=False):
r"""
Load the .dm4 (or .dm3) data and return a
:py:meth:`spectral_image.SpectralImage <EELSFitter.core.spectral_image.SpectralImage>` instance.
Parameters
----------
path_to_dmfile: str
location of .dm4 file
load_survey_data: bool, optional
If there is HAADF survey data in your file, you can choose to also load that in. Default is `False`.
Returns
-------
SpectralImage
:py:meth:`spectral_image.SpectralImage <EELSFitter.core.spectral_image.SpectralImage>`
instance of the dm4 file
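Examples
--------
A minimal sketch of loading a file (the path is hypothetical)::
im = SpectralImage.load_data('path/to/spectral_image.dm4')
print(im.shape, im.deltaE)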
"""
dmfile = dm.fileDM(path_to_dmfile)
dmobjects = dmfile.numObjects
eels_dict = dmfile.getDataset(dmobjects - 2)
eels_metadict = dmfile.getMetadata(dmobjects - 2)
if eels_dict['data'].ndim == 3:
eels_data = np.swapaxes(np.swapaxes(eels_dict['data'], 0, 1), 1, 2)
elif eels_dict['data'].ndim == 2:
# A 2-D dataset is assumed to be a line scan with (energy-loss, position) ordering, mirroring the 3-D case;
# it is stored as a spectral image with a single row.
eels_data = np.swapaxes(eels_dict['data'], 0, 1)[np.newaxis, :, :]
elif eels_dict['data'].ndim == 1:
# A single signal is constructed to be a 1 by 1 pixel spectral image.
eels_data = np.zeros(shape=(1, 1, len(eels_dict['data'])))
eels_data[0, 0, :] = eels_dict['data']
else:
raise ValueError("Dimensions of the data set could not be determined, please check your dataset.")
# eels data
if eels_dict['pixelSize']:
try:
deltaE = eels_dict['pixelSize'][0]
except (IndexError, ValueError):
print("deltaE not found, assuming a value of 1. Otherwise adjust manually.")
deltaE = 1
try:
pixel_size = np.array(eels_dict['pixelSize'][1:])
if len(pixel_size) == 0:
pixel_size = np.array([1., 1.])
except (IndexError, ValueError):
print("pixel_size is not found, assuming to be 1. Otherwise adjust manually")
pixel_size = np.array([1, 1])
else:
deltaE = None
pixel_size = None
print("The dm4 metadata does not contain 'pixelSize'. deltaE and pixel_size need to be set manually.")
if eels_dict['pixelUnit']:
try:
energy_unit = eels_dict['pixelUnit'][0]
except IndexError:
print("energy_unit is not found, assuming to be in eV. Otherwise adjust manually")
energy_unit = 'eV'
try:
pixel_unit = eels_dict['pixelUnit'][1]
except IndexError:
print("pixel_unit is not found, assuming to be in nanometer. Otherwise adjust manually")
pixel_unit = 'nm'
else:
energy_unit = None
pixel_unit = None
print("The dm4 metadata does not contain 'pixelUnit'. energy_unit and pixel_unit need to be set manually.")
if (energy_unit is not None) and (deltaE is not None):
deltaE *= cls.get_prefix(energy_unit, 'eV')
if (pixel_size is not None) and (pixel_unit is not None):
pixel_size *= cls.get_prefix(pixel_unit, 'm') * 1E9
# eels metadata
try:
beam_energy = eels_metadict['Microscope Info Voltage'] / 1E3 # convert the beam energy to KeV
except KeyError:
beam_energy = None
print("The dm4 metadata does not contain 'Microscope Info Voltage'. beam_energy needs to be manually set.")
try:
collection_angle = eels_metadict['EELS Experimental Conditions Collection semi-angle (mrad)']
except KeyError:
collection_angle = None
print("The dm4 metadata does not contain 'EELS Experimental Conditions Collection semi-angle (mrad)'. "
"collection_angle needs to be manually set.")
try:
convergence_angle = eels_metadict['EELS Experimental Conditions Convergence semi-angle (mrad)']
except KeyError:
convergence_angle = None
print("The dm4 metadata does not contain 'EELS Experimental Conditions Convergence semi-angle (mrad)'. "
"convergence_angle needs to be manually set.")
image = cls(data=eels_data, deltaE=deltaE, pixel_size=pixel_size, beam_energy=beam_energy,
collection_angle=collection_angle, convergence_angle=convergence_angle,
name=path_to_dmfile[:-4])
# load in survey data if available
if dmobjects > 2 and load_survey_data is True:
image.survey_data = dmfile.getDataset(0)
image.survey_metadata = dmfile.getMetadata(0)
return image
@classmethod
def load_spectral_image(cls, path_to_pickle):
r"""
Loads :py:meth:`spectral_image.SpectralImage <EELSFitter.core.spectral_image.SpectralImage>` instance
from a pickled file.
Parameters
----------
path_to_pickle : str
path to the pickled image file.
Raises
------
ValueError
If path_to_pickle does not end on the desired format .pkl.
FileNotFoundError
If path_to_pickle does not exist.
Returns
-------
SpectralImage
:py:meth:`spectral_image.SpectralImage <EELSFitter.core.spectral_image.SpectralImage>` object
(i.e. including all attributes) loaded from pickle file.
"""
if path_to_pickle[-4:] != '.pkl':
raise ValueError(
"please provide a path to a pickle file containing a SpectralImage class object.")
if not os.path.exists(path_to_pickle):
raise FileNotFoundError(
"pickled file: " + path_to_pickle + " not found")
with open(path_to_pickle, 'rb') as pickle_im:
image = pickle.load(pickle_im)
return image
@classmethod
def load_compressed_Spectral_image(cls, path_to_compressed_pickle):
r"""
Loads spectral image from a compressed pickled file. This will take longer than loading from non-compressed
pickle.
Parameters
----------
path_to_compressed_pickle : str
path to the compressed pickle image file.
Raises
------
ValueError
If path_to_compressed_pickle does not end on the desired format .pbz2.
FileNotFoundError
If path_to_compressed_pickle does not exist.
Returns
-------
image : SpectralImage
:py:meth:`spectral_image.SpectralImage <EELSFitter.core.spectral_image.SpectralImage>` instance
loaded from the compressed pickle file.
"""
if path_to_compressed_pickle[-5:] != '.pbz2':
raise ValueError(
"please provide a path to a compressed .pbz2 pickle file containing a SpectralImage class object.")
if not os.path.exists(path_to_compressed_pickle):
raise FileNotFoundError(
'pickled file: ' + path_to_compressed_pickle + ' not found')
image = cls.decompress_pickle(path_to_compressed_pickle)
return image
def save_image(self, filename):
r"""
Function to save image, including all attributes, in pickle (.pkl) format. Image will be saved \
at indicated location and name in filename input.
Parameters
----------
filename : str
path to save location plus filename. If it does not end on ".pkl", ".pkl" will be added.
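Examples
--------
A minimal save/reload round trip, assuming ``im`` is a SpectralImage instance (the filename is hypothetical)::
im.save_image('my_image.pkl')
im_reloaded = SpectralImage.load_spectral_image('my_image.pkl')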
"""
if filename[-4:] != '.pkl':
filename = filename + '.pkl'
with open(filename, 'wb') as output: # Overwrites any existing file.
pickle.dump(self, output, pickle.HIGHEST_PROTOCOL)
def save_compressed_image(self, filename):
r"""
Function to save image, including all attributes, in compressed pickle (.pbz2) format. Image will \
be saved at location ``filename``.
Advantage over :py:meth:`save_image() <EELSFitter.core.spectral_image.SpectralImage.save_image>` is that \
the saved file has a reduced file size, disadvantage is that saving and reloading the image \
takes significantly longer.
Parameters
----------
filename : str
path to save location plus filename. If it does not end on ".pbz2", ".pbz2" will be added.
"""
if filename[-5:] != '.pbz2':
filename = filename + '.pbz2'
self.compressed_pickle(filename, self)
@staticmethod
def compressed_pickle(title, data):
r"""
Saves ``data`` at location ``title`` as compressed pickle.
"""
with bz2.BZ2File(title, 'w') as f:
cPickle.dump(data, f)
@staticmethod
def decompress_pickle(file):
r"""
Opens, decompresses and returns the pickle file at location ``file``.
Parameters
----------
file: str
location where the pickle file is stored
Returns
-------
data: SpectralImage
"""
data = bz2.BZ2File(file, 'rb')
data = cPickle.load(data)
return data
@staticmethod
def get_prefix(unit, SIunit=None, numeric=True):
r"""
Method to convert units to their associated SI values.
Parameters
----------
unit: str,
unit of which the prefix is requested
SIunit: str, optional
The SI unit of the unit
numeric: bool, optional
Default is `True`. If `True` the prefix is translated to the
numeric value (e.g. :math:`10^3` for `k`)
Returns
-------
prefix: str or int
The character of the prefix or the numeric value of the prefix
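Examples
--------
A few illustrative calls (expected values shown as comments)::
SpectralImage.get_prefix('nm', 'm')    # 1E-9
SpectralImage.get_prefix('µm', 'm')    # 1E-6
SpectralImage.get_prefix('eV', 'eV')   # 1, no prefix present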
"""
if SIunit is not None:
lenSI = len(SIunit)
if unit[-lenSI:] == SIunit:
prefix = unit[:-lenSI]
if len(prefix) == 0:
if numeric:
return 1
else:
return prefix
else:
print("provided unit not same as target unit: " +
unit + ", and " + SIunit)
if numeric:
return 1
else:
prefix = None
return prefix
else:
prefix = unit[0]
if not numeric:
return prefix
if prefix == 'p':
return 1E-12
if prefix == 'n':
return 1E-9
if prefix in ['u', 'micron', 'µ']:
return 1E-6
if prefix == 'm':
return 1E-3
if prefix in ['k', 'K']:
return 1E3
if prefix == 'M':
return 1E6
if prefix == 'G':
return 1E9
if prefix == 'T':
return 1E12
else:
print("either no or unknown prefix in unit: " + unit +
", found prefix " + prefix + ", assuming none.")
return 1
# METHODS ON IMAGE
def set_eaxis(self):
r"""
Determines the energy losses of the spectral image, based on the bin
width of the energy loss. It shifts the ``self.eaxis`` attribute such
that the zero point corresponds with the point of the highest intensity.
It also sets the extrapolated eaxis for calculations that require extrapolation.
Returns
-------
eaxis: numpy.ndarray, shape=(M,)
Array of :math:`\Delta E` values
"""
data_avg = np.average(self.data, axis=(0, 1))
ind_max = np.argmax(data_avg)
self.eaxis = np.linspace(-ind_max * self.deltaE, (self.shape[2] - ind_max - 1) * self.deltaE, self.shape[2])
if self.eaxis[-1] <= 200:
esize = math.ceil((-1 * self.eaxis[0] + 200) / self.deltaE)
esize = next_fast_len(esize)
else:
esize = next_fast_len(len(self.eaxis))
self.eaxis_extrp = np.linspace(self.eaxis[0], esize * self.deltaE + self.eaxis[0], esize)
def calc_axes(self):
r"""
Determines the x_axis and y_axis of the spectral image. Stores them in
``self.x_axis`` and ``self.y_axis`` respectively.
"""
self.y_axis = np.linspace(0, self.shape[0], self.shape[0] + 1)
self.x_axis = np.linspace(0, self.shape[1], self.shape[1] + 1)
if self.pixel_size is not None:
self.y_axis *= self.pixel_size[0]
self.x_axis *= self.pixel_size[1]
def get_pixel_signal(self, i, j, signal_type='EELS', **kwargs):
r"""
Retrieves the spectrum at pixel (``i``, ``j``).
Parameters
----------
i: int
y-coordinate of the pixel
j: int
x-coordinate of the pixel
signal_type: str, optional
The type of signal that is requested, should comply with the defined
names. Set to `EELS` by default.
Returns
-------
signal : numpy.ndarray, shape=(M,)
Array with the requested signal from the requested pixel
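Examples
--------
A minimal sketch of retrieving a single spectrum, assuming ``im`` is a loaded SpectralImage (pixel indices are illustrative)::
signal = im.get_pixel_signal(i=10, j=20)
signal_pooled = im.get_pixel_signal(i=10, j=20, signal_type='pooled')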
"""
if signal_type in self.EELS_NAMES:
if self.data_subtracted is not None and isinstance(self.data_subtracted[i, j], np.ndarray):
return np.copy(self.data_subtracted[i, j])
else:
return np.copy(self.data[i, j, :])
elif signal_type in self.POOLED_ADDITION:
if self.pooled is None:
return self.pool_pixel(i, j, **kwargs)
else:
return np.copy(self.pooled[i, j, :])
elif signal_type in self.PCA_ADDITION:
if self.pca is None:
return self.pca_pixel(i, j, **kwargs)
else:
return np.copy(self.pca[i, j, :])
elif signal_type in self.NMF_ADDITION:
if self.nmf is None:
return self.nmf_pixel(i, j, **kwargs)
else:
return np.copy(self.nmf[i, j, :])
else:
print("Signal type '" + str(signal_type) + "' not recognised, returning the general EELS signal.")
return np.copy(self.data[i, j, :])
def get_pixel_signal_subtracted(self, i, j, signal_type='EELS', **kwargs):
r"""
Retrieves the spectrum at pixel (``i``, ``j``) after subtraction of the ZLP.
Parameters
----------
i: int
y-coordinate of the pixel
j: int
x-coordinate of the pixel
signal_type: str, optional
The type of signal that is requested, should comply with the defined
names. Set to `EELS` by default.
Returns
-------
signal_subtracted : numpy.ndarray, shape=(M,)
Array with the requested subtracted signal from the requested pixel
"""
zlps = self.get_pixel_matched_zlp_models(i, j, signal_type=signal_type, **kwargs)
signal_subtracted = self.get_pixel_signal(i, j, signal_type=signal_type, **kwargs) - zlps
return signal_subtracted
def get_pixel_signal_subtracted_or_not(self, i, j, signal_type='EELS', zlp=False, **kwargs):
r"""
Function to retrieve non-subtracted data or data after subtraction of the ZLP.
Parameters
----------
i: int
y-coordinate of the pixel
j: int
x-coordinate of the pixel
signal_type: str, optional
The type of signal that is requested, should comply with the defined
names. Set to `EELS` by default
zlp : bool
If True, return the subtracted spectrum. Else, return the non-subtracted spectrum
kwargs: dict, optional
Additional keyword arguments, which are passed to self.get_pixel_signal
"""
def _summary_distribution(data, mean=50, lower=16, upper=84):
median = np.nanpercentile(data, mean, axis=0)
low = np.nanpercentile(data, lower, axis=0)
high = np.nanpercentile(data, upper, axis=0)
return [median, low, high]
if zlp:
if self.data_subtracted is None:
self.data_subtracted = np.zeros(self.shape[:2], dtype=object)
if not isinstance(self.data_subtracted[i, j], np.ndarray):
self.data_subtracted[i, j] = _summary_distribution(
self.get_pixel_signal_subtracted(i, j, signal_type=signal_type, **kwargs))[0]
return self.get_pixel_signal(i, j, signal_type=signal_type, **kwargs)
def get_image_signals(self, signal_type='EELS', **kwargs):
r"""
Get all the signals of the image.
Parameters
----------
signal_type: str, optional
Description of signal, ``'EELS'`` by default.
kwargs
Returns
-------
image_signals : numpy.ndarray, shape=(M,N,L)
"""
if signal_type in self.EELS_NAMES:
return np.copy(self.data)
elif signal_type in self.POOLED_ADDITION:
if self.pooled is None:
self.pool_image(**kwargs)
return np.copy(self.pooled)
else:
return np.copy(self.pooled)
elif signal_type in self.PCA_ADDITION:
if self.pca is None:
self.pca_image(**kwargs)
return np.copy(self.pca)
else:
return np.copy(self.pca)
elif signal_type in self.NMF_ADDITION:
if self.nmf is None:
self.nmf_image(**kwargs)
return np.copy(self.nmf)
else:
return np.copy(self.nmf)
else:
print("Signal type '" + str(signal_type) + "' not recognised, returning the general EELS data.")
return np.copy(self.data)
def get_cluster_signals(self, conf_interval=1, signal_type='EELS'):
r"""
Get the signals ordered per cluster. Cluster signals are stored in attribute ``self.cluster_signals``.
Note that the pixel location information is lost.
Parameters
----------
conf_interval : float, optional
The fraction of spectra returned per cluster. The spectra are selected on their integrated
intensity: the fraction closest to the cluster median intensity is kept. The default is 1 (all spectra).
signal_type: str, optional
Description of signal, ``'EELS'`` by default.
Returns
-------
cluster_signals : numpy.ndarray, shape=(M,)
An array with size equal to the number of clusters. Each entry is a
2D array that contains all the spectra within that cluster.
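Examples
--------
A minimal sketch, assuming the image has already been clustered (e.g. after loading trained ZLP models)::
cluster_signals = im.get_cluster_signals(conf_interval=0.9)
spectra_cluster_0 = cluster_signals[0]  # 2D array: (spectra in cluster 0, energy bins)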
"""
integrated_int = np.sum(self.data, axis=2)
cluster_signals = np.zeros(self.n_clusters, dtype=object)
j = 0
for i in range(self.n_clusters):
cluster_signal = self.get_image_signals(signal_type)[self.cluster_labels == i]
if conf_interval < 1:
intensities_cluster = integrated_int[self.cluster_labels == i]
arg_sort_int = np.argsort(intensities_cluster)
ci_lim = round((1 - conf_interval) / 2 *
intensities_cluster.size)
cluster_signal = cluster_signal[arg_sort_int][ci_lim:-ci_lim]
cluster_signals[j] = cluster_signal
j += 1
self.cluster_signals = cluster_signals
return cluster_signals
def cut_image_energy(self, e1=None, e2=None, include_edge=True):
r"""
Cuts the spectral image at energies ``e1`` and ``e2`` and keeps only the part in between.
Parameters
----------
e1 : float, optional
lower cut. The default is ``None``, which means no cut is applied.
e2 : float, optional
upper cut. The default is ``None``, which means no cut is applied.
include_edge : bool, optional
If True, the edge values ``e1`` and ``e2`` are included in the cut result. Default is True.
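Examples
--------
A minimal sketch that keeps only the energy losses between 1 eV and 50 eV (the cut values are illustrative)::
im.cut_image_energy(e1=1., e2=50.)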
"""
if (e1 is None) and (e2 is None):
raise ValueError(
"To cut energy spectra, please specify a minimum energy e1 and/or a maximum energy e2.")
if e1 is None:
e1 = self.eaxis.min() - 1
if e2 is None:
e2 = self.eaxis.max() + 1
if include_edge is True:
select = ((self.eaxis >= e1) & (self.eaxis <= e2))
else:
select = ((self.eaxis > e1) & (self.eaxis < e2))
self.data = self.data[:, :, select]
self.eaxis = self.eaxis[select]
def cut_image_pixels(self, range_width, range_height):
r"""
Cuts the spectral image
Parameters
----------
range_width: numpy.ndarray, shape=(2,)
Contains the horizontal selection cut
range_height: numpy.ndarray, shape=(2,)
Contains the vertical selection cut
"""
self.data = self.data[range_height[0]:range_height[1], range_width[0]:range_width[1]]
self.y_axis = self.y_axis[range_height[0]:range_height[1]]
self.x_axis = self.x_axis[range_width[0]:range_width[1]]
def pool_image(self, area=9, **kwargs):
r"""
Pools spectral image using a squared window of size ``area`` around each pixel
Parameters
----------
area: int
Pooling parameter: area around the pixel, must be an odd number
kwargs: dict, optional
Additional keyword arguments.
Returns
-------
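Examples
--------
A minimal sketch of pooling the full image with a 5 by 5 gaussian window (the window size is illustrative)::
im.pool_image(area=5)
signal_pooled = im.get_pixel_signal(i=0, j=0, signal_type='pooled')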
"""
if area % 2 == 0:
print("Unable to pool with even number " + str(area) + ", continuing with area=" + str(area + 1))
area += 1
if area > self.shape[0] or area > self.shape[1]:
raise ValueError("Your pooling area is too large for one or both of the image axes, "
"please choose a number smaller than these values")
print("Pooling of data started")
pooled_im = np.zeros(self.shape)
for i in range(self.shape[0]):
for j in range(self.shape[1]):
pooled_im[i, j] = self.pool_pixel(i=i, j=j, area=area, **kwargs)
self.pooled = pooled_im
print("Pooling of data complete")
def pool_pixel(self, i, j, area=9, gaussian=True, **kwargs):
r"""
Pools the data of a squared window of size ``area`` around pixel (``i``, ``j``).
Parameters
----------
i: int
y-coordinate of the pixel
j: int
x-coordinate of the pixel
area: int
Pooling parameter: area around the pixel used for pooling, must be an odd number
gaussian: boolean
If true the pooling weights will use a gaussian distribution
kwargs: dict, optional
Additional keyword arguments.
Returns
-------
output: numpy.ndarray, shape=(M,)
Pooled spectrum of the pixel
"""
if area % 2 == 0:
print("Unable to pool with even number " + str(area) + ", continuing with area=" + str(area + 1))
area += 1
if area > self.shape[0] or area > self.shape[1]:
raise ValueError("Your pooling area is too large for one or both of the image axes, "
"please choose a number smaller than these values")
# Set up the pooling window coordinates; the same 1d weights are applied along both image axes
x = np.linspace(-1 * math.floor(area / 2), math.floor(area / 2), area)
if gaussian is True:
# Gaussian pooling weights
z = gauss1d(x, **kwargs)
weights = z / np.max(z)
else:
# Uniform pooling weights
weights = np.ones(area)
n_p_border = int(math.floor(area / 2))
y_min = i - n_p_border
y_max = i + n_p_border + 1
x_min = j - n_p_border
x_max = j + n_p_border + 1
# Check if the center pixel is on the border
if x_min < 0:
z = gauss1d(x, mx=x_min, **kwargs)
weights_x = z / np.max(z)
x_max += np.abs(x_min)
x_min = 0
elif x_max > self.shape[1]:
z = gauss1d(x, mx=(x_max - self.shape[1]), **kwargs)
weights_x = z / np.max(z)
x_min -= (x_max - self.shape[1])
x_max = self.shape[1]
else:
weights_x = weights
if y_min < 0:
z = gauss1d(x, mx=y_min, **kwargs)
weights_y = z / np.max(z)
y_max += np.abs(y_min)
y_min = 0
elif y_max > self.shape[0]:
z = gauss1d(x, mx=(y_max - self.shape[0]), **kwargs)
weights_y = z / np.max(z)
y_min -= (y_max - self.shape[0])
y_max = self.shape[0]
else:
weights_y = weights
if self.data_subtracted is not None:
# If not all subtracted spectra have been calculated, this output is incorrect
# because some entries in self.data_subtracted are 0
pooled_pixel = np.average(np.average(self.data_subtracted[y_min:y_max, x_min:x_max], axis=1, weights=weights_x),
axis=0, weights=weights_y)
else:
pooled_pixel = np.average(np.average(self.data[y_min:y_max, x_min:x_max, :], axis=1, weights=weights_x),
axis=0, weights=weights_y)
return pooled_pixel
def pca_image(self, area_type='segment', n_components=0.9, segments_x=4, segments_y=4, **kwargs):
r"""
Use principal component analysis on the spectral image.
Parameters
----------
area_type: str
type of area used for principal component analysis. Usage types as follows:
- ``'segment'``, the image is segmented and pca is only done per segmented areas.
- ``'cluster'``, the data per cluster is used for pca within that cluster.
- ``'pixel'``, only the data used around a pixel is used for pca of that pixel.
n_components: int or float, optional
number of components to calculate. If between 0 and 1, the number of components is chosen such that the
cumulative explained variance exceeds the given value. Default is 0.9.
segments_x: int
For ``'segment'`` option, number of segments the x-axis is divided upon. Default is 4.
segments_y: int
For ``'segment'`` option, number of segments the y-axis is divided upon. Default is 4.
kwargs: dict, optional
Additional keyword arguments.
Returns
-------
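Examples
--------
A minimal sketch of denoising the image per segment (the parameter values are illustrative)::
im.pca_image(area_type='segment', n_components=0.9, segments_x=4, segments_y=4)
signal_pca = im.get_pixel_signal(i=0, j=0, signal_type='pca')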
"""
if area_type == 'segment':
print("PCA of segmented areas started")
segsize_x = self.shape[1] / segments_x
segsize_y = self.shape[0] / segments_y
data_pca = np.zeros(self.shape)
for segment_y in np.arange(0, segments_y):
for segment_x in np.arange(0, segments_x):
x_min = round(segsize_x * segment_x)
x_max = round(segsize_x * (segment_x + 1))
y_min = round(segsize_y * segment_y)
y_max = round(segsize_y * (segment_y + 1))
if self.data_subtracted is not None:
# If not all subtracted spectra have been calculated, this output is incorrect
# because some entries in self.data_subtracted are 0
seg_data = self.data_subtracted[y_min:y_max, x_min:x_max]
else:
seg_data = self.data[y_min:y_max, x_min:x_max, :]
seg_data_shape = seg_data.shape
seg_data[seg_data < 0] = 0
seg_data_flat = seg_data.reshape(np.prod(seg_data_shape[:2]), seg_data_shape[2])
seg_data_pca = PCA(n_components=n_components, svd_solver='full')
seg_data_trans = seg_data_pca.fit_transform(seg_data_flat)
# seg_data_maps_pca = seg_data_trans.reshape(seg_data_shape[:2] + (seg_data_pca.n_components_,))
data_pca[y_min:y_max, x_min:x_max, :] = seg_data_pca.inverse_transform(seg_data_trans).reshape(
seg_data_shape)
self.pca = data_pca
elif area_type == 'cluster':
print("PCA of data per cluster started")
pca_im = np.zeros(self.shape)
for cluster_idx in np.arange(0, len(self.cluster_centroids)):
pca_im[self.cluster_labels == cluster_idx] = self.pca_cluster(cluster=cluster_idx,
n_components=n_components)
self.pca = pca_im
elif area_type == 'pixel':
print("PCA of data per pixel started")
pca_im = np.zeros(self.shape)
for i in range(self.shape[0]):
for j in range(self.shape[1]):
pca_im[i, j] = self.pca_pixel(i=i, j=j, n_components=n_components, **kwargs)
self.pca = pca_im
else:
print("please pick a valid area type")
print("PCA of data complete")
def pca_pixel(self, i, j, area=9, n_components=0.9):
r"""
Use principal component analysis on the spectral image, using the data of a squared window of size ``area``
around pixel (``i``, ``j``).
Parameters
----------
i: int
y-coordinate of the pixel
j: int
x-coordinate of the pixel
area: int
PCA area parameter. Area around the pixel used for principal component analysis, must be an odd number
n_components: int or float, optional
number of components to calculate. If between 0 and 1, the number of components is chosen such that the
cumulative explained variance exceeds the given value. Default is 0.9.
Returns
-------
output: numpy.ndarray, shape=(M,)
PCA spectrum of the pixel
"""
if area % 2 == 0:
print("Unable to PCA with even number " + str(area) + ", continuing with area=" + str(area + 1))
area += 1
if area > self.shape[0] or area > self.shape[1]:
raise ValueError("Your pixel area is too large for one or both of the image axes, "
"please choose a number smaller than these values")
# Check pixel segment for borders
n_area_border = int(math.floor(area / 2))
x_min = j - n_area_border
x_max = j + n_area_border + 1
y_min = i - n_area_border
y_max = i + n_area_border + 1
if x_min < 0:
x_max += np.abs(x_min)
x_min = 0
if x_max > self.shape[1]:
x_min -= (x_max - self.shape[1])
x_max = self.shape[1]
if y_min < 0:
y_max += np.abs(y_min)
y_min = 0
if y_max > self.shape[0]:
y_min -= (y_max - self.shape[0])
y_max = self.shape[0]
if self.data_subtracted is not None:
# If not all subtracted spectra have been calculated, this output is incorrect
# because some entries in self.data_subtracted are 0
seg_data = self.data_subtracted[y_min:y_max, x_min:x_max]
else:
seg_data = self.data[y_min:y_max, x_min:x_max, :]
seg_data_shape = seg_data.shape
seg_data[seg_data < 0] = 0
seg_data_flat = seg_data.reshape(np.prod(seg_data_shape[:2]), seg_data_shape[2])
seg_data_pca = PCA(n_components=n_components, svd_solver='full')
seg_data_trans = seg_data_pca.fit_transform(seg_data_flat)
seg_data_projected = seg_data_pca.inverse_transform(seg_data_trans).reshape(seg_data_shape)
pca_pixel = seg_data_projected[int(i - y_min), int(j - x_min)]
return pca_pixel
def pca_cluster(self, cluster, n_components=0.9):
r"""
Use principal component analysis on a cluster of the spectral image. The signals of the cluster are already
in reduced format (pixel location is lost).
Parameters
----------
cluster : numpy.ndarray, shape=(M,)
An array with size equal to the number of clusters. Each entry is a 2D array that contains all the spectra
within that cluster.
n_components: int or float, optional
number of components to calculate. If between 0 and 1, the number of components is chosen such that the
cumulative explained variance exceeds the given value. Default is 0.9.
Returns
-------
"""
cluster_data = self.get_cluster_signals()[cluster]
cluster_pca = PCA(n_components=n_components, svd_solver='full')
cluster_data_trans = cluster_pca.fit_transform(cluster_data)
cluster_data_pca = cluster_pca.inverse_transform(cluster_data_trans)
return cluster_data_pca
def nmf_image(self, area_type='segment', n_components=0.9, max_iter=100000, segments_x=4, segments_y=4, **kwargs):
r"""
Use non-negative matrix factorization on the spectral image.
Parameters
----------
area_type: str
type of area used for non-negative matrix factorization. Usage types as follows:
- ``'segment'``, the image is segmented and nmf is only done per segmented areas.
- ``'cluster'``, the data per cluster is used for nmf within that cluster.
- ``'pixel'``, only the data used around a pixel is used for nmf of that pixel.
n_components: int or float, optional
number of components to calculate. If between 0 and 1, the number of components is chosen such that the
cumulative explained variance exceeds the given value, determined by first running PCA. Default is 0.9.
max_iter: int,
Default is 100000
segments_x: int
For ``'segment'`` option, number of segments the x-axis is divided upon. Default is 4.
segments_y: int
For ``'segment'`` option, number of segments the y-axis is divided upon. Default is 4.
kwargs: dict, optional
Additional keyword arguments.
Returns
-------
"""
if area_type == 'segment':
print("NMF of segmented areas started")
segsize_x = self.shape[1] / segments_x
segsize_y = self.shape[0] / segments_y
data_nmf = np.zeros(self.shape)
for segment_y in np.arange(0, segments_y):
for segment_x in np.arange(0, segments_x):
x_min = round(segsize_x * segment_x)
x_max = round(segsize_x * (segment_x + 1))
y_min = round(segsize_y * segment_y)
y_max = round(segsize_y * (segment_y + 1))
if self.data_subtracted is not None:
# If not all subtracted spectra have been calculated, this output is incorrect
# because some entries in self.data_subtracted are 0
seg_data = self.data_subtracted[y_min:y_max, x_min:x_max]
else:
seg_data = self.data[y_min:y_max, x_min:x_max, :]
seg_data_shape = seg_data.shape
seg_data[seg_data < 0] = 0
seg_data_flat = seg_data.reshape(np.prod(seg_data_shape[:2]), seg_data_shape[2])
if n_components < 1:
seg_data_pca = PCA(n_components=n_components, svd_solver='full').fit(seg_data_flat)
n_components = seg_data_pca.n_components_
if n_components < 2:
n_components = 2
seg_data_nmf = NMF(n_components=n_components, max_iter=max_iter)
seg_data_trans = seg_data_nmf.fit_transform(seg_data_flat)
# seg_data_maps_nmf = seg_data_trans_nmf.reshape(seg_data_shape[:2] + (seg_data_nmf.n_components_,))
data_nmf[y_min:y_max, x_min:x_max, :] = seg_data_nmf.inverse_transform(seg_data_trans).reshape(
seg_data_shape)
self.nmf = data_nmf
elif area_type == 'cluster':
print("NMF of data per cluster started")
data_nmf = np.zeros(self.shape)
for cluster_idx in np.arange(0, len(self.cluster_centroids)):
data_nmf[self.cluster_labels == cluster_idx] = self.nmf_cluster(cluster=cluster_idx,
n_components=n_components,
max_iter=max_iter)
self.nmf = data_nmf
elif area_type == 'pixel':
print("NMF of data per pixel started")
data_nmf = np.zeros(self.shape)
for i in range(self.shape[0]):
for j in range(self.shape[1]):
data_nmf[i, j] = self.nmf_pixel(i=i, j=j, n_components=n_components, **kwargs)
self.nmf = data_nmf
else:
print("please pick a valid area type")
print("NMF of data complete")
def nmf_cluster(self, cluster, n_components=0.9, max_iter=100000):
r"""
Use non-negative matrix factorization on a cluster of the spectral image.
The signals of the cluster are already in reduced format (pixel location is lost).
Parameters
----------
cluster : numpy.ndarray, shape=(M,)
An array with size equal to the number of clusters. Each entry is a
2D array that contains all the spectra within that cluster.
n_components: int or float, optional
number of components to calculate. If between 0 and 1, the number of components is chosen such that the
cumulative explained variance exceeds the given value, determined by first running PCA. Default is 0.9.
max_iter: int,
Default is 100000
Returns
-------
"""
cluster_data = self.get_cluster_signals()[cluster]
cluster_data[cluster_data < 0] = 0
if n_components < 1:
cluster_pca = PCA(n_components=n_components, svd_solver='full').fit(cluster_data)
n_components = cluster_pca.n_components_
if n_components < 2:
n_components = 2
cluster_nmf = NMF(n_components=n_components, max_iter=max_iter)
cluster_data_trans = cluster_nmf.fit_transform(cluster_data)
cluster_data_nmf = cluster_nmf.inverse_transform(cluster_data_trans)
return cluster_data_nmf
def nmf_pixel(self, i, j, area=9, n_components=0.9, max_iter=100000):
r"""
Use non-negative matrix factorization on the spectral image, using the data of a squared window of size ``area``
around pixel (``i``, ``j``).
Parameters
----------
i: int
y-coordinate of the pixel
j: int
x-coordinate of the pixel
area: int
NMF area parameter. Area around the pixel used for non-negative matrix factorization, must be an odd number
n_components: int or float, optional
number of components to calculate. If between 0 and 1, the number of components is chosen such that the
cumulative explained variance exceeds the given value, determined by first running PCA. Default is 0.9.
max_iter: int,
Default is 100000
Returns
-------
output: numpy.ndarray, shape=(M,)
NMF spectrum of the pixel
"""
if area % 2 == 0:
print("Unable to NMF with even number " + str(area) + ", continuing with area=" + str(area + 1))
area += 1
if area > self.shape[0] or area > self.shape[1]:
raise ValueError("Your pixel area is too large for one or both of the image axes, "
"please choose a number smaller than these values")
# Check pixel area
n_area_border = int(math.floor(area / 2))
x_min = j - n_area_border
x_max = j + n_area_border + 1
y_min = i - n_area_border
y_max = i + n_area_border + 1
if x_min < 0:
x_max += np.abs(x_min)
x_min = 0
if x_max > self.shape[1]:
x_min -= (x_max - self.shape[1])
x_max = self.shape[1]
if y_min < 0:
y_max += np.abs(y_min)
y_min = 0
if y_max > self.shape[0]:
y_min -= (y_max - self.shape[0])
y_max = self.shape[0]
if self.data_subtracted is not None:
# If not all subtracted spectra have been calculated, this output is incorrect
# because some entries in self.data_subtracted are 0
seg_data = self.data_subtracted[y_min:y_max, x_min:x_max]
else:
seg_data = self.data[y_min:y_max, x_min:x_max, :]
seg_data_shape = seg_data.shape
seg_data[seg_data < 0] = 0
seg_data_flat = seg_data.reshape(np.prod(seg_data_shape[:2]), seg_data_shape[2])
if n_components < 1:
seg_data_pca = PCA(n_components=n_components, svd_solver='full').fit(seg_data_flat)
n_components = seg_data_pca.n_components_
if n_components < 2:
n_components = 2
seg_data_nmf = NMF(n_components=n_components, max_iter=max_iter)
seg_data_trans = seg_data_nmf.fit_transform(seg_data_flat)
seg_data_projected = seg_data_nmf.inverse_transform(seg_data_trans).reshape(seg_data_shape)
pixel_nmf = seg_data_projected[int(i - y_min), int(j - x_min)]
return pixel_nmf
# METHODS ON SIGNAL
@staticmethod
def smooth_signal(signal, window_length=51, window_type='hanning'):
r"""
Smooth a signal using a window length ``window_length`` and a window type ``window_type``.
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the beginning and end part of the output signal.
Parameters
----------
signal: numpy.ndarray, shape=(M,)
Signal of length M
window_length: int, optional
The dimension of the smoothing window; should be an odd integer. Default is 51.
window_type: str, optional
the type of window from ``'flat'``, ``'hanning'``, ``'hamming'``, ``'bartlett'``,
``'blackman'`` and ``'kaiser'``. ``'flat'`` window will produce a moving average smoothing.
Default is ``'hanning'``
Returns
-------
signal_smooth: numpy.ndarray, shape=(M,)
The smoothed signal
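Examples
--------
A minimal sketch of smoothing a single spectrum, assuming ``im`` is a loaded SpectralImage (pixel indices are illustrative)::
signal = im.get_pixel_signal(i=0, j=0)
signal_smooth = SpectralImage.smooth_signal(signal, window_length=51, window_type='hanning')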
"""
# Set window length to uneven number
window_length += (window_length + 1) % 2
# extend the signal with the window length on both ends
signal_padded = np.r_['-1', signal[window_length - 1:0:-1], signal, signal[-2:-window_length - 1:-1]]
# Pick the window type
if window_type == 'flat': # moving average
window = np.ones(window_length, 'd')
else:
window = getattr(np, window_type)(window_length)
# Determine the smoothed signal and throw away the padded ends
surplus_data = int((window_length - 1) * 0.5)
signal_smooth = np.convolve(signal_padded, window / window.sum(), mode='valid')[surplus_data:-surplus_data]
return signal_smooth
def deconvolution(self, signal, zlp, correction=True):
r"""
Perform deconvolution on a given signal with a given Zero Loss Peak. This removes both the ZLP and plural
scattering. Based on the Fourier Log Method :cite:p:`Johnson1974, Egerton2011`
Parameters
----------
signal: numpy.ndarray, shape=(M,)
Raw signal of length M
zlp: numpy.ndarray, shape=(M,)
zero-loss peak of length M
correction: Bool
Sometimes a decreasing linear slope occurs on the place of the ZLP after deconvolution. This correction fits
a linear function and subtracts that from the signal. Default is True.
Returns
-------
output: numpy.ndarray, shape=(M,)
deconvoluted spectrum.
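Notes
-----
With :math:`Z(\nu)` and :math:`J(\nu)` the Fourier transforms of the (zero-padded) ZLP and of the
(extrapolated) recorded spectrum, the Fourier-log step implemented below evaluates
:math:`J^{1}(E) = \mathrm{Re}\,\mathcal{F}^{-1}\left[Z(\nu)\,\ln\frac{J(\nu)}{Z(\nu)}\right]`,
i.e. the single-scattering distribution with both the ZLP and plural scattering removed.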
"""
# Extrapolate data
x = self.eaxis_extrp
y_zlp = np.zeros(len(x))
y_zlp[:self.shape[2]] = zlp
if len(signal) < len(self.eaxis_extrp):
y_signal = self.extrp_signal(signal=signal)
else:
y_signal = signal
# Fourier log method with Zero-loss modifier. See Egerton Chapter 4 for details
z_nu = cft(x, y_zlp)
j_nu = cft(x, y_signal)
j1_nu = z_nu * np.log(j_nu / z_nu)
J1_E = np.real(icft(x, j1_nu))
if correction is True:
# Correct for linear increase in ZLP and gain region.
zlp_peak = np.max(signal)
fwhm_idx1 = np.argwhere((signal >= 0.5 * zlp_peak))[-1] + 1
fwhm_idx2 = np.argwhere((signal >= 0.5 * zlp_peak))[0] - 1
fwhm = (self.eaxis[fwhm_idx1] - self.eaxis[fwhm_idx2])
dydx1_idx = int(np.argwhere(x >= -1 * fwhm)[0])
dydx2_idx = int(np.argwhere(x < fwhm)[-1])
x_fit = np.array(x[dydx1_idx:dydx2_idx], dtype='float64')
y_fit = np.array(J1_E[dydx1_idx:dydx2_idx], dtype='float64')
popt, pcov = curve_fit(f=linear_fit, xdata=x_fit, ydata=y_fit, bounds=([-np.inf, 0], [0, np.inf]))
deconv_corr = linear_fit(x, popt[0], popt[1])
deconv_corr[deconv_corr < 0] = 0
# Apply correction
J1_E = J1_E - deconv_corr
J1_E[J1_E < 0] = 0
J1_E[y_zlp == y_signal] = 0
return J1_E[:self.shape[2]].flatten()
def get_extrp_param(self, signal, range_perc=0.1):
r"""
Retrieve the extrapolation parameter from the last 10% of a given signal
Parameters
----------
signal: numpy.ndarray, shape=(M,)
spectrum
range_perc: float, optional
fraction at the end of the signal used for the power-law fit. Default is 0.1.
Returns
-------
r: float.
extrapolation parameter
"""
idx_last = int((1 - range_perc) * len(signal))
x_fit = np.array(self.eaxis[idx_last:], dtype='float64')
y_fit = np.array(signal[idx_last:], dtype='float64')
try:
popt, pcov = curve_fit(power_fit, x_fit, y_fit, bounds=([0, -np.inf], [np.inf, -1]))
r = popt[1]
except (RuntimeError, ValueError):
# Fall back to a generic decay exponent if the power-law fit fails
r = -1
return r
def extrp_signal(self, signal, r=None):
r"""
Extrapolate your signal: data decaying towards zero is generated beyond the experimental range.
See Egerton paragraph 4.2.2 for details; an extrapolation of the form :math:`A E^{-r}` is used.
Parameters
----------
signal : numpy.ndarray, shape=(M,)
spectrum
r : float, optional
extrapolation parameter
Returns
-------
y: numpy.ndarray, shape=(N,)
the signal extrapolated onto ``self.eaxis_extrp``
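Notes
-----
The tail added beyond the recorded range decays as a power law: the implementation evaluates
``power_fit(1 + E - E_0, a, r)`` with ``a`` the last recorded intensity, ``E_0`` the first
extrapolated energy and ``r`` the exponent obtained from
:py:meth:`get_extrp_param() <EELSFitter.core.spectral_image.SpectralImage.get_extrp_param>` when not supplied.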
"""
x = self.eaxis_extrp
y = np.zeros(len(x))
y[:self.shape[2]] = signal
a = signal[-1] # Starting amplitude for extrapolated data, endpoint of the exp. data
if r is None:
r = self.get_extrp_param(signal=signal)
y[self.shape[2]:] = power_fit(1 + x[self.shape[2]:] - x[self.shape[2]], a, r)
return y
# METHODS ON ZLP
def get_pixel_matched_zlp_models(self, i, j, signal_type='EELS', signal=None, **kwargs):
r"""
Returns the shape-(M, N) array of matched ZLP model predictions at pixel
(``i``, ``j``) after training. M and N correspond to the number of model
predictions and :math:`\Delta E` s respectively.
Parameters
----------
i: int
y-coordinate of the pixel.
j: int
x-coordinate of the pixel.
signal_type: str, bool
Description of signal type. Set to ``'EELS'`` by default.
signal: array, bool,
signal you want to match the zlps to. Important to do if you did not do any pooling, pca or nmf on the whole
image, otherwise it will calculate the denoised signal twice.
kwargs: dict, optional
Additional keyword arguments.
Returns
-------
predictions: numpy.ndarray, shape=(M, N)
The matched ZLP predictions at pixel (``i``, ``j``).
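Examples
--------
A minimal sketch, assuming trained models have already been loaded with
:py:meth:`load_zlp_models() <EELSFitter.core.spectral_image.SpectralImage.load_zlp_models>`
(pixel indices are illustrative)::
zlps = im.get_pixel_matched_zlp_models(i=10, j=20)
signal_subtracted = im.get_pixel_signal(i=10, j=20) - zlps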
"""
# get pixel information
if signal is None:
signal = self.get_pixel_signal(i=i, j=j, signal_type=signal_type, **kwargs)
cluster = self.cluster_labels[i, j]
de1 = self.dE1[int(cluster)]
de2 = self.dE2[int(cluster)]
fwhm = self.FWHM[int(cluster)]
# get zlp predictions
max_idx = np.argmax(signal)
int_i = np.sum(signal[max_idx - 1:max_idx + 2])
predictions = self.get_zlp_models(int_i=int_i, **kwargs)
# match the predictions
predictions_matched = np.zeros((len(predictions), self.shape[2]))
for m, prediction in enumerate(predictions):
predictions_matched[m, :] = self.match_zlp_to_signal(signal, prediction, de1, de2, fwhm)
return predictions_matched
def get_zlp_models(self, int_i, **kwargs):
r"""
Returns the shape-(M, N) array of zlp model predictions at the
integrated intensity ``int_i``. The logarithm of the integrated intensity is taken,
as the data is always trained to log of the signal.
Parameters
----------
int_i: float
Integrated intensity
"""
if self.zlp_models is None:
try:
self.load_zlp_models(**kwargs)
except TypeError:
self.load_zlp_models()
if self.scale_var_eaxis is None:
self.scale_var_eaxis = find_scale_var(self.eaxis)
if self.scale_var_log_int_i is None:
all_spectra = np.copy(self.data)
all_spectra[all_spectra < 1] = 1
log_int_i = np.log(np.sum(all_spectra, axis=2)).flatten()
self.scale_var_log_int_i = find_scale_var(log_int_i)
del all_spectra
# Prepare the neural network input features (scaled eaxis and scaled log of the integrated intensity)
log_int_i = np.log(int_i)
predict_x_np = np.zeros((self.shape[2], 2))
predict_x_np[:, 0] = scale(self.eaxis, self.scale_var_eaxis)
predict_x_np[:, 1] = scale(log_int_i, self.scale_var_log_int_i)
predict_x = torch.from_numpy(predict_x_np)
# Get the model predictions based on the neural network input features
predictions = np.zeros((len(self.zlp_models), self.shape[2]))
for i, model in enumerate(self.zlp_models):
with torch.no_grad():
predictions[i, :] = np.exp(model(predict_x.float()).flatten())[:self.shape[2]]
# Post-fit selection, if the prediction goes upward after the zlp peak, it is filtered out.
# predictions_filtered = []
# for j, prediction in enumerate(predictions):
# pred_lim = prediction[np.argwhere(self.eaxis > self.eaxis[np.argmax(prediction) + 1]).flatten()]
# if not any(np.diff(pred_lim) > 0.01):
# predictions_filtered.append(prediction)
# predictions_filtered = np.array(predictions_filtered)
return predictions # predictions_filtered
def match_zlp_to_signal(self, signal, zlp, de1, de2, fwhm=None):
r"""
Match a ZLP model to the signal, such that it follows the measured spectrum in the zero-loss region and the model prediction beyond :math:`\Delta E_{I}`.
Parameters
----------
signal: numpy.ndarray, shape=(M,)
Signal to be matched
zlp: numpy.ndarray, shape=(M,)
ZLP model to be matched, must match length of Signal.
de1: float
Value of the hyperparameter :math:`\Delta E_{I}`
de2: float
Value of the hyperparameter :math:`\Delta E_{II}`
fwhm: float
Value of the hyperparameter :math:`FWHM`. If ``None`` is given, the FWHM is determined from the signal.
Default is ``None``.
Returns
-------
output: numpy.ndarray, shape=(M,)
Matched ZLP model
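Notes
-----
With :math:`\Delta E_{0} = (\mathrm{FWHM} + \Delta E_{I}) / 2`, the output follows the signal for
:math:`|E| < \Delta E_{0}`, the ZLP model between :math:`\Delta E_{I}` and :math:`\Delta E_{II}`, and zero
beyond :math:`\Delta E_{II}`. In the window :math:`\Delta E_{0} \leq E < \Delta E_{I}` (and its mirror on
the gain side) model and signal are blended with the logistic weight
:math:`f(E) = \left[1 + e^{-(E - (\Delta E_{0} + \Delta E_{I})/2)/\delta}\right]^{-1}`,
:math:`\delta = (\Delta E_{I} - \Delta E_{0})/10`, and the result is capped at the signal itself.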
"""
if fwhm is None:
peak = np.max(signal)
fwhm_window = np.argwhere(signal > 0.5*peak)
fwhm_idx1 = fwhm_window.flatten()[0] - 1
fwhm_idx2 = fwhm_window.flatten()[-1] + 1
fwhm = self.eaxis[fwhm_idx2] - self.eaxis[fwhm_idx1]
de0 = (fwhm + de1) / 2
de0_m = -1 * de0
de1_m = -1 * de1
# Right side ZLP (loss spectrum)
e_window = self.eaxis[(self.eaxis < de1) & (self.eaxis >= de0)]
# delta = (de1 - de0) / 3.
# factor_nn = np.exp(- (e_window - de1)**2 / delta**2)
# factor_zlp = 1 - factor_model
delta = (de1 - de0) / 10.
factor_nn = 1 / (1 + np.exp(-(e_window - (de0 + de1) / 2) / delta))
factor_zlp = 1 - factor_nn
# Left side ZLP (gain spectrum)
e_window_m = self.eaxis[(self.eaxis > de1_m) & (self.eaxis <= de0_m)]
# delta_m = (de1_m + de0_m) / 3.
# factor_nn_m = np.exp(- (e_window_m - de1_m)**2 / delta_m**2)
# factor_zlp_m = 1 - factor_model
delta_m = (de1_m + de0_m) / 10.
factor_nn_m = 1 / (1 + np.exp(-(e_window_m - (de0_m + de1_m) / 2) / delta_m))
factor_zlp_m = 1 - factor_nn_m
# Match the ZLP to signal using the factors
range_m2 = zlp[self.eaxis <= de1_m]
range_m1 = zlp[(self.eaxis > de1_m) & (self.eaxis <= de0_m)] * factor_nn_m + \
signal[(self.eaxis > de1_m) & (self.eaxis <= de0_m)] * factor_zlp_m
range_0 = signal[(self.eaxis > de0_m) & (self.eaxis < de0)] # ZLP
range_1 = zlp[(self.eaxis < de1) & (self.eaxis >= de0)] * factor_nn + \
signal[(self.eaxis < de1) & (self.eaxis >= de0)] * factor_zlp
range_2 = zlp[(self.eaxis >= de1) & (self.eaxis <= de2)]
range_3 = zlp[(self.eaxis > de2)] * 0 # Rest of the spectrum
matched_zlp = np.minimum(
np.concatenate((range_m2, range_m1, range_0, range_1, range_2, range_3),
axis=0), signal)
return matched_zlp
def train_zlp_models(self, conf_interval=1, lr=1e-3, signal_type='EELS', **kwargs):
r"""
Train the ZLP on the spectral image.
The spectral image is clustered in ``n_clusters`` clusters, according to
e.g. the integrated intensity or thickness. A random spectrum is then
taken from each cluster, which together defines one replica. The
training is initiated by calling
:py:meth:`train_zlp_models_scaled() <EELSFitter.core.training.train_zlp_models_scaled>`.
Parameters
----------
conf_interval: int, optional
Default is 1
lr: float, optional
Default is 1e-3
signal_type: str, optional
Type of spectrum. Set to EELS by default.
**kwargs
Additional keyword arguments that are passed to the method
:py:meth:`train_zlp_models_scaled() <EELSFitter.core.training.train_zlp_models_scaled>`
in the :py:mod:`training` module.
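Examples
--------
A minimal sketch, mirroring the class-level example (the model directory is hypothetical)::
im.train_zlp_models(n_clusters=8, n_rep=100, n_epochs=150000, bs_rep_num=1, path_to_models='path/to/models')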
"""
self.cluster(**kwargs)
self.training_data = self.get_cluster_signals(conf_interval=conf_interval, signal_type=signal_type)
self.train_zlps = TrainZeroLossPeak(spectra=self.training_data, eaxis=self.eaxis,
cluster_centroids=self.cluster_centroids, **kwargs)
self.train_zlps.train_zlp_models_scaled(lr=lr)
def load_zlp_models(self, path_to_models, plot_chi2=False, plot_pred=False, idx=None, **kwargs):
r"""
Loads the trained ZLP models and stores them in ``self.zlp_models``.
Models that have a :math:`\chi^2 > \chi^2_{\mathrm{mean}} + 5\sigma` are
discarded, where :math:`\sigma` denotes the 68% CI.
Parameters
----------
path_to_models: str
Location where the model predictions have been stored after training.
plot_chi2: bool, optional
When set to `True`, plot and save the :math:`\chi^2` distribution.
plot_pred: bool, optional
When set to `True`, plot and save the ZLP predictions per cluster.
idx: int, optional
When specified, only the zlp labelled by ``idx`` is loaded, instead
of all model predictions.
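Examples
--------
A minimal sketch (the model directory is hypothetical)::
im.load_zlp_models(path_to_models='path/to/models', plot_chi2=True)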
"""
if not os.path.exists(path_to_models):
print("No path " + path_to_models + " found. Please check the spelling "
"and make sure models have been trained.")
return
path_to_models += (path_to_models[-1] != '/') * '/'
# Load in the hyperparameters
path_hp = 'hyperparameters.txt'
hypar = np.loadtxt(os.path.join(path_to_models, path_hp), ndmin=2)
self.dE1 = hypar[1, :]
self.dE2 = hypar[2, :]
self.FWHM = hypar[3, :]
print("Loading hyper-parameters complete")
# Load in the scale variables for scaling on the log of the integrated intensity of the spectra.
path_scale_var = 'scale_var.txt'
self.scale_var_log_int_i = np.loadtxt(os.path.join(path_to_models, path_scale_var))
print("Loading scale variables for zlp models complete")
# Cluster image based on the models centroids
self.cluster_on_centroids(hypar[0, :], **kwargs)
print("Clustering based on cluster centroids complete")
# Load in the models
self.zlp_models = []
model = MultilayerPerceptron(num_inputs=2, num_outputs=1)
if idx is not None:
with torch.no_grad():
model.load_state_dict(torch.load(os.path.join(
path_to_models, 'nn_replicas'))[f'model_{idx + 1}'])
self.zlp_models.append(copy.deepcopy(model))
return
filename_test = os.path.join(path_to_models, 'costs_test.txt')
cost_tests = np.loadtxt(filename_test)
cost_tests_mean = np.mean(cost_tests)
cost_tests_std = np.percentile(cost_tests, 68)
threshold_costs_tests = cost_tests_mean + 5 * cost_tests_std
cost_tests = cost_tests[cost_tests < threshold_costs_tests]
filename_train = os.path.join(path_to_models, 'costs_train.txt')
cost_trains = np.loadtxt(filename_train)
cost_trains_mean = np.mean(cost_trains)
cost_trains_std = np.percentile(cost_trains, 68)
threshold_costs_trains = cost_trains_mean + 5 * cost_trains_std
nn_rep_idx = np.argwhere(cost_trains < threshold_costs_trains)
cost_trains = cost_trains[cost_trains < threshold_costs_trains]
nn_replicas_path = os.path.join(path_to_models, 'nn_replicas')
checkpoint = torch.load(nn_replicas_path)
for idx in nn_rep_idx.flatten():
model.load_state_dict(checkpoint[f'model_{idx + 1}'])
self.zlp_models.append(copy.deepcopy(model))
print("Loading models complete")
# plot the chi2 distributions
if plot_chi2:
fig = plot_cost_dist(cost_trains, cost_tests, cost_tests_std)
fig.savefig(os.path.join(self.output_path, 'chi2_dist.pdf'))
print("Chi2 plot saved at {}".format(self.output_path))
# plot the zlp predictions for each cluster
if plot_pred:
fig = plot_zlp_cluster_predictions(image=self, xlim=[self.eaxis[0], np.median(self.dE2)], x=1,
yscale='log', xlabel=r"$\rm{Energy\;loss\;[eV]}$",
title=r"$\rm{Cluster\;predictions\;}$")
fig.savefig(os.path.join(self.output_path, 'Cluster_predictions.pdf'))
print("Cluster predictions plot saved at {}".format(self.output_path))
# METHODS ON QUANTITATIVE ANALYSIS
def set_refractive_index(self, n=None, n_background=None):
r"""
Sets value of refractive index for the image as attribute self.n. If not clustered, n will be an
array of length one, otherwise it is an array of length n_clusters. If n_background is defined,
the cluster with the lowest thickness (cluster 0) will be assumed to be the vacuum/background,
and gets the value of the background refractive index.
If there are multiple specimens present in the image, it is wise to check by hand which cluster belongs
to which specimen, and set the values by running::
image.n[cluster_i] = n_i
Parameters
----------
n : float
refractive index of sample.
n_background : float, optional
if defined: the refractive index of the background/vacuum. This value will automatically be \
assigned to pixels belonging to the thinnest cluster.
"""
if n is None:
self.n = None
elif isinstance(n, (int, float)):
self.n = np.ones(self.n_clusters) * n
if n_background is not None:
# assume cluster 0 is background
self.n[0] = n_background
elif len(n) == self.n_clusters:
self.n = n
def set_mass_density(self, rho=None, rho_background=None):
r"""
Sets value of mass density for the image as attribute self.rho. If not clustered, rho will be an
array of length one, otherwise it is an array of length n_clusters. If rho_background is defined,
the cluster with the lowest thickness (cluster 0) will be assumed to be the vacuum/background,
and gets the value of the background mass density.
If there are multiple specimens present in the image, it is wise to check by hand which cluster belongs
to which specimen, and set the values by running::
image.rho[cluster_i] = rho_i
Parameters
----------
rho : float
mass density of the sample.
rho_background : float, optional
if defined: the mass density of the background/vacuum. This value will automatically be
assigned to pixels belonging to the thinnest cluster.
Returns
-------
"""
if rho is None:
self.rho = None
elif isinstance(rho, (int, float)):
self.rho = np.ones(self.n_clusters) * rho
if rho_background is not None:
self.rho[0] = rho_background
elif len(rho) == self.n_clusters:
self.rho = rho
def calc_thickness(self, signal, n=None, rho=None, n_zlp=None):
r"""
Calculates thickness from sample data by one of two methods:
- Kramers-Kronig sum rule using the refractive index :cite:p:`Egerton1987`
- Log ratio using mass density :cite:p:`Iakoubovskii2008a`
Parameters
----------
signal : numpy.ndarray, shape=(M,)
spectrum
n : float
refraction index
rho : float
mass density
n_zlp: float or int, optional
integrated intensity of the ZLP. Use 1 for already normalized EELS spectra.
Returns
-------
t: float
thickness
Notes
-----
If using the refractive index, surface scatterings are not corrected for.
If you wish to correct for surface scatterings, please extract the thickness ``t`` from
:py:meth:`kramers_kronig_analysis() <EELSFitter.core.spectral_image.SpectralImage.kramers_kronig_analysis>`.
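Examples
--------
A hedged sketch of both normalization routes; ``spectrum`` is a single spectrum taken from this image
and the numerical values are purely illustrative::
spectrum = image.get_pixel_signal(i=0, j=0)
t_n = image.calc_thickness(signal=spectrum, n=2.1, n_zlp=1)       # Kramers-Kronig sum rule route
t_rho = image.calc_thickness(signal=spectrum, rho=2.33, n_zlp=1)  # log-ratio route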
"""
# Microscope data
e0 = self.beam_energy # Electron gun voltage, keV
beta = self.collection_angle # Collection semi-angle, mrad
alpha = self.convergence_angle # Convergence semi-angle, mrad
# Constants & kinetic definitions (Egerton 2011 appendix E)
m_e = self.m_e / 1E3 # Electron mass, keV
a0 = self.a0 * 1E9 # Bohr radius, nm
gamma = 1 + (e0 / m_e) # Relativistic factor
T_eff = e0 * ((1 + gamma) / (2 * gamma ** 2)) # Effective kinetic energy, keV
# Check if data is extrapolated
x = self.eaxis_extrp
if len(signal) < len(x):
y = self.extrp_signal(signal=signal)
else:
y = signal
if n is None and rho is None:
raise ValueError("The mass density and the refractive index are "
"not defined. Please provide one of them.")
elif n is not None and rho is not None:
raise ValueError("Please provide the refractive index OR the "
"mass density information, not both")
elif n is not None:
# Prepare single scattering distribution, Only take values from E = 0 onward
x_spliced = copy.deepcopy(x[x > 0])
y_spliced = copy.deepcopy(y[x > 0])
# Calculation of the ELF by normalization of the SSD
# First perform Angular corrections (Egerton 2011 section 4.2.1)
theta_e = x_spliced / (2 * gamma * T_eff) # Characteristic scattering angle (per energy loss), mrad
ang_cor = np.log(1 + (beta / theta_e) ** 2) # Angular/Aperture correction
Im = y_spliced / ang_cor / self.deltaE
# Thickness calculation
Im_sum = np.sum(Im / x_spliced) * self.deltaE
K = (Im_sum / (np.pi / 2) / (1 - 1. / n ** 2)) # proportionality constant (Egerton 2011 section 4.2.2)
t = (2 * np.pi * a0 * K * T_eff * 1E3) / n_zlp
elif rho is not None:
# Inelastic mean free path, based on Iakoubovskii et al. (2008) with gamma scaling suggestion from Egerton
theta_c = 20 # Effective cutoff angle, mrad
if theta_c > beta:
theta_c = beta
theta_e = 5.5 * rho ** 0.3 / (gamma * T_eff) # Characteristic scattering angle (from mass density), mrad
# Calculate lambda, the inelastic mean free path
upper_term = alpha ** 2 + beta ** 2 + 2 * theta_e ** 2 + np.abs(alpha ** 2 - beta ** 2)
lower_term = alpha ** 2 + beta ** 2 + 2 * theta_c ** 2 + np.abs(alpha ** 2 - beta ** 2)
lmbda = (100 / theta_e) / np.log((upper_term / lower_term) * (theta_c ** 2 / theta_e ** 2))
# Thickness calculation
t = lmbda * np.log(np.sum(y) / n_zlp)
return t
def kramers_kronig_analysis(self, signal_ssd, n_zlp=None, iterations=1, n=None, t=None, delta=0.5):
r"""
Computes the complex dielectric function from the single scattering distribution (SSD) ``signal_ssd`` following
the Kramers-Kronig relations. This code is based on Egerton's MATLAB code :cite:p:`Egerton2011`.
Parameters
----------
signal_ssd: numpy.ndarray, shape=(M,)
SSD of length energy-loss (M)
n_zlp: float
Total integrated intensity of the ZLP
iterations: int
Number of the iterations for the internal loop to remove the
surface plasmon contribution. If 1 the surface plasmon contribution
is not estimated and subtracted (the default is 1).
n: float
The medium refractive index. Used for normalization of the
SSD to obtain the energy loss function. If given the thickness
is estimated and returned. It is only required when `t` is None.
t: float
The sample thickness in nm.
delta: float
Factor added to aid stability for calculating surface losses
Returns
-------
eps: numpy.ndarray
The complex dielectric function,
.. math::
\epsilon = \epsilon_1 + i\,\epsilon_2
te: float
local thickness
srf_int: numpy.ndarray
Surface losses correction
Notes
-----
- Relativistic effects are not considered when correcting surface scattering.
- The value of delta depends on the thickness of your sample and is qualitatively determined by how realistic
the output is.
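Examples
--------
A hedged sketch, assuming ``ssd`` is a deconvolved single scattering distribution and ``n_zlp`` the
integrated ZLP intensity of the same pixel (the refractive index value is purely illustrative)::
eps, t_nm, srf_int = image.kramers_kronig_analysis(signal_ssd=ssd, n_zlp=n_zlp, n=2.1, iterations=1)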
"""
# Microscope data
e0 = self.beam_energy * 1E3 # Electron gun voltage, converted to eV
beta = self.collection_angle / 1E3 # Collection semi-angle, converted to rad
# Check if data is extrapolated
x_extrp = self.eaxis_extrp
if len(signal_ssd) < len(x_extrp):
y_extrp = self.extrp_signal(signal=signal_ssd)
else:
y_extrp = signal_ssd
# Prepare spectrum
x = copy.deepcopy(x_extrp[x_extrp > 0])
y_spliced = copy.deepcopy(y_extrp[x_extrp > 0])
y = y_spliced
y_srf = np.empty(x.shape)
bins = len(x)
# Constants & kinetic definitions (Egerton 2011 appendix E)
m_e = self.m_e # Electron mass, eV
a0 = self.a0 # Bohr radius, m
hbar_c = self.h_bar * self.c # Planck's constant * speed of light, eV*m
gamma = 1 + (e0 / m_e) # Relativistic factor
T_eff = e0 * ((1 + gamma) / (2 * gamma ** 2)) # Effective kinetic energy, eV
k0 = gamma * np.sqrt(2 * T_eff * m_e) / hbar_c # Wavenumber, m^-1
# select refractive or thickness loop
if n is None and t is None:
raise ValueError("The refractive index and thickness are not defined. Please provide ONLY one of them.")
elif n is not None and t is not None:
raise ValueError("Please provide the refractive index OR the thickness, not both")
elif n is not None:
refractive_loop = True
elif t is not None:
refractive_loop = False
t = t / 1E9 # Thickness, converted to m
if n_zlp is None:
raise ValueError("Please provide the ZLP when thickness is used for normalization.")
for _ in range(iterations):
# Calculation of the ELF by normalization of the Single Scattering Distribution
# First perform Angular corrections (Egerton 2011 section 4.2.1)
theta_e = x / (2 * gamma * T_eff) # Characteristic scattering angle (per energy loss), rad
ang_cor = np.log(1 + (beta / theta_e) ** 2) # Angular/Aperture correction
Im = y / ang_cor / self.deltaE # Im[-1/eps]
if refractive_loop:
# normalize using the refractive index.
Im_sum = np.sum(Im / x) * self.deltaE
K = Im_sum / (np.pi / 2) / (1 - 1. / n ** 2) # proportionality constant (Egerton 2011 section 4.2.2)
if n_zlp is not None:
te = (2 * np.pi * a0 * K * T_eff) / n_zlp # Calculate thickness
else:
# normalize using the thickness
K = t * n_zlp / (2 * np.pi * a0 * T_eff) # proportionality constant from thickness
te = t
Im = Im / K
# Kramers Kronig Transform:
# We calculate KKT(Im(-1/epsilon))=1+Re(1/epsilon) with FFT
# Follows: D W Johnson 1975 J. Phys. A: Math. Gen. 8 490
# Use an optimal FFT size to speed up the calculation, and make it double the closest upper value to
# work around the wrap-around problem.
optimal_fft_bins = next_fast_len(2 * bins)
q = -2 * fft(Im, optimal_fft_bins).imag / optimal_fft_bins
q[:bins] *= -1
q = fft(q)
Re = q[:bins].real + 1 # Final touch, we have Re[1/eps]
# Epsilon appears:
# We calculate the real and imaginary parts of the CDF
eps_1 = Re / (Re ** 2 + Im ** 2)
eps_2 = Im / (Re ** 2 + Im ** 2)
if iterations > 1 and n_zlp is not None:
# See Egerton 2011 section 4.2.4, Ritchie (1957) and Heather (1967)
# Calculates the surface ELF from a vacuum border effect and subtracts from the ELF.
# delta = 0.1 for thick samples (>100nm).
theta_delta_e = delta / (2 * gamma * T_eff)
Im_srf = 4 * eps_2 / ((eps_1 + 1) ** 2 + eps_2 ** 2) - Im
dPsdE = (np.arctan(beta / theta_e) / (theta_e + theta_delta_e)
- beta / (beta ** 2 + theta_e ** 2)) / (np.pi * a0 * k0 * T_eff)
y_srf = (n_zlp * dPsdE * Im_srf) * self.deltaE
y = y_spliced - y_srf
eps = (eps_1 + eps_2 * 1j)
return eps[x <= self.eaxis[-1]], te*1E9, y_srf[x <= self.eaxis[-1]]
def KK_pixel(self, i, j, signal_type='EELS', iterations=1, **kwargs):
r"""
Perform a Kramers-Kronig analysis on pixel (``i``, ``j``).
Parameters
----------
i : int
y-coordinate of the pixel
j : int
x-coordinate of the pixel.
signal_type: str, optional
Type of spectrum. Set to EELS by default.
iterations: int
Number of the iterations for the internal loop to remove the
surface plasmon contribution. If 1 the surface plasmon contribution
is not estimated and subtracted (the default is 1).
Returns
-------
dielectric_functions : numpy.ndarray
Collection of dielectric-function replicas at pixel (``i``, ``j``), one row per ZLP model.
ts : numpy.ndarray
Thickness per replica.
S_ss : array_like
Surface scattering corrections per replica.
signal_ssds : array_like
Deconvolved EELS spectra, one per ZLP model.
max_signal_ssds : array_like
Energy-loss value at which each deconvolved spectrum peaks.
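Examples
--------
A hedged usage sketch; it assumes the ZLP models have been trained or loaded and that either self.n
or self.rho has been set. The pixel indices and refractive index are purely illustrative::
image.set_refractive_index(n=2.1, n_background=1.0)
epss, ts, S_ss, signal_ssds, max_signal_ssds = image.KK_pixel(i=10, j=20)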
"""
# Check that exactly one of the refractive index or the mass density has been provided
if self.n is not None and self.rho is None:
n = self.n[self.cluster_labels[i, j]]
rho = self.rho
elif self.rho is not None and self.n is None:
rho = self.rho[self.cluster_labels[i, j]]
n = self.n
else:
raise ValueError("Please provide either the refractive index OR the mass density value")
# Get your signal ready
signal = self.get_pixel_signal(i, j, signal_type=signal_type, **kwargs)
zlps = self.get_pixel_matched_zlp_models(i, j, signal_type=signal_type, signal=signal)
signal_extrp = self.extrp_signal(signal=signal)
# Prepare arrays
epss = (1 + 1j) * np.zeros(zlps[:, self.eaxis > 0].shape)
S_ss = np.zeros(zlps[:, self.eaxis > 0].shape)
ts = np.zeros(zlps.shape[0])
signal_ssds = np.zeros(zlps.shape)
max_signal_ssds = np.zeros(zlps.shape[0])
# Loop through all models
r = None
for k in range(zlps.shape[0]):
zlp_k = zlps[k, :]
n_zlp = float(np.sum(zlp_k))
signal_ssds[k] = self.deconvolution(signal=signal_extrp, zlp=zlp_k)
max_signal_ssds[k] = self.eaxis[np.argmax(signal_ssds[k])]
if r is None:
r = self.get_extrp_param(signal=signal_ssds[k])
signal_ssd_extrp = self.extrp_signal(signal=signal_ssds[k], r=r)
else:
signal_ssd_extrp = self.extrp_signal(signal=signal_ssds[k], r=r)
if rho is not None:
ts[k] = self.calc_thickness(signal=signal_extrp, rho=rho, n_zlp=n_zlp)
epss[k, :], ts[k], S_ss[k] = self.kramers_kronig_analysis(signal_ssd=signal_ssd_extrp, n_zlp=n_zlp,
t=ts[k], iterations=iterations, **kwargs)
if n is not None:
epss[k, :], ts[k], S_ss[k] = self.kramers_kronig_analysis(signal_ssd=signal_ssd_extrp, n_zlp=n_zlp,
n=n, iterations=iterations, **kwargs)
return epss, ts, S_ss, signal_ssds, max_signal_ssds
# METHODS ON CLUSTERING
def cluster(self, n_clusters=5, based_on='log_zlp', init='k-means++', n_times=10, max_iter=300, seed=None,
save_seed=False, algorithm='lloyd', **kwargs):
r"""
Clusters the spectral image into clusters according to the (log)
integrated intensity at each pixel. Cluster means are stored in the
attribute ``self.cluster_centroids``. This is then passed on to the
cluster_on_centroids function where the index to which each cluster belongs
is stored in the attribute ``self.cluster_labels``.
Parameters
----------
n_clusters : int, optional
Number of clusters, 5 by default
based_on : str, optional
One can cluster either on the sum of the intensities (pass ``'sum'``),
the log of the sum (pass ``'log_sum'``),
the log of the ZLP peak value (pass ``'log_peak'``),
the log of the ZLP peak value + the two bins next to the peak value (pass ``'log_zlp'``),
the log of the sum of the bulk spectrum (no zlp) (pass ``'log_bulk'``),
the thickness (pass ``'thickness'``).
The default is ``'log_zlp'``.
init : {'k-means++', 'random'}, callable or array-like of shape \
(n_clusters, n_features), default='k-means++'
n_times : int, default=10
Number of times the k-means algorithm will be run with different
centroid seeds. The final result is the best output of
``n_times`` consecutive runs in terms of inertia.
max_iter : int, default=300
Maximum number of iterations of the k-means algorithm for a
single run.
seed : int or None, default=None
Determines random number generation for centroid initialization. Use
an int to make the randomness deterministic.
save_seed : bool, default=False
save the seed with corresponding settings to get to the same result
algorithm : {'lloyd', 'elkan'}, default='lloyd'
K-means algorithm to use. The classical EM-style algorithm is ``'lloyd'``.
The ``'elkan'`` variation can be more efficient on some datasets with
well-defined clusters, by using the triangle inequality. However, it's
more memory intensive due to the allocation of an extra array of shape
`(n_samples, n_clusters)`.
kwargs
Additional keyword arguments, passed on to ``get_image_signals()``.
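Examples
--------
A minimal usage sketch; the settings shown are purely illustrative::
image.cluster(n_clusters=5, based_on='log_zlp', n_times=10)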
"""
if self.n_spectra == 1:
print("Not much to cluster on with a single spectrum")
n_clusters = 1
seed = 11111111
max_iter = 1
image_data = self.get_image_signals(**kwargs)
if based_on == 'sum':
values = image_data.sum(axis=2)
elif based_on == 'log_sum':
values = np.log(np.maximum(image_data, 1e-14).sum(axis=2))
elif based_on == 'log_peak':
values = np.log(image_data[:, :, np.argwhere((self.eaxis > -1) & (self.eaxis < 1)).flatten()].max(axis=2))
elif based_on == 'log_zlp':
values = np.zeros([self.shape[0], self.shape[1]])
max_idx = np.argmax(image_data, axis=2)
for i in np.arange(self.shape[0]):
for j in np.arange(self.shape[1]):
values[i, j] = np.log(image_data[i, j, max_idx[i, j] - 1:max_idx[i, j] + 2].sum())
elif based_on == 'log_bulk':
values = np.log(np.maximum(image_data[:, :, np.argwhere(self.eaxis > 5).flatten()], 1e-14).sum(axis=2))
elif based_on == 'thickness':
values = self.t[:, :, 0]
elif type(based_on) == np.ndarray:
values = based_on
if values.size != self.n_spectra:
raise IndexError("The size of values on which to cluster does not match the image size.")
else:
values = np.sum(image_data, axis=2)
print("Unknown 'based_on' option, reverting to clustering on the sum of the intensities")
if seed is not None:
seed_init = seed
n_times = 1
cost_min = np.inf
for _ in range(n_times):
if seed is None:
seed_init = get_seed()
kmeans = KMeans(n_clusters=n_clusters, init=init, n_init=1, max_iter=max_iter,
random_state=seed_init, algorithm=algorithm).fit(values.reshape(-1, 1))
if cost_min > kmeans.inertia_:
cost_min = kmeans.inertia_
min_cluster_centroids = kmeans.cluster_centers_.flatten()
min_seed = seed_init
min_iter = kmeans.n_iter_
print("Seed: " + str(seed_init) + " finished after " + str(
kmeans.n_iter_) + " iterations and has cost: " + str(kmeans.inertia_))
print("Seed: " + str(min_seed) + " has the lowest cost")
self.cluster_centroids = np.sort(min_cluster_centroids)[::-1]
self.cluster_on_centroids(self.cluster_centroids, based_on=based_on)
if save_seed is True:
print("Saving seed.txt parameters, sit tight!")
print("seed = " + str(min_seed) + " clusters = " + str(n_clusters) +
" iterations = " + str(min_iter))
path_seed = os.path.join(self.output_path, 'seed.txt')
np.savetxt(path_seed, np.vstack((min_seed, n_clusters, min_iter)))
print("Saved seed.txt!")
def cluster_on_centroids(self, cluster_centroids, based_on='log_zlp', **kwargs):
r"""
If the image has been clustered before and the cluster centroids are
already known, one can use this function to reconstruct the original
clustering of the image.
Parameters
----------
cluster_centroids : numpy.ndarray, shape=(M,)
Array with the cluster centroids
based_on : str, optional
One can cluster either on the sum of the intensities (pass ``'sum'``),
the log of the sum (pass ``'log_sum'``),
the log of the ZLP peak value (pass ``'log_peak'``),
the log of the ZLP peak value + the two bins next to the peak value (pass ``'log_zlp'``),
the log of the sum of the bulk spectrum (no zlp) (pass ``'log_bulk'``),
the thickness (pass ``'thickness'``).
The default is ``'log_zlp'``.
kwargs
Additional keyword arguments, passed on to ``get_image_signals()``.
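Examples
--------
A hedged sketch for reproducing an earlier clustering, assuming ``centroids`` holds the previously
obtained cluster centroids::
image.cluster_on_centroids(cluster_centroids=centroids, based_on='log_zlp')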
"""
self.cluster_centroids = cluster_centroids
image_data = self.get_image_signals(**kwargs)
if based_on == 'sum':
values = image_data.sum(axis=2)
elif based_on == 'log_sum':
values = np.log(np.maximum(image_data, 1e-14).sum(axis=2))
elif based_on == 'log_peak':
values = np.log(image_data[:, :, np.argwhere((self.eaxis > -1) & (self.eaxis < 1)).flatten()].max(axis=2))
elif based_on == 'log_zlp':
values = np.zeros([self.shape[0], self.shape[1]])
max_idx = np.argmax(image_data, axis=2)
for i in np.arange(self.shape[0]):
for j in np.arange(self.shape[1]):
values[i, j] = np.log(image_data[i, j, max_idx[i, j] - 1:max_idx[i, j] + 2].sum())
elif based_on == 'log_bulk':
values = np.log(np.maximum(image_data[:, :, np.argwhere(self.eaxis > 5).flatten()], 1e-14).sum(axis=2))
elif based_on == 'thickness':
values = self.t[:, :, 0]
elif type(based_on) == np.ndarray:
values = based_on
if values.size != self.n_spectra:
raise IndexError("The size of values on which to cluster does not match the image size.")
else:
values = np.sum(image_data, axis=2)
print("Unknown 'based_on' option, reverting to the sum of the intensities")
valar = (values.transpose() * np.ones(np.append(self.shape[:2], self.n_clusters)).transpose()).transpose()
self.cluster_labels = np.argmin(np.absolute(valar - cluster_centroids), axis=2)
print("# of spectra per cluster is", np.bincount(self.cluster_labels.flatten()))
if len(np.unique(self.cluster_labels)) < self.n_clusters:
warnings.warn(
"Some clusters contain no spectra: the given cluster centroids do not seem to match "
"the clustering values of this image. Please check the clustering and the 'based_on' setting.")
def find_optimal_amount_of_clusters(self, cluster_start=None, sigma=0.05, n_models=1000,
conf_interval=1, signal_type='EELS', **kwargs):
r"""
Finds the optimal number of clusters based on the number of models that need to be trained.
Parameters
----------
cluster_start : int, optional
Initial number of clusters. If None, it is set to ``int(0.5 * n_spectra / n_models)``.
sigma : float, optional
Maximum allowed spread of the rescaled log-intensities within a cluster, 0.05 by default.
n_models : int, optional
Number of models to be trained per cluster; used as a lower bound on the number of spectra
per cluster, 1000 by default.
conf_interval : int, optional
Percentile used for the lower bound of the spread; the upper bound is ``100 - conf_interval``.
Set to 1 by default.
signal_type : str, optional
Type of spectrum used to evaluate the cluster signals, 'EELS' by default.
kwargs
Additional keyword arguments, passed on to ``cluster()``.
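Examples
--------
A minimal usage sketch; the settings shown are purely illustrative::
image.find_optimal_amount_of_clusters(sigma=0.05, n_models=1000, based_on='log_zlp')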
"""
int_log_I = np.log(np.sum(self.data, axis=2)).flatten()
if cluster_start is None:
cluster_start = int(0.5 * (self.n_spectra / n_models))
print("Start with", cluster_start, "clusters")
scale_factors = find_scale_var(inp=int_log_I, min_out=0, max_out=1)
optimal = False
n_clusters = cluster_start
prev_smallest = int(0.2 * n_models)
while optimal is False:
print("Currently at", n_clusters, "clusters")
self.cluster(n_clusters=n_clusters, **kwargs)
self.get_cluster_signals(signal_type=signal_type)
sigma_per_cluster = np.zeros(n_clusters)
n_spectra_per_cluster = np.zeros(n_clusters)
for cluster_label in range(n_clusters):
signals_in_cluster = self.cluster_signals[cluster_label]
signals_in_cluster[signals_in_cluster <= 0] = 1
if len(signals_in_cluster) <= 1:
print("You have reached a cluster of only 1 spectrum!")
optimal = True
break
ci_low = scale(np.nanpercentile(
np.log(np.sum(signals_in_cluster, axis=1)), conf_interval, axis=0), scale_factors)
ci_high = scale(np.nanpercentile(
np.log(np.sum(signals_in_cluster, axis=1)), 100 - conf_interval, axis=0), scale_factors)
sigma_per_cluster[cluster_label] = np.absolute(ci_high - ci_low)
n_spectra_per_cluster[cluster_label] = int(len(signals_in_cluster))
if np.median(n_spectra_per_cluster) < n_models:
if np.median(n_spectra_per_cluster) < int(0.9 * n_models):
optimal = True
print("Warning! Median # of spectra is getting too small, breaking off early")
elif np.min(n_spectra_per_cluster) < int(0.3 * n_models):
if np.min(n_spectra_per_cluster) < int(0.9 * prev_smallest):
optimal = True
print("Warning! # of spectra in smallest cluster is getting too small, breaking off early")
prev_smallest = np.min(n_spectra_per_cluster)
if np.max(sigma_per_cluster) <= sigma:
optimal = True
if optimal is False:
n_clusters += 1
else:
break
print("Finished! Note that this is just a suggestion.")
print("# of clusters =", n_clusters)
print("Spread is", np.round(sigma_per_cluster, 4))
print("# of spectra per cluster is", n_spectra_per_cluster)
print("median is", int(np.median(n_spectra_per_cluster)))
def get_key(self, key):
if key.lower() in (string.lower() for string in self.EELS_NAMES):
return 'data'
elif key.lower() in (string.lower() for string in self.IEELS_NAMES):
return 'ieels'
elif key.lower() in (string.lower() for string in self.ZLP_NAMES):
return 'zlp'
elif key.lower() in (string.lower() for string in self.DIELECTRIC_FUNCTION_NAMES):
return 'eps'
elif key.lower() in (string.lower() for string in self.THICKNESS_NAMES):
return 'thickness'
else:
return key
def __getitem__(self, key):
r"""
Determines behavior of `self[key]`
"""
return self.data[key]
def __getattr__(self, key):
key = self.get_key(key)
return object.__getattribute__(self, key)
def __setattr__(self, key, value):
key = self.get_key(key)
self.__dict__[key] = value
def __str__(self):
if self.name is not None:
name_str = ", name = " + self.name
else:
name_str = ""
return "Spectral image: " + name_str + ", image size:" + str(self.data.shape[0]) + "x" + \
str(self.data.shape[1]) + ", eaxis range: [" + str(round(self.eaxis[0], 3)) + "," + \
str(round(self.eaxis[-1], 3)) + \
"], deltaE: " + str(round(self.deltaE, 3))
def __repr__(self):
data_str = "data * np.ones(" + str(self.shape) + ")"
if self.name is not None:
name_str = ", name = " + self.name
else:
name_str = ""
return "Spectral_image(" + data_str + ", deltaE=" + str(round(self.deltaE, 3)) + name_str + ")"
def __len__(self):
return self.shape[2]
# STATIC FUNCTIONS
def cft(x, y):
r"""
Fourier Transformation
Parameters
----------
x: numpy.ndarray, shape=(M,)
y: numpy.ndarray, shape=(M,)
Returns
-------
F_k: numpy.ndarray, shape=(M,)
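Examples
--------
A round-trip sketch with :py:func:`icft`; the sampled Gaussian below is purely illustrative::
x = np.linspace(-5, 5, 100)
y = np.exp(-x ** 2)
F_k = cft(x, y)
y_back = icft(x, F_k).real  # recovers y up to floating-point error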
"""
N_0 = np.argmin(np.absolute(x))
N = len(x)
x_min = np.min(x)
x_max = np.max(x)
delta_x = (x_max - x_min) / N
k = np.linspace(0, N - 1, N)
cont_factor = np.exp(2j * np.pi * N_0 * k / N)
F_k = fft(y) * cont_factor * delta_x
return F_k
def icft(x, Y_k):
r"""
Inverse Fourier Transformation
Parameters
----------
x: numpy.ndarray, shape=(M,)
Y_k: numpy.ndarray, shape=(M,)
Returns
-------
f_n: numpy.ndarray, shape=(M,)
"""
N_0 = np.argmin(np.absolute(x))
N = len(x)
x_min = np.min(x)
x_max = np.max(x)
delta_x = (x_max - x_min) / N
k = np.linspace(0, N - 1, N)
cont_factor = np.exp(-2j * np.pi * N_0 * k / N)
f_n = ifft(Y_k * cont_factor) / delta_x
return f_n
def scale(inp, ab):
r"""
Rescale the input features by applying the linear map defined by the parameters
``ab``, typically obtained from :py:func:`find_scale_var`.
Parameters
---------
inp: numpy.ndarray, shape=(M,)
Input feature array of length M
ab: list
Contains rescaling parameters
Returns
-------
output: numpy.ndarray, shape=(M,)
Rescaled features
"""
return inp * ab[0] + ab[1]
def find_scale_var(inp, min_out=0.1, max_out=0.9):
r"""
Find rescaling parameters such that the input features lie between ``min_out`` and ``max_out``
(0.1 and 0.9 by default).
Parameters
----------
inp: numpy.ndarray, shape=(M,)
Input feature array of length M
min_out: float, optional
Minimum of the rescaled output, set to 0.1 by default
max_out: float, optional
Maximum of the rescaled output, set to 0.9 by default
Returns
-------
output: list
Rescaling parameters
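Examples
--------
A small worked example together with :py:func:`scale`; the input array is purely illustrative::
inp = np.array([10., 20., 30.])
ab = find_scale_var(inp)  # default output range [0.1, 0.9]
scale(inp, ab)            # array([0.1, 0.5, 0.9])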
"""
a = (max_out - min_out) / (inp.max() - inp.min())
b = min_out - a * inp.min()
return [a, b]
def round_scientific(value, n_sig):
r"""
Round ``value`` off to ``n_sig`` digits.
Parameters
----------
value: float
n_sig: int
Number of significant digits
Returns
-------
output: float
Rounded version of ``value``.
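Examples
--------
A small worked example with illustrative values::
round_scientific(0.0123456, 3)  # 0.0123
round_scientific(12345.6, 2)    # 12000.0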
"""
if value == 0:
return 0
if np.isnan(value):
return 0
scale = int(math.floor(math.log10(abs(value))))
num = round(value, n_sig - scale - 1)
return num
def trunc(values, decs=0):
r"""
Returns the truncated version of the input
Parameters
----------
values: numpy.ndarray, shape=(M,)
Input array
decs: int
Number of decimals to keep
Returns
-------
output: truncated version of the input
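Examples
--------
A small worked example with illustrative values::
trunc(np.array([1.234, 5.678]), decs=2)  # array([1.23, 5.67])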
"""
return np.trunc(values * 10 ** decs) / (10 ** decs)
def get_seed(n_min=1e7, n_max=1e8):
return np.random.randint(n_min, n_max)
def linear_fit(x, a, b):
return a * x + b
def power_fit(x, a, r):
return a * x ** r
def gauss1d(x=0, mx=0, sx=2, **kwargs):
r"""
Parameters
----------
x
mx
sx
Returns
-------
"""
return 1. / (np.sqrt(2 * np.pi) * sx) * np.exp(
-(x - mx) ** 2 / (2 * sx ** 2))
def gauss2d(x=0, y=0, mx=0, my=0, sx=2, sy=2, **kwargs):
r"""
Returns values for a 2D Gaussian distribution
Parameters
----------
x
x-coordinate(s) at which to evaluate the distribution
y
y-coordinate(s) at which to evaluate the distribution
mx
Mean in the x-direction
my
Mean in the y-direction
sx
Standard deviation in the x-direction
sy
Standard deviation in the y-direction
Returns
-------
"""
return 1. / (2. * np.pi * sx * sy) * np.exp(
-((x - mx) ** 2. / (2. * sx ** 2.) + (y - my) ** 2. / (2. * sy ** 2.)))