Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 69 additions & 20 deletions Framework/Core/scripts/hyperloop-perf-server/hl_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,80 @@

from __future__ import annotations

import json
import os
import socket
import time

import httpx

# security-proxy (see ~/src/ali-bot/security-proxy): a localhost credential proxy
# that binds a RANDOM port and mints a per-service, daily-rotating gate token,
# both handed out over a per-user UNIX socket. Replaces the old fixed
# localhost:8888 + static-bearer ccdb-proxy. Every alimonitor.cern.ch artefact is
# routed through the "/alimonitor/" route (upstream = alimonitor.cern.ch root), so a
# single "alimonitor" gate token covers train-workdir / hyperloop / alihyperloop-data.
_AGENT_SOCK = os.path.expanduser(
os.environ.get("SECURITY_PROXY_AGENT_SOCK", "~/.security-proxy/agent.sock")
)
_PROXY_SERVICE = os.environ.get("SECURITY_PROXY_SERVICE", "alimonitor")
_creds_cache: dict[str, tuple[int, str, float]] = {}


def _proxy_creds(service: str) -> tuple[int, str]:
"""Return (port, gate_token) for ``service`` from the security-proxy agent socket.

Cached ~5 min; the proxy accepts the current and previous token, so a slightly
stale cached token still works across the daily rotation. Raises with a clear
hint if the proxy isn't running.
"""
now = time.time()
hit = _creds_cache.get(service)
if hit and now - hit[2] < 300:
return hit[0], hit[1]
try:
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.settimeout(5.0)
s.connect(_AGENT_SOCK)
s.sendall((service + "\n").encode())
buf = b""
while not buf.endswith(b"\n"):
chunk = s.recv(4096)
if not chunk:
break
buf += chunk
s.close()
data = json.loads(buf.decode())
except (OSError, ValueError) as exc:
raise RuntimeError(
f"security-proxy agent not reachable at {_AGENT_SOCK} ({exc}); "
"is the proxy running? (see ~/src/ali-bot/security-proxy)"
) from exc
if "error" in data:
raise RuntimeError(
f"security-proxy: {data['error']}; known services: {data.get('services', [])}"
)
port, token = int(data["port"]), data.get("token", "")
_creds_cache[service] = (port, token, now)
return port, token


async def fetch_bytes(url: str, proxy_token: str = "", token: str = "") -> bytes:
"""Fetch a workdir artefact, routing alimonitor URLs through the local proxy.
"""Fetch a workdir artefact, routing alimonitor URLs through the security-proxy.

Mirrors the grid-cert proxy convention used across the Hyperloop tooling:
``alimonitor.cern.ch/<path>`` is rewritten to
``http://localhost:8888/alimonitor/<path>`` with a bearer token, and
``Accept-Encoding: identity`` is required (otherwise the proxy returns a gzip
Content-Length mismatch). Retries transient protocol/read errors up to 3×.
``http://127.0.0.1:<port>/alimonitor/<path>``: the random port and a per-service,
daily-rotating gate token come from the security-proxy agent socket
(``~/.security-proxy/agent.sock``; override with ``SECURITY_PROXY_AGENT_SOCK``),
and the token is sent as ``Authorization: Bearer``. ``Accept-Encoding: identity``
is required (otherwise the proxy returns a gzip Content-Length mismatch). Retries
transient protocol/read errors up to 3×.

Args:
url: Direct artefact URL (perf script, igprof dump, side-car, ...).
proxy_token: Bearer token for the local proxy. Falls back to PROXY_TOKEN,
then HYPERLOOP_TOKEN, then ``token``.
token: Hyperloop auth token fallback.
url: Direct artefact URL, a local path, or a ``file://`` URL.
proxy_token: Accepted for backward compatibility but ignored — the gate token
is minted from the agent socket.
token: Ditto (ignored).
"""
# Local file (a path or a file:// URL) — read directly, no HTTP. Lets a
# locally-generated side-car (igprof-demangle-symbols output) be attached
Expand All @@ -40,20 +95,14 @@ async def fetch_bytes(url: str, proxy_token: str = "", token: str = "") -> bytes
with open(path, "rb") as f:
return f.read()

proxy_token = (
proxy_token
or os.environ.get("PROXY_TOKEN", "")
or token
or os.environ.get("HYPERLOOP_TOKEN", "")
)

fetch_url = url
headers = {"Accept-Encoding": "identity"}
if "alimonitor.cern.ch" in url:
path = url.split("alimonitor.cern.ch", 1)[1].lstrip("/")
fetch_url = f"http://localhost:8888/alimonitor/{path}"

headers = {"Authorization": f"Bearer {proxy_token}"} if proxy_token else {}
headers["Accept-Encoding"] = "identity"
port, gate = _proxy_creds(_PROXY_SERVICE)
fetch_url = f"http://127.0.0.1:{port}/{_PROXY_SERVICE}/{path}"
if gate:
headers["Authorization"] = f"Bearer {gate}"

async with httpx.AsyncClient(verify=False) as client:
for attempt in range(3):
Expand Down
173 changes: 173 additions & 0 deletions Framework/Core/scripts/hyperloop-perf-server/log_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# Copyright 2019-2026 CERN and copyright holders of ALICE O2.
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
# All rights not expressly granted are reserved.
#
# This software is distributed under the terms of the GNU General Public
# License v3 (GPL Version 3), copied verbatim in the file "COPYING".
#
# In applying this license CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.
"""Log tools for the Hyperloop perf MCP server.

