Skip to content

Commit

Permalink
add priorities to huey, which I think should prevent jams
Browse files Browse the repository at this point in the history
  • Loading branch information
CamDavidsonPilon committed Jan 26, 2025
1 parent 186c59a commit 5b4e5b4
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 33 deletions.
6 changes: 3 additions & 3 deletions asset-manifest.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"files": {
"main.css": "/static/css/main.02152627.css",
"main.js": "/static/js/main.7aa869dd.js",
"main.js": "/static/js/main.c1b772c6.js",
"static/media/roboto-all-500-normal.woff": "/static/media/roboto-all-500-normal.0ab669b7a0d19b178f57.woff",
"static/media/roboto-all-700-normal.woff": "/static/media/roboto-all-700-normal.a457fde362a540fcadff.woff",
"static/media/roboto-all-400-normal.woff": "/static/media/roboto-all-400-normal.c5d001fa922fa66a147f.woff",
Expand Down Expand Up @@ -36,10 +36,10 @@
"static/media/roboto-greek-ext-700-normal.woff2": "/static/media/roboto-greek-ext-700-normal.bd9854c751441ccc1a70.woff2",
"index.html": "/index.html",
"main.02152627.css.map": "/static/css/main.02152627.css.map",
"main.7aa869dd.js.map": "/static/js/main.7aa869dd.js.map"
"main.c1b772c6.js.map": "/static/js/main.c1b772c6.js.map"
},
"entrypoints": [
"static/css/main.02152627.css",
"static/js/main.7aa869dd.js"
"static/js/main.c1b772c6.js"
]
}
2 changes: 1 addition & 1 deletion index.html
Original file line number Diff line number Diff line change
@@ -1 +1 @@
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="/favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1"/><meta name="theme-color" content="#000000"/><meta name="description" content="Pioreactor"/><link rel="apple-touch-icon" href="/logo192.png"/><link rel="manifest" href="/manifest.json"/><script defer="defer" src="/static/js/main.7aa869dd.js"></script><link href="/static/css/main.02152627.css" rel="stylesheet"></head><body><div id="root"></div></body></html>
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="/favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1"/><meta name="theme-color" content="#000000"/><meta name="description" content="Pioreactor"/><link rel="apple-touch-icon" href="/logo192.png"/><link rel="manifest" href="/manifest.json"/><script defer="defer" src="/static/js/main.c1b772c6.js"></script><link href="/static/css/main.02152627.css" rel="stylesheet"></head><body><div id="root"></div></body></html>
20 changes: 10 additions & 10 deletions pioreactorui/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def initialized():
logger.info(f"Cache directory = {CACHE_DIR}")


@huey.task()
@huey.task(priority=10)
def pio_run(*args: str, env: dict[str, str] = {}) -> bool:
# for long running pio run jobs where we don't care about the output / status
command = ("nohup", PIO_EXECUTABLE, "run") + args
Expand Down Expand Up @@ -178,7 +178,7 @@ def pio_run_export_experiment_data(*args: str, env: dict[str, str] = {}) -> bool
return result.returncode == 0


