"""
Module where parameter scans and optimization scans are defined.
Every scan takes a SystemState (the ring + cavity list bundle) together with a
resolved set of typed option objects (EquilibriumOptions, TheoryOptions, and for
the optimisation scans OptimiserOptions), plus a ScanOptions controlling the
sweep itself. These are bundled with the resolved flow into a ScanContext. The
cavity list is deep-copied out of the state, so the caller's SystemState is left
untouched as grid points mutate cavity attributes.
"""
from __future__ import annotations
import copy
from dataclasses import dataclass, field, replace
from typing import TYPE_CHECKING
import numpy as np
from mpi4py import MPI
from tqdm import tqdm
from albums.equilibrium import resolve_tau_boundary
from albums.options import (
EquilibriumOptions,
OptimiserOptions,
ScanOptions,
TheoryOptions,
)
from albums.result_io import ResultArray, results_to_arrays
from albums.system import SystemState
if TYPE_CHECKING: # pragma: no cover - hints only
from mbtrack2 import CavityResonator, Synchrotron
from albums.optimiser import _get_vals
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
[docs]def _local_rows(var1):
"""This rank's slice of var1 plus the matching global row indices.
Splitting is even-as-possible (np.array_split), so a rank may own 0+ rows;
the global indices let after-opti passes address the full gathered psi map.
"""
gidx = np.array_split(np.arange(len(var1)), size)[rank]
return np.asarray(var1)[gidx], gidx
[docs]def _progress(total):
"""One consolidated progress bar, shown on rank 0 only."""
return tqdm(total=total, desc="scan", disable=(rank != 0))
[docs]@dataclass
class ScanContext:
"""Everything a scan grid point needs, bundled once per scan.
Attributes
----------
cavity_list : list of CavityResonator
Main cavity in position 0, the harmonic cavity in position 1.
ring : Synchrotron
Ring configuration.
flow : Flow
The already-resolved solve flow.
eq_opts : EquilibriumOptions
Beam current, integration boundary and solver options.
theory_opts : TheoryOptions
Shared theory inputs.
opti_opts : OptimiserOptions
Optimisation settings (used by the optimisation scans only).
scan_opts : ScanOptions
Sweep settings (skip).
psi0_HC, bounds : optional
Initial guess and bounds for the optimisation scans.
RoQ : float or None
Fixed R/Q value used by scan_psi_HC_Q0.
"""
cavity_list: list[CavityResonator]
ring: Synchrotron
flow: object
eq_opts: EquilibriumOptions
theory_opts: TheoryOptions = field(default_factory=TheoryOptions)
opti_opts: OptimiserOptions = field(default_factory=OptimiserOptions)
scan_opts: ScanOptions = field(default_factory=ScanOptions)
psi0_HC: float | None = None
bounds: tuple | None = None
RoQ: float | None = None
[docs]def _make_ctx(
state, flow, eq_opts, theory_opts, opti_opts=None, scan_opts=None, **extra
) -> ScanContext:
"""Build a ScanContext, resolving the flow once with the chosen mode coupling.
The cavity list is deep-copied out of state so that mutating cavity
attributes per grid point never touches the caller's SystemState.
"""
theory_opts = theory_opts or TheoryOptions()
scan_opts = scan_opts or ScanOptions()
return ScanContext(
cavity_list=copy.deepcopy(state.cavity_list),
ring=state.ring,
flow=flow,
eq_opts=eq_opts,
theory_opts=theory_opts,
opti_opts=opti_opts or OptimiserOptions(),
scan_opts=scan_opts,
**extra,
)
[docs]def _strip_unpicklable(res):
"""Drop the live backend handles / cavity objects from a Result before MPI
pickling. results_to_arrays ignores these fields anyway, and the pycolleff
backend_LE carries an unpicklable closure (see physics.ptbl.prepare_alves_le)."""
if res is not None and res.equilibrium is not None:
res.equilibrium.backend_BLE = None
res.equilibrium.backend_LE = None
res.equilibrium.cavity_list = None
return res
[docs]def _run_grid(func, var1, var2, ctx):
"""Evaluate func(v1, v2, ctx) over the var1 x var2 grid, returning a 2D object
array of Result on rank 0 (None on other ranks).
var1 rows are split across MPI ranks and object-gathered (pickling) to the
root — the entries are Result objects (nested arrays + dataclasses), so the
buffer-based comm.Gather cannot be used. Grid points skipped by the skip
fast-path stay None and are filled with sentinels downstream.
"""
var1_local, _ = _local_rows(var1)
local = np.empty((len(var1_local), len(var2)), dtype=object)
pbar = _progress(len(var1_local) * len(var2))
for i, v1 in enumerate(var1_local):
var_skip = False
for j, v2 in enumerate(var2):
res = func(v1, v2, ctx)
local[i, j] = _strip_unpicklable(res)
pbar.update(1)
if not res.converged:
if ctx.scan_opts.skip and var_skip:
pbar.update(len(var2) - j - 1)
break
var_skip = True
else:
var_skip = False
pbar.close()
gathered = comm.gather(local, root=0)
if rank != 0:
return None
return np.concatenate(gathered, axis=0)
# Generalized scan function
[docs]def __scan(func, var1, var2, ctx):
return _run_grid(func, var1, var2, ctx)
[docs]def _result(ctx, grid, var1, var2, u1, u2, l1, l2, extra=None):
"""Rank-0: wrap the gathered grid into a self-describing ResultArray (axes,
units, labels, flow). Returns None on other ranks.
"""
if rank != 0:
return None
tau = resolve_tau_boundary(ctx.ring, ctx.eq_opts.tau_boundary)
return ResultArray(
results_to_arrays(grid),
var1=np.asarray(var1),
var2=np.asarray(var2),
var1_unit=u1,
var2_unit=u2,
var1_label=l1,
var2_label=l2,
flow=ctx.flow,
tau_boundary=tau,
extra=extra,
)
[docs]def __scan_opti(func, var1, var2, ctx):
"""Optimisation grid: gather the full Result per point to root, and the
optimised psi to every rank (needed by the after-opti pass)."""
var1_local, _ = _local_rows(var1)
local = np.empty((len(var1_local), len(var2)), dtype=object)
local_psi = np.zeros((len(var1_local), len(var2)))
pbar = _progress(len(var1_local) * len(var2))
for i, v1 in enumerate(var1_local):
for j, v2 in enumerate(var2):
psi, res = func(v1, v2, ctx)
local_psi[i, j] = psi
local[i, j] = _strip_unpicklable(res)
pbar.update(1)
pbar.close()
# pickle-based collective: per-rank row counts may differ (uneven split).
psi_full = np.concatenate(comm.allgather(local_psi), axis=0)
gathered = comm.gather(local, root=0)
grid = np.concatenate(gathered, axis=0) if rank == 0 else None
return grid, psi_full
[docs]def __scan_after_opti(func, psi, var1, var2, psi_add, ctx):
var1_local, gidx = _local_rows(var1)
local = np.empty((len(var1_local), len(var2)), dtype=object)
pbar = _progress(len(var1_local) * len(var2))
for i, v1 in enumerate(var1_local):
for j, v2 in enumerate(var2):
local[i, j] = _strip_unpicklable(
func(psi[gidx[i], j] + psi_add, v1, v2, ctx)
)
pbar.update(1)
pbar.close()
gathered = comm.gather(local, root=0)
if rank != 0:
return None
return np.concatenate(gathered, axis=0)
# %% scan_psi_I0
[docs]def __psi_I0(psi, I0, ctx):
ctx.cavity_list[1].psi = psi * np.pi / 180
B = SystemState(ctx.ring, ctx.cavity_list)
return B.run(ctx.flow, replace(ctx.eq_opts, I0=I0), ctx.theory_opts)
[docs]def scan_psi_I0(
state,
psi_HC_vals,
currents,
flow,
eq_opts,
theory_opts=TheoryOptions(),
scan_opts=ScanOptions(),
):
ctx = _make_ctx(state, flow, eq_opts, theory_opts, scan_opts=scan_opts)
grid = __scan(__psi_I0, psi_HC_vals, currents, ctx)
return _result(
ctx, grid, psi_HC_vals, currents, 1, 1e3, "Tuning angle [°]", "Current [mA]"
)
# %% scan_psi_RoQ
[docs]def __psi_RoQ(psi, RoQ, ctx):
ctx.cavity_list[1].Rs = RoQ * ctx.cavity_list[1].Q
ctx.cavity_list[1].psi = psi * np.pi / 180
B = SystemState(ctx.ring, ctx.cavity_list)
return B.run(ctx.flow, ctx.eq_opts, ctx.theory_opts)
[docs]def scan_psi_RoQ(
state,
psi_HC_vals,
RoQ_vals,
flow,
eq_opts,
theory_opts=TheoryOptions(),
scan_opts=ScanOptions(),
):
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
scan_opts=scan_opts,
)
grid = __scan(__psi_RoQ, psi_HC_vals, RoQ_vals, ctx)
return _result(
ctx, grid, psi_HC_vals, RoQ_vals, 1, 1, "Tuning angle [°]", "R/Q [ohm]"
)
# %% opti RoQ/I0
[docs]def __opti_I0_RoQ(I0, RoQ, ctx):
ctx.cavity_list[1].Rs = RoQ * ctx.cavity_list[1].Q
return _get_vals(
ctx.ring,
ctx.cavity_list,
ctx.flow,
ctx.psi0_HC,
ctx.bounds,
replace(ctx.eq_opts, I0=I0),
ctx.theory_opts,
ctx.opti_opts,
is_equilibrium=False,
)
[docs]def __opti_I0_RoQ_equilibrium(I0, RoQ, ctx):
ctx.cavity_list[1].Rs = RoQ * ctx.cavity_list[1].Q
return _get_vals(
ctx.ring,
ctx.cavity_list,
ctx.flow,
ctx.psi0_HC,
ctx.bounds,
replace(ctx.eq_opts, I0=I0),
ctx.theory_opts,
ctx.opti_opts,
is_equilibrium=True,
)
[docs]def _opti_result(ctx, grid, psi, var1, var2, u1, u2, l1, l2):
"""Build the optimisation ResultArray with the optimised psi grid in extra."""
extra = {"psi": psi} if rank == 0 else None
return _result(ctx, grid, var1, var2, u1, u2, l1, l2, extra=extra)
[docs]def scan_RoQ_I0(
state,
current_vals,
RoQ_vals,
flow,
eq_opts,
psi0_HC,
bounds,
theory_opts=TheoryOptions(),
opti_opts=OptimiserOptions(),
scan_opts=ScanOptions(),
scan_after_opti=True,
psi_add_after_opti=-0.1,
):
"""Optimise psi_HC over a current x R/Q grid.
Returns the optimisation ResultArray (background quantities xi/Touschek/
bunch_length/psi + flow instability markers).
When scan_after_opti is True, a second scan re-run slightly off the optimum is
attached as result.after_opti.
"""
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
opti_opts=opti_opts,
scan_opts=scan_opts,
psi0_HC=psi0_HC,
bounds=bounds,
)
grid, psi = __scan_opti(__opti_I0_RoQ, current_vals, RoQ_vals, ctx)
result = _opti_result(
ctx, grid, psi, current_vals, RoQ_vals, 1e3, 1, "Current [mA]", "R/Q [ohm]"
)
comm.Barrier()
if scan_after_opti:
grid2 = __scan_after_opti(
__psi_I0_RoQ, psi, current_vals, RoQ_vals, psi_add_after_opti, ctx
)
if rank == 0:
after = ctx
result.after_opti = _result(
after,
grid2,
current_vals,
RoQ_vals,
1e3,
1,
"Current [mA]",
"R/Q [ohm]",
)
return result
# %% opti RoQ/I0 equilibrium
[docs]def scan_RoQ_I0_equilibrium(
state,
current_vals,
RoQ_vals,
flow,
eq_opts,
psi0_HC,
bounds,
theory_opts=TheoryOptions(),
opti_opts=OptimiserOptions(),
scan_opts=ScanOptions(),
):
# for passive HC ! Set Q0=QL
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
opti_opts=opti_opts,
scan_opts=scan_opts,
psi0_HC=psi0_HC,
bounds=bounds,
)
grid, psi = __scan_opti(__opti_I0_RoQ_equilibrium, current_vals, RoQ_vals, ctx)
return _opti_result(
ctx, grid, psi, current_vals, RoQ_vals, 1e3, 1, "Current [mA]", "R/Q [ohm]"
)
# %% opti RoQ/Q0
[docs]def __opti_Q0_RoQ(Q0, RoQ, ctx):
ctx.cavity_list[1].Q = Q0
ctx.cavity_list[1].QL = Q0
ctx.cavity_list[1].Rs = RoQ * ctx.cavity_list[1].Q
return _get_vals(
ctx.ring,
ctx.cavity_list,
ctx.flow,
ctx.psi0_HC,
ctx.bounds,
ctx.eq_opts,
ctx.theory_opts,
ctx.opti_opts,
is_equilibrium=False,
)
[docs]def __opti_Q0_RoQ_equilibrium(Q0, RoQ, ctx):
ctx.cavity_list[1].Q = Q0
ctx.cavity_list[1].QL = Q0
ctx.cavity_list[1].Rs = RoQ * ctx.cavity_list[1].Q
return _get_vals(
ctx.ring,
ctx.cavity_list,
ctx.flow,
ctx.psi0_HC,
ctx.bounds,
ctx.eq_opts,
ctx.theory_opts,
ctx.opti_opts,
is_equilibrium=True,
)
[docs]def scan_RoQ_Q0(
state,
Q0_vals,
RoQ_vals,
flow,
eq_opts,
psi0_HC,
bounds,
theory_opts=TheoryOptions(),
opti_opts=OptimiserOptions(),
scan_opts=ScanOptions(),
scan_after_opti=True,
psi_add_after_opti=-0.1,
):
# for passive HC ! Set Q0=QL
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
opti_opts=opti_opts,
scan_opts=scan_opts,
psi0_HC=psi0_HC,
bounds=bounds,
)
grid, psi = __scan_opti(__opti_Q0_RoQ, Q0_vals, RoQ_vals, ctx)
result = _opti_result(
ctx, grid, psi, Q0_vals, RoQ_vals, 1, 1, "Harmonic cavity Q0", "R/Q [ohm]"
)
comm.Barrier()
if scan_after_opti:
grid2 = __scan_after_opti(
__psi_Q0_RoQ, psi, Q0_vals, RoQ_vals, psi_add_after_opti, ctx
)
if rank == 0:
after = ctx
result.after_opti = _result(
after, grid2, Q0_vals, RoQ_vals, 1, 1, "Harmonic cavity Q0", "R/Q [ohm]"
)
return result
# %% opti RoQ/Q0 equilibrium
[docs]def scan_RoQ_Q0_equilibrium(
state,
Q0_vals,
RoQ_vals,
flow,
eq_opts,
psi0_HC,
bounds,
theory_opts=TheoryOptions(),
opti_opts=OptimiserOptions(),
scan_opts=ScanOptions(),
):
# for passive HC ! Set Q0=QL
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
opti_opts=opti_opts,
scan_opts=scan_opts,
psi0_HC=psi0_HC,
bounds=bounds,
)
grid, psi = __scan_opti(__opti_Q0_RoQ_equilibrium, Q0_vals, RoQ_vals, ctx)
return _opti_result(
ctx, grid, psi, Q0_vals, RoQ_vals, 1, 1, "Harmonic cavity Q0", "R/Q [ohm]"
)
# %% generic find instability after opti
[docs]def __psi_I0_RoQ(psi, I0, RoQ, ctx):
ctx.cavity_list[1].Rs = RoQ * ctx.cavity_list[1].Q
ctx.cavity_list[1].psi = psi * np.pi / 180
B = SystemState(ctx.ring, ctx.cavity_list)
return B.run(ctx.flow, replace(ctx.eq_opts, I0=I0), ctx.theory_opts)
[docs]def __psi_Q0_RoQ(psi, Q0, RoQ, ctx):
ctx.cavity_list[1].Q = Q0
ctx.cavity_list[1].QL = Q0
ctx.cavity_list[1].Rs = RoQ * Q0
ctx.cavity_list[1].psi = psi * np.pi / 180
B = SystemState(ctx.ring, ctx.cavity_list)
return B.run(ctx.flow, ctx.eq_opts, ctx.theory_opts)
# %% scan_psi_QL
[docs]def __psi_QL(psi, QL, ctx):
ctx.cavity_list[0].QL = QL
ctx.cavity_list[1].psi = psi * np.pi / 180
B = SystemState(ctx.ring, ctx.cavity_list)
return B.run(ctx.flow, ctx.eq_opts, ctx.theory_opts)
[docs]def scan_psi_QL(
state,
psi_HC_vals,
QL_vals,
flow,
eq_opts,
theory_opts=TheoryOptions(),
scan_opts=ScanOptions(),
):
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
scan_opts=scan_opts,
)
grid = __scan(__psi_QL, psi_HC_vals, QL_vals, ctx)
return _result(
ctx, grid, psi_HC_vals, QL_vals, 1, 1, "Tuning angle [°]", "Main cavity QL"
)
# %% scan_psi_MCdet
[docs]def __psi_MCdet(psi, det, ctx):
ctx.cavity_list[0].detune = det
ctx.cavity_list[1].psi = psi * np.pi / 180
B = SystemState(ctx.ring, ctx.cavity_list)
return B.run(ctx.flow, ctx.eq_opts, ctx.theory_opts)
[docs]def scan_psi_MCdet(
state,
psi_HC_vals,
MCdet_vals,
flow,
eq_opts,
theory_opts=TheoryOptions(),
scan_opts=ScanOptions(),
):
# this scan forces a passive harmonic cavity with the detuning held fixed
eq_opts = replace(eq_opts, passive_harmonic_cavity=True, optimal_tunning=False)
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
scan_opts=scan_opts,
)
grid = __scan(__psi_MCdet, psi_HC_vals, MCdet_vals, ctx)
return _result(
ctx,
grid,
psi_HC_vals,
MCdet_vals,
1,
1e-3,
"Tuning angle [°]",
"Main cavity detuning [kHz]",
)
# %% scan_psi_MC_Rs
[docs]def __psi_MCRs(psi, Rs, ctx):
ctx.cavity_list[0].Rs_per_cavity = Rs
ctx.cavity_list[1].psi = psi * np.pi / 180
B = SystemState(ctx.ring, ctx.cavity_list)
return B.run(ctx.flow, ctx.eq_opts, ctx.theory_opts)
[docs]def scan_psi_MC_Rs(
state,
psi_HC_vals,
MC_Rs_vals,
flow,
eq_opts,
theory_opts=TheoryOptions(),
scan_opts=ScanOptions(),
):
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
scan_opts=scan_opts,
)
grid = __scan(__psi_MCRs, psi_HC_vals, MC_Rs_vals, ctx)
return _result(
ctx,
grid,
psi_HC_vals,
MC_Rs_vals,
1,
1e-6,
"Tuning angle [°]",
"Main cavity Rs per cavity [MOhm]",
)
# %% scan_psi_MC_beta
[docs]def __psi_MC_beta(psi, beta, ctx):
ctx.cavity_list[0].beta = beta
ctx.cavity_list[1].psi = psi * np.pi / 180
B = SystemState(ctx.ring, ctx.cavity_list)
return B.run(ctx.flow, ctx.eq_opts, ctx.theory_opts)
[docs]def scan_psi_MC_beta(
state,
psi_HC_vals,
MC_beta_vals,
flow,
eq_opts,
theory_opts=TheoryOptions(),
scan_opts=ScanOptions(),
):
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
scan_opts=scan_opts,
)
grid = __scan(__psi_MC_beta, psi_HC_vals, MC_beta_vals, ctx)
return _result(
ctx,
grid,
psi_HC_vals,
MC_beta_vals,
1,
1,
"Tuning angle [°]",
"Main cavity beta",
)
# %% scan_psi_HC_Q0
[docs]def __psi_HC_Q0(psi, Q0, ctx):
ctx.cavity_list[1].Q = Q0
ctx.cavity_list[1].QL = Q0
ctx.cavity_list[1].Rs = Q0 * ctx.RoQ
ctx.cavity_list[1].psi = psi * np.pi / 180
B = SystemState(ctx.ring, ctx.cavity_list)
return B.run(ctx.flow, ctx.eq_opts, ctx.theory_opts)
[docs]def scan_psi_HC_Q0(
state,
psi_HC_vals,
HC_Q0_vals,
RoQ,
flow,
eq_opts,
theory_opts=TheoryOptions(),
scan_opts=ScanOptions(),
):
# for passive HC ! Set Q0=QL
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
RoQ=RoQ,
scan_opts=scan_opts,
)
grid = __scan(__psi_HC_Q0, psi_HC_vals, HC_Q0_vals, ctx)
return _result(
ctx,
grid,
psi_HC_vals,
HC_Q0_vals,
1,
1,
"Tuning angle [°]",
"Harmonic cavity Q0",
)
# %% scan_MC_Vc_HC_Vc_active
[docs]def __MC_Vc_HC_Vc_active(MC_Vc, HC_Vc, ctx):
I0 = ctx.eq_opts.I0
cl = ctx.cavity_list
cl[0].Vc = MC_Vc
cl[0].theta = np.arccos(ctx.ring.U0 / cl[0].Vc)
cl[0].set_optimal_detune(I0)
cl[0].set_generator(I0)
cl[1].Vc = HC_Vc
cl[1].theta = -np.pi / 2
cl[1].set_optimal_detune(I0)
cl[1].set_generator(I0)
B = SystemState(ctx.ring, cl)
return B.run(ctx.flow, ctx.eq_opts, ctx.theory_opts)
[docs]def scan_MC_Vc_HC_Vc_active(
state,
MC_Vc_vals,
HC_Vc_vals,
flow,
eq_opts,
theory_opts=TheoryOptions(),
scan_opts=ScanOptions(),
):
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
scan_opts=scan_opts,
)
grid = __scan(__MC_Vc_HC_Vc_active, MC_Vc_vals, HC_Vc_vals, ctx)
return _result(
ctx,
grid,
MC_Vc_vals,
HC_Vc_vals,
1e-6,
1e-3,
"MC voltage [MV]",
"HC voltage [kV]",
)
# %% scan_MC_Vc_MC_psi_active
[docs]def __MC_Vc_MC_psi_active(MC_Vc, MC_psi, ctx):
I0 = ctx.eq_opts.I0
cl = ctx.cavity_list
cl[0].Vc = MC_Vc
cl[0].theta = np.arccos(ctx.ring.U0 / cl[0].Vc)
cl[0].set_optimal_detune(I0)
cl[0].psi = cl[0].psi + MC_psi / 180 * np.pi
cl[0].set_generator(I0)
B = SystemState(ctx.ring, cl)
return B.run(ctx.flow, ctx.eq_opts, ctx.theory_opts)
[docs]def scan_MC_Vc_MC_psi_active(
state,
MC_Vc_vals,
MC_psi_vals,
flow,
eq_opts,
theory_opts=TheoryOptions(),
scan_opts=ScanOptions(),
):
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
scan_opts=scan_opts,
)
grid = __scan(__MC_Vc_MC_psi_active, MC_Vc_vals, MC_psi_vals, ctx)
return _result(
ctx,
grid,
MC_Vc_vals,
MC_psi_vals,
1e-6,
1,
"MC voltage [MV]",
"MC delta psi from optimal [°]",
)
# %% scan_MC_psi_HC_psi_active
[docs]def __MC_psi_HC_psi_active(MC_psi, HC_psi, ctx):
I0 = ctx.eq_opts.I0
cl = ctx.cavity_list
cl[0].set_optimal_detune(I0)
cl[0].psi = cl[0].psi + MC_psi / 180 * np.pi
cl[0].set_generator(I0)
cl[1].set_optimal_detune(I0)
cl[1].psi = cl[1].psi + HC_psi / 180 * np.pi
cl[1].set_generator(I0)
B = SystemState(ctx.ring, cl)
return B.run(ctx.flow, ctx.eq_opts, ctx.theory_opts)
[docs]def scan_MC_psi_HC_psi_active(
state,
MC_psi_vals,
HC_psi_vals,
flow,
eq_opts,
theory_opts=TheoryOptions(),
scan_opts=ScanOptions(),
):
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
scan_opts=scan_opts,
)
grid = __scan(__MC_psi_HC_psi_active, MC_psi_vals, HC_psi_vals, ctx)
return _result(
ctx,
grid,
MC_psi_vals,
HC_psi_vals,
1,
1,
"MC delta psi from optimal [°]",
"HC delta psi from optimal [°]",
)
# %% scan_HC_beta_I0_active
[docs]def __HC_beta_I0_active(HC_beta, I0, ctx):
cl = ctx.cavity_list
cl[0].set_optimal_detune(I0)
cl[0].set_generator(I0)
cl[1].beta = HC_beta
cl[1].set_optimal_detune(I0)
cl[1].set_generator(I0)
B = SystemState(ctx.ring, cl)
return B.run(ctx.flow, replace(ctx.eq_opts, I0=I0), ctx.theory_opts)
[docs]def scan_HC_beta_I0_active(
state,
HC_beta_vals,
I0_vals,
flow,
eq_opts,
theory_opts=TheoryOptions(),
scan_opts=ScanOptions(),
):
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
scan_opts=scan_opts,
)
grid = __scan(__HC_beta_I0_active, HC_beta_vals, I0_vals, ctx)
return _result(ctx, grid, HC_beta_vals, I0_vals, 1, 1e3, "HC beta", "Current [mA]")
# %% scan_MC_psi_I0
[docs]def __MC_psi_I0(psi, I0, ctx):
ctx.cavity_list[0].psi = psi * np.pi / 180
ctx.cavity_list[0].set_generator(I0)
B = SystemState(ctx.ring, ctx.cavity_list)
return B.run(ctx.flow, replace(ctx.eq_opts, I0=I0), ctx.theory_opts)
[docs]def scan_MC_psi_I0(
state,
psi_MC_vals,
I0_vals,
flow,
eq_opts,
theory_opts=TheoryOptions(),
scan_opts=ScanOptions(),
):
ctx = _make_ctx(
state,
flow,
eq_opts,
theory_opts,
scan_opts=scan_opts,
)
grid = __scan(__MC_psi_I0, psi_MC_vals, I0_vals, ctx)
return _result(
ctx, grid, psi_MC_vals, I0_vals, 1, 1e3, "MC Tuning angle [°]", "Current [mA]"
)