"""Static PVs."""
import time
from p4p.nt import NTScalar
from p4p.server import StaticProvider
from p4p.server.thread import SharedPV
from kafka_dae_diagnostics.data import Data
[docs]
def static_pv_provider(prefix: str, data: Data) -> StaticProvider:
""":py:obj:`p4p` static PV provider.
Args:
prefix: PV prefix
data: The data to serve
"""
static_pvs = StaticPVs(data)
static_provider = StaticProvider()
static_provider.add(f"{prefix}EVENTS", static_pvs.total_events)
static_provider.add(f"{prefix}MEVENTS", static_pvs.total_mevents)
static_provider.add(f"{prefix}TOTALCOUNTS", static_pvs.total_events)
static_provider.add(f"{prefix}EVENTMESSAGES", static_pvs.total_event_messages)
static_provider.add(f"{prefix}EVENTMODEFILEMB", static_pvs.total_event_megabytes)
static_provider.add(f"{prefix}COUNTRATE", static_pvs.count_rate)
static_provider.add(f"{prefix}EVENTMODEDATARATE", static_pvs.data_rate)
static_provider.add(f"{prefix}HISTMEMORY", static_pvs.histogram_memory)
static_provider.add(f"{prefix}NUMPERIODS", static_pvs.num_periods)
static_provider.add(f"{prefix}NUMSPECTRA", static_pvs.num_spectra)
static_provider.add(f"{prefix}NUMTIMECHANNELS", static_pvs.num_time_channels)
static_provider.add(f"{prefix}START_TIME", static_pvs.start_time)
static_provider.add(f"{prefix}STOP_TIME", static_pvs.stop_time)
static_provider.add(f"{prefix}RUNDURATION", static_pvs.run_duration)
static_provider.add(f"{prefix}PROCESSINGLAG", static_pvs.event_processing_lag)
static_provider.add(f"{prefix}DIAGNOSTICSLAG", static_pvs.diagnostics_update_lag)
data.callbacks["static-callbacks"] = lambda: static_pvs.update_all(data)
return static_provider
[docs]
class StaticPVs:
"""Hold static (scalar) PV definitions."""
def __init__(self, data: "Data") -> None:
"""Hold static (scalar) PV definitions."""
self._last_update = time.time()
self._last_update_data_size = data.total_event_megabytes
self.total_events = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": data.total_events,
"display.precision": 0,
},
)
self.total_mevents = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": data.mev,
"display.precision": 6,
},
)
self.total_event_messages = SharedPV(
nt=NTScalar(display=True, form=True),
initial={"value": data.total_event_messages, "display.precision": 0},
)
self.total_event_megabytes = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": data.total_event_megabytes,
"display.units": "MiB",
"display.precision": 3,
},
)
self.histogram_memory = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": data.histogram_megabytes,
"display.units": "MiB",
"display.precision": 3,
},
)
self.num_periods = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": data.num_periods,
"display.precision": 0,
},
)
self.num_spectra = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": data.num_spectra,
"display.precision": 0,
},
)
self.num_time_channels = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": data.num_time_channels,
"display.precision": 0,
},
)
self.count_rate = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": data.num_time_channels,
"display.units": "Mev/h",
"display.precision": 3,
},
)
self.data_rate = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": data.average_data_rate,
"display.units": "MiB/s",
"display.precision": 3,
},
)
self.start_time = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": data.start_time,
"display.precision": 0,
},
)
self.stop_time = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": data.start_time,
"display.precision": 0,
},
)
self.run_duration = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": data.duration,
"display.units": "s",
"display.precision": 1,
},
)
self.event_processing_lag = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": data.event_processing_lag,
"display.units": "s",
"display.precision": 3,
},
)
self.diagnostics_update_lag = SharedPV(
nt=NTScalar(display=True, form=True),
initial={
"value": 0,
"display.units": "s",
"display.precision": 3,
},
)
[docs]
def update_all(self, data: Data) -> None:
"""Update all PVs with new data.
Args:
data (Data): New data.
"""
now = time.time()
self.total_events.post(data.total_events, timestamp=now)
self.total_mevents.post(data.mev, timestamp=now)
self.total_event_messages.post(data.total_event_messages, timestamp=now)
self.total_event_megabytes.post(data.total_event_megabytes, timestamp=now)
self.histogram_memory.post(data.histogram_megabytes, timestamp=now)
self.num_periods.post(data.num_periods, timestamp=now)
self.num_spectra.post(data.num_spectra, timestamp=now)
self.num_time_channels.post(data.num_time_channels, timestamp=now)
self.count_rate.post(data.mev_per_hour, timestamp=now)
self.start_time.post(data.start_time, timestamp=now)
self.stop_time.post(data.stop_time, timestamp=now)
self.run_duration.post(data.duration, timestamp=now)
self.event_processing_lag.post(data.event_processing_lag, timestamp=now)
self.data_rate.post(data.average_data_rate, timestamp=now)
diagnostics_update_lag = now - self._last_update
self._last_update = now
self.diagnostics_update_lag.post(diagnostics_update_lag, timestamp=now)