"""
Post-training analysis module to load, plot and evaluate models
"""
import matplotlib as mpl
from matplotlib import pyplot as plt
from matplotlib import rc
import numpy as np
import torch
import sys
import copy
import matplotlib.gridspec as gridspec
from matplotlib import animation
from matplotlib.ticker import NullFormatter
import os, sys
import pickle
import joblib
import json
import pandas as pd
from sklearn.cluster import KMeans
from scipy import integrate
import logging
import itertools
# import own packages
from ml4eft.core import classifier as classifier
from ml4eft.core.truth import tt_prod
from ..preproc import constants
mz = constants.mz # z boson mass [TeV]
mh = constants.mh
mt = constants.mt
col_s = constants.col_s
rc('font', **{'family': 'sans-serif', 'sans-serif': ['Helvetica'], 'size': 22})
rc('text', usetex=True)
[docs]class Analyse:
"""
Post-training analyser that loads and evaluates models
"""
[docs] def __init__(self, path_to_models, order='quad', all=False):
"""
Analyse constructor
Parameters
----------
path_to_models: dict
Of the form {'lin': {'c1': ``<path_to_c1_models>``, 'c2': ``<path_to_c2_models>``}, 'quad': {'c1_c1': ``<path_to_c1_c1_models>``},
{'c1_c2': ``<path_to_c1_c2_models>``}, {'c2_c2': ``<path_to_c2_c2_models>``}}
order: str, default='quad'
The order in the EFT expansion (choose between either 'lin' or 'quad')
all: bool, default='True'
When set to True, the analyser loads all replicas. Set to False by defualt, in which case
the replicas are clusterd into "good" and "bad" models based on their loss value.
Examples
--------
Trained models can be loaded by creating an :class:`ml4eft.analyse.analyse.Analyse` object
>>> analyser = Analyse(path_to_models, 'quad')
Followed by constructing a model dictionary that contains all the models plus associated settings
>>> analyser.build_model_dict()
>>> analyser.model_df
models idx scalers run_card rep_paths
lin c1 [Classifier() ... [rep_idx, ...] [RobustScaler(quantile_range=(5,95)), ...] {'name': 'c1', ...} [<path_to_rep>, ...]
c2 [Classifier() ... ... ... ... ...
quad c1_c1 [Classifier() ... ... ... ... ...
c1_c2 [Classifier() ... ... ... ... ...
c2_c2 [Classifier() ... ... ... ... ...
Events and the inclusive cross-section can be loaded into a DataFrame by
>>> df, xsec = analyser.load_events('.../events_<rep>.pkl.gz')
>>> df
sqrts_hat pt_l1 pt_l2 pt_l_leading pt_l_trailing ...
1 1702.356400 20.532279 308.295118 308.295118 20.532279 ...
2 ... ... ... ... ...
To evaluate the models on the loaded DataFrame ``df``, use:
>>> analyser.evaluate_models(df)
>>> analyser.models_evaluated_df['models']
>>> analyser.models_evaluated_df['models']
lin c1 [[0.05130317, 0.05723376, 0.058220766, 0.04042...
c2 [[0.074420996, 0.10293982, 0.10705493, 0.05695...
quad c1_c1 [[0.07958487, 0.13463444, 0.14215767, 0.049889...
c1_c2 [[0.0005354581, 0.00042073405, 0.0004430262, -...
c2_c2 [[0.00032746396, 0.00041523552, 0.00045115477,...
"""
self.path_to_models = path_to_models
self.order = order
self.model_df = None
self.models_evaluated_df = None
self.coeff_truth = None
self.all = True
[docs] @staticmethod
def get_event_paths(root_path):
"""
Returns a list of paths to the DataFrames at ``root_path``
Parameters
----------
root_path: str
path to the DataFrame directory
Returns
-------
event_paths: list
list of paths to the DataFrames at stored at ``root_path``
Examples
--------
The paths to the event DataFrames stored at ``root_path`` can be loaded for 'n_rep' replicas by
>>> analyser.get_event_paths('/training_data/tt_llvlvlbb/tt_c1')
[/training_data/tt_llvlvlbb/tt_c1/events_0.pkl.gz', ... , /training_data/tt_llvlvlbb/tt_c1/events_<n_rep>.pkl.gz']
"""
event_paths = [os.path.join(root_path, mc_run) for mc_run in
os.listdir(root_path) if mc_run.startswith('events_')]
return event_paths
[docs] @staticmethod
def build_path_dict(root, order, prefix):
"""
Construct path to model dictionary
Parameters
----------
root: str
Path to the model root directory
order: str
Order of the EFT expansion, choose between 'lin' and 'quad'
prefix: str
For models: `prefix` = ``models``, for theory predictions: `prefix` = ``process_id`
Returns
-------
path_to_models: dict
Dictionary containing the paths to the models for each EFT ratio function
"""
path_to_models = {}
if prefix != 'model':
path_to_models['sm'] = os.path.join(root, '{}_sm'.format(prefix))
path_to_models['lin'] = {}
if order == 'quad':
path_to_models['quad'] = {}
for model_dir in os.listdir(root):
if model_dir.startswith(prefix):
if 'sm' in model_dir: # sm has already been added
continue
# linear
if model_dir.count('_') == 1:
path_to_models['lin'].update(
{model_dir.split("{}_".format(prefix), 1)[1]: os.path.join(root, model_dir)})
if model_dir.count('_') == 2 and order == 'quad':
path_to_models['quad'].update(
{model_dir.split("{}_".format(prefix), 1)[1]: os.path.join(root, model_dir)})
else:
continue
return path_to_models
[docs] @staticmethod
def posterior_loader(path):
"""Loads the posterior samples at ``path`` and converts it to a DataFrame
Parameters
----------
path: str
Location of posterior samples (.json file)
Returns
-------
df: pandas.DataFrame
DataFrame of the posterior samples
"""
with open(path) as json_data:
samples = json.load(json_data)
df = pd.DataFrame(samples)
return df
[docs] @staticmethod
def load_events(event_path):
"""
Loads a event DataFrame and splits it into the events and the inclusive cross section
Parameters
----------
event_path: str
Path to the DataFrame (including the xsec as first row)
Returns
-------
events: pandas.DataFrame
DataFrame with events
xsec: float
Inclusive cross-section of the events
"""
event_df = pd.read_pickle(event_path)
events = event_df.iloc[1:, :]
xsec = event_df.iloc[0, 0]
return events, xsec
[docs] @staticmethod
def load_loss(path_to_loss):
"""
Loades the losses per epoch into a list
Parameters
----------
path_to_loss: str
Path to loss file
Returns
-------
loss: list
list of losses per epoch
"""
with open(path_to_loss) as f:
loss = [float(line.rstrip()) for line in f]
return loss
[docs] @staticmethod
def load_run_card(path):
"""
Loads training run card
Parameters
----------
path: str
path to json model run card that stores the hyperparameter settings
Returns
-------
run_card: dict
dict with all the hyperparameter settings
"""
with open(path) as json_data:
run_card = json.load(json_data)
run_card['architecture'] = [len(run_card['features'])] + run_card['hidden_sizes'] + [run_card['output_size']]
return run_card
[docs] def filter_out_models(self, losses):
"""
Filter out the badly trained models based on kmeans clustering.
Parameters
----------
losses: array_like
Losses of all the trained models
Returns
-------
good_model_idx: numpy.ndarray
Array indices of the 'good' models
"""
if self.all:
return np.arange(len(losses.flatten()))
else:
n_clusters = 2
kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(losses.reshape(-1, 1))
cluster_labels = kmeans.labels_
cluster_nr_low_loss = np.argmin(kmeans.cluster_centers_)
# find model indices in the low loss cluster
good_model_idx = np.argwhere(cluster_labels == cluster_nr_low_loss).flatten()
# if relative std has not been reduced by a factor 10, select only models within +- 4 sigma
if np.std(losses[good_model_idx]) / np.std(losses) > 0.1:
sigma_loss = (np.nanpercentile(losses, 84) - np.nanpercentile(losses, 16)) / 2
los_med = np.nanpercentile(losses, 50)
good_model_idx = np.argwhere(
(los_med - 4 * sigma_loss < losses) & (losses < los_med + 4 * sigma_loss)).flatten()
return good_model_idx
[docs] def load_models(self, model_path, rep=None, epoch=-1):
"""
Loads the trained models
Parameters
----------
model_path: str
path to model directory
rep: int, optional
Load only a specific replica specified by ``rep``. Load all replicas by default.
epoch: int, optional
Model at specific epoch to load. Set to the best model by default.
Returns
-------
models: array_like
``(N,) ndarray`` containing the loaded neural networks
models_rep_nr: array_like
``(N,) ndarray`` containing the replica numbers of the loaded neural networks
scalers: array_like
``(N,) ndarray`` containing the preprocessing scalers of the loaded neural networks
run_card: dict
training run card of the trained models
rep_paths: array_like
``(N,) ndarray`` with the paths to the neural networks
"""
# collect all replica paths when no specific replica is requested
if rep is None:
rep_paths = [os.path.join(model_path, mc_run) for mc_run in os.listdir(model_path) if
mc_run.startswith('mc_run')]
else:
rep_paths = [os.path.join(model_path, 'mc_run_{}'.format(rep))]
losses_tr = [] # to store the final training losses
losses_val = [] # to store the final validation losses
models = []
scalers = []
for rep_path in rep_paths:
path_to_run_card = os.path.join(rep_path, 'run_card.json')
path_to_scaler = os.path.join(rep_path, 'scaler.gz')
if not os.path.isfile(path_to_scaler): # if model did not get trained, jump to the next one
rep_paths.remove(rep_path)
continue
run_card = self.load_run_card(path_to_run_card)
scaler = joblib.load(path_to_scaler)
model_name = os.path.basename(model_path)
c_name = model_name.split('model_')[1]
c_train = run_card['c_train']
quadratic = True if '_' in c_name else False
if quadratic:
c1, c2 = c_name.split('_')
c_train_value = c_train[c1] * c_train[c2]
else:
c_train_value = c_train[c_name]
loaded_model = classifier.Classifier(run_card['architecture'], c_train_value)
# build path to model config file
if epoch != -1: # if specific epoch is requested
network_path = os.path.join(rep_path, 'trained_nn_{}.pt'.format(epoch))
if not os.path.isfile(network_path):
network_path = os.path.join(rep_path, 'trained_nn.pt')
else:
network_path = os.path.join(rep_path, 'trained_nn.pt')
# load the model
loaded_model.load_state_dict(torch.load(network_path))
# read the losses
path_to_loss_tr = os.path.join(rep_path, 'loss.out')
path_to_loss_val = os.path.join(rep_path, 'loss_val.out')
loss_tr = self.load_loss(path_to_loss_tr)
loss_val = self.load_loss(path_to_loss_val)
# load all the parameters into the trained network and save
losses_tr.append(loss_tr[-run_card['patience']])
losses_val.append(loss_val[-run_card['patience']])
models.append(loaded_model)
scalers.append(scaler)
losses_tr = np.array(losses_tr)
losses_val = np.array(losses_val)
models = np.array(models)
scalers = np.array(scalers)
# filter out the badly trained models based on their training loss only if all replicas
# have been included
models_rep_nr = None
if rep is None:
good_model_idx = self.filter_out_models(losses_tr)
models = models[good_model_idx]
scalers = scalers[good_model_idx]
# map model indices back to rep number
models_rep_nr = []
for idx in good_model_idx:
rep_nr = int(os.path.basename(rep_paths[idx]).split('mc_run_', 1)[1])
models_rep_nr.append(rep_nr)
models_rep_nr = np.array(models_rep_nr)
rep_paths = np.array(rep_paths)[good_model_idx]
return models, models_rep_nr, scalers, run_card, rep_paths
[docs] def build_model_dict(self, rep=None, epoch=-1):
"""
Constructs a DataFrame with loaded models plus associated info
Parameters
----------
rep: int, optional
Replica number in case a specific replica is requested
epoch: int, optional
Epoch to load, set to the best model by default
"""
from collections import defaultdict
self.model_dict = defaultdict(dict)
for order, dict_fo in self.path_to_models.items():
if self.order == 'lin' and order == 'quad': # skip quadratics when running linear
continue
for c_name, model_path in dict_fo.items():
pretrained_models, models_idx, scalers, run_card, rep_paths = self.load_models(model_path, rep, epoch)
self.model_dict[order][c_name] = {'models': pretrained_models, 'idx': models_idx, 'scalers': scalers,
'run_card': run_card, 'rep_paths': rep_paths}
self.model_df = pd.DataFrame.from_dict({(i, j): self.model_dict[i][j]
for i in self.model_dict.keys()
for j in self.model_dict[i].keys()},
orient='index')
    def evaluate_models(self, df, rep=None, epoch=-1):
        """
        Evaluates the loaded models on a Pandas DataFrame ``df``

        The result is stored as ``self.models_evaluated_df``: each entry of the
        ``'models'`` column is replaced by a stacked array with one row of network
        predictions per replica.

        Parameters
        ----------
        df: pd.DataFrame
            input to the models
        rep: int, optional
            Replica number in case a specific replica is requested.
            Set to ``None`` by default in which case all available replicas are included.
        epoch: int, optional
            Epoch number to load. Set to the best model by default.
        """
        # load models if not done already
        # NOTE(review): when ``rep`` is given the model dict is rebuilt
        # unconditionally; the second call below only fires when ``model_df`` is
        # still None — confirm whether the first call is meant to be conditional too
        if rep is not None:
            self.build_model_dict(rep, epoch)
        if self.model_df is None:
            self.build_model_dict(rep, epoch)
        # deep copy so the loaded networks in self.model_dict stay untouched
        models_evaluated = copy.deepcopy(self.model_dict)
        for order, dict_fo in self.model_dict.items():
            for c_name, dict_c in dict_fo.items():
                models = dict_c['models']
                for i, model in enumerate(models):
                    features = dict_c['run_card']['features']
                    # apply the same preprocessing scaler that was fitted at training time
                    features_scaled = dict_c['scalers'][i].transform(df[features])
                    with torch.no_grad():
                        model_ev = model.n_alpha(torch.tensor(features_scaled).float()).numpy().flatten()
                    # replace the network by its predictions on ``df``
                    models_evaluated[order][c_name]['models'][i] = model_ev
                models_evaluated[order][c_name]['models'] = np.vstack(models_evaluated[order][c_name]['models'])
        self.models_evaluated_df = pd.DataFrame.from_dict({(i, j): models_evaluated[i][j]
                                                           for i in models_evaluated.keys()
                                                           for j in models_evaluated[i].keys()},
                                                          orient='index')
