Skip to content

Commit

Permalink
Merge pull request #10 from flix-tech/feat-start-later
Browse files Browse the repository at this point in the history
New column names, allow delayed execution
  • Loading branch information
jacopofar authored May 27, 2024
2 parents e245b9c + 3924e1b commit 57c79fe
Show file tree
Hide file tree
Showing 7 changed files with 441 additions and 243 deletions.
21 changes: 14 additions & 7 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
# Changelog

## [unreleased]

* Allow delayed schedule of tasks, use clearer name for database columns
* Add `get_many` helper to retrieve multiple tasks with a single DB call
* Add `add_many` helper to insert multiple tasks with a single transaction
* Create an index on the table, can now scale to millions of tasks

## 0.0.6 - 2024-02-13

* adding a task now returns its UUID, previously nothign was returned
* Adding a task now returns its UUID, previously nothing was returned

## 0.0.5 - 2023-11-06

Expand All @@ -11,18 +18,18 @@

## 0.0.4 - 2023-11-06

* change name of `check_expired_leases()` to make it a public method
* Change name of `check_expired_leases()` to make it a public method

## 0.0.3 - 2023-10-05

* add function to delete old completed tasks
* Add function to delete old completed tasks

## 0.0.2 - 2023-05-15

* improve types and formatting
* task is now in its own column, metadata is kept apart
* upgrades to the CI/CD pipeline
* Improve types and formatting
* Task is now in its own column, metadata is kept apart
* Upgrades to the CI/CD pipeline

## 0.0.1 - 2023-05-10

* first release
* First release
125 changes: 65 additions & 60 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,19 @@

# Postgres Task Queue

