# -*- coding: utf-8 -*-
import importlib
import itertools
import pathlib
import subprocess
import sys
from shutil import copyfile
import yaml
from .analyze.pca import RotateToPca
from .chi2 import Scanner
from .log import logging
from .optimize.analytic import ALOptimizer
from .optimize.mc import MCOptimizer
from .optimize.ultranest import USOptimizer
try:
from mpi4py import MPI
run_parallel = True
except ModuleNotFoundError:
run_parallel = False
_logger = logging.getLogger(__name__)
[docs]
class Runner:
"""
Container for all the possible |SMEFiT| run methods.
Init the root path of the package where tables,
results, plot config and reports are stored.
Parameters
----------
run_card : dict
run card dictionary
single_parameter_fits : bool
True for single parameter fits
runcard_file : pathlib.Path, None
path to runcard if already present
"""
def __init__(
self, run_card, single_parameter_fits, pairwise_fits, runcard_file=None
):
self.run_card = run_card
self.runcard_file = runcard_file
self.single_parameter_fits = single_parameter_fits
self.pairwise_fits = pairwise_fits
self.setup_result_folder()
[docs]
def setup_result_folder(self):
"""
Create result folder and copy the runcard there
"""
# Construct results folder
result_ID = self.run_card["result_ID"]
result_folder = pathlib.Path(self.run_card["result_path"])
if self.run_card["replica"] is not None:
res_folder_fit = (
result_folder / result_ID / f"replica_{self.run_card['replica']}"
)
else:
res_folder_fit = result_folder / result_ID
subprocess.call(f"mkdir -p {result_folder}", shell=True)
if res_folder_fit.exists():
_logger.warning(f"{res_folder_fit} already found, overwriting old results")
subprocess.call(f"mkdir -p {res_folder_fit}", shell=True)
# Copy yaml runcard to results folder or dump it
# in case no given file is passed
runcard_copy = result_folder / result_ID / f"{result_ID}.yaml"
if self.runcard_file is None:
with open(runcard_copy, encoding="utf-8") as f:
yaml.dump(self.run_card, f, default_flow_style=False)
else:
copyfile(
self.runcard_file,
runcard_copy,
)
[docs]
@classmethod
def from_file(cls, runcard_file, replica=None):
"""
Create Runner from a runcard file
Parameters
----------
runcard_file: pathlib.Path, str
path to runcard
replica: int
replica number. Optional used only for MC
Returns
-------
runner: `smefit.runner.Runner`
instance of class Runner
"""
config = {}
# load file
with open(runcard_file, encoding="utf-8") as f:
config = yaml.safe_load(f)
config["replica"] = replica
# set result ID to runcard name by default
if "result_ID" not in config:
config["result_ID"] = runcard_file.stem
single_parameter_fits = config.get("single_parameter_fits", False)
pairwise_fits = config.get("pairwise_fits", False)
return cls(
config, single_parameter_fits, pairwise_fits, runcard_file.absolute()
)
[docs]
def get_optimizer(self, optimizer):
"""Return the seleted optimizer."""
if optimizer == "NS":
return self.ultranest
elif optimizer == "MC":
return self.mc
elif optimizer == "A":
return self.analytic
raise ValueError(f"{optimizer} is not available")
[docs]
def analytic(self, config):
"""Sample the analytic linear solution."""
a_opt = ALOptimizer.from_dict(config)
a_opt.run_sampling()
[docs]
def ultranest(self, config):
"""Run a fit with Ultra Nest."""
# add external modules to paths
if "external_chi2" in config:
external_chi2 = config["external_chi2"]
for _, module in external_chi2.items():
module_path = module["path"]
path = pathlib.Path(module_path)
base_path, stem = path.parent, path.stem
sys.path = [str(base_path)] + sys.path
if run_parallel:
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
ns_opt = USOptimizer.from_dict(config)
else:
ns_opt = None
ns_opt = comm.bcast(ns_opt, root=0)
else:
ns_opt = USOptimizer.from_dict(config)
ns_opt.run_sampling()
[docs]
def mc(self, config):
"""Run a fit with |MC|."""
mc_opt = MCOptimizer.from_dict(config)
mc_opt.run_sampling()
mc_opt.save()
[docs]
def rotate_to_pca(self):
"""Rotate to |PCA| basis."""
_logger.info("Rotate input basis to PCA basis")
pca_rot = RotateToPca.from_dict(self.run_card)
pca_rot.compute()
pca_rot.update_runcard()
pca_rot.save()
[docs]
def global_analysis(self, optimizer):
"""Run a global fit using the selected optimizer.
Parameters
----------
optimizer: string
optimizer to be used (NS, MC or A)
"""
config = self.run_card
opt = self.get_optimizer(optimizer)
opt(config)
[docs]
def single_parameter_analysis(self, optimizer):
"""Run a seried of single parameter fits for all the operators specified in the runcard.
Parameters
----------
optimizer: string
optimizer to be used (NS, MC or A)
"""
def MultipleConstrainError():
raise ValueError(
"Constrain with multiple coefficients do not make sense, in sigle parameter fits."
)
config = self.run_card
# loop on all the coefficients
for coeff in config["coefficients"].keys():
single_coeff_config = dict(config)
single_coeff_config["coefficients"] = {}
# skip contrained coeffs
if "constrain" in config["coefficients"][coeff]:
_logger.info("Skipping contrained coefficient %s", coeff)
continue
# if there are constrained coefficients, only
# relations in which appears a single indepentent
# coefficient make sense.
new_coeff_config = {}
# seach for a realtion: loop on all the coefficients
for coeff2, vals in config["coefficients"].items():
if "constrain" not in vals:
continue
# if fixed value, crash
if isinstance(vals["constrain"], (int, float)):
raise ValueError(
"Fixed value constrain do not make sense for single parameter fits."
)
constrain = (
vals["constrain"]
if isinstance(vals["constrain"], list)
else [vals["constrain"]]
)
# only single coefficient constrain are supported
new_constrain = []
for addend in constrain:
if len(addend) > 1:
MultipleConstrainError()
if coeff in addend:
new_constrain.append(addend)
# check that the coefficient appearing in all the addends is the same
elif new_constrain:
MultipleConstrainError()
if new_constrain:
new_coeff_config[coeff2] = vals
# add fitted coefficient
new_coeff_config[coeff] = config["coefficients"][coeff]
single_coeff_config["coefficients"] = new_coeff_config
opt = self.get_optimizer(optimizer)
opt(single_coeff_config)
[docs]
def pairwise_analysis(self, optimizer):
"""Run a series of pairwise parameter fits for all the operators specified in the runcard.
Parameters
----------
optimizer: string
optimizer to be used only NS is supported
"""
if optimizer != "NS":
raise ValueError("Paiwise analysis is implemented only for NS.")
config = self.run_card
for c1, c2 in itertools.combinations(config["coefficients"].keys(), 2):
pairwise_coeff_config = dict(config)
pairwise_coeff_config["coefficients"] = {}
pairwise_coeff_config["coefficients"][c1] = config["coefficients"][c1]
pairwise_coeff_config["coefficients"][c2] = config["coefficients"][c2]
opt = self.get_optimizer(optimizer)
opt(pairwise_coeff_config)
[docs]
def run_analysis(self, optimizer):
"""Run either the global analysis or a series of single parameter fits using the selected optimizer.
Parameters
----------
optimizer: string
optimizer to be used (NS, MC or A)
"""
if self.single_parameter_fits:
self.single_parameter_analysis(optimizer)
elif self.pairwise_fits:
self.pairwise_analysis(optimizer)
else:
self.global_analysis(optimizer)
[docs]
def chi2_scan(self, n_replica, compute_bounds):
r"""Run an individual :math:`\chi^2` scan.
Parameters
----------
n_replica: int
number of replicas to use.
If 0 only the :math:`\chi^2` experimental data
will be computed.
compute_bounds: bool
if True compute and save the :math:`\chi^2` bounds.
"""
scan = Scanner(self.run_card, n_replica)
if compute_bounds:
scan.compute_bounds()
scan.compute_scan()
scan.plot_scan()