Skip to content

Commit

Permalink
Implement SQL.
Browse files Browse the repository at this point in the history
  • Loading branch information
ktlim committed May 1, 2024
1 parent 9a1daa2 commit 80352ae
Showing 1 changed file with 71 additions and 72 deletions.
143 changes: 71 additions & 72 deletions python/lsst/consdb/pqserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import sqlalchemy.exc
import werkzeug.exceptions
import sqlalchemy
from flask import Flask, jsonify, request
from sqlalchemy import MetaData

from utils import setup_logging, setup_postgres

Expand Down Expand Up @@ -52,23 +50,22 @@ def __init__(self):
self.schemas = dict()
self.flexible_metadata_schemas = dict()
for instrument in INSTRUMENT_LIST:
md = MetaData(schema=f"cdb_{instrument}")
md = sqlalchemy.MetaData(schema=f"cdb_{instrument}")
md.reflect(engine)
self.table_names.update([str(table) for table in md.tables])
self.schemas[instrument] = md
self.flexible_metadata_schemas[instrument] = dict()
for obs_type in OBS_TYPE_LIST:
table_name = f"cdb_{instrument}.{obs_type}_flexdata"
schema_table_name = table_name + "_schema"
if table_name in self.schemas and schema_table_name in self.schemas:
logger.debug(f"SELECT key, dtype, doc FROM {schema_table_name}")
# query = engine.select(["key", "dtype", "doc"])
# .table(schema_name)
# self.flexible_metadata_schemas[instrument][obs_type] = {
# key: (dtype, doc)
# for key, dtype, doc
# in engine.execute(query)
# }
if table_name in md.tables and schema_table_name in md.tables:
schema_table = md.tables[schema_table_name]
stmt = sqlalchemy.select(schema_table.c["key", "dtype", "doc"])
with engine.connect() as conn:
for row in conn.execute(stmt):
self.flexible_metadata_schemas[instrument][obs_type] = {
row[0]: (row[1], row[2])
}

def compute_flexible_metadata_table_name(self, instrument: str, obs_type: str):
if instrument not in self.flexible_metadata_schemas:
Expand All @@ -77,7 +74,9 @@ def compute_flexible_metadata_table_name(self, instrument: str, obs_type: str):
)
if obs_type not in self.flexible_metadata_schemas[instrument]:
raise BadValueException(
"observation type", obs_type, list(self.flexible_metadata_schemas.keys())
"observation type",
obs_type,
list(self.flexible_metadata_schemas.keys()),
)
return f"cdb_{instrument}.{obs_type}_flexdata"

