From b50ace69155806f4c150db3dec41a99451791d37 Mon Sep 17 00:00:00 2001 From: Christian Berendt Date: Mon, 22 Jun 2026 21:32:00 +0200 Subject: [PATCH] Add integration test for OpenStack events WebSocket endpoint Cover the /v1/events/openstack WebSocket and the in-process websocket_manager via the FastAPI TestClient: connection accept, filter-message acknowledgment, delivery of a matching event, filtering-out of a non-matching event, and connection cleanup on disconnect. The TestClient fixture is module-scoped because the manager's module-level asyncio primitives bind to the first event loop that touches them; sharing one loop across the module keeps them valid. Events are pushed onto the loop-bound queue via the app's portal. Assisted-by: Claude:claude-fable-5 Signed-off-by: Christian Berendt --- Pipfile | 1 + Pipfile.lock | 11 +- tests/integration/test_api_websocket.py | 154 ++++++++++++++++++++++++ 3 files changed, 165 insertions(+), 1 deletion(-) create mode 100644 tests/integration/test_api_websocket.py diff --git a/Pipfile b/Pipfile index 028fa2a6..42e88578 100644 --- a/Pipfile +++ b/Pipfile @@ -46,5 +46,6 @@ httpx = "==0.28.1" pytest = "==9.1.1" pytest-cov = "==7.1.0" pytest-mock = "==3.15.1" +pytest-timeout = "==2.4.0" fakeredis = "==2.36.2" lupa = "==2.8" diff --git a/Pipfile.lock b/Pipfile.lock index 3151c34b..e6eafc25 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "66201c6c1ac1466bd21fafb5882279d442b5f550045ff85a8037c2ff357704b9" + "sha256": "5d42947735881532edef4a6b47bdb594a3d9b02b064b509fd0176006f43544c3" }, "pipfile-spec": 6, "requires": {}, @@ -3189,6 +3189,15 @@ "markers": "python_version >= '3.9'", "version": "==3.15.1" }, + "pytest-timeout": { + "hashes": [ + "sha256:7e68e90b01f9eff71332b25001f85c75495fc4e3a836701876183c4bcfd0540a", + "sha256:c42667e5cdadb151aeb5b26d114aff6bdf5a907f176a007a30b940d3d865b5c2" + ], + "index": "pypi", + "markers": "python_version >= '3.7'", + "version": "==2.4.0" + }, "redis": { "hashes": [ "sha256:b01bc7282b8444e28ec36b261df5375183bb47a07eb9c603f284e89cbc5ef010", diff --git a/tests/integration/test_api_websocket.py b/tests/integration/test_api_websocket.py new file mode 100644 index 00000000..7d2a6464 --- /dev/null +++ b/tests/integration/test_api_websocket.py @@ -0,0 +1,154 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Integration tests for the OpenStack events WebSocket endpoint. + +``GET /v1/events/openstack`` upgrades to a WebSocket served by the in-process +``websocket_manager`` (``osism/services/websocket_manager.py``). Driving it +through ``fastapi.testclient.TestClient`` exercises the connect / set-filters / +broadcast / disconnect path end-to-end. The manager broadcasts in-process via +an ``asyncio`` queue, so no service beyond the ``TestClient`` is needed; the +module is marked ``integration`` only to share the FastAPI / ``httpx`` setup +with the sibling facts test and stay in the same Tier 2 batch. +""" + +import uuid + +import pytest + +from osism.services.websocket_manager import EventMessage, websocket_manager + +# Most tests block on ``ws.receive_json()``, which bottoms out in Starlette's +# untimed ``queue.get()``. A regression that stops the server from emitting the +# ack or the matching event would otherwise hang forever and only die on the CI +# wall-clock timeout, disguising the failure. Cap every test so the hang turns +# into a quick failure at the exact ``receive_json()`` call. Everything runs +# in-process and completes in milliseconds; 10 seconds is generous headroom +# even on a slow CI runner. +pytestmark = [pytest.mark.integration, pytest.mark.timeout(10)] + +# Endpoint under test; kept as a constant so a path change touches one line. +WS_ENDPOINT = "/v1/events/openstack" + +# Representative event types shared across the tests: the tests subscribe to +# the compute event and use the baremetal event as the non-matching case. +MATCHING_EVENT_TYPE = "compute.instance.create.end" +NON_MATCHING_EVENT_TYPE = "baremetal.node.power_set.end" + + +def make_event(event_type): + """Build an ``EventMessage`` with a unique payload. + + The unique payload lets tests assert full-message equality and tell events + apart without depending on delivery order. + """ + return EventMessage(event_type, "openstack", {"resource": str(uuid.uuid4())}) + + +def set_filters(ws, **filters): + """Send a ``set_filters`` message and return the acknowledgment. + + Keeps the message shape (``action`` key plus filter lists) in one place so + a protocol change touches one line instead of every test. + """ + ws.send_json({"action": "set_filters", **filters}) + return ws.receive_json() + + +@pytest.fixture(scope="module") +def client(): + """A module-scoped ``TestClient`` bound to the FastAPI app. + + ``osism.api`` is imported lazily because importing it wires the event + bridge to Redis at module load -- safe only in the integration environment + where Redis is up. The fixture is module-scoped on purpose: the global + ``websocket_manager`` owns module-level ``asyncio`` primitives + (``event_queue``, ``_lock``) that bind to the first event loop that touches + them and raise "bound to a different event loop" on any other. Sharing one + ``TestClient`` (one loop) across this module keeps them valid. + """ + from fastapi.testclient import TestClient + + from osism import api + + with TestClient(api.app) as test_client: + yield test_client + + +def test_websocket_connect_is_accepted(client): + """The endpoint accepts the WebSocket upgrade.""" + with client.websocket_connect(WS_ENDPOINT): + pass + + +def test_set_filters_is_acknowledged(client): + """A ``set_filters`` message is processed and acknowledged verbatim.""" + with client.websocket_connect(WS_ENDPOINT) as ws: + ack = set_filters( + ws, + event_filters=[MATCHING_EVENT_TYPE], + node_filters=["server-01"], + service_filters=["compute"], + ) + + assert ack["type"] == "filter_update" + assert ack["status"] == "success" + assert ack["event_filters"] == [MATCHING_EVENT_TYPE] + assert ack["node_filters"] == ["server-01"] + assert ack["service_filters"] == ["compute"] + + +def test_matching_event_is_delivered(client): + """An event matching the connection's filters is delivered intact.""" + with client.websocket_connect(WS_ENDPOINT) as ws: + set_filters(ws, event_filters=[MATCHING_EVENT_TYPE]) + + event = make_event(MATCHING_EVENT_TYPE) + # Push onto the in-process queue from the app's event loop: the queue is + # loop-bound, so enqueuing from the test thread would be unsafe. + client.portal.call(websocket_manager.add_event, event) + + received = ws.receive_json() + + assert received == event.to_dict() + + +def test_non_matching_event_is_filtered_out(client): + """An event that does not match the filters is not delivered.""" + with client.websocket_connect(WS_ENDPOINT) as ws: + set_filters(ws, event_filters=[MATCHING_EVENT_TYPE]) + + non_matching = make_event(NON_MATCHING_EVENT_TYPE) + sentinel = make_event(MATCHING_EVENT_TYPE) + # Both are queued FIFO; the broadcaster skips the non-matching event, so + # the first (and only) message received is the matching sentinel. This + # proves the non-matching event was dropped without an absence/timeout. + client.portal.call(websocket_manager.add_event, non_matching) + client.portal.call(websocket_manager.add_event, sentinel) + + received = ws.receive_json() + + # Assert on the full payload (not just id/event_type): equality with the + # sentinel guards against partial-delivery regressions, and the inequality + # proves the dropped event's body is not what slipped through. + assert received == sentinel.to_dict() + assert received != non_matching.to_dict() + + +def test_disconnect_unregisters_connection(client): + """Disconnecting runs the ``finally`` cleanup and unregisters the socket.""" + before = len(websocket_manager.connections) + + with client.websocket_connect(WS_ENDPOINT) as ws: + set_filters(ws, event_filters=[MATCHING_EVENT_TYPE]) + + event = make_event(MATCHING_EVENT_TYPE) + client.portal.call(websocket_manager.add_event, event) + # Receiving the broadcast proves the connection is registered with the + # manager -- observable behavior instead of peeking at the registry. + assert ws.receive_json() == event.to_dict() + + # A leaked registration has no client-visible symptom (the broadcaster + # silently prunes dead sockets on the next send), so the registry size is + # the only surface that can catch the leak; every other assertion in this + # module sticks to observable behavior. + assert len(websocket_manager.connections) == before