Merge pull request #23 from lsst-dm/u/glauber/influxdb-api
InfluxDB API
glaubervila authored Jun 24, 2024
2 parents 3e94b1e + 35180d6 commit b409196
Showing 11 changed files with 359 additions and 88 deletions.
7 changes: 6 additions & 1 deletion .vscode/settings.json
@@ -6,5 +6,10 @@
79,
110
],
}
},
"python.analysis.extraPaths": [
"./python",
"./python",
"./python/lsst"
]
}
42 changes: 0 additions & 42 deletions python/lsst/consdb/dao/base.py
@@ -326,45 +326,3 @@ def stm_to_str(self, stm, with_parameters=False):
sql = sql.replace("\n", " ").replace("\r", "")

return sql

# A possibly faster solution, specific to PostgreSQL, using COPY.
# def import_with_copy_expert(self, sql, data):
#     """
#     This method is recommended for importing large volumes of data,
#     using the PostgreSQL COPY command.

#     It is useful because it can handle all the parameters that
#     PostgreSQL makes available in the COPY statement:
#     https://www.postgresql.org/docs/current/sql-copy.html

#     The FROM clause must read from STDIN.

#     Example:
#     sql = COPY <table> (<columns>) FROM STDIN WITH (FORMAT CSV, DELIMITER '|', HEADER);

#     Parameters:
#         sql (str): The SQL statement, in the form "COPY <table> ... FROM STDIN".
#         data (file-like): A file-like object to read the data from.
#     Returns:
#         rowcount (int): The number of rows that the last execute*()
#             produced (for DQL statements like SELECT) or affected
#             (for DML statements like UPDATE or INSERT).

#     References:
#         https://www.psycopg.org/docs/cursor.html#cursor.copy_from
#         https://stackoverflow.com/questions/30050097/copy-data-from-csv-to-postgresql-using-python
#         https://stackoverflow.com/questions/13125236/sqlalchemy-psycopg2-and-postgresql-copy
#     """

#     with warnings.catch_warnings():
#         warnings.simplefilter("ignore", category=sa_exc.SAWarning)

#         connection = self.get_db_engine().raw_connection()
#         try:
#             cursor = connection.cursor()
#             cursor.copy_expert(sql, data)
#             connection.commit()

#             cursor.close()
#             return cursor.rowcount
#         except Exception:
#             connection.rollback()
#             raise
#         finally:
#             connection.close()
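
For context, the COPY path this removed helper wrapped can be driven directly with psycopg2's copy_expert. A minimal sketch, assuming a hypothetical connection string, table, and columns:

import io

import psycopg2

# Connection string and table/column names are illustrative only.
conn = psycopg2.connect("postgresql://user:secret@localhost:5432/consdb")
data = io.StringIO("exposure_id|instrument\n1|LATISS\n")
sql = (
    "COPY exposure_efd (exposure_id, instrument) "
    "FROM STDIN WITH (FORMAT CSV, DELIMITER '|', HEADER)"
)
try:
    with conn.cursor() as cur:
        cur.copy_expert(sql, data)  # stream the buffer through COPY
    conn.commit()
finally:
    conn.close()
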
18 changes: 12 additions & 6 deletions python/lsst/consdb/dao/exposure_efd.py
@@ -5,13 +5,15 @@

class ExposureEfdDao(DBBase):
"""
A class representing a Data Access Object (DAO) for accessing ExposureEFD data.
A class representing a Data Access Object (DAO) for accessing
ExposureEFD data.
Args:
db_uri (str): The URI of the database.
Attributes:
tbl: The table object representing the "ExposureEFD" table in the database.
tbl: The table object representing the "ExposureEFD" table in the
database.
"""

@@ -28,7 +30,8 @@ def get_by_exposure_id(self, exposure_id: int):
exposure_id (int): The exposure ID.
Returns:
list: A list of dictionaries representing the rows retrieved from the table.
list: A list of dictionaries representing the rows retrieved from
the table.
"""
stm = select(self.tbl.c).where(and_(self.tbl.c.exposure_id == exposure_id))
@@ -39,14 +42,16 @@ def get_by_exposure_id_instrument(self, exposure_id: int, instrument: str):

def get_by_exposure_id_instrument(self, exposure_id: int, instrument: str):
"""
Retrieves rows from the "ExposureEFD" table based on exposure ID and instrument.
Retrieves rows from the "ExposureEFD" table based on exposure ID and
instrument.
Args:
exposure_id (int): The exposure ID.
instrument (str): The instrument name.
Returns:
list: A list of dictionaries representing the rows retrieved from the table.
list: A list of dictionaries representing the rows retrieved from
the table.
"""
stm = select(self.tbl.c).where(
@@ -73,7 +78,8 @@ def upsert(self, df: pandas.DataFrame, commit_every: int = 100) -> int:
Args:
df (pandas.DataFrame): The DataFrame to be upserted.
commit_every (int, optional): The number of rows to commit at a time. Defaults to 100.
commit_every (int, optional): The number of rows to commit
at a time. Defaults to 100.
Returns:
int: The number of rows upserted.
164 changes: 164 additions & 0 deletions python/lsst/consdb/dao/influxdb.py
@@ -0,0 +1,164 @@
from urllib.parse import urljoin

import astropy.time
import pandas as pd
import requests
from astropy.time import Time
from lsst_efd_client.auth_helper import NotebookAuth


class InfluxDBClient:
"""A simple InfluxDB client.
Parameters
----------
url : str
The URL of the InfluxDB API.
database_name : str
The name of the database to query.
username : str, optional
The username to authenticate with.
password : str, optional
The password to authenticate with.
"""

def __init__(
self,
url: str,
database_name: str,
username: str | None = None,
password: str | None = None,
) -> None:
self.url = url
self.database_name = database_name
self.auth = (username, password) if username and password else None

def query(self, query: str) -> dict:
"""Send a query to the InfluxDB API."""

params = {"db": self.database_name, "q": query}
try:
response = requests.get(f"{self.url}/query", params=params, auth=self.auth)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as exc:
raise Exception(f"An error occurred: {exc}") from exc
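
This method is a thin wrapper over the InfluxDB 1.x HTTP /query endpoint. For reference, the equivalent raw request looks like this (endpoint URL, credentials, and statement are illustrative):

import requests

response = requests.get(
    "https://example.org/influxdb/query",  # hypothetical endpoint
    params={"db": "efd", "q": "SHOW MEASUREMENTS"},
    auth=("user", "secret"),
)
response.raise_for_status()
print(response.json())  # {"results": [{"series": [...]}]}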

def _to_dataframe(self, response: dict) -> pd.DataFrame:
"""Convert an InfluxDB response to a Pandas dataframe.
Parameters
----------
response : dict
The JSON response from the InfluxDB API.
"""
# One query submitted at a time
statement = response["results"][0]
# One topic queried at a time
series = statement["series"][0]
result = pd.DataFrame(series.get("values", []), columns=series["columns"])
if "time" not in result.columns:
return result
result = result.set_index(pd.to_datetime(result["time"])).drop("time", axis=1)
if result.index.tzinfo is None:
result.index = result.index.tz_localize("UTC")
if "tags" in series:
for k, v in series["tags"].items():
result[k] = v
if "name" in series:
result.name = series["name"]
return result
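
For illustration, the JSON shape the method expects, and the core of the conversion, look like this (topic name and values are made up):

import pandas as pd

response = {
    "results": [{
        "series": [{
            "name": "lsst.sal.ATCamera.vacuum",  # hypothetical topic
            "columns": ["time", "tempCCD"],
            "values": [
                ["2024-06-24T00:00:00Z", -94.1],
                ["2024-06-24T00:00:01Z", -94.2],
            ],
        }]
    }]
}
series = response["results"][0]["series"][0]
df = pd.DataFrame(series["values"], columns=series["columns"])
df = df.set_index(pd.to_datetime(df["time"])).drop("time", axis=1)  # UTC index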

def build_time_range_query(self, topic_name, fields, start, end, index=None, use_old_csc_indexing=False):
"""Build a query based on a time range.
Parameters
----------
topic_name : `str`
Name of topic for which to build a query.
fields : `str` or `list`
Name of field(s) to query.
start : `astropy.time.Time`
Start time of the time range.
end : `astropy.time.Time`
End time of the time range.
index : `int`, optional
When index is used, add an 'AND salIndex = index' to the query.
(default is `None`).
use_old_csc_indexing : `bool`, optional
When index is used, add an 'AND {CSCName}ID = index' to the query
which is the old CSC indexing name.
(default is `False`).
Returns
-------
query : `str`
A string containing the constructed query statement.
"""
if not isinstance(start, Time):
raise TypeError("The first time argument must be a time stamp")
if start.scale != "utc":
raise ValueError("Timestamps must be in UTC.")

if not isinstance(end, Time):
raise TypeError("The second time argument must be the time stamp for the end")
if end.scale != "utc":
raise ValueError("Timestamps must be in UTC.")

start_str = start.isot
end_str = end.isot
index_str = ""
if index:
if use_old_csc_indexing:
parts = topic_name.split(".")
index_name = f"{parts[-2]}ID"  # The CSC name is always the penultimate part
else:
index_name = "salIndex"
index_str = f" AND {index_name} = {index}"
timespan = f"time >= '{start_str}Z' AND time <= '{end_str}Z'{index_str}" # influxdb demands last Z

if isinstance(fields, str):
fields = [
fields,
]
elif isinstance(fields, bytes):
fields = fields.decode()
fields = [
fields,
]

# Build query here
return f'SELECT {", ".join(fields)} FROM "{self.database_name}"."autogen"."{topic_name}" WHERE {timespan}'
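
A worked example of the statement this builds, assuming the database is named "efd" and using a hypothetical topic and field:

from astropy.time import Time

client = InfluxDBClient("https://example.org/influxdb", "efd")  # illustrative
start = Time("2024-06-24T00:00:00", scale="utc")
end = Time("2024-06-24T01:00:00", scale="utc")
query = client.build_time_range_query(
    "lsst.sal.ATDome.position", ["azimuthPosition"], start, end
)
# SELECT azimuthPosition FROM "efd"."autogen"."lsst.sal.ATDome.position"
# WHERE time >= '2024-06-24T00:00:00.000Z' AND time <= '2024-06-24T01:00:00.000Z'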

def select_time_series(
self,
topic_name,
fields,
start: astropy.time.Time,
end: astropy.time.Time,
index=None,
use_old_csc_indexing=False,
):

query = self.build_time_range_query(topic_name, fields, start, end, index, use_old_csc_indexing)

response = self.query(query)

if "series" not in response["results"][0]:
return pd.DataFrame()

return self._to_dataframe(response)


class InfluxDbDao(InfluxDBClient):
"""An InfluxDB client that resolves connection credentials for a named
EFD instance through the segwarides credential service."""

def __init__(
self, efd_name, database_name="efd", creds_service="https://roundtable.lsst.codes/segwarides/"
):
auth = NotebookAuth(service_endpoint=creds_service)
host, schema_registry_url, port, user, password, path = auth.get_auth(efd_name)

url = urljoin(f"https://{host}:{port}", path)

super().__init__(url, database_name=database_name, username=user, password=password)
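
Putting it together, a minimal usage sketch (the EFD instance name "usdf_efd" and the topic are assumptions; any name known to the segwarides credential service would do):

from astropy.time import Time

dao = InfluxDbDao("usdf_efd")  # hypothetical EFD instance name
start = Time("2024-06-24T00:00:00", scale="utc")
end = Time("2024-06-24T01:00:00", scale="utc")
df = dao.select_time_series(
    "lsst.sal.ATDome.position", ["azimuthPosition"], start, end
)
print(df.head())
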
3 changes: 2 additions & 1 deletion python/lsst/consdb/dao/visit_efd.py
@@ -73,7 +78,8 @@ def upsert(self, df: pandas.DataFrame, commit_every: int = 100) -> int:
Args:
df (pandas.DataFrame): The DataFrame containing the data to upsert.
commit_every (int, optional): The number of rows to commit at once. Defaults to 100.
commit_every (int, optional): The number of rows to commit at once.
Defaults to 100.
Returns:
int: The number of rows upserted.
Empty file.
3 changes: 2 additions & 1 deletion python/lsst/consdb/efd_transform/config_model.py
@@ -9,7 +9,8 @@ class Field(BaseModel):
Attributes:
name (str): The name of the field.
is_array (bool, optional): Indicates if the field is an array. Defaults to False.
is_array (bool, optional): Indicates if the field is an array.
Defaults to False.
"""

name: str
