From 9d3d0a2bae1048cab23815bf833a9192d6130241 Mon Sep 17 00:00:00 2001
From: CamDavidsonPilon
Date: Tue, 17 Dec 2024 21:38:16 -0500
Subject: [PATCH] remove default field; adding more http methods for broadcast;
 calibration work

---
 contrib/jobs/00_stirring.yaml               |   1 -
 contrib/jobs/03_temperature_automation.yaml |   4 +-
 contrib/jobs/04_dosing_automation.yaml      |   5 -
 contrib/jobs/05_leds.yaml                   |   1 -
 contrib/jobs/06_led_automation.yaml         |   1 -
 contrib/jobs/13_pwms.yaml                   |   1 -
 contrib/jobs/50_self_test.yaml              |   2 -
 contrib/jobs/background_job.yaml.example    |   4 +-
 pioreactorui/__init__.py                    |  14 +-
 pioreactorui/api.py                         | 334 +++++++-------------
 pioreactorui/structs.py                     |   4 +-
 pioreactorui/tasks.py                       |  78 +++--
 pioreactorui/unit_api.py                    | 134 +++++++-
 13 files changed, 314 insertions(+), 269 deletions(-)

diff --git a/contrib/jobs/00_stirring.yaml b/contrib/jobs/00_stirring.yaml
index 8ee7bc62..ec8d6ee0 100644
--- a/contrib/jobs/00_stirring.yaml
+++ b/contrib/jobs/00_stirring.yaml
@@ -10,5 +10,4 @@ published_settings:
     label: Target stir RPM
     description: Modify the target RPM of stirring. This will effect the optical density reading. Too low and the stirring may completely stop. Too high and the resulting vortex may interfere with the optics.
     type: numeric
-    default: null
     display: true
diff --git a/contrib/jobs/03_temperature_automation.yaml b/contrib/jobs/03_temperature_automation.yaml
index e463b012..b5ea8f5f 100644
--- a/contrib/jobs/03_temperature_automation.yaml
+++ b/contrib/jobs/03_temperature_automation.yaml
@@ -10,10 +10,8 @@ published_settings:
     unit: ℃
     label: Target temperature
     type: numeric
-    description: Change the target temperature. Lower bound is the ambient temperature, and upperbound is 50℃. Only used in when Thermostat automation is active.
-    default: null
+    description: Change the target temperature. Lower bound is the ambient temperature, and upper bound is 50℃. Only used when the Thermostat automation is active.
     display: true
   - key: automation_name
     type: string
     display: false
-    default: ""
diff --git a/contrib/jobs/04_dosing_automation.yaml b/contrib/jobs/04_dosing_automation.yaml
index bd588cf1..2e67bb54 100644
--- a/contrib/jobs/04_dosing_automation.yaml
+++ b/contrib/jobs/04_dosing_automation.yaml
@@ -10,31 +10,26 @@ published_settings:
     unit: min
     label: Time between dosing
     type: numeric
-    default: null
     display: true
     description: Change how long to wait between dilutions. Typically between 0.5 and 60 minutes. Changing may immediately trigger a dosing event.
   - key: volume
     unit: mL
     label: Volume / dosing
     type: numeric
-    default: null
     display: true
     description: Change the volume per dilution. Typical values are between 0.0mL and 2.0mL.
   - key: target_normalized_od
     unit: AU
     label: Target nOD
     type: numeric
-    default: null
     display: true
     description: Change the target normalized optical density. Typical values are between 1.0 AU and 100.0 AU.
   - key: target_od
     unit: OD
     label: Target OD
     type: numeric
-    default: null
     display: true
     description: Change the target optical density. Only used if running "Turbidostat Targeting OD" automation.
   - key: automation_name
     type: string
     display: false
-    default: ""
diff --git a/contrib/jobs/05_leds.yaml b/contrib/jobs/05_leds.yaml
index 4c6d3208..a3a1525f 100644
--- a/contrib/jobs/05_leds.yaml
+++ b/contrib/jobs/05_leds.yaml
@@ -8,4 +8,3 @@ published_settings:
     label: LED intensity
     type: string
     display: true
-    default: ""
diff --git a/contrib/jobs/06_led_automation.yaml b/contrib/jobs/06_led_automation.yaml
index 88ce3d83..cca001bf 100644
--- a/contrib/jobs/06_led_automation.yaml
+++ b/contrib/jobs/06_led_automation.yaml
@@ -9,4 +9,3 @@ published_settings:
   - key: automation_name
     type: string
     display: false
-    default: ""
diff --git a/contrib/jobs/13_pwms.yaml b/contrib/jobs/13_pwms.yaml
index a7761a74..43356572 100644
--- a/contrib/jobs/13_pwms.yaml
+++ b/contrib/jobs/13_pwms.yaml
@@ -8,4 +8,3 @@ published_settings:
     label: PWM intensity
     type: string
     display: true
-    default: ""
diff --git a/contrib/jobs/50_self_test.yaml b/contrib/jobs/50_self_test.yaml
index 931e35e5..99bc087c 100644
--- a/contrib/jobs/50_self_test.yaml
+++ b/contrib/jobs/50_self_test.yaml
@@ -21,7 +21,6 @@ published_settings:
     type: boolean
   - key: correlations_between_pds_and_leds
     display: false
-    default: '[]'
     type: json
   - key: test_positive_correlation_between_rpm_and_stirring
     display: false
@@ -43,5 +42,4 @@ published_settings:
     type: boolean
   - key: all_tests_passed
     display: false
-    default: false
     type: boolean
diff --git a/contrib/jobs/background_job.yaml.example b/contrib/jobs/background_job.yaml.example
index 22c8bdf7..b699fb76 100644
--- a/contrib/jobs/background_job.yaml.example
+++ b/contrib/jobs/background_job.yaml.example
@@ -10,12 +10,10 @@ published_settings:
     label: Stirring speed # human readable name
     description: This description is displayed with an editable field in Manage / Settings.
     type: numeric # one of numeric, boolean, text
-    default: null
     display: true # true to display on the /Pioreactors card
   - key: something_else
     unit: lb
     label: Something else
     description: This description is displayed with an editable field in Manage / Settings.
type: numeric # one of numeric, boolean, text - default: null - display: true # true to display on the /Pioreactors card \ No newline at end of file + display: true # true to display on the /Pioreactors card diff --git a/pioreactorui/__init__.py b/pioreactorui/__init__.py index 79a0b0ae..3f69d97e 100644 --- a/pioreactorui/__init__.py +++ b/pioreactorui/__init__.py @@ -105,14 +105,16 @@ def handle_server_error(e): return app -def msg_to_JSON(msg: str, task: str, level: str) -> bytes: +def msg_to_JSON(msg: str, task: str, level: str, timestamp: None | str = None) -> bytes: + if timestamp is None: + timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") return dumps( { "message": msg.strip(), "task": task, "source": "ui", "level": level, - "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "timestamp": timestamp, } ) @@ -157,8 +159,8 @@ def _get_app_db_connection(): return db -def _get_local_metadata_db_connection(): - db = getattr(g, "_metadata_database", None) +def _get_temp_local_metadata_db_connection(): + db = getattr(g, "_local_metadata_database", None) if db is None: db = g._local_metadata_database = sqlite3.connect( f"{tempfile.gettempdir()}/local_intermittent_pioreactor_metadata.sqlite" @@ -177,10 +179,10 @@ def query_app_db( return (rv[0] if rv else None) if one else rv -def query_local_metadata_db( +def query_temp_local_metadata_db( query: str, args=(), one: bool = False ) -> dict[str, t.Any] | list[dict[str, t.Any]] | None: - cur = _get_local_metadata_db_connection().execute(query, args) + cur = _get_temp_local_metadata_db_connection().execute(query, args) rv = cur.fetchall() cur.close() return (rv[0] if rv else None) if one else rv diff --git a/pioreactorui/api.py b/pioreactorui/api.py index 909572e8..3737f843 100644 --- a/pioreactorui/api.py +++ b/pioreactorui/api.py @@ -35,6 +35,7 @@ from . import client from . import HOSTNAME from . import modify_app_db +from . import msg_to_JSON from . import publish_to_error_log from . import publish_to_experiment_log from . import publish_to_log @@ -64,19 +65,7 @@ def get_workers_in_experiment(experiment: str) -> list[str]: return [unit["pioreactor_unit"] for unit in r] -def broadcast_get_across_cluster(endpoint: str) -> dict[str, Any]: - assert endpoint.startswith("/unit_api") - result = query_app_db("SELECT w.pioreactor_unit as unit FROM workers w") - assert result is not None - assert isinstance(result, list) - list_of_workers = tuple(r["unit"] for r in result) - - return tasks.multicast_get_across_cluster(endpoint, list_of_workers) - - -def broadcast_post_across_cluster(endpoint: str, json: dict | None = None) -> Result: - assert endpoint.startswith("/unit_api") - # order by desc so that the leader-worker, if exists, is done last. 
This is important for tasks like /reboot +def get_all_workers() -> list[str]: result = query_app_db( """ SELECT w.pioreactor_unit as unit @@ -84,11 +73,28 @@ def broadcast_post_across_cluster(endpoint: str, json: dict | None = None) -> Re ORDER BY w.added_at DESC """ ) - assert result is not None - assert isinstance(result, list) - list_of_workers = tuple(r["unit"] for r in result) + assert result is not None and isinstance(result, list) + return list(r["unit"] for r in result) + + +def broadcast_get_across_cluster(endpoint: str) -> dict[str, Any]: + assert endpoint.startswith("/unit_api") + return tasks.multicast_get_across_cluster(endpoint, get_all_workers()) + + +def broadcast_post_across_cluster(endpoint: str, json: dict | None = None) -> Result: + assert endpoint.startswith("/unit_api") + return tasks.multicast_post_across_cluster(endpoint, get_all_workers(), json=json) + + +def broadcast_delete_across_cluster(endpoint: str, json: dict | None = None) -> Result: + assert endpoint.startswith("/unit_api") + return tasks.multicast_delete_across_cluster(endpoint, get_all_workers(), json=json) - return tasks.multicast_post_across_cluster(endpoint, list_of_workers, json=json) + +def broadcast_patch_across_cluster(endpoint: str, json: dict | None = None) -> Result: + assert endpoint.startswith("/unit_api") + return tasks.multicast_patch_across_cluster(endpoint, get_all_workers(), json=json) @api.route("/workers/jobs/stop/experiments/", methods=["POST", "PATCH"]) @@ -358,6 +364,17 @@ def get_level_string(min_level: str) -> str: return jsonify(recent_logs) +@api.route("/experiments//logs", methods=["POST"]) +def publish_new_log(experiment: str) -> ResponseReturnValue: + body = request.get_json() + topic = f"pioreactor/{body['pioreactor_unit']}/{experiment}/logs/ui/info" + client.publish( + topic, + msg_to_JSON(body["message"], body["source"] or "user", "info", timestamp=body["timestamp"]), + ) + return Response(status=202) + + @api.route("/workers//experiments//logs", methods=["GET"]) def get_logs_for_unit_and_experiment(experiment: str, pioreactor_unit: str) -> ResponseReturnValue: """Shows event logs for a specific worker within an experiment""" @@ -600,172 +617,61 @@ def get_media_rates(experiment: str) -> ResponseReturnValue: ## CALIBRATIONS -@api.route("/calibrations/", methods=["GET"]) -def get_available_calibrations_type_by_unit(pioreactor_unit: str) -> ResponseReturnValue: - """ - { - "types": [ - "temperature", - "pH", - "dissolved_oxygen", - "conductivity" - ] - } - """ - try: - types = query_app_db( - "SELECT DISTINCT type FROM calibrations WHERE pioreactor_unit=?", - (pioreactor_unit), - ) - - except Exception as e: - publish_to_error_log(str(e), "get_available_calibrations_type_by_unit") - return Response(status=500) - - return jsonify(types) - - -@api.route("/calibrations//", methods=["GET"]) -def get_available_calibrations_of_type( - pioreactor_unit: str, calibration_type: str -) -> ResponseReturnValue: - try: - unit_calibration = query_app_db( - "SELECT * FROM calibrations WHERE type=? 
AND pioreactor_unit=?", - (calibration_type, pioreactor_unit), - ) - - except Exception as e: - publish_to_error_log(str(e), "get_available_calibrations_of_type") - return Response(status=500) - - return jsonify(unit_calibration) - - -@api.route("/calibrations///current", methods=["GET"]) -def get_current_calibrations_of_type( - pioreactor_unit: str, calibration_type: str -) -> ResponseReturnValue: - """ - retrieve the current calibration for type - """ - try: - r = query_app_db( - "SELECT * FROM calibrations WHERE type=? AND pioreactor_unit=? AND is_current=1", - (calibration_type, pioreactor_unit), - one=True, - ) - - if r is not None: - assert isinstance(r, dict) - r["data"] = current_app.json.loads(r["data"]) - return jsonify(r) - else: - return Response(status=404) - - except Exception as e: - publish_to_error_log(str(e), "get_current_calibrations_of_type") - return Response(status=500) +@api.route("/workers//calibrations", methods=["GET"]) +def get_all_calibrations(pioreactor_unit) -> ResponseReturnValue: + if pioreactor_unit == UNIVERSAL_IDENTIFIER: + task = broadcast_get_across_cluster("/unit_api/calibrations") + else: + task = tasks.multicast_get_across_cluster("/unit_api/calibrations", [pioreactor_unit]) + return create_task_response(task) -@api.route("/calibrations///", methods=["GET"]) -def get_calibration_by_name( - pioreactor_unit: str, calibration_type: str, calibration_name: str -) -> ResponseReturnValue: - """ - retrieve the calibration for type with name - """ - try: - r = query_app_db( - "SELECT * FROM calibrations WHERE type=? AND pioreactor_unit=? AND name=?", - (calibration_type, pioreactor_unit, calibration_name), - one=True, +@api.route("/workers//calibrations/", methods=["GET"]) +def get_calibrations(pioreactor_unit, cal_type) -> ResponseReturnValue: + if pioreactor_unit == UNIVERSAL_IDENTIFIER: + task = broadcast_get_across_cluster(f"/unit_api/calibrations/{cal_type}") + else: + task = tasks.multicast_get_across_cluster( + f"/unit_api/calibrations/{cal_type}", [pioreactor_unit] ) - if r is not None: - assert isinstance(r, dict) - r["data"] = current_app.json.loads(r["data"]) - return jsonify(r) - else: - return Response(status=404) - except Exception as e: - publish_to_error_log(str(e), "get_calibration_by_name") - return Response(status=500) + return create_task_response(task) @api.route( - "/calibrations///", - methods=["PATCH"], + "/workers//calibrations///active", methods=["PATCH"] ) -def patch_calibrations( - pioreactor_unit: str, calibration_type: str, calibration_name: str -) -> ResponseReturnValue: - body = request.get_json() - - if "current" in body and body["current"] == 1: - try: - # does the new one exist in the database? - existing_row = query_app_db( - "SELECT * FROM calibrations WHERE pioreactor_unit=(?) AND type=(?) AND name=(?)", - (pioreactor_unit, calibration_type, calibration_name), - one=True, - ) - if existing_row is None: - publish_to_error_log( - f"calibration {calibration_name=}, {pioreactor_unit=}, {calibration_type=} doesn't exist in database.", - "patch_calibrations", - ) - return Response(status=404) - - elif existing_row["is_current"] == 1: # type: ignore - # already current - return Response(status=200) - - modify_app_db( - "UPDATE calibrations SET is_current=0, set_to_current_at=NULL WHERE pioreactor_unit=(?) AND type=(?) AND is_current=1", - (pioreactor_unit, calibration_type), - ) - - modify_app_db( - "UPDATE calibrations SET is_current=1, set_to_current_at=CURRENT_TIMESTAMP WHERE pioreactor_unit=(?) AND type=(?) 
AND name=(?)", - (pioreactor_unit, calibration_type, calibration_name), - ) - return Response(status=200) +def set_active_calibration(pioreactor_unit, cal_type, cal_name) -> ResponseReturnValue: + if pioreactor_unit == UNIVERSAL_IDENTIFIER: + task = broadcast_patch_across_cluster( + f"/unit_api/calibrations/{cal_type}/{cal_name}/active" + ) + else: + task = tasks.multicast_patch_across_cluster( + f"/unit_api/calibrations/{cal_type}/{cal_name}/active", [pioreactor_unit] + ) + return create_task_response(task) - except Exception as e: - publish_to_error_log(str(e), "patch_calibrations") - return Response(status=500) +@api.route("/workers//calibrations//active", methods=["DELETE"]) +def remove_active_status_calibration(pioreactor_unit, cal_type) -> ResponseReturnValue: + if pioreactor_unit == UNIVERSAL_IDENTIFIER: + task = broadcast_delete_across_cluster(f"/unit_api/calibrations/{cal_type}/active") else: - return Response(status=404) - + task = tasks.multicast_delete_across_cluster( + f"/unit_api/calibrations/{cal_type}/active", [pioreactor_unit] + ) + return create_task_response(task) -@api.route("/calibrations", methods=["PUT"]) -def create_or_update_new_calibrations() -> ResponseReturnValue: - try: - body = request.get_json() - modify_app_db( - "INSERT OR REPLACE INTO calibrations (pioreactor_unit, created_at, type, data, name, is_current, set_to_current_at) values (?, ?, ?, ?, ?, ?, ?)", - ( - body["pioreactor_unit"], - body["created_at"], - body["type"], - current_app.json.dumps( - body - ).decode(), # keep it as a string, not bytes, probably equivalent to request.get_data(as_text=True) - body["name"], - 0, - None, - ), +@api.route("/workers//calibrations//", methods=["DELETE"]) +def remove_calibration(pioreactor_unit, cal_type, cal_name) -> ResponseReturnValue: + if pioreactor_unit == UNIVERSAL_IDENTIFIER: + task = broadcast_delete_across_cluster(f"/unit_api/calibrations/{cal_type}/{cal_name}") + else: + task = tasks.multicast_delete_across_cluster( + f"/unit_api/calibrations/{cal_type}/{cal_name}", [pioreactor_unit] ) - - return Response(status=201) - except KeyError as e: - publish_to_error_log(str(e), "create_or_update_new_calibrations") - return Response(status=400) - except Exception as e: - publish_to_error_log(str(e), "create_or_update_new_calibrations") - return Response(status=500) + return create_task_response(task) ## PLUGINS @@ -811,8 +717,8 @@ def get_jobs_running_across_cluster_in_experiment(experiment) -> ResponseReturnV ### SETTINGS -@api.route("/jobs/settings/job_name//experiments/", methods=["GET"]) -def get_settings_for_job_across_cluster_in_experiment(job_name, experiment) -> ResponseReturnValue: +@api.route("/experiments//jobs/settings/job_name/", methods=["GET"]) +def get_settings_for_job_across_cluster_in_experiment(experiment, job_name) -> ResponseReturnValue: list_of_assigned_workers = get_workers_in_experiment(experiment) return create_task_response( tasks.multicast_get_across_cluster( @@ -822,10 +728,10 @@ def get_settings_for_job_across_cluster_in_experiment(job_name, experiment) -> R @api.route( - "/jobs/settings/job_name//experiments//setting/", methods=["GET"] + "/experiments//jobs/settings/job_name//setting/", methods=["GET"] ) def get_setting_for_job_across_cluster_in_experiment( - job_name, experiment, setting + experiment, job_name, setting ) -> ResponseReturnValue: list_of_assigned_workers = get_workers_in_experiment(experiment) return create_task_response( @@ -836,34 +742,34 @@ def get_setting_for_job_across_cluster_in_experiment( ) 
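The reworked endpoints above all dispatch the same way: when the pioreactor_unit path component is the UNIVERSAL_IDENTIFIER wildcard, the request fans out to every worker via a broadcast_*_across_cluster helper; otherwise it is multicast to just the named unit. A minimal sketch of that pattern, assuming api.py's existing names (UNIVERSAL_IDENTIFIER, tasks, and the broadcast_get_across_cluster helper defined earlier in this patch); the dispatch_get function itself is hypothetical, since the routes above inline this if/else:

def dispatch_get(pioreactor_unit: str, endpoint: str):
    # only unit-level endpoints may be fanned out across the cluster
    assert endpoint.startswith("/unit_api")
    if pioreactor_unit == UNIVERSAL_IDENTIFIER:
        # wildcard: query every worker in the cluster
        return broadcast_get_across_cluster(endpoint)
    # otherwise, multicast to the single named worker
    return tasks.multicast_get_across_cluster(endpoint, [pioreactor_unit])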
-@api.route("/jobs/settings/workers//job_name/", methods=["GET"]) +@api.route("/workers//jobs/settings/job_name/", methods=["GET"]) def get_job_settings_for_worker(pioreactor_unit, job_name) -> ResponseReturnValue: - return create_task_response( - tasks.multicast_get_across_cluster( + if pioreactor_unit == UNIVERSAL_IDENTIFIER: + task = broadcast_get_across_cluster(f"/unit_api/jobs/settings/job_name/{job_name}") + else: + task = tasks.multicast_get_across_cluster( f"/unit_api/jobs/settings/job_name/{job_name}", [pioreactor_unit] ) - ) + return create_task_response(task) @api.route( - "/jobs/settings/workers//job_name//setting/", + "/workers//jobs/settings/job_name//setting/", methods=["GET"], ) def get_job_setting_for_worker(pioreactor_unit, job_name, setting) -> ResponseReturnValue: - return create_task_response( - tasks.multicast_get_across_cluster( + if pioreactor_unit == UNIVERSAL_IDENTIFIER: + task = broadcast_get_across_cluster( + f"/unit_api/jobs/settings/job_name/{job_name}/setting/{setting}" + ) + else: + task = tasks.multicast_get_across_cluster( f"/unit_api/jobs/settings/job_name/{job_name}/setting/{setting}", [pioreactor_unit] ) - ) + return create_task_response(task) -@api.route("/jobs/settings/job_name//setting/", methods=["GET"]) -def get_setting_for_job_across_cluster(job_name, setting) -> ResponseReturnValue: - return create_task_response( - broadcast_get_across_cluster( - f"/unit_api/jobs/settings/job_name/{job_name}/setting/{setting}" - ) - ) +## MISC @api.route("/versions/app", methods=["GET"]) @@ -876,8 +782,6 @@ def get_ui_versions_across_cluster() -> ResponseReturnValue: return create_task_response(broadcast_get_across_cluster("/unit_api/versions/ui")) -## MISC - ## UPLOADS @@ -1100,17 +1004,16 @@ def export_datasets() -> ResponseReturnValue: "--output", filename_with_path.as_posix(), *cmd_tables, *experiment_options, *other_options ) try: - status, msg = result(blocking=True, timeout=5 * 60) + status = result(blocking=True, timeout=5 * 60) except HueyException: - status, msg = False, "Timed out on export." - publish_to_error_log(msg, "export_datasets") - return {"result": status, "filename": None, "msg": msg}, 500 + status = False + return {"result": status, "filename": None, "msg": "Timed out"}, 500 - if status == b"false": - publish_to_error_log(msg, "export_datasets") - return {"result": status, "filename": None, "msg": msg}, 500 + if not status: + publish_to_error_log("Failed.", "export_datasets") + return {"result": status, "filename": None, "msg": "Failed."}, 500 - return {"result": status, "filename": filename, "msg": msg}, 200 + return {"result": status, "filename": filename, "msg": "Finished"}, 200 @api.route("/experiments", methods=["GET"]) @@ -1258,12 +1161,6 @@ def upsert_unit_labels(experiment: str) -> ResponseReturnValue: """ Update or insert a new unit label for the current experiment. - This API endpoint accepts a PUT request with a JSON body containing a "unit" and a "label". - The "unit" is the identifier for the pioreactor and the "label" is the desired label for that unit. - If the unit label for the current experiment already exists, it will be updated; otherwise, a new entry will be created. - - The response will be a status code of 201 if the operation is successful, and 400 if there was an error. - JSON Request Body: { @@ -1278,11 +1175,6 @@ def upsert_unit_labels(experiment: str) -> ResponseReturnValue: "label": "new_label" } - Returns: - HTTP Response with status code 201 if successful, 400 if there was an error. 
- - Raises: - Exception: Any error encountered during the database operation is published to the error log. """ body = request.get_json() @@ -1489,6 +1381,11 @@ def update_config(filename: str) -> ResponseReturnValue: # filename is a string config = configparser.ConfigParser(allow_no_value=True) + # make unicode replacements + # https://github.com/Pioreactor/pioreactor/issues/539 + code = code.replace(chr(8211), chr(45)) # en-dash to dash + code = code.replace(chr(8212), chr(45)) # em + try: config.read_string(code) # test parser @@ -1498,6 +1395,12 @@ def update_config(filename: str) -> ResponseReturnValue: assert config["cluster.topology"] assert config.get("cluster.topology", "leader_hostname") assert config.get("cluster.topology", "leader_address") + assert config["mqtt"] + + if config.get("cluster.topology", "leader_address").startswith("http") or config.get( + "mqtt", "broker_address" + ).startswith("http"): + raise ValueError("Don't start addresses with http:// or https://") except configparser.DuplicateSectionError as e: msg = f"Duplicate section [{e.section}] was found. Please fix and try again." @@ -1511,8 +1414,8 @@ def update_config(filename: str) -> ResponseReturnValue: msg = "Incorrect syntax. Please fix and try again." publish_to_error_log(msg, "update_config") return {"msg": msg}, 400 - except (AssertionError, configparser.NoSectionError, KeyError, TypeError): - msg = "Missing required field(s) in [cluster.topology]: `leader_hostname` and/or `leader_address`. Please fix and try again." + except (AssertionError, configparser.NoSectionError, KeyError) as e: + msg = f"Missing required field(s): {e}" publish_to_error_log(msg, "update_config") return {"msg": msg}, 400 except ValueError as e: @@ -1520,7 +1423,7 @@ def update_config(filename: str) -> ResponseReturnValue: return {"msg": msg}, 400 except Exception as e: publish_to_error_log(str(e), "update_config") - msg = "Hm, something went wrong, check PioreactorUI logs." + msg = "Hm, something went wrong, check Pioreactor logs." return {"msg": msg}, 500 # if the config file is unit specific, we only need to run sync-config on that unit. @@ -1743,12 +1646,11 @@ def setup_worker_pioreactor() -> ResponseReturnValue: status = result(blocking=True, timeout=250) except HueyException: status = False - publish_to_log(status, "setup_worker_pioreactor") - publish_to_log(str(bool(status)), "setup_worker_pioreactor") + if status: return {"msg": f"Worker {new_name} added successfully."}, 200 else: - return {"msg": f"Failed to add worker {new_name}"}, 500 + return {"msg": f"Failed to add worker {new_name}. See logs."}, 500 @api.route("/workers", methods=["PUT"]) diff --git a/pioreactorui/structs.py b/pioreactorui/structs.py index fa46a44a..c9cfab98 100644 --- a/pioreactorui/structs.py +++ b/pioreactorui/structs.py @@ -14,7 +14,7 @@ class PublishedSettingsDescriptor(Struct, forbid_unknown_fields=True): # type: type: t.Literal["numeric", "boolean", "string", "json"] display: bool description: t.Optional[str] = None - default: t.Optional[t.Union[str, bool]] = None + default: t.Optional[t.Union[str, bool]] = None # DEPRECATED DO NOT USE unit: t.Optional[str] = None label: t.Optional[str] = None # if display is false, this isn't needed editable: bool = True @@ -28,7 +28,7 @@ class BackgroundJobDescriptor(Struct, forbid_unknown_fields=True): # type: igno source: t.Optional[str] = None # what plugin / app created this job? 
Usually `app` description: t.Optional[str] = None # if display is false, this isn't needed subtext: t.Optional[str] = None - is_testing: bool = False # DEPRECATED + is_testing: bool = False # DEPRECATED DO NOT USE #### Automations diff --git a/pioreactorui/tasks.py b/pioreactorui/tasks.py index 040988bf..4e79e122 100644 --- a/pioreactorui/tasks.py +++ b/pioreactorui/tasks.py @@ -15,7 +15,9 @@ from pioreactor.config import config from pioreactor.mureq import HTTPException +from pioreactor.pubsub import delete_from from pioreactor.pubsub import get_from +from pioreactor.pubsub import patch_into from pioreactor.pubsub import post_into from pioreactor.utils.networking import resolve_to_address @@ -90,7 +92,6 @@ def add_new_pioreactor(new_pioreactor_name: str, version: str, model: str) -> bo [PIO_EXECUTABLE, "workers", "add", new_pioreactor_name, "-v", version, "-m", model], ) cache.evict("config") - logger.info(result.returncode) return result.returncode == 0 @@ -152,31 +153,21 @@ def update_app_from_release_archive_on_specific_pioreactors( @huey.task() -def pio(*args: str, env: dict[str, str] = {}) -> tuple[bool, str]: +def pio(*args: str, env: dict[str, str] = {}) -> bool: logger.info(f'Executing `{join(("pio",) + args)}`, {env=}') - result = run( - (PIO_EXECUTABLE,) + args, capture_output=True, text=True, env=dict(os.environ) | env - ) - if result.returncode != 0: - return False, result.stderr.strip() - else: - return True, result.stdout.strip() + result = run((PIO_EXECUTABLE,) + args, env=dict(os.environ) | env) + return result.returncode == 0 @huey.task() @huey.lock_task("export-data-lock") -def pio_run_export_experiment_data(*args: str, env: dict[str, str] = {}) -> tuple[bool, str]: +def pio_run_export_experiment_data(*args: str, env: dict[str, str] = {}) -> bool: logger.info(f'Executing `{join(("pio", "run", "export_experiment_data") + args)}`, {env=}') result = run( (PIO_EXECUTABLE, "run", "export_experiment_data") + args, - capture_output=True, - text=True, env=dict(os.environ) | env, ) - if result.returncode != 0: - return False, result.stderr.strip() - else: - return True, result.stdout.strip() + return result.returncode == 0 @huey.task() @@ -244,18 +235,13 @@ def reboot() -> bool: @huey.task() -def pios(*args: str, env: dict[str, str] = {}) -> tuple[bool, str]: +def pios(*args: str, env: dict[str, str] = {}) -> bool: logger.info(f'Executing `{join(("pios",) + args + ("-y",))}`, {env=}') result = run( (PIOS_EXECUTABLE,) + args + ("-y",), - capture_output=True, - text=True, env=dict(os.environ) | env, ) - if result.returncode != 0: - return False, result.stderr.strip() - else: - return True, result.stdout.strip() + return result.returncode == 0 @huey.task() @@ -334,3 +320,49 @@ def multicast_get_across_cluster( tasks = get_worker.map(((worker, endpoint, json) for worker in workers)) return {worker: response for (worker, response) in tasks.get(blocking=True)} + + +@huey.task() +def patch_worker(worker: str, endpoint: str, json: dict | None = None) -> tuple[str, Any]: + try: + r = patch_into(resolve_to_address(worker), endpoint, json=json, timeout=1) + r.raise_for_status() + return worker, r.json() + except HTTPException: + logger.error(f"Could not PATCH to {worker}'s endpoint {endpoint}. 
Check connection?")
+        return worker, None
+
+
+@huey.task()
+def multicast_patch_across_cluster(
+    endpoint: str, workers: list[str], json: dict | None = None
+) -> dict[str, Any]:
+    # this function "consumes" one huey thread waiting fyi
+    assert endpoint.startswith("/unit_api")
+
+    tasks = patch_worker.map(((worker, endpoint, json) for worker in workers))
+
+    return {worker: response for (worker, response) in tasks.get(blocking=True)}
+
+
+@huey.task()
+def delete_worker(worker: str, endpoint: str, json: dict | None = None) -> tuple[str, Any]:
+    try:
+        r = delete_from(resolve_to_address(worker), endpoint, json=json, timeout=1)
+        r.raise_for_status()
+        return worker, r.json() if r.content else None
+    except HTTPException:
+        logger.error(f"Could not DELETE {worker}'s endpoint {endpoint}. Check connection?")
+        return worker, None
+
+
+@huey.task()
+def multicast_delete_across_cluster(
+    endpoint: str, workers: list[str], json: dict | None = None
+) -> dict[str, Any]:
+    # this function "consumes" one huey thread waiting fyi
+    assert endpoint.startswith("/unit_api")
+
+    tasks = delete_worker.map(((worker, endpoint, json) for worker in workers))
+
+    return {worker: response for (worker, response) in tasks.get(blocking=True)}
diff --git a/pioreactorui/unit_api.py b/pioreactorui/unit_api.py
index 8153a837..d63ce754 100644
--- a/pioreactorui/unit_api.py
+++ b/pioreactorui/unit_api.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 from __future__ import annotations
 
+from datetime import datetime
 import os
 from pathlib import Path
 from subprocess import run
@@ -15,10 +16,14 @@
 from flask.typing import ResponseReturnValue
 from huey.exceptions import HueyException
 from huey.exceptions import TaskException
+from msgspec.yaml import decode as yaml_decode
+from pioreactor.calibrations import CALIBRATION_PATH
 from pioreactor.config import get_leader_hostname
+from pioreactor.utils import local_persistant_storage
+from pioreactor.utils.timing import current_utc_timestamp
 
 from . import HOSTNAME
-from . import query_local_metadata_db
+from . import query_temp_local_metadata_db
 from . import tasks
 from . import VERSION
 from .config import cache
@@ -121,6 +126,49 @@ def remove_file() -> ResponseReturnValue:
     return create_task_response(task)
 
 
+# GET clock time
+@unit_api.route("/unit_api/system/clock", methods=["GET"])
+def get_clock_time():
+    try:
+        current_time = current_utc_timestamp()
+        return jsonify({"status": "success", "clock_time": current_time}), 200
+    except Exception as e:
+        return jsonify({"status": "error", "message": str(e)}), 500
+
+
+# PATCH / POST to set clock time
+@unit_api.route("/unit_api/system/clock", methods=["PATCH", "POST"])
+def set_clock_time():
+    try:
+        if HOSTNAME == get_leader_hostname():
+            data = request.json
+            new_time = data.get("clock_time")
+            if not new_time:
+                return jsonify({"status": "error", "message": "clock_time field is required"}), 400
+
+            # Convert and validate the timestamp
+            try:
+                datetime_obj = datetime.fromisoformat(new_time)
+            except ValueError:
+                return (
+                    jsonify(
+                        {"status": "error", "message": "Invalid clock_time format. Use ISO 8601."}
+                    ),
+                    400,
+                )
+
+            # Update the system clock (requires admin privileges)
+            run(["sudo", "date", "-s", datetime_obj.strftime("%Y-%m-%d %H:%M:%S")], check=True)
+            return jsonify({"status": "success", "message": "Clock time successfully updated"}), 200
+        else:
+            # sync using chrony
+            run(["sudo", "chronyc", "-a", "makestep"], check=True)
+            return jsonify({"status": "success", "message": "Clock time successfully synced"}), 200
+
+    except Exception as e:
+        return jsonify({"status": "error", "message": str(e)}), 500
+
+
 ## RUNNING JOBS CONTROL
 
 
@@ -203,7 +251,7 @@ def stop_all_jobs_by_source(job_source: str) -> ResponseReturnValue:
 
 @unit_api.route("/jobs/running/experiments/<experiment>", methods=["GET"])
 def get_running_jobs_for_experiment(experiment: str) -> ResponseReturnValue:
-    jobs = query_local_metadata_db(
+    jobs = query_temp_local_metadata_db(
         """SELECT * FROM pio_job_metadata where is_running=1 and experiment = (?)""",
         (experiment,),
     )
@@ -213,7 +261,7 @@
 
 @unit_api.route("/jobs/running", methods=["GET"])
 def get_all_running_jobs() -> ResponseReturnValue:
-    jobs = query_local_metadata_db("SELECT * FROM pio_job_metadata where is_running=1")
+    jobs = query_temp_local_metadata_db("SELECT * FROM pio_job_metadata where is_running=1")
     return jsonify(jobs)
 
@@ -231,7 +279,7 @@ def get_settings_for_a_specific_job(job_name) -> ResponseReturnValue:
       }
     }
     """
-    settings = query_local_metadata_db(
+    settings = query_temp_local_metadata_db(
         """
         SELECT s.setting, s.value
         FROM pio_job_published_settings s
@@ -250,7 +298,7 @@
 
 @unit_api.route("/jobs/settings/job_name/<job_name>/setting/<setting>", methods=["GET"])
 def get_specific_setting_for_a_job(job_name, setting) -> ResponseReturnValue:
-    setting = query_local_metadata_db(
+    setting = query_temp_local_metadata_db(
         """
         SELECT s.setting, s.value
@@ -420,6 +468,82 @@ def get_ui_version() -> ResponseReturnValue:
     )
 
 
+### CALIBRATIONS
+
+
+@unit_api.route("/calibrations", methods=["GET"])
+def get_all_calibrations() -> ResponseReturnValue:
+    calibration_dir = Path(f"{env['DOT_PIOREACTOR']}/storage/calibrations")
+
+    if not calibration_dir.exists():
+        return Response(status=404)
+
+    all_calibrations: dict[str, list] = {}
+
+    with local_persistant_storage("active_calibrations") as cache:
+        for file in calibration_dir.glob("*/*.yaml"):
+            try:
+                cal = yaml_decode(file.read_bytes())
+                cal_type = cal["calibration_type"]
+                cal["is_active"] = cache.get(cal_type) == cal["calibration_name"]
+                if cal_type in all_calibrations:
+                    all_calibrations[cal_type].append(cal)
+                else:
+                    all_calibrations[cal_type] = [cal]
+            except Exception as e:
+                print(f"Error reading {file.stem}: {e}")
+
+    return jsonify(all_calibrations)
+
+
+@unit_api.route("/calibrations/<cal_type>", methods=["GET"])
+def get_calibrations(cal_type) -> ResponseReturnValue:
+    calibration_dir = Path(f"{env['DOT_PIOREACTOR']}/storage/calibrations/{cal_type}")
+
+    if not calibration_dir.exists():
+        return Response(status=404)
+
+    calibrations: list[dict] = []
+
+    with local_persistant_storage("active_calibrations") as c:
+        for file in calibration_dir.glob("*.yaml"):
+            try:
+                cal = yaml_decode(file.read_bytes())
+                cal["is_active"] = c.get(cal_type) == cal["calibration_name"]
+                calibrations.append(cal)
+            except Exception as e:
+                print(f"Error reading {file.stem}: {e}")
+
+    return jsonify(calibrations)
+
+
+@unit_api.route("/calibrations/<cal_type>/<cal_name>/active", methods=["PATCH"])
+def set_active_calibration(cal_type, cal_name) -> ResponseReturnValue:
+    with local_persistant_storage("active_calibrations") as c:
+        c[cal_type] = cal_name
+
+    return Response(status=200)
+
+
+@unit_api.route("/calibrations/<cal_type>/active", methods=["DELETE"])
+def remove_active_status_calibration(cal_type) -> ResponseReturnValue:
+    with local_persistant_storage("active_calibrations") as c:
+        c.pop(cal_type)
+
+    return Response(status=200)
+
+
+@unit_api.route("/calibrations/<cal_type>/<cal_name>", methods=["DELETE"])
+def remove_calibration(cal_type, cal_name) -> ResponseReturnValue:
+    target_file = CALIBRATION_PATH / cal_type / f"{cal_name}.yaml"
+    if not target_file.exists():
+        return Response(status=404)
+
+    target_file.unlink()
+
+    return Response(status=200)
+
+
 @unit_api.errorhandler(404)
 def not_found(e):
     # Return JSON for API requests
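For reference, a client-side sketch of the new per-unit calibration endpoints, using the HTTP helpers from pioreactor.pubsub that the huey tasks in this patch also use (get_from, patch_into, delete_from); the worker name and calibration identifiers below are made up for illustration:

from pioreactor.pubsub import delete_from, get_from, patch_into
from pioreactor.utils.networking import resolve_to_address

address = resolve_to_address("worker01")  # hypothetical worker hostname

# list every calibration on the unit, grouped by type, each tagged with is_active
r = get_from(address, "/unit_api/calibrations")
r.raise_for_status()
print(r.json())

# mark one OD calibration as the active one for its type
patch_into(address, "/unit_api/calibrations/od/my-od-cal/active").raise_for_status()

# clear the active flag for the type, then delete the calibration file itself
delete_from(address, "/unit_api/calibrations/od/active").raise_for_status()
delete_from(address, "/unit_api/calibrations/od/my-od-cal").raise_for_status()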