From bbf3dc8fc3bf41ccad58e892513bb3b8465b987b Mon Sep 17 00:00:00 2001 From: Eduard Dumitru Date: Thu, 2 Jul 2026 12:04:42 +0200 Subject: [PATCH] feat(cli): migrate the uipath server runtime channel from HTTP to uipath-ipc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the HTTP/socket channel (Unix-socket + TCP + ready-ACK) for the pre-warmed Python runtime with a uipath-ipc named-pipe server. `uipath server --pipe-name ` now hosts IPythonRuntimeServer (Ping / StartJob / CancelJob) over a named pipe — a $TMPDIR/CoreFxPipe_ Unix socket on Linux, a Win32 pipe on Windows — which the .NET job executor connects to as the client. This unifies today's UDS-on-Linux / TCP-on-Windows split into one primitive. - cli_server.py: adds the IPythonRuntimeServer contract + PythonRuntimeService (served by IpcServer over NamedPipeServerTransport), with per-job os.environ/cwd isolation. DTOs cross as plain PascalCase dicts (matching the .NET DTOs). StartJob is @ipc_cancellable; CancelJob and a host-cancelled StartJob set a per-job threading.Event exposed via get_current_cancellation() for the runtime to poll (cooperative, best-effort). The server command now takes a required --pipe-name and runs on a Windows Proactor loop (named pipes require it). Removes the whole HTTP path (start_unix_server, start_tcp_server, create_app, handle_start, handle_health, host-validation, send_ack, the SOCKET_ENV_VAR / DEFAULT_SOCKET_PATH / DEFAULT_PORT / generate_socket_path symbols, and the --client-socket / --server-socket / --port / --tcp options). - pyproject.toml: add the uipath-ipc dependency, pinned via [tool.uv.sources] + a [[tool.uv.index]] to the anonymously accessible UiPath-Internal Azure Artifacts feed so uv/CI can resolve it. This is uv resolution config only, not baked into the wheel; a public pip install uipath still needs uipath-ipc resolvable (public-PyPI release or an optional extra), a follow-up decision. - tests: replace the HTTP test_server.py with test_server_ipc.py, driving the IPC server over a pipe (Ping / StartJob / CancelJob, env isolation, cancel). Phase 1 (RPC) of the HTTP -> IPC migration. Logs stay file-tailed; making run/debug/eval + the agent poll the cancel signal is a runtime-side follow-up. Coordinated with the .NET executor (which must pass --pipe-name). Co-Authored-By: Claude Opus 4.8 (1M context) --- packages/uipath/pyproject.toml | 14 + packages/uipath/src/uipath/_cli/cli_server.py | 410 ++++++++---------- packages/uipath/tests/cli/test_server.py | 371 ---------------- packages/uipath/tests/cli/test_server_ipc.py | 216 +++++++++ packages/uipath/uv.lock | 12 + 5 files changed, 422 insertions(+), 601 deletions(-) delete mode 100644 packages/uipath/tests/cli/test_server.py create mode 100644 packages/uipath/tests/cli/test_server_ipc.py diff --git a/packages/uipath/pyproject.toml b/packages/uipath/pyproject.toml index efcce771c..efef3c4ec 100644 --- a/packages/uipath/pyproject.toml +++ b/packages/uipath/pyproject.toml @@ -8,6 +8,7 @@ dependencies = [ "uipath-core>=0.5.26, <0.6.0", "uipath-runtime>=0.11.5, <0.12.0", "uipath-platform>=0.1.89, <0.2.0", + "uipath-ipc>=2.5.1", "click>=8.3.1", "httpx>=0.28.1", "pyjwt>=2.10.1", @@ -169,10 +170,23 @@ exclude-newer = "2 days" uipath-core = false uipath-runtime = false uipath-platform = false +uipath-ipc = false [tool.uv.sources] uipath-core = { path = "../uipath-core", editable = true } uipath-platform = { path = "../uipath-platform", editable = true } +# uipath-ipc is published to the (anonymously accessible) UiPath-Internal Azure +# Artifacts feed, not PyPI. Pin it to that index so uv fetches ONLY uipath-ipc +# there — this avoids a dependency-confusion exposure from a plain extra-index. +# NOTE: this is uv/dev-and-CI resolution config only (not baked into the wheel); +# `pip install uipath` from PyPI still needs uipath-ipc to be resolvable +# (public-PyPI release or an optional extra) — a separate decision. +uipath-ipc = { index = "uipath-internal" } + +[[tool.uv.index]] +name = "uipath-internal" +url = "https://pkgs.dev.azure.com/uipath/Public.Feeds/_packaging/UiPath-Internal/pypi/simple/" +explicit = true [[tool.uv.index]] name = "testpypi" diff --git a/packages/uipath/src/uipath/_cli/cli_server.py b/packages/uipath/src/uipath/_cli/cli_server.py index c32c6de11..f92c17ae8 100644 --- a/packages/uipath/src/uipath/_cli/cli_server.py +++ b/packages/uipath/src/uipath/_cli/cli_server.py @@ -1,17 +1,24 @@ +"""`uipath server` — serves the pre-warmed Python runtime over uipath-ipc. + +Hosts ``IPythonRuntimeServer`` (Ping / StartJob / CancelJob) on a named pipe and +forwards StartJob to run/debug/eval; the .NET job executor connects as the +client. DTOs cross the wire as plain PascalCase dicts (matching the .NET DTOs). +""" + import asyncio import importlib -import json import os import shlex import sys -import tempfile +import threading import time -from importlib.metadata import entry_points +from abc import ABC, abstractmethod +from importlib.metadata import PackageNotFoundError, entry_points, version from importlib.util import find_spec from typing import Any import click -from aiohttp import ClientSession, UnixConnector, web +from uipath_ipc import IpcServer, NamedPipeServerTransport, ipc_cancellable from ._telemetry import track_command from ._utils._console import ConsoleLogger @@ -21,10 +28,6 @@ console = ConsoleLogger() -SOCKET_ENV_VAR = "UIPATH_SERVER_SOCKET" -DEFAULT_SOCKET_PATH = "/tmp/uipath-server.sock" -DEFAULT_PORT = 8765 - IS_WINDOWS = sys.platform == "win32" COMMANDS = { @@ -95,11 +98,6 @@ def preload_modules() -> None: console.success(f"Modules pre-loaded in {elapsed:.2f}s") -def generate_socket_path() -> str: - """Generate a unique socket path for the server to listen on.""" - return os.path.join(tempfile.gettempdir(), f"uipath-server-{os.getpid()}.sock") - - def get_field(message: dict[str, Any], *keys: str) -> Any: """Get a field from message, trying multiple key variations.""" for key in keys: @@ -119,97 +117,77 @@ def parse_args(args: str | list[str] | None) -> list[str]: return [] -async def send_ack(ack_socket_path: str, server_socket_path: str) -> None: - """Send acknowledgment via HTTP POST to the ack socket.""" - ack_message: dict[str, str] = { - "status": "ready", - "socket": server_socket_path, - } +class _CurrentJob: + """The single in-flight job (jobs are serialized by ``_state.lock``), so a + ``CancelJob`` RPC — or a host-cancelled ``StartJob`` — can signal a + cooperative stop the runtime may poll via :func:`get_current_cancellation`. + """ - conn = UnixConnector(path=ack_socket_path) - try: - async with ClientSession(connector=conn) as session: - async with session.post( - "http://localhost/api/python/ack", # placeholder URL for Unix socket - json=ack_message, - ) as response: - if response.status == 200: - console.success(f"Sent ack to {ack_socket_path}") - else: - console.error(f"Ack failed with status {response.status}") - raise RuntimeError(f"Ack failed: {response.status}") - except Exception as e: - console.error(f"Failed to send ack to {ack_socket_path}: {e}") - raise - - -async def handle_health(request: web.Request) -> web.Response: - """Handle GET /health endpoint.""" - return web.Response(text="OK", status=200) - - -async def handle_start(request: web.Request) -> web.Response: - """Handle POST /jobs/{job_key}/start endpoint.""" - job_key = request.match_info.get("job_key") - if not job_key: - return web.json_response( - {"success": False, "error": "Missing job_key"}, - status=400, - ) + def __init__(self) -> None: + self.job_key: str | None = None + self.cancel_event: threading.Event | None = None - try: - message: dict[str, Any] = await request.json() - except json.JSONDecodeError: - return web.json_response( - {"success": False, "error": "Invalid JSON"}, - status=400, - ) + def begin(self, job_key: str) -> threading.Event: + event = threading.Event() + self.job_key = job_key + self.cancel_event = event + return event - command_name = get_field(message, "command", "Command") - if not isinstance(command_name, str): - return web.json_response( - {"success": False, "error": "Missing or invalid field: 'command'"}, - status=400, - ) + def end(self, job_key: str) -> None: + if self.job_key == job_key: + self.job_key = None + self.cancel_event = None - args_raw = get_field(message, "args", "Args") - args = parse_args(args_raw) + def cancel(self, job_key: str) -> bool: + if self.job_key == job_key and self.cancel_event is not None: + self.cancel_event.set() + return True + return False - env_vars = get_field(message, "environmentVariables", "EnvironmentVariables") or {} - working_dir = get_field(message, "workingDirectory", "WorkingDirectory") - console.info(f"Starting job {job_key}: {command_name} {args}") +_current_job = _CurrentJob() - cmd = COMMANDS.get(command_name) - if cmd is None: - return web.json_response( - {"success": False, "error": f"Unknown command: {command_name}"}, - status=400, - ) - console.info(f"Original cwd: {os.getcwd()}") - console.info(f"Requested working_dir: {working_dir}") +def get_current_cancellation() -> threading.Event | None: + """Cooperative-cancel signal for the currently running job, or ``None``. + A runtime (run/debug/eval, or the agent) may poll ``.is_set()`` to stop + early. Thread-safe (a ``threading.Event``); jobs are serialized, so there is + at most one at a time. NOTE: whether a job actually observes this is up to + the runtime/agent — cancellation is cooperative and best-effort. + """ + return _current_job.cancel_event + + +def _runtime_version() -> str: + try: + return version("uipath") + except PackageNotFoundError: + return "unknown" + + +async def _run_command_isolated( + cmd: Any, + args: list[str], + env_vars: dict[str, str], + working_dir: str | None, + cancel_event: threading.Event, +) -> dict[str, Any]: + """Run one command with per-job env/cwd isolation; return ``{ExitCode, Error}``. + + Serialized by ``_state.lock``, with ``os.environ`` reset to the server + baseline + request vars and cwd swapped/restored around the run. The rich + result is written off-channel (``output.json``); IPC carries only the exit + code and error. + """ if _state.lock is None or _state.baseline_env is None: raise RuntimeError("Server state not initialized") - # Validate environmentVariables type early - if env_vars and not isinstance(env_vars, dict): - return web.json_response( - { - "success": False, - "error": "Invalid field: 'environmentVariables' must be a dict", - }, - status=400, - ) - - # Serialize command execution to prevent concurrent os.environ mutation async with _state.lock: original_cwd = os.getcwd() - try: - # Start from server baseline + request env vars only. - # This ensures no env vars from previous requests leak through. + # Start from server baseline + request env vars only, so nothing from + # a previous job leaks through. os.environ.clear() os.environ.update(_state.baseline_env) if isinstance(env_vars, dict): @@ -219,40 +197,37 @@ async def handle_start(request: web.Request) -> web.Response: try: os.chdir(working_dir) except (FileNotFoundError, NotADirectoryError, PermissionError) as e: - return web.json_response( - { - "success": False, - "job_key": job_key, - "error": f"Cannot change to working directory: {e}", - }, - status=400, - ) - - result = await asyncio.to_thread(cmd.main, args, standalone_mode=False) - - return web.json_response( - { - "success": True, - "job_key": job_key, - "result": result, - } + return { + "ExitCode": 1, + "Error": f"Cannot change to working directory: {e}", + } + + job_task = asyncio.ensure_future( + asyncio.to_thread(cmd.main, args, standalone_mode=False) ) + try: + await asyncio.shield(job_task) + return {"ExitCode": 0, "Error": None} + except asyncio.CancelledError: + # The host cancelled the call. The worker thread can't be + # force-killed, so signal cooperative cancellation and wait for it + # to unwind before restoring env/cwd, then propagate. + cancel_event.set() + try: + await job_task + except BaseException: + pass + raise except SystemExit as e: exit_code = e.code if isinstance(e.code, int) else 1 - return web.json_response( - { - "success": exit_code == 0, - "job_key": job_key, - "error": None if exit_code == 0 else f"Exit code: {exit_code}", - } - ) - except Exception as e: - return web.json_response( - {"success": False, "job_key": job_key, "error": str(e)}, - status=500, - ) + return { + "ExitCode": exit_code, + "Error": None if exit_code == 0 else f"Exit code: {exit_code}", + } + except Exception as e: # report any job failure as a result, not a fault + return {"ExitCode": 1, "Error": str(e)} finally: - # Restore to server baseline + # Restore to server baseline. try: os.chdir(original_cwd) except OSError: @@ -261,146 +236,121 @@ async def handle_start(request: web.Request) -> web.Response: os.environ.update(_state.baseline_env) -ALLOWED_HOSTS = {"127.0.0.1", "localhost", "[::1]"} - - -@web.middleware -async def host_validation_middleware( - request: web.Request, handler: Any -) -> web.StreamResponse: - """Validate the Host header to prevent DNS rebinding attacks.""" - host = request.host - if host: - host = host.lower() - # Strip port from bracketed IPv6 (e.g. "[::1]:8765" -> "[::1]") - if host.startswith("["): - bracket_end = host.find("]") - if bracket_end != -1: - host = host[: bracket_end + 1] - # Strip port from IPv4/hostname (e.g. "localhost:8765" -> "localhost") - elif ":" in host: - host = host.rsplit(":", 1)[0] - # Strip trailing dot (e.g. "localhost." -> "localhost") - host = host.rstrip(".") - if host not in ALLOWED_HOSTS: - return web.json_response( - {"error": "Forbidden: invalid Host header"}, - status=403, - ) - return await handler(request) +class IPythonRuntimeServer(ABC): + """Contract the .NET job executor calls over uipath-ipc. + ``__name__`` is the wire endpoint (matching the .NET ``IPythonRuntimeServer``); + DTOs cross as PascalCase dicts. + """ -def create_app() -> web.Application: - """Create the aiohttp application.""" - app = web.Application(middlewares=[host_validation_middleware]) - app.router.add_get("/health", handle_health) - app.router.add_post("/jobs/{job_key}/start", handle_start) - return app + @abstractmethod + async def Ping(self) -> dict[str, Any]: + """Readiness probe → ``{Status, RuntimeVersion}`` (replaces the push-ACK).""" + @ipc_cancellable + @abstractmethod + async def StartJob(self, request: dict[str, Any]) -> dict[str, Any]: + """Run a job (``request`` = PythonRunRequest) → ``{ExitCode, Error}``. -async def start_unix_server( - ack_socket_path: str, server_socket_path: str | None = None -) -> None: - """Start Unix domain socket HTTP server.""" - _state.init() + ``@ipc_cancellable``: the .NET counterpart's trailing ``CancellationToken`` + lets the client cancel the call, which surfaces here as cooperative cancel. + """ - server_socket_path = server_socket_path or generate_socket_path() + @abstractmethod + async def CancelJob(self, jobKey: str) -> bool: + """Signal a cooperative cancel for ``jobKey``; ``True`` if it was in flight.""" - if os.path.exists(server_socket_path): - os.unlink(server_socket_path) - app = create_app() - runner = web.AppRunner(app) - await runner.setup() +class PythonRuntimeService: + """``IPythonRuntimeServer`` implementation backed by run/debug/eval.""" - try: - site = web.UnixSite(runner, server_socket_path) - await site.start() + async def Ping(self) -> dict[str, Any]: + return {"Status": "ready", "RuntimeVersion": _runtime_version()} - console.success(f"Server listening on unix://{server_socket_path}") + async def StartJob(self, request: dict[str, Any]) -> dict[str, Any]: + job_key = str(get_field(request, "JobKey", "jobKey") or "") - await send_ack(ack_socket_path, server_socket_path) + command_name = get_field(request, "Command", "command") + if not isinstance(command_name, str): + return {"ExitCode": 1, "Error": "Missing or invalid field: 'Command'"} - while True: - await asyncio.sleep(3600) - finally: - await runner.cleanup() - if os.path.exists(server_socket_path): - os.unlink(server_socket_path) + cmd = COMMANDS.get(command_name) + if cmd is None: + return {"ExitCode": 1, "Error": f"Unknown command: {command_name}"} + args = parse_args(get_field(request, "Args", "args")) + env_vars = get_field(request, "EnvironmentVariables", "environmentVariables") or {} + working_dir = get_field(request, "WorkingDirectory", "workingDirectory") -async def start_tcp_server(host: str, port: int) -> None: - """Start TCP HTTP server (Windows fallback).""" - _state.init() + console.info(f"Starting job {job_key}: {command_name} {args}") - app = create_app() - runner = web.AppRunner(app) - await runner.setup() + cancel_event = _current_job.begin(job_key) + try: + return await _run_command_isolated( + cmd, args, env_vars, working_dir, cancel_event + ) + finally: + _current_job.end(job_key) - try: - site = web.TCPSite(runner, host, port) - await site.start() + async def CancelJob(self, jobKey: str) -> bool: + signalled = _current_job.cancel(str(jobKey)) + console.info( + f"CancelJob {jobKey}: " + + ("signalled cooperative stop" if signalled else "no matching in-flight job") + ) + return signalled - console.success(f"Server listening on http://{host}:{port}") - while True: - await asyncio.sleep(3600) - finally: - await runner.cleanup() +async def start_ipc_server(pipe_name: str) -> None: + """Serve the Python runtime over a uipath-ipc named pipe until it is closed.""" + _state.init() + server = IpcServer( + transport=NamedPipeServerTransport(pipe_name), + services={IPythonRuntimeServer: PythonRuntimeService()}, + request_timeout=None, # jobs are long-running; no server-side timeout + ) + console.success(f"IPC server listening on pipe '{pipe_name}'") + async with server: + await server.serve_forever() @click.command() @click.option( - "--client-socket", - type=str, - default=None, - help=f"Unix socket path to send ready ack to (default: ${SOCKET_ENV_VAR} or {DEFAULT_SOCKET_PATH})", -) -@click.option( - "--server-socket", + "--pipe-name", type=str, - default=None, - help="Unix socket path the server listens on (default: auto-generated in tmp dir)", -) -@click.option( - "--port", - type=int, - default=None, - help=f"TCP port, used on Windows or when --tcp flag is set (default: {DEFAULT_PORT})", -) -@click.option( - "--tcp", - is_flag=True, - help="Force TCP mode even on Unix systems", + required=True, + help="Named-pipe name to serve the runtime over uipath-ipc. On Linux this is " + "a $TMPDIR/CoreFxPipe_ Unix socket; on Windows a Win32 named pipe.", ) @track_command("server") -def server( - client_socket: str | None, - server_socket: str | None, - port: int | None, - tcp: bool, -) -> None: - """Start an HTTP server that forwards commands to run/debug/eval. - - Creates its own socket to listen on and sends an ack to --client-socket with: - {"status": "ready", "socket": "/path/to/server.sock"} - - Endpoint: POST /jobs/{job_key}/start - Body: {"command": "run", "args": "agent.json '{}'", "environmentVariables": {}, "workingDirectory": "/path"} +def server(pipe_name: str) -> None: + """Serve the runtime over uipath-ipc. - Endpoint: GET /health + Hosts IPythonRuntimeServer (Ping / StartJob / CancelJob) on the named pipe and + forwards StartJob to run/debug/eval; the .NET job executor connects as the + client. """ - use_tcp = IS_WINDOWS or tcp - preload_modules() + _run_ipc_server(pipe_name) + + +def _run_ipc_server(pipe_name: str) -> None: + """Run the IPC server, on a Proactor event loop when on Windows. + Windows named pipes require the Proactor loop (the Selector loop can't do pipe + I/O). Proactor is the default since Python 3.8, but we build it explicitly + rather than trust the ambient policy, which another library in the process + (e.g. socketio) could have flipped to Selector. + """ try: - if use_tcp: - asyncio.run(start_tcp_server("127.0.0.1", port or DEFAULT_PORT)) + if IS_WINDOWS: + loop = asyncio.ProactorEventLoop() # type: ignore[attr-defined] + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(start_ipc_server(pipe_name)) + finally: + loop.close() else: - ack_socket_path = ( - client_socket or os.environ.get(SOCKET_ENV_VAR) or DEFAULT_SOCKET_PATH - ) - asyncio.run(start_unix_server(ack_socket_path, server_socket)) + asyncio.run(start_ipc_server(pipe_name)) except KeyboardInterrupt: console.info("Shutting down") diff --git a/packages/uipath/tests/cli/test_server.py b/packages/uipath/tests/cli/test_server.py deleted file mode 100644 index 185979924..000000000 --- a/packages/uipath/tests/cli/test_server.py +++ /dev/null @@ -1,371 +0,0 @@ -import asyncio -import json -import os -import threading -import time -from typing import Any - -import aiohttp -import pytest - -from uipath._cli.cli_server import start_tcp_server - - -def create_uipath_json(script_path: str, entrypoint_name: str = "main"): - """Helper to create uipath.json with functions.""" - return {"functions": {entrypoint_name: f"{script_path}:main"}} - - -async def start_job( - port: int, job_key: str, command: str, args: list[str] -) -> dict[str, Any]: - """Start a job on the server.""" - async with aiohttp.ClientSession() as session: - async with session.post( - f"http://127.0.0.1:{port}/jobs/{job_key}/start", - json={"command": command, "args": args}, - ) as response: - return await response.json() - - -async def start_job_with_env( - port: int, - job_key: str, - command: str, - args: list[str], - env_vars: dict[str, str], -) -> dict[str, Any]: - """Start a job on the server with environment variables.""" - async with aiohttp.ClientSession() as session: - async with session.post( - f"http://127.0.0.1:{port}/jobs/{job_key}/start", - json={ - "command": command, - "args": args, - "environmentVariables": env_vars, - }, - ) as response: - return await response.json() - - -class TestServer: - @pytest.fixture - def server_port(self): - """Use a random available port for testing.""" - import socket - - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(("127.0.0.1", 0)) - return s.getsockname()[1] - - @pytest.fixture - def server(self, server_port): - """Start the server in a background thread.""" - - def run_server(): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - loop.run_until_complete(start_tcp_server("127.0.0.1", server_port)) - except asyncio.CancelledError: - pass - finally: - loop.close() - - thread = threading.Thread(target=run_server, daemon=True) - thread.start() - time.sleep(0.5) - - yield server_port - - @pytest.fixture - def simple_script(self) -> str: - return """ -from dataclasses import dataclass - -@dataclass -class Input: - message: str - repeat: int = 1 - -def main(input: Input) -> str: - return (input.message + " ") * input.repeat -""" - - def test_start_job_success(self, server, temp_dir, simple_script): - """Test starting a job through the server.""" - port = server - job_key = "test-job-123" - - with pytest.MonkeyPatch().context() as mp: - mp.chdir(temp_dir) - - script_file = "entrypoint.py" - script_path = os.path.join(temp_dir, script_file) - with open(script_path, "w") as f: - f.write(simple_script) - - with open(os.path.join(temp_dir, "uipath.json"), "w") as f: - json.dump(create_uipath_json(script_file), f) - - input_file = os.path.join(temp_dir, "input.json") - with open(input_file, "w") as f: - json.dump({"message": "Hello", "repeat": 3}, f) - - output_file = os.path.join(temp_dir, "output.json") - - response = asyncio.run( - start_job( - port, - job_key, - "run", - ["main", "--input-file", input_file, "--output-file", output_file], - ) - ) - - assert response["success"] is True - assert response["job_key"] == job_key - assert os.path.exists(output_file) - - with open(output_file, "r") as f: - output = f.read() - assert "Hello" in output - - def test_start_job_unknown_command(self, server): - """Test starting a job with unknown command.""" - port = server - - response = asyncio.run(start_job(port, "job-123", "unknown_command", [])) - - assert response["success"] is False - assert "Unknown command" in response["error"] - - def test_start_job_missing_command(self, server): - """Test starting a job without command field.""" - port = server - - async def send_invalid(): - async with aiohttp.ClientSession() as session: - async with session.post( - f"http://127.0.0.1:{port}/jobs/job-123/start", - json={"args": ["some", "args"]}, - ) as response: - return await response.json() - - response = asyncio.run(send_invalid()) - - assert response["success"] is False - assert "command" in response["error"] - - def test_start_job_invalid_json(self, server): - """Test starting a job with invalid JSON.""" - port = server - - async def send_invalid(): - async with aiohttp.ClientSession() as session: - async with session.post( - f"http://127.0.0.1:{port}/jobs/job-123/start", - data="not valid json", - headers={"Content-Type": "application/json"}, - ) as response: - return await response.json() - - response = asyncio.run(send_invalid()) - - assert response["success"] is False - assert "Invalid JSON" in response["error"] - - def test_rejects_invalid_host_header(self, server): - """Test that requests with non-localhost Host header are rejected (DNS rebinding protection).""" - port = server - - async def send_with_host(host: str): - async with aiohttp.ClientSession() as session: - async with session.get( - f"http://127.0.0.1:{port}/health", - headers={"Host": host}, - ) as response: - return response.status, await response.json() - - # Attacker-controlled domain should be rejected - status, body = asyncio.run(send_with_host("attacker.example.com")) - assert status == 403 - assert "Forbidden" in body["error"] - - # Attacker domain with port should also be rejected - status, body = asyncio.run(send_with_host(f"evil.com:{port}")) - assert status == 403 - assert "Forbidden" in body["error"] - - def test_allows_localhost_host_header(self, server): - """Test that requests with valid localhost Host headers are accepted.""" - port = server - - async def send_with_host(host: str): - async with aiohttp.ClientSession() as session: - async with session.get( - f"http://127.0.0.1:{port}/health", - headers={"Host": host}, - ) as response: - return response.status, await response.text() - - # localhost variants should all be accepted - for host in [ - "127.0.0.1", - f"127.0.0.1:{port}", - "localhost", - f"localhost:{port}", - "[::1]", - f"[::1]:{port}", - "LOCALHOST", - f"LOCALHOST:{port}", - "LocalHost", - "127.0.0.1.", - f"localhost.:{port}", - ]: - status, body = asyncio.run(send_with_host(host)) - assert status == 200, f"Host '{host}' should be allowed but got {status}" - assert body == "OK" - - -class TestServerEnvIsolation: - """Test that environment variables are isolated between sequential server requests.""" - - @pytest.fixture - def server_port(self): - import socket - - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(("127.0.0.1", 0)) - return s.getsockname()[1] - - @pytest.fixture - def env_snapshots(self): - return [] - - @pytest.fixture - def server_with_spy(self, server_port, env_snapshots): - """Start server with a spy command that captures os.environ.""" - import click - - from uipath._cli import cli_server - - @click.command() - def spy_cmd(): - env_snapshots.append(dict(os.environ)) - - original_commands = cli_server.COMMANDS.copy() - cli_server.COMMANDS["spy"] = spy_cmd - - def run_server(): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - loop.run_until_complete(start_tcp_server("127.0.0.1", server_port)) - except asyncio.CancelledError: - pass - finally: - loop.close() - - thread = threading.Thread(target=run_server, daemon=True) - thread.start() - time.sleep(0.5) - - yield server_port - - cli_server.COMMANDS.clear() - cli_server.COMMANDS.update(original_commands) - - def test_env_vars_do_not_leak_between_requests( - self, server_with_spy, env_snapshots - ): - """Env vars from request 1 must not be visible in request 2.""" - port = server_with_spy - - # Request 1: set TEST_VAR_A - asyncio.run( - start_job_with_env( - port, - "job-1", - "spy", - [], - {"TEST_VAR_A": "value_a"}, - ) - ) - - # Request 2: set TEST_VAR_B (but NOT TEST_VAR_A) - asyncio.run( - start_job_with_env( - port, - "job-2", - "spy", - [], - {"TEST_VAR_B": "value_b"}, - ) - ) - - assert len(env_snapshots) == 2 - - env_run1 = env_snapshots[0] - env_run2 = env_snapshots[1] - - # Run 1 should have TEST_VAR_A - assert env_run1["TEST_VAR_A"] == "value_a" - assert "TEST_VAR_B" not in env_run1 - - # Run 2 should have TEST_VAR_B but NOT TEST_VAR_A - assert env_run2["TEST_VAR_B"] == "value_b" - assert "TEST_VAR_A" not in env_run2 - - def test_server_baseline_env_preserved(self, server_with_spy, env_snapshots): - """Server baseline env vars (like PATH) should be available during command execution.""" - from uipath._cli import cli_server - - port = server_with_spy - - asyncio.run( - start_job_with_env( - port, - "job-1", - "spy", - [], - {"CUSTOM_VAR": "custom_value"}, - ) - ) - - assert len(env_snapshots) == 1 - env_run = env_snapshots[0] - - # Baseline is captured at server start, not import time - baseline = cli_server._state.baseline_env - assert baseline is not None - - # Baseline env vars should be present - assert env_run.get("PATH") == baseline.get("PATH") - - # Request env var should override/add - assert env_run["CUSTOM_VAR"] == "custom_value" - - def test_env_restored_after_request(self, server_with_spy): - """os.environ should be restored to baseline after each request.""" - from uipath._cli import cli_server - - port = server_with_spy - - asyncio.run( - start_job_with_env( - port, - "job-1", - "spy", - [], - {"SHOULD_NOT_PERSIST": "temporary"}, - ) - ) - - baseline = cli_server._state.baseline_env - assert baseline is not None - - # After the request, os.environ should match baseline - assert "SHOULD_NOT_PERSIST" not in os.environ - for key in baseline: - assert os.environ.get(key) == baseline[key] diff --git a/packages/uipath/tests/cli/test_server_ipc.py b/packages/uipath/tests/cli/test_server_ipc.py new file mode 100644 index 000000000..07118ba4d --- /dev/null +++ b/packages/uipath/tests/cli/test_server_ipc.py @@ -0,0 +1,216 @@ +"""Tests for the uipath-ipc runtime server (``uipath server --pipe-name``). + +Mirrors ``test_server.py`` (which covers the HTTP path) but drives the new +``IpcServer`` over a named pipe with a Python ``uipath-ipc`` client. Adds the +readiness (``Ping``) and cancellation coverage the HTTP/ack path never had. + +Requires ``uipath-ipc`` to be installed. ``StartJob`` success runs the real +runtime (like ``test_server.test_start_job_success``); the rest exercise the IPC +wiring, env isolation, and the cooperative-cancel hook without it. +""" + +import asyncio +import json +import os +import threading +import time +from typing import Any, Awaitable, Callable + +import click +import pytest + +from uipath._cli import cli_server +from uipath._cli.cli_server import ( + IPythonRuntimeServer, + get_current_cancellation, + start_ipc_server, +) +from uipath_ipc import IpcClient, NamedPipeClientTransport + +_pipe_counter = 0 + + +def _unique_pipe() -> str: + global _pipe_counter + _pipe_counter += 1 + return f"uipath-ipc-test-{os.getpid()}-{_pipe_counter}" + + +def _serve_in_background(pipe_name: str) -> None: + """Run the IPC server on its own event loop in a daemon thread. + + ``asyncio.new_event_loop()`` yields the per-OS default loop — Proactor on + Windows (required for named pipes), Selector on Linux (CoreFxPipe UDS). + """ + + def run_server() -> None: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(start_ipc_server(pipe_name)) + except asyncio.CancelledError: + pass + finally: + loop.close() + + thread = threading.Thread(target=run_server, daemon=True) + thread.start() + time.sleep(0.5) + + +async def _with_proxy( + pipe_name: str, fn: Callable[[Any], Awaitable[Any]] +) -> Any: + """Connect a uipath-ipc client to the pipe, run ``fn(proxy)``, then close.""" + client = IpcClient(transport=NamedPipeClientTransport(pipe_name)) + try: + proxy = client.get_proxy(IPythonRuntimeServer) + return await fn(proxy) + finally: + await client.aclose() + + +def create_uipath_json(script_path: str, entrypoint_name: str = "main") -> dict: + return {"functions": {entrypoint_name: f"{script_path}:main"}} + + +SIMPLE_SCRIPT = """ +from dataclasses import dataclass + +@dataclass +class Input: + message: str + repeat: int = 1 + +def main(input: Input) -> str: + return (input.message + " ") * input.repeat +""" + + +class TestIpcServer: + @pytest.fixture + def pipe(self): + pipe_name = _unique_pipe() + _serve_in_background(pipe_name) + # Daemon thread; the server blocks in serve_forever and is torn down when + # the process exits (mirrors test_server.py's background HTTP server). + yield pipe_name + + def test_ping_reports_ready(self, pipe): + result = asyncio.run(_with_proxy(pipe, lambda p: p.Ping())) + assert result["Status"] == "ready" + assert isinstance(result["RuntimeVersion"], str) + assert result["RuntimeVersion"] + + def test_start_job_success(self, pipe, temp_dir): + """A real 'run' job executes and writes output.json (needs the runtime).""" + script_file = "entrypoint.py" + with open(os.path.join(temp_dir, script_file), "w") as f: + f.write(SIMPLE_SCRIPT) + with open(os.path.join(temp_dir, "uipath.json"), "w") as f: + json.dump(create_uipath_json(script_file), f) + + input_file = os.path.join(temp_dir, "input.json") + with open(input_file, "w") as f: + json.dump({"message": "Hello", "repeat": 3}, f) + output_file = os.path.join(temp_dir, "output.json") + + request = { + "JobKey": "job-123", + "Command": "run", + "Args": ["main", "--input-file", input_file, "--output-file", output_file], + "WorkingDirectory": temp_dir, + "EnvironmentVariables": {}, + } + result = asyncio.run(_with_proxy(pipe, lambda p: p.StartJob(request))) + + assert result["ExitCode"] == 0 + assert result["Error"] is None + assert os.path.exists(output_file) + with open(output_file, "r") as f: + assert "Hello" in f.read() + + def test_start_job_unknown_command(self, pipe): + request = {"JobKey": "job-1", "Command": "does_not_exist"} + result = asyncio.run(_with_proxy(pipe, lambda p: p.StartJob(request))) + assert result["ExitCode"] != 0 + assert "Unknown command" in (result["Error"] or "") + + def test_cancel_job_without_running_job(self, pipe): + cancelled = asyncio.run(_with_proxy(pipe, lambda p: p.CancelJob("no-such-job"))) + assert cancelled is False + + def test_cooperative_cancel_signals_the_running_job(self, pipe): + """CancelJob sets the token the running command polls via + get_current_cancellation(); the command then stops cooperatively.""" + observed = threading.Event() + + @click.command() + def blocking_cmd() -> None: + event = get_current_cancellation() + assert event is not None, "cancellation signal not exposed to the job" + if event.wait(timeout=10): + observed.set() + + cli_server.COMMANDS["blocking"] = blocking_cmd + try: + + async def scenario(proxy: Any) -> tuple[bool, dict]: + pending = asyncio.ensure_future( + proxy.StartJob({"JobKey": "job-42", "Command": "blocking"}) + ) + await asyncio.sleep(0.5) # let the job start and register its token + cancelled = await proxy.CancelJob("job-42") + result = await asyncio.wait_for(pending, timeout=10) + return cancelled, result + + cancelled, result = asyncio.run(_with_proxy(pipe, scenario)) + + assert cancelled is True + assert observed.is_set(), "the running command never saw the cancel signal" + assert result["ExitCode"] == 0 + finally: + cli_server.COMMANDS.pop("blocking", None) + + +class TestIpcServerEnvIsolation: + """Env vars must not leak between sequential jobs (as on the HTTP path).""" + + @pytest.fixture + def pipe_with_spy(self): + env_snapshots: list[dict[str, str]] = [] + + @click.command() + def spy_cmd() -> None: + env_snapshots.append(dict(os.environ)) + + original = cli_server.COMMANDS.copy() + cli_server.COMMANDS["spy"] = spy_cmd + + pipe_name = _unique_pipe() + _serve_in_background(pipe_name) + try: + yield pipe_name, env_snapshots + finally: + cli_server.COMMANDS.clear() + cli_server.COMMANDS.update(original) + + def test_env_vars_do_not_leak_between_jobs(self, pipe_with_spy): + pipe_name, env_snapshots = pipe_with_spy + + async def run_two(proxy: Any) -> None: + await proxy.StartJob( + {"JobKey": "job-1", "Command": "spy", "EnvironmentVariables": {"TEST_VAR_A": "a"}} + ) + await proxy.StartJob( + {"JobKey": "job-2", "Command": "spy", "EnvironmentVariables": {"TEST_VAR_B": "b"}} + ) + + asyncio.run(_with_proxy(pipe_name, run_two)) + + assert len(env_snapshots) == 2 + run1, run2 = env_snapshots + assert run1["TEST_VAR_A"] == "a" + assert "TEST_VAR_B" not in run1 + assert run2["TEST_VAR_B"] == "b" + assert "TEST_VAR_A" not in run2 diff --git a/packages/uipath/uv.lock b/packages/uipath/uv.lock index 87ca9e781..d5406c6bc 100644 --- a/packages/uipath/uv.lock +++ b/packages/uipath/uv.lock @@ -7,6 +7,7 @@ exclude-newer = "0001-01-01T00:00:00Z" # This has no effect and is included for exclude-newer-span = "P2D" [options.exclude-newer-package] +uipath-ipc = false uipath-runtime = false uipath-platform = false uipath-core = false @@ -2572,6 +2573,7 @@ dependencies = [ { name = "tenacity" }, { name = "truststore" }, { name = "uipath-core" }, + { name = "uipath-ipc" }, { name = "uipath-platform" }, { name = "uipath-runtime" }, ] @@ -2624,6 +2626,7 @@ requires-dist = [ { name = "tenacity", specifier = ">=9.0.0" }, { name = "truststore", specifier = ">=0.10.1" }, { name = "uipath-core", editable = "../uipath-core" }, + { name = "uipath-ipc", specifier = ">=2.5.1", index = "https://pkgs.dev.azure.com/uipath/Public.Feeds/_packaging/UiPath-Internal/pypi/simple/" }, { name = "uipath-platform", editable = "../uipath-platform" }, { name = "uipath-runtime", specifier = ">=0.11.5,<0.12.0" }, ] @@ -2689,6 +2692,15 @@ dev = [ { name = "rust-just", specifier = ">=1.39.0" }, ] +[[package]] +name = "uipath-ipc" +version = "2.5.1+20260625.1" +source = { registry = "https://pkgs.dev.azure.com/uipath/Public.Feeds/_packaging/UiPath-Internal/pypi/simple/" } +sdist = { url = "https://pkgs.dev.azure.com/uipath/5b98d55c-1b14-4a03-893f-7a59746f1246/_packaging/788028a9-5a01-48ee-b925-3af51ae46294/pypi/download/uipath-ipc/2.5.1+20260625.1/uipath_ipc-2.5.1+20260625.1.tar.gz", hash = "sha256:bc202fef26d8ecae6b8bcfaccd6bcfb4675e0160faabf8d9281f1028643adee9" } +wheels = [ + { url = "https://pkgs.dev.azure.com/uipath/5b98d55c-1b14-4a03-893f-7a59746f1246/_packaging/788028a9-5a01-48ee-b925-3af51ae46294/pypi/download/uipath-ipc/2.5.1+20260625.1/uipath_ipc-2.5.1+20260625.1-py3-none-any.whl", hash = "sha256:0e90b0198b9afa8a1fa832e010aa1160914391d9b5610684a3bf582704f41773" }, +] + [[package]] name = "uipath-platform" version = "0.1.90"