diff --git a/Framework/Core/scripts/hyperloop-perf-server/hl_common.py b/Framework/Core/scripts/hyperloop-perf-server/hl_common.py index 4e64d21363806..8d642983a71d9 100644 --- a/Framework/Core/scripts/hyperloop-perf-server/hl_common.py +++ b/Framework/Core/scripts/hyperloop-perf-server/hl_common.py @@ -12,25 +12,80 @@ from __future__ import annotations +import json import os +import socket +import time import httpx +# security-proxy (see ~/src/ali-bot/security-proxy): a localhost credential proxy +# that binds a RANDOM port and mints a per-service, daily-rotating gate token, +# both handed out over a per-user UNIX socket. Replaces the old fixed +# localhost:8888 + static-bearer ccdb-proxy. Every alimonitor.cern.ch artefact is +# routed through the "/alimonitor/" route (upstream = alimonitor.cern.ch root), so a +# single "alimonitor" gate token covers train-workdir / hyperloop / alihyperloop-data. +_AGENT_SOCK = os.path.expanduser( + os.environ.get("SECURITY_PROXY_AGENT_SOCK", "~/.security-proxy/agent.sock") +) +_PROXY_SERVICE = os.environ.get("SECURITY_PROXY_SERVICE", "alimonitor") +_creds_cache: dict[str, tuple[int, str, float]] = {} + + +def _proxy_creds(service: str) -> tuple[int, str]: + """Return (port, gate_token) for ``service`` from the security-proxy agent socket. + + Cached ~5 min; the proxy accepts the current and previous token, so a slightly + stale cached token still works across the daily rotation. Raises with a clear + hint if the proxy isn't running. + """ + now = time.time() + hit = _creds_cache.get(service) + if hit and now - hit[2] < 300: + return hit[0], hit[1] + try: + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.settimeout(5.0) + s.connect(_AGENT_SOCK) + s.sendall((service + "\n").encode()) + buf = b"" + while not buf.endswith(b"\n"): + chunk = s.recv(4096) + if not chunk: + break + buf += chunk + s.close() + data = json.loads(buf.decode()) + except (OSError, ValueError) as exc: + raise RuntimeError( + f"security-proxy agent not reachable at {_AGENT_SOCK} ({exc}); " + "is the proxy running? (see ~/src/ali-bot/security-proxy)" + ) from exc + if "error" in data: + raise RuntimeError( + f"security-proxy: {data['error']}; known services: {data.get('services', [])}" + ) + port, token = int(data["port"]), data.get("token", "") + _creds_cache[service] = (port, token, now) + return port, token + async def fetch_bytes(url: str, proxy_token: str = "", token: str = "") -> bytes: - """Fetch a workdir artefact, routing alimonitor URLs through the local proxy. + """Fetch a workdir artefact, routing alimonitor URLs through the security-proxy. - Mirrors the grid-cert proxy convention used across the Hyperloop tooling: ``alimonitor.cern.ch/`` is rewritten to - ``http://localhost:8888/alimonitor/`` with a bearer token, and - ``Accept-Encoding: identity`` is required (otherwise the proxy returns a gzip - Content-Length mismatch). Retries transient protocol/read errors up to 3×. + ``http://127.0.0.1:/alimonitor/``: the random port and a per-service, + daily-rotating gate token come from the security-proxy agent socket + (``~/.security-proxy/agent.sock``; override with ``SECURITY_PROXY_AGENT_SOCK``), + and the token is sent as ``Authorization: Bearer``. ``Accept-Encoding: identity`` + is required (otherwise the proxy returns a gzip Content-Length mismatch). Retries + transient protocol/read errors up to 3×. Args: - url: Direct artefact URL (perf script, igprof dump, side-car, ...). - proxy_token: Bearer token for the local proxy. Falls back to PROXY_TOKEN, - then HYPERLOOP_TOKEN, then ``token``. - token: Hyperloop auth token fallback. + url: Direct artefact URL, a local path, or a ``file://`` URL. + proxy_token: Accepted for backward compatibility but ignored — the gate token + is minted from the agent socket. + token: Ditto (ignored). """ # Local file (a path or a file:// URL) — read directly, no HTTP. Lets a # locally-generated side-car (igprof-demangle-symbols output) be attached @@ -40,20 +95,14 @@ async def fetch_bytes(url: str, proxy_token: str = "", token: str = "") -> bytes with open(path, "rb") as f: return f.read() - proxy_token = ( - proxy_token - or os.environ.get("PROXY_TOKEN", "") - or token - or os.environ.get("HYPERLOOP_TOKEN", "") - ) - fetch_url = url + headers = {"Accept-Encoding": "identity"} if "alimonitor.cern.ch" in url: path = url.split("alimonitor.cern.ch", 1)[1].lstrip("/") - fetch_url = f"http://localhost:8888/alimonitor/{path}" - - headers = {"Authorization": f"Bearer {proxy_token}"} if proxy_token else {} - headers["Accept-Encoding"] = "identity" + port, gate = _proxy_creds(_PROXY_SERVICE) + fetch_url = f"http://127.0.0.1:{port}/{_PROXY_SERVICE}/{path}" + if gate: + headers["Authorization"] = f"Bearer {gate}" async with httpx.AsyncClient(verify=False) as client: for attempt in range(3): diff --git a/Framework/Core/scripts/hyperloop-perf-server/log_tools.py b/Framework/Core/scripts/hyperloop-perf-server/log_tools.py new file mode 100644 index 0000000000000..4687a93cfebc6 --- /dev/null +++ b/Framework/Core/scripts/hyperloop-perf-server/log_tools.py @@ -0,0 +1,173 @@ +# Copyright 2019-2026 CERN and copyright holders of ALICE O2. +# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +# All rights not expressly granted are reserved. +# +# This software is distributed under the terms of the GNU General Public +# License v3 (GPL Version 3), copied verbatim in the file "COPYING". +# +# In applying this license CERN does not waive the privileges and immunities +# granted to it by virtue of its status as an Intergovernmental Organization +# or submit itself to any jurisdiction. +"""Log tools for the Hyperloop perf MCP server. + +A train/device log (e.g. ``stdout.log``) is fetched once through the alimonitor +proxy and cached on disk; subsequent ``grep_log`` calls run regex queries over +the cached copy and return at most ``max_results`` matches (with optional +context), so a multi-MB log never has to come back over the wire — or into the +model's context — in full. +""" + +from __future__ import annotations + +import gzip +import hashlib +import os +import re +from dataclasses import dataclass + +from hl_common import fetch_bytes + +_CACHE_DIR = os.path.expanduser(os.environ.get("LOG_MCP_CACHE", "~/.cache/log-mcp")) +_MAX_LINE = 2000 # truncate individual lines in the output to keep results bounded + + +@dataclass +class LogReport: + url: str + name: str + path: str + n_lines: int + n_bytes: int + + +_logs: dict[str, LogReport] = {} + + +def _get(name: str) -> LogReport: + r = _logs.get(name) + if r is None: + avail = ", ".join(_logs) if _logs else "(none)" + raise ValueError(f"No log '{name}'. Loaded: {avail}. Use load_log first.") + return r + + +def _clip(line: str) -> str: + return line if len(line) <= _MAX_LINE else line[:_MAX_LINE] + " …[truncated]" + + +async def load_log(url: str, name: str = "", proxy_token: str = "") -> str: + """Fetch a log file and cache it for regex querying with grep_log. + + The file is downloaded (via the alimonitor proxy for ``alimonitor.cern.ch`` + URLs), decompressed if gzip'd, and cached on disk; grep_log then reads that + cached copy and never re-fetches. + + Args: + url: Direct URL to a log file (e.g. .../stdout.log or a .gz log). + name: Label (defaults to the filename portion of the URL). + proxy_token: Bearer token for the local proxy (else PROXY_TOKEN env). + """ + raw = await fetch_bytes(url, proxy_token=proxy_token) + data = gzip.decompress(raw) if (url.endswith(".gz") or raw[:2] == b"\x1f\x8b") else raw + text = data.decode("utf-8", errors="replace") + os.makedirs(_CACHE_DIR, exist_ok=True) + h = hashlib.sha1(url.encode()).hexdigest()[:12] + path = os.path.join(_CACHE_DIR, f"{h}.log") + with open(path, "w", errors="replace") as f: + f.write(text) + n_lines = text.count("\n") + (0 if text.endswith("\n") or not text else 1) + pname = name or url.rstrip("/").split("/")[-1] + _logs[pname] = LogReport(url, pname, path, n_lines, len(data)) + return f"Loaded log '{pname}': {n_lines:,} lines, {len(data):,} bytes." + + +def grep_log( + name: str, + pattern: str, + max_results: int = 50, + ignore_case: bool = False, + invert: bool = False, + context: int = 0, +) -> str: + """Regex-search a cached log and return at most max_results matching lines. + + Args: + name: Log name as returned by load_log. + pattern: Python regex (re.search semantics, matches anywhere in a line). + max_results: Maximum number of matching lines to return (default 50). + ignore_case: Case-insensitive match. + invert: Return non-matching lines instead. + context: Lines of context to show before and after each match (like grep -C). + """ + r = _get(name) + try: + rx = re.compile(pattern, re.IGNORECASE if ignore_case else 0) + except re.error as e: + return f"bad regex: {e}" + if max_results < 1: + return "max_results must be >= 1" + + with open(r.path, errors="replace") as f: + lines = f.read().splitlines() + + total = 0 + hits: list[int] = [] # line indices of the first max_results matches + for i, line in enumerate(lines): + matched = bool(rx.search(line)) + if invert: + matched = not matched + if matched: + total += 1 + if len(hits) < max_results: + hits.append(i) + + if total == 0: + return f"[{name}] no matches for /{pattern}/ in {r.n_lines:,} lines" + + ctx = max(0, context) + out: list[str] = [] + prev_end = -1 # last printed line index, to insert separators / avoid dup + for idx in hits: + lo, hi = max(0, idx - ctx), min(len(lines) - 1, idx + ctx) + if lo <= prev_end: # overlap with previous block: continue from there + lo = prev_end + 1 + elif prev_end >= 0: + out.append("--") + for j in range(lo, hi + 1): + mark = ":" if j == idx else "-" # ':' = the match line, '-' = context + out.append(f"{j + 1}{mark} {_clip(lines[j])}") + prev_end = hi + + shown = min(total, max_results) + header = f"[{name}] {total} match(es) for /{pattern}/" + ( + f"; showing first {shown}" if total > shown else "" + ) + return header + "\n" + "\n".join(out) + + +def list_logs() -> str: + """List loaded logs.""" + if not _logs: + return "No logs loaded. Use load_log first." + return "\n".join( + f"{n}: {r.n_lines:,} lines, {r.n_bytes:,} bytes, url={r.url}" for n, r in _logs.items() + ) + + +def drop_log(name: str) -> str: + """Free a log and delete its cached copy. + + Args: + name: Log name as returned by load_log. + """ + r = _get(name) + if os.path.exists(r.path): + os.remove(r.path) + del _logs[name] + return f"Dropped log '{name}'." + + +def register(mcp) -> None: + """Register the log tools on a shared FastMCP instance.""" + for fn in (load_log, grep_log, list_logs, drop_log): + mcp.tool()(fn) diff --git a/Framework/Core/scripts/hyperloop-perf-server/perf_mcp_server.py b/Framework/Core/scripts/hyperloop-perf-server/perf_mcp_server.py index 042628bc4911f..684e977be85b9 100644 --- a/Framework/Core/scripts/hyperloop-perf-server/perf_mcp_server.py +++ b/Framework/Core/scripts/hyperloop-perf-server/perf_mcp_server.py @@ -35,6 +35,7 @@ from mcp.server.fastmcp import FastMCP import igprof_tools +import log_tools from hl_common import fetch_bytes # --------------------------------------------------------------------------- @@ -398,6 +399,7 @@ def compare(name_a: str, name_b: str, n: int = 40, mode: str = "leaf") -> str: # --------------------------------------------------------------------------- igprof_tools.register(mcp) +log_tools.register(mcp) # --------------------------------------------------------------------------- diff --git a/Framework/Core/scripts/hyperloop-server/hyperloop_server.py b/Framework/Core/scripts/hyperloop-server/hyperloop_server.py index 071a594ec6025..6012f23cab2c4 100644 --- a/Framework/Core/scripts/hyperloop-server/hyperloop_server.py +++ b/Framework/Core/scripts/hyperloop-server/hyperloop_server.py @@ -20,11 +20,11 @@ Usage ----- - python3 hyperloop_server.py [--proxy URL] [--token TOKEN] + python3 hyperloop_server.py [--allow-write] -Environment variables - HYPERLOOP_PROXY proxy base URL (default: http://localhost:8888) - HYPERLOOP_TOKEN bearer token (default: foo-baz) +Credentials come from the security-proxy (see ~/src/ali-bot/security-proxy): the +random port and the per-service "alimonitor" gate token are read from its agent +socket (~/.security-proxy/agent.sock; override with SECURITY_PROXY_AGENT_SOCK). """ from __future__ import annotations @@ -35,6 +35,7 @@ import json import os import re +import socket import sys import time @@ -43,9 +44,63 @@ mcp = FastMCP("hyperloop") -PROXY = os.environ.get("HYPERLOOP_PROXY", "http://localhost:8888") -TOKEN = os.environ.get("HYPERLOOP_TOKEN", "foo-baz") -API = f"{PROXY}/alihyperloop-data" +# security-proxy (see ~/src/ali-bot/security-proxy): random localhost port + a +# per-service, daily-rotating gate token, both read from a per-user UNIX socket. +# Everything is routed through the single "/alimonitor/" route (upstream = +# alimonitor.cern.ch root), so one "alimonitor" token covers both the +# alihyperloop-data API and the train-workdir artefacts. +_AGENT_SOCK = os.path.expanduser( + os.environ.get("SECURITY_PROXY_AGENT_SOCK", "~/.security-proxy/agent.sock") +) +_PROXY_SERVICE = os.environ.get("SECURITY_PROXY_SERVICE", "alimonitor") +_creds_cache: dict[str, tuple[int, str, float]] = {} + + +def _proxy_creds() -> tuple[int, str]: + """(port, gate_token) for the alimonitor service from the security-proxy agent + socket; cached ~5 min (the proxy accepts current+previous token, so a stale + cached token survives the daily rotation).""" + svc = _PROXY_SERVICE + now = time.time() + hit = _creds_cache.get(svc) + if hit and now - hit[2] < 300: + return hit[0], hit[1] + try: + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.settimeout(5.0) + s.connect(_AGENT_SOCK) + s.sendall((svc + "\n").encode()) + buf = b"" + while not buf.endswith(b"\n"): + chunk = s.recv(4096) + if not chunk: + break + buf += chunk + s.close() + data = json.loads(buf.decode()) + except (OSError, ValueError) as exc: + raise RuntimeError( + f"security-proxy agent not reachable at {_AGENT_SOCK} ({exc}); " + "is the proxy running? (see ~/src/ali-bot/security-proxy)" + ) from exc + if "error" in data: + raise RuntimeError( + f"security-proxy: {data['error']}; known services: {data.get('services', [])}" + ) + port, token = int(data["port"]), data.get("token", "") + _creds_cache[svc] = (port, token, now) + return port, token + + +def _alimon() -> str: + """Base URL of the /alimonitor/ proxy route (= alimonitor.cern.ch root).""" + port, _ = _proxy_creds() + return f"http://127.0.0.1:{port}/{_PROXY_SERVICE}" + + +def _api() -> str: + """Base URL of the alihyperloop-data API (via the /alimonitor/ route).""" + return f"{_alimon()}/alihyperloop-data" # --- Write guardrails --------------------------------------------------------- # Wagon-creating tools are HARD-LOCKED to this one analysis. The destination is a @@ -59,14 +114,18 @@ def _headers() -> dict[str, str]: - return {"Authorization": f"Bearer {TOKEN}"} + _, tok = _proxy_creds() + h = {"Accept-Encoding": "identity"} + if tok: + h["Authorization"] = f"Bearer {tok}" + return h async def _get(path: str, params: dict | None = None) -> any: hdrs = _headers() hdrs["Accept-Encoding"] = "identity" async with httpx.AsyncClient(timeout=60) as client: - r = await client.get(f"{API}/{path}", params=params, headers=hdrs) + r = await client.get(f"{_api()}/{path}", params=params, headers=hdrs) r.raise_for_status() return r.json() @@ -77,23 +136,18 @@ async def _get_text(path: str, params: dict | None = None) -> str: hdrs = _headers() hdrs["Accept-Encoding"] = "identity" async with httpx.AsyncClient(timeout=60) as client: - r = await client.get(f"{API}/{path}", params=params, headers=hdrs) + r = await client.get(f"{_api()}/{path}", params=params, headers=hdrs) r.raise_for_status() return r.text -ALIMON = f"{PROXY}/alimonitor" -ALIMON_TOKEN = os.environ.get("HYPERLOOP_ALIMON_TOKEN", "jalien-secret") - - async def _get_workdir_json(train_id: int, fname: str): """Fetch a file from a test's train-workdir (alimonitor route).""" b = f"{train_id // 10000:04d}" n = f"{train_id:08d}" - url = f"{ALIMON}/train-workdir/tests/{b}/{n}/{fname}" - hdrs = {"Authorization": f"Bearer {ALIMON_TOKEN}", "Accept-Encoding": "identity"} + url = f"{_alimon()}/train-workdir/tests/{b}/{n}/{fname}" async with httpx.AsyncClient(timeout=180) as client: - r = await client.get(url, headers=hdrs) + r = await client.get(url, headers=_headers()) r.raise_for_status() return r.json() @@ -1097,7 +1151,7 @@ async def _post_form(path: str, data: dict) -> str: hdrs = _headers() hdrs["Accept-Encoding"] = "identity" async with httpx.AsyncClient(timeout=60) as client: - r = await client.post(f"{API}/{path}", data=data, headers=hdrs) + r = await client.post(f"{_api()}/{path}", data=data, headers=hdrs) r.raise_for_status() return r.text @@ -1345,20 +1399,15 @@ async def subscribe_dataset(dataset: str) -> str: def main(): import argparse - global PROXY, TOKEN, API, ALLOW_WRITE + global ALLOW_WRITE parser = argparse.ArgumentParser(description="AliHyperloop MCP server") - parser.add_argument("--proxy", default=PROXY, help="Proxy base URL") - parser.add_argument("--token", default=TOKEN, help="Bearer token") parser.add_argument("--allow-write", action="store_true", help=("Enable the wagon-write tools (clone/configure), " f"hard-locked to analysis {ALLOWED_ANALYSIS}. " "Off by default; HYPERLOOP_ALLOW_WRITE=1 also enables it.")) args = parser.parse_args() - PROXY = args.proxy - TOKEN = args.token - API = f"{PROXY}/alihyperloop-data" if args.allow_write: ALLOW_WRITE = True