Source code for smefit.optimize.optimize

"""
Fitting the Wilson coefficients
"""
import os

# sys.path.insert(0,'/data/theorie/jjethier/SMEFT/code')
# sys.path.insert(0,'/Users/jacobethier/Documents/SMEFT/code')
import time
import copy
import scipy.optimize as opt
from scipy.stats import ncx2
import numpy as np
from ..core import load_data as ld
from ..core import chisquared as chi2
from collections.abc import Iterable


[docs]class OPTIMIZE(object): def __init__(self, config): self.config = config self.delta = 1e-7 self.step_size = 0.012 self.iterations = range(150) self.rep = int(self.config["rep"]) # Load data self.loaded_datasets = ld.load_datasets(self.config, self.rep) # Initialize tuple of coefficient labels and values self.coefficients = ld.aggregate_coefficients( self.config, self.loaded_datasets, self.rep ) for k in self.config["coefficients"]: if k not in self.coefficients.labels: raise NotImplementedError( "%s does not enter the theory. Comment it out in setup script and restart." % k ) # Get indice locations for quadratic corrections if self.config["HOlambda"] == "HO": self.config["HOindex1"] = [] self.config["HOindex2"] = [] for coeff in self.loaded_datasets.HOcorrectionsDICT: idx1 = np.where(self.coefficients.labels == coeff.split("*")[0])[0][0] idx2 = np.where(self.coefficients.labels == coeff.split("*")[1])[0][0] self.config["HOindex1"].append(idx1) self.config["HOindex2"].append(idx2) self.config["HOindex1"] = np.array(self.config["HOindex1"]) self.config["HOindex2"] = np.array(self.config["HOindex2"]) # Initialize arrays and values self.cost_values = [] self.coeffsteps = [] self.agg_shifts = 0 self.epoch = 1 # For closure test we produce pseudodata where theory is SM (i.e. known) # and add noise to simulate the experimental data self.smzero = np.zeros(len(self.coefficients.values)) # If one wants to do a CT to non-SM here is # where you would change the initial coeffs # smzero[5] = 10 if self.config["CT0L"]: self.setup_CT()
[docs] def setup_CT(self): """Set up the closure tests""" level = self.config["ctlevel"] print(f"Running a Level %d closure test to SM" % level) ct0l = ld.generate_closure( self.config, self.loaded_datasets, self.coefficients.values, self.smzero, self.coefficients.labels, level, ) self.loaded_datasets = ct0l
[docs] def get_free_params(self): """Gets free parameters entering fit""" free_params = [] free_param_labels = [] for k in self.config["coefficients"].keys(): idx = np.where(self.coefficients.labels == k)[0][0] if self.config["coefficients"][k]["fixed"] is False: free_params.append(self.coefficients.values[:][idx]) free_param_labels.append(k) self.free_params = np.array(free_params) self.free_param_labels = np.array(free_param_labels)
[docs] def set_newparams(self, params): """Propagates minimizer's parameter shifts""" shifts = 0 for i, param in enumerate(params): if param != self.free_params[i]: shifts += 1 self.free_params[i] = param self.propogate_params() self.set_constraints() return shifts
[docs] def set_constraints(self): """Sets parameter constraints""" for k in self.config["coefficients"].keys(): idx = np.where(self.coefficients.labels == k)[0][0] if self.config["coefficients"][k]["fixed"] is False: continue elif self.config["coefficients"][k]["fixed"] is True: self.coefficients.values[:][idx] = self.config["coefficients"][k][ "value" ] else: prop_const = self.config["coefficients"][k]["value"] if isinstance(prop_const, Iterable): self.coefficients.values[:][idx] = 0.0 for i, const in enumerate(prop_const): idx_fixed = np.where( self.coefficients.labels == self.config["coefficients"][k]["fixed"][i] )[0][0] self.coefficients.values[:][idx] += ( const * self.coefficients.values[:][idx_fixed] ) else: idx_fixed = np.where( self.coefficients.labels == self.config["coefficients"][k]["fixed"] )[0][0] self.coefficients.values[:][idx] = ( prop_const * self.coefficients.values[:][idx_fixed] )
[docs] def propogate_params(self): """Propagates minimizer's updated parameters to the coefficient tuple""" for k in self.config["coefficients"].keys(): idx = np.where(self.coefficients.labels == k)[0][0] if self.config["coefficients"][k]["fixed"] is False: idx2 = np.where(self.free_param_labels == k)[0][0] self.coefficients.values[:][idx] = self.free_params[idx2]
[docs] def get_status(self, shifts, chi2): """Monitor minimization procedure""" if len(self.cost_values) == 0: self.cost_values.append(chi2) if shifts > 2 and chi2 < self.cost_values[-1]: print("=" * 80) t1 = time.time() self.cost_values.append(chi2) self.coeffsteps.append(copy.deepcopy(self.free_params)) self.agg_shifts = 0 print("Coefficients") print("=" * 80) for i in range(len(self.coefficients.values[:])): print( "%s = %0.3f" % (self.coefficients.labels[i], self.coefficients.values[:][i]) ) print("=" * 80) print( r"Iteration = %d %2s Chi2=%0.4f %2s Ndat=%d" % (self.epoch, " ", chi2, " ", self.npts) ) print("Walltime=%0.2f(mins)" % ((t1 - self.t0) / 60.0)) self.epoch += 1
[docs] def chi2_func(self, params): """Wrap the chi2 in a function for scipy optimiser. Pass noise and data info as args. Log the chi2 value and values of the coefficients.""" shifts = self.set_newparams(params) loaded_data = self.loaded_datasets coefficients = self.coefficients chi2_info = chi2.compute_total_chi2( self.config, loaded_data, coefficients.values, coefficients.labels ) if self.config["cv"]: current_chi2 = chi2_info[0][0] self.npts = chi2_info[1][0] self.npts_V = chi2_info[1][1] else: current_chi2 = chi2_info[0] self.npts = chi2_info[1] self.get_status(shifts, current_chi2) return current_chi2
[docs] def run_fit(self): """Run the minimisation""" # Initialize save boolean save = False # Minimise the chi2 with initial conditions of the SM self.get_free_params() bounds = [self.coefficients.bounds[k] for k in self.free_param_labels] self.t0 = time.time() print("Starting SMEFiT Analysis...") scipy_min = opt.minimize( self.chi2_func, self.free_params, method="trust-constr", bounds=bounds ) print("Finished!\n") self.t1 = time.time() print("Walltime = %0.2f" % ((self.t1 - self.t0) / 60.0)) print("Results located in %s" % self.config["results_path"]) # Best coefficients produced by the algorithm scipy_coeffs = np.array(scipy_min.x) if self.config["cv"]: # Now we want to compute the chi2 of the validation set across the iterations self.chi2_per_it = [] for cvalues in self.coeffsteps: self.free_params = cvalues self.propogate_params() self.set_constraints() self.chi2_per_it.append( chi2.compute_total_chi2( self.config, self.loaded_datasets, self.coefficients.values, self.coefficients.labels, )[0][1] ) # Find the index number where the validation chi2 is minimised self.val_chi2min_index = self.chi2_per_it.index(min(self.chi2_per_it)) # And then find the parameters which match that self.param_best = self.coeffsteps[self.val_chi2min_index] tr_chi2 = self.cost_values[self.val_chi2min_index] val_chi2 = self.chi2_per_it[self.val_chi2min_index] if (tr_chi2 / self.npts) < 3 and (val_chi2 / self.npts_V) < 3: save = True print("Best params at iteration = ", self.val_chi2min_index) else: self.param_best = scipy_coeffs tr_chi2 = self.cost_values[-1] if tr_chi2 / self.npts < 3: save = True # Save results if save: myfile = open( self.config["results_path"] + "/SMEFT_coeffs_" + str(self.rep) + ".txt", "w", ) for op in self.coefficients.labels: myfile.write(str(op) + "\t") myfile.write("\n") self.free_params = self.param_best self.propogate_params() self.set_constraints() for item in self.coefficients.values[:]: myfile.write("%0.8f\t" % item) myfile.write("\n") myfile.close()