Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 88 additions & 105 deletions agentkit/auth/model_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Bring a user's *own* third-party model subscription into the sandbox.

Many foreign models (ChatGPT/Codex, Claude Code) are used via a JWT obtained by an
interactive SSO login, not a static API key. A user who subscribed to one of those
plans wants the **sandbox's** codex to run on *their* subscription.

The agentkit UserPool does not (and need not) federate with OpenAI/Anthropic. Codex
already supports SSO **locally**; in the sandbox it is "remote", so the agreed design
is dead simple and is what this module implements:

1. The user runs the provider's native SSO **on their PC** (``codex login`` for
Codex/ChatGPT, ``claude`` for Claude Code). That writes the provider's native
credential file:
Codex -> $CODEX_HOME/auth.json (default ~/.codex/auth.json)
Claude -> ~/.claude/.credentials.json (or the macOS Keychain)
2. We read that file verbatim and inject it into the **same native path inside
the user's private sandbox** — so the sandbox's codex finds its token exactly
where it natively looks. No proxy, no federation: the sandbox refreshes the
token itself against the provider, just like a local install would.

This module is pure (stdlib only) and side-effect-light so it is unit-testable; the
network/forwarding lives in the CLI layer (``cli_accesscontrol.py``).
"""Read a local model-subscription login and inject it into a sandbox.

Codex/ChatGPT and Claude Code log in over OAuth and store the token in a local file
($CODEX_HOME/auth.json for codex, ~/.claude/.credentials.json or the macOS Keychain for
Claude). This reads that file and writes the token to the same path inside the sandbox,
so the sandbox's codex runs on the user's subscription. codex refreshes the token itself.

Only the OAuth token is injected. The same file can also hold a long-lived API key
(codex: OPENAI_API_KEY); that is stripped before injection and an api-key-only file is
rejected, so a long-lived key never reaches the sandbox. See sanitize_*_for_injection.

The sandbox config.toml is not touched; select the subscription at exec with
`codex exec -c model_provider=openai`. Stdlib only; the sandbox transport lives in
sandbox/cli_model_login.py.
"""

from __future__ import annotations
Expand All @@ -42,18 +34,19 @@
import datetime
import json
import os
import re
import sys
from pathlib import Path
from typing import Callable, Optional

# ── Codex / ChatGPT facts ────────────────────────────────────────────────────
# Codex / ChatGPT facts
CODEX_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann" # codex CLI's public OAuth client (the id_token aud)
CODEX_OAUTH_NAMESPACE = "https://api.openai.com/auth" # claim namespace holding the ChatGPT plan
DEFAULT_SANDBOX_CODEX_HOME = "/home/gem/.codex" # informational; the shell resolves ${CODEX_HOME:-$HOME/.codex}
CODEX_BUILTIN_PROVIDER = "openai" # codex's reserved built-in provider that uses ChatGPT auth
CODEX_INJECT_MARKER = "AGENTKIT_CODEX_INJECTED"
CLAUDE_INJECT_MARKER = "AGENTKIT_CLAUDE_INJECTED"
CLAUDE_KEYCHAIN_SERVICE = "Claude Code-credentials"
# codex auth.json OAuth token fields we carry into the sandbox (everything else is dropped).
CODEX_OAUTH_TOKEN_FIELDS = ("id_token", "access_token", "refresh_token", "account_id")

PROVIDERS = ("codex", "claude")

Expand All @@ -62,9 +55,9 @@ class ModelLoginError(RuntimeError):
"""A user-facing failure resolving or injecting a model subscription token."""


# ── small helpers ────────────────────────────────────────────────────────────
# small helpers
def b64(data: str | bytes) -> str:
"""Standard base64 (single line, no newlines) safe to embed in a shell ``printf``."""
"""Standard base64 (single line, no newlines) - safe to embed in a shell ``printf``."""
if isinstance(data, str):
data = data.encode("utf-8")
return base64.b64encode(data).decode("ascii")
Expand All @@ -86,7 +79,7 @@ def decode_jwt_claims(jwt: str) -> dict:
raise ModelLoginError(f"cannot decode JWT claims: {exc}") from exc


# ── Codex (ChatGPT) ──────────────────────────────────────────────────────────
# Codex (ChatGPT)
def codex_home_path(explicit: Optional[str] = None) -> Path:
"""Resolve the LOCAL codex home: --codex-home > $CODEX_HOME > ~/.codex."""
if explicit:
Expand All @@ -110,15 +103,40 @@ def read_codex_auth(path: str | Path) -> dict:
return data


def validate_codex_auth(data: dict) -> None:
"""A usable codex auth has ChatGPT tokens (SSO) or an API key."""
def codex_has_oauth(data: dict) -> bool:
tokens = data.get("tokens")
has_tokens = isinstance(tokens, dict) and bool(tokens.get("id_token") or tokens.get("access_token"))
has_key = bool(data.get("OPENAI_API_KEY"))
if not (has_tokens or has_key):
return isinstance(tokens, dict) and bool(tokens.get("id_token") or tokens.get("access_token"))


