Skip to content

Commit

Permalink
Merge pull request #137 from alephdata/feature/last-updated-rabbitmq
Browse files Browse the repository at this point in the history
last_updated timestamp for the status API (RabbitMQ impl)
  • Loading branch information
stchris authored Dec 20, 2023
2 parents 728dbfa + 1301028 commit 0333891
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
28 changes: 25 additions & 3 deletions servicelayer/taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from banal import ensure_list

from servicelayer.cache import get_redis, make_key
from servicelayer.util import unpack_int
from servicelayer.util import pack_now, unpack_int
from servicelayer import settings
from servicelayer.util import service_retries, backoff

Expand Down Expand Up @@ -79,6 +79,9 @@ def __init__(self, conn, name):
# sets that contain task ids of running and pending tasks
self.running_key = make_key(PREFIX, "qdj", name, "running")
self.pending_key = make_key(PREFIX, "qdj", name, "pending")
self.start_key = make_key(PREFIX, "qdj", name, "start")
self.end_key = make_key(PREFIX, "qdj", name, "end")
self.last_update_key = make_key(PREFIX, "qdj", name, "last_update")

def cancel(self):
"""Cancel processing of all tasks belonging to a dataset"""
Expand All @@ -89,6 +92,9 @@ def cancel(self):
pipe.delete(self.finished_key)
pipe.delete(self.running_key)
pipe.delete(self.pending_key)
pipe.delete(self.start_key)
pipe.delete(self.end_key)
pipe.delete(self.last_update_key)
pipe.execute()

def get_status(self):
Expand All @@ -100,6 +106,12 @@ def get_status(self):
status["finished"] = max(0, unpack_int(finished))
status["running"] = max(0, unpack_int(running))
status["pending"] = max(0, unpack_int(pending))
start, end, last_update = self.conn.mget(
(self.start_key, self.end_key, self.last_update_key)
)
status["start_time"] = start
status["end_time"] = end
status["last_update"] = last_update
return status

@classmethod
Expand Down Expand Up @@ -159,16 +171,22 @@ def add_task(self, task_id):
# add the dataset to active datasets
pipe.sadd(self.key, self.name)
pipe.sadd(self.pending_key, task_id)
pipe.set(self.start_key, pack_now())
pipe.set(self.last_update_key, pack_now())
pipe.delete(self.end_key)
pipe.execute()

def remove_task(self, task_id):
"""Remove a task that's not going to be executed"""
log.info(f"Removing task: {task_id}")
self.conn.srem(self.pending_key, task_id)
pipe = self.conn.pipeline()
pipe.srem(self.pending_key, task_id)
status = self.get_status()
if status["running"] == 0 and status["pending"] == 0:
# remove the dataset from active datasets
self.conn.srem(self.key, self.name)
pipe.srem(self.key, self.name)
pipe.set(self.last_update_key, pack_now())
pipe.execute()

def checkout_task(self, task_id):
"""Update state when a task is checked out for execution"""
Expand All @@ -178,6 +196,8 @@ def checkout_task(self, task_id):
pipe.sadd(self.key, self.name)
pipe.srem(self.pending_key, task_id)
pipe.sadd(self.running_key, task_id)
pipe.set(self.start_key, pack_now())
pipe.set(self.last_update_key, pack_now())
pipe.execute()

def mark_done(self, task: Task):
Expand All @@ -188,6 +208,8 @@ def mark_done(self, task: Task):
pipe.srem(self.running_key, task.task_id)
pipe.incr(self.finished_key)
pipe.delete(task.retry_key)
pipe.set(self.end_key, pack_now())
pipe.set(self.last_update_key, pack_now())
pipe.execute()
status = self.get_status()
if status["running"] == 0 and status["pending"] == 0:
Expand Down
11 changes: 11 additions & 0 deletions tests/test_taskqueue.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
from unittest import TestCase
from unittest.mock import patch
import json
Expand All @@ -14,6 +15,7 @@
get_rabbitmq_connection,
dataset_from_collection_id,
)
from servicelayer.util import unpack_datetime


class CountingWorker(Worker):
Expand Down Expand Up @@ -58,6 +60,11 @@ def test_task_queue(self):
assert status["finished"] == 0, status
assert status["pending"] == 1, status
assert status["running"] == 0, status
assert status["end_time"] is None
started = unpack_datetime(status["start_time"])
last_updated = unpack_datetime(status["last_update"])
assert started < last_updated
assert abs(started - last_updated) < datetime.timedelta(seconds=1)

worker = CountingWorker(
queues=[settings.QUEUE_INGEST], conn=conn, num_threads=1
Expand Down Expand Up @@ -95,3 +102,7 @@ def test_task_queue(self):
assert status["finished"] == 1, status
assert status["pending"] == 0, status
assert status["running"] == 0, status
started = unpack_datetime(status["start_time"])
last_updated = unpack_datetime(status["last_update"])
end_time = unpack_datetime(status["end_time"])
assert started < end_time < last_updated

0 comments on commit 0333891

Please sign in to comment.