Source code for smefit.core.load_data

"""
Module for reading in commondata, theory and SMEFT corrections
"""
import sys
import os
import itertools
from . import compute_theory as pr
import numpy as np
from collections import namedtuple
from scipy.optimize import fsolve
from scipy import stats


DataTuple = namedtuple(
    "DataTuple",
    (
        "Commondata",
        "SMTheory",
        "CorrectionsDICT",
        "CorrectionsVAL",
        "HOcorrectionsDICT",
        "HOcorrectionsVAL",
        "ExpNames",
        "NdataExp",
        "Kinematics",
        "Noise",
        "TrainingMask",
        "ValidationMask",
        "CovMat",
    ),
)
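# Field layout as filled by load_datasets below (arrays are flattened over all datasets):
#   Commondata        - central experimental data points
#   SMTheory          - SM predictions (central replica)
#   CorrectionsDICT   - sorted operator labels of the linear (Lambda^-2) corrections
#   CorrectionsVAL    - (ndata x n_op) matrix of linear corrections
#   HOcorrectionsDICT - operator-pair labels of the quadratic (Lambda^-4) corrections
#   HOcorrectionsVAL  - (ndata x n_pair) matrix of quadratic corrections
#   ExpNames          - dataset names
#   NdataExp          - number of data points per dataset
#   Kinematics        - first kinematic variable (kin1) per data point
#   Noise             - artificial/closure noise for the replica (zeros if bootstrap is off)
#   TrainingMask      - indices of training points when cross-validation is on
#   ValidationMask    - indices of validation points
#   CovMat            - block-diagonal experimental + PDF covariance matrix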

# Should probably reduce this to a function which loads only one set
def load_datasets(config, i_rep):
    """ Loads commondata, theory and SMEFT corrections into a namedtuple"""
    # TODO: remove this dependency
    from validphys import loader

    # TODO: remove this dependency
    from NNPDF import ComputeCovMat, CommonData

    EXP_DATA = []
    SM_THEORY = []
    LIN_DICT = []
    QUAD_DICT = []
    X_VALS = []
    SMEAR = []
    CHI2_COVMAT = []
    training_mask = []
    validation_mask = []
    N_data_exp = []
    EXP_name = []

    dataset = config["datasets"]
    seed_handler(i_rep)

    for sset in dataset:
        # Read CommonData and SM predictions for replica `i_rep` from file
        cd = loader.Loader().get_commondata(sset, None)
        theory_filename = config["theory_path"] + sset + ".txt"
        # sm_predictions = np.loadtxt(theory_filename, ndmin=2)[i_rep]
        sm_predictions_rep0 = np.loadtxt(theory_filename, ndmin=2)[0]

        # For the plotting tool we need to extract the kinematic information
        # and for ttbar production this is kin1 (i.e. ptt, mtt etc)
        kintab = cd.get_kintable()
        xvals = np.array([kintab[i][0] for i in range(0, cd.GetNData())])

        if ("Mtt" in sset or "mTWZ" in sset or "memu" in sset) and "mass_cut" in config:
            keep = np.where(xvals < 1000)[0]

        # Read in the SMEFT corrections and remove header, SM row and first column
        corrections_filename = config["corrections_path"] + sset + ".txt"

        # Create a dictionary to store operator names and results
        corrections_dic = {}
        with open(corrections_filename) as f:
            for line in f:
                key, *val = line.split()
                corrections_dic[key] = val
                if (
                    "Mtt" in sset or "mTWZ" in sset or "memu" in sset
                ) and "mass_cut" in config:
                    corrections_dic[key] = np.array(corrections_dic[key])[keep]
        corrections_dic.pop("SM", None)
        corrections_dic.pop("center-NNPDF", None)

        higherorder = {}
        removeHO = list()

        # Split the dictionary into lambda^-2 and lambda^-4 terms
        for key, value in corrections_dic.items():
            if "*" in key:
                removeHO.append(key)
                higherorder[key] = np.array(
                    [float(value[i]) for i in range(len(value))]
                )  # Load float values instead of strings
            if "^" in key:
                new_key = "%s*%s" % (key[:-2], key[:-2])
                removeHO.append(key)
                higherorder[new_key] = np.array(
                    [float(value[i]) for i in range(len(value))]
                )
        for key in removeHO:
            if key in corrections_dic:
                del corrections_dic[key]

        # TODO: safe to remove?
        operator_str = list(corrections_dic.keys())
        corrections = np.array(list(corrections_dic.values()), dtype=float).T
        if ("Mtt" in sset or "mTWZ" in sset or "memu" in sset) and "mass_cut" in config:
            corrections = corrections[keep]

        pdf_cov_filename = config["theory_path"] + sset + "_cov.txt"
        pdf_covariance = np.loadtxt(pdf_cov_filename, ndmin=2)

        central_data = np.array([cd.GetData(i) for i in range(cd.GetNData())])
        if "t0" in config and config["t0"] == "data":
            covmat = ComputeCovMat(cd, central_data)
        else:
            covmat = ComputeCovMat(cd, sm_predictions_rep0)

        # PDF covariance + experimental cov mat
        total_covmat = covmat + pdf_covariance

        # If bootstrap is on, generate data replica with total covmat
        # Note: central PDF is used for SM predictions
        # Uncertainties from PDFs are added to covmat
        if config["bootstrap"]:
            noise = generate_artdata(cd, total_covmat, i_rep)
        else:
            noise = np.zeros(cd.GetNData())

        ndat = cd.GetNData()
        if ("Mtt" in sset or "mTWZ" in sset or "memu" in sset) and "mass_cut" in config:
            total_covmat = total_covmat[:, keep]
            total_covmat = total_covmat[keep, :]
            sm_predictions_rep0 = sm_predictions_rep0[keep]
            ndat = len(sm_predictions_rep0)
            noise = noise[keep]

        EXP_name.append(sset)
        N_data_exp.append(ndat)

        # Gather the training and validation masks here
        if config["cv"]:
            ndat = cd.GetNData()
            if ndat < 5:
                for i in range(ndat):
                    training_mask.append(True)
                    validation_mask.append(False)
            else:
                training_id = np.random.choice(
                    int(ndat), int(round(ndat * 0.50, 0)), replace=False
                )
                for i in range(ndat):
                    if i in training_id:
                        training_mask.append(True)
                        validation_mask.append(False)
                    else:
                        training_mask.append(False)
                        validation_mask.append(True)

        if ("Mtt" in sset or "mTWZ" in sset or "memu" in sset) and "mass_cut" in config:
            central_data = central_data[keep]
            xvals = xvals[keep]

        EXP_DATA.append(central_data)
        SM_THEORY.append(sm_predictions_rep0)
        LIN_DICT.append(corrections_dic)
        QUAD_DICT.append(higherorder)
        X_VALS.append(xvals)
        SMEAR.append(noise)
        CHI2_COVMAT.append(total_covmat)

        # newset = DataTuple(cd, sm_predictions_rep0, corrections_dic, np.zeros(ndat),
        #                    higherorder, np.zeros(ndat), np.zeros(ndat), np.zeros(ndat),
        #                    xvals, noise, np.zeros(ndat), np.zeros(ndat), total_covmat)
        # artificial_data_check(newset)

    # Flatten array containing all data
    EXP_array = np.array([item for sublist in EXP_DATA for item in sublist])
    # Total number of experiments and data points
    ndata = len(EXP_array)

    # Flatten array containing all SM theory
    SM_array = np.array([item for sublist in SM_THEORY for item in sublist])

    # Store keys for linear correction values and build matrix containing linear corrections
    LIN_array = list([item for sublist in LIN_DICT for item in sublist])
    lin_corr_keys = np.array(sorted(list(set(LIN_array))))
    lin_corr_values = np.zeros((ndata, len(lin_corr_keys)))

    cnt = 0
    for lindict in LIN_DICT:
        dummy_key = list(lindict.keys())[0]
        for j in range(len(lindict[dummy_key])):
            for key in lindict:
                idx = np.where(np.array(lin_corr_keys) == key)[0][0]
                lin_corr_values[cnt, idx] = float(lindict[key][j])
            cnt += 1

    # Store keys for HO correction values and build matrix containing HO corrections
    QUAD_array = list([item for sublist in QUAD_DICT for item in sublist])
    quad_corr_keys = np.array(sorted(list(set(QUAD_array))))
    quad_corr_values = np.zeros((ndata, len(quad_corr_keys)))

    cnt = 0
    for quaddict in QUAD_DICT:
        dummy_key = list(quaddict.keys())[0]
        for j in range(len(quaddict[dummy_key])):
            for key in quaddict:
                idx = np.where(np.array(quad_corr_keys) == key)[0][0]
                quad_corr_values[cnt, idx] = float(quaddict[key][j])
            cnt += 1

    # Flatten kinematics array
    X_array = np.array([item for sublist in X_VALS for item in sublist])

    # Flatten noise array
    SMEAR_array = np.array([item for sublist in SMEAR for item in sublist])

    # Build large covariance matrix (individual datasets are on diagonal, no cross-correlations)
    COVMAT_array = np.zeros((ndata, ndata))
    cnt = 0
    for i, expdata in enumerate(EXP_DATA):
        for j in range(len(expdata)):
            for k in range(len(expdata)):
                COVMAT_array[cnt + j, cnt + k] = CHI2_COVMAT[i][j][k]
        cnt += len(expdata)

    training_idx = np.array([i for i, x in enumerate(training_mask) if x])
    validation_idx = np.array([i for i, x in enumerate(validation_mask) if x])

    N_data_exp = np.array(N_data_exp)
    EXP_names = np.array(EXP_name)

    if "norm_CMS_WZ" in config:
        position = 0
        for iexp, expname in enumerate(EXP_names):
            ndat = N_data_exp[iexp]
            if expname.startswith("CMS_WZ"):
                break
            position += ndat
        config["norm_CMS_WZ"] = [position, position + ndat]

    # Make one large datatuple containing all data, SM theory, corrections, etc.
    totalset = DataTuple(
        EXP_array,
        SM_array,
        lin_corr_keys,
        lin_corr_values,
        quad_corr_keys,
        quad_corr_values,
        EXP_names,
        N_data_exp,
        X_array,
        SMEAR_array,
        training_idx,
        validation_idx,
        COVMAT_array,
    )

    return totalset
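# Illustrative usage sketch, not part of the module: a minimal `config` dictionary built
# from the keys load_datasets reads above. Paths and dataset names are placeholders.
#
#   config = {
#       "datasets": ["CMS_ttbar_example"],            # hypothetical dataset name
#       "theory_path": "/path/to/theory/",            # expects <dataset>.txt and <dataset>_cov.txt
#       "corrections_path": "/path/to/corrections/",  # expects <dataset>.txt with SMEFT corrections
#       "bootstrap": False,                           # no artificial data replica
#       "cv": False,                                  # no training/validation split
#   }
#   loaded = load_datasets(config, i_rep=0)
#   print(loaded.ExpNames, loaded.NdataExp)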
def seed_handler(i_rep):
    """ Allow the seed for splitting the data and artificial data replication
    (and artificial data for level 2 CT) to be set here
    """
    np.random.seed(i_rep)
def generate_artdata(commondata, covmat, i_rep):
    """ Generates artificial data noise for MC replica i_rep """
    # TODO: remove this dependency
    from NNPDF import CommonData

    # Ensure a different random seed is used for each replica (just use the replica ID number)
    seed_handler(i_rep)
    ndat = commondata.GetNData()
    gaussian_dist = np.random.multivariate_normal(np.zeros(ndat), covmat)
    return gaussian_dist
def generate_CTartdata(commondata, covmat):
    """ Generates experimental data noise for level 2 CT """
    # Ensure the random seed is fixed and well outside the range
    # needed for fitting
    seed_handler(1234567)
    ndat = len(commondata)
    gaussian_dist = np.random.multivariate_normal(np.zeros(ndat), covmat)
    return gaussian_dist
def generate_closure(config, dataset, coefficients, pseudocoeff, labels, level):
    """ Generates a level 0, 1 or 2 closure test set using the `coefficients` table
    This is done by shifting the 'Noise' attribute of a dataset tuple accordingly
    We fit to pseudodata (the SM) as opposed to real data (experiment)
    """
    original_data = dataset.Commondata
    covmat = dataset.CovMat

    # Pseudodata here is just the SM theory exactly
    pseudodata = pr.make_predictions(config, dataset, pseudocoeff, labels)

    if level == 0:
        # No artificial noise
        shifted_data = pseudodata - original_data
    elif level == 1:
        # Adding statistical noise
        exp_noise = generate_CTartdata(pseudodata, covmat)
        if "norm_CMS_WZ" in config:
            idx1, idx2 = config["norm_CMS_WZ"][0], config["norm_CMS_WZ"][1]
            norm_CMS = np.sum((pseudodata + exp_noise)[idx1:idx2])
            exp_noise[idx1:idx2] /= norm_CMS
            pseudodata[idx1:idx2] /= norm_CMS
        shifted_data = exp_noise - original_data + pseudodata
    elif level == 2:
        # Adding MC noise and statistical noise - set a fixed random seed
        exp_noise = generate_CTartdata(pseudodata, covmat)
        shifted_data = dataset.Noise + exp_noise - original_data + pseudodata
    else:
        print("Closure tests can only be levels 0, 1, 2. Please enter the correct number")
        sys.exit()

    return DataTuple(
        original_data,
        dataset.SMTheory,
        dataset.CorrectionsDICT,
        dataset.CorrectionsVAL,
        dataset.HOcorrectionsDICT,
        dataset.HOcorrectionsVAL,
        dataset.ExpNames,
        dataset.NdataExp,
        dataset.Kinematics,
        shifted_data,
        dataset.TrainingMask,
        dataset.ValidationMask,
        dataset.CovMat,
    )
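# Summary of the closure-test levels handled above: level 0 shifts the data to the pure
# pseudodata predictions with no noise; level 1 adds experimental noise drawn from the
# covariance matrix with the fixed generate_CTartdata seed (and rescales the CMS WZ block
# if "norm_CMS_WZ" is set); level 2 additionally keeps the per-replica MC noise already
# stored in dataset.Noise.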
def artificial_data_check(set):
    """ Check that the artificial noise reproduces the original covariance matrix """
    N_REPLICAS_CHECK = 1000

    cd = set.Commondata
    ndat = cd.GetNData()

    art_noise = []
    for i in range(N_REPLICAS_CHECK):
        art_noise.append(generate_artdata(cd, set.CovMat, i))

    exp_cov = np.array(set.CovMat)
    art_cov = np.cov(np.array(art_noise).T)
    if ndat == 1:
        # Total xsec data
        art_cov = np.array([[art_cov]])

    max_err_std = 0
    max_err_cov = 0
    for i, j in list(itertools.product(range(ndat), range(ndat))):
        err = 100 * abs((exp_cov[i][j] - art_cov[i][j]) / exp_cov[i][j])
        if i == j:
            max_err_std = max(max_err_std, err)
        else:
            max_err_cov = max(max_err_cov, err)

    print(f"Maximum deviation for variances: {max_err_std}%")
    print(f"Maximum deviation for covariances: {max_err_cov}%")
CoeffTuple = namedtuple("Coefficients", ("labels", "values", "bounds"))
def aggregate_coefficients(config, loaded_datasets, i_rep):
    """ Aggregate all coefficient labels and construct an array of coefficient values
    of suitable size. Returns a CoeffTuple of the labels and values."""
    # Set seed
    seed_handler(i_rep)

    # Get total number of operators
    coeff_labels = []
    # for set in loaded_datasets:
    for key in loaded_datasets.CorrectionsDICT:
        coeff_labels.append(key)

    # Keep ordering of coefficients the same so they match to the actual corrections
    coeff_labels, idx = np.unique(np.array(coeff_labels), return_index=True)
    coeff_labels = coeff_labels[np.argsort(idx)]

    # Give the initial point of the fit to be randomly spread around the bounds
    # specified by --bounds option (if none given, bounds are taken from setup.py)
    randarr = np.zeros(len(coeff_labels))
    bounds = {}
    if config["bounds"] is None:
        if "OPff" in config["coefficients"].keys():
            config["coefficients"]["Off"] = config["coefficients"].pop("OPff")
        for k in config["coefficients"].keys():
            if k not in coeff_labels:
                print(
                    k
                    + " is not part of fitted coefficients. Please comment it out in the setup file"
                )
                sys.exit()
            idx = np.where(coeff_labels == k)[0][0]
            min_val = config["coefficients"][k]["min"]
            max_val = config["coefficients"][k]["max"]
            randarr[idx] = np.random.uniform(low=min_val, high=max_val)
            bounds[k] = (min_val, max_val)
    else:
        previous_results = config["root_path"] + "/results/" + config["bounds"]
        print("Constructing bounds with %s" % previous_results)
        if "OPff" in config["coefficients"].keys():
            config["coefficients"]["Off"] = config["coefficients"].pop("OPff")
        for k in config["coefficients"].keys():
            if k not in coeff_labels:
                print(
                    k
                    + " is not part of fitted coefficients. Please comment it out in the setup file"
                )
                sys.exit()
            if config["coefficients"][k]["fixed"]:
                continue
            # Renaming of OpD to OpDC to avoid output conflict with Opd
            if k == "OpD":
                name = "/%sC.txt" % k
            else:
                name = "/%s.txt" % k
            idx = np.where(coeff_labels == k)[0][0]
            if name[1:] not in os.listdir(previous_results):
                print("%s result not found in %s" % (k, previous_results))
                sys.exit()
            openfile = open(previous_results + name, "r")
            data = openfile.readlines()
            min_val = float(data[0].split()[0])
            max_val = float(data[-1].split()[0])
            randarr[idx] = np.random.uniform(low=min_val, high=max_val)
            bounds[k] = (min_val, max_val)
            openfile.close()

    return CoeffTuple(coeff_labels, randarr, bounds)
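# Illustrative usage sketch, not part of the module: a hypothetical "coefficients" block
# of the run card when config["bounds"] is None, and the call that draws the starting
# point of the fit from those ranges. The operator name and ranges are placeholders.
#
#   config["bounds"] = None
#   config["coefficients"] = {"OtG": {"min": -1.0, "max": 1.0, "fixed": False}}
#   labels, values, bounds = aggregate_coefficients(config, loaded_datasets, i_rep=0)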
def CL_bounds(chi2, coeff, percent_CL):
    """ Compute either the 68% or 95% confidence intervals"""
    # Quadratic fit of coefficients to the chi2
    a, b, c = np.polyfit(coeff, chi2, 2)

    # Value of Wilson coefficient at the minimum of the chi2 parabola
    wilson_coeff_min = -b / 2 / a
    chi2_at_min = np.polyval([a, b, c], wilson_coeff_min)

    # 68 or 95% CL based on delta chi2 = 1 or delta chi2 = 3.84
    if percent_CL == 68:
        del_chi2 = 1
    elif percent_CL == 95:
        del_chi2 = 3.84
    else:
        raise ValueError("Function only accepts 68% or 95% confidence levels!")

    # Define the delta chi2 function: chi2 - chi2 at the minimum and subtract
    # the equivalent CL value we want
    def chi2_fit_func(x):
        return a * x ** 2 + b * x + c - chi2_at_min - del_chi2

    # Find the 68% or 95% bound
    CL_bound = fsolve(chi2_fit_func, wilson_coeff_min)
    return CL_bound, wilson_coeff_min
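# Illustrative usage sketch, not part of the module: CL_bounds expects a 1D scan of a
# single Wilson coefficient and the corresponding chi2 values; it fits a parabola and
# uses fsolve to find where chi2 - chi2_min crosses 1 (68% CL) or 3.84 (95% CL).
# The numbers below are toy values chosen only to show the call signature.
#
#   coeff = np.linspace(-2.0, 2.0, 21)
#   chi2 = 10.0 + 3.0 * (coeff - 0.2) ** 2   # toy parabolic chi2 profile
#   bound, best_fit = CL_bounds(chi2, coeff, percent_CL=95)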