[docs] def coeff_function_truth(self, df, c_name, features, process, order):
"""
Evaluates the analytic EFT ratio functions :math:`r_{\sigma}^{(i)}` and :math:`r_{\sigma}^{(i,j)}`
Parameters
----------
df: pandas.DataFrame
Events on which to evaluate :math:`r_{\sigma}^{(i)}` and :math:`r_{\sigma}^{(i,j)}`
c_name: str
Name of the EFT coefficient. Choose between 'ctgre', 'cut', 'cut_cut', 'ctgre_ctgre'
features: list
Kinematic features, options are ``m_tt``, ``y``
process: str
Supported options are 'tt' or 'ZH'
order: str
Order of the EFT expansion, choose between 'lin' and 'quad'
Returns
-------
coeff: array_like
``(N,) ndarray`` with :math:`r_{\sigma}^{(i)}` or :math:`r_{\sigma}^{(i,j)}` evaluated on ``df`` depending
on the ``order``
"""
if c_name == 'ctGRe' and order == 'lin':
cprime = {'ctGRe': -10, 'ctu8': 0}
elif c_name == 'ctu8' and order == 'lin':
cprime = {'ctGRe': 0, 'ctu8': 10}
elif c_name == 'ctGRe_ctGRe' and order == 'quad':
cprime = {'ctGRe': 10, 'ctu8': 0}
c_name = 'ctGRe'
if c_name == 'ctu8_ctu8' and order == 'quad':
cprime = {'ctGRe': 0, 'ctu8': 10}
c_name = 'ctu8'
# ratio_truth_lin = self.likelihood_ratio_truth(df, cprime, features, process, order)
ratio_truth_lin = self.likelihood_ratio_truth(df, cprime, features, process, order)
# compute the mask once to select the physical region in phase space
mask = ratio_truth_lin == 0
if order == 'lin': # only one of the c elements can be nonzero
coeff = np.ma.masked_where(mask, (ratio_truth_lin - 1) / cprime[c_name])
return coeff
elif order == 'quad': # only one of the c elements can be nonzero
coeff = np.ma.masked_where(mask, (ratio_truth_lin - 1) / cprime[c_name] ** 2)
return coeff
    def accuracy_heatmap(self, c_name, order, process, mx_cut=None, rep=None, epoch=-1, ax=None, text=None):
        r"""
        Compares the NN and true EFT ratio functions and plots their ratio and pull

        Parameters
        ----------
        c_name: str
            Name of the EFT parameter, e.g. 'ctgre'
        order: str
            Order in the EFT expansion, options are 'lin' or 'quad'
        process: str
            Specifies the process. Currently supported is 'tt' and 'ZH'
        mx_cut: list, optional
            Plot range of the invariant mass
        rep: int, optional
            Request to plot for a specific replica
        epoch: int, optional
            Request to plot for a specific epoch.
        ax: matplotlib.pyplot.axes, optional
            Axes to plot on.  NOTE(review): despite the ``None`` default, the
            ``rep is None`` branch indexes ``ax[0]`` and ``ax[1]`` — a sequence of
            two axes must be passed there; with ``rep`` set a single axes object
            is expected.  Confirm against the callers.
        text: str, optional
            Add additional text on the heatmap such as the replica number

        Returns
        -------
        When ``rep`` is None: the two axes ``(ax[0], ax[1])`` holding the
        median-ratio heatmap and the pull heatmap.  Otherwise: the image returned
        by ``plot_heatmap`` for the requested replica.
        """
        s = constants.col_s ** 2
        epsilon = 1e-2  # keep the grid just above the kinematic production threshold
        if process == 'ZH':
            if mx_cut is None:
                mx_min, mx_max = mz + mh + epsilon, 3
            else:
                mx_min, mx_max = mx_cut
        elif process == 'tt':
            if mx_cut is None:
                mx_min, mx_max = 2 * mt + epsilon, 3
            else:
                mx_min, mx_max = mx_cut
        # rapidity range allowed at the lower edge of the mass window
        y_min, y_max = - np.log(np.sqrt(s) / mx_min), np.log(np.sqrt(s) / mx_min)
        x_spacing, y_spacing = 1e-2, 0.01
        mx_span = np.arange(mx_min, mx_max, x_spacing)
        y_span = np.arange(y_min, y_max, y_spacing)
        mx_grid, y_grid = np.meshgrid(mx_span, y_span)
        grid = np.c_[mx_grid.ravel(), y_grid.ravel()]
        if process == 'ZH':
            df = pd.DataFrame({'y': grid[:, 1], 'm_zh': grid[:, 0]})
        elif process == 'tt':
            df = pd.DataFrame({'y': grid[:, 1], 'm_tt': grid[:, 0]})
        # models: evaluate every replica on the (mass, rapidity) grid
        self.evaluate_models(df, rep, epoch)
        nn = self.models_evaluated_df.loc[order, c_name]['models']
        nn = np.vstack(nn)
        n_models = nn.shape[0]
        nn = nn.reshape((n_models, *mx_grid.shape))
        # truth: the analytic ratio function, cached across calls
        features = df.columns.values
        if self.coeff_truth is None:
            self.coeff_truth = self.coeff_function_truth(df, c_name, features, process, order).reshape(
                mx_grid.shape)
        xlabel = r'$m_{ZH}\;\rm{[TeV]}$' if process == 'ZH' else r'$m_{t\bar{t}}\;\rm{[TeV]}$'
        if rep is None:
            # determine median, low and high CI over the replicas
            nn_median = np.percentile(nn, 50, axis=0)
            nn_high = np.percentile(nn, 84, axis=0)
            nn_low = np.percentile(nn, 16, axis=0)
            nn_unc = (nn_high - nn_low) / 2
            # median ratio heatmap
            median_ratio = self.coeff_truth / nn_median
            title = r'$\rm{Unbinned\;exact/Unbinned\;ML}$'
            # NOTE(review): fig_1/ax_1 (and fig_2/ax_2 below) are created but never
            # drawn on — the heatmaps go to ax[0]/ax[1]; confirm this is intended
            fig_1, ax_1 = plt.subplots(figsize=(10, 8))
            im_1 = self.plot_heatmap(ax[0], median_ratio,
                                     xlabel=xlabel,
                                     ylabel=r'$y_{t\bar{t}}$',
                                     title=title,
                                     extent=[mx_min, mx_max, y_min, y_max],
                                     cmap='seismic',
                                     bounds=[0.95, 0.96, 0.97, 0.98, 1.02, 1.03, 1.04, 1.05],
                                     text=text)
            # pull heatmap: deviation in units of the replica spread
            fig_2, ax_2 = plt.subplots(figsize=(10, 8))
            pull = (self.coeff_truth - nn_median) / nn_unc
            im_2 = self.plot_heatmap(ax[1], pull,
                                     xlabel=xlabel,
                                     ylabel=r'$y_{t\bar{t}}$',
                                     title=r'$\rm{Pull}$',
                                     extent=[mx_min, mx_max, y_min, y_max],
                                     cmap='seismic',
                                     bounds=np.linspace(-1.5, 1.5, 10),
                                     text=text)
            return ax[0], ax[1]
        else:
            # single-replica heatmap, labelled with the replica number
            title = r"$\mathrm{{Truth/NN}}\;(\mathrm{{rep}}\;{})$".format(rep)
            im = self.plot_heatmap(ax, self.coeff_truth / nn[0],
                                   xlabel=xlabel,
                                   ylabel=r'$y_{t\bar{t}}$',
                                   title=title,
                                   extent=[mx_min, mx_max, y_min, y_max],
                                   cmap='seismic',
                                   bounds=[0.95, 0.96, 0.97, 0.98, 1.02, 1.03, 1.04, 1.05],
                                   rep=rep)
            return im
    def plot_heatmap_overview(self, c_name, order, process, mx_cut=None, reps=None, epoch=-1):
        r"""
        Produces an overview of heatmaps showing in each plot the ratio between the ML model
        prediction and the analytical EFT ratio function :math:`r_{\sigma}^{(i)}` or
        :math:`r_{\sigma}^{(i, j)}`

        Parameters
        ----------
        c_name: str
            Name of EFT coefficient
        order: str
            Order in the EFT expansion
        process: str
            Specifies the process, choose between 'tt' and 'ZH'
        mx_cut: float, optional
            Plot range of the invariant mass
        reps: array_like, optional
            Replica numbers to include in the heatmap overview (one panel per entry)
        epoch: int, optional
            Specific epoch to plot at, takes the best models by default

        Returns
        -------
        fig: matplotlib.figure
        """
        from mpl_toolkits.axes_grid1 import AxesGrid
        # enlarge the global font size for this many-panel overview
        rc('font', **{'family': 'sans-serif', 'sans-serif': ['Helvetica'], 'size': 50})
        n_cols = 4
        mc_reps = len(reps)
        n_rows = int(np.ceil(mc_reps / n_cols))
        fig = plt.figure(figsize=(10 * n_cols, 10 * n_rows))
        grid = AxesGrid(fig, 111,
                        nrows_ncols=(n_rows, n_cols),
                        axes_pad=1.0,
                        cbar_mode='single',
                        cbar_location='bottom',
                        cbar_pad=1.5,
                        cbar_size=0.5
                        )
        for i, rep in enumerate(reps):
            # each call draws one replica heatmap on its grid cell and returns the image
            im = self.accuracy_heatmap(c_name, order, process, mx_cut, rep, epoch, grid[i])
        # NOTE(review): both lines attach a colorbar for the last image; they look
        # redundant — confirm which one is actually needed
        grid[-1].cax.colorbar(im)
        grid.cbar_axes[0].colorbar(im)
        return fig