This repo will contain the [Postgres](https://www.postgresql.org/) based task queue logic, similar to [our redis-tq](https://github.com/flix-tech/redis-tq) but based on postgres instead.
This library makes it possible to define a list of tasks to be persisted in a Postgres database and executed by multiple workers. Tasks are retried automatically after a given timeout and for a given number of time. Tasks are persisted in the database and executed in order of insertion unless a specific start timestamp is provided.

This is similar to [our redis-tq](https://github.com/flix-tech/redis-tq) but based on Postgres thanks to the `FOR UPDATE SKIP LOCKED` feature.

Similar to redis-tq, this package allows for sharing data between multiple processes or hosts.

Tasks support a "lease time". After that time other workers may consider this client to have crashed or stalled and pick up the item instead. The number of retries can also be configured.

By default it keeps all the queues and tasks in a single table `task_queue`. If you want to use a different table for different queues for example it could also be configured when instantiating the queue.

You can either set the `create_table=True` when instantiating the queue or create the table yourself with the following query:
You can set the `create_table=True` when instantiating the queue to have the table created for you. If the table already exist it will not be touched.

```sql
CREATE TABLE task_queue (
id UUID PRIMARY KEY,
queue_name TEXT NOT NULL,
task JSONB NOT NULL,
ttl INT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
processing BOOLEAN NOT NULL DEFAULT false,
lease_timeout FLOAT,
deadline TIMESTAMP,
completed_at TIMESTAMP
)
```
When defining a task you can provide a `can_start_at` timestamp parameter, and the task will not be executed until then, which can be useful to schedule tasks. By default the current timestamp is used.

## Installation

Expand All @@ -38,58 +28,28 @@ $ pip install postgres-tq

[PyPI]: https://pypi.org/project/postgres-tq/

## How it works

It uses row level locks of postgres to mimic the atomic pop and atomic push of redis-tq when getting a new task from the queue:

```sql
UPDATE task_queue
SET processing = true,
deadline =
current_timestamp + CAST(lease_timeout || ' seconds' AS INTERVAL)
WHERE id = (
SELECT id
FROM task_queue
WHERE completed_at IS NULL
AND processing = false
AND queue_name = <your_queue_name>
AND ttl > 0
ORDER BY created_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING id, task;
```

Let's say two workers try to get a new task at the same time, assuming that they will probably see the same task to be picked up using the subquery:

```sql
SELECT id
FROM task_queue
WHERE completed_at IS NULL
AND processing = false
AND queue_name = <your_queue_name>
AND ttl > 0
ORDER BY created_at
```

The first worker locks the row with the `FOR UPDATE` clause until the update is completed and committed. If we hadn't used the `SKIP LOCKED` clause, the second worker would have seen the same row and waited for the first worker to finish the update. However, since the first worker already updated it, the subquery would no longer be valid, and the second worker would return zero rows because `WHERE id = NULL`.

However, since we are using `FOR UPDATE SKIP LOCKED` the first worker locks the row for update and the second worker, skips that locked row and chooses another row for itself to update. This way we can avoid the race condition.

The other methods `complete()` and `reschedule()` work similarly under the hood.

## How to use

On the producing side, populate the queue with tasks and a respective lease timeout:

```py
from datetime import datetime, UTC, timedelta
from postgrestq import TaskQueue

task_queue = TaskQueue(POSTGRES_CONN_STR, queue_name, reset=True)
task_queue = TaskQueue(
POSTGRES_CONN_STR,
queue_name, # name of the queue as a string
reset=True, # delete existing tasks for this queue
ttl_zero_callback=handle_failure, # will call handle_failure(task_id, task) when the task failed too many times
)

for i in range(10):
task_queue.add(some_task, lease_timeout, ttl=3)
task_queue.add(
some_task,
lease_timeout, # in seconds, after this interval it will be assumed to have failed (and the callback is called)
ttl=3, # attempts before abandoning
can_start_at=datetime.now(UTC) + timedelta(minutes=5), # start it not before than 5 minutes in the future
)
```

On the consuming side:
Expand All @@ -99,7 +59,7 @@ from postgrestq import TaskQueue

task_queue = TaskQueue(POSTGRES_CONN_STR, queue_name, reset=True)
while True:
task, task_id = task_queue.get()
task, task_id, _queue_name = task_queue.get()
if task is not None:
# do something with task and mark it as complete afterwards
task_queue.complete(task_id)
Expand All @@ -110,14 +70,17 @@ while True:
time.sleep(1)
```

Notice that `get()` returns the queue name too, in case in future multi-queue is implemented.
At the moment it's always the same as the queue_name given to the class.

Or you can even use the \_\_iter\_\_() method of the class TaskQueue and loop over the queue:

```py
from postgrestq import TaskQueue

task_queue = TaskQueue(POSTGRES_CONN_STR, queue_name, reset=True)

for task, id_ in taskqueue:
for task, id_, queue_name in taskqueue:
# do something with task and it's automatically
# marked as completed by the iterator at the end
# of the iteration
Expand All @@ -144,6 +107,48 @@ task_queue.prune_completed_tasks(3600)

```


## How it works

It uses row level locks of postgres to mimic the atomic pop and atomic push of redis-tq when getting a new task from the queue:

```sql
UPDATE task_queue
SET processing = true,
deadline =
current_timestamp + CAST(lease_timeout || ' seconds' AS INTERVAL)
WHERE id = (
SELECT id
FROM task_queue
WHERE completed_at IS NULL
AND processing = false
AND queue_name = <your_queue_name>
AND ttl > 0
ORDER BY created_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING id, task;
```

Let's say two workers try to get a new task at the same time, assuming that they will probably see the same task to be picked up using the subquery:

```sql
SELECT id
FROM task_queue
WHERE completed_at IS NULL
AND processing = false
AND queue_name = <your_queue_name>
AND ttl > 0
ORDER BY created_at
```

The first worker locks the row with the `FOR UPDATE` clause until the update is completed and committed. If we hadn't used the `SKIP LOCKED` clause, the second worker would have seen the same row and waited for the first worker to finish the update. However, since the first worker already updated it, the subquery would no longer be valid, and the second worker would return zero rows because `WHERE id = NULL`.

However, since we are using `FOR UPDATE SKIP LOCKED` the first worker locks the row for update and the second worker, skips that locked row and chooses another row for itself to update. This way we can avoid the race condition.

The other methods `complete()` and `reschedule()` work similarly under the hood.

## Running the tests

The tests will check a presence of an Postgres DB in the port 15432. To initiate one using docker you can run:
Expand Down
Loading

0 comments on commit 57c79fe

Please sign in to comment.