From 03c4cff89b0486c5febf807c1aecc2fa7a1c79b9 Mon Sep 17 00:00:00 2001 From: Roger Luethi Date: Wed, 24 Jun 2026 09:00:26 +0200 Subject: [PATCH] utils: Make RedisSemaphore.acquire atomic RedisSemaphore.acquire() checked capacity and took a slot in two separate Redis round-trips: if self.redis.zcard(self.key) < self.maxsize: # CHECK self.redis.zadd(self.key, {identifier: now}) # USE This is a time-of-check-to-time-of-use race. Two clients can both run ZCARD, both observe a free slot, and both proceed to ZADD, leaving the sorted set with more than maxsize members. The semaphore that bounds concurrent NetBox connections (create_netbox_semaphore, maxsize = NETBOX_MAX_CONNECTIONS) therefore over-admits under contention -- a local 50-client harness peaked at 22 holders against maxsize 5. Replace the check-and-acquire with a single server-side Lua EVAL that performs the expired-holder cleanup, the capacity check, and the slot reservation in one atomic step, so no other client can interleave between the check and the reservation. The same change fixes a second, opposite defect in the method: the cleanup boundary (now - 60) was computed once before the retry loop, so it never advanced while a caller waited and holders that expired during the wait were never reclaimed, causing spurious timeouts. The timestamp `now` is recomputed on every iteration and passed to the script, so the reclaim boundary advances with wall-clock time. The 60-second window is named HOLDER_EXPIRY. The race itself requires genuinely concurrent clients against a real Redis and is not reproducible with a mock; it was validated locally with a threaded real-Redis harness. The committed regression tests run the production Lua script against in-memory fakeredis (added as a dev dependency, with lupa for Lua support) and pin the two invariants the fix guarantees: the holder set never exceeds maxsize, and the reclaim boundary advances across retries so a slot freed mid-wait is granted. 
Assisted-by: Claude:claude-opus-4-8 Signed-off-by: Roger Luethi --- Pipfile | 2 + Pipfile.lock | 100 +++++++++++++++++++++++- osism/utils/__init__.py | 43 ++++++++-- tests/unit/utils/test_init_semaphore.py | 96 +++++++++++++++++++++++ 4 files changed, 232 insertions(+), 9 deletions(-) create mode 100644 tests/unit/utils/test_init_semaphore.py diff --git a/Pipfile b/Pipfile index bcc663a1..56983fb4 100644 --- a/Pipfile +++ b/Pipfile @@ -46,3 +46,5 @@ httpx = "==0.28.1" pytest = "==9.1.1" pytest-cov = "==7.1.0" pytest-mock = "==3.15.1" +fakeredis = "==2.36.2" +lupa = "==2.8" diff --git a/Pipfile.lock b/Pipfile.lock index 94756f4a..9b75c121 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "689396c917898700222f8bc45b24eb4cc076d08d49f88e781bb661f5c425472e" + "sha256": "11fa86c356edb2787814ff0a9eadf8f9cddc9c2c7324a5a07e8cfeabb2931708" }, "pipfile-spec": 6, "requires": {}, @@ -2365,6 +2365,15 @@ "markers": "python_version >= '3.10'", "version": "==7.14.3" }, + "fakeredis": { + "hashes": [ + "sha256:84cbb9c74ca8946c0d2499daadf3a5d0bfe3cfbac71e3398316d1a1eab3421c4", + "sha256:c37a0b307fae3f27ec7c19e59519e57b8c52782e00303df9075361b5ba441be6" + ], + "index": "pypi", + "markers": "python_version >= '3.7'", + "version": "==2.36.2" + }, "h11": { "hashes": [ "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", @@ -2406,6 +2415,80 @@ "markers": "python_version >= '3.10'", "version": "==2.3.0" }, + "lupa": { + "hashes": [ + "sha256:097e7d0f1719a88020b67c82e05d53d7973c166952393afcecfd8434c7e19a15", + "sha256:0b5ebe1a13c45767919c86750b84fe2da9f6288b6f3cea4ce7660bb2abc9d921", + "sha256:1628371c6592a6d5650497a9e31fb2bb3a7e9883c1f301d1111265e484045af9", + "sha256:1ac2b1ec7504e6148cba1bc35ac36c74d18a0ca6d367ffe7e78a3773c2694c0e", + "sha256:24b4d8af5558e549b70daf1547f5c1c1d664ecea9fc790f83efe5d75e9a93797", + "sha256:250e035fdaffe8c87093e3ebc206ac29a26131b1568ea711d780c26001ce96e7", + 
"sha256:27044f3363047f946b3d3aab9157cbd172b3538ada9ec1baef43432bf7d03a78", + "sha256:281bedc5deb92d31e649a3552edd662449365a635904fa4d5cb4509c7245e34e", + "sha256:2e64acbbd47e9b82a64405a39e0d2b36a5a7dad8ab41c0f3437f572f7d282ba3", + "sha256:32e4e5103bbddcdd2458fb2ccae6c8ba11c9997c711d7e379e0d45551d109c76", + "sha256:33e7e5aebca64b154b0a1679caf79e19254ff37bba51e87abab6848f97cb2de1", + "sha256:348c3f8ecabb6324dcbc05c2740d762ef8fcec7b06c79e45262ab97a217684e3", + "sha256:360056453a7a4eaa4ac5a204c31a5a014b1eb2ee5490603234d2ba831684f1f2", + "sha256:3903c9cf628dae2f56405503247b77a61a3a61bd2dda470e336950c74776d55d", + "sha256:3ffcfd8e19f943ad459136b3f60f085ae4948f024192a93ca4b4ac3023ec88d8", + "sha256:4203fa1659315e939a5304e75001b8cc14234fb3cbb3ed86c049b0cc5d90fcee", + "sha256:450650f91c48c2415b0d59ab3abfcfda3b6efb5b858205f4d4bda8ad141fa529", + "sha256:45fc9da0145ecb0083ef5ff9975116cc784bd0258bdc2bd131ba15483ce18398", + "sha256:4f7c553c1d8cfffbe85d81daef730d12cae4b6002d457542914da0ac8a1145b3", + "sha256:4f81a02806e7c7ad26d8c6fa222c8bef1b0c1b124347c879be880b41339d41e4", + "sha256:4fe5d7a810b64ea8511eb885fc8cdde042ee5ff7b7d08ae78f32449756acb177", + "sha256:54cff414f21f8cd8c6be4aae52541f3b9cd39602b59e3a3db9b5c9f9f674ff18", + "sha256:58e18afed57955b41130e269c78f53d4123ab86e236b53816f4cbffa25cb5d30", + "sha256:5caf45d15d424cee52fd67341e96e2b1dde0658ae90eb156ac56aa0d8330bc38", + "sha256:6c817d5421094507662e5f8feb8cd1e154c10879921c06079b6063be9d8f33c5", + "sha256:6fbcc9911f05c67affbd225fc024268e61e98a18ad1b1c2aed6c8796e4056554", + "sha256:7667001804657496dee9feced2daae5000b4604a3218dd8e6b7b754982ba88b8", + "sha256:7bb223ee8f72d0dc076b0d65296ee72f1c69450f9d2fed5315f7707d98c4a03d", + "sha256:7f210d5a8353e510ea1199c42cf3cbdd630553bf2bc8fb4c00fea06fdec7c798", + "sha256:81b283bfb13cc43fa4910fc98ec110ab861bcb39680f48b266f99d6e3be1049e", + "sha256:81f2d843ce668b653146c007467570210ae44be51dac6926666c51d49536f307", + "sha256:86f6f668966965b15247dc32d064cfe7be67b71e584ccfacbe2f637575296878", 
+ "sha256:891f72e0bffbed1e4175f975aeb2a083956586a100066525e1be485f617f7b25", + "sha256:8cf4f064a0e5531afce2d7d750120c10c10f9529139af6ca6150d13151034398", + "sha256:91d622777febda3ab1bed1d45295f2f32a4680c7b3d7caf8c669998ed5c44118", + "sha256:951496471056061598a7d1729a6cdf48d662fec777a9f2d8aa5a1e62fd30e5a5", + "sha256:97bd01e90b8031e56a5fd5bb70605aea09f1dba675c1140308a52780f93d06f1", + "sha256:9e0d11b8f3a8dac6413f704fef7161d048bb10c58bdac6cbffa5e60efa56e9a3", + "sha256:9e304fb1c50cf23fd8882afbe1aa87525ef8a72667bcab3b37b2bbb2bc542269", + "sha256:9e76e45057cfcaa20ee3422c2289a91f9d51783d020da3570ee226de8f6e71cd", + "sha256:9f3f3955f65f9fde2dc6eda3041ccd394cf54d4bf083f0cdf6feb3d58e5f38d3", + "sha256:9f6f41c91366e7d0d474f87d81c1274af861f40812bf729c9f97ab4c8f3c7ac8", + "sha256:a295f87b5b7ebbfd5191932e8cb0e51df3c7769101ac6b6c7d7c9fb27bfd1307", + "sha256:a591b9947ca347b41a63370e121d6e2b1458fe6dde9ae065029ec10a37f25ff4", + "sha256:ac6b6e8d0e617e26a98cbb44880bcd75de5d32b3ad7b3b3793583909292b47ed", + "sha256:b036738282a5acd2e71fdddb317c9df8b87c1673aa57f403d05fcc2be8abc4ba", + "sha256:b12e43c1fb787189dfc28cd604aef0baa2cb95e27da19498d520361d0ace070a", + "sha256:b9bddb09acfffb4f828f790f444b11dc0cca591afea1a244d9329eea2d20c003", + "sha256:ba3a7dd839f90c3d2e53bebe3c192b1f3f9fd720a6781256405123211fd0dce6", + "sha256:bfc470012ef66ad064c7bd77416af03a3452ef630b04b9012595ea13f2e54518", + "sha256:c2a5fd15dc62374e1661a55f01744c9ec1c56f291ba4a0749d3af2174556e78f", + "sha256:ce86dff1ee7f7cf45f5622065ae991949dd7bb1703581cbc58a630137bb7ccf9", + "sha256:ce9404c661dbac65cc9bed351ad45e797af93d30d70be309a3fa8209ac86d93b", + "sha256:d3d0cde2c77588d1c60875a4f34f059513476c6e1775351897195b51e0f3df08", + "sha256:d7edb13a7a5250b5c6c22d1495d9e842b5c9fc5081c8fe6b5efe2112fe3e41f9", + "sha256:d8022641b9ec8ecf2c5ecbe9f47e5a70e0b87c4b5ae921b92cb02a638e0acd08", + "sha256:d8766aff03a78c80ad2d188a8bdb216de5ec838359cd87e05bbdfa56394a6105", + 
"sha256:dc51250e76367a3e27fcd01dc769b9bfcbbc34f48df48dde53d6af6e75b7eaa5", + "sha256:e8d4f4dd4acf4a0e42adc6b1ad220e1c86fe3028402c2f78bd0728a6d241bbe9", + "sha256:f4342f4de76ae7ce2ab0672d36003bdb7e1a33252f293b569298ddd792e70e33", + "sha256:f4d01b2a08c70bbb883a9e082b6b36b89121ed5910b710f1ba11c73295ff4fba", + "sha256:f5a6af145b0ea818f01d27bfe2583a4b538570bef61d22c8773e0eccf011234c", + "sha256:f6ddca4774d5ca451768a95e378a3aa041076e29f4613b8562f8e98efb6690fd", + "sha256:f6f603391dffb256e36a79fd2044084d5f4b8a0a4c0e5ad291cd3ab3aaf1fd0a", + "sha256:f711a8ab0486b9ac6fdda94a22ddcfbc9f0d4a27e3a8cf1bf79c6e48b33017c1", + "sha256:f8a22088a552828958603323f0a5c4b3e11e03b75d0bf4c965ef879de9b60a8d", + "sha256:fc47f536ac13a79cef47d29a2b205576a22841f042a2bcec1676b95806e7706a" + ], + "index": "pypi", + "markers": "python_version >= '3.8'", + "version": "==2.8" + }, "packaging": { "hashes": [ "sha256:5fc45236b9446107ff2415ce77c807cee2862cb6fac22b8a73826d0693b0980e", @@ -2455,6 +2538,21 @@ "index": "pypi", "markers": "python_version >= '3.9'", "version": "==3.15.1" + }, + "redis": { + "hashes": [ + "sha256:b01bc7282b8444e28ec36b261df5375183bb47a07eb9c603f284e89cbc5ef010", + "sha256:f0544fa9604264e9464cdf4814e7d4830f74b165d52f2a330a760a88dd248b7f" + ], + "markers": "python_version >= '3.9'", + "version": "==6.4.0" + }, + "sortedcontainers": { + "hashes": [ + "sha256:25caa5a06cc30b6b83d11423433f65d1f9d76c4c6a0c90e3379eaa43b9bfdb88", + "sha256:a163dcaede0f1c021485e957a39245190e74249897e2ae4b2aa38595db237ee0" + ], + "version": "==2.4.0" } } } diff --git a/osism/utils/__init__.py b/osism/utils/__init__.py index 719250ea..fd6a1a1a 100644 --- a/osism/utils/__init__.py +++ b/osism/utils/__init__.py @@ -119,6 +119,28 @@ class RedisSemaphore: a maximum concurrency limit. """ + # Holders older than this many seconds are considered crashed and reclaimed. 
+ HOLDER_EXPIRY = 60 + + # Atomic check-and-acquire executed server-side so that the capacity check + # and the slot reservation cannot be interleaved by another client. Without + # this, two clients could both observe a free slot (ZCARD) and both take it + # (ZADD), admitting more than ``maxsize`` holders. Cleanup of expired + # holders happens inside the same call using the caller-supplied ``now`` so + # the reclaim boundary advances on every retry. + _ACQUIRE_LUA = """ + local now = tonumber(ARGV[1]) + local maxsize = tonumber(ARGV[2]) + local identifier = ARGV[3] + local expiry = tonumber(ARGV[4]) + redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, now - expiry) + if redis.call('ZCARD', KEYS[1]) < maxsize then + redis.call('ZADD', KEYS[1], now, identifier) + return 1 + end + return 0 + """ + def __init__(self, redis_client, key, maxsize, timeout=None): """Initialize the semaphore. @@ -145,16 +167,21 @@ def acquire(self, timeout=None): """ timeout = timeout or self.timeout or 10 identifier = str(uuid.uuid4()) - now = time.time() - end_time = now + timeout + end_time = time.time() + timeout while time.time() < end_time: - # Clean up expired holders - self.redis.zremrangebyscore(self.key, 0, now - 60) - - # Try to acquire - if self.redis.zcard(self.key) < self.maxsize: - self.redis.zadd(self.key, {identifier: now}) + now = time.time() + # Atomic cleanup + capacity check + reservation (see _ACQUIRE_LUA). + acquired = self.redis.eval( + self._ACQUIRE_LUA, + 1, + self.key, + now, + self.maxsize, + identifier, + self.HOLDER_EXPIRY, + ) + if acquired: self.identifier = identifier return True diff --git a/tests/unit/utils/test_init_semaphore.py b/tests/unit/utils/test_init_semaphore.py new file mode 100644 index 00000000..4cab9847 --- /dev/null +++ b/tests/unit/utils/test_init_semaphore.py @@ -0,0 +1,96 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Regression tests for :class:`osism.utils.RedisSemaphore.acquire`. 
+ +These run the production Lua ``_ACQUIRE_LUA`` against an in-memory +``fakeredis`` server (which executes Lua via ``lupa``), so they exercise the +real server-side check-and-acquire rather than asserting on a mock's recorded +calls. A faithful many-client race needs a real Redis and lives outside the +unit suite; these tests lock in the two invariants the atomic implementation +guarantees: + +* the holder set never exceeds ``maxsize`` (TOCTOU over-admit fix), and +* the expired-holder reclaim boundary advances on every retry, so a slot that + frees up while a caller waits is eventually granted (stale-boundary fix). +""" + +import time + +import fakeredis +import pytest + +import osism.utils as utils_pkg + + +@pytest.fixture +def redis(): + """A fresh in-memory Redis with Lua scripting support per test.""" + return fakeredis.FakeStrictRedis() + + +def test_acquire_then_release_roundtrip(redis): + sem = utils_pkg.RedisSemaphore(redis, "job", 2, timeout=1) + + assert sem.acquire() is True + assert sem.identifier is not None + assert redis.zcard("semaphore:job") == 1 + + sem.release() + assert sem.identifier is None + assert redis.zcard("semaphore:job") == 0 + + +def test_acquire_never_exceeds_maxsize(redis): + """maxsize concurrent holders are admitted; the next caller is refused and + the holder set stays capped — the invariant the atomic script enforces.""" + maxsize = 3 + held = [] + for _ in range(maxsize): + sem = utils_pkg.RedisSemaphore(redis, "job", maxsize, timeout=1) + assert sem.acquire() is True + held.append(sem) + + assert redis.zcard("semaphore:job") == maxsize + + # One more, with the set already full: must be refused, set unchanged. 
+ overflow = utils_pkg.RedisSemaphore(redis, "job", maxsize, timeout=1) + assert overflow.acquire() is False + assert redis.zcard("semaphore:job") == maxsize + + for sem in held: + sem.release() + assert redis.zcard("semaphore:job") == 0 + + +def test_acquire_returns_false_when_full_for_whole_window(redis): + """A genuinely full semaphore times out (no slot ever frees). + + The holder is freshly scored, so it never ages past HOLDER_EXPIRY within + the short window and the caller exhausts its timeout. + """ + redis.zadd("semaphore:job", {"holder": time.time()}) + sem = utils_pkg.RedisSemaphore(redis, "job", 1, timeout=0.2) + + assert sem.acquire() is False + assert sem.identifier is None + assert redis.zcard("semaphore:job") == 1 # untouched + + +def test_reclaim_boundary_advances_across_retries(redis, mocker): + """The expiry cutoff is recomputed from the current time on every retry, + so a holder that ages past HOLDER_EXPIRY while the caller waits is + reclaimed and the slot is granted before the timeout. + + With a cutoff frozen at the loop's start this caller would never reclaim + the holder and would spuriously time out. HOLDER_EXPIRY is shrunk so the + holder ages out within a fraction of a second. + """ + mocker.patch.object(utils_pkg.RedisSemaphore, "HOLDER_EXPIRY", 0.1) + # One holder occupying the only slot, scored at "now"; it is not yet expired + # on the first attempt but ages past HOLDER_EXPIRY while the caller retries. + redis.zadd("semaphore:job", {"stale": time.time()}) + sem = utils_pkg.RedisSemaphore(redis, "job", 1, timeout=2) + + assert sem.acquire() is True + members = {m.decode() for m in redis.zrange("semaphore:job", 0, -1)} + assert members == {sem.identifier} # stale reclaimed, only new holder left