# -*- coding: utf-8 -*-
"""
Stage 1 of the ALBuMS pipeline: beam-equilibrium solvers.
References
----------
[1] : Bosch, R. A., K. J. Kleman, and J. J. Bisognano. "Robinson
instabilities with a higher-harmonic cavity." Physical Review Special
Topics-Accelerators and Beams 4.7 (2001): 074401.
[2] : Bosch, R. A., and C. S. Hsue. "Suppression of longitudinal
coupled-bunch instabilities by a passive higher harmonic cavity."
Proceedings of International Conference on Particle Accelerators. IEEE, 1993.
[3] : Gamelin, A., Yamamoto, N. (2021). Equilibrium bunch density
distribution with multiple active and passive RF cavities.
IPAC'21 (MOPAB069).
[4] : Venturini, M. (2018). Passive higher-harmonic rf cavities with general
settings and multibunch instabilities in electron storage rings.
Physical Review Accelerators and Beams, 21(11), 114404.
[5] : Alves, Murilo B., and Fernando H. de Sá. "Equilibrium of longitudinal
bunch distributions in electron storage rings with arbitrary impedance
sources and generic filling patterns." Physical Review Accelerators and
Beams 26.9 (2023): 094402.
"""
from __future__ import annotations
from copy import deepcopy
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING, Protocol, runtime_checkable
import numpy as np
from mbtrack2 import BeamLoadingEquilibrium, gaussian_bunch
from pycolleff.longitudinal_equilibrium import LongitudinalEquilibrium
from scipy.constants import c
from scipy.integrate import trapezoid
from albums.mbtrack2_to_pycolleff import (
cavityresonator_to_pycolleff,
impedance_to_pycolleff,
synchrotron_to_pycolleff,
)
from albums.options import EquilibriumOptions
from albums.physics import potential
if TYPE_CHECKING: # pragma: no cover - hints only
from mbtrack2 import CavityResonator, Synchrotron
[docs]class Capability(str, Enum):
"""What an equilibrium snapshot makes available to downstream theories."""
AMPLITUDE_FORM_FACTOR = "amplitude_form_factor" # real form factor F per cavity
TAYLOR_POTENTIAL = "taylor_potential" # potential-well coeffs a, b, c
PHASE_FORM_FACTOR = "phase_form_factor" # phase form factor PHI
MBTRACK2_BLE = "mbtrack2_ble" # mbtrack2 BeamLoadingEquilibrium
PYCOLLEFF_LE = "pycolleff_le" # pycolleff LongitudinalEquilibrium
FULL_DISTRIBUTION = "full_distribution" # resolved profile rho(z)
ARBITRARY_FILLING = "arbitrary_filling" # non-uniform filling pattern
BROADBAND_IMPEDANCE = "broadband_impedance" # extra broadband Z in Haissinski
TOUSCHEK_LIFETIME_RATIO = "touschek_lifetime_ratio" # computed from bunch profile
[docs]@dataclass
class Equilibrium:
"""Immutable result of a Stage-1 solve, passed to every Stage-2 theory.
Fields are None when the producing solver does not provide them; a theory
must never read a field outside the solver's capabilities (the Flow
validation guarantees it never has to).
"""
converged: bool
I0: float | None = None # beam current the solve used [A]
bunch_length: float | None = None # RMS bunch length [s]
# potential-well decomposition (Bosch Eq. 4-6) — TAYLOR_POTENTIAL
a: float | None = None
b: float | None = None
c: float | None = None
# form factors per cavity
F: np.ndarray | None = None # AMPLITUDE_FORM_FACTOR
PHI: np.ndarray | None = None # PHASE_FORM_FACTOR
# backend handles (capability-gated, intentionally leaky for the theories
# that genuinely need them; NOT serialisable — skipped by the scan/HDF5 I/O)
backend_BLE: BeamLoadingEquilibrium | None = None # MBTRACK2_BLE
backend_LE: LongitudinalEquilibrium | None = None # PYCOLLEFF_LE
# cavities as set by the solver
cavity_list: list[CavityResonator] | None = None
# equilibrium-level scalars
Touschek: float = np.nan # Touschek lifetime ratio (NaN if not computed)
xi: float | None = None # HC/MC force ratio
# stamped by the solver at return time (== solver.provides)
capabilities: frozenset[Capability] = field(default_factory=frozenset)
[docs] def has(self, *caps: Capability) -> bool:
return set(caps).issubset(self.capabilities)
def __repr__(self) -> str:
n_cav = None if self.cavity_list is None else len(self.cavity_list)
return (
f"Equilibrium(converged={self.converged}, I0={self.I0}, "
f"bunch_length={self.bunch_length}, Touschek={self.Touschek}, "
f"xi={self.xi}, n_cavity={n_cav})"
)
[docs]@runtime_checkable
class EquilibriumSolver(Protocol):
"""Stage-1 strategy interface.
solve receives the ring, the cavity list (mutated in place) and an
EquilibriumOptions bundle (carrying the beam current, the integration
boundary and every solver option), and returns an Equilibrium stamped with
provides.
"""
name: str
provides: frozenset[Capability]
self,
ring: Synchrotron,
cavity_list: list[CavityResonator],
opts: EquilibriumOptions,
) -> Equilibrium: ...
[docs]def resolve_tau_boundary(ring: Synchrotron, tau_boundary: float | None) -> float:
"""Integration half-window: the given value, or 0.1 times the RF period."""
return ring.T1 * 0.1 if tau_boundary is None else tau_boundary
[docs]def _compute_xi(cavity_list: list[CavityResonator]) -> float:
"""First-harmonic/main 'force' ratio (Bosch near Eq. 7).
Defined for any list of two or more cavities (element 0 is the main cavity);
uses the first harmonic cavity. A per-harmonic-cavity xi is left for the full
multi-harmonic-cavity physics.
"""
MC = cavity_list[0]
HC = cavity_list[1]
denom = MC.Vc * np.sin(MC.theta)
if denom == 0:
return np.nan
return -1 * HC.m * HC.Vc * np.sin(HC.theta) / denom
_MULTI_HC_MSG = (
"{} currently supports a single harmonic cavity (cavity_list of length 2); "
"got {} cavities. Multi-harmonic-cavity support is pending the 3-RF physics."
)
# --------------------------------------------------------------------------- #
# Solver iteration loops (operate on ring + cavity_list, call physics.potential) #
# --------------------------------------------------------------------------- #
[docs]def _passive_bosch(
ring: Synchrotron,
cavity_list: list[CavityResonator],
tau_boundary: float,
opts: EquilibriumOptions,
) -> tuple:
# Follow III.A. (i)-(v) in [1] with added information from [2].
I0 = opts.I0
auto_set_MC_theta = opts.auto_set_MC_theta
optimal_tunning = opts.optimal_tunning
max_counter = opts.max_counter
n = len(cavity_list)
F = np.zeros(n)
F[0] = 1
F[1:] = 0.1
MC = cavity_list[0]
diff = 1
counter = 0
converged = True
old_F = np.zeros_like(F)
while diff > 0.01:
counter += 1
if counter > max_counter:
converged = False
break
if auto_set_MC_theta:
delta = 0
for i in range(n):
if i == 0:
continue
cavity = cavity_list[i]
delta += cavity.Vb(I0) * F[i] * np.cos(cavity.psi)
if np.abs(ring.U0 + delta) > MC.Vc:
MC.theta = 0
else:
MC.theta = np.arccos((ring.U0 + delta) / MC.Vc)
if optimal_tunning:
MC.set_optimal_detune(I0, F[0])
for i in range(n):
if i == 0:
continue
cavity = cavity_list[i]
cavity.theta = np.arctan(
np.sin(-2 * cavity.psi) / (-2 * np.cos(cavity.psi) ** 2)
)
cavity.Vc = (
-2
* I0
* F[i]
* cavity.RL
* np.cos(cavity.psi) ** 2
/ np.cos(cavity.theta)
)
a, b, cc = potential.potential_decomposition(ring, cavity_list)
bunch_length = potential.bunch_length(ring, a, b, cc, tau_boundary)
diff = 0
for i in range(n):
cavity = cavity_list[i]
old_F[i] = F[i]
F[i] = potential.form_factors(ring.f1, cavity.m, bunch_length)
denom = max(np.abs(F[i]), 1e-12)
diff += np.abs(F[i] - old_F[i]) / denom
if diff > 0.01:
for i in range(n):
F[i] = F[i] * 0.1 + old_F[i] * 0.9
return bunch_length, a, b, cc, converged, F
[docs]def _active_bosch(
ring: Synchrotron,
cavity_list: list[CavityResonator],
tau_boundary: float,
opts: EquilibriumOptions,
) -> tuple:
# Follow V.A. (i)-(iv) in [1].
if len(cavity_list) > 2:
raise NotImplementedError(
_MULTI_HC_MSG.format("Active Bosch", len(cavity_list))
)
I0 = opts.I0
optimal_tunning = opts.optimal_tunning
auto_set_for_xi = opts.auto_set_for_xi
n = len(cavity_list)
F = np.zeros(n)
F[0] = 1
F[1:] = 0.1
if auto_set_for_xi:
xi = opts.xi
if xi is None:
raise ValueError("xi must be given if auto_set_for_xi=True")
MC = cavity_list[0]
HC = cavity_list[1]
ratio = ring.U0 / (MC.Vc * (1 - 1 / HC.m**2))
if np.abs(ratio) > 1:
ratio = np.clip(ratio, -1, 1)
MC.theta = np.arccos(ratio)
HC.theta = np.arctan(HC.m * xi * np.tan(MC.theta)) - np.pi
HC.Vc = -xi * MC.Vc * np.sin(MC.theta) / (HC.m * np.sin(HC.theta))
a, b, cc = potential.potential_decomposition(ring, cavity_list)
bunch_length = potential.bunch_length(ring, a, b, cc, tau_boundary)
for i, cavity in enumerate(cavity_list):
F[i] = potential.form_factors(ring.f1, cavity.m, bunch_length)
if optimal_tunning:
for i, cavity in enumerate(cavity_list):
cavity.set_optimal_detune(I0, F[i])
return bunch_length, a, b, cc, True, F
[docs]def _venturini(
ring: Synchrotron,
cavity_list: list[CavityResonator],
tau_boundary: float,
opts: EquilibriumOptions,
) -> tuple:
if len(cavity_list) > 2:
raise NotImplementedError(_MULTI_HC_MSG.format("Venturini", len(cavity_list)))
I0 = opts.I0
passive_harmonic_cavity = opts.passive_harmonic_cavity
auto_set_MC_theta = opts.auto_set_MC_theta
optimal_tunning = opts.optimal_tunning
n = len(cavity_list)
F = opts.F_init if opts.F_init is not None else np.ones(n)
PHI = None
set_MC_phase_HCpassive = opts.set_MC_phase_HCpassive
MC = cavity_list[0]
HC = cavity_list[1]
if set_MC_phase_HCpassive:
delta = 0
for i in range(n):
if i == 0:
continue
cavity = cavity_list[i]
delta += cavity.Vb(I0) * F[i] * np.cos(cavity.psi)
if np.abs(ring.U0 + delta) > MC.Vc:
MC.theta = 0
else:
MC.theta = np.arccos((ring.U0 + delta) / MC.Vc)
if optimal_tunning:
MC.set_optimal_detune(I0)
if optimal_tunning and not passive_harmonic_cavity:
HC.set_optimal_detune(I0)
MC.set_generator(I0)
if passive_harmonic_cavity:
HC.Vg = 0
HC.theta_g = 0
else:
HC.set_generator(I0)
BLE = BeamLoadingEquilibrium(
ring,
cavity_list,
I0,
auto_set_MC_theta=auto_set_MC_theta,
B1=-1 * tau_boundary * c,
B2=tau_boundary * c,
)
try:
sol = BLE.beam_equilibrium()
bunch_length = BLE.std_rho() / c
converged = sol.success
F = BLE.F
PHI = BLE.PHI
for i, cavity in enumerate(cavity_list):
Vc_phasor = -1 * cavity.Vb(I0) * F[i] * np.exp(
1j * (cavity.psi - PHI[i])
) + cavity.Vg * np.exp(1j * cavity.theta_g)
cavity.Vc = np.abs(Vc_phasor)
cavity.theta = np.angle(Vc_phasor)
a, b, cc = potential.potential_decomposition(ring, cavity_list)
except Exception:
converged = False
bunch_length = None
a = None
b = None
cc = None
return bunch_length, a, b, cc, converged, F, PHI, BLE
[docs]def _alves(
ring: Synchrotron,
cavity_list: list[CavityResonator],
tau_boundary: float,
opts: EquilibriumOptions,
) -> tuple:
passive_harmonic_cavity = opts.passive_harmonic_cavity
if passive_harmonic_cavity is False:
raise NotImplementedError(
"Alves method with active harmonic cavities is not implemented."
)
if len(cavity_list) > 2:
raise NotImplementedError(_MULTI_HC_MSG.format("Alves", len(cavity_list)))
I0 = opts.I0
auto_set_MC_theta = opts.auto_set_MC_theta
optimal_tunning = opts.optimal_tunning
niter = opts.niter
tol = opts.tol
beta = opts.beta
m = opts.m
print_flag = opts.print_flag
filling = opts.filling
impedance = opts.impedance
n = len(cavity_list)
F = opts.F_init if opts.F_init is not None else np.ones(n)
PHI = np.zeros(n)
set_MC_phase_HCpassive = opts.set_MC_phase_HCpassive
MC = cavity_list[0]
HC = cavity_list[1]
if set_MC_phase_HCpassive:
delta = 0
for i in range(n):
if i == 0:
continue
cavity = cavity_list[i]
delta += cavity.Vb(I0) * F[i] * np.cos(cavity.psi)
if np.abs(ring.U0 + delta) > MC.Vc:
MC.theta = 0
else:
MC.theta = np.arccos((ring.U0 + delta) / MC.Vc)
if optimal_tunning:
MC.set_optimal_detune(I0)
MC.set_generator(I0)
HC.Vg = 0
HC.theta_g = 0
if filling is None:
filling = np.ones(ring.h) / ring.h
identical_bunches = True
else:
filling = np.asarray(filling)
identical_bunches = False
nb = np.count_nonzero(filling)
ring_pce = synchrotron_to_pycolleff(ring, I0, MC.Vc, nb)
mc_pce = cavityresonator_to_pycolleff(MC, Impedance=False)
hc_pce = cavityresonator_to_pycolleff(HC, Impedance=True)
sources = [mc_pce, hc_pce]
if impedance is not None:
impedance_pce = impedance_to_pycolleff(impedance)
sources += [impedance_pce]
longeq = LongitudinalEquilibrium(
ring=ring_pce, impedance_sources=sources, fillpattern=filling
)
longeq.identical_bunches = identical_bunches
longeq.feedback_on = True
longeq.feedback_method = 0 # 0 = Phasor, 1 = LS
longeq.zgrid = np.linspace(-1, 1, 2001) * ring_pce.rf_lamb / 2
longeq.max_mode = 1000 * ring_pce.harm_num
longeq.min_mode0_ratio = 1e-10
try:
_, converged = longeq.calc_longitudinal_equilibrium(
niter, tol, beta, m, print_flag
)
z0, sigmaz = longeq.calc_moments(longeq.zgrid, longeq.distributions)
bunch_length = sigmaz[0] / c
if auto_set_MC_theta:
ring_pce = synchrotron_to_pycolleff(ring, I0, MC.Vc, nb)
longeq = LongitudinalEquilibrium(
ring=ring_pce, impedance_sources=sources, fillpattern=filling
)
longeq.identical_bunches = identical_bunches
longeq.main_ref_phase_offset = z0[0] / c * ring.omega1
longeq.feedback_on = True
longeq.feedback_method = 0
longeq.zgrid = np.linspace(-1, 1, 2001) * ring_pce.rf_lamb / 2
longeq.max_mode = 1000 * ring_pce.harm_num
longeq.min_mode0_ratio = 1e-10
_, converged = longeq.calc_longitudinal_equilibrium(
niter, tol, beta, m, print_flag
)
except Exception:
converged = False
if converged:
z0, sigmaz = longeq.calc_moments(longeq.zgrid, longeq.distributions)
bunch_length = sigmaz[0] / c
longeq.ring.bunlen = sigmaz[0]
zgrid = longeq.zgrid - z0[0]
rho = longeq.distributions[0]
for i, cavity in enumerate(cavity_list):
exp = np.exp(1j * ring.k1 * cavity.m * zgrid)
FF = trapezoid(exp * rho, x=zgrid)
F[i] = np.real(FF)
PHI[i] = np.imag(FF)
MC.Vg = longeq.main_gen_amp_mon
MC.theta_g = -1 * longeq.main_gen_phase_mon
for i, cavity in enumerate(cavity_list):
Vc_phasor = -1 * cavity.Vb(I0) * F[i] * np.exp(
1j * (cavity.psi - PHI[i])
) + cavity.Vg * np.exp(1j * cavity.theta_g)
cavity.Vc = np.abs(Vc_phasor)
cavity.theta = np.angle(Vc_phasor)
a, b, cc = potential.potential_decomposition(ring, cavity_list)
LE = longeq
else:
bunch_length = None
a = None
b = None
cc = None
LE = None
return bunch_length, a, b, cc, converged, F, PHI, LE
# --------------------------------------------------------------------------- #
# Concrete solvers #
# --------------------------------------------------------------------------- #
[docs]class _SolverRepr:
"""Mixin giving the stateless solver classes a clean ClassName() repr."""
def __repr__(self) -> str:
return f"{type(self).__name__}()"
[docs]class BoschEquilibrium(_SolverRepr):
"""Original Bosch [1] iteration. Amplitude form factor + bunch length only."""
name = "Bosch"
provides = frozenset(
{
Capability.AMPLITUDE_FORM_FACTOR,
Capability.TAYLOR_POTENTIAL,
}
)
[docs] def solve(
self,
ring: Synchrotron,
cavity_list: list[CavityResonator],
opts: EquilibriumOptions,
) -> Equilibrium:
tau_boundary = resolve_tau_boundary(ring, opts.tau_boundary)
if opts.passive_harmonic_cavity:
bl, a, b, c, conv, F = _passive_bosch(ring, cavity_list, tau_boundary, opts)
else:
bl, a, b, c, conv, F = _active_bosch(ring, cavity_list, tau_boundary, opts)
return Equilibrium(
converged=bool(conv),
I0=opts.I0,
bunch_length=bl,
a=a,
b=b,
c=c,
F=np.array(F),
cavity_list=deepcopy(cavity_list),
capabilities=self.provides,
Touschek=np.nan,
xi=_compute_xi(cavity_list) if conv else None,
)
[docs]class VenturiniEquilibrium(_SolverRepr):
"""Venturini [3,4] via mbtrack2 BeamLoadingEquilibrium. Passive and active HC."""
name = "Venturini"
provides = frozenset(
{
Capability.AMPLITUDE_FORM_FACTOR,
Capability.TAYLOR_POTENTIAL,
Capability.PHASE_FORM_FACTOR,
Capability.MBTRACK2_BLE,
Capability.TOUSCHEK_LIFETIME_RATIO,
}
)
[docs] def solve(
self,
ring: Synchrotron,
cavity_list: list[CavityResonator],
opts: EquilibriumOptions,
) -> Equilibrium:
tau_boundary = resolve_tau_boundary(ring, opts.tau_boundary)
bl, a, b, c, conv, F, PHI, BLE = _venturini(
ring, cavity_list, tau_boundary, opts
)
try:
R = BLE.R_factor if conv else np.nan
except Exception:
R = np.nan
return Equilibrium(
converged=bool(conv),
I0=opts.I0,
bunch_length=bl,
a=a,
b=b,
c=c,
F=np.array(F),
PHI=None if PHI is None else np.array(PHI),
backend_BLE=BLE,
cavity_list=deepcopy(cavity_list),
capabilities=self.provides,
Touschek=R,
xi=_compute_xi(cavity_list) if conv else None,
)
[docs]class AlvesEquilibrium(_SolverRepr):
"""Alves [5] via pycolleff LongitudinalEquilibrium. Passive HC only."""
name = "Alves"
provides = frozenset(
{
Capability.AMPLITUDE_FORM_FACTOR,
Capability.TAYLOR_POTENTIAL,
Capability.PHASE_FORM_FACTOR,
Capability.PYCOLLEFF_LE,
Capability.FULL_DISTRIBUTION,
Capability.ARBITRARY_FILLING,
Capability.BROADBAND_IMPEDANCE,
Capability.TOUSCHEK_LIFETIME_RATIO,
}
)
[docs] def solve(
self,
ring: Synchrotron,
cavity_list: list[CavityResonator],
opts: EquilibriumOptions,
) -> Equilibrium:
tau_boundary = resolve_tau_boundary(ring, opts.tau_boundary)
bl, a, b, c3, conv, F, PHI, LE = _alves(ring, cavity_list, tau_boundary, opts)
if conv:
try:
rho0 = gaussian_bunch(LE.zgrid / c, ring.sigma_0) / c
R = trapezoid(rho0**2, LE.zgrid) / trapezoid(
LE.distributions[0] ** 2, LE.zgrid
)
except Exception:
R = np.nan
else:
R = np.nan
return Equilibrium(
converged=bool(conv),
I0=opts.I0,
bunch_length=bl,
a=a,
b=b,
c=c3,
F=np.array(F),
PHI=None if PHI is None else np.array(PHI),
backend_LE=LE,
cavity_list=deepcopy(cavity_list),
capabilities=self.provides,
Touschek=R,
xi=_compute_xi(cavity_list) if conv else None,
)