"""
Module for reading in commondata, theory and SMEFT corrections
"""
import sys
import os
import itertools
from . import compute_theory as pr
import numpy as np
from collections import namedtuple
from scipy.optimize import fsolve
from scipy import stats
DataTuple = namedtuple(
    "DataTuple",
    (
        "Commondata",
        "SMTheory",
        "CorrectionsDICT",
        "CorrectionsVAL",
        "HOcorrectionsDICT",
        "HOcorrectionsVAL",
        "ExpNames",
        "NdataExp",
        "Kinematics",
        "Noise",
        "TrainingMask",
        "ValidationMask",
        "CovMat",
    ),
)

# Should probably reduce this to a function which loads only one set
def load_datasets(config, i_rep):
""" Loads commondata, theory and SMEFT corrections into a namedtuple"""
# TODO: remove this dependency
from validphys import loader
# TODO: remove this dependency
from NNPDF import ComputeCovMat, CommonData
EXP_DATA = []
SM_THEORY = []
LIN_DICT = []
QUAD_DICT = []
X_VALS = []
SMEAR = []
CHI2_COVMAT = []
training_mask = []
validation_mask = []
N_data_exp = []
EXP_name = []
dataset = config["datasets"]
seed_handler(i_rep)
    for sset in dataset:
        # Read CommonData and SM predictions for replica `i_rep` from file
        cd = loader.Loader().get_commondata(sset, None)
        theory_filename = config["theory_path"] + sset + ".txt"
        # sm_predictions = np.loadtxt(theory_filename, ndmin=2)[i_rep]
        sm_predictions_rep0 = np.loadtxt(theory_filename, ndmin=2)[0]

        # For the plotting tool we need to extract the kinematic information;
        # for ttbar production this is kin1 (i.e. ptt, mtt, etc.)
        kintab = cd.get_kintable()
        xvals = np.array([kintab[i][0] for i in range(0, cd.GetNData())])
        if ("Mtt" in sset or "mTWZ" in sset or "memu" in sset) and "mass_cut" in config:
            keep = np.where(xvals < 1000)[0]

        # Read in the SMEFT corrections and remove header, SM row and first column
        corrections_filename = config["corrections_path"] + sset + ".txt"
        # Create a dictionary to store operator names and results
        corrections_dic = {}
        with open(corrections_filename) as f:
            for line in f:
                key, *val = line.split()
                corrections_dic[key] = val
                if (
                    "Mtt" in sset or "mTWZ" in sset or "memu" in sset
                ) and "mass_cut" in config:
                    corrections_dic[key] = np.array(corrections_dic[key])[keep]
        corrections_dic.pop("SM", None)
        corrections_dic.pop("center-NNPDF", None)

        higherorder = {}
        removeHO = list()
        # Split the dictionary into lambda^-2 and lambda^-4 terms
        for key, value in corrections_dic.items():
            if "*" in key:
                removeHO.append(key)
                higherorder[key] = np.array(
                    [float(value[i]) for i in range(len(value))]
                )  # Load float values instead of strings
            if "^" in key:
                new_key = "%s*%s" % (key[:-2], key[:-2])
                removeHO.append(key)
                higherorder[new_key] = np.array(
                    [float(value[i]) for i in range(len(value))]
                )
        for key in removeHO:
            if key in corrections_dic:
                del corrections_dic[key]
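        # Naming convention (as implied by the parsing above): keys containing
        # "*" are quadratic cross terms read directly from the corrections file,
        # keys ending in "^2" are squared terms and get rewritten as "name*name",
        # and whatever remains in corrections_dic after the removal loop is a
        # linear (lambda^-2) correction.
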
        # TODO: safe to remove?
        operator_str = list(corrections_dic.keys())
        corrections = np.array(list(corrections_dic.values()), dtype=float).T
        if ("Mtt" in sset or "mTWZ" in sset or "memu" in sset) and "mass_cut" in config:
            corrections = corrections[keep]

        pdf_cov_filename = config["theory_path"] + sset + "_cov.txt"
        pdf_covariance = np.loadtxt(pdf_cov_filename, ndmin=2)

        central_data = np.array([cd.GetData(i) for i in range(cd.GetNData())])
        if "t0" in config and config["t0"] == "data":
            covmat = ComputeCovMat(cd, central_data)
        else:
            covmat = ComputeCovMat(cd, sm_predictions_rep0)
        # PDF covariance + experimental covariance matrix
        total_covmat = covmat + pdf_covariance

        # If bootstrap is on, generate a data replica with the total covmat.
        # Note: the central PDF is used for the SM predictions;
        # uncertainties from the PDFs are added to the covmat.
        if config["bootstrap"]:
            noise = generate_artdata(cd, total_covmat, i_rep)
        else:
            noise = np.zeros(cd.GetNData())

        ndat = cd.GetNData()
        if ("Mtt" in sset or "mTWZ" in sset or "memu" in sset) and "mass_cut" in config:
            total_covmat = total_covmat[:, keep]
            total_covmat = total_covmat[keep, :]
            sm_predictions_rep0 = sm_predictions_rep0[keep]
            ndat = len(sm_predictions_rep0)
            noise = noise[keep]
        EXP_name.append(sset)
        N_data_exp.append(ndat)

        # Gather the training and validation masks here
        if config["cv"]:
            ndat = cd.GetNData()
            if ndat < 5:
                for i in range(ndat):
                    training_mask.append(True)
                    validation_mask.append(False)
            else:
                training_id = np.random.choice(
                    int(ndat), int(round(ndat * 0.50, 0)), replace=False
                )
                for i in range(ndat):
                    if i in training_id:
                        training_mask.append(True)
                        validation_mask.append(False)
                    else:
                        training_mask.append(False)
                        validation_mask.append(True)

        if ("Mtt" in sset or "mTWZ" in sset or "memu" in sset) and "mass_cut" in config:
            central_data = central_data[keep]
            xvals = xvals[keep]
if ("Mtt" in sset or "mTWZ" in sset or "memu" in sset) and "mass_cut" in config:
central_data = central_data[keep]
xvals = xvals[keep]
        EXP_DATA.append(central_data)
        SM_THEORY.append(sm_predictions_rep0)
        LIN_DICT.append(corrections_dic)
        QUAD_DICT.append(higherorder)
        X_VALS.append(xvals)
        SMEAR.append(noise)
        CHI2_COVMAT.append(total_covmat)

        # newset = DataTuple(cd, sm_predictions_rep0, corrections_dic, np.zeros(ndat),
        #                    higherorder, np.zeros(ndat), np.zeros(ndat), np.zeros(ndat),
        #                    xvals, noise, np.zeros(ndat), np.zeros(ndat), total_covmat)
        # artificial_data_check(newset)
    # Flatten array containing all data
    EXP_array = np.array([item for sublist in EXP_DATA for item in sublist])
    # Total number of data points
    ndata = len(EXP_array)
    # Flatten array containing all SM theory
    SM_array = np.array([item for sublist in SM_THEORY for item in sublist])

    # Store keys for linear correction values and build matrix containing linear corrections
    LIN_array = list([item for sublist in LIN_DICT for item in sublist])
    lin_corr_keys = np.array(sorted(list(set(LIN_array))))
    lin_corr_values = np.zeros((ndata, len(lin_corr_keys)))
    cnt = 0
    for lindict in LIN_DICT:
        dummy_key = list(lindict.keys())[0]
        for j in range(len(lindict[dummy_key])):
            for key in lindict:
                idx = np.where(np.array(lin_corr_keys) == key)[0][0]
                lin_corr_values[cnt, idx] = float(lindict[key][j])
            cnt += 1
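    # At this point lin_corr_values has one row per data point (in the order the
    # datasets were loaded) and one column per operator in lin_corr_keys; entries
    # for operators that do not appear in a given dataset are left at zero.
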
    # Store keys for HO correction values and build matrix containing HO corrections
    QUAD_array = list([item for sublist in QUAD_DICT for item in sublist])
    quad_corr_keys = np.array(sorted(list(set(QUAD_array))))
    quad_corr_values = np.zeros((ndata, len(quad_corr_keys)))
    cnt = 0
    for quaddict in QUAD_DICT:
        dummy_key = list(quaddict.keys())[0]
        for j in range(len(quaddict[dummy_key])):
            for key in quaddict:
                idx = np.where(np.array(quad_corr_keys) == key)[0][0]
                quad_corr_values[cnt, idx] = float(quaddict[key][j])
            cnt += 1

    # Flatten kinematics array
    X_array = np.array([item for sublist in X_VALS for item in sublist])
    # Flatten noise array
    SMEAR_array = np.array([item for sublist in SMEAR for item in sublist])

    # Build large covariance matrix (individual datasets on the diagonal, no cross-correlations)
    COVMAT_array = np.zeros((ndata, ndata))
    cnt = 0
    for i, expdata in enumerate(EXP_DATA):
        for j in range(len(expdata)):
            for k in range(len(expdata)):
                COVMAT_array[cnt + j, cnt + k] = CHI2_COVMAT[i][j][k]
        cnt += len(expdata)

    training_idx = np.array([i for i, x in enumerate(training_mask) if x])
    validation_idx = np.array([i for i, x in enumerate(validation_mask) if x])
    N_data_exp = np.array(N_data_exp)
    EXP_names = np.array(EXP_name)

    if "norm_CMS_WZ" in config:
        position = 0
        for iexp, expname in enumerate(EXP_names):
            ndat = N_data_exp[iexp]
            if expname.startswith("CMS_WZ"):
                break
            position += ndat
        config["norm_CMS_WZ"] = [position, position + ndat]

    # Make one large DataTuple containing all data, SM theory, corrections, etc.
    totalset = DataTuple(
        EXP_array,
        SM_array,
        lin_corr_keys,
        lin_corr_values,
        quad_corr_keys,
        quad_corr_values,
        EXP_names,
        N_data_exp,
        X_array,
        SMEAR_array,
        training_idx,
        validation_idx,
        COVMAT_array,
    )
    return totalset
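
# Minimal usage sketch (illustrative only): the keys below are the ones this
# module reads from `config`, but the paths and the dataset name are placeholders.
#
#     config = {
#         "datasets": ["ATLAS_tt_Mtt"],        # hypothetical dataset name
#         "theory_path": "/path/to/theory/",
#         "corrections_path": "/path/to/corrections/",
#         "bootstrap": False,
#         "cv": True,
#     }
#     data = load_datasets(config, i_rep=0)
#     print(data.ExpNames, data.NdataExp)
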
def seed_handler(i_rep):
""" Allow the seed for splitting the data and
artificial data replication (and artifical data
for level 2 CT) to be set here
"""
np.random.seed(i_rep)

def generate_artdata(commondata, covmat, i_rep):
""" Generates artificial data noise for MC replica i_rep """
# TODO: remove this dependency
from NNPDF import CommonData
# Ensure different random seed is used for each replica (just use replica ID number)
seed_handler(i_rep)
ndat = commondata.GetNData()
gaussian_dist = np.random.multivariate_normal(np.zeros(ndat), covmat)
return gaussian_dist

def generate_CTartdata(commondata, covmat):
""" Generates experimental data noise for level 2 CT """
# Ensure the random seed is fixed and way outside the range
# needed for fitting
seed_handler(1234567)
ndat = len(commondata)
gaussian_dist = np.random.multivariate_normal(np.zeros(ndat), covmat)
return gaussian_dist

def generate_closure(config, dataset, coefficients, pseudocoeff, labels, level):
"""
Generates a level 0, 1 or 2 closure test set using the `coefficients` table
This is done by shifting the 'Noise' attribute of a dataset tuple accordingly
We fit to pseudodata (the SM) as opposed to real data (experiment)
"""
original_data = dataset.Commondata
covmat = dataset.CovMat
# Pseudodata here is just the SM theory exactly
pseudodata = pr.make_predictions(config, dataset, pseudocoeff, labels)
if level == 0:
# No artificial noise
shifted_data = pseudodata - original_data
elif level == 1:
# Adding statistical noise
exp_noise = generate_CTartdata(pseudodata, covmat)
if "norm_CMS_WZ" in config:
idx1, idx2 = config["norm_CMS_WZ"][0], config["norm_CMS_WZ"][1]
norm_CMS = np.sum((pseudodata + exp_noise)[idx1:idx2])
exp_noise[idx1:idx2] /= norm_CMS
pseudodata[idx1:idx2] /= norm_CMS
shifted_data = exp_noise - original_data + pseudodata
elif level == 2:
# Adding MC noise and statistical noise - set a fixed random seed
exp_noise = generate_CTartdata(pseudodata, covmat)
shifted_data = dataset.Noise + exp_noise - original_data + pseudodata
else:
print(
f"Closure tests can only be levels 0, 1, 2. Please enter the correct number"
)
return DataTuple(
original_data,
dataset.SMTheory,
dataset.CorrectionsDICT,
dataset.CorrectionsVAL,
dataset.HOcorrectionsDICT,
dataset.HOcorrectionsVAL,
dataset.ExpNames,
dataset.NdataExp,
dataset.Kinematics,
shifted_data,
dataset.TrainingMask,
dataset.ValidationMask,
dataset.CovMat,
)
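
# Sketch of how a closure-test set might be built (names are placeholders;
# level 1 adds statistical noise on top of the SM pseudodata):
#
#     closure_set = generate_closure(config, totalset, coefficients,
#                                    pseudocoeff, labels, level=1)
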

def artificial_data_check(set):
""" Check that the artificial noise reproduces the original covariance matrix """
N_REPLICAS_CHECK = 1000
cd = set.Commondata
ndat = cd.GetNData()
art_noise = []
for i in range(N_REPLICAS_CHECK):
art_noise.append(generate_artdata(cd, set.CovMat, i))
exp_cov = np.array(set.CovMat)
art_cov = np.cov(np.array(art_noise).T)
if ndat == 1: # Total xsec data
art_cov = np.array([[art_cov]])
max_err_std = 0
max_err_cov = 0
for i, j in list(itertools.product(range(ndat), range(ndat))):
err = 100 * abs((exp_cov[i][j] - art_cov[i][j]) / exp_cov[i][j])
if i == j:
max_err_std = max(max_err_std, err)
else:
max_err_cov = max(max_err_cov, err)
print(f"Maximum deviation for variances: {max_err_std}%")
print(f"Maximum deviation for covariances: {max_err_cov}%")
CoeffTuple = namedtuple("Coefficients", ("labels", "values", "bounds"))

def aggregate_coefficients(config, loaded_datasets, i_rep):
""" Aggregate all coefficient labels and construct an array of coefficient
values of suitable size. Returns a CoeffTuple of the labels and values."""
# Set seed
seed_handler(i_rep)
# Get total number of operators
coeff_labels = []
# for set in loaded_datasets:
for key in loaded_datasets.CorrectionsDICT:
coeff_labels.append(key)
# Keep ordering of coefficients the same so they match to the actual corrections
coeff_labels, idx = np.unique(np.array(coeff_labels), return_index=True)
coeff_labels = coeff_labels[np.argsort(idx)]
# Give the initial point of the fit to be randomly spread around the bounds
# specified by --bounds option (if none given, bounds are taken from setup.py)
randarr = np.zeros(len(coeff_labels))
bounds = {}
if config["bounds"] is None:
if "OPff" in config["coefficients"].keys():
config["coefficients"]["Off"] = config["coefficients"].pop("OPff")
for k in config["coefficients"].keys():
if k not in coeff_labels:
print(
k
+ " is not part of fitted coefficients. Please comment it out in the setup file"
)
sys.exit()
idx = np.where(coeff_labels == k)[0][0]
min_val = config["coefficients"][k]["min"]
max_val = config["coefficients"][k]["max"]
randarr[idx] = np.random.uniform(low=min_val, high=max_val)
bounds[k] = (min_val, max_val)
else:
previous_results = config["root_path"] + "/results/" + config["bounds"]
print("Constructing bounds with %s" % previous_results)
if "OPff" in config["coefficients"].keys():
config["coefficients"]["Off"] = config["coefficients"].pop("OPff")
for k in config["coefficients"].keys():
if k not in coeff_labels:
print(
k
+ " is not part of fitted coefficients. Please comment it out in the setup file"
)
sys.exit()
if config["coefficients"][k]["fixed"]:
continue
# Renaming of OpD to OpDC to avoid output conflict with Opd
if k == "OpD":
name = "/%sC.txt" % k
else:
name = "/%s.txt" % k
idx = np.where(coeff_labels == k)[0][0]
if name[1:] not in os.listdir(previous_results):
print("%s result not found in %s" % (k, previous_results))
sys.exit()
openfile = open(previous_results + name, "r")
data = openfile.readlines()
min_val = float(data[0].split()[0])
max_val = float(data[-1].split()[0])
randarr[idx] = np.random.uniform(low=min_val, high=max_val)
bounds[k] = (min_val, max_val)
openfile.close()
return CoeffTuple(coeff_labels, randarr, bounds)

def CL_bounds(chi2, coeff, percent_CL):
""" Compute either the 68% or 95% confidence intervals"""
# Quadratic fit of coefficients to the chi2
a, b, c = np.polyfit(coeff, chi2, 2)
# Value of Wilson coefficient at the minimum of the chi2 parabola
wilson_coeff_min = -b / 2 / a
chi2_at_min = np.polyval([a, b, c], wilson_coeff_min)
# 68 or 95% CL based on delta chi2 = 1 or delta chi2 = 3.84
if percent_CL == 68:
del_chi2 = 1
elif percent_CL == 95:
del_chi2 = 3.84
else:
raise ValueError("Function only accepts 68% or 95% confidence levels!")
# Define the delta chi2 function: chi2 - chi2 at the minimum and subtract
# the equivalent CL value we want
def chi2_fit_func(x):
return a * x ** 2 + b * x + c - chi2_at_min - del_chi2
# Find the 68% or 95% bound
CL_bound = fsolve(chi2_fit_func, wilson_coeff_min)
return CL_bound, wilson_coeff_min
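
# Rough usage sketch for CL_bounds (numbers are made up for illustration):
# scan a single Wilson coefficient, evaluate the chi2 at each scan point,
# then extract the 95% CL interval from the parabolic fit.
#
#     coeff_scan = np.linspace(-2.0, 2.0, 25)
#     chi2_scan = np.array([...])  # chi2 evaluated at each scan point
#     bound, best_fit = CL_bounds(chi2_scan, coeff_scan, percent_CL=95)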