Source code for albums.optimiser

"""
Module where the optimization functions are defined.
"""

from __future__ import annotations

from copy import deepcopy
from typing import TYPE_CHECKING

import numpy as np
from scipy.optimize import Bounds, minimize

from albums.equilibrium import Capability
from albums.flow import IncompatibleFlowError
from albums.options import EquilibriumOptions, OptimiserOptions, TheoryOptions
from albums.system import SystemState

if TYPE_CHECKING:  # pragma: no cover - hints only
    from mbtrack2 import CavityResonator, Synchrotron


[docs]def _require_r_factor(flow) -> None: """Reject flows whose equilibrium does not compute a meaningful R-factor.""" if Capability.TOUSCHEK_LIFETIME_RATIO not in flow.equilibrium.provides: raise IncompatibleFlowError( f"Optimising the R-factor needs an equilibrium that computes a " f"meaningful Touschek lifetime ratio; '{flow.equilibrium.name}' does " f"not (its Touschek is NaN). Use a Venturini or Alves flow." )
[docs]def _is_unstable(res) -> bool: """True if any instability theory in the Result flags an instability.""" return bool(np.any(res.robinson_flags) or res.ptbl or res.zero_frequency)
[docs]def _equilibrium_not_converged(res) -> bool: """The equilibrium solve failed.""" return not res.converged
[docs]def _theories_not_converged(res) -> bool: """The equilibrium converged but a theory did not converge.""" return any(not np.all(tr.converged) for tr in res.theories.values())
[docs]def evaluate_R( B: SystemState, flow, eq_opts: EquilibriumOptions, theory_opts: TheoryOptions = TheoryOptions(), is_equilibrium: bool = False, ) -> float: """ Evaluate the R-factor (or the equilibrium-only R-factor) of one operating point as a score to be minimised. Parameters ---------- B : SystemState The system to evaluate. flow : Flow Flow object used for solving. eq_opts : EquilibriumOptions Beam current, integration boundary and solver options. theory_opts : TheoryOptions Shared theory inputs (used only when is_equilibrium is False). is_equilibrium : bool If True, evaluate the equilibrium only (no instability theories). The score is then only penalised for non-convergence. If False, it is also penalised at any unstable operating point, and at any point where a theory failed to converge. Returns ------- float Negative R-factor if successful and stable, a penalty (10) otherwise. """ try: if is_equilibrium: res = B.run_equilibrium(flow, eq_opts) if _equilibrium_not_converged(res) or np.isnan(res.Touschek): return 10 return -res.Touschek res = B.run(flow, eq_opts, theory_opts) if ( _equilibrium_not_converged(res) or _is_unstable(res) or _theories_not_converged(res) or np.isnan(res.Touschek) ): return 10 return -res.Touschek except Exception: return 10
[docs]def maximize_R( ring: Synchrotron, cavity_list: list[CavityResonator], flow, psi0_HC: float, bounds, eq_opts: EquilibriumOptions, theory_opts: TheoryOptions = TheoryOptions(), opti_opts: OptimiserOptions = OptimiserOptions(), is_equilibrium: bool = False, ) -> float: """ Optimise the harmonic-cavity tuning angle to maximise the R-factor. Parameters ---------- ring : Synchrotron Ring configuration. cavity_list : list of CavityResonator Main cavity in position 0, the harmonic cavity (whose psi is optimised) in position 1. flow : Flow Solve flow. psi0_HC : float Initial guess for the harmonic-cavity tuning angle in degrees. bounds : tuple (lower, upper) bounds for the tuning angle. eq_opts : EquilibriumOptions Beam current, integration boundary and solver options. theory_opts : TheoryOptions Shared theory inputs (used only when is_equilibrium is False). opti_opts : OptimiserOptions scipy and loop settings of the optimisation. is_equilibrium : bool If True, optimise the equilibrium-only R-factor (no instability theories). Returns ------- float Optimal tuning angle in degrees, or 90 on failure. Raises ------ IncompatibleFlowError If the flow's equilibrium does not provide a meaningful R-factor (Capability.TOUSCHEK_LIFETIME_RATIO), e.g. a Bosch flow. """ _require_r_factor(flow) cavity_list = deepcopy(cavity_list) @np.vectorize def to_eval(psi_HC): cavity_list[1].psi = psi_HC * np.pi / 180 B = SystemState(ring, cavity_list) return evaluate_R(B, flow, eq_opts, theory_opts, is_equilibrium) bd = Bounds([bounds[0]], [bounds[1]]) res = minimize( fun=to_eval, x0=[psi0_HC], bounds=bd, method=opti_opts.method_opti, tol=opti_opts.tol_opti, options={"maxiter": opti_opts.maxiter_opti, "rhobeg": opti_opts.rhobeg_opti}, ) return res.x[0] if res.success else 90
[docs]def _get_vals( ring: Synchrotron, cavity_list: list[CavityResonator], flow, psi0_HC: float, bounds, eq_opts: EquilibriumOptions, theory_opts: TheoryOptions = TheoryOptions(), opti_opts: OptimiserOptions = OptimiserOptions(), is_equilibrium: bool = False, ) -> tuple: """Optimise psi_HC then return (psi, Result) at the chosen operating point. Drives maximize_R, optionally seeding the initial guess from xi_init_input (auto_psi_input) and, when loop_option is set, stepping psi outward until a stable point is found. is_equilibrium selects equilibrium-only solving and the matching stability check. """ cavity_list = deepcopy(cavity_list) MC = cavity_list[0] HC = cavity_list[1] I0 = eq_opts.I0 debug = opti_opts.debug if opti_opts.auto_psi_input: xi = opti_opts.xi_init_input HC_det = I0 * HC.Rs / HC.Q * ring.f1 / MC.Vc * HC.m**2 / xi HC_fr = HC_det + HC.m * ring.f1 HC_psi = ( np.arctan(HC.QL * (HC_fr / (HC.m * ring.f1) - (HC.m * ring.f1) / HC_fr)) * 180 / np.pi ) psi0_HC = HC_psi if debug: print(f"psi0 input = {psi0_HC}") psi = maximize_R( ring, cavity_list, flow, psi0_HC, bounds, eq_opts, theory_opts, opti_opts, is_equilibrium, ) if debug: print(f"psi out of maximize_R = {psi}") def solve_at(psi_deg): HC.psi = psi_deg * np.pi / 180 B = SystemState(ring, cavity_list) if is_equilibrium: return B.run_equilibrium(flow, eq_opts) return B.run(flow, eq_opts, theory_opts) if not opti_opts.loop_option: res = solve_at(psi) if debug: print(res) return (psi, res) count = 0 add_psi = opti_opts.add_psi_loop max_loop = opti_opts.max_loop while True: psi = psi + count * add_psi res = solve_at(psi) is_stable = not _equilibrium_not_converged(res) and ( is_equilibrium or not (_is_unstable(res) or _theories_not_converged(res)) ) if is_stable: if debug: print(f"psi sent to result = {psi}") print(res) return (psi, res) elif debug: print(f"unstable: {count}") print(f"psi unstable = {psi}") print(res) if count == max_loop: raise ValueError( f"_get_vals: no stable psi found within {max_loop} loop " f"iterations (add_psi_loop={add_psi}, last psi={psi})." ) count += 1