diff --git a/pyproject.toml b/pyproject.toml index b297bd73..96b9ff77 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sap-cloud-sdk" -version = "0.29.1" +version = "0.30.0" description = "SAP Cloud SDK for Python" readme = "README.md" license = "Apache-2.0" diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index 01267bee..e34cfc2b 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import itertools import json import logging @@ -15,13 +16,14 @@ from sap_cloud_sdk.core.telemetry import Module, Operation from sap_cloud_sdk.core.telemetry.metrics_decorator import record_metrics +from sap_cloud_sdk.agentgateway import create_client as create_agw_client from sap_cloud_sdk.extensibility._models import ( DEFAULT_EXTENSION_CAPABILITY_ID, ExtensionCapabilityImplementation, Hook, ) from sap_cloud_sdk.extensibility.config import HookConfig -from sap_cloud_sdk.extensibility.exceptions import TransportError +from sap_cloud_sdk.extensibility.exceptions import ExtensibilityError, TransportError if TYPE_CHECKING: from sap_cloud_sdk.extensibility._local_transport import LocalTransport @@ -38,6 +40,7 @@ _EXECUTE_WORKFLOW_TOOL_NAME = "execute_workflow" _GET_EXECUTION_TOOL_NAME = "get_execution" +_N8N_MCP_SERVER_NAME = "sap.btpn8n:apiResource:ManagedN8nMcpServer:v1" _JSONRPC_VERSION = "2.0" @@ -92,7 +95,7 @@ def _parse_response(response: httpx.Response) -> dict[str, Any]: def _extract_tool_result(jsonrpc: dict[str, Any]) -> dict[str, Any]: if "error" in jsonrpc: msg = jsonrpc["error"].get("message", "Unknown error") - raise TransportError(f"n8n returned an error: {msg}") + raise ExtensibilityError(f"n8n returned an error: {msg}") result = jsonrpc.get("result", {}) if result.get("isError"): @@ -100,7 +103,7 @@ def _extract_tool_result(jsonrpc: dict[str, Any]) -> dict[str, Any]: error_text = next( (c.get("text", "") for c in content if c.get("type") == "text"), "" ) - raise TransportError(f"n8n tool call failed: {error_text}") + raise ExtensibilityError(f"n8n tool call failed: {error_text}") for item in result.get("content", []): if item.get("type") == "text": @@ -113,7 +116,7 @@ def _extract_tool_result(jsonrpc: dict[str, Any]) -> dict[str, Any]: if structured is not None: return structured - raise TransportError("Hook response contains no parseable content.") + raise ExtensibilityError("Hook response contains no parseable content.") class ExtensibilityClient: @@ -314,30 +317,12 @@ def call_hook( # 2. Fail fast on terminal statuses from execute-workflow if status in _EXECUTE_TERMINAL_STATUSES: error_msg = data.get("error", "") - raise TransportError( + raise ExtensibilityError( f"Workflow execution failed with status {status!r}" + (f": {error_msg}" if error_msg else "") ) - # 3. Return immediately if execution completed synchronously - if status == "success": - try: - result_data = data.get("data", {}).get("resultData", {}) - last_node = result_data.get("lastNodeExecuted", "") - response_json = ( - result_data.get("runData", {}) - .get(last_node, [{}])[0] - .get("data", {}) - .get("main", [[{}]])[0][0] - .get("json", {}) - ) - return Message(**response_json) - except (KeyError, IndexError, TypeError, ValidationError) as exc: - raise TransportError( - f"Failed to extract response from last executed node: {exc}" - ) from exc - - # 4. Poll get-execution for running/new/waiting/started + # 3. Poll get-execution for running/new/waiting/started execution_id = data.get("executionId") get_execution_arguments = { "workflowId": hook.n8n_workflow_config.workflow_id, @@ -393,20 +378,247 @@ def call_hook( ) return Message(**response_json) except (KeyError, IndexError, TypeError, ValidationError) as exc: - raise TransportError( + raise ExtensibilityError( f"Failed to extract response from last executed node: {exc}" ) from exc if last_status in _EXECUTION_TERMINAL_STATUSES: error_msg = data.get("error", "") - raise TransportError( + raise ExtensibilityError( f"Workflow execution failed with status {last_status!r}" + (f": {error_msg}" if error_msg else "") ) # Continue polling for: running, waiting, new, unknown - raise TransportError( + raise ExtensibilityError( + f"Workflow execution timed out after {hook.timeout}s. " + f"Last status: {last_status!r}" + ) + + async def _discover_n8n_tools( + self, agw_client: Any, user_token: Optional[str] + ) -> tuple[Any, Any]: + tools = await agw_client.list_mcp_tools(user_token=user_token or None) + + execute_tool = next( + ( + t + for t in tools + if t.name == _EXECUTE_WORKFLOW_TOOL_NAME + and t.server_name == _N8N_MCP_SERVER_NAME + ), + None, + ) + if execute_tool is None: + raise ExtensibilityError( + f"MCP tool '{_EXECUTE_WORKFLOW_TOOL_NAME}' on server '{_N8N_MCP_SERVER_NAME}' " + "not found via Agent Gateway." + ) + + get_exec_tool = next( + ( + t + for t in tools + if t.name == _GET_EXECUTION_TOOL_NAME + and t.server_name == _N8N_MCP_SERVER_NAME + ), + None, + ) + if get_exec_tool is None: + raise ExtensibilityError( + f"MCP tool '{_GET_EXECUTION_TOOL_NAME}' on server '{_N8N_MCP_SERVER_NAME}' " + "not found via Agent Gateway." + ) + + return execute_tool, get_exec_tool + + async def _execute_workflow_via_agw( + self, + agw_client: Any, + execute_tool: Any, + hook: Hook, + user_token: Optional[str], + message: Optional[Any], + headers: Optional[dict], + ) -> tuple[str, Any]: + message_body = message.model_dump(mode="json") if message is not None else {} + execute_arguments = { + "workflowId": hook.n8n_workflow_config.workflow_id, + "inputs": { + "type": "webhook", + "webhookData": { + "method": hook.n8n_workflow_config.method, + "query": {}, + "body": message_body, + "headers": headers or {}, + }, + }, + } + try: + result_str = await agw_client.call_mcp_tool( + execute_tool, + user_token=user_token or None, + **execute_arguments, # type: ignore[arg-type] + ) + except Exception as exc: + raise TransportError( + f"AGW tool call for '{_EXECUTE_WORKFLOW_TOOL_NAME}' failed: {exc}" + ) from exc + + try: + data = json.loads(result_str) + except Exception as exc: + raise TransportError(f"Could not parse hook response: {exc}") from exc + + status = data.get("status", "") + if status in _EXECUTE_TERMINAL_STATUSES: + error_msg = data.get("error", "") + raise ExtensibilityError( + f"Workflow execution failed with status {status!r}" + + (f": {error_msg}" if error_msg else "") + ) + + execution_id = data.get("executionId") + return str(execution_id), status + + @staticmethod + def _extract_message(data: dict) -> Message: + try: + result_data = data.get("data", {}).get("resultData", {}) + last_node = result_data.get("lastNodeExecuted", "") + response_json = ( + result_data.get("runData", {}) + .get(last_node, [{}])[0] + .get("data", {}) + .get("main", [[{}]])[0][0] + .get("json", {}) + ) + return Message(**response_json) + except (KeyError, IndexError, TypeError, ValidationError) as exc: + raise TransportError( + f"Failed to extract response from last executed node: {exc}" + ) from exc + + async def _poll_hook_execution( + self, + agw_client: Any, + get_exec_tool: Any, + hook: Hook, + execution_id: str, + user_token: Optional[str], + initial_status: str, + ) -> Optional[Message]: + deadline = time.monotonic() + hook.timeout + last_status = initial_status + + while time.monotonic() < deadline: + await asyncio.sleep(_HOOK_POLL_INTERVAL) + + try: + get_execution_arguments = { + "workflowId": hook.n8n_workflow_config.workflow_id, + "executionId": execution_id, + "includeData": True, + } + result_str = await agw_client.call_mcp_tool( + get_exec_tool, + user_token=user_token or None, + **get_execution_arguments, # type: ignore[arg-type] + ) + except Exception as exc: + raise TransportError( + f"AGW tool call for '{_GET_EXECUTION_TOOL_NAME}' failed: {exc}" + ) from exc + + try: + data = json.loads(result_str) + except Exception as exc: + raise TransportError(f"Could not parse hook response: {exc}") from exc + + last_status = data.get("execution", {}).get("status", "") or data.get( + "status", "" + ) + + if last_status == "success": + return self._extract_message(data) + + if last_status in _EXECUTION_TERMINAL_STATUSES: + error_msg = data.get("error", "") + raise ExtensibilityError( + f"Workflow execution failed with status {last_status!r}" + + (f": {error_msg}" if error_msg else "") + ) + + raise ExtensibilityError( f"Workflow execution timed out after {hook.timeout}s. " f"Last status: {last_status!r}" ) + + @record_metrics( + Module.EXTENSIBILITY, + Operation.EXTENSIBILITY_CALL_HOOK, + ) + async def call_hook_agw( + self, + hook: Hook, + user_token: Optional[str] = None, + message: Optional[Any] = None, + headers: Optional[dict] = None, + tenant_subdomain: Optional[str] = None, + ) -> Optional[Message]: + """Call a hook via Agent Gateway MCP tool invocation. + + Discovers the N8N MCP tools via Agent Gateway, executes the workflow via + ``execute_workflow``, then polls ``get_execution`` every 500 ms until the + execution succeeds, fails, or ``hook.timeout`` seconds elapse. + + Auth and endpoint resolution are handled internally by an AGW client + created from ``tenant_subdomain`` — no manual token or URL configuration + is required. + + Args: + hook: Hook configuration (workflow ID, method, timeout). + user_token: Optional user token forwarded to the Agent Gateway client + for MCP tool discovery and invocation. + message: Optional A2A ``Message`` payload serialised into the webhook + body sent to the n8n workflow. + headers: Optional HTTP headers included in the webhook data passed to + the n8n workflow. + tenant_subdomain: Tenant subdomain used to instantiate the Agent + Gateway client. Pass ``None`` to use the default subdomain. + + Returns: + Parsed ``Message`` from the last executed workflow node, or ``None`` + if the hook completed successfully but produced no message. + + Raises: + TransportError: On AGW tool call errors, terminal execution failures, + or timeout. + + Example: + ```python + from sap_cloud_sdk.extensibility import create_client + + client = create_client("sap.ai:agent:myAgent:v1") + impl = client.get_extension_capability_implementation(tenant="tenant-abc") + + if impl.hooks: + result = await client.call_hook_agw( + hook=impl.hooks[0], + user_token="my-user-token", + message=my_message, + tenant_subdomain="my-tenant", + ) + ``` + """ + agw_client = create_agw_client(tenant_subdomain) + execute_tool, get_exec_tool = await self._discover_n8n_tools( + agw_client, user_token + ) + execution_id, status = await self._execute_workflow_via_agw( + agw_client, execute_tool, hook, user_token, message, headers + ) + return await self._poll_hook_execution( + agw_client, get_exec_tool, hook, execution_id, user_token, status + ) diff --git a/src/sap_cloud_sdk/extensibility/user-guide.md b/src/sap_cloud_sdk/extensibility/user-guide.md index 2df95787..25e2e012 100644 --- a/src/sap_cloud_sdk/extensibility/user-guide.md +++ b/src/sap_cloud_sdk/extensibility/user-guide.md @@ -157,14 +157,37 @@ def call_hook( - `hook`: Hook configuration object containing workflow config (`n8n_workflow_config`), timeout, and other settings. - `hook_config`: Hook invocation configuration (`endpoint`, optional `auth_token`, and optional `payload`). - Returns the response data as a `Message` object, or `None` if no message is produced. -- Raises `TransportError` if the HTTP request fails or the response cannot be parsed as a valid `Message`. +- Raises `TransportError` if the HTTP request or SSE parsing fails. +- Raises `ExtensibilityError` if the workflow reaches a terminal failure status, times out, or n8n returns an application-level error. - The hook's `timeout` setting is used for the HTTP request timeout. - The hook HTTP method is taken from `hook.n8n_workflow_config.method`. - The workflow ID is taken from `hook.n8n_workflow_config.workflow_id`. -#### `N8nWorkflowConfig` +### `ExtensibilityClient.call_hook_agw()` -Workflow configuration embedded in each `Hook`. +Async variant of `call_hook()` that invokes a hook via the Agent Gateway MCP tool interface instead of a direct HTTP call. Auth and endpoint resolution are handled internally — no manual URL or token configuration is needed. + +```python +async def call_hook_agw( + self, + hook: Hook, + user_token: Optional[str] = None, + message: Optional[Message] = None, + headers: Optional[dict] = None, + tenant_subdomain: Optional[str] = None, +) -> Optional[Message]: ... +``` + +- `hook`: Hook configuration object containing workflow config, timeout, and other settings. +- `user_token`: Optional user token forwarded to the Agent Gateway client for MCP tool discovery and invocation. +- `message`: Optional A2A `Message` payload serialised into the webhook body sent to the n8n workflow. +- `headers`: Optional HTTP headers included in the webhook data passed to the n8n workflow. +- `tenant_subdomain`: Tenant subdomain used to instantiate the Agent Gateway client. Pass `None` to use the default subdomain. +- Returns the response data as a `Message` object, or `None` if no message is produced. +- Raises `TransportError` if the AGW tool call itself fails (network error, etc.). +- Raises `ExtensibilityError` if the n8n MCP tools are not found via AGW, the workflow reaches a terminal failure status, times out, or n8n returns an application-level error. + +#### `N8nWorkflowConfig` ```python @dataclass @@ -445,6 +468,35 @@ for hook in ext.hooks: print(f"On failure: {hook.on_failure}") ``` +### Calling Hook Endpoints via Agent Gateway + +Use `call_hook_agw()` to invoke a hook through the Agent Gateway MCP interface. No endpoint URL or auth token is required — the AGW client resolves these internally from the tenant subdomain. + +```python +from sap_cloud_sdk.extensibility import create_client, HookType +from a2a.types import Message, Role, TextPart + +client = create_client("sap.ai:agent:myAgent:v1") +ext = client.get_extension_capability_implementation(tenant=tenant_id) + +before_hooks = [h for h in ext.hooks if h.type == HookType.BEFORE] + +if before_hooks: + hook = before_hooks[0] + result = await client.call_hook_agw( + hook=hook, + user_token="my-user-token", + message=Message( + message_id="msg-001", + role=Role.user, + parts=[TextPart(text="Tool execution starting")], + ), + tenant_subdomain="my-tenant", + ) + if result: + print(f"Hook response: {result}") +``` + ### Calling Hook Endpoints Use the `call_hook()` method to execute a hook with a custom payload. Payloads use the `Message` type from `a2a.types`. @@ -561,7 +613,7 @@ Hooks can be configured with different failure behaviors via the `on_failure` fi ```python from sap_cloud_sdk.extensibility import create_client, OnFailure from sap_cloud_sdk.extensibility.config import HookConfig -from sap_cloud_sdk.extensibility.exceptions import TransportError +from sap_cloud_sdk.extensibility.exceptions import ExtensibilityError from a2a.types import Message, Role, TextPart client = create_client("sap.ai:agent:myAgent:v1") @@ -582,7 +634,7 @@ for hook in ext.hooks: response = client.call_hook(hook, hook_config) if response: print(f"Hook succeeded: {response}") - except TransportError as e: + except ExtensibilityError as e: if hook.on_failure == OnFailure.BLOCK: # Hook is configured to block on failure print(f"Critical hook failed, blocking: {e}") @@ -706,9 +758,9 @@ Validation issues produce log warnings but never prevent output generation. ### Exception Hierarchy -- `ExtensibilityError` -- Base exception for all extensibility module errors. +- `ExtensibilityError` -- Base exception for all extensibility module errors. Also raised directly for workflow-level failures: terminal execution status, timeout, n8n application-level errors (JSON-RPC errors, `isError` results), and missing MCP tools via AGW. - `ClientCreationError(ExtensibilityError)` -- Represents a client construction failure. Not raised by `create_client()` (which handles it internally), but available for use in custom client-construction logic. -- `TransportError(ExtensibilityError)` -- Raised by the transport layer on failure. Not seen when using the client, which catches all transport errors and returns an empty result. +- `TransportError(ExtensibilityError)` -- Raised by the transport layer on network-level failures: HTTP errors, SSE parsing failures, response decode errors. Not seen when using `get_extension_capability_implementation()`, which catches all errors and returns an empty result. May be raised by `call_hook()` and `call_hook_agw()` on network failures. ## Service Binding diff --git a/tests/extensibility/unit/test_client.py b/tests/extensibility/unit/test_client.py index ef409e5d..569ef73c 100644 --- a/tests/extensibility/unit/test_client.py +++ b/tests/extensibility/unit/test_client.py @@ -1,10 +1,17 @@ """Tests for ExtensibilityClient and create_client.""" -from unittest.mock import MagicMock, patch +import json +from unittest.mock import AsyncMock, MagicMock, patch +import pytest from sap_cloud_sdk.extensibility import create_client -from sap_cloud_sdk.extensibility.client import ExtensibilityClient +from sap_cloud_sdk.extensibility.client import ( + ExtensibilityClient, + _EXECUTE_WORKFLOW_TOOL_NAME, + _GET_EXECUTION_TOOL_NAME, + _N8N_MCP_SERVER_NAME, +) from sap_cloud_sdk.extensibility._models import ( ExtensionCapabilityImplementation, McpServer, @@ -17,7 +24,8 @@ ) from http import HTTPMethod from sap_cloud_sdk.extensibility.config import ExtensibilityConfig -from sap_cloud_sdk.extensibility.exceptions import TransportError +from sap_cloud_sdk.extensibility.exceptions import ExtensibilityError, TransportError +from sap_cloud_sdk.agentgateway._models import MCPTool class TestCreateClient: @@ -206,3 +214,358 @@ def test_error_logging(self): client.get_extension_capability_implementation(tenant=_TENANT) mock_logger.error.assert_called_once() assert "Failed to retrieve" in mock_logger.error.call_args[0][0] + + +# --------------------------------------------------------------------------- +# Helpers shared across call_hook tests +# --------------------------------------------------------------------------- + +def _make_hook(workflow_id: str = "wf-001", timeout: int = 30) -> Hook: + return Hook( + hook_id="agent_pre_hook", + id="9f6e5f66-7e4f-4ef0-a9f6-e6e1c1220c11", + n8n_workflow_config=N8nWorkflowConfig( + workflow_id=workflow_id, + method=HTTPMethod.POST, + ), + name="Pre Hook", + type=HookType.BEFORE, + deployment_type=DeploymentType.N8N, + timeout=timeout, + execution_mode=ExecutionMode.SYNC, + on_failure=OnFailure.CONTINUE, + order=0, + can_short_circuit=True, + ) + + +def _make_n8n_tool(name: str) -> MCPTool: + """Return an MCPTool belonging to the N8N MCP server.""" + return MCPTool( + name=name, + server_name=_N8N_MCP_SERVER_NAME, + description="", + input_schema={}, + url="https://agw.example.com/v1/mcp/sap.btpn8n:apiResource:ManagedN8nMcpServer:v1/gtid-1", + ) + + +def _make_other_server_tool(name: str) -> MCPTool: + """Return an MCPTool with the same name but from a different MCP server.""" + return MCPTool( + name=name, + server_name="sap.other:apiResource:OtherMcpServer:v1", + description="", + input_schema={}, + url="https://agw.example.com/v1/mcp/other/gtid-2", + ) + + +def _success_payload(workflow_id: str = "wf-001") -> str: + return json.dumps({ + "status": "success", + "data": { + "resultData": { + "lastNodeExecuted": "Respond to Webhook", + "runData": { + "Respond to Webhook": [ + { + "data": { + "main": [ + [ + { + "json": { + "message_id": "msg-1", + "context_id": "ctx-1", + "role": 2, + } + } + ] + ] + } + } + ] + }, + } + }, + }) + + +def _running_payload(execution_id: str = "exec-1") -> str: + return json.dumps({"status": "running", "executionId": execution_id}) + + +def _poll_success_payload() -> str: + return json.dumps({ + "status": "success", + "data": { + "resultData": { + "lastNodeExecuted": "Respond to Webhook", + "runData": { + "Respond to Webhook": [ + { + "data": { + "main": [ + [ + { + "json": { + "message_id": "msg-2", + "context_id": "ctx-1", + "role": 2, + } + } + ] + ] + } + } + ] + }, + } + }, + }) + + +def _make_agw_client(tools: list, tool_responses: list) -> MagicMock: + """Build a mock AgentGatewayClient with preset list_mcp_tools and call_mcp_tool results.""" + agw = MagicMock() + agw.list_mcp_tools = AsyncMock(return_value=tools) + agw.call_mcp_tool = AsyncMock(side_effect=tool_responses) + return agw + + +# --------------------------------------------------------------------------- +# Tests for ExtensibilityClient.call_hook +# --------------------------------------------------------------------------- + + +class TestCallHook: + """Tests for ExtensibilityClient.call_hook (async, AGW-based).""" + + def _make_client(self, agw: MagicMock) -> ExtensibilityClient: + """Build an ExtensibilityClient with a mock transport and patched AGW factory.""" + return ExtensibilityClient(MagicMock()) + + @pytest.mark.asyncio + async def test_execute_tool_not_found_raises(self): + """Raises ExtensibilityError when execute_workflow tool is absent.""" + agw = _make_agw_client(tools=[], tool_responses=[]) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ): + with pytest.raises(ExtensibilityError, match=_EXECUTE_WORKFLOW_TOOL_NAME): + await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") + + @pytest.mark.asyncio + async def test_get_exec_tool_not_found_raises(self): + """Raises ExtensibilityError when get_execution tool is absent.""" + tools = [_make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME)] + agw = _make_agw_client(tools=tools, tool_responses=[]) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ): + with pytest.raises(ExtensibilityError, match=_GET_EXECUTION_TOOL_NAME): + await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") + + @pytest.mark.asyncio + async def test_composite_key_ignores_wrong_server(self): + """Tools from a different server with the same names must not match.""" + tools = [ + _make_other_server_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_other_server_tool(_GET_EXECUTION_TOOL_NAME), + ] + agw = _make_agw_client(tools=tools, tool_responses=[]) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ): + with pytest.raises(ExtensibilityError, match=_EXECUTE_WORKFLOW_TOOL_NAME): + await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") + + @pytest.mark.asyncio + async def test_composite_key_picks_correct_tool_among_duplicates(self): + """Picks the N8N tool when another server exposes identically-named tools.""" + tools = [ + _make_other_server_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_other_server_tool(_GET_EXECUTION_TOOL_NAME), + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + agw = _make_agw_client( + tools=tools, + tool_responses=[_running_payload(), _poll_success_payload()], + ) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ), patch( + "sap_cloud_sdk.extensibility.client.asyncio.sleep", + new_callable=AsyncMock, + ): + result = await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") + assert result is not None + # Both tool calls must have used the N8N server tools, not the other one + for call in agw.call_mcp_tool.call_args_list: + assert call[0][0].server_name == _N8N_MCP_SERVER_NAME + + @pytest.mark.asyncio + async def test_success_synchronous(self): + """Returns a Message when get_execution responds with status=success.""" + tools = [ + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + agw = _make_agw_client( + tools=tools, + tool_responses=[_running_payload(), _poll_success_payload()], + ) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ), patch( + "sap_cloud_sdk.extensibility.client.asyncio.sleep", + new_callable=AsyncMock, + ): + result = await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") + assert result is not None + assert result.message_id == "msg-2" + assert agw.call_mcp_tool.call_count == 2 + + @pytest.mark.asyncio + async def test_success_after_polling(self): + """Returns a Message after one poll round via get_execution.""" + tools = [ + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + agw = _make_agw_client( + tools=tools, + tool_responses=[_running_payload(), _poll_success_payload()], + ) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ), patch( + "sap_cloud_sdk.extensibility.client.asyncio.sleep", + new_callable=AsyncMock, + ): + result = await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") + assert result is not None + assert result.message_id == "msg-2" + assert agw.call_mcp_tool.call_count == 2 + + @pytest.mark.asyncio + async def test_terminal_status_from_execute_raises(self): + """Raises ExtensibilityError on a terminal status from execute_workflow.""" + tools = [ + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + terminal_payload = json.dumps({"status": "error", "error": "workflow crashed"}) + agw = _make_agw_client(tools=tools, tool_responses=[terminal_payload]) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ): + with pytest.raises(ExtensibilityError, match="workflow crashed"): + await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") + + @pytest.mark.asyncio + async def test_terminal_status_from_poll_raises(self): + """Raises ExtensibilityError on a terminal status from get_execution poll.""" + tools = [ + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + poll_terminal = json.dumps({"status": "error", "error": "node failed"}) + agw = _make_agw_client( + tools=tools, + tool_responses=[_running_payload(), poll_terminal], + ) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ), patch( + "sap_cloud_sdk.extensibility.client.asyncio.sleep", + new_callable=AsyncMock, + ): + with pytest.raises(ExtensibilityError, match="node failed"): + await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") + + @pytest.mark.asyncio + async def test_timeout_raises(self): + """Raises ExtensibilityError when deadline is exceeded without a success status.""" + tools = [ + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + # Always returns "running" so the loop never exits via success/terminal + agw = _make_agw_client( + tools=tools, + tool_responses=[_running_payload()] + [_running_payload()] * 100, + ) + client = self._make_client(agw) + # Use a hook with timeout=0 so monotonic deadline is immediately exceeded + hook = _make_hook(timeout=0) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ), patch( + "sap_cloud_sdk.extensibility.client.asyncio.sleep", + new_callable=AsyncMock, + ): + with pytest.raises(ExtensibilityError, match="timed out"): + await client.call_hook_agw(hook=hook, tenant_subdomain="t") + + @pytest.mark.asyncio + async def test_agw_call_mcp_tool_exception_raises_transport_error(self): + """Wraps call_mcp_tool exceptions in TransportError.""" + tools = [ + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + agw = MagicMock() + agw.list_mcp_tools = AsyncMock(return_value=tools) + agw.call_mcp_tool = AsyncMock(side_effect=RuntimeError("network error")) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ): + with pytest.raises(TransportError, match="network error"): + await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") + + @pytest.mark.asyncio + async def test_workflow_id_passed_to_execute_tool(self): + """Verifies the correct workflowId is forwarded to call_mcp_tool.""" + tools = [ + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + agw = _make_agw_client( + tools=tools, + tool_responses=[_running_payload(), _poll_success_payload()], + ) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ), patch( + "sap_cloud_sdk.extensibility.client.asyncio.sleep", + new_callable=AsyncMock, + ): + await client.call_hook_agw( + hook=_make_hook(workflow_id="wf-xyz"), tenant_subdomain="t" + ) + # First call is execute_workflow — check workflowId was forwarded correctly + first_call_kwargs = agw.call_mcp_tool.call_args_list[0][1] + assert first_call_kwargs["workflowId"] == "wf-xyz"