kafka_dae_control.config
Utilities for reading Control IOC configuration from TOML.
Members
ControlConfig | Create a new model by parsing and validating input data from keyword arguments.
load_config | Validate and load a config file at the specified path.
- class kafka_dae_control.config.ControlConfig(*, board_ip: IPv4Address, pv_prefix: str, runinfo_topic: str = 'runnervmp4gaq_runInfo', local_ip: IPv4Address, poll_interval_s: float = 1.0, kafka_producer: dict[str, str], state_file: Path = WindowsPath('state.json'), pv_update_interval_s: float = 0.1, read_port: int = 10000, write_port: int = 10002, flush_timeout_s: int = 1)
Bases: BaseModel
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- board_ip: IPv4Address
IP address of the streaming control board
- local_ip: IPv4Address
Local IP to set the control board IP register to
- model_config = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- kafka_dae_control.config.load_config(config_path: str) → ControlConfig
Validate and load the TOML config file at the specified path, returning a ControlConfig.