Source code for smefit.loader

# -*- coding: utf-8 -*-
import json
import pathlib
from collections import namedtuple

import jax.numpy as jnp
import numpy as np
import pandas as pd
import scipy.linalg as la
import yaml

from .basis_rotation import rotate_to_fit_basis
from .covmat import construct_covmat, covmat_from_systematics
from .log import logging

_logger = logging.getLogger(__name__)

DataTuple = namedtuple(
    "DataTuple",
    (
        "Commondata",
        "SMTheory",
        "OperatorsNames",
        "LinearCorrections",
        "QuadraticCorrections",
        "ExpNames",
        "NdataExp",
        "InvCovMat",
        "ThCovMat",
        "Luminosity",
        "Replica",
    ),
)


def check_file(path):
    """Check if path exists."""
    if not path.exists():
        raise FileNotFoundError(f"File {path} does not exist.")
def check_missing_operators(loaded_corrections, coeff_config):
    """Check if all the coefficients in the runcard are also present inside the theory tables."""
    loaded_corrections = set(loaded_corrections)
    missing_operators = [k for k in coeff_config if k not in loaded_corrections]
    if missing_operators != []:
        raise ValueError(
            f"{missing_operators} not in the theory. Comment them out in the setup script and restart.\n"
            "In case a cutoff was applied "
            f"the operator(s) {missing_operators} did not survive the mask."
        )
class Loader:
    """Class to check and load commondata and corresponding theory predictions.

    Parameters
    ----------
    setname: str
        dataset name to load
    operators_to_keep : list
        list of operators for which corrections are loaded
    order: "LO", "NLO"
        EFT perturbative order
    use_quad : bool
        if True loads also |HO| corrections
    use_theory_covmat: bool
        if True add the theory covariance matrix to the experimental one
    use_multiplicative_prescription: bool
        if True the |EFT| corrections are normalised to the |SM| predictions
    rot_to_fit_basis: dict, None
        matrix rotation to fit basis or None
    cutoff_scale: float, None
        kinematic cutoff scale; datapoints with scales above it are masked out
    """

    commondata_path = pathlib.Path()
    """path to commondata folder, commondata excluded"""
    theory_path = pathlib.Path(commondata_path)
    """path to theory folder, theory excluded. By default it is assumed to be the same as commondata_path"""

    def __init__(
        self,
        setname,
        operators_to_keep,
        order,
        use_quad,
        use_theory_covmat,
        use_multiplicative_prescription,
        rot_to_fit_basis,
        cutoff_scale,
    ):
        self._data_folder = self.commondata_path
        self._sys_folder = self.commondata_path / "systypes"
        self._theory_folder = self.theory_path
        self.setname = setname

        self.dataspec = {}
        (
            self.dataspec["SM_predictions"],
            self.dataspec["theory_covmat"],
            self.dataspec["lin_corrections"],
            self.dataspec["quad_corrections"],
            self.dataspec["scales"],
        ) = self.load_theory(
            self.setname,
            operators_to_keep,
            order,
            use_quad,
            use_theory_covmat,
            use_multiplicative_prescription,
            rot_to_fit_basis,
        )
        (
            self.dataspec["central_values"],
            self.dataspec["sys_error"],
            self.dataspec["sys_error_t0"],
            self.dataspec["stat_error"],
            self.dataspec["luminosity"],
        ) = self.load_experimental_data()

        # mask theory and data to ensure only data below the specified cutoff scale is included
        self.mask = np.array([True] * self.n_data)  # initial mask retains all datapoints
        if cutoff_scale is not None:
            self.apply_cutoff_mask(cutoff_scale)

        if len(self.dataspec["central_values"]) != len(self.dataspec["SM_predictions"]):
            raise ValueError(
                f"Number of experimental data points and theory predictions does not match in dataset {self.setname}."
            )
    def apply_cutoff_mask(self, cutoff_scale):
        """
        Updates previously loaded theory and datasets by filtering out points with scales above the cutoff scale.

        Parameters
        ----------
        cutoff_scale: float
            Value of the cutoff scale as specified in the runcard
        """
        self.mask = self.dataspec["scales"] < cutoff_scale

        # if all datapoints lie above the cutoff, return
        if np.all(~self.mask):
            return

        # Apply mask to all relevant theory entries in dataspec
        self.dataspec.update(
            {
                "SM_predictions": self.dataspec["SM_predictions"][self.mask],
                "theory_covmat": self.dataspec["theory_covmat"][self.mask, :][
                    :, self.mask
                ],
                "lin_corrections": {
                    k: v[self.mask] for k, v in self.dataspec["lin_corrections"].items()
                },
                "quad_corrections": {
                    k: v[self.mask] for k, v in self.dataspec["quad_corrections"].items()
                },
                "scales": self.dataspec["scales"][self.mask],
            }
        )

        # Single data points satisfy the mask already at this point
        if self.n_data == 1:
            stat_error = self.dataspec["stat_error"]
        else:
            stat_error = self.dataspec["stat_error"][self.mask]

        self.dataspec.update(
            {
                "central_values": self.dataspec["central_values"][self.mask],
                "sys_error": self.dataspec["sys_error"].loc[self.mask],
                "sys_error_t0": self.dataspec["sys_error_t0"].loc[self.mask],
                "stat_error": stat_error,
            }
        )
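    # Illustrative example of the masking above (hypothetical numbers): with
    # scales = [200., 500., 1500.] and cutoff_scale = 1000., the mask is
    # [True, True, False] and every per-datapoint entry in dataspec is
    # reduced from three to two points.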
    def load_experimental_data(self):
        """
        Load experimental data with corresponding uncertainties.

        Returns
        -------
        central_values: numpy.ndarray
            experimental central values
        sys_error: pandas.DataFrame
            systematic errors
        sys_error_t0: pandas.DataFrame
            systematic errors with the t0 prescription applied to multiplicative uncertainties
        stat_error: numpy.ndarray
            statistical errors
        luminosity: float, None
            integrated luminosity of the dataset
        """
        data_file = self._data_folder / f"{self.setname}.yaml"
        check_file(data_file)
        _logger.info(f"Loading dataset : {self.setname}")

        with open(data_file, encoding="utf-8") as f:
            data_dict = yaml.safe_load(f)

        central_values = np.array(data_dict["data_central"])
        stat_error = np.array(data_dict["statistical_error"])
        luminosity = data_dict.get("luminosity", None)
        num_sys = data_dict["num_sys"]
        num_data = data_dict["num_data"]

        # Load systematics from commondata file.
        # Read values of sys first
        sys_add = np.array(data_dict["systematics"])

        # Read systype file
        if num_sys != 0:
            type_sys = np.array(data_dict["sys_type"])
            name_sys = data_dict["sys_names"]

            # express systematics as percentage values of the central values
            sys_mult = sys_add / central_values * 1e2

            # Identify add and mult systematics
            # and replace the mult ones with the corresponding value computed
            # from the data central value. Required for the implementation of the t0 prescription
            indx_add = np.where(type_sys == "ADD")[0]
            indx_mult = np.where(type_sys == "MULT")[0]
            sys_t0 = np.zeros((num_sys, num_data))
            sys_t0[indx_add] = sys_add[indx_add].reshape(sys_t0[indx_add].shape)
            sys_t0[indx_mult] = (
                sys_mult[indx_mult].reshape(sys_t0[indx_mult].shape)
                * self.dataspec["SM_predictions"]
                * 1e-2
            )

            # store also the sys without the t0 prescription
            sys = sys_add.reshape((num_sys, num_data))

            # limit case with 1 sys
            if num_sys == 1:
                name_sys = [name_sys]

        # limit case no sys
        else:
            name_sys = ["UNCORR"]
            sys = np.zeros((num_sys + 1, num_data))
            sys_t0 = sys

        # Build dataframe with shape (N_data, N_sys) and systematic names as the column headers
        df = pd.DataFrame(data=sys.T, columns=name_sys)
        df_t0 = pd.DataFrame(data=sys_t0.T, columns=name_sys)

        # limit case 1 data
        if num_data == 1:
            central_values = np.asarray([central_values])

        # here return both exp sys and t0 modified sys
        return central_values, df, df_t0, stat_error, luminosity
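    # Illustrative numbers for the t0 treatment above: an ADD systematic keeps
    # its absolute value, while a MULT systematic of 5 on a central value of 100
    # (i.e. sys_mult = 5 / 100 * 1e2 = 5 percent) becomes 5 * 90 * 1e-2 = 4.5
    # when the corresponding |SM| prediction is 90.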
    @staticmethod
    def load_theory(
        setname,
        operators_to_keep,
        order,
        use_quad,
        use_theory_covmat,
        use_multiplicative_prescription,
        rotation_matrix=None,
    ):
        """
        Load theory predictions.

        Parameters
        ----------
        setname: str
            dataset name to load
        operators_to_keep: list
            list of operators to keep
        order: "LO", "NLO"
            EFT perturbative order
        use_quad: bool
            if True returns also |HO| corrections
        use_theory_covmat: bool
            if True add the theory covariance matrix to the experimental one
        use_multiplicative_prescription: bool
            if True normalise the corrections to the |SM| predictions
        rotation_matrix: numpy.ndarray
            rotation matrix from tables basis to fitting basis

        Returns
        -------
        sm: numpy.ndarray
            |SM| predictions
        th_cov: numpy.ndarray
            theory covariance matrix, zero if use_theory_covmat is False
        lin_dict: dict
            dictionary with |NHO| corrections
        quad_dict: dict
            dictionary with |HO| corrections, empty if not use_quad
        scales: list
            list of energy scales for the theory predictions
        """
        theory_file = Loader.theory_path / f"{setname}.json"
        check_file(theory_file)
        # load theory predictions
        with open(theory_file, encoding="utf-8") as f:
            raw_th_data = json.load(f)

        quad_dict = {}
        lin_dict = {}

        # save sm prediction at the chosen perturbative order
        sm = np.array(raw_th_data[order]["SM"])

        # split corrections into a linear and quadratic dict
        for key, value in raw_th_data[order].items():
            # quadratic terms
            if "*" in key and use_quad:
                quad_dict[key] = np.array(value)
                if use_multiplicative_prescription:
                    quad_dict[key] = np.divide(quad_dict[key], sm)

            # linear terms
            elif "SM" not in key and "*" not in key:
                lin_dict[key] = np.array(value)
                if use_multiplicative_prescription:
                    lin_dict[key] = np.divide(lin_dict[key], sm)

        # select corrections to keep
        def is_to_keep(op1, op2=None):
            if op2 is None:
                return op1 in operators_to_keep
            return op1 in operators_to_keep and op2 in operators_to_keep

        # rotate corrections to fitting basis
        if rotation_matrix is not None:
            lin_dict_fit_basis, quad_dict_fit_basis = rotate_to_fit_basis(
                lin_dict, quad_dict, rotation_matrix
            )
            lin_dict_to_keep = {
                k: val for k, val in lin_dict_fit_basis.items() if is_to_keep(k)
            }
            quad_dict_to_keep = {
                k: val
                for k, val in quad_dict_fit_basis.items()
                if is_to_keep(k.split("*")[0], k.split("*")[1])
            }
        else:
            lin_dict_to_keep = {k: val for k, val in lin_dict.items() if is_to_keep(k)}
            quad_dict_to_keep = {
                k: val
                for k, val in quad_dict.items()
                if is_to_keep(k.split("*")[0], k.split("*")[1])
            }

        best_sm = np.array(raw_th_data["best_sm"])
        if use_theory_covmat:
            th_cov = np.array(raw_th_data["theory_cov"])
        else:
            th_cov = np.zeros((best_sm.size, best_sm.size))

        # check if scales are present in the theory file
        scales = np.array(raw_th_data.get("scales", [None] * len(best_sm)))

        return (
            best_sm,
            th_cov,
            lin_dict_to_keep,
            quad_dict_to_keep,
            scales,
        )
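    # Sketch of the theory file layout that load_theory expects, inferred from
    # the keys it reads above (operator names and the "..." entries are
    # hypothetical placeholders):
    #
    #   {
    #     "best_sm": [...],                # best |SM| prediction per datapoint
    #     "theory_cov": [[...], ...],      # theory covariance matrix
    #     "scales": [...],                 # optional: energy scale per datapoint
    #     "LO": {
    #       "SM": [...],                   # |SM| prediction at this order
    #       "OtG": [...],                  # linear correction for one operator
    #       "OtG*OtG": [...]               # quadratic correction for an operator pair
    #     },
    #     "NLO": {...}
    #   }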
    @property
    def n_data(self):
        """
        Number of data points

        Returns
        -------
        n_data: int
            number of experimental data points
        """
        return self.dataspec["central_values"].size

    @property
    def lumi(self):
        """
        Integrated luminosity of the dataset in fb^-1

        Returns
        -------
        lumi: float
            Integrated luminosity of the dataset in fb^-1
        """
        return self.dataspec["luminosity"]

    @property
    def central_values(self):
        """
        Central values

        Returns
        -------
        central_values: numpy.ndarray
            experimental central values
        """
        return self.dataspec["central_values"]

    @property
    def covmat(self):
        """
        Experimental covariance matrix

        Returns
        -------
        covmat: numpy.ndarray
            experimental covariance matrix of a single dataset
        """
        return construct_covmat(self.dataspec["stat_error"], self.dataspec["sys_error"])

    @property
    def theory_covmat(self):
        """
        Theory covariance matrix

        Returns
        -------
        theory_covmat: numpy.ndarray
            theory covariance matrix of a single dataset
        """
        return self.dataspec["theory_covmat"]

    @property
    def sys_error(self):
        """
        Systematic errors

        Returns
        -------
        sys_error: pd.DataFrame
            systematic errors of the dataset
        """
        return self.dataspec["sys_error"]

    @property
    def sys_error_t0(self):
        """
        Systematic errors modified according to the t0 prescription

        Returns
        -------
        sys_error_t0: pd.DataFrame
            t0 systematic errors of the dataset
        """
        return self.dataspec["sys_error_t0"]

    @property
    def stat_error(self):
        """
        Statistical errors

        Returns
        -------
        stat_error: np.ndarray
            statistical errors of the dataset
        """
        return self.dataspec["stat_error"]

    @property
    def sm_prediction(self):
        """
        |SM| prediction for the dataset

        Returns
        -------
        SM_predictions: numpy.ndarray
            best |SM| prediction
        """
        return self.dataspec["SM_predictions"]

    @property
    def lin_corrections(self):
        """
        |NHO| corrections

        Returns
        -------
        lin_corrections: dict
            dictionary with operator names and |NHO| corrections
        """
        return self.dataspec["lin_corrections"]

    @property
    def quad_corrections(self):
        """
        |HO| corrections

        Returns
        -------
        quad_corrections: dict
            dictionary with operator names and |HO| corrections
        """
        return self.dataspec["quad_corrections"]
def construct_corrections_matrix_linear(
    corrections_list, n_data_tot, sorted_keys, rgemat=None
):
    """
    Constructs the linear EFT corrections.

    Parameters
    ----------
    corrections_list : list(dict)
        list containing corrections per experiment
    n_data_tot : int
        total number of experimental data points
    sorted_keys: numpy.ndarray
        list of sorted operator corrections
    rgemat: numpy.ndarray, optional
        solution matrix of the RGE, shape (k, l, m) with k the number of datapoints,
        l the number of generated coefficients and m the number of
        original |EFT| coefficients specified in the runcard.

    Returns
    -------
    corr_values : np.ndarray
        matrix with correction values, shape (n_data_tot, sorted_keys.size)
    """
    corr_values = np.zeros((n_data_tot, sorted_keys.size))
    cnt = 0
    # loop on experiments
    for n_dat, correction_dict in corrections_list:
        # loop on corrections
        for key, values in correction_dict.items():
            idx = np.where(sorted_keys == key)[0][0]
            corr_values[cnt : cnt + n_dat, idx] = values
        cnt += n_dat

    if rgemat is not None:
        if len(rgemat.shape) == 3:
            # dynamic scale, scale is datapoint specific
            corr_values = jnp.einsum("ij, ijk -> ik", corr_values, rgemat)
        else:
            # fixed scale so same rgemat for all datapoints
            corr_values = jnp.einsum("ij, jk -> ik", corr_values, rgemat)

    return corr_values
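# Illustrative shape sketch of the fixed-scale RGE rotation above (the sizes
# are hypothetical): with 10 datapoints, 4 RG-generated coefficients and
# 3 coefficients in the runcard,
#
#   >>> corr_values = np.zeros((10, 4))   # corrections in the RG-generated basis
#   >>> rgemat = np.zeros((4, 3))         # fixed-scale RGE solution matrix
#   >>> jnp.einsum("ij, jk -> ik", corr_values, rgemat).shape
#   (10, 3)
#
# i.e. the corrections are rotated onto the coefficients specified in the runcard.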
def construct_corrections_matrix_quadratic(
    corrections_list, n_data_tot, sorted_keys, rgemat=None
):
    """
    Constructs the quadratic EFT corrections.

    Parameters
    ----------
    corrections_list : list(dict)
        list containing per experiment the number of datapoints and corresponding corrections
    n_data_tot : int
        total number of experimental data points
    sorted_keys: numpy.ndarray
        list of sorted operator corrections, shape=(n rg generated coeff,)
        or shape=(n original coeff,) in the absence of rgemat
    rgemat: numpy.ndarray, optional
        solution matrix of the RGE, shape=(k, l, m) with k the number of datapoints,
        l the number of generated coefficients and m the number of
        original |EFT| coefficients specified in the runcard.

    Returns
    -------
    corr_values : np.ndarray
        matrix with correction values, shape (n_data_tot, sorted_keys.size, sorted_keys.size)
    """
    corr_values = np.zeros((n_data_tot, sorted_keys.size, sorted_keys.size))
    cnt = 0
    # loop on experiments
    for n_dat, correction_dict in corrections_list:
        # loop on corrections
        for key, values in correction_dict.items():
            op1, op2 = key.split("*")
            idx1 = np.where(sorted_keys == op1)[0][0]
            idx2 = np.where(sorted_keys == op2)[0][0]
            # we want to store the values in the upper triangular part of the matrix
            if idx1 > idx2:
                idx1, idx2 = idx2, idx1
            corr_values[cnt : cnt + n_dat, idx1, idx2] = values
        cnt += n_dat

    if rgemat is not None:
        if len(rgemat.shape) == 3:
            # dynamic scale, scale is datapoint specific
            corr_values = jnp.einsum(
                "ijk, ijl, ikr -> ilr", corr_values, rgemat, rgemat
            )
        else:
            # fixed scale so same rgemat for all datapoints
            corr_values = jnp.einsum("ijk, jl, kr -> ilr", corr_values, rgemat, rgemat)

    return corr_values
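# Analogous shape sketch for the dynamic-scale case above, where each datapoint
# carries its own RGE solution matrix (the sizes are hypothetical):
#
#   >>> corr_values = np.zeros((10, 4, 4))   # quadratic corrections, RG-generated basis
#   >>> rgemat = np.zeros((10, 4, 3))        # one RGE matrix per datapoint
#   >>> jnp.einsum("ijk, ijl, ikr -> ilr", corr_values, rgemat, rgemat).shape
#   (10, 3, 3)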
def load_datasets(
    commondata_path,
    datasets,
    operators_to_keep,
    use_quad,
    use_theory_covmat,
    use_t0,
    use_multiplicative_prescription,
    default_order="LO",
    theory_path=None,
    rot_to_fit_basis=None,
    has_uv_couplings=False,
    has_external_chi2=False,
    rgemat=None,
    cutoff_scale=None,
):
    """
    Loads experimental data, theory and |SMEFT| corrections into a namedtuple.

    Parameters
    ----------
    commondata_path : str, pathlib.Path
        path to commondata folder, commondata excluded
    datasets : list
        List of datasets to be loaded
    operators_to_keep: list
        list of operators for which corrections are loaded
    use_quad: bool
        if True loads also |HO| corrections
    use_theory_covmat: bool
        if True add the theory covariance matrix to the experimental one
    use_t0: bool
        if True use the t0 prescription to build the fit covariance matrix
    use_multiplicative_prescription: bool
        if True the |EFT| corrections are normalised to the |SM| predictions
    default_order: str
        Default perturbative order of the theory predictions
    theory_path : str, pathlib.Path, optional
        path to theory folder, theory excluded.
        By default it is assumed to be the same as commondata_path
    rot_to_fit_basis: dict, optional
        matrix rotation to fit basis or None
    has_uv_couplings: bool, optional
        True for UV fits
    has_external_chi2: bool, optional
        True in the presence of external chi2 modules
    rgemat: numpy.ndarray, optional
        solution matrix of the RGE, shape=(k, l, m) with k the number of datapoints,
        l the number of coefficients generated under the RG and m the number of
        original |EFT| coefficients specified in the runcard.
    cutoff_scale: float, optional
        kinematic cutoff scale

    Returns
    -------
    DataTuple
        namedtuple collecting data, |SM| theory, |EFT| corrections,
        covariance matrices and a pseudodata replica
    """
    exp_data = []
    sm_theory = []
    sys_error_t0 = []
    sys_error = []
    stat_error = []
    lin_corr_list = []
    quad_corr_list = []
    n_data_exp = []
    lumi_exp = []
    exp_name = []
    th_cov = []

    Loader.commondata_path = pathlib.Path(commondata_path)
    Loader.theory_path = pathlib.Path(theory_path or commondata_path)

    _logger.info(f"Applying cutoff scale: {cutoff_scale} GeV.")
    for sset in datasets:
        dataset_name = sset.get("name")
        dataset = Loader(
            dataset_name,
            operators_to_keep,
            sset.get("order", default_order),
            use_quad,
            use_theory_covmat,
            use_multiplicative_prescription,
            rot_to_fit_basis,
            cutoff_scale,
        )

        # skip dataset if all datapoints are above the cutoff scale
        if np.all(~dataset.mask):
            continue

        exp_name.append(dataset_name)
        n_data_exp.append(dataset.n_data)
        lumi_exp.append(dataset.lumi)
        exp_data.extend(dataset.central_values)
        sm_theory.extend(dataset.sm_prediction)
        lin_corr_list.append([dataset.n_data, dataset.lin_corrections])
        quad_corr_list.append([dataset.n_data, dataset.quad_corrections])
        sys_error_t0.append(dataset.sys_error_t0)
        sys_error.append(dataset.sys_error)
        stat_error.append(dataset.stat_error)
        th_cov.append(dataset.theory_covmat)

    exp_data = np.array(exp_data)
    n_data_tot = exp_data.size

    # if uv couplings are present allow for operators which are not in the
    # theory files (same for external chi2 and rge)
    if has_uv_couplings or has_external_chi2 or rgemat is not None:
        sorted_keys = np.unique((*operators_to_keep,))
    else:
        # Construct unique list of operator names entering lin_corr_list
        operator_names = []
        for item in lin_corr_list:
            operator_names.extend(list(item[1].keys()))
        sorted_keys = np.unique(operator_names)

    # operators_to_keep contains the operators that are present in the runcard,
    # sorted_keys contains the operators that are present in the theory files:
    # we need to check that all operators in the runcard are also present in the theory files
    check_missing_operators(sorted_keys, operators_to_keep)

    lin_corr_values = construct_corrections_matrix_linear(
        lin_corr_list, n_data_tot, sorted_keys, rgemat
    )
    if use_quad:
        quad_corr_values = construct_corrections_matrix_quadratic(
            quad_corr_list, n_data_tot, sorted_keys, rgemat
        )
    else:
        quad_corr_values = None

    # Construct a unique large covariance matrix accounting for correlations between different datasets.
    # The theory covariance matrix, when used, will be different from zero.
    # At the moment it does not account for correlations between different datasets
    theory_covariance = la.block_diag(*th_cov)
    exp_covmat = covmat_from_systematics(stat_error, sys_error) + theory_covariance

    # replicas are always generated using the experimental covmat, no t0
    replica = np.random.multivariate_normal(exp_data, exp_covmat)

    if use_t0:
        fit_covmat = (
            covmat_from_systematics(stat_error, sys_error_t0) + theory_covariance
        )
    else:
        fit_covmat = exp_covmat

    # Make one large datatuple containing all data, SM theory, corrections, etc.
    return DataTuple(
        exp_data,
        np.array(sm_theory),
        sorted_keys,
        lin_corr_values,
        quad_corr_values,
        np.array(exp_name),
        np.array(n_data_exp),
        np.linalg.inv(fit_covmat),
        theory_covariance,
        np.array(lumi_exp),
        replica,
    )
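# Illustrative sketch of how the returned DataTuple might enter a linear chi2
# (the path, runcard entries, operator names and the coefficient vector
# wilson_coeffs are hypothetical; quadratic and multiplicative options are off):
#
#   >>> loaded = load_datasets("commondata", runcard_datasets, ["OtG", "OtW"],
#   ...                        False, True, False, False)
#   >>> theory = loaded.SMTheory + loaded.LinearCorrections @ wilson_coeffs
#   >>> diff = theory - loaded.Commondata
#   >>> chi2 = diff @ loaded.InvCovMat @ diff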
def get_dataset(datasets, data_name):
    """Extract the slice of a loaded DataTuple corresponding to a single dataset."""
    idx = np.where(datasets.ExpNames == data_name)[0][0]
    ndata = datasets.NdataExp[idx]
    lumi = datasets.Luminosity[idx]
    posix_in = datasets.NdataExp[:idx].sum()
    posix_out = posix_in + ndata

    return DataTuple(
        datasets.Commondata[posix_in:posix_out],
        datasets.SMTheory[posix_in:posix_out],
        datasets.OperatorsNames,
        datasets.LinearCorrections[posix_in:posix_out],
        (
            datasets.QuadraticCorrections[posix_in:posix_out]
            if datasets.QuadraticCorrections is not None
            else None
        ),
        data_name,
        ndata,
        datasets.InvCovMat[posix_in:posix_out].T[posix_in:posix_out],
        datasets.ThCovMat[posix_in:posix_out].T[posix_in:posix_out],
        lumi,
        datasets.Replica[posix_in:posix_out],
    )
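# Illustrative sketch, continuing from the load_datasets sketch above
# (the dataset name is hypothetical):
#
#   >>> single = get_dataset(loaded, "ATLAS_tt_13TeV_ljets_Mtt")
#   >>> single.NdataExp, single.Luminosity
#   >>> single.Commondata.shape == (single.NdataExp,)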