"""
Module where the optimization functions are defined.
"""
from __future__ import annotations
from copy import deepcopy
from typing import TYPE_CHECKING
import numpy as np
from scipy.optimize import Bounds, minimize
from albums.equilibrium import Capability
from albums.flow import IncompatibleFlowError
from albums.options import EquilibriumOptions, OptimiserOptions, TheoryOptions
from albums.system import SystemState
if TYPE_CHECKING: # pragma: no cover - hints only
from mbtrack2 import CavityResonator, Synchrotron
[docs]def _require_r_factor(flow) -> None:
"""Reject flows whose equilibrium does not compute a meaningful R-factor."""
if Capability.TOUSCHEK_LIFETIME_RATIO not in flow.equilibrium.provides:
raise IncompatibleFlowError(
f"Optimising the R-factor needs an equilibrium that computes a "
f"meaningful Touschek lifetime ratio; '{flow.equilibrium.name}' does "
f"not (its Touschek is NaN). Use a Venturini or Alves flow."
)
[docs]def _is_unstable(res) -> bool:
"""True if any instability theory in the Result flags an instability."""
return bool(np.any(res.robinson_flags) or res.ptbl or res.zero_frequency)
[docs]def _equilibrium_not_converged(res) -> bool:
"""The equilibrium solve failed."""
return not res.converged
[docs]def _theories_not_converged(res) -> bool:
"""The equilibrium converged but a theory did not converge."""
return any(not np.all(tr.converged) for tr in res.theories.values())
[docs]def evaluate_R(
B: SystemState,
flow,
eq_opts: EquilibriumOptions,
theory_opts: TheoryOptions = TheoryOptions(),
is_equilibrium: bool = False,
) -> float:
"""
Evaluate the R-factor (or the equilibrium-only R-factor) of one operating
point as a score to be minimised.
Parameters
----------
B : SystemState
The system to evaluate.
flow : Flow
Flow object used for solving.
eq_opts : EquilibriumOptions
Beam current, integration boundary and solver options.
theory_opts : TheoryOptions
Shared theory inputs (used only when is_equilibrium is False).
is_equilibrium : bool
If True, evaluate the equilibrium only (no instability theories). The
score is then only penalised for non-convergence.
If False, it is also penalised at any unstable operating point, and at
any point where a theory failed to converge.
Returns
-------
float
Negative R-factor if successful and stable, a penalty (10) otherwise.
"""
try:
if is_equilibrium:
res = B.run_equilibrium(flow, eq_opts)
if _equilibrium_not_converged(res) or np.isnan(res.Touschek):
return 10
return -res.Touschek
res = B.run(flow, eq_opts, theory_opts)
if (
_equilibrium_not_converged(res)
or _is_unstable(res)
or _theories_not_converged(res)
or np.isnan(res.Touschek)
):
return 10
return -res.Touschek
except Exception:
return 10
[docs]def maximize_R(
ring: Synchrotron,
cavity_list: list[CavityResonator],
flow,
psi0_HC: float,
bounds,
eq_opts: EquilibriumOptions,
theory_opts: TheoryOptions = TheoryOptions(),
opti_opts: OptimiserOptions = OptimiserOptions(),
is_equilibrium: bool = False,
) -> float:
"""
Optimise the harmonic-cavity tuning angle to maximise the R-factor.
Parameters
----------
ring : Synchrotron
Ring configuration.
cavity_list : list of CavityResonator
Main cavity in position 0, the harmonic cavity (whose psi is optimised)
in position 1.
flow : Flow
Solve flow.
psi0_HC : float
Initial guess for the harmonic-cavity tuning angle in degrees.
bounds : tuple
(lower, upper) bounds for the tuning angle.
eq_opts : EquilibriumOptions
Beam current, integration boundary and solver options.
theory_opts : TheoryOptions
Shared theory inputs (used only when is_equilibrium is False).
opti_opts : OptimiserOptions
scipy and loop settings of the optimisation.
is_equilibrium : bool
If True, optimise the equilibrium-only R-factor (no instability theories).
Returns
-------
float
Optimal tuning angle in degrees, or 90 on failure.
Raises
------
IncompatibleFlowError
If the flow's equilibrium does not provide a meaningful R-factor
(Capability.TOUSCHEK_LIFETIME_RATIO), e.g. a Bosch flow.
"""
_require_r_factor(flow)
cavity_list = deepcopy(cavity_list)
@np.vectorize
def to_eval(psi_HC):
cavity_list[1].psi = psi_HC * np.pi / 180
B = SystemState(ring, cavity_list)
return evaluate_R(B, flow, eq_opts, theory_opts, is_equilibrium)
bd = Bounds([bounds[0]], [bounds[1]])
res = minimize(
fun=to_eval,
x0=[psi0_HC],
bounds=bd,
method=opti_opts.method_opti,
tol=opti_opts.tol_opti,
options={"maxiter": opti_opts.maxiter_opti, "rhobeg": opti_opts.rhobeg_opti},
)
return res.x[0] if res.success else 90
[docs]def _get_vals(
ring: Synchrotron,
cavity_list: list[CavityResonator],
flow,
psi0_HC: float,
bounds,
eq_opts: EquilibriumOptions,
theory_opts: TheoryOptions = TheoryOptions(),
opti_opts: OptimiserOptions = OptimiserOptions(),
is_equilibrium: bool = False,
) -> tuple:
"""Optimise psi_HC then return (psi, Result) at the chosen operating point.
Drives maximize_R, optionally seeding the initial guess from xi_init_input
(auto_psi_input) and, when loop_option is set, stepping psi outward until a
stable point is found. is_equilibrium selects equilibrium-only solving and
the matching stability check.
"""
cavity_list = deepcopy(cavity_list)
MC = cavity_list[0]
HC = cavity_list[1]
I0 = eq_opts.I0
debug = opti_opts.debug
if opti_opts.auto_psi_input:
xi = opti_opts.xi_init_input
HC_det = I0 * HC.Rs / HC.Q * ring.f1 / MC.Vc * HC.m**2 / xi
HC_fr = HC_det + HC.m * ring.f1
HC_psi = (
np.arctan(HC.QL * (HC_fr / (HC.m * ring.f1) - (HC.m * ring.f1) / HC_fr))
* 180
/ np.pi
)
psi0_HC = HC_psi
if debug:
print(f"psi0 input = {psi0_HC}")
psi = maximize_R(
ring,
cavity_list,
flow,
psi0_HC,
bounds,
eq_opts,
theory_opts,
opti_opts,
is_equilibrium,
)
if debug:
print(f"psi out of maximize_R = {psi}")
def solve_at(psi_deg):
HC.psi = psi_deg * np.pi / 180
B = SystemState(ring, cavity_list)
if is_equilibrium:
return B.run_equilibrium(flow, eq_opts)
return B.run(flow, eq_opts, theory_opts)
if not opti_opts.loop_option:
res = solve_at(psi)
if debug:
print(res)
return (psi, res)
count = 0
add_psi = opti_opts.add_psi_loop
max_loop = opti_opts.max_loop
while True:
psi = psi + count * add_psi
res = solve_at(psi)
is_stable = not _equilibrium_not_converged(res) and (
is_equilibrium or not (_is_unstable(res) or _theories_not_converged(res))
)
if is_stable:
if debug:
print(f"psi sent to result = {psi}")
print(res)
return (psi, res)
elif debug:
print(f"unstable: {count}")
print(f"psi unstable = {psi}")
print(res)
if count == max_loop:
raise ValueError(
f"_get_vals: no stable psi found within {max_loop} loop "
f"iterations (add_psi_loop={add_psi}, last psi={psi})."
)
count += 1