"""Static PVs for KDAECTRL."""
import logging
from queue import Queue
from p4p.nt import NTEnum, NTScalar
from p4p.server import ServerOperation, StaticProvider
from p4p.server.thread import SharedPV
from kafka_dae_control.data import Data
from kafka_dae_control.defaults import FrameSyncSelect
from kafka_dae_control.event_with_value import EventWithValue
from kafka_dae_control.worker_event_types import (
BeginEvent,
EndEvent,
FrameSyncSelectChangeEvent,
WorkerEvent,
)
logger = logging.getLogger(__name__)
[docs]
class StaticPVs:
"""Static PVs for KDAECTRL."""
def __init__(self, data: "Data", queue: Queue[WorkerEvent]) -> None:
"""Set up static PVs for KDAECTRL.
Args:
data: the data class containing the state of the program.
queue: the worker event queue.
"""
self.hw_running = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": data.running,
},
)
self.frame_sync_select_rbv = SharedPV(
nt=NTEnum(),
initial={
"choices": [x.name for x in FrameSyncSelect],
"index": data.frame_sync_select_rbv.value,
},
)
self.frame_sync_select_sp = SharedPV(
nt=NTEnum(),
initial={
"choices": [x.name for x in FrameSyncSelect],
"index": data.frame_sync_select_sp.value,
},
)
self.begin = SharedPV(nt=NTScalar(display=True, form=True), initial={"value": False})
self.end = SharedPV(nt=NTScalar(display=True, form=True), initial={"value": False})
self.run_number = SharedPV(
nt=NTScalar("s", display=True, form=True), initial={"value": str(data.run_number)}
)
self.i_run_number = SharedPV(
nt=NTScalar(display=True, form=True), initial={"value": data.run_number}
)
@self.begin.put # pragma: no cover
def begin_put(_: SharedPV, op: ServerOperation) -> None:
logger.info("begin")
ev = EventWithValue()
queue.put(BeginEvent(done_event=ev))
try:
ev.wait()
op.done()
except Exception as e: # noqa: BLE001
op.done(error=f"Failed to begin: {e}")
@self.end.put # pragma: no cover
def end_put(_: SharedPV, op: ServerOperation) -> None:
logger.info("end")
ev = EventWithValue()
queue.put(EndEvent(done_event=ev))
try:
ev.wait()
op.done()
except Exception as e: # noqa: BLE001
op.done(error=f"Failed to end: {e}")
@self.frame_sync_select_sp.put
def frame_sync_select_sp_put(pv: SharedPV, op: ServerOperation) -> None:
value = op.value()
logger.info("put with %s to frame_sync_select_sp", value)
ev = EventWithValue()
queue.put(FrameSyncSelectChangeEvent(value=FrameSyncSelect(value), done_event=ev))
try:
ev.wait()
op.done()
except Exception as e: # noqa: BLE001
op.done(error=f"Failed to set frame_sync_select_sp: {e}")
[docs]
def update_all(self, data: Data) -> None:
"""Post updates to all PVs using the data class values.
Args:
data: the data class containing the state of the program.
"""
self.run_number.post(str(data.run_number))
self.i_run_number.post(data.run_number)
self.hw_running.post(data.running)
self.frame_sync_select_rbv.post(data.frame_sync_select_rbv.value)
[docs]
def static_pv_provider(
pv_prefix: str, data: "Data", queue: Queue[WorkerEvent]
) -> tuple[StaticPVs, StaticProvider]:
"""Generate a static pv provider containing all the static PVs.
This also sets up basic post hooks for observable dataclass items.
Args:
pv_prefix: the PV prefix.
data: The data class containing the state of the program.
queue: the worker event queue.
Returns: A static pv provider containing static PVs.
"""
static_pvs = StaticPVs(data, queue)
static_provider = StaticProvider()
dae_prefix = "DAE:"
prefix = f"{pv_prefix}{dae_prefix}"
static_provider.add(f"{prefix}HWRUNNING", static_pvs.hw_running)
static_provider.add(f"{prefix}BEGINRUNEX", static_pvs.begin)
static_provider.add(f"{prefix}ENDRUN", static_pvs.end)
static_provider.add(f"{prefix}RUNNUMBER", static_pvs.run_number)
static_provider.add(f"{prefix}IRUNNUMBER", static_pvs.i_run_number)
static_provider.add(f"{prefix}DAETIMINGSOURCE", static_pvs.frame_sync_select_rbv)
static_provider.add(f"{prefix}DAETIMINGSOURCE:SP", static_pvs.frame_sync_select_sp)
return static_pvs, static_provider