""" Binary classifier module
"""
# !/usr/bin/env python
# coding: utf-8
import logging
import torch
import torch.utils.data as data
import torch.optim as optim
import numpy as np
import datetime
import json
import os
import time
import pandas as pd
from joblib import dump, load
from matplotlib import pyplot as plt
from matplotlib import rc
from torch import nn
from sklearn.model_selection import train_test_split
import shutil
import ml4eft.analyse.analyse as analyse
from sklearn.preprocessing import StandardScaler, RobustScaler, QuantileTransformer
import joblib
import sys
rc('font', **{'family': 'sans-serif', 'sans-serif': ['Helvetica']})
rc('text', usetex=True)
class MLP(nn.Module):
    """Multi-layer perceptron that serves as a building block for the binary classifier."""
    def __init__(self, architecture):
"""
Parameters
----------
architecture: array_like
``(N, ) ndarray`` that specifies the MLP's architecture as the number of input nodes followed
            by the number of hidden nodes in each consecutive hidden layer. The last entry must correspond
            to the number of output units.
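
        Examples
        --------
        A minimal sketch (the layer sizes below are illustrative, not taken from a run card):

        >>> mlp = MLP([3, 25, 25, 1])  # 3 input features, two hidden layers of 25 nodes, 1 output
        >>> out = mlp(torch.rand(100, 3))  # forward pass on a batch of 100 events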
"""
super().__init__()
input_size = architecture[0]
hidden_sizes = architecture[1:-1]
output_size = architecture[-1]
# Create the network based on the specified hidden sizes
layers = []
        layer_sizes = [input_size] + list(hidden_sizes)  # cast so the concatenation also works for ndarray input
for layer_index in range(1, len(layer_sizes)):
layers += [nn.Linear(layer_sizes[layer_index - 1], layer_sizes[layer_index]), nn.ReLU()]
layers += [nn.Linear(layer_sizes[-1], output_size)]
        self.layers = nn.Sequential(*layers)
    def forward(self, x):
"""
Performs a forward pass of the MLP
Parameters
----------
x: array_like
Input ``(N, ) torch.Tensor`` with ``N`` the number of input nodes
Returns
-------
out: torch.Tensor
"""
out = self.layers(x)
return out
class CustomActivationFunction(nn.Module):
"""
Class to construct custom activation functions
"""
    def __init__(self):
super().__init__()
self.name = self.__class__.__name__
self.config = {"name": self.name}
class ConstraintActivation(CustomActivationFunction):
"""Class to transform the classifier output to ensure a positive likelihood ratio
"""
    def __init__(self, c):
        r"""
        Parameters
        ----------
        c: float
            Training parameter :math:`c^{(\mathrm{tr})}` at which the EFT data set is generated
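
        Examples
        --------
        A minimal sketch (the value of ``c`` is illustrative): for :math:`c > 0` the
        activation shifts a ReLU down by :math:`1/c`, so the network output stays above
        :math:`-1/c` and :math:`1 + c\,\mathrm{NN}(x)` remains positive:

        >>> act = ConstraintActivation(c=2.0)
        >>> out = act(torch.tensor([-5.0, 0.0, 5.0]))  # all entries > -1/c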
"""
super().__init__()
self.c = c
def forward(self, x):
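        # shift the (sign-flipped) ReLU output by 1/c so that 1 + c * NN(x) stays strictly positive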
if self.c > 0:
return torch.relu(x) - 1 / self.c + 1e-6
else:
return - torch.relu(x) - 1 / self.c - 1e-6
class Classifier(nn.Module):
    r"""Implementation of the decision function :math:`g(x, c)` of the binary classifier
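
    Notes
    -----
    As implemented in :py:meth:`forward`, the decision function reads

    .. math:: g(x, c) = \frac{1}{1 + \left(1 + c\,\mathrm{NN}(x)\right)},

    with :math:`\mathrm{NN}(x)` the constrained network output, so that
    :math:`g \in (0, 1)` whenever :math:`1 + c\,\mathrm{NN}(x) > 0`.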
"""
    def __init__(self, architecture, c):
super().__init__()
self.c = c
self.n_alpha = MLP(architecture)
self.n_alpha.layers.add_module('constraint', ConstraintActivation(self.c))
    def forward(self, x):
"""
Parameters
----------
x: array_like
Input ``(N, ) torch.Tensor`` with ``N`` the number of input nodes
Returns
-------
        g: torch.Tensor
            Decision boundary between the two hypotheses
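
        Examples
        --------
        A minimal sketch (architecture and Wilson-coefficient value are illustrative):

        >>> classifier = Classifier([3, 25, 1], c=2.0)
        >>> g = classifier(torch.rand(100, 3))  # g lies in (0, 1) by construction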
"""
NN_out = self.n_alpha(x)
g = 1 / (1 + (1 + self.c * NN_out))
return g
class PreProcessing:
"""
A feature preprocessor and data loader
"""
    def __init__(self, fitter, path):
"""
Parameters
----------
fitter: :py:class:`Fitter <ml4eft.core.classifier.Fitter>`
Fitter object
path: dict
Dictionary with paths to the SM and EFT dataset, e.g:
:code:`{'sm': <path_to_sm_dataset>, 'eft': <path_to_eft_dataset>}`
"""
self.path = path
if fitter.scaler_type == 'robust':
self.scaler = RobustScaler(quantile_range=(5, 95))
elif fitter.scaler_type == 'standardise':
self.scaler = StandardScaler()
else:
self.scaler = QuantileTransformer(n_quantiles=1000, output_distribution='normal')
self.load_data(fitter)
    def load_data(self, fitter):
        """
        Loads the SM and EFT event samples into ``pandas.DataFrame`` objects.
"""
# sm
df_sm_full = pd.read_pickle(self.path['sm'], compression="infer")
df_sm_wo_xsec = df_sm_full.iloc[1:, :]
# cross section before cuts
self.xsec_sm = df_sm_full.iloc[0, 0]
self.df_sm = df_sm_wo_xsec.sample(fitter.n_dat)
# eft
df_eft_full = pd.read_pickle(self.path['eft'], compression="infer")
df_eft_wo_xsec = df_eft_full.iloc[1:, :]
# cross section before cuts
self.xsec_eft = df_eft_full.iloc[0, 0]
self.df_eft = df_eft_wo_xsec.sample(fitter.n_dat)
    def feature_scaling(self, fitter, scaler_path):
"""
Parameters
----------
fitter: :py:class:`Fitter <ml4eft.core.classifier.Fitter>`
Fitter object
        scaler_path: str
            Path to where the preprocessing scaler is saved
Returns
-------
df_sm_scaled : pandas.DataFrame
Rescaled SM events
df_eft_scaled : pandas.DataFrame
Rescaled EFT events
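
        Notes
        -----
        The fitted scaler is written to ``scaler_path`` with ``joblib.dump``; it can be
        reloaded later, e.g. with ``scaler = joblib.load(scaler_path)``, to map new events
        into the same feature space via ``scaler.transform(df[features])``.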
"""
df = pd.concat([self.df_sm, self.df_eft])
# fit the scaler transformer to the eft and sm features
self.scaler.fit(df[fitter.features])
# rescale the sm and eft data
features_sm_scaled = self.scaler.transform(self.df_sm[fitter.features])
features_eft_scaled = self.scaler.transform(self.df_eft[fitter.features])
# convert transformed features to dataframe
df_sm_scaled = pd.DataFrame(features_sm_scaled, columns=fitter.features)
df_eft_scaled = pd.DataFrame(features_eft_scaled, columns=fitter.features)
# save the scaler
joblib.dump(self.scaler, scaler_path)
return df_sm_scaled, df_eft_scaled
class EventDataset(data.Dataset):
"""
Event loader
"""
    def __init__(self, df, xsec, path, n_dat, features, hypothesis=0):
"""
EventDataset constructor
Parameters
----------
df: pandas.DataFrame
Event DataFrame
xsec: float
inclusive cross-section
path: str
Path to dataset
n_dat: int
Number of data points
features: array_like
List of features to train on
hypothesis: int
0 for EFT and 1 for SM
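
        Examples
        --------
        A minimal sketch (the dataframe, cross section, path and feature names are illustrative):

        >>> df = pd.DataFrame({'y': np.random.rand(100), 'm_tt': np.random.rand(100)})
        >>> dataset = EventDataset(df, xsec=1.5, path='./events.pkl.gz', n_dat=100,
        ...                        features=['y', 'm_tt'], hypothesis=0)
        >>> event, weight, label = dataset[0]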
"""
super().__init__()
self.df = df
self.xsec = xsec
self.hypothesis = hypothesis
self.features = features
self.events = None
self.weights = None
self.labels = None
self.event_loader(path)
    def event_loader(self, path):
        """
        Sets the event weights and labels, and converts the events to ``torch.Tensor``.
Parameters
----------
path: str
Path to dataset
"""
n_dat = len(self.df)
self.weights = self.xsec * torch.ones(n_dat).unsqueeze(-1)
self.events = torch.tensor(self.df[self.features].values)
self.labels = torch.ones(n_dat).unsqueeze(-1) if self.hypothesis else torch.zeros(n_dat).unsqueeze(-1)
logging.info("Dataset loaded from {}".format(path))
def __len__(self):
return len(self.events)
def __getitem__(self, idx):
data_sample, weight_sample, label_sample = self.events[idx], self.weights[idx], self.labels[idx]
return data_sample, weight_sample, label_sample
class Fitter:
"""
Training class
"""
    def __init__(self, json_path, mc_run, c_name, output_dir, print_log=False):
"""
Fitter constructor
Parameters
----------
json_path: str
Path to json run card
mc_run: int
Replica number
c_name: str
EFT coefficient for which to learn the ratio function
output_dir: str
Path to where the models should be stored
print_log: bool, optional
            Set to True to print training progress to stdout; otherwise it
            prints to a log file only
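
        Examples
        --------
        An illustrative run card (all values are placeholders, not tuned settings):

        .. code-block:: json

            {
                "process_id": "tt",
                "lr": 0.001,
                "n_batches": 1,
                "loss_type": "CE",
                "scaler_type": "robust",
                "patience": 20,
                "val_ratio": 0.25,
                "c_train": {"ctg": 10.0},
                "n_dat": 100000,
                "epochs": 500,
                "features": ["y", "m_tt"],
                "hidden_sizes": [100, 100],
                "output_size": 1,
                "event_data": "<path_to_event_data>"
            }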
"""
# read the json run card
with open(json_path) as json_data:
self.run_options = json.load(json_data)
self.c_name = c_name
self.process_id = self.run_options["process_id"]
self.lr = self.run_options["lr"]
self.n_batches = self.run_options["n_batches"]
self.loss_type = self.run_options['loss_type']
self.scaler_type = self.run_options['scaler_type']
self.patience = self.run_options['patience']
self.val_ratio = self.run_options['val_ratio']
self.c_train = self.run_options["c_train"]
self.n_dat = self.run_options['n_dat']
self.epochs = self.run_options['epochs']
self.features = self.run_options['features']
self.network_size = [len(self.features)] + self.run_options['hidden_sizes'] + [
self.run_options['output_size']]
self.event_data_path = self.run_options['event_data'] # path to training data
        self.quadratic = '_' in self.c_name  # quadratic terms are labelled as 'c1_c2'
if self.quadratic:
c1, c2 = self.c_name.split('_')
self.c_train_value = self.c_train[c1] * self.c_train[c2]
else:
self.c_train_value = self.c_train[self.c_name]
self.mc_run = mc_run
output_dir = os.path.join(output_dir, time.strftime("%Y/%m/%d"))
os.makedirs(output_dir, exist_ok=True)
model_path = os.path.join(output_dir, 'model_{}'.format(self.c_name))
mc_path = os.path.join(model_path, 'mc_run_{}/'.format(self.mc_run))
log_path = os.path.join(mc_path, 'logs')
self.path_dict = {'model': model_path,
'mc_path': mc_path,
'log_path': log_path}
self.scaler = None
if not os.path.exists(mc_path):
os.makedirs(mc_path)
os.makedirs(log_path)
# initialise all paths to None, unless we run at pure quadratic or cross term level
self.path_lin_1 = self.path_lin_2 = self.path_quad_1 = self.path_quad_2 = None
        # build the path to the EFT event data and initialise the model; the path template
        # and classifier construction are identical for the linear and quadratic cases
        self.path_dict['eft_data_path'] = '{eft_coeff}/events_{mc_run}.pkl.gz'
        self.model = Classifier(self.network_size, self.c_train_value)
# create log file with timestamp
t = time.localtime()
current_time = time.strftime("%H_%M_%S", t)
# print log to stdout when print_log is True, else only save to log file
if print_log:
handlers = [logging.FileHandler(log_path + '/training_{}.log'.format(current_time)),
logging.StreamHandler(sys.stdout)
]
else:
handlers = [logging.FileHandler(log_path + '/training_{}.log'.format(current_time))]
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=handlers
)
logging.info("All directories created, ready to load the data")
# load the training and validation data
data_train, data_val = self.load_data()
# copy run card to the appropriate folder
with open(mc_path + 'run_card.json', 'w') as outfile:
json.dump(self.run_options, outfile)
# start the training
self.train_classifier(data_train, data_val)
    def load_data(self):
"""
Constructs training and validation sets
Returns
-------
data_train: array_like
Training data set
data_val: array_like
Validation data set
"""
# event files are stored at event_data_path/sm, event_data_path/lin, event_data_path/quad
# or event_data_path/cross for sm, linear, quadratic (single coefficient) and cross terms respectively
path_sm = os.path.join(self.event_data_path, self.process_id + '_sm/events_{}.pkl.gz'.format(self.mc_run))
path_eft = os.path.join(self.event_data_path,
self.process_id + '_' + self.path_dict['eft_data_path'].format(eft_coeff=self.c_name,
mc_run=self.mc_run))
path_dict = {'sm': path_sm, 'eft': path_eft}
# preprocessing of the data
preproc = PreProcessing(self, path_dict)
# save the scaler
scaler_path = os.path.join(self.path_dict['mc_path'], 'scaler.gz')
df_sm_scaled, df_eft_scaled = preproc.feature_scaling(self, scaler_path)
self.n_dat = min(len(df_eft_scaled), len(df_sm_scaled))
# construct an eft and a sm data set for each value of c in c_values and make a list out of it
data_eft = EventDataset(df_eft_scaled,
xsec=preproc.xsec_eft,
path=path_eft,
n_dat=self.n_dat,
features=self.features,
hypothesis=0)
data_sm = EventDataset(df_sm_scaled,
xsec=preproc.xsec_sm,
path=path_sm,
n_dat=self.n_dat,
features=self.features,
hypothesis=1)
data_all = [data_eft, data_sm]
# split each data set in training and validation
data_split = []
for dataset in data_all:
n_val_points = int(self.val_ratio * len(dataset))
n_train_points = len(dataset) - n_val_points
data_split.append(data.random_split(dataset, [n_train_points, n_val_points]))
# collect all the training and validation sets
data_train, data_val = [], []
for dataset in data_split:
data_train.append(dataset[0])
data_val.append(dataset[1])
return data_train, data_val
    def train_classifier(self, data_train, data_val):
"""
Starts the training of the binary classifier
Parameters
----------
data_train: array_like
            Training data set
data_val: array_like
Validation data set
"""
# define the optimizer
optimizer = optim.AdamW(self.model.parameters(), lr=self.lr)
        # Use PyTorch's DataLoader to allow for mini-batches. After each epoch, the mini-batches reshuffle.
# Create a dataloader object for each eft point + sm and put them all in one big list called train_data_loader
# or val_data_loader
        train_data_loader = [
            data.DataLoader(dataset_train, batch_size=len(dataset_train) // self.n_batches, shuffle=True) for
            dataset_train in data_train]
        val_data_loader = [
            data.DataLoader(dataset_val, batch_size=len(dataset_val) // self.n_batches, shuffle=False) for
            dataset_val in data_val]
# call the training loop
self.training_loop(optimizer=optimizer, train_loader=train_data_loader, val_loader=val_data_loader)
    def training_loop(self, optimizer, train_loader, val_loader):
"""
Optimize the classifier with `optimizer` on the training data set `train_loader`. Keeps track of potential
overfitting through `val_loader`.
Parameters
----------
optimizer: torch.optim
Optimizer, e.g. torch.optim.AdamW
train_loader: array_like
List of torch.utils.data.DataLoader objects, one for the SM and the EFT (training)
val_loader: array_like
List of torch.utils.data.DataLoader objects, one for the SM and the EFT (validation)
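
        Notes
        -----
        The model state is saved at the start of every epoch; once the validation loss has
        increased for ``patience`` consecutive epochs, the state saved at the stopping point
        (``epoch - patience``) is restored as the final model.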
"""
path = self.path_dict['mc_path']
        loss_list_train, loss_list_val = [], []  # store the training and validation loss per epoch
        # To be able to keep track of potential over-fitting, introduce a counter that gets increased
        # by one whenever the validation loss increases during an epoch
overfit_counter = 0
# outer loop that runs over the number of epochs
iterations = 0
for epoch in range(1, self.epochs + 1):
# check for plateau
if len(loss_list_train) > 10:
                # if the loss after the first epoch equals the latest loss, we reset the weights
                if loss_list_train[1] == loss_list_train[-1]:
logging.info("Detected stagnant training, reset the weights")
self.model.apply(self.weight_reset)
loss_train, loss_val = 0.0, 0.0
# We save the model parameters at the start of each epoch
torch.save(self.model.state_dict(), path + 'trained_nn_{}.pt'.format(epoch))
# compute validation loss
with torch.no_grad():
for minibatch in zip(*val_loader):
val_loss = torch.zeros(1)
for i, [event, weight, label] in enumerate(minibatch):
if isinstance(self.model, Classifier):
output = self.model(event.float())
loss = self.loss_fn(output, label, weight)
val_loss += loss
assert val_loss.requires_grad is False
loss_val += val_loss.item()
# loop over the mini-batches.
for j, minibatch in enumerate(zip(*train_loader)):
train_loss = torch.zeros(1)
# loop over all the datasets within the minibatch and compute their contribution to the loss
for i, [event, weight, label] in enumerate(minibatch): # i=0: eft, i=1: sm
if isinstance(self.model, Classifier):
output = self.model(event.float())
loss = self.loss_fn(output, label, weight)
train_loss += loss
# perform gradient descent after each minibatch. Move to the next epoch when all minibatches are looped over.
optimizer.zero_grad()
train_loss.backward()
optimizer.step()
loss_train += train_loss.item()
loss_list_train.append(loss_train)
loss_list_val.append(loss_val)
training_status = "Epoch {epoch}, Training loss {train_loss}, Validation loss {val_loss}, Overfit counter = {overfit}". \
format(time=datetime.datetime.now(), epoch=epoch, train_loss=loss_train, val_loss=loss_val,
overfit=overfit_counter)
logging.info(training_status)
np.savetxt(path + 'loss.out', loss_list_train)
np.savetxt(path + 'loss_val.out', loss_list_val)
# in case the maximum number of epochs is reached, save the final state
if epoch == self.epochs:
torch.save(self.model.state_dict(), path + 'trained_nn.pt')
break
# check whether the network is overfitting by increasing the overfit_counter by one if the
# validation loss increases during the epoch.
if epoch > 20:
if loss_val > min(loss_list_val):
overfit_counter += 1
else:
overfit_counter = 0
if overfit_counter == self.patience:
stopping_point = epoch - self.patience
logging.info("Stopping point reached! Overfit counter = {}".format(overfit_counter))
shutil.copyfile(path + 'trained_nn_{}.pt'.format(stopping_point), path + 'trained_nn.pt')
logging.info("Backwards stopping done")
break
loss_val_old = loss_val
iterations += 1
logging.info("Finished training")
    def weight_reset(self, m):
        """
        Resets the weights and biases of ``m`` if it is a linear layer; applied recursively
        to all submodules through ``self.model.apply(self.weight_reset)``.
        Parameters
        ----------
        m: torch.nn.Module
            Submodule of the model whose parameters are reset
"""
if isinstance(m, nn.Linear):
m.reset_parameters()
    def loss_fn(self, outputs, labels, w_e):
        r"""
        Weighted loss function for the binary classifier
Parameters
----------
outputs: torch.Tensor
Output of the decision function
labels: torch.Tensor
Classification labels
w_e: torch.Tensor
Event weights
Returns
-------
torch.Tensor
Average loss of the mini-batch
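
        Notes
        -----
        For ``loss_type == 'CE'`` (cross-entropy) the per-event loss implemented below reads

        .. math:: L = -(1 - y)\, w_e \log(1 - g) - y\, w_e \log g,

        while for ``loss_type == 'QC'`` (quadratic) it reads

        .. math:: L = (1 - y)\, w_e\, g^2 + y\, w_e\, (1 - g)^2,

        with :math:`y` the label, :math:`w_e` the event weight and :math:`g` the output
        of the decision function.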
"""
        if self.loss_type == 'CE':
            loss = - (1 - labels) * w_e * torch.log(1 - outputs) - labels * w_e * torch.log(outputs)
        elif self.loss_type == 'QC':
            loss = (1 - labels) * w_e * outputs ** 2 + labels * w_e * (1 - outputs) ** 2
        else:
            raise ValueError("Unknown loss_type: {}".format(self.loss_type))
        # average over all the losses in the batch
        return torch.mean(loss, dim=0)