[docs] def likelihood_ratio_nn(self, c, df=None, epoch=-1):
"""
Compute the Neural Network parameterised likelihood ratio
Parameters
----------
c: dict
Of the form {'c1': value, 'c2': value, ...}
df: pandas.DataFrame, optional
In case the loaded models have not been evaluated yet, one can pass ``df`` to evaluate the neural networks
epoch: int, optional
Specify an epoch if necessary, takes the best model by default
Returns
-------
ratio: array_like
likelihood ratio as ``(N,M) ndarray`` with ``N`` and ``M`` the number of replicas and events respectively
"""
# update evaluated models for a new df
if df is not None:
self.evaluate_models(df, epoch=epoch)
ratio = 1
if 'lin' in self.models_evaluated_df.index:
lin_models = self.models_evaluated_df['models'].loc["lin"]
for c_name, c_val in c.items():
if c_name in lin_models:
ratio += c_val * lin_models[c_name]
if 'quad' in self.models_evaluated_df.index:
quad_models = self.models_evaluated_df['models'].loc["quad"]
for (c1, c2) in itertools.product(c.keys(), repeat=2):
c_name = '{}_{}'.format(c1, c2)
if c_name in quad_models:
ratio += c[c1] * c[c2] * quad_models['{}_{}'.format(c1, c2)]
ratio = np.vstack(ratio)
return ratio
[docs] def decision_function_nn(self, c, df=None, epoch=-1):
"""
Computes the Neural Network parameterised decision function :math:`g(x, c)`
Parameters
----------
df: pd.DataFrame
event info
c: dict
EFT point
epoch: int, optional
Use models at a specific epoch, set to -1 (best modeL) by default
Returns
-------
decision_function: numpy.ndarray
NN decision function
"""
ratio = self.likelihood_ratio_nn(c, df, epoch=epoch)
decision_function = 1 / (1 + ratio)
return decision_function
    def point_by_point_comp_med(self, df, c, features, process, order, ax, text=None):
        r"""
        Produces a point by point comparison of the log-likelihood ratio between the (median)
        ML model and the analytical (exact) model

        Parameters
        ----------
        df: pandas.DataFrame
            DataFrame with events
        c: dict
            Of the form {'c1': value, 'c2': value}
        features: array_like
            List of features to include in the comparison
        process: str
            Choose between ``tt`` or ``ZH``
        order: str
            Specifies the order in the EFT expansion. Must be one of ``lin``, ``quad``.
        ax: matplotlib.axes
            Axes object to plot on
        text: str, optional
            Additional text to put on the plot
        """
        # NN and exact log-likelihood ratios evaluated on the same events
        r_nn = self.likelihood_ratio_nn(c, df=df)
        tau_nn = np.log(r_nn)
        r_truth = self.likelihood_ratio_truth(df, c, features, process, order)
        tau_truth = np.log(r_truth)
        # NOTE(review): this figure is created but every artist is drawn on the
        # passed ``ax``; only the final tight_layout call touches ``fig`` — confirm
        fig = plt.figure(figsize=(8, 8))
        x = np.linspace(np.min(tau_truth) - 0.1, np.max(tau_truth) + 0.1, 100)
        # scatter of exact vs median-over-replicas NN values; the dashed diagonal
        # marks perfect agreement
        ax.scatter(tau_truth, np.median(tau_nn, axis=0), s=2, color='red')
        ax.plot(x, x, linestyle='dashed', color='grey')
        ax.set_xlabel(r'$\log r(x, c)^{\rm{Unbinned}\;\rm{exact}}$')
        ax.set_ylabel(r'$\log r(x, c)^{\rm{Unbinned}\;\rm{ML}}$')
        ax.set_xlim((np.min(x), np.max(x)))
        ax.set_ylim((np.min(x), np.max(x)))
        if text is not None:
            ax.text(0.95, 0.1, text,
                    horizontalalignment='right',
                    verticalalignment='center',
                    transform=ax.transAxes)
        fig.tight_layout()
    def point_by_point_comp(self, df, c_name, c, features, process, order):
        r"""
        Produces a point by point comparison overview per replica of the log-likelihood ratio
        between the ML model and the analytical (exact) model

        Parameters
        ----------
        df: pandas.DataFrame
            DataFrame with events
        c_name: str
            name of EFT ratio function to compare, e.g. ``c1_c2``
        c: dict
            Of the form {'c1': value, 'c2': value}
        features: array_like
            List of features to include in the comparison
        process: str
            Choose between ``tt`` or ``ZH``
        order: str
            Specifies the order in the EFT expansion. Must be one of ``lin``, ``quad``.

        Returns
        -------
        fig1: matplotlib.figure
            Per-replica overview of the point-by-point comparison
        fig2: matplotlib.figure
            Point-by-point comparison for the median over replicas
        """
        # NN and exact log-likelihood ratios on the same events
        r_nn = self.likelihood_ratio_nn(c, df=df)
        tau_nn = np.log(r_nn)
        r_truth = self.likelihood_ratio_truth(df, c, features, process, order)
        tau_truth = np.log(r_truth)
        # overview plot for all replicas
        ncols = 5
        mc_reps = r_nn.shape[0]
        nrows = int(np.ceil(mc_reps / ncols))
        fig1 = plt.figure(figsize=(ncols * 4, nrows * 4))
        x = np.linspace(np.min(tau_truth) - 0.1, np.max(tau_truth) + 0.1, 100)
        model_idx = self.model_df.loc[order, c_name]['idx']
        for i in np.argsort(model_idx):
            # one panel per replica, positioned by its replica number
            ax = plt.subplot(nrows, ncols, model_idx[i] + 1)
            plt.scatter(tau_truth, tau_nn[i, :], s=2, color='k')
            plt.plot(x, x, linestyle='dashed', color='grey')
            plt.text(0.1, 0.9, r"$\mathrm{{rep}}\;{}$".format(model_idx[i]), horizontalalignment='left',
                     verticalalignment='center',
                     transform=ax.transAxes)
            plt.xlim((np.min(x), np.max(x)))
            plt.ylim((np.min(x), np.max(x)))
        plt.tight_layout()
        # median over replicas
        fig2, ax = plt.subplots(figsize=(8, 8))
        x = np.linspace(np.min(tau_truth) - 0.1, np.max(tau_truth) + 0.1, 100)
        plt.scatter(tau_truth, np.median(tau_nn, axis=0), s=2, color='k')
        plt.plot(x, x, linestyle='dashed', color='grey')
        plt.xlabel(r'$\log r(x, c)^{\rm{Unbinned}\;\rm{exact}}$')
        plt.ylabel(r'$\log r(x, c)^{\rm{Unbinned}\;\rm{ML}}$')
        plt.xlim((np.min(x), np.max(x)))
        plt.ylim((np.min(x), np.max(x)))
        plt.tight_layout()
        return fig1, fig2
