Postgres advisory lock.
A typical deadlock in a Django app.
def test_deadlock_without_transaction_advisory_lock(transactional_db):
import threading
from django.db import transaction
from myapp.models import Book
lock_id = "foo.bar"
book1 = Book.objects.create(name="Book-1")
book2 = Book.objects.create(name="Book-2")
caught = None
event = threading.Event()
def update_1(e, _lock_id):
nonlocal caught
try:
with transaction.atomic():
e.wait()
Book.objects.filter(id=book1.pk).update(name="Name-1")
Book.objects.filter(id=book2.pk).update(name="Name-2")
except Exception as exc:
caught = exc
def update_2(e, _lock_id):
nonlocal caught
try:
with transaction.atomic():
e.wait()
Book.objects.filter(id=book2.pk).update(name="Name3")
Book.objects.filter(id=book1.pk).update(name="Name4")
except Exception as exc:
caught = exc
thread_a = threading.Thread(target=update_1, args=(event, lock_id))
thread_a.start()
thread_b = threading.Thread(target=update_2, args=(event, lock_id))
thread_b.start()
event.set()
thread_a.join()
thread_b.join()
assert caught is not None
Avoid a deadlock with an advisory lock.
from enum import Enum
import contextlib
import zlib
from typing import Optional, Callable
class LockQuery(Enum):
TRANSACTION_ADVISORY_LOCK_BASE = "SELECT pg_advisory_xact_lock({lock_id})"
TRANSACTION_ADVISORY_LOCK_TUPLE_FORMAT = (
"SELECT pg_advisory_xact_lock({lock_id_1}, {lock_id_2})"
)
def generate_crc_32_lock_id(lock_id: str) -> int:
return zlib.crc32(lock_id.encode("utf-8"))
@contextlib.contextmanager
def transaction_advisory_lock(
lock_id: str,
using: Optional[str] = None,
key_generator: Optional[Callable[[str], int]] = generate_crc_32_lock_id,
):
"""
Obtains an exclusive transaction-level advisory lock.
"""
using = using or DEFAULT_DB_ALIAS
lock_id = key_generator(lock_id)
match lock_id:
case int():
query = LockQuery.TRANSACTION_ADVISORY_LOCK_BASE.value
command = query.format(lock_id=lock_id)
case [int() as l1, int() as l2]:
query = LockQuery.TRANSACTION_ADVISORY_LOCK_TUPLE_FORMAT.value
command = query.format(lock_id_1=l1, lock_id_2=l2)
case _:
raise ValueError("Unsupported param format.")
connection = connections[using]
if not connection.in_atomic_block:
raise RuntimeError("Must be inside a transaction.")
cursor = connection.cursor()
cursor.execute(command)
try:
yield
finally:
cursor.close()