"""
Fitting the Wilson coefficients
"""
import os
# sys.path.insert(0,'/data/theorie/jjethier/SMEFT/code')
# sys.path.insert(0,'/Users/jacobethier/Documents/SMEFT/code')
import time
import copy
import scipy.optimize as opt
from scipy.stats import ncx2
import numpy as np
from ..core import load_data as ld
from ..core import chisquared as chi2
from collections.abc import Iterable
[docs]class OPTIMIZE(object):
def __init__(self, config):
self.config = config
self.delta = 1e-7
self.step_size = 0.012
self.iterations = range(150)
self.rep = int(self.config["rep"])
# Load data
self.loaded_datasets = ld.load_datasets(self.config, self.rep)
# Initialize tuple of coefficient labels and values
self.coefficients = ld.aggregate_coefficients(
self.config, self.loaded_datasets, self.rep
)
for k in self.config["coefficients"]:
if k not in self.coefficients.labels:
raise NotImplementedError(
"%s does not enter the theory. Comment it out in setup script and restart."
% k
)
# Get indice locations for quadratic corrections
if self.config["HOlambda"] == "HO":
self.config["HOindex1"] = []
self.config["HOindex2"] = []
for coeff in self.loaded_datasets.HOcorrectionsDICT:
idx1 = np.where(self.coefficients.labels == coeff.split("*")[0])[0][0]
idx2 = np.where(self.coefficients.labels == coeff.split("*")[1])[0][0]
self.config["HOindex1"].append(idx1)
self.config["HOindex2"].append(idx2)
self.config["HOindex1"] = np.array(self.config["HOindex1"])
self.config["HOindex2"] = np.array(self.config["HOindex2"])
# Initialize arrays and values
self.cost_values = []
self.coeffsteps = []
self.agg_shifts = 0
self.epoch = 1
# For closure test we produce pseudodata where theory is SM (i.e. known)
# and add noise to simulate the experimental data
self.smzero = np.zeros(len(self.coefficients.values))
# If one wants to do a CT to non-SM here is
# where you would change the initial coeffs
# smzero[5] = 10
if self.config["CT0L"]:
self.setup_CT()
[docs] def setup_CT(self):
"""Set up the closure tests"""
level = self.config["ctlevel"]
print(f"Running a Level %d closure test to SM" % level)
ct0l = ld.generate_closure(
self.config,
self.loaded_datasets,
self.coefficients.values,
self.smzero,
self.coefficients.labels,
level,
)
self.loaded_datasets = ct0l
[docs] def get_free_params(self):
"""Gets free parameters entering fit"""
free_params = []
free_param_labels = []
for k in self.config["coefficients"].keys():
idx = np.where(self.coefficients.labels == k)[0][0]
if self.config["coefficients"][k]["fixed"] is False:
free_params.append(self.coefficients.values[:][idx])
free_param_labels.append(k)
self.free_params = np.array(free_params)
self.free_param_labels = np.array(free_param_labels)
[docs] def set_newparams(self, params):
"""Propagates minimizer's parameter shifts"""
shifts = 0
for i, param in enumerate(params):
if param != self.free_params[i]:
shifts += 1
self.free_params[i] = param
self.propogate_params()
self.set_constraints()
return shifts
[docs] def set_constraints(self):
"""Sets parameter constraints"""
for k in self.config["coefficients"].keys():
idx = np.where(self.coefficients.labels == k)[0][0]
if self.config["coefficients"][k]["fixed"] is False:
continue
elif self.config["coefficients"][k]["fixed"] is True:
self.coefficients.values[:][idx] = self.config["coefficients"][k][
"value"
]
else:
prop_const = self.config["coefficients"][k]["value"]
if isinstance(prop_const, Iterable):
self.coefficients.values[:][idx] = 0.0
for i, const in enumerate(prop_const):
idx_fixed = np.where(
self.coefficients.labels
== self.config["coefficients"][k]["fixed"][i]
)[0][0]
self.coefficients.values[:][idx] += (
const * self.coefficients.values[:][idx_fixed]
)
else:
idx_fixed = np.where(
self.coefficients.labels
== self.config["coefficients"][k]["fixed"]
)[0][0]
self.coefficients.values[:][idx] = (
prop_const * self.coefficients.values[:][idx_fixed]
)
[docs] def propogate_params(self):
"""Propagates minimizer's updated parameters to the coefficient tuple"""
for k in self.config["coefficients"].keys():
idx = np.where(self.coefficients.labels == k)[0][0]
if self.config["coefficients"][k]["fixed"] is False:
idx2 = np.where(self.free_param_labels == k)[0][0]
self.coefficients.values[:][idx] = self.free_params[idx2]
[docs] def get_status(self, shifts, chi2):
"""Monitor minimization procedure"""
if len(self.cost_values) == 0:
self.cost_values.append(chi2)
if shifts > 2 and chi2 < self.cost_values[-1]:
print("=" * 80)
t1 = time.time()
self.cost_values.append(chi2)
self.coeffsteps.append(copy.deepcopy(self.free_params))
self.agg_shifts = 0
print("Coefficients")
print("=" * 80)
for i in range(len(self.coefficients.values[:])):
print(
"%s = %0.3f"
% (self.coefficients.labels[i], self.coefficients.values[:][i])
)
print("=" * 80)
print(
r"Iteration = %d %2s Chi2=%0.4f %2s Ndat=%d"
% (self.epoch, " ", chi2, " ", self.npts)
)
print("Walltime=%0.2f(mins)" % ((t1 - self.t0) / 60.0))
self.epoch += 1
[docs] def chi2_func(self, params):
"""Wrap the chi2 in a function for scipy optimiser. Pass noise and
data info as args. Log the chi2 value and values of the coefficients."""
shifts = self.set_newparams(params)
loaded_data = self.loaded_datasets
coefficients = self.coefficients
chi2_info = chi2.compute_total_chi2(
self.config, loaded_data, coefficients.values, coefficients.labels
)
if self.config["cv"]:
current_chi2 = chi2_info[0][0]
self.npts = chi2_info[1][0]
self.npts_V = chi2_info[1][1]
else:
current_chi2 = chi2_info[0]
self.npts = chi2_info[1]
self.get_status(shifts, current_chi2)
return current_chi2
[docs] def run_fit(self):
"""Run the minimisation"""
# Initialize save boolean
save = False
# Minimise the chi2 with initial conditions of the SM
self.get_free_params()
bounds = [self.coefficients.bounds[k] for k in self.free_param_labels]
self.t0 = time.time()
print("Starting SMEFiT Analysis...")
scipy_min = opt.minimize(
self.chi2_func, self.free_params, method="trust-constr", bounds=bounds
)
print("Finished!\n")
self.t1 = time.time()
print("Walltime = %0.2f" % ((self.t1 - self.t0) / 60.0))
print("Results located in %s" % self.config["results_path"])
# Best coefficients produced by the algorithm
scipy_coeffs = np.array(scipy_min.x)
if self.config["cv"]:
# Now we want to compute the chi2 of the validation set across the iterations
self.chi2_per_it = []
for cvalues in self.coeffsteps:
self.free_params = cvalues
self.propogate_params()
self.set_constraints()
self.chi2_per_it.append(
chi2.compute_total_chi2(
self.config,
self.loaded_datasets,
self.coefficients.values,
self.coefficients.labels,
)[0][1]
)
# Find the index number where the validation chi2 is minimised
self.val_chi2min_index = self.chi2_per_it.index(min(self.chi2_per_it))
# And then find the parameters which match that
self.param_best = self.coeffsteps[self.val_chi2min_index]
tr_chi2 = self.cost_values[self.val_chi2min_index]
val_chi2 = self.chi2_per_it[self.val_chi2min_index]
if (tr_chi2 / self.npts) < 3 and (val_chi2 / self.npts_V) < 3:
save = True
print("Best params at iteration = ", self.val_chi2min_index)
else:
self.param_best = scipy_coeffs
tr_chi2 = self.cost_values[-1]
if tr_chi2 / self.npts < 3:
save = True
# Save results
if save:
myfile = open(
self.config["results_path"] + "/SMEFT_coeffs_" + str(self.rep) + ".txt",
"w",
)
for op in self.coefficients.labels:
myfile.write(str(op) + "\t")
myfile.write("\n")
self.free_params = self.param_best
self.propogate_params()
self.set_constraints()
for item in self.coefficients.values[:]:
myfile.write("%0.8f\t" % item)
myfile.write("\n")
myfile.close()