From 195e39028482ba6fbccffb6c0ee528c7ed5a839a Mon Sep 17 00:00:00 2001 From: Wes Kendall Date: Sat, 24 Aug 2024 20:08:16 -0500 Subject: [PATCH] Transaction-level lock support (#13) * Added acquire/release methods for manual control * Add transaction-level advisory lock support * Bump version and add release notes * Fix docs --- CHANGELOG.md | 12 +++++ docs/advisory.md | 40 +++++++++++++++-- pglock/core.py | 95 ++++++++++++++++++++++++++------------- pglock/tests/test_core.py | 64 +++++++++++++++++++++++++- pyproject.toml | 2 +- 5 files changed, 177 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 294a4c4..afbfca1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ # Changelog +## 1.6.0 (2024-08-24) + +#### Features + +- Support transaction-level locks by [@wesleykendall](https://github.com/wesleykendall) in [#13](https://github.com/Opus10/django-pglock/pull/13). + + Use `pglock.advisory(xact=True)` for transaction-level advisory locks. Both context manager and functional invocations are supported. + +#### Changes + +- Django 5.1 support, drop Django 3.2 support by [@wesleykendall](https://github.com/wesleykendall) in [#12](https://github.com/Opus10/django-pglock/pull/12). + ## 1.5.1 (2024-04-06) #### Trivial diff --git a/docs/advisory.md b/docs/advisory.md index a706c3a..a751dfe 100644 --- a/docs/advisory.md +++ b/docs/advisory.md @@ -22,6 +22,10 @@ def my_exclusive_function(): When creating an advisory lock, remember that the lock ID is a global name across the entire database. Be sure to choose meaningful names, ideally with namespaces, when serializing code with [pglock.advisory][]. +!!! warning + + [Session-based locks](https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS) are used by default and released when the context manager exits or the database connection is terminated. If connections are pooled (e.g., [pgbouncer](https://www.pgbouncer.org)) and code is killed without raising exceptions (e.g., out-of-memory errors), locks will be held until the connection is terminated. See [transaction-level locks](#transaction) for an alternative. + ## Configuring Lock Wait Time By default, [pglock.advisory][] will wait forever until the lock can be acquired. Use the `timeout` argument to change this behavior. For example, `timeout=0` will avoid waiting for the lock: @@ -51,7 +55,8 @@ The `side_effect` argument adjusts runtime characteristics when using a timeout. ```python with pglock.advisory(timeout=0, side_effect=pglock.Raise): - # A django.db.utils.OperationalError will be thrown if the lock cannot be acquired. + # A django.db.utils.OperationalError will be thrown if the lock + # cannot be acquired. ``` !!! note @@ -63,9 +68,38 @@ Use `side_effect=pglock.Skip` to skip the function entirely if the lock cannot b ```python @pglock.advisory(timeout=0, side_effect=pglock.Skip) def one_function_at_a_time(): - # This function runs once at a time. If this function runs anywhere else, it will be skipped. + # This function runs once at a time. If this function runs anywhere + # else, it will be skipped. ``` ## Shared Locks -Advisory locks can be acquired in shared mode using `shared=True`. Shared locks do not conflict with other shared locks. They only conflict with other exclusive locks of the same lock ID. See the [Postgres docs](https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE) for more information. \ No newline at end of file +Advisory locks can be acquired in shared mode using `shared=True`. Shared locks do not conflict with other shared locks. They only conflict with other exclusive locks of the same lock ID. See the [Postgres docs](https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE) for more information. + + + +## Transaction-Level Locks + +Use `pglock.advisory(xact=True)` to create a transaction-level advisory lock, which are released at the end of a transaction. + +When using the decorator or context manager, a transaction will be opened. A `RuntimeError` will be raised if a transaction is already open. + +Use the functional interface to acquire a lock if already in a transaction: + +```python +import pglock +from django.db import transaction + +with transaction.atomic(): + ... + acquired = pglock.advisory("lock_id", xact=True).acquire() + ... + +# The lock is released at the end of the transaction. +``` + +Remember that once acquired, a transaction-level lock cannot be manually released. It will only be released when the transaction is over. + +!!! danger + + The functional interface is only intended for transaction-level locks. Use the context manager or decorator for other use cases. diff --git a/pglock/core.py b/pglock/core.py index 36bf202..8a2417e 100644 --- a/pglock/core.py +++ b/pglock/core.py @@ -187,15 +187,17 @@ class advisory(contextlib.ContextDecorator): When using the default side effect, returns `True` if the lock was acquired or `False` if not. + Consult the + `Postgres docs `__ + for more information on shared and transactional locks. + Args: lock_id (Union[str, int], default=None): The ID of the lock. When using the decorator, it defaults to the full module path and function name of the wrapped function. It must be supplied to - the context manager. - shared (bool, default=False): When `True`, creates a shared - advisory lock. Consult the - `Postgres docs `__ - for more information. + the context manager or function calls. + shared (bool, default=False): When `True`, creates a shared lock. + xact (bool, default=False): When `True`, creates a transactional-level lock. using (str, default="default"): The database to use. timeout (Union[int, float, datetime.timedelta, None]): Set a timeout when waiting for the lock. This timeout only applies to the lock acquisition statement and not the @@ -223,6 +225,7 @@ def __init__( lock_id=None, *, shared=False, + xact=False, using=DEFAULT_DB_ALIAS, timeout=_unset, side_effect=None, @@ -232,6 +235,7 @@ def __init__( self.using = using self.side_effect = side_effect self.shared = shared + self.xact = xact self.timeout = _cast_timeout(timeout) # Use pg_try_advisory.. when a timeout of 0 has been applied. @@ -244,13 +248,8 @@ def __init__( def int_lock_id(self): return _cast_lock_id(self.lock_id) - @property - def acquire(self): - return f'pg{"_try" if self.nowait else ""}_advisory_lock{"_shared" if self.shared else ""}' - - @property - def release(self): - return f'pg_advisory_unlock{"_shared" if self.shared else ""}' + def in_transaction(self) -> bool: + return connections[self.using].in_atomic_block def __call__(self, func): self._func = func @@ -258,7 +257,7 @@ def __call__(self, func): @functools.wraps(func) def inner(*args, **kwargs): with self._recreate_cm(): - if self.acquired or self.side_effect != Skip: + if self._acquired or self.side_effect != Skip: return func(*args, **kwargs) return inner @@ -300,10 +299,18 @@ def _process_runtime_parameters(self): " Use it as a context manager instead." ) - def __enter__(self): + def acquire(self) -> bool: self._process_runtime_parameters() - sql = f"SELECT {self.acquire}({self.int_lock_id})" + if self.xact and not self.in_transaction(): + raise RuntimeError("Must be in a transaction to use xact=True.") + + acquire_sql = ( + f'pg{"_try" if self.nowait else ""}_advisory' + f'{"_xact" if self.xact else ""}_lock' + f'{"_shared" if self.shared else ""}' + ) + sql = f"SELECT {acquire_sql}({self.int_lock_id})" with connections[self.using].cursor() as cursor: try: @@ -311,39 +318,65 @@ def __enter__(self): if self.timeout is not _unset and not self.nowait: stack.enter_context(lock_timeout(self.timeout, using=self.using)) - if self.side_effect != Raise and connections[self.using].in_atomic_block: + if self.side_effect != Raise and self.in_transaction(): # If returning True/False, create a savepoint so that # the transaction isn't in an errored state when returning. stack.enter_context(transaction.atomic(using=self.using)) cursor.execute(sql) - self.acquired = cursor.fetchone()[0] if self.nowait else True + acquired = cursor.fetchone()[0] if self.nowait else True except OperationalError: # This block only happens when the lock times out if self.side_effect != Raise: - self.acquired = False + acquired = False else: raise - if not self.acquired and self.side_effect == Raise: - raise OperationalError(f'Could not acquire lock "{self.lock_id}"') + if not acquired and self.side_effect == Raise: + raise OperationalError(f'Could not acquire lock "{self.lock_id}"') + + return acquired + + def release(self) -> None: + if self.xact: + raise RuntimeError("Advisory locks with xact=True cannot be manually released.") + + with connections[self.using].cursor() as cursor: + release_sql = f'pg_advisory_unlock{"_shared" if self.shared else ""}' + cursor.execute(f"SELECT {release_sql}({self.int_lock_id})") + + def __enter__(self): + self._transaction_ctx = contextlib.ExitStack() + if self.xact: + if self.in_transaction(): + raise RuntimeError( + "Advisory locks with xact=True cannot run inside a transaction." + " Use the functional interface, i.e. pglock.advisory(...).acquire()" + ) + + # Transactional locks always create a durable transaction + self._transaction_ctx.enter_context(transaction.atomic(using=self.using, durable=True)) + + self._transaction_ctx.__enter__() + + self._acquired = self.acquire() - self.stack = contextlib.ExitStack() - if self.acquired and connections[self.using].in_atomic_block: - # Create a savepoint so that we can successfully release - # the lock if the transaction errors - self.stack.enter_context(transaction.atomic(using=self.using)) + self._savepoint_ctx = contextlib.ExitStack() + if self._acquired and not self.xact and self.in_transaction(): + # Create a savepoint so that we can successfully release + # the lock if the transaction errors + self._savepoint_ctx.enter_context(transaction.atomic(using=self.using)) - self.stack.__enter__() + self._savepoint_ctx.__enter__() - return self.acquired + return self._acquired def __exit__(self, exc_type, exc_value, traceback): - self.stack.__exit__(exc_type, exc_value, traceback) + self._savepoint_ctx.__exit__(exc_type, exc_value, traceback) + self._transaction_ctx.__exit__(exc_type, exc_value, traceback) - if self.acquired: - with connections[self.using].cursor() as cursor: - cursor.execute(f"SELECT {self.release}({self.int_lock_id})") + if self._acquired and not self.xact: + self.release() def model( diff --git a/pglock/tests/test_core.py b/pglock/tests/test_core.py index 17bb4c5..da1413d 100644 --- a/pglock/tests/test_core.py +++ b/pglock/tests/test_core.py @@ -245,7 +245,7 @@ def assert_lock_acquired(): @pytest.mark.django_db(transaction=True) -def test_advisory_transaction(reraise): +def test_advisory_inside_transaction(reraise): """Test errored transaction behavior for advisory locks""" barrier = threading.Barrier(2) rand_val = str(random.random()) @@ -284,6 +284,68 @@ def hold_lock_and_error(): acquired.join() +@pytest.mark.django_db(transaction=True) +def test_advisory_xact(reraise): + """Test basic transactional advisory lock behavior""" + barrier = threading.Barrier(2) + rand_val = str(random.random()) + + @reraise.wrap + def assert_lock_not_acquired(): + barrier.wait(timeout=5) + + with pglock.advisory(rand_val, xact=True, timeout=0) as acquired: + assert not acquired + + barrier.wait(timeout=5) + barrier.wait(timeout=5) + + with transaction.atomic(): + assert not pglock.advisory(rand_val, xact=True, timeout=0).acquire() + + barrier.wait(timeout=5) + + @reraise.wrap + def assert_lock_acquired(): + with pglock.advisory(rand_val, xact=True) as acquired: + assert acquired + barrier.wait(timeout=5) + barrier.wait(timeout=5) + + with transaction.atomic(): + assert pglock.advisory(rand_val, xact=True).acquire() + barrier.wait(timeout=5) + barrier.wait(timeout=5) + + not_acquired = threading.Thread(target=assert_lock_not_acquired) + acquired = threading.Thread(target=assert_lock_acquired) + not_acquired.start() + acquired.start() + not_acquired.join() + acquired.join() + + +@pytest.mark.django_db(transaction=True) +def test_advisory_xact_usage(): + """Test basic error handling scenarios with pglock.advisory(xact=True).""" + lock_id = str(random.random()) + + with pytest.raises(RuntimeError, match="Must be in a transaction"): + pglock.advisory(lock_id, xact=True).acquire() + + with transaction.atomic(): + with pytest.raises(RuntimeError, match="cannot run inside a transaction"): + with pglock.advisory(lock_id, xact=True): + pass + + with transaction.atomic(): + lock = pglock.advisory(lock_id, xact=True) + lock.acquire() + + with pytest.raises(RuntimeError, match="cannot be manually released."): + lock.release() + + def test_advsiory_id(): assert pglock.advisory_id(9223372036854775807) == (2147483647, 4294967295) assert pglock.advisory_id("hello") == (245608543, 3125670444) diff --git a/pyproject.toml b/pyproject.toml index 0ec76f8..7e3da87 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,7 @@ packages = [ exclude = [ "*/tests/" ] -version = "1.5.1" +version = "1.6.0" description = "Postgres locking routines and lock table access." authors = ["Wes Kendall"] classifiers = [