def validate_codex_auth(data: dict) -> None:
"""A usable codex auth has ChatGPT OAuth tokens or an API key. Only the OAuth path is
injected; api-key-only auth is rejected in sanitize_codex_auth_for_injection."""
if not (codex_has_oauth(data) or data.get("OPENAI_API_KEY")):
raise ModelLoginError(
"codex auth.json has neither ChatGPT tokens nor an API key; run `codex login` first"
)


def sanitize_codex_auth_for_injection(data: dict) -> dict:
"""Return only the OAuth token material to inject, dropping any API key.

codex keeps a long-lived OPENAI_API_KEY in the same auth.json as the OAuth tokens.
This returns just the tokens with OPENAI_API_KEY set to null; an api-key-only auth
(no OAuth login) is rejected.
"""
if not codex_has_oauth(data):
raise ModelLoginError(
"codex auth.json has neither ChatGPT tokens nor an API key — run `codex login` first"
"no ChatGPT OAuth login in your codex auth. An API key is not injected into the "
"sandbox; run `codex login` (ChatGPT SSO) first."
)
tokens = data.get("tokens") or {}
safe_tokens = {k: tokens[k] for k in CODEX_OAUTH_TOKEN_FIELDS if tokens.get(k) is not None}
return {
"OPENAI_API_KEY": None, # do not inject the API key
"auth_mode": "chatgpt",
"tokens": safe_tokens,
"last_refresh": data.get("last_refresh"),
}