A train/device log (e.g. ``stdout.log``) is fetched once through the alimonitor
proxy and cached on disk; subsequent ``grep_log`` calls run regex queries over
the cached copy and return at most ``max_results`` matches (with optional
context), so a multi-MB log never has to come back over the wire — or into the
model's context — in full.
"""

from __future__ import annotations

import gzip
import hashlib
import os
import re
from dataclasses import dataclass

from hl_common import fetch_bytes

_CACHE_DIR = os.path.expanduser(os.environ.get("LOG_MCP_CACHE", "~/.cache/log-mcp"))
_MAX_LINE = 2000 # truncate individual lines in the output to keep results bounded


@dataclass
class LogReport:
url: str
name: str
path: str
n_lines: int
n_bytes: int


_logs: dict[str, LogReport] = {}


def _get(name: str) -> LogReport:
r = _logs.get(name)
if r is None:
avail = ", ".join(_logs) if _logs else "(none)"
raise ValueError(f"No log '{name}'. Loaded: {avail}. Use load_log first.")
return r


def _clip(line: str) -> str:
return line if len(line) <= _MAX_LINE else line[:_MAX_LINE] + " …[truncated]"


async def load_log(url: str, name: str = "", proxy_token: str = "") -> str:
"""Fetch a log file and cache it for regex querying with grep_log.

The file is downloaded (via the alimonitor proxy for ``alimonitor.cern.ch``
URLs), decompressed if gzip'd, and cached on disk; grep_log then reads that
cached copy and never re-fetches.

Args:
url: Direct URL to a log file (e.g. .../stdout.log or a .gz log).
name: Label (defaults to the filename portion of the URL).
proxy_token: Bearer token for the local proxy (else PROXY_TOKEN env).
"""
raw = await fetch_bytes(url, proxy_token=proxy_token)
data = gzip.decompress(raw) if (url.endswith(".gz") or raw[:2] == b"\x1f\x8b") else raw
text = data.decode("utf-8", errors="replace")
os.makedirs(_CACHE_DIR, exist_ok=True)
h = hashlib.sha1(url.encode()).hexdigest()[:12]
path = os.path.join(_CACHE_DIR, f"{h}.log")
with open(path, "w", errors="replace") as f:
f.write(text)
n_lines = text.count("\n") + (0 if text.endswith("\n") or not text else 1)
pname = name or url.rstrip("/").split("/")[-1]
_logs[pname] = LogReport(url, pname, path, n_lines, len(data))
return f"Loaded log '{pname}': {n_lines:,} lines, {len(data):,} bytes."


def grep_log(
name: str,
pattern: str,
max_results: int = 50,
ignore_case: bool = False,
invert: bool = False,
context: int = 0,
) -> str:
"""Regex-search a cached log and return at most max_results matching lines.

Args:
name: Log name as returned by load_log.
pattern: Python regex (re.search semantics, matches anywhere in a line).
max_results: Maximum number of matching lines to return (default 50).
ignore_case: Case-insensitive match.
invert: Return non-matching lines instead.
context: Lines of context to show before and after each match (like grep -C).
"""
r = _get(name)
try:
rx = re.compile(pattern, re.IGNORECASE if ignore_case else 0)
except re.error as e:
return f"bad regex: {e}"
if max_results < 1:
return "max_results must be >= 1"

with open(r.path, errors="replace") as f:
lines = f.read().splitlines()

total = 0
hits: list[int] = [] # line indices of the first max_results matches
for i, line in enumerate(lines):
matched = bool(rx.search(line))
if invert:
matched = not matched
if matched:
total += 1
if len(hits) < max_results:
hits.append(i)

if total == 0:
return f"[{name}] no matches for /{pattern}/ in {r.n_lines:,} lines"

ctx = max(0, context)
out: list[str] = []
prev_end = -1 # last printed line index, to insert separators / avoid dup
for idx in hits:
lo, hi = max(0, idx - ctx), min(len(lines) - 1, idx + ctx)
if lo <= prev_end: # overlap with previous block: continue from there
lo = prev_end + 1
elif prev_end >= 0:
out.append("--")
for j in range(lo, hi + 1):
mark = ":" if j == idx else "-" # ':' = the match line, '-' = context
out.append(f"{j + 1}{mark} {_clip(lines[j])}")
prev_end = hi

shown = min(total, max_results)
header = f"[{name}] {total} match(es) for /{pattern}/" + (
f"; showing first {shown}" if total > shown else ""
)
return header + "\n" + "\n".join(out)


def list_logs() -> str:
"""List loaded logs."""
if not _logs:
return "No logs loaded. Use load_log first."
return "\n".join(
f"{n}: {r.n_lines:,} lines, {r.n_bytes:,} bytes, url={r.url}" for n, r in _logs.items()
)


def drop_log(name: str) -> str:
"""Free a log and delete its cached copy.

Args:
name: Log name as returned by load_log.
"""
r = _get(name)
if os.path.exists(r.path):
os.remove(r.path)
del _logs[name]
return f"Dropped log '{name}'."


def register(mcp) -> None:
"""Register the log tools on a shared FastMCP instance."""
for fn in (load_log, grep_log, list_logs, drop_log):
mcp.tool()(fn)
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from mcp.server.fastmcp import FastMCP

import igprof_tools
import log_tools
from hl_common import fetch_bytes

# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -398,6 +399,7 @@ def compare(name_a: str, name_b: str, n: int = 40, mode: str = "leaf") -> str:
# ---------------------------------------------------------------------------

igprof_tools.register(mcp)
log_tools.register(mcp)


# ---------------------------------------------------------------------------
Expand Down
Loading