diff --git a/Pipfile b/Pipfile index bcc663a1..56983fb4 100644 --- a/Pipfile +++ b/Pipfile @@ -46,3 +46,5 @@ httpx = "==0.28.1" pytest = "==9.1.1" pytest-cov = "==7.1.0" pytest-mock = "==3.15.1" +fakeredis = "==2.36.2" +lupa = "==2.8" diff --git a/Pipfile.lock b/Pipfile.lock index 94756f4a..9b75c121 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "689396c917898700222f8bc45b24eb4cc076d08d49f88e781bb661f5c425472e" + "sha256": "11fa86c356edb2787814ff0a9eadf8f9cddc9c2c7324a5a07e8cfeabb2931708" }, "pipfile-spec": 6, "requires": {}, @@ -2365,6 +2365,15 @@ "markers": "python_version >= '3.10'", "version": "==7.14.3" }, + "fakeredis": { + "hashes": [ + "sha256:84cbb9c74ca8946c0d2499daadf3a5d0bfe3cfbac71e3398316d1a1eab3421c4", + "sha256:c37a0b307fae3f27ec7c19e59519e57b8c52782e00303df9075361b5ba441be6" + ], + "index": "pypi", + "markers": "python_version >= '3.7'", + "version": "==2.36.2" + }, "h11": { "hashes": [ "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", @@ -2406,6 +2415,80 @@ "markers": "python_version >= '3.10'", "version": "==2.3.0" }, + "lupa": { + "hashes": [ + "sha256:097e7d0f1719a88020b67c82e05d53d7973c166952393afcecfd8434c7e19a15", + "sha256:0b5ebe1a13c45767919c86750b84fe2da9f6288b6f3cea4ce7660bb2abc9d921", + "sha256:1628371c6592a6d5650497a9e31fb2bb3a7e9883c1f301d1111265e484045af9", + "sha256:1ac2b1ec7504e6148cba1bc35ac36c74d18a0ca6d367ffe7e78a3773c2694c0e", + "sha256:24b4d8af5558e549b70daf1547f5c1c1d664ecea9fc790f83efe5d75e9a93797", + "sha256:250e035fdaffe8c87093e3ebc206ac29a26131b1568ea711d780c26001ce96e7", + "sha256:27044f3363047f946b3d3aab9157cbd172b3538ada9ec1baef43432bf7d03a78", + "sha256:281bedc5deb92d31e649a3552edd662449365a635904fa4d5cb4509c7245e34e", + "sha256:2e64acbbd47e9b82a64405a39e0d2b36a5a7dad8ab41c0f3437f572f7d282ba3", + "sha256:32e4e5103bbddcdd2458fb2ccae6c8ba11c9997c711d7e379e0d45551d109c76", + "sha256:33e7e5aebca64b154b0a1679caf79e19254ff37bba51e87abab6848f97cb2de1", + "sha256:348c3f8ecabb6324dcbc05c2740d762ef8fcec7b06c79e45262ab97a217684e3", + "sha256:360056453a7a4eaa4ac5a204c31a5a014b1eb2ee5490603234d2ba831684f1f2", + "sha256:3903c9cf628dae2f56405503247b77a61a3a61bd2dda470e336950c74776d55d", + "sha256:3ffcfd8e19f943ad459136b3f60f085ae4948f024192a93ca4b4ac3023ec88d8", + "sha256:4203fa1659315e939a5304e75001b8cc14234fb3cbb3ed86c049b0cc5d90fcee", + "sha256:450650f91c48c2415b0d59ab3abfcfda3b6efb5b858205f4d4bda8ad141fa529", + "sha256:45fc9da0145ecb0083ef5ff9975116cc784bd0258bdc2bd131ba15483ce18398", + "sha256:4f7c553c1d8cfffbe85d81daef730d12cae4b6002d457542914da0ac8a1145b3", + "sha256:4f81a02806e7c7ad26d8c6fa222c8bef1b0c1b124347c879be880b41339d41e4", + "sha256:4fe5d7a810b64ea8511eb885fc8cdde042ee5ff7b7d08ae78f32449756acb177", + "sha256:54cff414f21f8cd8c6be4aae52541f3b9cd39602b59e3a3db9b5c9f9f674ff18", + "sha256:58e18afed57955b41130e269c78f53d4123ab86e236b53816f4cbffa25cb5d30", + "sha256:5caf45d15d424cee52fd67341e96e2b1dde0658ae90eb156ac56aa0d8330bc38", + "sha256:6c817d5421094507662e5f8feb8cd1e154c10879921c06079b6063be9d8f33c5", + "sha256:6fbcc9911f05c67affbd225fc024268e61e98a18ad1b1c2aed6c8796e4056554", + "sha256:7667001804657496dee9feced2daae5000b4604a3218dd8e6b7b754982ba88b8", + "sha256:7bb223ee8f72d0dc076b0d65296ee72f1c69450f9d2fed5315f7707d98c4a03d", + "sha256:7f210d5a8353e510ea1199c42cf3cbdd630553bf2bc8fb4c00fea06fdec7c798", + "sha256:81b283bfb13cc43fa4910fc98ec110ab861bcb39680f48b266f99d6e3be1049e", + "sha256:81f2d843ce668b653146c007467570210ae44be51dac6926666c51d49536f307", + "sha256:86f6f668966965b15247dc32d064cfe7be67b71e584ccfacbe2f637575296878", + "sha256:891f72e0bffbed1e4175f975aeb2a083956586a100066525e1be485f617f7b25", + "sha256:8cf4f064a0e5531afce2d7d750120c10c10f9529139af6ca6150d13151034398", + "sha256:91d622777febda3ab1bed1d45295f2f32a4680c7b3d7caf8c669998ed5c44118", + "sha256:951496471056061598a7d1729a6cdf48d662fec777a9f2d8aa5a1e62fd30e5a5", + "sha256:97bd01e90b8031e56a5fd5bb70605aea09f1dba675c1140308a52780f93d06f1", + "sha256:9e0d11b8f3a8dac6413f704fef7161d048bb10c58bdac6cbffa5e60efa56e9a3", + "sha256:9e304fb1c50cf23fd8882afbe1aa87525ef8a72667bcab3b37b2bbb2bc542269", + "sha256:9e76e45057cfcaa20ee3422c2289a91f9d51783d020da3570ee226de8f6e71cd", + "sha256:9f3f3955f65f9fde2dc6eda3041ccd394cf54d4bf083f0cdf6feb3d58e5f38d3", + "sha256:9f6f41c91366e7d0d474f87d81c1274af861f40812bf729c9f97ab4c8f3c7ac8", + "sha256:a295f87b5b7ebbfd5191932e8cb0e51df3c7769101ac6b6c7d7c9fb27bfd1307", + "sha256:a591b9947ca347b41a63370e121d6e2b1458fe6dde9ae065029ec10a37f25ff4", + "sha256:ac6b6e8d0e617e26a98cbb44880bcd75de5d32b3ad7b3b3793583909292b47ed", + "sha256:b036738282a5acd2e71fdddb317c9df8b87c1673aa57f403d05fcc2be8abc4ba", + "sha256:b12e43c1fb787189dfc28cd604aef0baa2cb95e27da19498d520361d0ace070a", + "sha256:b9bddb09acfffb4f828f790f444b11dc0cca591afea1a244d9329eea2d20c003", + "sha256:ba3a7dd839f90c3d2e53bebe3c192b1f3f9fd720a6781256405123211fd0dce6", + "sha256:bfc470012ef66ad064c7bd77416af03a3452ef630b04b9012595ea13f2e54518", + "sha256:c2a5fd15dc62374e1661a55f01744c9ec1c56f291ba4a0749d3af2174556e78f", + "sha256:ce86dff1ee7f7cf45f5622065ae991949dd7bb1703581cbc58a630137bb7ccf9", + "sha256:ce9404c661dbac65cc9bed351ad45e797af93d30d70be309a3fa8209ac86d93b", + "sha256:d3d0cde2c77588d1c60875a4f34f059513476c6e1775351897195b51e0f3df08", + "sha256:d7edb13a7a5250b5c6c22d1495d9e842b5c9fc5081c8fe6b5efe2112fe3e41f9", + "sha256:d8022641b9ec8ecf2c5ecbe9f47e5a70e0b87c4b5ae921b92cb02a638e0acd08", + "sha256:d8766aff03a78c80ad2d188a8bdb216de5ec838359cd87e05bbdfa56394a6105", + "sha256:dc51250e76367a3e27fcd01dc769b9bfcbbc34f48df48dde53d6af6e75b7eaa5", + "sha256:e8d4f4dd4acf4a0e42adc6b1ad220e1c86fe3028402c2f78bd0728a6d241bbe9", + "sha256:f4342f4de76ae7ce2ab0672d36003bdb7e1a33252f293b569298ddd792e70e33", + "sha256:f4d01b2a08c70bbb883a9e082b6b36b89121ed5910b710f1ba11c73295ff4fba", + "sha256:f5a6af145b0ea818f01d27bfe2583a4b538570bef61d22c8773e0eccf011234c", + "sha256:f6ddca4774d5ca451768a95e378a3aa041076e29f4613b8562f8e98efb6690fd", + "sha256:f6f603391dffb256e36a79fd2044084d5f4b8a0a4c0e5ad291cd3ab3aaf1fd0a", + "sha256:f711a8ab0486b9ac6fdda94a22ddcfbc9f0d4a27e3a8cf1bf79c6e48b33017c1", + "sha256:f8a22088a552828958603323f0a5c4b3e11e03b75d0bf4c965ef879de9b60a8d", + "sha256:fc47f536ac13a79cef47d29a2b205576a22841f042a2bcec1676b95806e7706a" + ], + "index": "pypi", + "markers": "python_version >= '3.8'", + "version": "==2.8" + }, "packaging": { "hashes": [ "sha256:5fc45236b9446107ff2415ce77c807cee2862cb6fac22b8a73826d0693b0980e", @@ -2455,6 +2538,21 @@ "index": "pypi", "markers": "python_version >= '3.9'", "version": "==3.15.1" + }, + "redis": { + "hashes": [ + "sha256:b01bc7282b8444e28ec36b261df5375183bb47a07eb9c603f284e89cbc5ef010", + "sha256:f0544fa9604264e9464cdf4814e7d4830f74b165d52f2a330a760a88dd248b7f" + ], + "markers": "python_version >= '3.9'", + "version": "==6.4.0" + }, + "sortedcontainers": { + "hashes": [ + "sha256:25caa5a06cc30b6b83d11423433f65d1f9d76c4c6a0c90e3379eaa43b9bfdb88", + "sha256:a163dcaede0f1c021485e957a39245190e74249897e2ae4b2aa38595db237ee0" + ], + "version": "==2.4.0" } } } diff --git a/osism/utils/__init__.py b/osism/utils/__init__.py index 719250ea..fd6a1a1a 100644 --- a/osism/utils/__init__.py +++ b/osism/utils/__init__.py @@ -119,6 +119,28 @@ class RedisSemaphore: a maximum concurrency limit. """ + # Holders older than this many seconds are considered crashed and reclaimed. + HOLDER_EXPIRY = 60 + + # Atomic check-and-acquire executed server-side so that the capacity check + # and the slot reservation cannot be interleaved by another client. Without + # this, two clients could both observe a free slot (ZCARD) and both take it + # (ZADD), admitting more than ``maxsize`` holders. Cleanup of expired + # holders happens inside the same call using the caller-supplied ``now`` so + # the reclaim boundary advances on every retry. + _ACQUIRE_LUA = """ + local now = tonumber(ARGV[1]) + local maxsize = tonumber(ARGV[2]) + local identifier = ARGV[3] + local expiry = tonumber(ARGV[4]) + redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, now - expiry) + if redis.call('ZCARD', KEYS[1]) < maxsize then + redis.call('ZADD', KEYS[1], now, identifier) + return 1 + end + return 0 + """ + def __init__(self, redis_client, key, maxsize, timeout=None): """Initialize the semaphore. @@ -145,16 +167,21 @@ def acquire(self, timeout=None): """ timeout = timeout or self.timeout or 10 identifier = str(uuid.uuid4()) - now = time.time() - end_time = now + timeout + end_time = time.time() + timeout while time.time() < end_time: - # Clean up expired holders - self.redis.zremrangebyscore(self.key, 0, now - 60) - - # Try to acquire - if self.redis.zcard(self.key) < self.maxsize: - self.redis.zadd(self.key, {identifier: now}) + now = time.time() + # Atomic cleanup + capacity check + reservation (see _ACQUIRE_LUA). + acquired = self.redis.eval( + self._ACQUIRE_LUA, + 1, + self.key, + now, + self.maxsize, + identifier, + self.HOLDER_EXPIRY, + ) + if acquired: self.identifier = identifier return True diff --git a/tests/unit/utils/test_init_semaphore.py b/tests/unit/utils/test_init_semaphore.py new file mode 100644 index 00000000..4cab9847 --- /dev/null +++ b/tests/unit/utils/test_init_semaphore.py @@ -0,0 +1,96 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Regression tests for :class:`osism.utils.RedisSemaphore.acquire`. + +These run the production Lua ``_ACQUIRE_LUA`` against an in-memory +``fakeredis`` server (which executes Lua via ``lupa``), so they exercise the +real server-side check-and-acquire rather than asserting on a mock's recorded +calls. A faithful many-client race needs a real Redis and lives outside the +unit suite; these tests lock in the two invariants the atomic implementation +guarantees: + +* the holder set never exceeds ``maxsize`` (TOCTOU over-admit fix), and +* the expired-holder reclaim boundary advances on every retry, so a slot that + frees up while a caller waits is eventually granted (stale-boundary fix). +""" + +import time + +import fakeredis +import pytest + +import osism.utils as utils_pkg + + +@pytest.fixture +def redis(): + """A fresh in-memory Redis with Lua scripting support per test.""" + return fakeredis.FakeStrictRedis() + + +def test_acquire_then_release_roundtrip(redis): + sem = utils_pkg.RedisSemaphore(redis, "job", 2, timeout=1) + + assert sem.acquire() is True + assert sem.identifier is not None + assert redis.zcard("semaphore:job") == 1 + + sem.release() + assert sem.identifier is None + assert redis.zcard("semaphore:job") == 0 + + +def test_acquire_never_exceeds_maxsize(redis): + """maxsize concurrent holders are admitted; the next caller is refused and + the holder set stays capped — the invariant the atomic script enforces.""" + maxsize = 3 + held = [] + for _ in range(maxsize): + sem = utils_pkg.RedisSemaphore(redis, "job", maxsize, timeout=1) + assert sem.acquire() is True + held.append(sem) + + assert redis.zcard("semaphore:job") == maxsize + + # One more, with the set already full: must be refused, set unchanged. + overflow = utils_pkg.RedisSemaphore(redis, "job", maxsize, timeout=1) + assert overflow.acquire() is False + assert redis.zcard("semaphore:job") == maxsize + + for sem in held: + sem.release() + assert redis.zcard("semaphore:job") == 0 + + +def test_acquire_returns_false_when_full_for_whole_window(redis): + """A genuinely full semaphore times out (no slot ever frees). + + The holder is freshly scored, so it never ages past HOLDER_EXPIRY within + the short window and the caller exhausts its timeout. + """ + redis.zadd("semaphore:job", {"holder": time.time()}) + sem = utils_pkg.RedisSemaphore(redis, "job", 1, timeout=0.2) + + assert sem.acquire() is False + assert sem.identifier is None + assert redis.zcard("semaphore:job") == 1 # untouched + + +def test_reclaim_boundary_advances_across_retries(redis, mocker): + """The expiry cutoff is recomputed from the current time on every retry, + so a holder that ages past HOLDER_EXPIRY while the caller waits is + reclaimed and the slot is granted before the timeout. + + With a cutoff frozen at the loop's start this caller would never reclaim + the holder and would spuriously time out. HOLDER_EXPIRY is shrunk so the + holder ages out within a fraction of a second. + """ + mocker.patch.object(utils_pkg.RedisSemaphore, "HOLDER_EXPIRY", 0.1) + # One holder occupying the only slot, scored at "now"; it is not yet expired + # on the first attempt but ages past HOLDER_EXPIRY while the caller retries. + redis.zadd("semaphore:job", {"stale": time.time()}) + sem = utils_pkg.RedisSemaphore(redis, "job", 1, timeout=2) + + assert sem.acquire() is True + members = {m.decode() for m in redis.zrange("semaphore:job", 0, -1)} + assert members == {sem.identifier} # stale reclaimed, only new holder left