Source code for albums.equilibrium

# -*- coding: utf-8 -*-
"""
Stage 1 of the ALBuMS pipeline: beam-equilibrium solvers.

References
----------
[1] : Bosch, R. A., K. J. Kleman, and J. J. Bisognano. "Robinson
instabilities with a higher-harmonic cavity." Physical Review Special
Topics-Accelerators and Beams 4.7 (2001): 074401.

[2] : Bosch, R. A., and C. S. Hsue. "Suppression of longitudinal
coupled-bunch instabilities by a passive higher harmonic cavity."
Proceedings of International Conference on Particle Accelerators. IEEE, 1993.

[3] : Gamelin, A., Yamamoto, N. (2021). Equilibrium bunch density
distribution with multiple active and passive RF cavities.
IPAC'21 (MOPAB069).

[4] : Venturini, M. (2018). Passive higher-harmonic rf cavities with general
settings and multibunch instabilities in electron storage rings.
Physical Review Accelerators and Beams, 21(11), 114404.

[5] : Alves, Murilo B., and Fernando H. de Sá. "Equilibrium of longitudinal
bunch distributions in electron storage rings with arbitrary impedance
sources and generic filling patterns." Physical Review Accelerators and
Beams 26.9 (2023): 094402.
"""

from __future__ import annotations

from copy import deepcopy
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING, Protocol, runtime_checkable

import numpy as np
from mbtrack2 import BeamLoadingEquilibrium, gaussian_bunch
from pycolleff.longitudinal_equilibrium import LongitudinalEquilibrium
from scipy.constants import c
from scipy.integrate import trapezoid

from albums.mbtrack2_to_pycolleff import (
    cavityresonator_to_pycolleff,
    impedance_to_pycolleff,
    synchrotron_to_pycolleff,
)
from albums.options import EquilibriumOptions
from albums.physics import potential

if TYPE_CHECKING:  # pragma: no cover - hints only
    from mbtrack2 import CavityResonator, Synchrotron