Expand All @@ -87,6 +86,16 @@ def compute_flexible_metadata_table_schema_name(
table_name = self.compute_flexible_metadata_table_name(instrument, obs_type)
return table_name + "_schema"

def get_flexible_metadata_table(self, instrument: str, obs_type: str):
table_name = self.compute_flexible_metadata_table_name(instrument, obs_type)
return self.schemas[instrument].tables[table_name]

def get_flexible_metadata_schema(self, instrument: str, obs_type: str):
table_name = self.compute_flexible_metadata_table_schema_name(
instrument, obs_type
)
return self.schemas[instrument].tables[table_name]


instrument_tables = InstrumentTables()

Expand Down Expand Up @@ -118,11 +127,6 @@ def handle_bad_value(e: BadValueException):
return jsonify(e.to_dict()), e.status_code


@app.errorhandler(werkzeug.exceptions.NotFound)
def handle_not_found(e):
logger.error(request)
return e, 404

###################################
# Web service application methods #
###################################
Expand Down Expand Up @@ -154,93 +158,92 @@ def root2():
def add_flexible_metadata_key(instrument: str, obs_type: str):
logger.info(request)
info = request.json
schema_table = instrument_tables.compute_flexible_metadata_schema_table_name(
instrument, obs_type
)
schema_table = instrument_tables.get_flexible_metadata_schema(instrument, obs_type)
key = info["key"]
dtype = info["dtype"]
if dtype not in DTYPE_LIST:
raise BadValueException("dtype", dtype, DTYPE_LIST)
doc = info["doc"]
logger.debug(
f"INSERT key, dtype, doc INTO {schema_table} VALUES ('{key}', '{dtype}', '{doc}')"
)
# with engine.conn() as session:
# session.insert(schema_table, [key, dtype, doc])
stmt = sqlalchemy.insert(schema_table).values(key=key, dtype=dtype, doc=doc)
logger.debug(str(stmt))
with engine.connect() as conn:
_ = conn.execute(stmt)
conn.commit()
return ("OK", 200)


@app.get("/consdb/flex/<instrument>/<obs_type>/schema")
def get_flexible_metadata_keys(instrument: str, obs_type: str):
logger.info(request)
table = instrument_tables.compute_flexible_metadata_table_name(instrument, obs_type)
schema_table = instrument_tables.get_flexible_metadata_schema(instrument, obs_type)
key_descriptions = []
logger.debug(f"SELECT key, dtype, doc FROM {table}")
# key_descriptions = select key, dtype, doc from table
stmt = sqlalchemy.select(schema_table.c["key", "dtype", "doc"])
logger.debug(str(stmt))
with engine.connect() as conn:
for row in conn.execute(stmt):
key_descriptions.append(row)
return jsonify(key_descriptions)


@app.get("/consdb/flex/<instrument>/<obs_type>/obs/<int:obs_id>")
def get_flexible_metadata(instrument: str, obs_type: str, obs_id: int):
logger.info(request)
table = instrument_tables.compute_flexible_metadata_table_name(instrument, obs_type)
table = instrument_tables.get_flexible_metadata_table(instrument, obs_type)
result = dict()
stmt = sqlalchemy.select(table.c["key", "value"]).where(table.c.obs_id == obs_id)
if request.args:
cols = request.args["k"]
logger.debug(
f"SELECT key, value FROM {table} WHERE obs_id = {obs_id} AND key in ('{cols}')"
)
# query = engine.select(["key", "value"])
# .from(table)
# .where("obs_id = :obs_id and key in (:cols)", obs_id, cols)
else:
logger.debug(f"SELECT key, value FROM {table} WHERE obs_id = {obs_id}")
# query = engine.select(["key", "value"])
# .from(table).where("obs_id = :obs_id", obs_id)
# for key, value in engine.execute(query):
# result[key] = value
stmt = stmt.where(table.c.key in cols)
logger.debug(str(stmt))
with engine.connect() as conn:
for row in conn.execute(stmt):
result[row[0]] = row[1]
return jsonify(result)


@app.post("/consdb/flex/<instrument>/<obs_type>/obs/<int:obs_id>")
def insert_flexible_metadata(instrument: str, obs_type: str, obs_id: int):
logger.info(request)
info = request.json
table = instrument_tables.compute_flexible_metadata_table_name(instrument, obs_type)
table = instrument_tables.get_flexible_metadata_table(instrument, obs_type)
schema = instrument_tables.flexible_metadata_schemas[instrument][obs_type]
key = info["key"]
if key not in schema:
raise BadValueException("key", key, list(schema.keys()))
value = info["value"]

# check value against dtype
logger.debug(
f"INSERT obs_id, key, value INTO {table} VALUES ({obs_id}, '{key}', '{value}')"
)
# for key, value in info.items():
# engine.insert(table, obs_id, key, value)
dtype = schema[key][0]
if dtype == "int" and str(int(value)) != value:
raise BadValueException("int value", value, [])
elif dtype == "float" and str(float(value)) != value:
raise BadValueException("float value", value, [])

stmt = sqlalchemy.insert(table).values(obs_id=obs_id, key=key, value=value)
logger.debug(str(stmt))
with engine.connect() as conn:
_ = conn.execute(stmt)
conn.commit()
return ("OK", 200)


@app.post("/consdb/insert/<instrument>")
def insert(instrument: str):
def insert(instrument: str, upsert: bool = False):
logger.info(request)
if instrument not in instrument_tables.schemas:
raise BadValueException(
"instrument", instrument, list(instrument_tables.schemas.keys())
)
info = request.json
table = instrument + "." + info["table"]
table_name = f"cdb_{instrument}." + info["table"]
table = instrument_tables.schemas[instrument].tables[table_name]
valdict = info["values"]
keylist = list(valdict.keys())
valuelist = list(valdict.values())
# check schema

logger.debug(f"INSERT {keylist} INTO {table} VALUES ({valuelist})")
# with engine.begin() as conn:
# conn.exec_driver_sql(
# f"INSERT INTO ? ({placeholders}) VALUES ({placeholders})",
# [table] + keylist + valuelist,
# )

stmt = sqlalchemy.insert(table).values(valdict)
logger.debug(str(stmt))
with engine.connect() as conn:
_ = conn.execute(stmt)
conn.commit()
return ("OK", 200)


Expand All @@ -254,24 +257,20 @@ def upsert(instrument: str):
info = request.json
table = instrument + "." + info["table"]
valdict = info["values"]
keylist = list(valdict.keys())
valuelist = list(valdict.values())
# check schema

logger.debug(f"UPDATE {keylist} INTO {table} VALUES ({valuelist})")
# with engine.begin() as conn:
# conn.exec_driver_sql(
# f"UPDATE INTO {table} ({placeholders}) VALUES ({placeholders})",
# [table] + keylist + valuelist,
# )

stmt = sqlalchemy.update(table).values(valdict)
logger.debug(str(stmt))
with engine.connect() as conn:
_ = conn.execute(stmt)
conn.commit()
return ("OK", 200)


@app.post("/consdb/query")
def query():
logger.info(request)
info = request.json
with engine.begin() as conn:
with engine.connect() as conn:
try:
cursor = conn.exec_driver_sql(info["query"])
first = True
Expand Down

0 comments on commit 80352ae

Please sign in to comment.