Source code for ml4eft.limits.optimize_ns

"""
Optimizer module to compute confidence level intervals in the SMEFT
"""

import pymultinest
import json
import sys
import numpy as np
import time
import os
import pandas as pd
from pathlib import Path
import shutil
import copy
import itertools

import ml4eft.analyse.analyse as analyse
import ml4eft.core.th_predictions as theory_pred

# fix randomness
np.random.seed(0)


class Optimize:
    """
    Optimizer to obtain confidence level intervals in the SMEFT using either
    binned or unbinned (ML-level) observables
    """
    def __init__(self, config, coeff=None, rep=None):
        """
        Optimize constructor

        Parameters
        ----------
        config: dict
            loaded json run card
        coeff: array_like, optional
            Subset of EFT parameter names to include in the fit.
            Takes all EFT parameters by default.
        rep: int, optional
            Run the optimiser on a specific replica number, set to ``None``
            by default, in which case the optimiser works with the median
            over all replicas.
        """
        self.config = config.copy()
        self.coeff = coeff
        self.rep = rep
        self.param_names = {}

        if "fit_id" in self.config.keys():
            self.fit_id = self.config["fit_id"]
        if "order" in self.config.keys():
            self.order = self.config["order"]
        if "prior" in self.config.keys():
            self.prior_range = self.config["prior"]
        if "process" in self.config.keys():
            self.process = self.config["process"]

        if "mode" in self.config.keys():
            self.mode = self.config["mode"]
        else:
            print("No mode (mode) selected, please specify one in the input card. Aborting.")
            sys.exit()

        if "nlive" in self.config.keys():
            self.live_points = self.config["nlive"]
        else:
            print("Number of live points (nlive) not set in the input card. Using default: 500")
            self.live_points = 500

        if "lumi" in self.config.keys():
            self.lumi = self.config["lumi"]
        else:
            print("Luminosity (lumi) not set in the input card. Using default: 5e3 pb^-1")
            self.lumi = 5e3

        if self.mode == "binned":
            if "bins" in self.config.keys():
                self.bins = self.config["bins"]
            else:
                print("No binning (bins) specified. Please set it in the input card. Aborting.")
                sys.exit()
        elif self.mode == "nn":
            if "path_to_models" in self.config.keys():
                self.path_to_models = analyse.Analyse.build_path_dict(
                    self.config["path_to_models"], self.order, prefix="model"
                )
                self.nn_analyser = analyse.Analyse(self.path_to_models, self.order)
            else:
                print("No path to models (path_to_models) specified. Please set it in the input card. Aborting.")
                sys.exit()
            self.bins = None
            self.kinematic = None
        elif self.mode == "truth":
            self.bins = None
            self.kinematic = None
            self.th_features = self.config["th_features"]

        if "path_to_theory_pred" in self.config.keys():
            self.path_to_theory_pred = analyse.Analyse.build_path_dict(
                self.config["path_to_theory_pred"], self.order, prefix=self.process
            )
        else:
            print("No path to theory predictions (path_to_theory_pred) specified. Please set it in the input card. Aborting.")
            sys.exit()

        self.th_pred = theory_pred.TheoryPred(self.path_to_theory_pred, bins=self.bins)

        if self.coeff is None:
            self.glob = True
            self.coeff = self.th_pred.get_c_names_unique()  # coefficients to include in the fit
        else:
            self.glob = False

        self.suffix = ''
        if self.glob is False:
            self.suffix += '_'
            for c in self.coeff:
                self.suffix += c + '_'
            self.suffix = self.suffix[:-1]

        if self.rep is not None:
            self.output_path = os.path.join(
                self.config["results_path"],
                '{}_{}'.format(self.mode, self.fit_id),
                'rep_{}'.format(self.rep),
            )
        else:
            if self.glob is True:
                self.output_path = os.path.join(
                    self.config["results_path"], '{}_{}'.format(self.mode, self.fit_id)
                )
            else:
                self.output_path = os.path.join(
                    self.config["results_path"],
                    '{}_{}'.format(self.mode, self.fit_id),
                    self.suffix[1:],
                )

        Path(self.output_path).mkdir(parents=True, exist_ok=True)

        # store the input card as reference
        with open(os.path.join(self.output_path, 'input_card.json'), 'w') as json_data:
            json.dump(self.config, json_data)

        # nr of dof
        self.n_params = len(self.coeff)

        # load dataset (pseudo data)
        sm_data_path = self.config['observed_data_path']
        sm_data, _ = analyse.Analyse.load_events(sm_data_path)

        # construct the observed dataset: draw the total number of events from a
        # Poissonian centred on the expected SM count
        xsec_sm_tot = np.sum(self.th_pred.th_dict['sm'])  # sum over bins
        nu_tot_sm = xsec_sm_tot * config['lumi']
        n_tot_sm = np.random.poisson(nu_tot_sm, 1)
        self.observed_data = sm_data.sample(int(n_tot_sm), random_state=1)

        if self.mode == "nn":
            # evaluate the nn models on the pseudo dataset
            self.nn_analyser.evaluate_models(self.observed_data, rep=self.rep)
            models_evaluated = self.nn_analyser.models_evaluated_df['models']
            if self.rep is None:  # take the median over the models
                self.nn_analyser.models_evaluated_df['models'] = models_evaluated.apply(
                    lambda row: np.median(row, axis=0)
                )

        if self.mode == "truth":
            self.dsigma_dx = self.th_pred.compute_diff_coefficients(self)
        elif self.mode == "binned":
            if len(self.bins) == 1:
                kin, = self.bins.keys()
                x = self.observed_data[kin].values
                self.n_i, _ = np.histogram(x, self.bins[kin])
            elif len(self.bins) == 2:
                kin_1, kin_2 = self.bins.keys()
                x = self.observed_data[kin_1].values
                y = self.observed_data[kin_2].values
                self.n_i, _, _ = np.histogram2d(x, y, bins=(self.bins[kin_1], self.bins[kin_2]))
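    # A minimal sketch of the json run card the constructor above expects. The keys
    # are the ones read from ``self.config``; the values, paths and the coefficient
    # name "chw" are hypothetical:
    #
    #   {
    #       "fit_id": "test_fit",
    #       "mode": "binned",
    #       "order": "lin",
    #       "process": "tt",
    #       "prior": {"chw": [-2.0, 2.0]},
    #       "nlive": 500,
    #       "lumi": 5000.0,
    #       "bins": {"m_tt": [0.3, 0.5, 0.7, 1.0, 2.5]},
    #       "results_path": "./results",
    #       "observed_data_path": "./events/sm_events.pkl.gz",
    #       "path_to_theory_pred": "./theory_pred/"
    #   }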
    def my_prior(self, cube):
        """
        Construct the prior volume

        Parameters
        ----------
        cube: array_like, ``(N,) ndarray``
            unit hypercube of dimension ``N``

        Returns
        -------
        cube: array_like
            prior volume
        """
        for i, c_name in enumerate(self.coeff):
            prior_min, prior_max = self.prior_range[c_name]
            cube[i] = cube[i] * (prior_max - prior_min) + prior_min
        return cube
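    # A worked example of the linear cube-to-prior map above: with a (hypothetical)
    # prior_range entry {"chw": [-2.0, 2.0]}, a unit-cube value of 0.75 maps to
    # 0.75 * (2.0 - (-2.0)) + (-2.0) = 1.0, i.e. the coefficient is sampled
    # uniformly within its prior bounds.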
    def cube_to_dict(self, cube):
        """
        Converts the prior volume ``(N,) ndarray`` to a dict with the EFT
        parameter names as keys

        Parameters
        ----------
        cube: array_like
            prior volume
        """
        for i, c_name in enumerate(self.coeff):
            self.param_names[c_name] = cube[i]
    def log_like_nn(self, cube):
        """
        Implementation of the unbinned extended log-likelihood using the ML models

        Parameters
        ----------
        cube: array_like
            prior volume

        Returns
        -------
        float
            Extended log-likelihood using the ML models
        """
        self.cube_to_dict(cube)

        # compute the inclusive xsec at cube
        sigma = 0
        sigma += self.th_pred.th_dict['sm']
        lin_models = self.th_pred.th_dict['lin']
        for c_name, c_val in self.param_names.items():
            # check if c_name has linear sensitivity
            if c_name in lin_models:
                sigma += c_val * self.th_pred.th_dict['lin'][c_name]
        if 'quad' in self.th_pred.th_dict:
            quad_pred = self.th_pred.th_dict['quad']
            for (c1, c2) in itertools.product(self.param_names.keys(), repeat=2):
                c_name = '{}_{}'.format(c1, c2)
                if c_name in quad_pred:
                    sigma += self.param_names[c1] * self.param_names[c2] * quad_pred[c_name]

        nu = sigma * self.lumi

        # median likelihood ratio
        ratio_med = self.nn_analyser.likelihood_ratio_nn(self.param_names)
        log_r_med = np.log(ratio_med)

        return -nu + np.sum(log_r_med)
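    # The return value above is the extended unbinned log-likelihood up to
    # c-independent constants, log L(c) = -nu(c) + sum_i log r(x_i, c), where
    # nu(c) = sigma(c) * lumi is the expected number of events and r(x, c) is
    # the ML estimate of the EFT-to-SM ratio of differential cross sections.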
    def log_like_truth(self, cube):
        """
        Implementation of the unbinned extended log-likelihood using the
        analytical differential cross sections at the parton level

        Parameters
        ----------
        cube: array_like
            prior volume

        Returns
        -------
        float
            Extended log-likelihood using the analytical predictions
        """
        self.cube_to_dict(cube)

        # compute the inclusive xsec at cube
        sigma = 0
        sigma += self.th_pred.th_dict['sm']
        dsigma_dx = 0
        dsigma_dx += self.dsigma_dx['sm']
        lin_models = self.th_pred.th_dict['lin']
        for c_name, c_val in self.param_names.items():
            # check if c_name has linear sensitivity
            if c_name in lin_models:
                sigma += c_val * self.th_pred.th_dict['lin'][c_name]
                dsigma_dx += c_val * self.dsigma_dx['lin'][c_name]
        if 'quad' in self.th_pred.th_dict:
            quad_pred = self.th_pred.th_dict['quad']
            for (c1, c2) in itertools.product(self.param_names.keys(), repeat=2):
                c_name = '{}_{}'.format(c1, c2)
                if c_name in quad_pred:
                    sigma += self.param_names[c1] * self.param_names[c2] * quad_pred[c_name]
                    dsigma_dx += self.param_names[c1] * self.param_names[c2] * self.dsigma_dx['quad'][c_name]

        nu = sigma * self.lumi
        log_like = -nu + np.sum(np.log(dsigma_dx / self.dsigma_dx['sm']))

        return log_like
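    # As in log_like_nn, this is the extended log-likelihood up to c-independent
    # constants, but with the exact parton-level ratio dsigma_dx(c) / dsigma_dx(sm)
    # in place of the ML estimate.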
    def log_like_binned(self, cube):
        """
        Implementation of the binned Poissonian log-likelihood

        Parameters
        ----------
        cube: array_like
            prior volume

        Returns
        -------
        float
            Binned Poissonian log-likelihood
        """
        self.cube_to_dict(cube)

        # compute the xsec per bin at cube
        sigma = 0
        sigma += self.th_pred.th_dict['sm']
        lin_pred = self.th_pred.th_dict['lin']
        for c_name, c_val in self.param_names.items():
            if c_name in lin_pred:
                sigma += c_val * lin_pred[c_name]
        if 'quad' in self.th_pred.th_dict:
            quad_pred = self.th_pred.th_dict['quad']
            for (c1, c2) in itertools.product(self.param_names.keys(), repeat=2):
                c_name = '{}_{}'.format(c1, c2)
                if c_name in quad_pred:
                    sigma += self.param_names[c1] * self.param_names[c2] * quad_pred[c_name]

        nu_i = sigma * self.lumi
        log_like_i = self.n_i * np.log(nu_i) - nu_i

        return np.sum(log_like_i)
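    # A toy check of the binned Poisson log-likelihood above, with made-up numbers:
    # for observed counts n_i = [10, 5] and expectations nu_i = [9.0, 6.0],
    # sum_i (n_i * log(nu_i) - nu_i) = 10*log(9) - 9 + 5*log(6) - 6 ≈ 15.93,
    # where the c-independent log(n_i!) terms are dropped.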
    def run_sampling(self):
        """Runs the minimisation with Nested Sampling"""

        # prefix for the raw NS output files
        prefix = os.path.join(self.output_path, "1k-")

        t1 = time.perf_counter()

        if self.mode == "nn":
            log_likelihood = self.log_like_nn
        elif self.mode == "truth":
            log_likelihood = self.log_like_truth
        elif self.mode == "binned":
            log_likelihood = self.log_like_binned

        result = pymultinest.solve(
            LogLikelihood=log_likelihood,
            Prior=self.my_prior,
            n_dims=self.n_params,
            outputfiles_basename=prefix,
            verbose=True,
            n_live_points=self.live_points,
            const_efficiency_mode=True,
            importance_nested_sampling=True,
            sampling_efficiency=0.01,
            evidence_tolerance=0.5,
        )

        print("NS done!")
        t2 = time.perf_counter()
        print("Time = {:.2f} min".format((t2 - t1) / 60.0))

        # export the posterior samples to json
        self.save(result)

        # remove the raw NS output
        self.clean()
    def clean(self):
        """Removes the raw NS output files"""
        filelist = [f for f in os.listdir(self.output_path) if f.startswith("1k-")]
        for f in filelist:
            os.remove(os.path.join(self.output_path, f))
    def save(self, result):
        """
        Saves the NS posterior samples to json as a dictionary of the form
        ``{'coeff': [sample values]}``

        Parameters
        ----------
        result : dict
            result dictionary returned by ``pymultinest.solve``
        """
        vals = {}
        for c, samples in zip(self.param_names.keys(), result['samples'].T):
            vals[c] = samples.tolist()
        with open(os.path.join(self.output_path, 'posterior.json'), "w") as f:
            json.dump(vals, f)
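

# A minimal usage sketch of this module, assuming a run card "runcard.json" that
# contains the keys read by the constructor ("chw" is a hypothetical coefficient name):
#
#   with open("runcard.json") as f:
#       config = json.load(f)
#
#   opt = Optimize(config, coeff=["chw"])
#   opt.run_sampling()  # writes posterior.json to opt.output_path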