kafka_dae_diagnostics.kafka.handlers

Utilities for reacting to Kafka messages.

Members

NonEmptyMessage

Dataclass holding a valid message (a non-error message with non-empty value) from Kafka.

extract_schema

Get the schema ID from a Kafka message.

handle_6s4t

Handle a 6s4t (run stop) message from Kafka.

handle_ev44

Handle an ev44 (event-data) message from Kafka.

handle_event_topic_messages

Handle Kafka event messages.

handle_event_topic_msg

Handle an arbitrary message from Kafka event topic.

handle_pl72

Handle a pl72 (run start) message from Kafka.

handle_pu00

Handle a pu00 (frame metadata) message from Kafka.

handle_run_info_messages

Handle Kafka run info messages.

handle_runinfo_msg

Handle an arbitrary message from Kafka runInfo topic.

class kafka_dae_diagnostics.kafka.handlers.NonEmptyMessage(value: bytes, partition: int | None)

Bases: object

Dataclass holding a valid message (a non-error message with non-empty value) from Kafka.

partition: int | None

The partition ID on which this message was received.

value: bytes

The content bytes of this message.

kafka_dae_diagnostics.kafka.handlers.extract_schema(msg: NonEmptyMessage) → str | None

Get the schema ID from a Kafka message.

Returns:

The schema ID, or None if the Kafka message has no payload or the payload is too short to contain one.
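As an illustration of the "too short" case, here is a minimal sketch. It assumes the schema ID is the four-character FlatBuffers file identifier stored at bytes 4 to 8 of the payload; this page does not confirm that layout. The name extract_schema_sketch and the local NonEmptyMessage stand-in are hypothetical and are not the module's own code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class NonEmptyMessage:
    """Local stand-in mirroring the documented fields (hypothetical)."""

    value: bytes
    partition: Optional[int]


def extract_schema_sketch(msg: NonEmptyMessage) -> Optional[str]:
    # Assumption: the payload is a FlatBuffers buffer, whose optional
    # file identifier occupies bytes 4..8 (after the 4-byte root offset).
    if len(msg.value) < 8:
        return None  # too short to hold an identifier
    try:
        return msg.value[4:8].decode("ascii")
    except UnicodeDecodeError:
        return None  # bytes present but not a printable identifier
```

A payload shorter than eight bytes yields None, which matches the documented return contract.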

kafka_dae_diagnostics.kafka.handlers.handle_6s4t(data: Data, msg: NonEmptyMessage) → None

Handle a 6s4t (run stop) message from Kafka.

kafka_dae_diagnostics.kafka.handlers.handle_ev44(data: Data, msg: NonEmptyMessage) → None

Handle an ev44 (event-data) message from Kafka.

Parameters:
  • data – Reference to data being served.

  • msg – Message received from Kafka.

kafka_dae_diagnostics.kafka.handlers.handle_event_topic_messages(event_messages: list[Message], data: Data) → None

Handle Kafka event messages.

Parameters:
  • event_messages – Messages received from Kafka event topic.

  • data – Data served by kafka_dae_diagnostics.
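The batch handler takes raw Message objects, while the per-message handlers take NonEmptyMessage, so a filtering step presumably sits between them. A minimal sketch of such a step follows. It assumes Message exposes error(), value() and partition() methods, as confluent_kafka.Message does. FakeMessage, to_non_empty and filter_messages are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class NonEmptyMessage:
    """Local stand-in mirroring the documented fields (hypothetical)."""

    value: bytes
    partition: Optional[int]


class FakeMessage:
    """Test double with the accessor methods assumed of a Kafka Message."""

    def __init__(self, value, partition=0, error=None):
        self._value, self._partition, self._error = value, partition, error

    def value(self):
        return self._value

    def partition(self):
        return self._partition

    def error(self):
        return self._error


def to_non_empty(msg) -> Optional[NonEmptyMessage]:
    # Drop error messages and messages whose value is None or empty.
    if msg.error() is not None:
        return None
    value = msg.value()
    if not value:
        return None
    return NonEmptyMessage(value=value, partition=msg.partition())


def filter_messages(messages: Iterable) -> List[NonEmptyMessage]:
    # Keep only the messages that pass the validity check, in order.
    converted = (to_non_empty(m) for m in messages)
    return [m for m in converted if m is not None]
```

Each surviving NonEmptyMessage could then be passed to the per-message handler.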

kafka_dae_diagnostics.kafka.handlers.handle_event_topic_msg(data: Data, msg: NonEmptyMessage) → None

Handle an arbitrary message from Kafka event topic.

Parameters:
  • data – Reference to data being served.

  • msg – Message received from Kafka.
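One plausible way to handle an "arbitrary" message is to look up its schema ID in a table of handlers. The sketch below illustrates that pattern only. It assumes that ev44 and pu00 are the schemas on the event topic and that the schema ID sits at bytes 4 to 8 of the payload. The data object is modelled as a plain dict, and every function name here is hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass(frozen=True)
class NonEmptyMessage:
    """Local stand-in mirroring the documented fields (hypothetical)."""

    value: bytes
    partition: Optional[int]


def schema_of(msg: NonEmptyMessage) -> Optional[str]:
    # Assumption: schema ID is the FlatBuffers file identifier at bytes 4..8.
    if len(msg.value) < 8:
        return None
    return msg.value[4:8].decode("ascii", errors="replace")


def on_ev44(data: dict, msg: NonEmptyMessage) -> None:
    # Placeholder for event-data handling: just count the message.
    data["ev44"] = data.get("ev44", 0) + 1


def on_pu00(data: dict, msg: NonEmptyMessage) -> None:
    # Placeholder for frame-metadata handling: just count the message.
    data["pu00"] = data.get("pu00", 0) + 1


EVENT_HANDLERS: Dict[str, Callable[[dict, NonEmptyMessage], None]] = {
    "ev44": on_ev44,
    "pu00": on_pu00,
}


def dispatch_event_msg(data: dict, msg: NonEmptyMessage) -> bool:
    """Route msg to a handler; return False if its schema is unknown."""
    schema = schema_of(msg)
    handler = EVENT_HANDLERS.get(schema) if schema is not None else None
    if handler is None:
        return False
    handler(data, msg)
    return True
```

Returning a flag for unknown schemas is a choice made for this sketch; the documented function returns None.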

kafka_dae_diagnostics.kafka.handlers.handle_pl72(data: Data, msg: NonEmptyMessage, event_consumer: Consumer) → None

Handle a pl72 (run start) message from Kafka.

This zeroes the spectra array, reallocating it if the size has changed, and configures the event consumer to re-read all events since the run start, in case the run start timestamp is in the past.

Parameters:
  • data – Reference to data being served.

  • msg – Message received from Kafka.

  • event_consumer – Kafka event topic consumer.
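The "zero, reallocating if the size changed" step can be sketched as below. This is an illustration only: it uses the standard library array type with unsigned 64-bit counters, whereas the real spectra container and its element type are not described on this page. The name reset_spectra is hypothetical, and rewinding the event consumer is not shown.

```python
from array import array


def reset_spectra(spectra: array, new_size: int) -> array:
    """Return a zeroed spectra array of length new_size.

    The existing buffer is reused (zeroed in place) when the size is
    unchanged; otherwise a fresh zero-filled array is allocated.
    """
    if len(spectra) != new_size:
        # Size changed: allocate a new zero-filled buffer.
        return array("Q", [0]) * new_size
    # Same size: clear the counts in place and keep the same object.
    for i in range(len(spectra)):
        spectra[i] = 0
    return spectra
```

Reusing the buffer when the size is unchanged avoids an allocation on every run start and keeps existing references to the array valid.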

kafka_dae_diagnostics.kafka.handlers.handle_pu00(data: Data, msg: NonEmptyMessage) → None

Handle a pu00 (frame metadata) message from Kafka.

Parameters:
  • data – Reference to data being served.

  • msg – Message received from Kafka.

kafka_dae_diagnostics.kafka.handlers.handle_run_info_messages(run_info_messages: list[Message], data: Data, event_consumer: Consumer) → None

Handle Kafka run info messages.

Parameters:
  • run_info_messages – Messages received from Kafka runInfo topic.

  • data – Data served by kafka_dae_diagnostics.

  • event_consumer – Consumer for event Kafka topic.

kafka_dae_diagnostics.kafka.handlers.handle_runinfo_msg(data: Data, msg: NonEmptyMessage, event_consumer: Consumer) → None

Handle an arbitrary message from Kafka runInfo topic.

Parameters:
  • data – Reference to data being served.

  • msg – Message received from Kafka.

  • event_consumer – Kafka event topic consumer.