def codex_auth_summary(data: dict) -> dict:
Expand All @@ -127,6 +145,8 @@ def codex_auth_summary(data: dict) -> dict:
summary: dict = {
"provider": "codex (ChatGPT)",
"auth_mode": data.get("auth_mode") or ("chatgpt" if tokens else "apikey"),
"has_oauth_login": codex_has_oauth(data),
"has_local_api_key": bool(data.get("OPENAI_API_KEY")), # present locally; NOT injected
"has_refresh_token": bool(tokens.get("refresh_token")),
"account_id": tokens.get("account_id"),
}
Expand Down Expand Up @@ -160,7 +180,7 @@ def run_codex_login(
subprocess.run([codex_bin, "login"], env=env, timeout=timeout, check=True) # noqa: S603
except FileNotFoundError as exc:
raise ModelLoginError(
f"`{codex_bin}` not found on PATH install the Codex CLI and run `codex login`, "
f"`{codex_bin}` not found on PATH - install the Codex CLI and run `codex login`, "
f"or pass --auth-file <path-to-your-auth.json>"
) from exc
except subprocess.CalledProcessError as exc:
Expand Down Expand Up @@ -191,7 +211,7 @@ def resolve_local_codex_auth(
if not path.exists():
if not allow_login:
raise ModelLoginError(
f"no codex auth at {path} run `codex login` (or pass --auth-file / --no-login off)"
f"no codex auth at {path} - run `codex login` (or pass --auth-file / drop --no-login)"
)
(login_runner or run_codex_login)(codex_home=home, timeout=login_timeout)
if not path.exists():
Expand All @@ -201,76 +221,25 @@ def resolve_local_codex_auth(
return path, data


# ── Codex config.toml: switch the sandbox codex to the ChatGPT subscription ──
_TOP_MODEL_PIN = re.compile(r"^\s*(model|model_provider|review_model)\s*=")
_AUTH_METHOD = re.compile(r"^\s*preferred_auth_method\s*=")
_TABLE_HEADER = re.compile(r"^\s*\[")


def rewrite_codex_config_for_chatgpt(toml_text: str) -> str:
"""Switch an existing codex config.toml to ChatGPT-subscription auth, preserving everything else.

The sandbox's default config pins a custom ``model_provider``/``model`` to a Volcengine Ark
endpoint that authenticates with an API key (``env_key``); in that mode codex never looks at the
injected ChatGPT token. To use the subscription we:
* drop the top-level ``model`` / ``model_provider`` / ``review_model`` pins, so codex falls back
to its built-in ``openai`` provider and the subscription's default model, and
* force ``preferred_auth_method = "chatgpt"`` so the OAuth token wins over any stray API key.
Tables (``[tui]``, ``[projects.*]``, ``[mcp_servers.*]``, ``[model_providers.*]``) and other
top-level keys (approval_policy, sandbox_mode, model_reasoning_effort, …) are kept verbatim.
"""
out: list[str] = []
seen_table = False
for line in toml_text.splitlines():
if _TABLE_HEADER.match(line):
seen_table = True
if _AUTH_METHOD.match(line):
continue # re-added at the top
if not seen_table and _TOP_MODEL_PIN.match(line):
continue
out.append(line)
body = "\n".join(out).strip("\n")
return 'preferred_auth_method = "chatgpt"\n' + (body + "\n" if body else "")


def minimal_chatgpt_codex_config() -> str:
"""A config.toml for a sandbox that has no codex config yet — ChatGPT auth, headless-friendly."""
def build_codex_injection_command(*, auth_data: dict) -> str:
"""POSIX-sh command that writes the sanitized auth.json (OAuth only, API key stripped) into
${CODEX_HOME:-$HOME/.codex} at 0600 and prints a marker. auth_data is sanitized here so the
transport cannot inject an API key."""
payload = json.dumps(sanitize_codex_auth_for_injection(auth_data), ensure_ascii=False)
return "\n".join(
[
'preferred_auth_method = "chatgpt"',
'approval_policy = "never"',
'sandbox_mode = "danger-full-access"',
"",
'[projects."/home/gem"]',
'trust_level = "trusted"',
"",
"set -e",
'CH="${CODEX_HOME:-$HOME/.codex}"',
'mkdir -p "$CH"',
"umask 077",
f"printf %s '{b64(payload)}' | base64 -d > \"$CH/auth.json\"",
'chmod 600 "$CH/auth.json"',
f'echo "{CODEX_INJECT_MARKER} $CH"',
]
)


def read_codex_config_command() -> str:
"""Shell command that prints the sandbox's current codex config.toml (empty if none)."""
return 'cat "${CODEX_HOME:-$HOME/.codex}/config.toml" 2>/dev/null || true'


def build_codex_injection_command(*, auth_json: str, config_toml: Optional[str] = None) -> str:
"""A single POSIX-sh command that writes auth.json (and optionally config.toml) into the
sandbox's native codex home (``${CODEX_HOME:-$HOME/.codex}``), 0600, and prints a marker."""
lines = [
"set -e",
'CH="${CODEX_HOME:-$HOME/.codex}"',
'mkdir -p "$CH"',
"umask 077",
f"printf %s '{b64(auth_json)}' | base64 -d > \"$CH/auth.json\"",
'chmod 600 "$CH/auth.json"',
]
if config_toml is not None:
lines.append(f"printf %s '{b64(config_toml)}' | base64 -d > \"$CH/config.toml\"")
lines.append(f'echo "{CODEX_INJECT_MARKER} $CH"')
return "\n".join(lines)


# ── Claude Code ──────────────────────────────────────────────────────────────
# Claude Code
def claude_creds_path(explicit: Optional[str] = None) -> Path:
if explicit:
return Path(explicit).expanduser()
Expand Down Expand Up @@ -308,7 +277,7 @@ def read_claude_creds(*, creds_file: Optional[str] = None, allow_keychain: bool
raw = _read_macos_keychain(CLAUDE_KEYCHAIN_SERVICE)
if not raw:
raise ModelLoginError(
"no Claude Code credentials in ~/.claude/.credentials.json or the macOS Keychain "
"no Claude Code credentials in ~/.claude/.credentials.json or the macOS Keychain - "
"run `claude` and log in with your subscription first"
)
try:
Expand All @@ -317,7 +286,7 @@ def read_claude_creds(*, creds_file: Optional[str] = None, allow_keychain: bool
raise ModelLoginError("Claude Keychain entry is not valid JSON") from exc
else:
raise ModelLoginError(
"no Claude Code credentials at ~/.claude/.credentials.json "
"no Claude Code credentials at ~/.claude/.credentials.json - "
"run `claude` and log in with your subscription first"
)
if not isinstance(data, dict):
Expand All @@ -331,6 +300,18 @@ def validate_claude_creds(data: dict) -> None:
raise ModelLoginError("Claude credentials missing claudeAiOauth.accessToken")


def sanitize_claude_creds_for_injection(data: dict) -> dict:
"""Return only the claudeAiOauth object; other top-level fields (e.g. a stored API key)
are dropped."""
oauth = data.get("claudeAiOauth")
if not (isinstance(oauth, dict) and oauth.get("accessToken")):
raise ModelLoginError(
"no Claude OAuth login found. An API key is not injected into the sandbox; "
"run `claude` and log in with your subscription first."
)
return {"claudeAiOauth": oauth}


def claude_creds_summary(data: dict) -> dict:
oauth = data.get("claudeAiOauth") or {}
summary: dict = {
Expand All @@ -345,15 +326,17 @@ def claude_creds_summary(data: dict) -> dict:
return {k: v for k, v in summary.items() if v is not None}


def build_claude_injection_command(*, creds_json: str) -> str:
"""A single POSIX-sh command that writes Claude Code creds into ``$HOME/.claude``, 0600."""
def build_claude_injection_command(*, creds_data: dict) -> str:
"""POSIX-sh command that writes the sanitized Claude creds into
$HOME/.claude/.credentials.json at 0600."""
payload = json.dumps(sanitize_claude_creds_for_injection(creds_data), ensure_ascii=False)
return "\n".join(
[
"set -e",
'CD="$HOME/.claude"',
'mkdir -p "$CD"',
"umask 077",
f"printf %s '{b64(creds_json)}' | base64 -d > \"$CD/.credentials.json\"",
f"printf %s '{b64(payload)}' | base64 -d > \"$CD/.credentials.json\"",
'chmod 600 "$CD/.credentials.json"',
f'echo "{CLAUDE_INJECT_MARKER} $CD"',
]
Expand Down
Loading