Source code for smefit.optimize

# -*- coding: utf-8 -*-
import importlib
import json
import pathlib

import numpy as np
from rich.style import Style
from rich.table import Table

from smefit.rge import RGE

from .. import chi2, log
from ..coefficients import CoefficientManager
from ..loader import get_dataset

try:
    from mpi4py import MPI

    run_parallel = True
except ModuleNotFoundError:
    run_parallel = False

_logger = log.logging.getLogger(__name__)


[docs] class NumpyEncoder(json.JSONEncoder):
[docs] def default(self, o): if isinstance(o, (np.float32, np.float64, np.int32, np.int64)): return o.item() # Convert to native Python type return super().default(o)
[docs] class Optimizer: """ Common interface for Chi2 profile, NS and MC and A optimizers. Parameters ---------- results_path : pathlib.path path to result folder loaded_datasets : DataTuple, dataset tuple coefficients : `smefit.coefficients.CoefficientManager` instance of `CoefficientManager` with all the relevant coefficients to fit use_quad : bool if True includes also |HO| correction single_parameter_fits : bool True for single parameter fits use_multiplicative_prescription: if True uses the multiplicative prescription for the |EFT| corrections. external_chi2: dict dict of external chi2 rgemat: numpy.ndarray solution matrix of the RGE rge_dict: dict dictionary with the RGE input parameter options """ print_rate = 500 def __init__( self, results_path, loaded_datasets, coefficients, use_quad, single_parameter_fits, use_multiplicative_prescription, external_chi2=None, rgemat=None, rge_dict=None, ): self.results_path = pathlib.Path(results_path) self.loaded_datasets = loaded_datasets self.coefficients = coefficients self.use_quad = use_quad self.npts = ( self.loaded_datasets.Commondata.size if self.loaded_datasets is not None else 0 ) self.single_parameter_fits = single_parameter_fits self.use_multiplicative_prescription = use_multiplicative_prescription self.counter = 0 # set RGE matrix self.rgemat = rgemat self.rge_dict = rge_dict # load external chi2 modules as amortized objects (fast to evaluate) self.chi2_ext = ( self.load_external_chi2(external_chi2) if external_chi2 else None )
[docs] def load_external_chi2(self, external_chi2): """ Loads the external chi2 modules Parameters ---------- external_chi2: dict dict of external chi2s, with the name of the function object as key and the path to the external script as value Returns ------- ext_chi2_modules: list List of external chi2 objects that can be evaluated by passing a coefficients instance """ # dynamical import ext_chi2_modules = [] for class_name, module in external_chi2.items(): module_path = module["path"] path = pathlib.Path(module_path) base_path, stem = path.parent, path.stem chi2_module = importlib.import_module(stem) my_chi2_class = getattr(chi2_module, class_name) if self.rge_dict is not None: # Check if dynamic scale if self.rge_dict["obs_scale"] == "dynamic": _logger.info( f"Computing RGE matrix for {class_name} " f"with initial scale {self.rge_dict['init_scale']}." ) # compute RGE matrix if "scale" not in module: raise ValueError( "Dynamic scale requested but no scale provided in the external chi2" ) scale = module["scale"] rge_runner = RGE( self.coefficients.name, self.rge_dict["init_scale"], self.rge_dict.get("smeft_accuracy", "integrate"), ) rge_df = rge_runner.RGEmatrix(scale) gen_operators = list(rge_df.index) _logger.info("The operators generated by the RGE are: ") _logger.info(gen_operators) operators_dict = { k: {"max": 0.0, "min": 0.0} for k in gen_operators } new_coeffs = CoefficientManager.from_dict(operators_dict) chi2_ext = my_chi2_class(new_coeffs, rge_df.values) else: gen_operators = list(self.loaded_datasets.OperatorsNames) # Create dummy coefficients operators_dict = { k: {"max": 0.0, "min": 0.0} for k in gen_operators } new_coeffs = CoefficientManager.from_dict(operators_dict) chi2_ext = my_chi2_class(new_coeffs, self.rgemat) else: chi2_ext = my_chi2_class(self.coefficients) ext_chi2_modules.append(chi2_ext.compute_chi2) return ext_chi2_modules
@property def free_parameters(self): """Returns the free parameters entering fit""" return self.coefficients.free_parameters
[docs] def generate_chi2_table(self, chi2_dict, chi2_tot): r"""Generate log :math:`\chi^2` table""" table = Table(style=Style(color="white"), title_style="bold cyan", title=None) table.add_column("Dataset", style="bold green", no_wrap=True) table.add_column("Chi^2 /N_dat") for name, val in chi2_dict.items(): table.add_row(str(name), f"{val:.5}") table.add_row("Total", f"{(chi2_tot/self.npts):.5}") return table
[docs] def chi2_func(self, use_replica=False, print_log=True): r""" Wrap the math:`\chi^2` in a function for the optimizer. Pass noise and data info as args. Log the math:`\chi^2` value and values of the coefficients. Returns ------- current_chi2 : np.ndarray computed :math:`\chi^2` """ rank = 0 if run_parallel: comm = MPI.COMM_WORLD rank = comm.Get_rank() if rank == 0: self.counter += 1 if print_log: print_log = (self.counter % self.print_rate) == 0 else: print_log = False # only compute the internal chi2 when datasets are loaded if self.loaded_datasets is not None: chi2_tot = chi2.compute_chi2( self.loaded_datasets, self.coefficients.value, self.use_quad, self.use_multiplicative_prescription, use_replica, ) else: chi2_tot = 0 if self.chi2_ext is not None: for chi2_ext in self.chi2_ext: chi2_ext_i = chi2_ext(self.coefficients.value) chi2_tot += chi2_ext_i if print_log: chi2_dict = {} for data_name in self.loaded_datasets.ExpNames: dataset = get_dataset(self.loaded_datasets, data_name) chi2_dict[data_name] = ( chi2.compute_chi2( dataset, self.coefficients.value, self.use_quad, self.use_multiplicative_prescription, use_replica, ) / dataset.NdataExp ) log.console.print(self.generate_chi2_table(chi2_dict, chi2_tot)) return chi2_tot
[docs] def dump_fit_result(self, fit_result_file, values): """ Dumps the fit results to a json file. dump_fit_result gets called repeatedly for single parameter fits, once for each parameter. `values` contains the samples of the current fit, while previous fit results get loaded into `tmp` and updated with the current samples. The updated values are then written back to the file. Parameters ---------- fit_result_file: PosixPath path to the fit results file values: dict dictionary containing the current fit results """ if self.single_parameter_fits: if fit_result_file.is_file(): with open(fit_result_file, encoding="utf-8") as f: tmp = json.load(f) # Get the operator name coeff = list(values["samples"].keys())[0] # update the values tmp["logz"][coeff] = values["logz"] tmp["max_loglikelihood"][coeff] = values["max_loglikelihood"] tmp["best_fit_point"][coeff] = values["best_fit_point"][coeff] tmp["samples"][coeff] = values["samples"][coeff] # update the file with the new values with open(fit_result_file, "w", encoding="utf-8") as f: json.dump(tmp, f, indent=4, cls=NumpyEncoder) else: values["single_parameter_fits"] = True with open(fit_result_file, "w", encoding="utf-8") as f: # Get the operator name coeff = list(values["best_fit_point"].keys())[0] values["logz"] = {coeff: values["logz"]} values["max_loglikelihood"] = {coeff: values["max_loglikelihood"]} json.dump(values, f, indent=4, cls=NumpyEncoder) else: with open(fit_result_file, "w", encoding="utf-8") as f: json.dump(values, f, indent=4, cls=NumpyEncoder)