diff --git a/python/lsst/consdb/hinfo.py b/python/lsst/consdb/hinfo.py index 4c407589..49efcea2 100644 --- a/python/lsst/consdb/hinfo.py +++ b/python/lsst/consdb/hinfo.py @@ -2,6 +2,7 @@ import os import random import re +from dataclasses import dataclass from datetime import datetime from typing import Any, Sequence @@ -10,7 +11,6 @@ import httpx import kafkit.registry import kafkit.registry.httpx -import kafkit.ssl import yaml from astro_metadata_translator import ObservationInfo from lsst.resources import ResourcePath @@ -230,6 +230,28 @@ def process_date(day_obs: str) -> None: # Initialization # ################## + +@dataclass +class KafkaConfig: + """Class for configuring Kafka-related items.""" + + bootstrap: str + group_id: str + username: str + password: str + schema_url: str + + +def get_kafka_config() -> KafkaConfig: + return KafkaConfig( + bootstrap=os.environ["KAFKA_BOOTSTRAP"], + group_id=os.environ.get("KAFKA_GROUP_ID", "consdb-consumer"), + username=os.environ["KAFKA_USERNAME"], + password=os.environ["KAFKA_PASSWORD"], + schema_url=os.environ["SCHEMA_URL"], + ) + + instrument = os.environ.get("INSTRUMENT", "LATISS") match instrument: case "LATISS": @@ -275,9 +297,6 @@ def process_date(day_obs: str) -> None: if bucket_prefix: os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1" -kafka_bootstrap = os.environ["KAFKA_BOOTSTRAP"] -schema_url = os.environ["SCHEMA_URL"] -kafka_group_id = "1" topic = f"lsst.{TOPIC_MAPPING[instrument]}.logevent_largeFileObjectAvailable" @@ -289,26 +308,24 @@ def process_date(day_obs: str) -> None: async def main() -> None: """Handle Header Service largeFileObjectAvailable messages.""" - global bucket_prefix, kafka_bootstrap, kafka_group_id, schema_url, topic + global bucket_prefix + kafka_config = get_kafka_config() async with httpx.AsyncClient() as client: schema_registry = kafkit.registry.httpx.RegistryApi( - http_client=client, url=schema_url + http_client=client, url=kafka_config.schema_url ) deserializer = kafkit.registry.Deserializer(registry=schema_registry) - ssl_context = kafkit.ssl.create_ssl_context( - cluster_ca_path=broker_ca_path, - client_cert_path=client_cert_path, - client_key_path=client_key_path, - ) consumer = aiokafka.AIOKafkaConsumer( topic, - bootstrap_servers=kafka_bootstrap, - ssl_context=ssl_context, - security_protocol="SSL", - group_id=kafka_group_id, + bootstrap_servers=kafka_config.bootstrap, + group_id=kafka_config.group_id, auto_offset_reset="earliest", + isolation_level="read_committed", + security_protocol="SASL_PLAINTEXT", + sasl_plain_username=kafka_config.username, + sasl_plain_password=kafka_config.password, ) await consumer.start()