[docs] def plot_loss_overview(self, c_name, order, ax=None, rep=None, xlim=None):
"""
Plots the loss evolution per replica and returns an overview plot
Parameters
----------
c_name: str
name of EFT parameter
order: str
Specifies the order in the EFT expansion. Must be one of ``lin``, ``quad``.
ax: matplotlib.axes, optional
Axes object to plot on
rep: int, optional
Specific replica to plot
Returns
-------
fig: matplotlib.figure
Loss overview plot
train_loss_best: array_like
List of 'best' training losses
Examples
--------
To plot a loss overview corresponding to the training of :math:`r_{\sigma}^{(c_{tG}, c_{tG})}`, run
>>> analyser = Analyse(path_to_models, 'quad')
>>> fig, train_losses = analyser.plot_loss_overview('ctgre_ctgre', 'quad')
>>> fig
.. image:: ../images/loss_overview.png
"""
if self.model_df is None:
self.build_model_dict()
rep_paths = self.model_df['rep_paths'][order][c_name]
loss_tr = []
loss_val = []
mc_reps = len(rep_paths)
ncols = 5
nrows = int(np.ceil(mc_reps / ncols))
patience = self.model_df.loc[order, c_name]['run_card']['patience']
model_idx = self.model_df.loc[order, c_name]['idx']
for rep_path in rep_paths:
path_to_loss_tr = os.path.join(rep_path, 'loss.out')
loss_tr.append(self.load_loss(path_to_loss_tr))
path_to_loss_val = os.path.join(rep_path, 'loss_val.out')
loss_val.append(self.load_loss(path_to_loss_val))
fig = plt.figure(figsize=(ncols * 4, nrows * 4))
train_loss_best = np.array([loss[-patience] for loss in loss_tr])
val_loss_best = np.array([loss[-patience] for loss in loss_val])
for i in np.argsort(model_idx):
if rep is not None:
if model_idx[i] != rep:
continue
else:
ax = plt.subplot(nrows, ncols, model_idx[i] + 1)
epochs = np.arange(len(loss_tr[i]))
label_val = r'$L_{\mathrm{val}}$' if model_idx[i] == 0 else None
label_train = r'$L_{\mathrm{tr}}$' if model_idx[i] == 0 else None
loss_train_rep = np.array(loss_tr[i])
loss_val_rep = np.array(loss_val[i])
if xlim is None:
ax.set_xlim(left=50)
ax.set_ylim(min(loss_train_rep[-1], loss_val_rep[-1]) - 0.2 * max(loss_sigma_val, loss_sigma_tr),
max(loss_train_rep[-1], loss_val_rep[-1]) + 0.8 * max(loss_sigma_val, loss_sigma_tr))
ax.plot(epochs, loss_train_rep, label=label_train)
ax.plot(epochs, loss_val_rep, label=label_val)
else:
ax.plot(epochs[xlim:], loss_train_rep[xlim:], label=label_train)
ax.plot(epochs[xlim:], loss_val_rep[xlim:], label=label_val)
ax.axvline(epochs[-patience], 0, 0.75, color='red', linestyle='dotted')
# ax.set_yscale('log')
# ax.yaxis.set_major_formatter(NullFormatter())
# ax.yaxis.set_minor_formatter(NullFormatter())
# ax.axes.yaxis.set_ticklabels([])
ax.set_ymargin(0.1)
ax.set_xmargin(0)
ax.text(0.95, 0.9, r"$\mathrm{{rep}}\;{}$".format(model_idx[i]), horizontalalignment='right',
verticalalignment='center',
transform=ax.transAxes)
ax.set_xlabel(r'$\mathrm{Epoch}$')
ax.set_ylabel(r'$\mathrm{Loss}$')
med_loss_tr = np.percentile(train_loss_best, 50)
low_loss_tr = np.percentile(train_loss_best, 16)
high_loss_tr = np.percentile(train_loss_best, 84)
loss_sigma_tr = np.abs((high_loss_tr - low_loss_tr) / 2)
med_loss_val = np.percentile(val_loss_best, 50)
low_loss_val = np.percentile(val_loss_best, 16)
high_loss_val = np.percentile(val_loss_best, 84)
loss_sigma_val = np.abs((high_loss_val - low_loss_val) / 2)
# same scale (not everything visible)
# ax.set_ylim(min(loss_train_rep[-1], loss_val_rep[-1]) - 0.2 * max(loss_sigma_val, loss_sigma_tr),
# min(loss_train_rep[-1], loss_val_rep[-1]) + 2 * max(loss_sigma_val, loss_sigma_tr))
# everything visible (not the same scale)
if xlim is None:
ax.set_ylim(min(loss_train_rep[-1], loss_val_rep[-1]) - 0.2 * max(loss_sigma_val, loss_sigma_tr),
max(loss_train_rep[-1], loss_val_rep[-1]) + 0.8 * max(loss_sigma_val, loss_sigma_tr))
else:
y_min = np.min(np.concatenate((loss_train_rep[xlim:], loss_val_rep[xlim:])))
y_max = np.max(np.concatenate((loss_train_rep[xlim:], loss_val_rep[xlim:])))
ax.set_ylim(y_min, y_max)
if model_idx[i] == 0:
ax.legend(loc="lower left", frameon=False)
plt.tight_layout()
return fig, train_loss_best
[docs] def plot_accuracy_1d(self, c, c_name, process, order, mx_cut, epoch=-1, ax=None, text=None):
"""
Plots the decision boundary :math:`g(x,c)` as predicted by the ML model and the analytical (exact) model along
1 dimension, i.e :x=math:`m_{tt}`
Parameters
----------
c: dict
Of the form {'c1': value, 'c2': value}
process: str
Choose between ``tt`` or ``ZH``
order: str, optional
Specifies the order in the EFT expansion. Must be one of ``lin``, ``quad``.
mx_cut: list
Plot range of the invariant mass
epoch: int, optional
Specific epoch to plot, set to the best models by default
ax: matplotlib.axes, optional
Plot on an already created axes object
text: str, optional
Additional text to show on the plot
Returns
-------
fig: matplotlib.figure
Plot comparing the decision boundary :math:`g(x,c)` as predicted by the ML model and the analytical (exact)
result
Examples
--------
>>> analyser = Analyse(path_to_models, 'quad')
>>> fig = analyser.plot_accuracy_1d(c={'ctgre': -2, 'cut': 0}, process='tt', order='quad', cut=0.5, text=r'$c=c_{tG}=2\;\mathrm{quadratic}$')
>>> fig
.. image:: ../images/decission_function_1d.png
"""
if ax is None:
fig, ax = plt.subplots(figsize=(10, 6))
else:
fig = plt.subplots(figsize=(10, 6))
x = np.linspace(mx_cut[0], mx_cut[1], 200)
x = np.stack((np.zeros(len(x)), x), axis=-1)
if process == 'tt':
df = pd.DataFrame(x, columns=['y', 'm_tt'])
elif process == 'ZH':
df = pd.DataFrame(x, columns=['y', 'm_zh'])
features = self.model_df['run_card'][order][c_name]['features']
f_ana_lin = self.decision_function_truth(df, c, df.columns.values, process, order)
f_preds_nn = self.decision_function_nn(c, df, epoch=epoch)
f_pred_up = np.percentile(f_preds_nn, 84, axis=0)
f_pred_down = np.percentile(f_preds_nn, 16, axis=0)
ax.fill_between(x[:, 1], f_pred_down, f_pred_up,
label=r"$\mathrm{Unbinned}\;\mathrm{ML}\;(m_{t\bar{t}}, y_{t\bar{t}})$",
alpha=0.4)
ax.plot(x[:, 1], f_ana_lin, '--', c='red',
label=r"$\mathrm{Unbinned}\;\mathrm{exact}\;(m_{t\bar{t}}, y_{t\bar{t}})$")
ax.set_ylim((0, 1))
ax.set_xlim(np.min(x[:, 1]), np.max(x[:, 1]))
ax.set_ylabel(r'$g\;(x, c)$')
xlabel = r'$m_{ZH}\;\rm{[TeV]}$' if process == 'ZH' else r'$m_{t\bar{t}}\;\rm{[TeV]}$'
ax.set_xlabel(xlabel)
if text is not None:
ax.text(0.05, 0.1, text,
horizontalalignment='left',
verticalalignment='center',
transform=ax.transAxes)
ax.legend(frameon=False)
plt.tight_layout()
return fig
    @staticmethod
    def likelihood_ratio_truth(events, c, features, process, order=None):
        r"""
        Computes the analytic likelihood ratio :math:`r(x, c)`

        Parameters
        ----------
        events : pd.DataFrame
            Pandas DataFrame with the events
        c : dict
            EFT point with operator names specified as keys (reads 'ctGRe' and 'ctu8')
        features: list
            List of kinematic labels; the differential cross-section used depends on
            how many features are given (1, 2 or 3)
        process: str
            Choose between ``tt`` or ``ZH``.  NOTE(review): only ``'tt'`` is handled
            in this body — any other value leaves ``dsigma_0`` undefined and raises a
            NameError below; confirm whether 'ZH' support lives elsewhere.
        order: str, optional
            Specifies the order in the EFT expansion. Must be one of ``lin``, ``quad``.

        Returns
        -------
        ratio: numpy.ndarray
            Likelihood ratio wrt the SM, set to zero where the SM cross-section vanishes
        """
        n_features = len(features)
        if process == 'tt':
            # order the Wilson coefficients as expected by tt_prod
            c = np.array([c['ctGRe'], c['ctu8']])
            if n_features == 1:
                dsigma_0 = [tt_prod.dsigma_dmtt(*x[features], c, order) for _, x in events.iterrows()]  # EFT
                dsigma_1 = [tt_prod.dsigma_dmtt(*x[features]) for _, x in
                            events.iterrows()]  # SM
            elif n_features == 2:
                dsigma_0 = [tt_prod.dsigma_dmtt_dy(*x[features], c, order) for _, x in
                            events.iterrows()]  # EFT
                dsigma_1 = [tt_prod.dsigma_dmtt_dy(*x[features]) for _, x in
                            events.iterrows()]  # SM
            elif n_features == 3:
                dsigma_0 = [tt_prod.dsigma_dmtt_dy_dpt(*x[features], c, order) for
                            index, x in
                            events.iterrows()]  # EFT
                dsigma_1 = [tt_prod.dsigma_dmtt_dy_dpt(*x[features]) for
                            index, x in
                            events.iterrows()]  # SM
        dsigma_0, dsigma_1 = np.array(dsigma_0), np.array(dsigma_1)
        # avoid division by zero: the ratio is 0 where the SM xsec vanishes
        ratio = np.divide(dsigma_0, dsigma_1, out=np.zeros_like(dsigma_0), where=dsigma_1 != 0)
        return ratio.flatten()
