From 27b58ecee37007e2f893a82f0ba09b7deb3c4c5f Mon Sep 17 00:00:00 2001 From: Mark Saroufim Date: Wed, 24 Jun 2026 22:56:47 -0700 Subject: [PATCH] Expose runner queued job counts --- src/kernelbot/api/main.py | 32 +++++++++++++ src/libkernelbot/backend.py | 27 ++++++++++- src/libkernelbot/launchers/__init__.py | 4 +- src/libkernelbot/launchers/github.py | 62 +++++++++++++++++++------- src/libkernelbot/launchers/launcher.py | 28 +++++++++++- src/libkernelbot/launchers/modal.py | 39 ++++++++++++++-- tests/test_admin_api.py | 30 ++++++++++++- tests/test_backend.py | 33 ++++++++++++++ tests/test_github.py | 40 ++++++++++++++++- tests/test_modal.py | 38 +++++++++++++++- 10 files changed, 307 insertions(+), 26 deletions(-) diff --git a/src/kernelbot/api/main.py b/src/kernelbot/api/main.py index 1ceea3fe5..1491a6a51 100644 --- a/src/kernelbot/api/main.py +++ b/src/kernelbot/api/main.py @@ -106,6 +106,28 @@ def get_db(): return backend_instance.db +async def get_runner_queue_status( + gpu_type: str, + req: ProcessedSubmissionRequest | None = None, +) -> dict[str, Any]: + if not backend_instance: + raise HTTPException(status_code=500, detail="Bot instance not initialized") + + config = None + if req is not None and req.task is not None: + config = {"lang": req.task.lang.value} + + status = await backend_instance.get_runner_queue_status(gpu_type, config) + return status.to_dict() + + +async def get_submission_runner_queue_status(submission: dict) -> dict[str, Any] | None: + runs = submission.get("runs") or [] + if not runs: + return None + return await get_runner_queue_status(runs[0]["runner"]) + + async def validate_cli_header( x_popcorn_cli_id: Optional[str] = Header(None, alias="X-Popcorn-Cli-Id"), db_context=Depends(get_db), @@ -567,11 +589,13 @@ async def run_submission_async( sub_id, job_status_id = await enqueue_background_job( req, submission_mode_enum, backend_instance, background_submission_manager ) + runner_queue = await get_runner_queue_status(req.gpus[0], req) return JSONResponse( status_code=202, content={ "details": {"id": sub_id, "job_status_id": job_status_id}, + "runner_queue": runner_queue, "status": "accepted", }, ) @@ -874,6 +898,12 @@ async def get_gpus( raise HTTPException(status_code=500, detail=f"Error fetching GPU data: {e}") from e +@app.get("/runner_queue/{gpu_type}") +async def get_runner_queue(gpu_type: str) -> dict: + await simple_rate_limit() + return await get_runner_queue_status(gpu_type) + + @app.get("/submissions/{leaderboard_name}/{gpu_name}") async def get_submissions( leaderboard_name: str, @@ -985,6 +1015,7 @@ async def get_user_submission( # RunItem is a TypedDict (already a dict), select fields to expose run_fields = ("start_time", "end_time", "mode", "secret", "runner", "score", "passed") + runner_queue = await get_submission_runner_queue_status(submission) return { "id": submission["submission_id"], "leaderboard_id": submission["leaderboard_id"], @@ -1000,6 +1031,7 @@ async def get_user_submission( "error": submission.get("job_error"), "last_heartbeat": submission.get("job_last_heartbeat"), }, + "runner_queue": runner_queue, } except HTTPException: raise diff --git a/src/libkernelbot/backend.py b/src/libkernelbot/backend.py index 60214b28a..50a3d2623 100644 --- a/src/libkernelbot/backend.py +++ b/src/libkernelbot/backend.py @@ -10,7 +10,7 @@ enforce_submission_precheck, should_precheck_submission, ) -from libkernelbot.launchers import Launcher +from libkernelbot.launchers import Launcher, RunnerQueueStatus from libkernelbot.leaderboard_db import LeaderboardDB from libkernelbot.report import ( MultiProgressReporter, @@ -52,6 +52,31 @@ def register_launcher(self, launcher: Launcher): for gpu in launcher.gpus: self.launcher_map[gpu.value] = launcher + async def get_runner_queue_status( + self, gpu_name: str, config: dict | None = None + ) -> RunnerQueueStatus: + gpu = get_gpu_by_name(gpu_name) + if gpu is None: + return RunnerQueueStatus( + runner="unknown", + gpu=gpu_name, + queued_jobs=None, + status="unavailable", + error="unknown gpu", + ) + + launcher = self.launcher_map.get(gpu.value) + if launcher is None: + return RunnerQueueStatus( + runner=gpu.runner, + gpu=gpu.name, + queued_jobs=None, + status="unavailable", + error="runner is not registered", + ) + + return await launcher.get_queue_status(gpu, config) + async def submit_full( self, req: ProcessedSubmissionRequest, diff --git a/src/libkernelbot/launchers/__init__.py b/src/libkernelbot/launchers/__init__.py index df47476ff..2c0a40599 100644 --- a/src/libkernelbot/launchers/__init__.py +++ b/src/libkernelbot/launchers/__init__.py @@ -1,5 +1,5 @@ from .github import GitHubLauncher -from .launcher import Launcher +from .launcher import Launcher, RunnerQueueStatus from .modal import ModalLauncher -__all__ = [Launcher, GitHubLauncher, ModalLauncher] +__all__ = [Launcher, RunnerQueueStatus, GitHubLauncher, ModalLauncher] diff --git a/src/libkernelbot/launchers/github.py b/src/libkernelbot/launchers/github.py index 289f7c665..8e5964418 100644 --- a/src/libkernelbot/launchers/github.py +++ b/src/libkernelbot/launchers/github.py @@ -40,7 +40,7 @@ ) from libkernelbot.utils import KernelBotError, setup_logging -from .launcher import Launcher +from .launcher import Launcher, RunnerQueueStatus logger = setup_logging() @@ -66,6 +66,21 @@ def __init__(self, repo: str, token: str, branch: str): self._token_idx = 0 self.branch = branch + @staticmethod + def _workflow_for_gpu(gpu_type: GPU) -> tuple[str, str | None]: + if gpu_type.value in ["MI300", "MI250", "MI300x8", "MI355X"]: + return "amd_workflow.yml", { + "MI300": "amdgpu-mi300-x86-64", + "MI250": "amdgpu-mi250-x86-64", + "MI300x8": "amdgpu-mi300-8-x86-64", + "MI355X": "arc-runner-set", + }[gpu_type.value] + if gpu_type.value == "B200_Nebius": + return "helion_workflow.yml", None + if gpu_type.value == "NVIDIA": + return "nvidia_workflow.yml", None + raise ValueError(f"Invalid GPU type: {gpu_type.value}") + @staticmethod def _load_github_tokens(fallback_token: str) -> list[str]: primary = (os.getenv("GITHUB_TOKEN") or fallback_token).strip() @@ -94,29 +109,16 @@ async def run_submission( # noqa: C901 self, config: dict, gpu_type: GPU, status: RunProgressReporter ) -> FullResult: gpu_vendor = None - runner_name = None + selected_workflow, runner_name = self._workflow_for_gpu(gpu_type) if gpu_type.value in ["MI300", "MI250", "MI300x8", "MI355X"]: - selected_workflow = "amd_workflow.yml" - runner_name = { - "MI300": "amdgpu-mi300-x86-64", - "MI250": "amdgpu-mi250-x86-64", - "MI300x8": "amdgpu-mi300-8-x86-64", - "MI355X": "arc-runner-set", - }[gpu_type.value] gpu_vendor = "AMD" requirements = AMD_REQUIREMENTS elif gpu_type.value == "B200_Nebius": - selected_workflow = "helion_workflow.yml" - runner_name = None gpu_vendor = "NVIDIA" requirements = NVIDIA_REQUIREMENTS elif gpu_type.value == "NVIDIA": - selected_workflow = "nvidia_workflow.yml" - runner_name = None gpu_vendor = "NVIDIA" requirements = NVIDIA_REQUIREMENTS - else: - raise ValueError(f"Invalid GPU type: {gpu_type.value}") lang = config["lang"] if lang == "cu" and gpu_vendor == "AMD": @@ -199,6 +201,35 @@ async def run_submission( # noqa: C901 system = SystemInfo(**data.get("system", {})) return FullResult(success=True, error="", runs=runs, system=system) + async def get_queue_status( + self, gpu_type: GPU, config: dict | None = None + ) -> RunnerQueueStatus: + try: + selected_workflow, _ = self._workflow_for_gpu(gpu_type) + run = GitHubRun(self.repo, self._next_token(), self.branch, selected_workflow) + workflow = await run.get_workflow() + queued_runs = await asyncio.to_thread( + workflow.get_runs, + event="workflow_dispatch", + status="queued", + ) + queued_jobs = await asyncio.to_thread(lambda: queued_runs.totalCount) + except Exception as e: + logger.warning("Could not get GitHub queue stats for %s", gpu_type.name, exc_info=e) + return RunnerQueueStatus( + runner=self.name, + gpu=gpu_type.name, + queued_jobs=None, + status="unavailable", + error=str(e), + ) + + return RunnerQueueStatus( + runner=self.name, + gpu=gpu_type.name, + queued_jobs=queued_jobs, + ) + async def wait_callback(self, run: "GitHubRun", status: RunProgressReporter): await status.update( f"⏳ Workflow [{run.run_id}](<{run.html_url}>): {run.status} " @@ -246,7 +277,6 @@ def patched_create_dispatch( return status == 200 or status == 204 - class GitHubRun: def __init__(self, repo: str, token: str, branch: str, workflow_file: str): gh = Github(auth=Auth.Token(token)) diff --git a/src/libkernelbot/launchers/launcher.py b/src/libkernelbot/launchers/launcher.py index 67292bdc7..33ef7757e 100644 --- a/src/libkernelbot/launchers/launcher.py +++ b/src/libkernelbot/launchers/launcher.py @@ -1,10 +1,25 @@ +from dataclasses import asdict, dataclass from enum import Enum -from typing import Type +from typing import Any, Type from libkernelbot.consts import GPU from libkernelbot.report import RunProgressReporter +@dataclass +class RunnerQueueStatus: + runner: str + gpu: str + queued_jobs: int | None + running_jobs: int | None = None + available_runners: int | None = None + status: str = "available" + error: str | None = None + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + class Launcher: def __init__(self, name: str, gpus: Type[Enum]): self.name = name @@ -12,3 +27,14 @@ def __init__(self, name: str, gpus: Type[Enum]): async def run_submission(self, config: dict, gpu_type: GPU, status: RunProgressReporter): raise NotImplementedError() + + async def get_queue_status( + self, gpu_type: GPU, config: dict | None = None + ) -> RunnerQueueStatus: + return RunnerQueueStatus( + runner=self.name, + gpu=gpu_type.name, + queued_jobs=None, + status="unavailable", + error="queue status is not supported for this runner", + ) diff --git a/src/libkernelbot/launchers/modal.py b/src/libkernelbot/launchers/modal.py index 6c2308ec1..5fb1a15e0 100644 --- a/src/libkernelbot/launchers/modal.py +++ b/src/libkernelbot/launchers/modal.py @@ -7,7 +7,7 @@ from libkernelbot.run_eval import FullResult from libkernelbot.utils import setup_logging -from .launcher import Launcher +from .launcher import Launcher, RunnerQueueStatus logger = setup_logging(__name__) @@ -23,8 +23,7 @@ async def run_submission( loop = asyncio.get_event_loop() if config["lang"] == "cu": config["include_dirs"] = config.get("include_dirs", []) + self.additional_include_dirs - func_type = "pytorch" if config["lang"] == "py" else "cuda" - func_name = f"run_{func_type}_script_{gpu_type.value.lower()}" + func_name = self._function_name(config, gpu_type) logger.info(f"Starting Modal run using {func_name}") @@ -38,3 +37,37 @@ async def run_submission( await status.update("✅ Waiting for modal run to finish... Done") return result + + def _function_name(self, config: dict, gpu_type: GPU) -> str: + func_type = "pytorch" if config["lang"] == "py" else "cuda" + return f"run_{func_type}_script_{gpu_type.value.lower()}" + + async def get_queue_status( + self, gpu_type: GPU, config: dict | None = None + ) -> RunnerQueueStatus: + func_name = self._function_name(config or {"lang": "cu"}, gpu_type) + loop = asyncio.get_event_loop() + + try: + stats = await loop.run_in_executor( + None, + lambda: modal.Function.from_name( + "discord-bot-runner", func_name + ).get_current_stats(), + ) + except Exception as e: + logger.warning("Could not get Modal queue stats for %s", func_name, exc_info=e) + return RunnerQueueStatus( + runner=self.name, + gpu=gpu_type.name, + queued_jobs=None, + status="unavailable", + error=str(e), + ) + + return RunnerQueueStatus( + runner=self.name, + gpu=gpu_type.name, + queued_jobs=getattr(stats, "backlog", None), + available_runners=getattr(stats, "num_total_runners", None), + ) diff --git a/tests/test_admin_api.py b/tests/test_admin_api.py index 2ad3ab3dd..22fe43095 100644 --- a/tests/test_admin_api.py +++ b/tests/test_admin_api.py @@ -1,10 +1,12 @@ """Tests for admin API endpoints.""" -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest from fastapi.testclient import TestClient +from libkernelbot.launchers import RunnerQueueStatus + @pytest.fixture def mock_backend(): @@ -80,6 +82,32 @@ def test_admin_stop(self, test_client, mock_backend): assert mock_backend.accepts_jobs is False +class TestRunnerQueue: + def test_get_runner_queue(self, test_client, mock_backend): + """GET /runner_queue/{gpu_type} returns runner backlog.""" + mock_backend.get_runner_queue_status = AsyncMock( + return_value=RunnerQueueStatus( + runner="Modal", + gpu="B200", + queued_jobs=6, + available_runners=1, + ) + ) + + response = test_client.get("/runner_queue/B200") + + assert response.status_code == 200 + assert response.json() == { + "runner": "Modal", + "gpu": "B200", + "queued_jobs": 6, + "running_jobs": None, + "available_runners": 1, + "status": "available", + "error": None, + } + + class TestAdminStats: """Test admin stats endpoint.""" diff --git a/tests/test_backend.py b/tests/test_backend.py index e443cef18..9116f4464 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -8,6 +8,7 @@ from libkernelbot import backend, consts, report from libkernelbot.kernelguard import KernelGuardRejected +from libkernelbot.launchers import RunnerQueueStatus from libkernelbot.run_eval import FullResult @@ -41,6 +42,38 @@ def _mock_launcher(bot: backend.KernelBackend, runs: dict, name="launcher"): return mock_launcher +@pytest.mark.asyncio +async def test_get_runner_queue_status_routes_to_registered_launcher(): + bot = object.__new__(backend.KernelBackend) + bot.launcher_map = {} + mock_launcher = _mock_launcher(bot, {}, name="Modal") + mock_launcher.get_queue_status = AsyncMock( + return_value=RunnerQueueStatus( + runner="Modal", + gpu="A100", + queued_jobs=4, + available_runners=2, + ) + ) + + status = await bot.get_runner_queue_status("A100", {"lang": "py"}) + + assert status.queued_jobs == 4 + assert status.available_runners == 2 + mock_launcher.get_queue_status.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_get_runner_queue_status_unknown_gpu(): + bot = object.__new__(backend.KernelBackend) + bot.launcher_map = {} + status = await bot.get_runner_queue_status("not-a-gpu") + + assert status.status == "unavailable" + assert status.queued_jobs is None + assert status.error == "unknown gpu" + + @pytest.mark.asyncio async def test_handle_submission(bot: backend.KernelBackend, task_directory): _submit_leaderboard(bot.db, task_directory) diff --git a/tests/test_github.py b/tests/test_github.py index 9d3327400..5b15d2d50 100644 --- a/tests/test_github.py +++ b/tests/test_github.py @@ -2,11 +2,12 @@ import subprocess from collections import namedtuple from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch import pytest from dotenv import load_dotenv -from libkernelbot.consts import GitHubGPU, SubmissionMode +from libkernelbot.consts import GitHubGPU, SubmissionMode, get_gpu_by_name from libkernelbot.launchers import GitHubLauncher from libkernelbot.report import RunProgressReporter from libkernelbot.task import build_task_config, make_task_definition @@ -31,6 +32,43 @@ async def update(self, message: str): self.updates.append(message) +@pytest.mark.asyncio +async def test_github_queue_status_counts_queued_workflow_runs(): + launcher = GitHubLauncher(repo="gpu-mode/kernelbot", token="token", branch="main") + workflow = MagicMock() + queued_runs = MagicMock() + queued_runs.totalCount = 7 + workflow.get_runs.return_value = queued_runs + + with patch("libkernelbot.launchers.github.GitHubRun") as run_cls: + run_cls.return_value.get_workflow = AsyncMock(return_value=workflow) + + status = await launcher.get_queue_status(get_gpu_by_name("NVIDIA")) + + assert status.runner == "GitHub" + assert status.gpu == "NVIDIA" + assert status.queued_jobs == 7 + workflow.get_runs.assert_called_once_with( + event="workflow_dispatch", + status="queued", + ) + + +@pytest.mark.asyncio +async def test_github_queue_status_reports_unavailable_on_error(): + launcher = GitHubLauncher(repo="gpu-mode/kernelbot", token="token", branch="main") + + with patch("libkernelbot.launchers.github.GitHubRun") as run_cls: + run_cls.return_value.get_workflow = AsyncMock(side_effect=RuntimeError("rate limited")) + + status = await launcher.get_queue_status(get_gpu_by_name("NVIDIA")) + + assert status.runner == "GitHub" + assert status.queued_jobs is None + assert status.status == "unavailable" + assert status.error == "rate limited" + + def get_github_repo(): """Get GitHub repository from git remote.""" try: diff --git a/tests/test_modal.py b/tests/test_modal.py index 5478e3e9d..f2f7721f1 100644 --- a/tests/test_modal.py +++ b/tests/test_modal.py @@ -2,11 +2,13 @@ import pprint import subprocess from pathlib import Path +from types import SimpleNamespace from typing import Tuple +from unittest.mock import MagicMock, patch import pytest -from libkernelbot.consts import GPU_TO_SM, ModalGPU, SubmissionMode +from libkernelbot.consts import GPU_TO_SM, ModalGPU, SubmissionMode, get_gpu_by_name from libkernelbot.launchers import ModalLauncher from libkernelbot.report import RunProgressReporter from libkernelbot.task import build_task_config, make_task_definition @@ -27,6 +29,40 @@ async def update(self, message: str): self.updates.append(message) +@pytest.mark.asyncio +async def test_modal_queue_status_uses_function_stats(): + launcher = ModalLauncher(add_include_dirs=[]) + function = MagicMock() + function.get_current_stats.return_value = SimpleNamespace( + backlog=5, + num_total_runners=2, + ) + + with patch("libkernelbot.launchers.modal.modal.Function.from_name", return_value=function): + status = await launcher.get_queue_status(get_gpu_by_name("B200"), {"lang": "cu"}) + + assert status.runner == "Modal" + assert status.gpu == "B200" + assert status.queued_jobs == 5 + assert status.available_runners == 2 + + +@pytest.mark.asyncio +async def test_modal_queue_status_reports_unavailable_on_error(): + launcher = ModalLauncher(add_include_dirs=[]) + + with patch( + "libkernelbot.launchers.modal.modal.Function.from_name", + side_effect=RuntimeError("modal unavailable"), + ): + status = await launcher.get_queue_status(get_gpu_by_name("B200"), {"lang": "cu"}) + + assert status.runner == "Modal" + assert status.queued_jobs is None + assert status.status == "unavailable" + assert status.error == "modal unavailable" + + @pytest.fixture(scope="session") def modal_deployment(project_root: Path): """