@huey.task()
@huey.task(priority=100)
def pio_kill(*args: str, env: dict[str, str] = {}) -> bool:
logger.info(f'Executing `{join(("pio", "kill") + args)}`, {env=}')
result = run((PIO_EXECUTABLE, "kill") + args, env=dict(os.environ) | env)
Expand Down Expand Up @@ -306,7 +306,7 @@ def write_config_and_sync(
return (False, "Could not sync configs to all Pioreactors.")


@huey.task()
@huey.task(priority=10)
def post_to_worker(worker: str, endpoint: str, json: dict | None = None) -> tuple[str, Any]:
try:
r = post_into(resolve_to_address(worker), endpoint, json=json, timeout=1)
Expand All @@ -324,7 +324,7 @@ def post_to_worker(worker: str, endpoint: str, json: dict | None = None) -> tupl
return worker, None


@huey.task()
@huey.task(priority=5)
def multicast_post_across_cluster(
endpoint: str, workers: list[str], json: dict | None = None
) -> dict[str, Any]:
Expand All @@ -338,7 +338,7 @@ def multicast_post_across_cluster(
} # add a timeout so that we don't hold up a thread forever.


@huey.task()
@huey.task(priority=10)
def get_from_worker(
worker: str, endpoint: str, json: dict | None = None, timeout=1.0, return_raw=False
) -> tuple[str, Any]:
Expand All @@ -361,7 +361,7 @@ def get_from_worker(
return worker, None


@huey.task()
@huey.task(priority=5)
def multicast_get_across_cluster(
endpoint: str,
workers: list[str],
Expand All @@ -379,7 +379,7 @@ def multicast_get_across_cluster(
} # add a timeout so that we don't hold up a thread forever.


@huey.task()
@huey.task(priority=10)
def patch_to_worker(worker: str, endpoint: str, json: dict | None = None) -> tuple[str, Any]:
try:
r = patch_into(resolve_to_address(worker), endpoint, json=json, timeout=1)
Expand All @@ -397,7 +397,7 @@ def patch_to_worker(worker: str, endpoint: str, json: dict | None = None) -> tup
return worker, None


@huey.task()
@huey.task(priority=5)
def multicast_patch_across_cluster(
endpoint: str, workers: list[str], json: dict | None = None
) -> dict[str, Any]:
Expand All @@ -411,7 +411,7 @@ def multicast_patch_across_cluster(
} # add a timeout so that we don't hold up a thread forever.


@huey.task()
@huey.task(priority=10)
def delete_from_worker(worker: str, endpoint: str, json: dict | None = None) -> tuple[str, Any]:
try:
r = delete_from(resolve_to_address(worker), endpoint, json=json, timeout=1)
Expand All @@ -429,7 +429,7 @@ def delete_from_worker(worker: str, endpoint: str, json: dict | None = None) ->
return worker, None


@huey.task()
@huey.task(priority=5)
def multicast_delete_across_cluster(
endpoint: str, workers: list[str], json: dict | None = None
) -> dict[str, Any]:
Expand Down
24 changes: 10 additions & 14 deletions pioreactorui/unit_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from pathlib import Path
from subprocess import run
from time import sleep
from time import time

from flask import abort
from flask import Blueprint
Expand All @@ -26,7 +25,6 @@
from pioreactor.config import get_leader_hostname
from pioreactor.structs import CalibrationBase
from pioreactor.structs import subclass_union
from pioreactor.utils import local_intermittent_storage
from pioreactor.utils import local_persistent_storage
from pioreactor.utils.timing import current_utc_timestamp
from pioreactor.utils.timing import to_datetime
Expand All @@ -41,6 +39,7 @@
from .config import huey
from .utils import attach_cache_control
from .utils import create_task_response
from .utils import is_rate_limited
from pioreactorui import structs


Expand Down Expand Up @@ -242,18 +241,6 @@ def dir_listing(req_path: str):
## RUNNING JOBS CONTROL


def is_rate_limited(job: str, expire_time_seconds=1.0) -> bool:
"""
Check if the user has made a request within the debounce duration.
"""
with local_intermittent_storage("debounce") as cache:
if cache.get(job) and (time() - cache.get(job)) < expire_time_seconds:
return True
else:
cache.set(job, time())
return False


@unit_api.route("/jobs/run/job_name/<job>", methods=["PATCH", "POST"])
def run_job(job: str) -> ResponseReturnValue:
"""
Expand Down Expand Up @@ -336,6 +323,15 @@ def get_all_running_jobs() -> ResponseReturnValue:
return jsonify(jobs)


@unit_api.route("/jobs/running/<job>", methods=["GET"])
def get_running_job(job) -> ResponseReturnValue:
jobs = query_temp_local_metadata_db(
"SELECT * FROM pio_job_metadata where is_running=1 and job_name=?", (job,)
)

return jsonify(jobs)


@unit_api.route("/long_running_jobs/running", methods=["GET"])
def get_all_long_running_jobs() -> ResponseReturnValue:
jobs = query_temp_local_metadata_db(
Expand Down
14 changes: 14 additions & 0 deletions pioreactorui/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
from __future__ import annotations

import re
from time import time

from flask import jsonify
from flask import Response
from flask.typing import ResponseReturnValue
from pioreactor.utils import local_intermittent_storage
from pioreactor.whoami import get_unit_name


Expand Down Expand Up @@ -46,3 +48,15 @@ def is_valid_unix_filename(filename: str) -> bool:
and "/" not in filename
and "\0" not in filename
)


def is_rate_limited(job: str, expire_time_seconds=1.0) -> bool:
"""
Check if the user has made a request within the debounce duration.
"""
with local_intermittent_storage("debounce") as cache:
if cache.get(job) and (time() - cache.get(job)) < expire_time_seconds:
return True
else:
cache.set(job, time())
return False
8 changes: 4 additions & 4 deletions static/js/main.7aa869dd.js → static/js/main.c1b772c6.js

Large diffs are not rendered by default.

File renamed without changes.

Large diffs are not rendered by default.

0 comments on commit 5b4e5b4

Please sign in to comment.