Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,5 @@ httpx = "==0.28.1"
pytest = "==9.1.1"
pytest-cov = "==7.1.0"
pytest-mock = "==3.15.1"
fakeredis = "==2.36.2"
lupa = "==2.8"
100 changes: 99 additions & 1 deletion Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 35 additions & 8 deletions osism/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,28 @@ class RedisSemaphore:
a maximum concurrency limit.
"""

# Holders older than this many seconds are considered crashed and reclaimed.
HOLDER_EXPIRY = 60

# Atomic check-and-acquire executed server-side so that the capacity check
# and the slot reservation cannot be interleaved by another client. Without
# this, two clients could both observe a free slot (ZCARD) and both take it
# (ZADD), admitting more than ``maxsize`` holders. Cleanup of expired
# holders happens inside the same call using the caller-supplied ``now`` so
# the reclaim boundary advances on every retry.
_ACQUIRE_LUA = """
local now = tonumber(ARGV[1])
local maxsize = tonumber(ARGV[2])
local identifier = ARGV[3]
local expiry = tonumber(ARGV[4])
redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, now - expiry)
if redis.call('ZCARD', KEYS[1]) < maxsize then
redis.call('ZADD', KEYS[1], now, identifier)
return 1
end
return 0
"""

def __init__(self, redis_client, key, maxsize, timeout=None):
"""Initialize the semaphore.

Expand All @@ -145,16 +167,21 @@ def acquire(self, timeout=None):
"""
timeout = timeout or self.timeout or 10
identifier = str(uuid.uuid4())
now = time.time()
end_time = now + timeout
end_time = time.time() + timeout

while time.time() < end_time:
# Clean up expired holders
self.redis.zremrangebyscore(self.key, 0, now - 60)

# Try to acquire
if self.redis.zcard(self.key) < self.maxsize:
self.redis.zadd(self.key, {identifier: now})
now = time.time()
# Atomic cleanup + capacity check + reservation (see _ACQUIRE_LUA).
acquired = self.redis.eval(
self._ACQUIRE_LUA,
1,
self.key,
now,
self.maxsize,
identifier,
self.HOLDER_EXPIRY,
)
if acquired:
self.identifier = identifier
return True

Expand Down
96 changes: 96 additions & 0 deletions tests/unit/utils/test_init_semaphore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# SPDX-License-Identifier: Apache-2.0

"""Regression tests for :class:`osism.utils.RedisSemaphore.acquire`.

These run the production Lua ``_ACQUIRE_LUA`` against an in-memory
``fakeredis`` server (which executes Lua via ``lupa``), so they exercise the
real server-side check-and-acquire rather than asserting on a mock's recorded
calls. A faithful many-client race needs a real Redis and lives outside the
unit suite; these tests lock in the two invariants the atomic implementation
guarantees:

* the holder set never exceeds ``maxsize`` (TOCTOU over-admit fix), and
* the expired-holder reclaim boundary advances on every retry, so a slot that
frees up while a caller waits is eventually granted (stale-boundary fix).
"""

import time

import fakeredis
import pytest

import osism.utils as utils_pkg


@pytest.fixture
def redis():
    """Return an isolated in-memory Redis client with Lua scripting support.

    A dedicated ``FakeServer`` is created for every test instead of relying
    on the client's default backing server. fakeredis can reuse one backing
    server for clients created with identical connection parameters, which
    would let members left behind by one test (for example a seeded holder
    that is never released) leak into the next one. Passing an explicit
    server guarantees each test starts from an empty keyspace.
    """
    return fakeredis.FakeStrictRedis(server=fakeredis.FakeServer())


def test_acquire_then_release_roundtrip(redis):
    """Acquire registers exactly one holder; release removes it again.

    After ``acquire`` the semaphore carries an identifier and the holder set
    has one member; after ``release`` the identifier is cleared and the set
    is empty.
    """
    holders_key = "semaphore:job"
    semaphore = utils_pkg.RedisSemaphore(redis, "job", 2, timeout=1)

    acquired = semaphore.acquire()
    assert acquired is True
    assert semaphore.identifier is not None
    assert redis.zcard(holders_key) == 1

    semaphore.release()
    assert semaphore.identifier is None
    assert redis.zcard(holders_key) == 0


def test_acquire_never_exceeds_maxsize(redis):
    """Fill every slot, then verify that one extra caller is refused.

    Exactly ``capacity`` semaphores acquire successfully; a further one gets
    ``False`` and the holder set keeps its size, which is the invariant the
    atomic script enforces. Releasing every holder empties the set again.
    """
    holders_key = "semaphore:job"
    capacity = 3
    holders = []
    while len(holders) < capacity:
        candidate = utils_pkg.RedisSemaphore(redis, "job", capacity, timeout=1)
        assert candidate.acquire() is True
        holders.append(candidate)

    assert redis.zcard(holders_key) == capacity

    # The set is already full: the extra caller must be refused and the
    # holder set must be left exactly as it was.
    extra = utils_pkg.RedisSemaphore(redis, "job", capacity, timeout=1)
    assert extra.acquire() is False
    assert redis.zcard(holders_key) == capacity

    for holder in holders:
        holder.release()
    assert redis.zcard(holders_key) == 0


def test_acquire_returns_false_when_full_for_whole_window(redis):
    """A semaphore that stays full for the whole timeout yields ``False``.

    The only slot is occupied by a holder scored at the current time, so it
    does not age past HOLDER_EXPIRY within the short 0.2 s window and the
    caller runs out of time without acquiring.
    """
    holders_key = "semaphore:job"
    # Seed the single slot with a freshly scored holder.
    redis.zadd(holders_key, {"holder": time.time()})
    blocked = utils_pkg.RedisSemaphore(redis, "job", 1, timeout=0.2)

    outcome = blocked.acquire()

    assert outcome is False
    assert blocked.identifier is None
    assert redis.zcard(holders_key) == 1  # seeded holder still in place


def test_reclaim_boundary_advances_across_retries(redis, mocker):
    """A holder that expires while the caller waits is reclaimed in time.

    The expiry cutoff has to be recomputed from the current time on each
    retry. If it were frozen at the start of the loop, the pre-seeded holder
    would never be considered expired and the caller would time out
    spuriously. HOLDER_EXPIRY is patched down to 0.1 s so the holder ages out
    well inside the 2 s timeout.
    """
    holders_key = "semaphore:job"
    mocker.patch.object(utils_pkg.RedisSemaphore, "HOLDER_EXPIRY", 0.1)

    # Occupy the only slot with a holder scored at "now": still valid on the
    # first attempt, expired by the time the caller has retried for a while.
    redis.zadd(holders_key, {"stale": time.time()})
    waiter = utils_pkg.RedisSemaphore(redis, "job", 1, timeout=2)

    assert waiter.acquire() is True

    # Only the new holder may remain; the stale one must have been reclaimed.
    remaining = {raw.decode() for raw in redis.zrange(holders_key, 0, -1)}
    assert remaining == {waiter.identifier}