From b08e48e61cce0f2d788b54ef795adb420bed8de9 Mon Sep 17 00:00:00 2001 From: Glauber Costa Vila Verde Date: Mon, 24 Jun 2024 10:01:55 -0700 Subject: [PATCH 1/2] Changed lsst_efd_client -> InfluxDB API for topic queries --- .vscode/settings.json | 7 +- python/lsst/consdb/dao/influxdb.py | 164 ++++++++++++++++++ python/lsst/consdb/efd_transform/__init__.py | 0 python/lsst/consdb/efd_transform/transform.py | 22 ++- python/lsst/consdb/transform_efd.py | 8 +- tests/efd_transform/test_config_model.py | 135 ++++++++++++++ 6 files changed, 320 insertions(+), 16 deletions(-) create mode 100644 python/lsst/consdb/dao/influxdb.py create mode 100644 python/lsst/consdb/efd_transform/__init__.py create mode 100644 tests/efd_transform/test_config_model.py diff --git a/.vscode/settings.json b/.vscode/settings.json index 707c524c..5687d264 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,5 +6,10 @@ 79, 110 ], - } + }, + "python.analysis.extraPaths": [ + "./python", + "./python", + "./python/lsst" + ] } diff --git a/python/lsst/consdb/dao/influxdb.py b/python/lsst/consdb/dao/influxdb.py new file mode 100644 index 00000000..eaa4c3cf --- /dev/null +++ b/python/lsst/consdb/dao/influxdb.py @@ -0,0 +1,164 @@ +from urllib.parse import urljoin + +import astropy.time +import pandas as pd +import requests +from astropy.time import Time +from lsst_efd_client.auth_helper import NotebookAuth + + +class InfluxDBClient: + """A simple InfluxDB client. + + Parameters + ---------- + url : str + The URL of the InfluxDB API. + database_name : str + The name of the database to query. + username : str, optional + The username to authenticate with. + password : str, optional + The password to authenticate with. + """ + + def __init__( + self, + url: str, + database_name: str, + username: str | None = None, + password: str | None = None, + ) -> None: + self.url = url + self.database_name = database_name + self.auth = (username, password) if username and password else None + + def query(self, query: str) -> dict: + """Send a query to the InfluxDB API.""" + + params = {"db": self.database_name, "q": query} + try: + response = requests.get(f"{self.url}/query", params=params, auth=self.auth) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as exc: + raise Exception(f"An error occurred: {exc}") from exc + + def _to_dataframe(self, response: dict) -> pd.DataFrame: + """Convert an InfluxDB response to a Pandas dataframe. + + Parameters + ---------- + response : dict + The JSON response from the InfluxDB API. + """ + # One query submitted at a time + statement = response["results"][0] + # One topic queried at a time + series = statement["series"][0] + result = pd.DataFrame(series.get("values", []), columns=series["columns"]) + if "time" not in result.columns: + return result + result = result.set_index(pd.to_datetime(result["time"])).drop("time", axis=1) + if result.index.tzinfo is None: + result.index = result.index.tz_localize("UTC") + if "tags" in series: + for k, v in series["tags"].items(): + result[k] = v + if "name" in series: + result.name = series["name"] + return result + + def build_time_range_query(self, topic_name, fields, start, end, index=None, use_old_csc_indexing=False): + """Build a query based on a time range. + + Parameters + ---------- + topic_name : `str` + Name of topic for which to build a query. + fields : `str` or `list` + Name of field(s) to query. + start : `astropy.time.Time` + Start time of the time range. + end : `astropy.time.Time` + End time of the range either as an absolute time. + index : `int`, optional + When index is used, add an 'AND salIndex = index' to the query. + (default is `None`). + use_old_csc_indexing: `bool`, optional + When index is used, add an 'AND {CSCName}ID = index' to the query + which is the old CSC indexing name. + (default is `False`). + + Returns + ------- + query : `str` + A string containing the constructed query statement. + """ + if not isinstance(start, Time): + raise TypeError("The first time argument must be a time stamp") + + if not start.scale == "utc": + raise ValueError("Timestamps must be in UTC.") + + elif isinstance(end, Time): + if not end.scale == "utc": + raise ValueError("Timestamps must be in UTC.") + start_str = start.isot + end_str = end.isot + else: + raise TypeError("The second time argument must be the time stamp for the end " "or a time delta.") + index_str = "" + if index: + if use_old_csc_indexing: + parts = topic_name.split(".") + index_name = f"{parts[-2]}ID" # The CSC name is always the penultimate + else: + index_name = "salIndex" + index_str = f" AND {index_name} = {index}" + timespan = f"time >= '{start_str}Z' AND time <= '{end_str}Z'{index_str}" # influxdb demands last Z + + if isinstance(fields, str): + fields = [ + fields, + ] + elif isinstance(fields, bytes): + fields = fields.decode() + fields = [ + fields, + ] + + # Build query here + return f'SELECT {", ".join(fields)} FROM "{self.database_name}"."autogen"."{topic_name}" WHERE {timespan}' + + def select_time_series( + self, + topic_name, + fields, + start: astropy.time.Time, + end: astropy.time.Time, + index=None, + use_old_csc_indexing=False, + ): + + query = self.build_time_range_query(topic_name, fields, start, end, index, use_old_csc_indexing) + + response = self.query(query) + + if "series" not in response["results"][0]: + return pd.DataFrame() + + return self._to_dataframe(response) + + +class InfluxDbDao(InfluxDBClient): + + def __init__( + self, efd_name, database_name="efd", creds_service="https://roundtable.lsst.codes/segwarides/" + ): + auth = NotebookAuth(service_endpoint=creds_service) + host, schema_registry_url, port, user, password, path = auth.get_auth(efd_name) + + url = urljoin(f"https://{host}:{port}", f"{path}") + + super(InfluxDbDao, self).__init__(url, database_name=database_name, username=user, password=password) diff --git a/python/lsst/consdb/efd_transform/__init__.py b/python/lsst/consdb/efd_transform/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/lsst/consdb/efd_transform/transform.py b/python/lsst/consdb/efd_transform/transform.py index 7a40e22f..d5ac12ee 100644 --- a/python/lsst/consdb/efd_transform/transform.py +++ b/python/lsst/consdb/efd_transform/transform.py @@ -2,17 +2,15 @@ from typing import Any, List, Union import astropy.time -import lsst_efd_client import numpy import pandas from dao.butler import ButlerDao from dao.exposure_efd import ExposureEfdDao +from dao.influxdb import InfluxDbDao from dao.visit_efd import VisitEfdDao from efd_transform.summary import Summary from lsst.daf.butler import Butler -# from sqlalchemy import Engine - class Transform: """ @@ -40,7 +38,7 @@ def __init__( self, butler: Butler, db_uri: str, - efd: lsst_efd_client.EfdClient, + efd: InfluxDbDao, config: dict[str, Any], logger: logging.Logger, commit_every: int = 100, @@ -51,7 +49,7 @@ def __init__( Args: butler (Butler): The Butler object for accessing data. db_uri (str): The database connection string. - efd (lsst_efd_client.EfdClient): The EFD client for accessing + efd (dao.InfluxDbDao): The EFD client for accessing EFD data. config (dict[str, Any]): The configuration for the transformation process. @@ -72,7 +70,7 @@ def __init__( self.log.debug(f"EFD: {self.efd}") self.log.debug(f"Configs Columns: {len(self.config['columns'])}") - async def process_interval( + def process_interval( self, instrument: str, start_time: astropy.time.Time, @@ -127,7 +125,7 @@ async def process_interval( # Array with all topics needed for this column # topics = [{'name': topic name, series: pandas.DataFrame}] - topics = await self.topics_by_column(column, topic_interval) + topics = self.topics_by_column(column, topic_interval) if "ExposureEFD" in column["tables"]: for exposure in exposures: @@ -262,7 +260,7 @@ def concatenate_arrays( "Input data must be a list or list of lists or a numpy array or list of numpy arrays." ) - async def topics_by_column(self, column, topic_interval) -> list[dict]: + def topics_by_column(self, column, topic_interval) -> list[dict]: """ Retrieves the EFD topics and their corresponding series for a given column. @@ -278,13 +276,13 @@ async def topics_by_column(self, column, topic_interval) -> list[dict]: data = [] for topic in column["topics"]: - topic_series = await self.get_efd_values(topic, topic_interval) + topic_series = self.get_efd_values(topic, topic_interval) data.append({"topic": topic["name"], "series": topic_series}) self.log.debug(f"EFD Topic {topic['name']} return {len(topic_series)} rows") return data - async def get_efd_values( + def get_efd_values( self, topic: dict[str, Any], topic_interval: list[astropy.time.Time], @@ -296,12 +294,12 @@ async def get_efd_values( fields = [f["name"] for f in topic["fields"]] - series = await self.efd.select_time_series( + series = self.efd.select_time_series( topic["name"], fields, start - window, end + window, - topic.get("index", None), + index=topic.get("index", None), ) # TODO: Currently doing a temporary resample and interpolate. diff --git a/python/lsst/consdb/transform_efd.py b/python/lsst/consdb/transform_efd.py index 9b628389..aa21ba15 100644 --- a/python/lsst/consdb/transform_efd.py +++ b/python/lsst/consdb/transform_efd.py @@ -6,8 +6,10 @@ from typing import Any, Dict import astropy.time -import lsst_efd_client + +# import lsst_efd_client import yaml +from dao.influxdb import InfluxDbDao from efd_transform.config_model import ConfigModel from efd_transform.transform import Transform from lsst.daf.butler import Butler @@ -161,7 +163,7 @@ async def main() -> None: # db = create_engine(args.db_conn_str) db_uri = args.db_conn_str - efd = lsst_efd_client.EfdClient(args.efd_conn_str) + efd = InfluxDbDao(args.efd_conn_str) config = read_config(args.config_name) @@ -175,7 +177,7 @@ async def main() -> None: start_time = astropy.time.Time(args.start_time, format="isot") end_time = astropy.time.Time(args.end_time, format="isot") - await tm.process_interval( + tm.process_interval( args.instrument, start_time, end_time, diff --git a/tests/efd_transform/test_config_model.py b/tests/efd_transform/test_config_model.py new file mode 100644 index 00000000..13c1eb62 --- /dev/null +++ b/tests/efd_transform/test_config_model.py @@ -0,0 +1,135 @@ +import pytest +from pydantic import ValidationError + +from python.lsst.consdb.efd_transform.config_model import Column, ConfigModel, Field, Topic + + +def get_columns(): + columns = [ + Column( + name="column1", + function="function1", + type="type1", + unit="unit1", + description="description1", + topics=[ + Topic( + name="topic1", + fields=[ + Field(name="field1", is_array=False), + Field(name="field2", is_array=True), + ], + ), + Topic( + name="topic2", + fields=[ + Field(name="field3", is_array=False), + Field(name="field4", is_array=True), + ], + ), + ], + ), + Column( + name="column2", + function="function2", + type="type2", + unit="unit2", + description="description2", + topics=[ + Topic( + name="topic3", + fields=[ + Field(name="field5", is_array=False), + Field(name="field6", is_array=True), + ], + ), + ], + ), + ] + + return columns + + +def test_config_model_valid(): + columns = get_columns() + + config_model = ConfigModel(columns=columns) + + assert config_model.columns == columns + + +def test_config_model_invalid(): + + # Missing required attribute 'columns' + with pytest.raises(ValidationError): + ConfigModel() + + # Invalid type for 'columns' + with pytest.raises(ValidationError): + ConfigModel(columns="invalid") + + # Invalid type for 'name' in Column + with pytest.raises(ValidationError): + columns = get_columns() + columns[0].name = 123 + ConfigModel.model_validate(columns) + + # Invalid type for 'function' in Column + with pytest.raises(ValidationError): + columns = get_columns() + columns[0].function = 123 + ConfigModel.model_validate(columns) + + # Invalid type for 'type' in Column + with pytest.raises(ValidationError): + columns = get_columns() + columns[0].type = 123 + ConfigModel.model_validate(columns) + + # Invalid type for 'unit' in Column + with pytest.raises(ValidationError): + columns = get_columns() + columns[0].unit = 123 + ConfigModel.model_validate(columns) + + # Invalid type for 'description' in Column + with pytest.raises(ValidationError): + columns = get_columns() + columns[0].description = 123 + ConfigModel.model_validate(columns) + + # Invalid type for 'tables' in Column + with pytest.raises(ValidationError): + columns = get_columns() + columns[0].tables = "invalid" + ConfigModel.model_validate(columns) + + # Invalid type for 'topics' in Column + with pytest.raises(ValidationError): + columns = get_columns() + columns[0].topics = "invalid" + ConfigModel.model_validate(columns) + + # Invalid type for 'name' in Topic + with pytest.raises(ValidationError): + columns = get_columns() + columns[0].topics[0].name = 123 + ConfigModel.model_validate(columns) + + # Invalid type for 'fields' in Topic + with pytest.raises(ValidationError): + columns = get_columns() + columns[0].topics[0].fields = "invalid" + ConfigModel.model_validate(columns) + + # Invalid type for 'name' in Field + with pytest.raises(ValidationError): + columns = get_columns() + columns[0].topics[0].fields[0].name = 123 + ConfigModel.model_validate(columns) + + # Invalid type for 'is_array' in Field + with pytest.raises(ValidationError): + columns = get_columns() + columns[0].topics[0].fields[0].is_array = "invalid" + ConfigModel.model_validate(columns) From 35180d673ae08000b8929042552822fac6cc555a Mon Sep 17 00:00:00 2001 From: Glauber Costa Vila Verde Date: Mon, 24 Jun 2024 10:30:59 -0700 Subject: [PATCH 2/2] fixed lint --- python/lsst/consdb/dao/base.py | 42 ----------------- python/lsst/consdb/dao/exposure_efd.py | 18 +++++--- python/lsst/consdb/dao/visit_efd.py | 3 +- .../lsst/consdb/efd_transform/config_model.py | 3 +- python/lsst/consdb/efd_transform/summary.py | 45 ++++++++++--------- 5 files changed, 39 insertions(+), 72 deletions(-) diff --git a/python/lsst/consdb/dao/base.py b/python/lsst/consdb/dao/base.py index 3c736d77..a3d07382 100644 --- a/python/lsst/consdb/dao/base.py +++ b/python/lsst/consdb/dao/base.py @@ -326,45 +326,3 @@ def stm_to_str(self, stm, with_parameters=False): sql = sql.replace("\n", " ").replace("\r", "") return sql - - # Possible faster solution specifically for postgresql using COPY. - # def import_with_copy_expert(self, sql, data): - # """ - # This method is recommended for importing large volumes of data. using the postgresql COPY method. - - # The method is useful to handle all the parameters that PostgreSQL makes available - # in COPY statement: https://www.postgresql.org/docs/current/sql-copy.html - - # it is necessary that the from clause is reading from STDIN. - - # example: - # sql = COPY ( int: Args: df (pandas.DataFrame): The DataFrame to be upserted. - commit_every (int, optional): The number of rows to commit at a time. Defaults to 100. + commit_every (int, optional): The number of rows to commit + at a time. Defaults to 100. Returns: int: The number of rows upserted. diff --git a/python/lsst/consdb/dao/visit_efd.py b/python/lsst/consdb/dao/visit_efd.py index acfe4673..fa55fe9e 100644 --- a/python/lsst/consdb/dao/visit_efd.py +++ b/python/lsst/consdb/dao/visit_efd.py @@ -73,7 +73,8 @@ def upsert(self, df: pandas.DataFrame, commit_every: int = 100) -> int: Args: df (pandas.DataFrame): The DataFrame containing the data to upsert. - commit_every (int, optional): The number of rows to commit at once. Defaults to 100. + commit_every (int, optional): The number of rows to commit at once. + Defaults to 100. Returns: int: The number of rows upserted. diff --git a/python/lsst/consdb/efd_transform/config_model.py b/python/lsst/consdb/efd_transform/config_model.py index d734c0f0..1ae7c78d 100644 --- a/python/lsst/consdb/efd_transform/config_model.py +++ b/python/lsst/consdb/efd_transform/config_model.py @@ -9,7 +9,8 @@ class Field(BaseModel): Attributes: name (str): The name of the field. - is_array (bool, optional): Indicates if the field is an array. Defaults to False. + is_array (bool, optional): Indicates if the field is an array. + Defaults to False. """ name: str diff --git a/python/lsst/consdb/efd_transform/summary.py b/python/lsst/consdb/efd_transform/summary.py index 58a98823..3d283dc7 100644 --- a/python/lsst/consdb/efd_transform/summary.py +++ b/python/lsst/consdb/efd_transform/summary.py @@ -29,7 +29,8 @@ def apply(self, method_name: str) -> Union[float, None]: method_name: Name of the method to apply. Returns: - The result of the transformation method or None if the method is not found. + The result of the transformation method or None if the + method is not found. """ method = getattr(self, method_name, None) if method: @@ -59,7 +60,7 @@ def col_mean(self) -> float: if numpy.size(self.values) == 0: return numpy.nan - return numpy.nanmean(self.values, axis=0) + return numpy.nanmean(self.values, axis=0) def std(self, ddof: Optional[int] = 1) -> float: """ @@ -73,7 +74,7 @@ def std(self, ddof: Optional[int] = 1) -> float: """ if numpy.size(self.values) == 0: return numpy.nan - + return numpy.nanstd(self.values, ddof=ddof) def col_std(self, ddof: Optional[int] = 1) -> float: @@ -88,8 +89,8 @@ def col_std(self, ddof: Optional[int] = 1) -> float: """ if numpy.size(self.values) == 0: return numpy.nan - - return numpy.nanstd(self.values, ddof=ddof, axis=0) + + return numpy.nanstd(self.values, ddof=ddof, axis=0) def max(self) -> Union[float, int, bool]: """ @@ -113,7 +114,7 @@ def col_max(self) -> Union[float, int, bool]: if numpy.size(self.values) == 0: return numpy.nan - return numpy.nanmax(self.values, axis=0) + return numpy.nanmax(self.values, axis=0) def min(self) -> Union[float, int, bool]: """ @@ -137,7 +138,7 @@ def col_min(self) -> Union[float, int, bool]: if numpy.size(self.values) == 0: return numpy.nan - return numpy.nanmin(self.values, axis=0) + return numpy.nanmin(self.values, axis=0) def logical_and(self) -> Union[bool, numpy.ndarray]: """ @@ -148,7 +149,7 @@ def logical_and(self) -> Union[bool, numpy.ndarray]: """ if numpy.size(self.values) == 0: return numpy.nan - + return numpy.all(self.values) def col_logical_and(self) -> Union[bool, numpy.ndarray]: @@ -160,8 +161,8 @@ def col_logical_and(self) -> Union[bool, numpy.ndarray]: """ if numpy.size(self.values) == 0: return numpy.nan - - return numpy.all(self.values, axis=0) + + return numpy.all(self.values, axis=0) def logical_or(self) -> Union[bool, numpy.ndarray]: """ @@ -172,7 +173,7 @@ def logical_or(self) -> Union[bool, numpy.ndarray]: """ if numpy.size(self.values) == 0: return numpy.nan - + return numpy.any(self.values) def col_logical_or(self) -> Union[bool, numpy.ndarray]: @@ -184,8 +185,8 @@ def col_logical_or(self) -> Union[bool, numpy.ndarray]: """ if numpy.size(self.values) == 0: return numpy.nan - - return numpy.any(self.values, axis=0) + + return numpy.any(self.values, axis=0) def logical_not(self) -> numpy.ndarray: """ @@ -196,7 +197,7 @@ def logical_not(self) -> numpy.ndarray: """ if numpy.size(self.values) == 0: return numpy.nan - + return ~numpy.all(self.values) def col_logical_not(self) -> numpy.ndarray: @@ -208,8 +209,8 @@ def col_logical_not(self) -> numpy.ndarray: """ if numpy.size(self.values) == 0: return numpy.nan - - return ~numpy.all(self.values, axis=0) + + return ~numpy.all(self.values, axis=0) def comma_unique(self) -> str: """ @@ -222,21 +223,21 @@ def comma_unique(self) -> str: """ if numpy.size(self.values) == 0: return numpy.nan - values = self.values.split(',') - return ','.join(numpy.unique(values)) + values = self.values.split(",") + return ",".join(numpy.unique(values)) def semicolon_unique(self) -> str: """ Returns a string with semicolon-separated unique values. If the input string is empty, it returns NaN. - This method splits the input string by semicolons and returns a new string - with only the unique values, separated by semicolons. + This method splits the input string by semicolons and returns a + new string with only the unique values, separated by semicolons. Returns: str: A string with semicolon-separated unique values. """ if numpy.size(self.values) == 0: return numpy.nan - values = self.values.split(';') - return ';'.join(numpy.unique(values)) + values = self.values.split(";") + return ";".join(numpy.unique(values))