[docs]class Capability(str, Enum): """What an equilibrium snapshot makes available to downstream theories.""" AMPLITUDE_FORM_FACTOR = "amplitude_form_factor" # real form factor F per cavity TAYLOR_POTENTIAL = "taylor_potential" # potential-well coeffs a, b, c PHASE_FORM_FACTOR = "phase_form_factor" # phase form factor PHI MBTRACK2_BLE = "mbtrack2_ble" # mbtrack2 BeamLoadingEquilibrium PYCOLLEFF_LE = "pycolleff_le" # pycolleff LongitudinalEquilibrium FULL_DISTRIBUTION = "full_distribution" # resolved profile rho(z) ARBITRARY_FILLING = "arbitrary_filling" # non-uniform filling pattern BROADBAND_IMPEDANCE = "broadband_impedance" # extra broadband Z in Haissinski TOUSCHEK_LIFETIME_RATIO = "touschek_lifetime_ratio" # computed from bunch profile
[docs]@dataclass class Equilibrium: """Immutable result of a Stage-1 solve, passed to every Stage-2 theory. Fields are None when the producing solver does not provide them; a theory must never read a field outside the solver's capabilities (the Flow validation guarantees it never has to). """ converged: bool I0: float | None = None # beam current the solve used [A] bunch_length: float | None = None # RMS bunch length [s] # potential-well decomposition (Bosch Eq. 4-6) — TAYLOR_POTENTIAL a: float | None = None b: float | None = None c: float | None = None # form factors per cavity F: np.ndarray | None = None # AMPLITUDE_FORM_FACTOR PHI: np.ndarray | None = None # PHASE_FORM_FACTOR # backend handles (capability-gated, intentionally leaky for the theories # that genuinely need them; NOT serialisable — skipped by the scan/HDF5 I/O) backend_BLE: BeamLoadingEquilibrium | None = None # MBTRACK2_BLE backend_LE: LongitudinalEquilibrium | None = None # PYCOLLEFF_LE # cavities as set by the solver cavity_list: list[CavityResonator] | None = None # equilibrium-level scalars Touschek: float = np.nan # Touschek lifetime ratio (NaN if not computed) xi: float | None = None # HC/MC force ratio # stamped by the solver at return time (== solver.provides) capabilities: frozenset[Capability] = field(default_factory=frozenset)
[docs] def has(self, *caps: Capability) -> bool: return set(caps).issubset(self.capabilities)
def __repr__(self) -> str: n_cav = None if self.cavity_list is None else len(self.cavity_list) return ( f"Equilibrium(converged={self.converged}, I0={self.I0}, " f"bunch_length={self.bunch_length}, Touschek={self.Touschek}, " f"xi={self.xi}, n_cavity={n_cav})" )
[docs]@runtime_checkable class EquilibriumSolver(Protocol): """Stage-1 strategy interface. solve receives the ring, the cavity list (mutated in place) and an EquilibriumOptions bundle (carrying the beam current, the integration boundary and every solver option), and returns an Equilibrium stamped with provides. """ name: str provides: frozenset[Capability]
[docs] def solve(
self, ring: Synchrotron, cavity_list: list[CavityResonator], opts: EquilibriumOptions, ) -> Equilibrium: ...
[docs]def resolve_tau_boundary(ring: Synchrotron, tau_boundary: float | None) -> float: """Integration half-window: the given value, or 0.1 times the RF period.""" return ring.T1 * 0.1 if tau_boundary is None else tau_boundary
[docs]def _compute_xi(cavity_list: list[CavityResonator]) -> float: """First-harmonic/main 'force' ratio (Bosch near Eq. 7). Defined for any list of two or more cavities (element 0 is the main cavity); uses the first harmonic cavity. A per-harmonic-cavity xi is left for the full multi-harmonic-cavity physics. """ MC = cavity_list[0] HC = cavity_list[1] denom = MC.Vc * np.sin(MC.theta) if denom == 0: return np.nan return -1 * HC.m * HC.Vc * np.sin(HC.theta) / denom
_MULTI_HC_MSG = ( "{} currently supports a single harmonic cavity (cavity_list of length 2); " "got {} cavities. Multi-harmonic-cavity support is pending the 3-RF physics." ) # --------------------------------------------------------------------------- # # Solver iteration loops (operate on ring + cavity_list, call physics.potential) # # --------------------------------------------------------------------------- #
[docs]def _passive_bosch( ring: Synchrotron, cavity_list: list[CavityResonator], tau_boundary: float, opts: EquilibriumOptions, ) -> tuple: # Follow III.A. (i)-(v) in [1] with added information from [2]. I0 = opts.I0 auto_set_MC_theta = opts.auto_set_MC_theta optimal_tunning = opts.optimal_tunning max_counter = opts.max_counter n = len(cavity_list) F = np.zeros(n) F[0] = 1 F[1:] = 0.1 MC = cavity_list[0] diff = 1 counter = 0 converged = True old_F = np.zeros_like(F) while diff > 0.01: counter += 1 if counter > max_counter: converged = False break if auto_set_MC_theta: delta = 0 for i in range(n): if i == 0: continue cavity = cavity_list[i] delta += cavity.Vb(I0) * F[i] * np.cos(cavity.psi) if np.abs(ring.U0 + delta) > MC.Vc: MC.theta = 0 else: MC.theta = np.arccos((ring.U0 + delta) / MC.Vc) if optimal_tunning: MC.set_optimal_detune(I0, F[0]) for i in range(n): if i == 0: continue cavity = cavity_list[i] cavity.theta = np.arctan( np.sin(-2 * cavity.psi) / (-2 * np.cos(cavity.psi) ** 2) ) cavity.Vc = ( -2 * I0 * F[i] * cavity.RL * np.cos(cavity.psi) ** 2 / np.cos(cavity.theta) ) a, b, cc = potential.potential_decomposition(ring, cavity_list) bunch_length = potential.bunch_length(ring, a, b, cc, tau_boundary) diff = 0 for i in range(n): cavity = cavity_list[i] old_F[i] = F[i] F[i] = potential.form_factors(ring.f1, cavity.m, bunch_length) denom = max(np.abs(F[i]), 1e-12) diff += np.abs(F[i] - old_F[i]) / denom if diff > 0.01: for i in range(n): F[i] = F[i] * 0.1 + old_F[i] * 0.9 return bunch_length, a, b, cc, converged, F
[docs]def _active_bosch( ring: Synchrotron, cavity_list: list[CavityResonator], tau_boundary: float, opts: EquilibriumOptions, ) -> tuple: # Follow V.A. (i)-(iv) in [1]. if len(cavity_list) > 2: raise NotImplementedError( _MULTI_HC_MSG.format("Active Bosch", len(cavity_list)) ) I0 = opts.I0 optimal_tunning = opts.optimal_tunning auto_set_for_xi = opts.auto_set_for_xi n = len(cavity_list) F = np.zeros(n) F[0] = 1 F[1:] = 0.1 if auto_set_for_xi: xi = opts.xi if xi is None: raise ValueError("xi must be given if auto_set_for_xi=True") MC = cavity_list[0] HC = cavity_list[1] ratio = ring.U0 / (MC.Vc * (1 - 1 / HC.m**2)) if np.abs(ratio) > 1: ratio = np.clip(ratio, -1, 1) MC.theta = np.arccos(ratio) HC.theta = np.arctan(HC.m * xi * np.tan(MC.theta)) - np.pi HC.Vc = -xi * MC.Vc * np.sin(MC.theta) / (HC.m * np.sin(HC.theta)) a, b, cc = potential.potential_decomposition(ring, cavity_list) bunch_length = potential.bunch_length(ring, a, b, cc, tau_boundary) for i, cavity in enumerate(cavity_list): F[i] = potential.form_factors(ring.f1, cavity.m, bunch_length) if optimal_tunning: for i, cavity in enumerate(cavity_list): cavity.set_optimal_detune(I0, F[i]) return bunch_length, a, b, cc, True, F
[docs]def _venturini( ring: Synchrotron, cavity_list: list[CavityResonator], tau_boundary: float, opts: EquilibriumOptions, ) -> tuple: if len(cavity_list) > 2: raise NotImplementedError(_MULTI_HC_MSG.format("Venturini", len(cavity_list))) I0 = opts.I0 passive_harmonic_cavity = opts.passive_harmonic_cavity auto_set_MC_theta = opts.auto_set_MC_theta optimal_tunning = opts.optimal_tunning n = len(cavity_list) F = opts.F_init if opts.F_init is not None else np.ones(n) PHI = None set_MC_phase_HCpassive = opts.set_MC_phase_HCpassive MC = cavity_list[0] HC = cavity_list[1] if set_MC_phase_HCpassive: delta = 0 for i in range(n): if i == 0: continue cavity = cavity_list[i] delta += cavity.Vb(I0) * F[i] * np.cos(cavity.psi) if np.abs(ring.U0 + delta) > MC.Vc: MC.theta = 0 else: MC.theta = np.arccos((ring.U0 + delta) / MC.Vc) if optimal_tunning: MC.set_optimal_detune(I0) if optimal_tunning and not passive_harmonic_cavity: HC.set_optimal_detune(I0) MC.set_generator(I0) if passive_harmonic_cavity: HC.Vg = 0 HC.theta_g = 0 else: HC.set_generator(I0) BLE = BeamLoadingEquilibrium( ring, cavity_list, I0, auto_set_MC_theta=auto_set_MC_theta, B1=-1 * tau_boundary * c, B2=tau_boundary * c, ) try: sol = BLE.beam_equilibrium() bunch_length = BLE.std_rho() / c converged = sol.success F = BLE.F PHI = BLE.PHI for i, cavity in enumerate(cavity_list): Vc_phasor = -1 * cavity.Vb(I0) * F[i] * np.exp( 1j * (cavity.psi - PHI[i]) ) + cavity.Vg * np.exp(1j * cavity.theta_g) cavity.Vc = np.abs(Vc_phasor) cavity.theta = np.angle(Vc_phasor) a, b, cc = potential.potential_decomposition(ring, cavity_list) except Exception: converged = False bunch_length = None a = None b = None cc = None return bunch_length, a, b, cc, converged, F, PHI, BLE
[docs]def _alves( ring: Synchrotron, cavity_list: list[CavityResonator], tau_boundary: float, opts: EquilibriumOptions, ) -> tuple: passive_harmonic_cavity = opts.passive_harmonic_cavity if passive_harmonic_cavity is False: raise NotImplementedError( "Alves method with active harmonic cavities is not implemented." ) if len(cavity_list) > 2: raise NotImplementedError(_MULTI_HC_MSG.format("Alves", len(cavity_list))) I0 = opts.I0 auto_set_MC_theta = opts.auto_set_MC_theta optimal_tunning = opts.optimal_tunning niter = opts.niter tol = opts.tol beta = opts.beta m = opts.m print_flag = opts.print_flag filling = opts.filling impedance = opts.impedance n = len(cavity_list) F = opts.F_init if opts.F_init is not None else np.ones(n) PHI = np.zeros(n) set_MC_phase_HCpassive = opts.set_MC_phase_HCpassive MC = cavity_list[0] HC = cavity_list[1] if set_MC_phase_HCpassive: delta = 0 for i in range(n): if i == 0: continue cavity = cavity_list[i] delta += cavity.Vb(I0) * F[i] * np.cos(cavity.psi) if np.abs(ring.U0 + delta) > MC.Vc: MC.theta = 0 else: MC.theta = np.arccos((ring.U0 + delta) / MC.Vc) if optimal_tunning: MC.set_optimal_detune(I0) MC.set_generator(I0) HC.Vg = 0 HC.theta_g = 0 if filling is None: filling = np.ones(ring.h) / ring.h identical_bunches = True else: filling = np.asarray(filling) identical_bunches = False nb = np.count_nonzero(filling) ring_pce = synchrotron_to_pycolleff(ring, I0, MC.Vc, nb) mc_pce = cavityresonator_to_pycolleff(MC, Impedance=False) hc_pce = cavityresonator_to_pycolleff(HC, Impedance=True) sources = [mc_pce, hc_pce] if impedance is not None: impedance_pce = impedance_to_pycolleff(impedance) sources += [impedance_pce] longeq = LongitudinalEquilibrium( ring=ring_pce, impedance_sources=sources, fillpattern=filling ) longeq.identical_bunches = identical_bunches longeq.feedback_on = True longeq.feedback_method = 0 # 0 = Phasor, 1 = LS longeq.zgrid = np.linspace(-1, 1, 2001) * ring_pce.rf_lamb / 2 longeq.max_mode = 1000 * ring_pce.harm_num longeq.min_mode0_ratio = 1e-10 try: _, converged = longeq.calc_longitudinal_equilibrium( niter, tol, beta, m, print_flag ) z0, sigmaz = longeq.calc_moments(longeq.zgrid, longeq.distributions) bunch_length = sigmaz[0] / c if auto_set_MC_theta: ring_pce = synchrotron_to_pycolleff(ring, I0, MC.Vc, nb) longeq = LongitudinalEquilibrium( ring=ring_pce, impedance_sources=sources, fillpattern=filling ) longeq.identical_bunches = identical_bunches longeq.main_ref_phase_offset = z0[0] / c * ring.omega1 longeq.feedback_on = True longeq.feedback_method = 0 longeq.zgrid = np.linspace(-1, 1, 2001) * ring_pce.rf_lamb / 2 longeq.max_mode = 1000 * ring_pce.harm_num longeq.min_mode0_ratio = 1e-10 _, converged = longeq.calc_longitudinal_equilibrium( niter, tol, beta, m, print_flag ) except Exception: converged = False if converged: z0, sigmaz = longeq.calc_moments(longeq.zgrid, longeq.distributions) bunch_length = sigmaz[0] / c longeq.ring.bunlen = sigmaz[0] zgrid = longeq.zgrid - z0[0] rho = longeq.distributions[0] for i, cavity in enumerate(cavity_list): exp = np.exp(1j * ring.k1 * cavity.m * zgrid) FF = trapezoid(exp * rho, x=zgrid) F[i] = np.real(FF) PHI[i] = np.imag(FF) MC.Vg = longeq.main_gen_amp_mon MC.theta_g = -1 * longeq.main_gen_phase_mon for i, cavity in enumerate(cavity_list): Vc_phasor = -1 * cavity.Vb(I0) * F[i] * np.exp( 1j * (cavity.psi - PHI[i]) ) + cavity.Vg * np.exp(1j * cavity.theta_g) cavity.Vc = np.abs(Vc_phasor) cavity.theta = np.angle(Vc_phasor) a, b, cc = potential.potential_decomposition(ring, cavity_list) LE = longeq else: bunch_length = None a = None b = None cc = None LE = None return bunch_length, a, b, cc, converged, F, PHI, LE
# --------------------------------------------------------------------------- # # Concrete solvers # # --------------------------------------------------------------------------- #
[docs]class _SolverRepr: """Mixin giving the stateless solver classes a clean ClassName() repr.""" def __repr__(self) -> str: return f"{type(self).__name__}()"
[docs]class BoschEquilibrium(_SolverRepr): """Original Bosch [1] iteration. Amplitude form factor + bunch length only.""" name = "Bosch" provides = frozenset( { Capability.AMPLITUDE_FORM_FACTOR, Capability.TAYLOR_POTENTIAL, } )
[docs] def solve( self, ring: Synchrotron, cavity_list: list[CavityResonator], opts: EquilibriumOptions, ) -> Equilibrium: tau_boundary = resolve_tau_boundary(ring, opts.tau_boundary) if opts.passive_harmonic_cavity: bl, a, b, c, conv, F = _passive_bosch(ring, cavity_list, tau_boundary, opts) else: bl, a, b, c, conv, F = _active_bosch(ring, cavity_list, tau_boundary, opts) return Equilibrium( converged=bool(conv), I0=opts.I0, bunch_length=bl, a=a, b=b, c=c, F=np.array(F), cavity_list=deepcopy(cavity_list), capabilities=self.provides, Touschek=np.nan, xi=_compute_xi(cavity_list) if conv else None, )
[docs]class VenturiniEquilibrium(_SolverRepr): """Venturini [3,4] via mbtrack2 BeamLoadingEquilibrium. Passive and active HC.""" name = "Venturini" provides = frozenset( { Capability.AMPLITUDE_FORM_FACTOR, Capability.TAYLOR_POTENTIAL, Capability.PHASE_FORM_FACTOR, Capability.MBTRACK2_BLE, Capability.TOUSCHEK_LIFETIME_RATIO, } )
[docs] def solve( self, ring: Synchrotron, cavity_list: list[CavityResonator], opts: EquilibriumOptions, ) -> Equilibrium: tau_boundary = resolve_tau_boundary(ring, opts.tau_boundary) bl, a, b, c, conv, F, PHI, BLE = _venturini( ring, cavity_list, tau_boundary, opts ) try: R = BLE.R_factor if conv else np.nan except Exception: R = np.nan return Equilibrium( converged=bool(conv), I0=opts.I0, bunch_length=bl, a=a, b=b, c=c, F=np.array(F), PHI=None if PHI is None else np.array(PHI), backend_BLE=BLE, cavity_list=deepcopy(cavity_list), capabilities=self.provides, Touschek=R, xi=_compute_xi(cavity_list) if conv else None, )
[docs]class AlvesEquilibrium(_SolverRepr): """Alves [5] via pycolleff LongitudinalEquilibrium. Passive HC only.""" name = "Alves" provides = frozenset( { Capability.AMPLITUDE_FORM_FACTOR, Capability.TAYLOR_POTENTIAL, Capability.PHASE_FORM_FACTOR, Capability.PYCOLLEFF_LE, Capability.FULL_DISTRIBUTION, Capability.ARBITRARY_FILLING, Capability.BROADBAND_IMPEDANCE, Capability.TOUSCHEK_LIFETIME_RATIO, } )
[docs] def solve( self, ring: Synchrotron, cavity_list: list[CavityResonator], opts: EquilibriumOptions, ) -> Equilibrium: tau_boundary = resolve_tau_boundary(ring, opts.tau_boundary) bl, a, b, c3, conv, F, PHI, LE = _alves(ring, cavity_list, tau_boundary, opts) if conv: try: rho0 = gaussian_bunch(LE.zgrid / c, ring.sigma_0) / c R = trapezoid(rho0**2, LE.zgrid) / trapezoid( LE.distributions[0] ** 2, LE.zgrid ) except Exception: R = np.nan else: R = np.nan return Equilibrium( converged=bool(conv), I0=opts.I0, bunch_length=bl, a=a, b=b, c=c3, F=np.array(F), PHI=None if PHI is None else np.array(PHI), backend_LE=LE, cavity_list=deepcopy(cavity_list), capabilities=self.provides, Touschek=R, xi=_compute_xi(cavity_list) if conv else None, )