Bases: Generic[T]
Container for multiple async locks which automatically deletes unused locks
Source code in PyDrocsid/multilock.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class MultiLock(Generic[T]):
    """Container for multiple async locks which automatically deletes unused locks.

    One lock is kept per key. A per-key counter tracks how many callers are
    currently holding or waiting for that key's lock; once it drops to zero
    both the lock and the counter are removed from the container.
    """

    def __init__(self) -> None:
        # key -> lock guarding that key
        self.locks: dict[T, Lock] = {}
        # key -> number of callers holding or waiting for the key's lock
        self.requests: dict[T, int] = {}

    def __getitem__(self, key: T) -> _LockContext[T]:
        """Return a ``_LockContext`` bound to this container and ``key``.

        NOTE(review): ``_LockContext`` is defined outside this block; it is
        presumably an async context manager wrapping ``acquire``/``release``
        -- confirm against its definition.
        """
        return _LockContext(self, key)

    async def acquire(self, key: T) -> None:
        """Wait until the lock for ``key`` is acquired by the caller.

        The lock is created on first use. If waiting is interrupted (for
        example because the task is cancelled), the pending request is rolled
        back before the exception propagates, so an abandoned waiter cannot
        keep the key's lock alive forever.
        """
        # Only build a new Lock when the key has none yet; setdefault() would
        # construct a throwaway Lock on every call.
        lock = self.locks.get(key)
        if lock is None:
            lock = self.locks[key] = Lock()

        self.requests[key] = self.requests.get(key, 0) + 1
        try:
            await lock.acquire()
        except BaseException:
            # The lock was not obtained, so release() will never be called
            # for this request -- undo the bookkeeping here instead.
            self._drop_request(key)
            raise

    def release(self, key: T) -> None:
        """Release the lock for ``key`` and delete it if nobody else needs it.

        Raises:
            KeyError: if no lock exists for ``key``.
            RuntimeError: if the lock for ``key`` is not currently held
                (raised by ``Lock.release``).
        """
        lock = self.locks[key]
        lock.release()
        self._drop_request(key)

    def _drop_request(self, key: T) -> None:
        """Decrement the request count of ``key``; forget the key at zero."""
        remaining = self.requests[key] - 1
        if remaining:
            self.requests[key] = remaining
        else:
            del self.locks[key]
            del self.requests[key]
|