[docs] def decision_function_truth(self, events, c, features, process, order=None):
"""
Computes the analytic decision function
Parameters
----------
order: str, optional
Specifies the order in the EFT expansion. Must be one of ``lin``, ``quad``.
process: str
Choose between ``tt`` or ``ZH``
features: list
List of kinematic labels
events : pd.DataFrame
Pandas DataFrame with the events
c : numpy.ndarray, shape=(M,)
EFT point in M dimensions, e.g c = (cHW, cHq3)
Returns
-------
decision_function: numpy.ndarray
Truth decision function
"""
ratio = self.likelihood_ratio_truth(events, c, features, process, order)
decision_function = 1 / (1 + ratio)
return decision_function
    @staticmethod
    def plot_heatmap(ax, data, xlabel, ylabel, title, extent, bounds, cmap='GnBu', rep=None, text=None):
        """
        Plot a heatmap of ``data`` on ``ax`` and return the drawn image

        Parameters
        ----------
        ax: matplotlib.axes
            Axes to draw the heatmap on
        data: numpy.ndarray, shape=(M, N)
            Input array
        xlabel: str
            x-label
        ylabel: str
            y-label
        title: str
            title of plot
        extent: list
            boundaries of the heatmap, e.g. [x_0, x_1, y_0, y_1]
        bounds: list
            The boundaries of the discrete colourmap
        cmap: str
            colourmap to use, set to 'GnBu' by default
        rep: int, optional
            Replica number; when given, it is printed on the plot and no colorbar is drawn
        text: str, optional
            Extra text placed in the lower-right corner

        Returns
        -------
        im: matplotlib.image.AxesImage
            The heatmap image drawn on ``ax``
        """
        import matplotlib.ticker as tick
        # discrete colorbar
        # NOTE(review): mpl.cm.get_cmap is deprecated in recent matplotlib releases
        # (use matplotlib.colormaps[...] instead) — confirm the pinned version
        cmap_copy = copy.copy(mpl.cm.get_cmap(cmap))
        norm = mpl.colors.BoundaryNorm(bounds, cmap_copy.N, extend='both')
        # masked (unphysical) cells are drawn in light grey
        cmap_copy.set_bad(color='gainsboro')
        if rep is not None:
            fig = ax.figure
            rc('font', **{'family': 'sans-serif', 'sans-serif': ['Helvetica'], 'size': 30})
        else:
            # NOTE(review): this opens a new blank figure while the heatmap below is
            # drawn on the caller's ``ax``; the new figure only hosts the colorbar call
            fig = plt.figure(figsize=(10, 8))
        im = ax.imshow(data, extent=extent, origin='lower',
                       aspect=(extent[1] - extent[0]) / (extent[3] - extent[2]) * 10 / 8, cmap=cmap_copy, norm=norm)
        if rep is not None:
            # label the panel with its replica number instead of a colorbar
            ax.text(0.95, 0.95, r"$\mathrm{{rep}}\;{}$".format(rep), horizontalalignment='right',
                    verticalalignment='top',
                    transform=ax.transAxes)
        else:
            cbar = fig.colorbar(mpl.cm.ScalarMappable(norm=norm, cmap=cmap_copy), ax=ax,
                                format=tick.FormatStrFormatter(r'$%.2f$'))
        ax.set_title(title, fontsize=15)
        if text is not None:
            ax.text(0.95, 0.05, text, horizontalalignment='right', verticalalignment='center',
                    transform=ax.transAxes)
        plt.tight_layout()
        ax.set_xlabel(xlabel)
        ax.set_ylabel(ylabel)
        return im