Commit
Fix pre-commit issues across the transform_efd codebase
rcboufleur committed Jan 17, 2025
1 parent c4115fd commit cc46130
Showing 10 changed files with 67 additions and 198 deletions.
4 changes: 1 addition & 3 deletions python/lsst/consdb/efd_transform/config_model.py
@@ -78,9 +78,7 @@ def validate_tables_when_unpivoted(cls, model):
         """
         # Here 'model' is an instance of 'Column'
         if model.store_unpivoted:
-            invalid_tables = [
-                table for table in model.tables if table not in UNPIVOTED_TABLES
-            ]
+            invalid_tables = [table for table in model.tables if table not in UNPIVOTED_TABLES]
             if invalid_tables:
                 raise ValueError(
                     f"When 'store_unpivoted' is True, only {UNPIVOTED_TABLES} "
16 changes: 4 additions & 12 deletions python/lsst/consdb/efd_transform/dao/base.py
@@ -109,9 +109,7 @@ def get_table(self, tablename, schema=None) -> Table:
         metadata.reflect(engine)

         if schema is not None and self.dialect == postgresql:
-            tbl = Table(
-                tablename, metadata, autoload_with=self.get_con(), schema=schema
-            )
+            tbl = Table(tablename, metadata, autoload_with=self.get_con(), schema=schema)
         else:
             tbl = Table(tablename, metadata, autoload_with=self.get_con())
         return tbl
@@ -237,9 +235,7 @@ def execute_count(self, table: Table) -> int:
         stm = func.count().select().select_from(table)
         return con.scalar(stm)

-    def execute_upsert(
-        self, tbl: Table, df: pandas.DataFrame, commit_every: int = 100
-    ) -> int:
+    def execute_upsert(self, tbl: Table, df: pandas.DataFrame, commit_every: int = 100) -> int:
         """Execute an upsert operation on the given table using the provided DataFrame.
[Check failure on line 239 in python/lsst/consdb/efd_transform/dao/base.py (GitHub Actions / call-workflow / lint): W505 doc line too long (87 > 79 characters)]

         Args:
@@ -273,9 +269,7 @@ def execute_upsert(
         # List of columns without primary keys.
         # These columns will be updated in case of conflict.
         update_cols = [
-            c.name
-            for c in tbl.c
-            if c not in list(tbl.primary_key.columns) and c.name in insert_cols
+            c.name for c in tbl.c if c not in list(tbl.primary_key.columns) and c.name in insert_cols
         ]

         # Convert the dataframe to a list of dicts
@@ -306,9 +300,7 @@ def execute_upsert(

         return affected_rows

-    def execute_bulk_insert(
-        self, tbl: Table, df: pandas.DataFrame, commit_every: int = 100
-    ) -> int:
+    def execute_bulk_insert(self, tbl: Table, df: pandas.DataFrame, commit_every: int = 100) -> int:
         """Insert data in bulk into the specified table using the provided DataFrame.
[Check failure on line 304 in python/lsst/consdb/efd_transform/dao/base.py (GitHub Actions / call-workflow / lint): W505 doc line too long (85 > 79 characters)]

         Args:
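
The execute_upsert changes above only reflow the update_cols comprehension. For readers unfamiliar with the pattern, the following is a minimal, self-contained sketch (not the repository's code; the connection string, table layout, and data are invented for illustration) of how such a non-primary-key column list feeds a PostgreSQL upsert in SQLAlchemy:

import pandas
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine
from sqlalchemy.dialects.postgresql import insert

# Assumed connection string and table layout, for illustration only.
engine = create_engine("postgresql+psycopg2://user:password@localhost/consdb_example")
metadata = MetaData()
demo = Table(
    "demo_efd",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("value", String),
)
metadata.create_all(engine)

df = pandas.DataFrame([{"id": 1, "value": "a"}, {"id": 2, "value": "b"}])
insert_cols = list(df.columns)

# Non-primary-key columns present in the DataFrame are the ones updated on conflict,
# mirroring the update_cols comprehension shown in the diff.
update_cols = [
    c.name for c in demo.c if c not in list(demo.primary_key.columns) and c.name in insert_cols
]

stm = insert(demo).values(df.to_dict(orient="records"))
stm = stm.on_conflict_do_update(
    index_elements=[c.name for c in demo.primary_key.columns],
    set_={name: stm.excluded[name] for name in update_cols},
)

with engine.begin() as con:
    affected_rows = con.execute(stm).rowcount
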
6 changes: 2 additions & 4 deletions python/lsst/consdb/efd_transform/dao/butler.py
@@ -77,8 +77,7 @@ def exposures_by_period(
         """
         where_clause = (
-            f"instrument=instr and exposure.timespan OVERLAPS "
-            f"(T'{start_time}/utc', T'{end_time}/utc')"
+            f"instrument=instr and exposure.timespan OVERLAPS " f"(T'{start_time}/utc', T'{end_time}/utc')"
         )

         resultset = self.butler.registry.queryDimensionRecords(
@@ -106,8 +105,7 @@ def visits_by_period(
         """
         where_clause = (
-            f"instrument=instr and visit.timespan OVERLAPS "
-            f"(T'{start_time}/utc', T'{end_time}/utc')"
+            f"instrument=instr and visit.timespan OVERLAPS " f"(T'{start_time}/utc', T'{end_time}/utc')"
         )

         resultset = self.butler.registry.queryDimensionRecords(
44 changes: 11 additions & 33 deletions python/lsst/consdb/efd_transform/dao/influxdb.py
@@ -101,9 +101,7 @@ def get_fields(self, topic_name):
         """
         try:
             # data = self.query(f'SHOW FIELD KEYS FROM "{topic_name}"')
-            data = self.query(
-                f'SHOW FIELD KEYS FROM "{self.database_name}"."autogen"."{topic_name}" '
-            )
+            data = self.query(f'SHOW FIELD KEYS FROM "{self.database_name}"."autogen"."{topic_name}" ')
             field_keys = []
             if "results" in data:
                 for result in data["results"]:
@@ -145,17 +143,12 @@ def _make_fields(self, fields, base_fields):
         n = None
         for bfield in base_fields:
             for f in fields:
-                if (
-                    f.startswith(bfield) and f[len(bfield) :].isdigit()
-                ):  # Check prefix is complete
+                if f.startswith(bfield) and f[len(bfield) :].isdigit():  # Check prefix is complete
                     ret.setdefault(bfield, []).append(f)
             if n is None:
                 n = len(ret[bfield])
             if n != len(ret[bfield]):
-                raise ValueError(
-                    f"Field lengths do not agree for "
-                    f"{bfield}: {n} vs. {len(ret[bfield])}"
-                )
+                raise ValueError(f"Field lengths do not agree for " f"{bfield}: {n} vs. {len(ret[bfield])}")

         def sorter(prefix, val):
             return int(val[len(prefix) :])
@@ -247,18 +240,13 @@ def _merge_packed_time_series(
         """
         packed_fields = [
-            k
-            for k in packed_dataframe.keys()
-            if k.startswith(base_field) and k[len(base_field) :].isdigit()
+            k for k in packed_dataframe.keys() if k.startswith(base_field) and k[len(base_field) :].isdigit()
         ]
-        packed_fields = sorted(
-            packed_fields, key=lambda k: int(k[len(base_field) :])
-        )  # sort by pack ID
+        packed_fields = sorted(packed_fields, key=lambda k: int(k[len(base_field) :]))  # sort by pack ID
         npack = len(packed_fields)
         if npack % stride != 0:
             raise RuntimeError(
-                "Stride must be a factor of the number of packed fields: "
-                f"{stride} v. {npack}"
+                "Stride must be a factor of the number of packed fields: " f"{stride} v. {npack}"
             )
         packed_len = len(packed_dataframe)
         n_used = npack // stride  # number of raw fields being used
@@ -269,8 +257,7 @@ def _merge_packed_time_series(
             dt = 0
         else:
             dt = (
-                packed_dataframe[ref_timestamp_col].iloc[1]
-                - packed_dataframe[ref_timestamp_col].iloc[0]
+                packed_dataframe[ref_timestamp_col].iloc[1] - packed_dataframe[ref_timestamp_col].iloc[0]
             ) / npack
         for i in range(0, npack, stride):
             i0 = i // stride
@@ -411,9 +398,7 @@ def _to_dataframe(self, response: dict) -> pd.DataFrame:

         return result

-    def build_time_range_query(
-        self, topic_name, fields, start, end, index=None, use_old_csc_indexing=False
-    ):
+    def build_time_range_query(self, topic_name, fields, start, end, index=None, use_old_csc_indexing=False):
         """Build a query based on a time range.

         Parameters
@@ -452,10 +437,7 @@ def build_time_range_query(
             start_str = start.isot
             end_str = end.isot
         else:
-            raise TypeError(
-                "The second time argument must be the time stamp for the end "
-                "or a time delta."
-            )
+            raise TypeError("The second time argument must be the time stamp for the end " "or a time delta.")
         index_str = ""
         if index:
             if use_old_csc_indexing:
@@ -522,9 +504,7 @@ def select_time_series(
             A `~pandas.DataFrame` containing the results of the query.
         """
-        query = self.build_time_range_query(
-            topic_name, fields, start, end, index, use_old_csc_indexing
-        )
+        query = self.build_time_range_query(topic_name, fields, start, end, index, use_old_csc_indexing)
         response = self.query(query)

         if "series" not in response["results"][0]:
@@ -670,6 +650,4 @@ def __init__(
         path = os.getenv("EFD_PATH", "/influxdb-enterprise-data/")
         url = urljoin(f"https://{host}:{port}", f"{path}")

-        super(InfluxDbDao, self).__init__(
-            url, database_name=database_name, username=user, password=password
-        )
+        super(InfluxDbDao, self).__init__(url, database_name=database_name, username=user, password=password)
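
As context for the _merge_packed_time_series hunks above: packed EFD topics store N samples per row in columns named <base_field>0 through <base_field>N-1. The short sketch below (standalone, with invented column names and values, not the repository's code) shows how such columns are selected, sorted by their numeric suffix, and how a per-sample time step is derived from the reference timestamps:

import pandas as pd

base_field = "accelerationX"  # assumed base field name for the example
packed_dataframe = pd.DataFrame(
    {
        "cRIO_timestamp": [1000.0, 1001.0],
        "accelerationX0": [0.1, 0.4],
        "accelerationX1": [0.2, 0.5],
        "accelerationX2": [0.3, 0.6],
    }
)

# Keep only columns whose name is the base field followed by a pack index.
packed_fields = [
    k
    for k in packed_dataframe.keys()
    if k.startswith(base_field) and k[len(base_field):].isdigit()
]
packed_fields = sorted(packed_fields, key=lambda k: int(k[len(base_field):]))  # sort by pack ID
npack = len(packed_fields)

# Spacing between unpacked samples, derived from consecutive reference timestamps.
ref = packed_dataframe["cRIO_timestamp"]
dt = (ref.iloc[1] - ref.iloc[0]) / npack if len(ref) > 1 else 0
print(packed_fields, npack, dt)
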
19 changes: 4 additions & 15 deletions python/lsst/consdb/efd_transform/dao/transformd.py
@@ -141,9 +141,7 @@ def task_started(self, id: int):
                 id,
                 {
                     "status": "running",
-                    "process_start_time": datetime.now(timezone.utc).replace(
-                        tzinfo=timezone.utc
-                    ),
+                    "process_start_time": datetime.now(timezone.utc).replace(tzinfo=timezone.utc),
                     "process_end_time": None,
                     "process_exec_time": 0,
                     "exposures": 0,
@@ -272,9 +270,7 @@ def task_retries_increment(self, id: int):
         except Exception as e:
             raise Exception(f"Error updating task retries: {e}")

-    def select_next(
-        self, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None
-    ) -> Dict:
+    def select_next(self, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None) -> Dict:
         """Select the next pending record within the specified time range.

         Args:
@@ -298,12 +294,7 @@ def select_next(
         if end_time is not None:
             clauses.append(self.tbl.c.end_time <= end_time)

-        stm = (
-            select(self.tbl.c)
-            .where(and_(*clauses))
-            .order_by(self.tbl.c.start_time)
-            .limit(1)
-        )
+        stm = select(self.tbl.c).where(and_(*clauses)).order_by(self.tbl.c.start_time).limit(1)

         # print(self.debug_query(stm, with_parameters=True))
         return self.fetch_one_dict(stm)
@@ -326,9 +317,7 @@ def select_last(self) -> Dict:
         # print(self.debug_query(stm, with_parameters=True))
         return self.fetch_one_dict(stm)

-    def select_recent(
-        self, end_time: datetime, limit: Optional[int] = None
-    ) -> list[Dict]:
+    def select_recent(self, end_time: datetime, limit: Optional[int] = None) -> list[Dict]:
         """Select recent records.

         Args:
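
For context on the select_next statement reformatted above, here is a minimal, self-contained sketch of the pending-record query pattern: filter by status and an optional time window, order by start time, and take a single row. The table name and columns are assumptions for illustration, not the actual transformd schema.

from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, MetaData, String, Table, and_, select

metadata = MetaData()
# Assumed stand-in for the real processed-interval table.
tbl = Table(
    "transformed_efd_scheduler",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("status", String),
    Column("start_time", DateTime),
    Column("end_time", DateTime),
)

start_time = datetime(2025, 1, 1)
end_time = datetime(2025, 1, 17)

clauses = [tbl.c.status == "pending"]
if start_time is not None:
    clauses.append(tbl.c.start_time >= start_time)
if end_time is not None:
    clauses.append(tbl.c.end_time <= end_time)

stm = select(*tbl.c).where(and_(*clauses)).order_by(tbl.c.start_time).limit(1)
print(stm)  # renders the SELECT ... WHERE ... ORDER BY ... LIMIT statement
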
12 changes: 3 additions & 9 deletions python/lsst/consdb/efd_transform/generate_schema.py
@@ -20,9 +20,7 @@ def build_argparser() -> argparse.ArgumentParser:
        argparse.ArgumentParser: The argument parser object.
    """
-    parser = argparse.ArgumentParser(
-        description="Generate the schema for the EFD transform."
-    )
+    parser = argparse.ArgumentParser(description="Generate the schema for the EFD transform.")
    parser.add_argument(
        "--config",
        type=str,
@@ -92,9 +90,7 @@ def build_argparser() -> argparse.ArgumentParser:
     f.write(" description: Instrument name.\n")
     # Iterate over columns in the config file
     for column in config["columns"]:
-        if "exposure_efd" in column["tables"] and not column.get(
-            "store_unpivoted", False
-        ):
+        if "exposure_efd" in column["tables"] and not column.get("store_unpivoted", False):
             column_name = column["name"]
             f.write(f' - name: "{column_name}"\n')
             f.write(f' "@id": "#exposure_efd.{column_name}"\n')
@@ -189,9 +185,7 @@ def build_argparser() -> argparse.ArgumentParser:
     f.write(" description: Instrument name.\n")
     # Iterate over columns in the config file
     for column in config["columns"]:
-        if "visit1_efd" in column["tables"] and not column.get(
-            "store_unpivoted", False
-        ):
+        if "visit1_efd" in column["tables"] and not column.get("store_unpivoted", False):
             column_name = column["name"]
             f.write(f' - name: "{column_name}"\n')
             f.write(f' "@id": "#visit1_efd.{column_name}"\n')
22 changes: 5 additions & 17 deletions python/lsst/consdb/efd_transform/queue_manager.py
@@ -100,31 +100,21 @@ def create_tasks(
         # process interval.
         if last_task is None:
             start_time = (
-                datetime.now(tz=timezone.utc)
-                .replace(second=0, microsecond=0)
-                .replace(tzinfo=None)
+                datetime.now(tz=timezone.utc).replace(second=0, microsecond=0).replace(tzinfo=None)
             )
             start_time = (
-                Time(start_time.isoformat(), format="isot", scale="utc")
-                - proccess_interval_seconds
+                Time(start_time.isoformat(), format="isot", scale="utc") - proccess_interval_seconds
             )
         # Otherwise, start is the last task end time - time window.
         else:
             start_time = last_task["end_time"]
-            start_time = (
-                Time(start_time.isoformat(), format="isot", scale="utc")
-                - time_window_seconds
-            )
+            start_time = Time(start_time.isoformat(), format="isot", scale="utc") - time_window_seconds

         if end_time is None:
             # Considering that in real-time execution, the final time must be
             # at most now. It is necessary to subtract the time window to
             # ensure that the last task is less than now.
-            end_time = (
-                datetime.now(tz=timezone.utc)
-                .replace(second=0, microsecond=0)
-                .replace(tzinfo=None)
-            )
+            end_time = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0).replace(tzinfo=None)
             end_time = Time(end_time.isoformat(), format="isot", scale="utc")
             end_time = end_time - time_window_seconds
         # allows creating tasks for the future.
@@ -283,9 +273,7 @@ def next_task_to_run(

         # Ensure that the task's end time is not greater than the current time.
         if task is not None and task["end_time"] > datetime.now(timezone.utc):
-            self.log.debug(
-                f"Task end time {task['end_time']} is in the future. Skipping task."
-            )
+            self.log.debug(f"Task end time {task['end_time']} is in the future. Skipping task.")
             return None

         return task
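
The create_tasks window arithmetic above mixes naive datetimes and astropy Time objects. A brief standalone sketch of that pattern, with an assumed 5-minute interval standing in for the proccess_interval_seconds and time_window_seconds deltas:

from datetime import datetime, timezone

from astropy.time import Time, TimeDelta

# Assumed 5-minute processing interval, for illustration only.
proccess_interval_seconds = TimeDelta(300, format="sec")

# Truncate "now" to the minute and drop tzinfo, as create_tasks does.
now = datetime.now(tz=timezone.utc).replace(second=0, microsecond=0).replace(tzinfo=None)

# Convert to an astropy Time and step back one interval to get the window start.
start_time = Time(now.isoformat(), format="isot", scale="utc") - proccess_interval_seconds
print(start_time.isot)
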
12 changes: 3 additions & 9 deletions python/lsst/consdb/efd_transform/summary.py
@@ -53,15 +53,11 @@ def __init__(
         if not isinstance(exposure_start, Time) or not isinstance(exposure_end, Time):
             raise ValueError("Exposure times must be astropy.time.Time objects.")
         if exposure_start >= exposure_end:
-            raise ValueError(
-                "Exposure start time must be earlier than exposure end time."
-            )
+            raise ValueError("Exposure start time must be earlier than exposure end time.")

         df_time = Time(dataframe.index.to_pydatetime())
         if exposure_start > df_time[-1] or exposure_end < df_time[0]:
-            raise ValueError(
-                "The DataFrame index must encompass the exposure time range."
-            )
+            raise ValueError("The DataFrame index must encompass the exposure time range.")

         dataframe = dataframe.dropna().convert_dtypes()
         if not all(
@@ -70,9 +66,7 @@ def __init__(
         ):
             raise ValueError("All columns in the DataFrame must be numeric or boolean.")

-        self.data_array = (
-            dataframe.to_numpy(dtype=datatype) if datatype else dataframe.to_numpy()
-        )
+        self.data_array = dataframe.to_numpy(dtype=datatype) if datatype else dataframe.to_numpy()
         self.timestamps = dataframe.index
         self.exposure_start = exposure_start
         self.exposure_end = exposure_end
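
To make the Summary.__init__ validation above easier to follow in isolation, here is a compact, runnable sketch of the same checks. The DataFrame, column name, and exposure times are invented for the example, and the dtype test uses pandas.api.types as an assumption about how the elided all(...) condition is expressed.

import pandas as pd
from astropy.time import Time
from pandas.api.types import is_bool_dtype, is_numeric_dtype

# Invented example data: one numeric column sampled once per second.
index = pd.to_datetime(
    [
        "2025-01-17T00:00:00",
        "2025-01-17T00:00:01",
        "2025-01-17T00:00:02",
        "2025-01-17T00:00:03",
        "2025-01-17T00:00:04",
    ]
)
dataframe = pd.DataFrame({"temperature": [1.0, 1.1, 1.2, 1.3, 1.4]}, index=index)

exposure_start = Time("2025-01-17T00:00:01", scale="utc")
exposure_end = Time("2025-01-17T00:00:03", scale="utc")

if not isinstance(exposure_start, Time) or not isinstance(exposure_end, Time):
    raise ValueError("Exposure times must be astropy.time.Time objects.")
if exposure_start >= exposure_end:
    raise ValueError("Exposure start time must be earlier than exposure end time.")

# The exposure window must fall inside the span covered by the DataFrame index.
df_time = Time(dataframe.index.to_pydatetime())
if exposure_start > df_time[-1] or exposure_end < df_time[0]:
    raise ValueError("The DataFrame index must encompass the exposure time range.")

dataframe = dataframe.dropna().convert_dtypes()
if not all(is_numeric_dtype(dtype) or is_bool_dtype(dtype) for dtype in dataframe.dtypes):
    raise ValueError("All columns in the DataFrame must be numeric or boolean.")
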
