From 8f0dc3dc7b0ada74321d119dca5f33bd68827b78 Mon Sep 17 00:00:00 2001 From: "Yashmeet ." Date: Thu, 11 Jun 2026 12:27:48 +0530 Subject: [PATCH 01/21] Implement call hook via Agent Gateway --- src/sap_cloud_sdk/extensibility/__init__.py | 3 +- src/sap_cloud_sdk/extensibility/client.py | 515 ++++++++++++++------ 2 files changed, 359 insertions(+), 159 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/__init__.py b/src/sap_cloud_sdk/extensibility/__init__.py index 3d896225..1b5c172c 100644 --- a/src/sap_cloud_sdk/extensibility/__init__.py +++ b/src/sap_cloud_sdk/extensibility/__init__.py @@ -90,7 +90,7 @@ def _mock_file(name: str) -> str: ) from sap_cloud_sdk.extensibility._noop_transport import NoOpTransport from sap_cloud_sdk.extensibility._ums_transport import UmsTransport -from sap_cloud_sdk.extensibility.client import ExtensibilityClient +from sap_cloud_sdk.extensibility.client import ExtensibilityClient, call_hook from sap_cloud_sdk.extensibility.config import ExtensibilityConfig, HookConfig from sap_cloud_sdk.extensibility.exceptions import ( ClientCreationError, @@ -177,6 +177,7 @@ def create_client( # Client "create_client", "ExtensibilityClient", + "call_hook", # Local mode "LocalTransport", "CLOUD_SDK_LOCAL_EXTENSIBILITY_FILE_ENV", diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index 01267bee..c0753312 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import itertools import json import logging @@ -15,6 +16,7 @@ from sap_cloud_sdk.core.telemetry import Module, Operation from sap_cloud_sdk.core.telemetry.metrics_decorator import record_metrics +from sap_cloud_sdk.agentgateway.agw_client import AgentGatewayClient from sap_cloud_sdk.extensibility._models import ( DEFAULT_EXTENSION_CAPABILITY_ID, ExtensionCapabilityImplementation, @@ -38,6 +40,7 @@ _EXECUTE_WORKFLOW_TOOL_NAME = "execute_workflow" _GET_EXECUTION_TOOL_NAME = "get_execution" +_N8N_MCP_SERVER_NAME = "sap.btpn8n:apiResource:ManagedN8nMcpServer:v1" _JSONRPC_VERSION = "2.0" @@ -210,117 +213,375 @@ def get_extension_capability_implementation( ) return ExtensionCapabilityImplementation(capability_id=capability_id) - @record_metrics( - Module.EXTENSIBILITY, - Operation.EXTENSIBILITY_CALL_HOOK, - ) - def call_hook( - self, - hook: Hook, - hook_config: HookConfig, - ) -> Optional[Message]: - """Call a hook's MCP endpoint and poll until completion. + # --------------------------------------------------------------------------- + # DEPRECATED: call_hook() instance method + # Replaced by the standalone call_hook() module-level function below, which + # uses the agw SDK module to call the MCP tool via Agent Gateway. Auth and + # endpoint resolution are now handled internally by AgentGatewayClient, + # eliminating the need for callers to supply HookConfig with a manual + # endpoint URL and auth token. + # --------------------------------------------------------------------------- + + # @record_metrics( + # Module.EXTENSIBILITY, + # Operation.EXTENSIBILITY_CALL_HOOK, + # ) + # def call_hook( + # self, + # hook: Hook, + # hook_config: HookConfig, + # ) -> Optional[Message]: + # """Call a hook's MCP endpoint and poll until completion. + # + # Executes the workflow via ``execute-workflow``, then polls + # ``get-execution`` every 500 ms until the execution succeeds, fails, + # or ``hook.timeout`` seconds elapse. + # + # This method is transport-agnostic: regardless of how extension + # metadata was fetched (backend, local file, or no-op), + # the actual hook invocation is always a direct HTTP call to the + # URL embedded in the :class:`Hook` object. + # + # Args: + # hook: Hook configuration (workflow ID, method, timeout). + # hook_config: Hook invocation configuration (endpoint URL, auth token, optional payload). + # + # Returns: + # Parsed ``Message`` from the last executed workflow node, or ``None`` + # if the hook completed successfully but produced no message. + # + # Raises: + # TransportError: On HTTP errors, terminal execution failures, or timeout. + # + # Example: + # ```python + # from sap_cloud_sdk.extensibility import create_client + # + # client = create_client("sap.ai:agent:myAgent:v1") + # impl = client.get_extension_capability_implementation(tenant="tenant-abc") + # + # if impl.hooks: + # hook = impl.hooks[0] + # result = client.call_hook( + # hook, + # HookConfig( + # endpoint="https://gateway.example.com/v1/mcp/{ORD_ID}/{GTID}", + # auth_token="my-secret-token", + # payload={"foo": "bar"}, + # ), + # ) + # ``` + # """ + # headers = {**_JSONRPC_HEADERS} + # inject(headers) + # + # message_payload: dict[str, Any] = {} + # if hook_config.payload is not None: + # model_dump = getattr(hook_config.payload, "model_dump", None) + # if callable(model_dump): + # message_payload = cast(dict[str, Any], model_dump(exclude_none=True)) + # + # # 1. Execute workflow + # execute_workflow_arguments = { + # "workflowId": hook.n8n_workflow_config.workflow_id, + # "inputs": { + # "type": "webhook", + # "webhookData": { + # "method": hook.n8n_workflow_config.method, + # "query": {}, + # "body": message_payload, + # "headers": headers, + # }, + # }, + # } + # + # try: + # with httpx.Client( + # headers={"Authorization": f"Bearer {hook_config.auth_token}"}, + # timeout=hook.timeout, + # ) as client: + # tool_resp = client.post( + # hook_config.endpoint, + # json=_build_tool_call( + # execute_workflow_arguments, _EXECUTE_WORKFLOW_TOOL_NAME + # ), + # headers=headers, + # ) + # except TransportError: + # raise + # except Exception as exc: + # raise TransportError( + # f"HTTP request to hook MCP endpoint failed: {exc}" + # ) from exc + # + # try: + # data = _extract_tool_result(_parse_response(tool_resp)) + # except TransportError: + # raise + # except Exception as exc: + # raise TransportError(f"Could not parse hook response: {exc}") from exc + # + # status = data.get("status", "") + # + # # 2. Fail fast on terminal statuses from execute-workflow + # if status in _EXECUTE_TERMINAL_STATUSES: + # error_msg = data.get("error", "") + # raise TransportError( + # f"Workflow execution failed with status {status!r}" + # + (f": {error_msg}" if error_msg else "") + # ) + # + # # 3. Return immediately if execution completed synchronously + # if status == "success": + # try: + # result_data = data.get("data", {}).get("resultData", {}) + # last_node = result_data.get("lastNodeExecuted", "") + # response_json = ( + # result_data.get("runData", {}) + # .get(last_node, [{}])[0] + # .get("data", {}) + # .get("main", [[{}]])[0][0] + # .get("json", {}) + # ) + # return Message(**response_json) + # except (KeyError, IndexError, TypeError, ValidationError) as exc: + # raise TransportError( + # f"Failed to extract response from last executed node: {exc}" + # ) from exc + # + # # 4. Poll get-execution for running/new/waiting/started + # execution_id = data.get("executionId") + # get_execution_arguments = { + # "workflowId": hook.n8n_workflow_config.workflow_id, + # "executionId": str(execution_id), + # "includeData": True, + # } + # + # deadline = time.monotonic() + hook.timeout + # last_status = status + # while time.monotonic() < deadline: + # time.sleep(_HOOK_POLL_INTERVAL) + # + # try: + # with httpx.Client( + # headers={"Authorization": f"Bearer {hook_config.auth_token}"}, + # timeout=hook.timeout, + # ) as client: + # tool_resp = client.post( + # hook_config.endpoint, + # json=_build_tool_call( + # get_execution_arguments, _GET_EXECUTION_TOOL_NAME + # ), + # headers=headers, + # ) + # except TransportError: + # raise + # except Exception as exc: + # raise TransportError( + # f"HTTP request to hook MCP endpoint failed: {exc}" + # ) from exc + # + # try: + # data = _extract_tool_result(_parse_response(tool_resp)) + # except TransportError: + # raise + # except Exception as exc: + # raise TransportError(f"Could not parse hook response: {exc}") from exc + # + # last_status = data.get("execution", {}).get("status", "") or data.get( + # "status", "" + # ) + # + # if last_status == "success": + # try: + # result_data = data.get("data", {}).get("resultData", {}) + # last_node = result_data.get("lastNodeExecuted", "") + # response_json = ( + # result_data.get("runData", {}) + # .get(last_node, [{}])[0] + # .get("data", {}) + # .get("main", [[{}]])[0][0] + # .get("json", {}) + # ) + # return Message(**response_json) + # except (KeyError, IndexError, TypeError, ValidationError) as exc: + # raise TransportError( + # f"Failed to extract response from last executed node: {exc}" + # ) from exc + # + # if last_status in _EXECUTION_TERMINAL_STATUSES: + # error_msg = data.get("error", "") + # raise TransportError( + # f"Workflow execution failed with status {last_status!r}" + # + (f": {error_msg}" if error_msg else "") + # ) + # + # # Continue polling for: running, waiting, new, unknown + # + # raise TransportError( + # f"Workflow execution timed out after {hook.timeout}s. " + # f"Last status: {last_status!r}" + # ) - Executes the workflow via ``execute-workflow``, then polls - ``get-execution`` every 500 ms until the execution succeeds, fails, - or ``hook.timeout`` seconds elapse. - This method is transport-agnostic: regardless of how extension - metadata was fetched (backend, local file, or no-op), - the actual hook invocation is always a direct HTTP call to the - URL embedded in the :class:`Hook` object. +# --------------------------------------------------------------------------- +# New standalone call_hook() — uses agw SDK module +# --------------------------------------------------------------------------- - Args: - hook: Hook configuration (workflow ID, method, timeout). - hook_config: Hook invocation configuration (endpoint URL, auth token, optional payload). - Returns: - Parsed ``Message`` from the last executed workflow node, or ``None`` - if the hook completed successfully but produced no message. +@record_metrics( + Module.EXTENSIBILITY, + Operation.EXTENSIBILITY_CALL_HOOK, +) +async def call_hook( + hook: Hook, + agw_client: AgentGatewayClient, +) -> Optional[Message]: + """Call a hook via Agent Gateway MCP tool invocation. - Raises: - TransportError: On HTTP errors, terminal execution failures, or timeout. + Discovers the N8N MCP tools via Agent Gateway, executes the workflow via + ``execute_workflow``, then polls ``get_execution`` every 500 ms until the + execution succeeds, fails, or ``hook.timeout`` seconds elapse. - Example: - ```python - from sap_cloud_sdk.extensibility import create_client + Auth and endpoint resolution are handled internally by the AGW client — + no manual token or URL configuration is required. - client = create_client("sap.ai:agent:myAgent:v1") - impl = client.get_extension_capability_implementation(tenant="tenant-abc") - - if impl.hooks: - hook = impl.hooks[0] - result = client.call_hook( - hook, - HookConfig( - endpoint="https://gateway.example.com/v1/mcp/{ORD_ID}/{GTID}", - auth_token="my-secret-token", - payload={"foo": "bar"}, - ), - ) - ``` - """ - headers = {**_JSONRPC_HEADERS} - inject(headers) - - message_payload: dict[str, Any] = {} - if hook_config.payload is not None: - model_dump = getattr(hook_config.payload, "model_dump", None) - if callable(model_dump): - message_payload = cast(dict[str, Any], model_dump(exclude_none=True)) - - # 1. Execute workflow - execute_workflow_arguments = { - "workflowId": hook.n8n_workflow_config.workflow_id, - "inputs": { + Args: + hook: Hook configuration (workflow ID, method, timeout). + agw_client: Configured Agent Gateway client used for tool discovery + and invocation. + + Returns: + Parsed ``Message`` from the last executed workflow node, or ``None`` + if the hook completed successfully but produced no message. + + Raises: + TransportError: On AGW tool call errors, terminal execution failures, + or timeout. + + Example: + ```python + from sap_cloud_sdk.extensibility import call_hook + from sap_cloud_sdk.agentgateway import create_client as create_agw_client + + agw_client = create_agw_client(tenant_subdomain="my-tenant") + + result = await call_hook( + hook=impl.hooks[0], + agw_client=agw_client, + ) + ``` + """ + # 1. Discover MCP tools — AGW resolves N8N GTID and handles auth internally + tools = await agw_client.list_mcp_tools() + + execute_tool = next( + ( + t for t in tools + if t.name == _EXECUTE_WORKFLOW_TOOL_NAME and t.server_name == _N8N_MCP_SERVER_NAME + ), + None, + ) + if execute_tool is None: + raise TransportError( + f"MCP tool '{_EXECUTE_WORKFLOW_TOOL_NAME}' on server '{_N8N_MCP_SERVER_NAME}' " + "not found via Agent Gateway." + ) + + get_exec_tool = next( + ( + t for t in tools + if t.name == _GET_EXECUTION_TOOL_NAME and t.server_name == _N8N_MCP_SERVER_NAME + ), + None, + ) + if get_exec_tool is None: + raise TransportError( + f"MCP tool '{_GET_EXECUTION_TOOL_NAME}' on server '{_N8N_MCP_SERVER_NAME}' " + "not found via Agent Gateway." + ) + + # 2. Execute workflow + try: + result_str = await agw_client.call_mcp_tool( + execute_tool, + workflowId=hook.n8n_workflow_config.workflow_id, + inputs={ "type": "webhook", "webhookData": { "method": hook.n8n_workflow_config.method, "query": {}, - "body": message_payload, - "headers": headers, + "body": {}, + "headers": {}, }, }, - } + ) + except Exception as exc: + raise TransportError( + f"AGW tool call for '{_EXECUTE_WORKFLOW_TOOL_NAME}' failed: {exc}" + ) from exc + + try: + data = json.loads(result_str) + except Exception as exc: + raise TransportError(f"Could not parse hook response: {exc}") from exc + + status = data.get("status", "") + if status in _EXECUTE_TERMINAL_STATUSES: + error_msg = data.get("error", "") + raise TransportError( + f"Workflow execution failed with status {status!r}" + + (f": {error_msg}" if error_msg else "") + ) + + if status == "success": try: - with httpx.Client( - headers={"Authorization": f"Bearer {hook_config.auth_token}"}, - timeout=hook.timeout, - ) as client: - tool_resp = client.post( - hook_config.endpoint, - json=_build_tool_call( - execute_workflow_arguments, _EXECUTE_WORKFLOW_TOOL_NAME - ), - headers=headers, - ) - except TransportError: - raise + result_data = data.get("data", {}).get("resultData", {}) + last_node = result_data.get("lastNodeExecuted", "") + response_json = ( + result_data.get("runData", {}) + .get(last_node, [{}])[0] + .get("data", {}) + .get("main", [[{}]])[0][0] + .get("json", {}) + ) + return Message(**response_json) + except (KeyError, IndexError, TypeError, ValidationError) as exc: + raise TransportError( + f"Failed to extract response from last executed node: {exc}" + ) from exc + + # 3. Poll get_execution for running/new/waiting/started + execution_id = data.get("executionId") + deadline = time.monotonic() + hook.timeout + last_status = status + + while time.monotonic() < deadline: + await asyncio.sleep(_HOOK_POLL_INTERVAL) + + try: + result_str = await agw_client.call_mcp_tool( + get_exec_tool, + workflowId=hook.n8n_workflow_config.workflow_id, + executionId=str(execution_id), + includeData=True, + ) except Exception as exc: raise TransportError( - f"HTTP request to hook MCP endpoint failed: {exc}" + f"AGW tool call for '{_GET_EXECUTION_TOOL_NAME}' failed: {exc}" ) from exc try: - data = _extract_tool_result(_parse_response(tool_resp)) - except TransportError: - raise + data = json.loads(result_str) except Exception as exc: raise TransportError(f"Could not parse hook response: {exc}") from exc - status = data.get("status", "") - - # 2. Fail fast on terminal statuses from execute-workflow - if status in _EXECUTE_TERMINAL_STATUSES: - error_msg = data.get("error", "") - raise TransportError( - f"Workflow execution failed with status {status!r}" - + (f": {error_msg}" if error_msg else "") - ) + last_status = data.get("execution", {}).get("status", "") or data.get( + "status", "" + ) - # 3. Return immediately if execution completed synchronously - if status == "success": + if last_status == "success": try: result_data = data.get("data", {}).get("resultData", {}) last_node = result_data.get("lastNodeExecuted", "") @@ -337,76 +598,14 @@ def call_hook( f"Failed to extract response from last executed node: {exc}" ) from exc - # 4. Poll get-execution for running/new/waiting/started - execution_id = data.get("executionId") - get_execution_arguments = { - "workflowId": hook.n8n_workflow_config.workflow_id, - "executionId": str(execution_id), - "includeData": True, - } - - deadline = time.monotonic() + hook.timeout - last_status = status - while time.monotonic() < deadline: - time.sleep(_HOOK_POLL_INTERVAL) - - try: - with httpx.Client( - headers={"Authorization": f"Bearer {hook_config.auth_token}"}, - timeout=hook.timeout, - ) as client: - tool_resp = client.post( - hook_config.endpoint, - json=_build_tool_call( - get_execution_arguments, _GET_EXECUTION_TOOL_NAME - ), - headers=headers, - ) - except TransportError: - raise - except Exception as exc: - raise TransportError( - f"HTTP request to hook MCP endpoint failed: {exc}" - ) from exc - - try: - data = _extract_tool_result(_parse_response(tool_resp)) - except TransportError: - raise - except Exception as exc: - raise TransportError(f"Could not parse hook response: {exc}") from exc - - last_status = data.get("execution", {}).get("status", "") or data.get( - "status", "" + if last_status in _EXECUTION_TERMINAL_STATUSES: + error_msg = data.get("error", "") + raise TransportError( + f"Workflow execution failed with status {last_status!r}" + + (f": {error_msg}" if error_msg else "") ) - if last_status == "success": - try: - result_data = data.get("data", {}).get("resultData", {}) - last_node = result_data.get("lastNodeExecuted", "") - response_json = ( - result_data.get("runData", {}) - .get(last_node, [{}])[0] - .get("data", {}) - .get("main", [[{}]])[0][0] - .get("json", {}) - ) - return Message(**response_json) - except (KeyError, IndexError, TypeError, ValidationError) as exc: - raise TransportError( - f"Failed to extract response from last executed node: {exc}" - ) from exc - - if last_status in _EXECUTION_TERMINAL_STATUSES: - error_msg = data.get("error", "") - raise TransportError( - f"Workflow execution failed with status {last_status!r}" - + (f": {error_msg}" if error_msg else "") - ) - - # Continue polling for: running, waiting, new, unknown - - raise TransportError( - f"Workflow execution timed out after {hook.timeout}s. " - f"Last status: {last_status!r}" - ) + raise TransportError( + f"Workflow execution timed out after {hook.timeout}s. " + f"Last status: {last_status!r}" + ) From 812da27a5d831e433e9d70e3c403ae794a931c3d Mon Sep 17 00:00:00 2001 From: "Yashmeet ." Date: Fri, 12 Jun 2026 10:00:30 +0530 Subject: [PATCH 02/21] UTs --- tests/extensibility/unit/test_client.py | 289 +++++++++++++++++++++++- 1 file changed, 286 insertions(+), 3 deletions(-) diff --git a/tests/extensibility/unit/test_client.py b/tests/extensibility/unit/test_client.py index ef409e5d..901e3ae3 100644 --- a/tests/extensibility/unit/test_client.py +++ b/tests/extensibility/unit/test_client.py @@ -1,10 +1,17 @@ """Tests for ExtensibilityClient and create_client.""" -from unittest.mock import MagicMock, patch +import json +from unittest.mock import AsyncMock, MagicMock, patch +import pytest -from sap_cloud_sdk.extensibility import create_client -from sap_cloud_sdk.extensibility.client import ExtensibilityClient +from sap_cloud_sdk.extensibility import call_hook, create_client +from sap_cloud_sdk.extensibility.client import ( + ExtensibilityClient, + _EXECUTE_WORKFLOW_TOOL_NAME, + _GET_EXECUTION_TOOL_NAME, + _N8N_MCP_SERVER_NAME, +) from sap_cloud_sdk.extensibility._models import ( ExtensionCapabilityImplementation, McpServer, @@ -18,6 +25,7 @@ from http import HTTPMethod from sap_cloud_sdk.extensibility.config import ExtensibilityConfig from sap_cloud_sdk.extensibility.exceptions import TransportError +from sap_cloud_sdk.agentgateway._models import MCPTool class TestCreateClient: @@ -206,3 +214,278 @@ def test_error_logging(self): client.get_extension_capability_implementation(tenant=_TENANT) mock_logger.error.assert_called_once() assert "Failed to retrieve" in mock_logger.error.call_args[0][0] + + +# --------------------------------------------------------------------------- +# Helpers shared across call_hook tests +# --------------------------------------------------------------------------- + +def _make_hook(workflow_id: str = "wf-001", timeout: int = 30) -> Hook: + return Hook( + hook_id="agent_pre_hook", + id="9f6e5f66-7e4f-4ef0-a9f6-e6e1c1220c11", + n8n_workflow_config=N8nWorkflowConfig( + workflow_id=workflow_id, + method=HTTPMethod.POST, + ), + name="Pre Hook", + type=HookType.BEFORE, + deployment_type=DeploymentType.N8N, + timeout=timeout, + execution_mode=ExecutionMode.SYNC, + on_failure=OnFailure.CONTINUE, + order=0, + can_short_circuit=True, + ) + + +def _make_n8n_tool(name: str) -> MCPTool: + """Return an MCPTool belonging to the N8N MCP server.""" + return MCPTool( + name=name, + server_name=_N8N_MCP_SERVER_NAME, + description="", + input_schema={}, + url="https://agw.example.com/v1/mcp/sap.btpn8n:apiResource:ManagedN8nMcpServer:v1/gtid-1", + ) + + +def _make_other_server_tool(name: str) -> MCPTool: + """Return an MCPTool with the same name but from a different MCP server.""" + return MCPTool( + name=name, + server_name="sap.other:apiResource:OtherMcpServer:v1", + description="", + input_schema={}, + url="https://agw.example.com/v1/mcp/other/gtid-2", + ) + + +def _success_payload(workflow_id: str = "wf-001") -> str: + return json.dumps({ + "status": "success", + "data": { + "resultData": { + "lastNodeExecuted": "Respond to Webhook", + "runData": { + "Respond to Webhook": [ + { + "data": { + "main": [ + [ + { + "json": { + "message_id": "msg-1", + "context_id": "ctx-1", + "role": 2, + } + } + ] + ] + } + } + ] + }, + } + }, + }) + + +def _running_payload(execution_id: str = "exec-1") -> str: + return json.dumps({"status": "running", "executionId": execution_id}) + + +def _poll_success_payload() -> str: + return json.dumps({ + "status": "success", + "data": { + "resultData": { + "lastNodeExecuted": "Respond to Webhook", + "runData": { + "Respond to Webhook": [ + { + "data": { + "main": [ + [ + { + "json": { + "message_id": "msg-2", + "context_id": "ctx-1", + "role": 2, + } + } + ] + ] + } + } + ] + }, + } + }, + }) + + +def _make_agw_client(tools: list, tool_responses: list) -> MagicMock: + """Build a mock AgentGatewayClient with preset list_mcp_tools and call_mcp_tool results.""" + agw = MagicMock() + agw.list_mcp_tools = AsyncMock(return_value=tools) + agw.call_mcp_tool = AsyncMock(side_effect=tool_responses) + return agw + + +# --------------------------------------------------------------------------- +# Tests for standalone call_hook() +# --------------------------------------------------------------------------- + + +class TestCallHook: + """Tests for the standalone call_hook() module-level function.""" + + @pytest.mark.asyncio + async def test_execute_tool_not_found_raises(self): + """Raises TransportError when execute_workflow tool is absent.""" + agw = _make_agw_client(tools=[], tool_responses=[]) + with pytest.raises(TransportError, match=_EXECUTE_WORKFLOW_TOOL_NAME): + await call_hook(hook=_make_hook(), agw_client=agw) + + @pytest.mark.asyncio + async def test_get_exec_tool_not_found_raises(self): + """Raises TransportError when get_execution tool is absent.""" + tools = [_make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME)] + agw = _make_agw_client(tools=tools, tool_responses=[]) + with pytest.raises(TransportError, match=_GET_EXECUTION_TOOL_NAME): + await call_hook(hook=_make_hook(), agw_client=agw) + + @pytest.mark.asyncio + async def test_composite_key_ignores_wrong_server(self): + """Tools from a different server with the same names must not match.""" + tools = [ + _make_other_server_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_other_server_tool(_GET_EXECUTION_TOOL_NAME), + ] + agw = _make_agw_client(tools=tools, tool_responses=[]) + with pytest.raises(TransportError, match=_EXECUTE_WORKFLOW_TOOL_NAME): + await call_hook(hook=_make_hook(), agw_client=agw) + + @pytest.mark.asyncio + async def test_composite_key_picks_correct_tool_among_duplicates(self): + """Picks the N8N tool when another server exposes identically-named tools.""" + tools = [ + _make_other_server_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_other_server_tool(_GET_EXECUTION_TOOL_NAME), + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + agw = _make_agw_client( + tools=tools, + tool_responses=[_success_payload()], + ) + result = await call_hook(hook=_make_hook(), agw_client=agw) + assert result is not None + # call_mcp_tool must have been called with the N8N tool, not the other one + called_tool = agw.call_mcp_tool.call_args[0][0] + assert called_tool.server_name == _N8N_MCP_SERVER_NAME + + @pytest.mark.asyncio + async def test_success_synchronous(self): + """Returns a Message when execute_workflow responds with status=success.""" + tools = [ + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + agw = _make_agw_client( + tools=tools, + tool_responses=[_success_payload()], + ) + result = await call_hook(hook=_make_hook(), agw_client=agw) + assert result is not None + assert result.message_id == "msg-1" + agw.call_mcp_tool.assert_called_once() + + @pytest.mark.asyncio + async def test_success_after_polling(self): + """Returns a Message after one poll round via get_execution.""" + tools = [ + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + agw = _make_agw_client( + tools=tools, + tool_responses=[_running_payload(), _poll_success_payload()], + ) + with patch("sap_cloud_sdk.extensibility.client.asyncio.sleep", new_callable=AsyncMock): + result = await call_hook(hook=_make_hook(), agw_client=agw) + assert result is not None + assert result.message_id == "msg-2" + assert agw.call_mcp_tool.call_count == 2 + + @pytest.mark.asyncio + async def test_terminal_status_from_execute_raises(self): + """Raises TransportError on a terminal status from execute_workflow.""" + tools = [ + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + terminal_payload = json.dumps({"status": "error", "error": "workflow crashed"}) + agw = _make_agw_client(tools=tools, tool_responses=[terminal_payload]) + with pytest.raises(TransportError, match="workflow crashed"): + await call_hook(hook=_make_hook(), agw_client=agw) + + @pytest.mark.asyncio + async def test_terminal_status_from_poll_raises(self): + """Raises TransportError on a terminal status from get_execution poll.""" + tools = [ + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + poll_terminal = json.dumps({"status": "error", "error": "node failed"}) + agw = _make_agw_client( + tools=tools, + tool_responses=[_running_payload(), poll_terminal], + ) + with patch("sap_cloud_sdk.extensibility.client.asyncio.sleep", new_callable=AsyncMock): + with pytest.raises(TransportError, match="node failed"): + await call_hook(hook=_make_hook(), agw_client=agw) + + @pytest.mark.asyncio + async def test_timeout_raises(self): + """Raises TransportError when deadline is exceeded without a success status.""" + tools = [ + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + # Always returns "running" so the loop never exits via success/terminal + agw = _make_agw_client( + tools=tools, + tool_responses=[_running_payload()] + [_running_payload()] * 100, + ) + # Use a hook with timeout=0 so monotonic deadline is immediately exceeded + hook = _make_hook(timeout=0) + with patch("sap_cloud_sdk.extensibility.client.asyncio.sleep", new_callable=AsyncMock): + with pytest.raises(TransportError, match="timed out"): + await call_hook(hook=hook, agw_client=agw) + + @pytest.mark.asyncio + async def test_agw_call_mcp_tool_exception_raises_transport_error(self): + """Wraps call_mcp_tool exceptions in TransportError.""" + tools = [ + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + agw = MagicMock() + agw.list_mcp_tools = AsyncMock(return_value=tools) + agw.call_mcp_tool = AsyncMock(side_effect=RuntimeError("network error")) + with pytest.raises(TransportError, match="network error"): + await call_hook(hook=_make_hook(), agw_client=agw) + + @pytest.mark.asyncio + async def test_workflow_id_passed_to_execute_tool(self): + """Verifies the correct workflowId is forwarded to call_mcp_tool.""" + tools = [ + _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), + _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), + ] + agw = _make_agw_client(tools=tools, tool_responses=[_success_payload("wf-xyz")]) + await call_hook(hook=_make_hook(workflow_id="wf-xyz"), agw_client=agw) + kwargs = agw.call_mcp_tool.call_args[1] + assert kwargs["workflowId"] == "wf-xyz" From 75fc414f83740280e9d8e41ed07f325e25a8284b Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Fri, 12 Jun 2026 15:57:52 +0530 Subject: [PATCH 03/21] updating n8n call to include token, headers, message --- src/sap_cloud_sdk/extensibility/client.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index c0753312..e88832be 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -436,6 +436,9 @@ def get_extension_capability_implementation( async def call_hook( hook: Hook, agw_client: AgentGatewayClient, + user_token: Optional[str] = None, + message: Optional[Any] = None, + headers: Optional[dict] = None, ) -> Optional[Message]: """Call a hook via Agent Gateway MCP tool invocation. @@ -473,7 +476,7 @@ async def call_hook( ``` """ # 1. Discover MCP tools — AGW resolves N8N GTID and handles auth internally - tools = await agw_client.list_mcp_tools() + tools = await agw_client.list_mcp_tools(user_token=user_token or None) execute_tool = next( ( @@ -502,17 +505,19 @@ async def call_hook( ) # 2. Execute workflow + message_body = message.model_dump(mode="json") if message is not None else {} try: result_str = await agw_client.call_mcp_tool( execute_tool, + user_token=user_token or None, workflowId=hook.n8n_workflow_config.workflow_id, inputs={ "type": "webhook", "webhookData": { "method": hook.n8n_workflow_config.method, "query": {}, - "body": {}, - "headers": {}, + "body": message_body, + "headers": headers or {}, }, }, ) @@ -563,6 +568,7 @@ async def call_hook( try: result_str = await agw_client.call_mcp_tool( get_exec_tool, + user_token=user_token or None, workflowId=hook.n8n_workflow_config.workflow_id, executionId=str(execution_id), includeData=True, From 41e0808e5ff31f3239a6779675e2fe220ef1dd56 Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Wed, 17 Jun 2026 15:17:36 +0530 Subject: [PATCH 04/21] improvements 1. n8n ord id is already a constant 2. made the new call_hook method instance level, removed the comment from the old method 3. agw_client created in cloud sdk, tenant subdomain passed as parameter from agent 4. Caching as a future improvement --- src/sap_cloud_sdk/extensibility/client.py | 699 +++++++++++----------- 1 file changed, 345 insertions(+), 354 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index e88832be..f87b35df 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -16,7 +16,7 @@ from sap_cloud_sdk.core.telemetry import Module, Operation from sap_cloud_sdk.core.telemetry.metrics_decorator import record_metrics -from sap_cloud_sdk.agentgateway.agw_client import AgentGatewayClient +from sap_cloud_sdk.agentgateway import create_client as create_agw_client from sap_cloud_sdk.extensibility._models import ( DEFAULT_EXTENSION_CAPABILITY_ID, ExtensionCapabilityImplementation, @@ -213,369 +213,308 @@ def get_extension_capability_implementation( ) return ExtensionCapabilityImplementation(capability_id=capability_id) - # --------------------------------------------------------------------------- - # DEPRECATED: call_hook() instance method - # Replaced by the standalone call_hook() module-level function below, which - # uses the agw SDK module to call the MCP tool via Agent Gateway. Auth and - # endpoint resolution are now handled internally by AgentGatewayClient, - # eliminating the need for callers to supply HookConfig with a manual - # endpoint URL and auth token. - # --------------------------------------------------------------------------- - - # @record_metrics( - # Module.EXTENSIBILITY, - # Operation.EXTENSIBILITY_CALL_HOOK, - # ) - # def call_hook( - # self, - # hook: Hook, - # hook_config: HookConfig, - # ) -> Optional[Message]: - # """Call a hook's MCP endpoint and poll until completion. - # - # Executes the workflow via ``execute-workflow``, then polls - # ``get-execution`` every 500 ms until the execution succeeds, fails, - # or ``hook.timeout`` seconds elapse. - # - # This method is transport-agnostic: regardless of how extension - # metadata was fetched (backend, local file, or no-op), - # the actual hook invocation is always a direct HTTP call to the - # URL embedded in the :class:`Hook` object. - # - # Args: - # hook: Hook configuration (workflow ID, method, timeout). - # hook_config: Hook invocation configuration (endpoint URL, auth token, optional payload). - # - # Returns: - # Parsed ``Message`` from the last executed workflow node, or ``None`` - # if the hook completed successfully but produced no message. - # - # Raises: - # TransportError: On HTTP errors, terminal execution failures, or timeout. - # - # Example: - # ```python - # from sap_cloud_sdk.extensibility import create_client - # - # client = create_client("sap.ai:agent:myAgent:v1") - # impl = client.get_extension_capability_implementation(tenant="tenant-abc") - # - # if impl.hooks: - # hook = impl.hooks[0] - # result = client.call_hook( - # hook, - # HookConfig( - # endpoint="https://gateway.example.com/v1/mcp/{ORD_ID}/{GTID}", - # auth_token="my-secret-token", - # payload={"foo": "bar"}, - # ), - # ) - # ``` - # """ - # headers = {**_JSONRPC_HEADERS} - # inject(headers) - # - # message_payload: dict[str, Any] = {} - # if hook_config.payload is not None: - # model_dump = getattr(hook_config.payload, "model_dump", None) - # if callable(model_dump): - # message_payload = cast(dict[str, Any], model_dump(exclude_none=True)) - # - # # 1. Execute workflow - # execute_workflow_arguments = { - # "workflowId": hook.n8n_workflow_config.workflow_id, - # "inputs": { - # "type": "webhook", - # "webhookData": { - # "method": hook.n8n_workflow_config.method, - # "query": {}, - # "body": message_payload, - # "headers": headers, - # }, - # }, - # } - # - # try: - # with httpx.Client( - # headers={"Authorization": f"Bearer {hook_config.auth_token}"}, - # timeout=hook.timeout, - # ) as client: - # tool_resp = client.post( - # hook_config.endpoint, - # json=_build_tool_call( - # execute_workflow_arguments, _EXECUTE_WORKFLOW_TOOL_NAME - # ), - # headers=headers, - # ) - # except TransportError: - # raise - # except Exception as exc: - # raise TransportError( - # f"HTTP request to hook MCP endpoint failed: {exc}" - # ) from exc - # - # try: - # data = _extract_tool_result(_parse_response(tool_resp)) - # except TransportError: - # raise - # except Exception as exc: - # raise TransportError(f"Could not parse hook response: {exc}") from exc - # - # status = data.get("status", "") - # - # # 2. Fail fast on terminal statuses from execute-workflow - # if status in _EXECUTE_TERMINAL_STATUSES: - # error_msg = data.get("error", "") - # raise TransportError( - # f"Workflow execution failed with status {status!r}" - # + (f": {error_msg}" if error_msg else "") - # ) - # - # # 3. Return immediately if execution completed synchronously - # if status == "success": - # try: - # result_data = data.get("data", {}).get("resultData", {}) - # last_node = result_data.get("lastNodeExecuted", "") - # response_json = ( - # result_data.get("runData", {}) - # .get(last_node, [{}])[0] - # .get("data", {}) - # .get("main", [[{}]])[0][0] - # .get("json", {}) - # ) - # return Message(**response_json) - # except (KeyError, IndexError, TypeError, ValidationError) as exc: - # raise TransportError( - # f"Failed to extract response from last executed node: {exc}" - # ) from exc - # - # # 4. Poll get-execution for running/new/waiting/started - # execution_id = data.get("executionId") - # get_execution_arguments = { - # "workflowId": hook.n8n_workflow_config.workflow_id, - # "executionId": str(execution_id), - # "includeData": True, - # } - # - # deadline = time.monotonic() + hook.timeout - # last_status = status - # while time.monotonic() < deadline: - # time.sleep(_HOOK_POLL_INTERVAL) - # - # try: - # with httpx.Client( - # headers={"Authorization": f"Bearer {hook_config.auth_token}"}, - # timeout=hook.timeout, - # ) as client: - # tool_resp = client.post( - # hook_config.endpoint, - # json=_build_tool_call( - # get_execution_arguments, _GET_EXECUTION_TOOL_NAME - # ), - # headers=headers, - # ) - # except TransportError: - # raise - # except Exception as exc: - # raise TransportError( - # f"HTTP request to hook MCP endpoint failed: {exc}" - # ) from exc - # - # try: - # data = _extract_tool_result(_parse_response(tool_resp)) - # except TransportError: - # raise - # except Exception as exc: - # raise TransportError(f"Could not parse hook response: {exc}") from exc - # - # last_status = data.get("execution", {}).get("status", "") or data.get( - # "status", "" - # ) - # - # if last_status == "success": - # try: - # result_data = data.get("data", {}).get("resultData", {}) - # last_node = result_data.get("lastNodeExecuted", "") - # response_json = ( - # result_data.get("runData", {}) - # .get(last_node, [{}])[0] - # .get("data", {}) - # .get("main", [[{}]])[0][0] - # .get("json", {}) - # ) - # return Message(**response_json) - # except (KeyError, IndexError, TypeError, ValidationError) as exc: - # raise TransportError( - # f"Failed to extract response from last executed node: {exc}" - # ) from exc - # - # if last_status in _EXECUTION_TERMINAL_STATUSES: - # error_msg = data.get("error", "") - # raise TransportError( - # f"Workflow execution failed with status {last_status!r}" - # + (f": {error_msg}" if error_msg else "") - # ) - # - # # Continue polling for: running, waiting, new, unknown - # - # raise TransportError( - # f"Workflow execution timed out after {hook.timeout}s. " - # f"Last status: {last_status!r}" - # ) - - -# --------------------------------------------------------------------------- -# New standalone call_hook() — uses agw SDK module -# --------------------------------------------------------------------------- - - -@record_metrics( - Module.EXTENSIBILITY, - Operation.EXTENSIBILITY_CALL_HOOK, -) -async def call_hook( - hook: Hook, - agw_client: AgentGatewayClient, - user_token: Optional[str] = None, - message: Optional[Any] = None, - headers: Optional[dict] = None, -) -> Optional[Message]: - """Call a hook via Agent Gateway MCP tool invocation. - - Discovers the N8N MCP tools via Agent Gateway, executes the workflow via - ``execute_workflow``, then polls ``get_execution`` every 500 ms until the - execution succeeds, fails, or ``hook.timeout`` seconds elapse. - - Auth and endpoint resolution are handled internally by the AGW client — - no manual token or URL configuration is required. - - Args: - hook: Hook configuration (workflow ID, method, timeout). - agw_client: Configured Agent Gateway client used for tool discovery - and invocation. - - Returns: - Parsed ``Message`` from the last executed workflow node, or ``None`` - if the hook completed successfully but produced no message. - - Raises: - TransportError: On AGW tool call errors, terminal execution failures, - or timeout. - - Example: - ```python - from sap_cloud_sdk.extensibility import call_hook - from sap_cloud_sdk.agentgateway import create_client as create_agw_client - - agw_client = create_agw_client(tenant_subdomain="my-tenant") - - result = await call_hook( - hook=impl.hooks[0], - agw_client=agw_client, - ) - ``` - """ - # 1. Discover MCP tools — AGW resolves N8N GTID and handles auth internally - tools = await agw_client.list_mcp_tools(user_token=user_token or None) - - execute_tool = next( - ( - t for t in tools - if t.name == _EXECUTE_WORKFLOW_TOOL_NAME and t.server_name == _N8N_MCP_SERVER_NAME - ), - None, - ) - if execute_tool is None: - raise TransportError( - f"MCP tool '{_EXECUTE_WORKFLOW_TOOL_NAME}' on server '{_N8N_MCP_SERVER_NAME}' " - "not found via Agent Gateway." - ) - - get_exec_tool = next( - ( - t for t in tools - if t.name == _GET_EXECUTION_TOOL_NAME and t.server_name == _N8N_MCP_SERVER_NAME - ), - None, + @record_metrics( + Module.EXTENSIBILITY, + Operation.EXTENSIBILITY_CALL_HOOK, ) - if get_exec_tool is None: - raise TransportError( - f"MCP tool '{_GET_EXECUTION_TOOL_NAME}' on server '{_N8N_MCP_SERVER_NAME}' " - "not found via Agent Gateway." - ) - - # 2. Execute workflow - message_body = message.model_dump(mode="json") if message is not None else {} - try: - result_str = await agw_client.call_mcp_tool( - execute_tool, - user_token=user_token or None, - workflowId=hook.n8n_workflow_config.workflow_id, - inputs={ + def call_hook( + self, + hook: Hook, + hook_config: HookConfig, + ) -> Optional[Message]: + """Call a hook's MCP endpoint and poll until completion. + + Executes the workflow via ``execute-workflow``, then polls + ``get-execution`` every 500 ms until the execution succeeds, fails, + or ``hook.timeout`` seconds elapse. + + This method is transport-agnostic: regardless of how extension + metadata was fetched (backend, local file, or no-op), + the actual hook invocation is always a direct HTTP call to the + URL embedded in the :class:`Hook` object. + + Args: + hook: Hook configuration (workflow ID, method, timeout). + hook_config: Hook invocation configuration (endpoint URL, auth token, optional payload). + + Returns: + Parsed ``Message`` from the last executed workflow node, or ``None`` + if the hook completed successfully but produced no message. + + Raises: + TransportError: On HTTP errors, terminal execution failures, or timeout. + + Example: + ```python + from sap_cloud_sdk.extensibility import create_client + + client = create_client("sap.ai:agent:myAgent:v1") + impl = client.get_extension_capability_implementation(tenant="tenant-abc") + + if impl.hooks: + hook = impl.hooks[0] + result = client.call_hook( + hook, + HookConfig( + endpoint="https://gateway.example.com/v1/mcp/{ORD_ID}/{GTID}", + auth_token="my-secret-token", + payload={"foo": "bar"}, + ), + ) + ``` + """ + headers = {**_JSONRPC_HEADERS} + inject(headers) + + message_payload: dict[str, Any] = {} + if hook_config.payload is not None: + model_dump = getattr(hook_config.payload, "model_dump", None) + if callable(model_dump): + message_payload = cast(dict[str, Any], model_dump(exclude_none=True)) + + # 1. Execute workflow + execute_workflow_arguments = { + "workflowId": hook.n8n_workflow_config.workflow_id, + "inputs": { "type": "webhook", "webhookData": { "method": hook.n8n_workflow_config.method, "query": {}, - "body": message_body, - "headers": headers or {}, + "body": message_payload, + "headers": headers, }, }, - ) - except Exception as exc: + } + + try: + with httpx.Client( + headers={"Authorization": f"Bearer {hook_config.auth_token}"}, + timeout=hook.timeout, + ) as client: + tool_resp = client.post( + hook_config.endpoint, + json=_build_tool_call( + execute_workflow_arguments, _EXECUTE_WORKFLOW_TOOL_NAME + ), + headers=headers, + ) + except TransportError: + raise + except Exception as exc: + raise TransportError( + f"HTTP request to hook MCP endpoint failed: {exc}" + ) from exc + + try: + data = _extract_tool_result(_parse_response(tool_resp)) + except TransportError: + raise + except Exception as exc: + raise TransportError(f"Could not parse hook response: {exc}") from exc + + status = data.get("status", "") + + # 2. Fail fast on terminal statuses from execute-workflow + if status in _EXECUTE_TERMINAL_STATUSES: + error_msg = data.get("error", "") + raise TransportError( + f"Workflow execution failed with status {status!r}" + + (f": {error_msg}" if error_msg else "") + ) + + # 3. Return immediately if execution completed synchronously + if status == "success": + try: + result_data = data.get("data", {}).get("resultData", {}) + last_node = result_data.get("lastNodeExecuted", "") + response_json = ( + result_data.get("runData", {}) + .get(last_node, [{}])[0] + .get("data", {}) + .get("main", [[{}]])[0][0] + .get("json", {}) + ) + return Message(**response_json) + except (KeyError, IndexError, TypeError, ValidationError) as exc: + raise TransportError( + f"Failed to extract response from last executed node: {exc}" + ) from exc + + # 4. Poll get-execution for running/new/waiting/started + execution_id = data.get("executionId") + get_execution_arguments = { + "workflowId": hook.n8n_workflow_config.workflow_id, + "executionId": str(execution_id), + "includeData": True, + } + + deadline = time.monotonic() + hook.timeout + last_status = status + while time.monotonic() < deadline: + time.sleep(_HOOK_POLL_INTERVAL) + + try: + with httpx.Client( + headers={"Authorization": f"Bearer {hook_config.auth_token}"}, + timeout=hook.timeout, + ) as client: + tool_resp = client.post( + hook_config.endpoint, + json=_build_tool_call( + get_execution_arguments, _GET_EXECUTION_TOOL_NAME + ), + headers=headers, + ) + except TransportError: + raise + except Exception as exc: + raise TransportError( + f"HTTP request to hook MCP endpoint failed: {exc}" + ) from exc + + try: + data = _extract_tool_result(_parse_response(tool_resp)) + except TransportError: + raise + except Exception as exc: + raise TransportError(f"Could not parse hook response: {exc}") from exc + + last_status = data.get("execution", {}).get("status", "") or data.get( + "status", "" + ) + + if last_status == "success": + try: + result_data = data.get("data", {}).get("resultData", {}) + last_node = result_data.get("lastNodeExecuted", "") + response_json = ( + result_data.get("runData", {}) + .get(last_node, [{}])[0] + .get("data", {}) + .get("main", [[{}]])[0][0] + .get("json", {}) + ) + return Message(**response_json) + except (KeyError, IndexError, TypeError, ValidationError) as exc: + raise TransportError( + f"Failed to extract response from last executed node: {exc}" + ) from exc + + if last_status in _EXECUTION_TERMINAL_STATUSES: + error_msg = data.get("error", "") + raise TransportError( + f"Workflow execution failed with status {last_status!r}" + + (f": {error_msg}" if error_msg else "") + ) + + # Continue polling for: running, waiting, new, unknown + raise TransportError( - f"AGW tool call for '{_EXECUTE_WORKFLOW_TOOL_NAME}' failed: {exc}" - ) from exc + f"Workflow execution timed out after {hook.timeout}s. " + f"Last status: {last_status!r}" + ) + + + @record_metrics( + Module.EXTENSIBILITY, + Operation.EXTENSIBILITY_CALL_HOOK, + ) + async def call_hook( + hook: Hook, + user_token: Optional[str] = None, + message: Optional[Any] = None, + headers: Optional[dict] = None, + tenant_subdomain: Optional[str] = None + ) -> Optional[Message]: + """Call a hook via Agent Gateway MCP tool invocation. + + Discovers the N8N MCP tools via Agent Gateway, executes the workflow via + ``execute_workflow``, then polls ``get_execution`` every 500 ms until the + execution succeeds, fails, or ``hook.timeout`` seconds elapse. + + Auth and endpoint resolution are handled internally by the AGW client — + no manual token or URL configuration is required. - try: - data = json.loads(result_str) - except Exception as exc: - raise TransportError(f"Could not parse hook response: {exc}") from exc + Args: + hook: Hook configuration (workflow ID, method, timeout). + agw_client: Configured Agent Gateway client used for tool discovery + and invocation. - status = data.get("status", "") + Returns: + Parsed ``Message`` from the last executed workflow node, or ``None`` + if the hook completed successfully but produced no message. - if status in _EXECUTE_TERMINAL_STATUSES: - error_msg = data.get("error", "") - raise TransportError( - f"Workflow execution failed with status {status!r}" - + (f": {error_msg}" if error_msg else "") - ) + Raises: + TransportError: On AGW tool call errors, terminal execution failures, + or timeout. - if status == "success": - try: - result_data = data.get("data", {}).get("resultData", {}) - last_node = result_data.get("lastNodeExecuted", "") - response_json = ( - result_data.get("runData", {}) - .get(last_node, [{}])[0] - .get("data", {}) - .get("main", [[{}]])[0][0] - .get("json", {}) + Example: + ```python + from sap_cloud_sdk.extensibility import call_hook + from sap_cloud_sdk.agentgateway import create_client as create_agw_client + + agw_client = create_agw_client(tenant_subdomain="my-tenant") + + result = await call_hook( + hook=impl.hooks[0], + agw_client=agw_client, ) - return Message(**response_json) - except (KeyError, IndexError, TypeError, ValidationError) as exc: + ``` + """ + # 1. Create AGW client for the given tenant subdomain. + agw_client = None + agw_client = create_agw_client(tenant_subdomain) + + # 2. Discover MCP tools — AGW resolves N8N GTID and handles auth internally + # TODO: Cache the list of mcp tools for performance. + tools = await agw_client.list_mcp_tools(user_token=user_token or None) + + execute_tool = next( + ( + t for t in tools + if t.name == _EXECUTE_WORKFLOW_TOOL_NAME and t.server_name == _N8N_MCP_SERVER_NAME + ), + None, + ) + if execute_tool is None: raise TransportError( - f"Failed to extract response from last executed node: {exc}" - ) from exc - - # 3. Poll get_execution for running/new/waiting/started - execution_id = data.get("executionId") - deadline = time.monotonic() + hook.timeout - last_status = status + f"MCP tool '{_EXECUTE_WORKFLOW_TOOL_NAME}' on server '{_N8N_MCP_SERVER_NAME}' " + "not found via Agent Gateway." + ) - while time.monotonic() < deadline: - await asyncio.sleep(_HOOK_POLL_INTERVAL) + get_exec_tool = next( + ( + t for t in tools + if t.name == _GET_EXECUTION_TOOL_NAME and t.server_name == _N8N_MCP_SERVER_NAME + ), + None, + ) + if get_exec_tool is None: + raise TransportError( + f"MCP tool '{_GET_EXECUTION_TOOL_NAME}' on server '{_N8N_MCP_SERVER_NAME}' " + "not found via Agent Gateway." + ) + # 3. Execute workflow + message_body = message.model_dump(mode="json") if message is not None else {} try: result_str = await agw_client.call_mcp_tool( - get_exec_tool, + execute_tool, user_token=user_token or None, workflowId=hook.n8n_workflow_config.workflow_id, - executionId=str(execution_id), - includeData=True, + inputs={ + "type": "webhook", + "webhookData": { + "method": hook.n8n_workflow_config.method, + "query": {}, + "body": message_body, + "headers": headers or {}, + }, + }, ) except Exception as exc: raise TransportError( - f"AGW tool call for '{_GET_EXECUTION_TOOL_NAME}' failed: {exc}" + f"AGW tool call for '{_EXECUTE_WORKFLOW_TOOL_NAME}' failed: {exc}" ) from exc try: @@ -583,11 +522,16 @@ async def call_hook( except Exception as exc: raise TransportError(f"Could not parse hook response: {exc}") from exc - last_status = data.get("execution", {}).get("status", "") or data.get( - "status", "" - ) + status = data.get("status", "") + + if status in _EXECUTE_TERMINAL_STATUSES: + error_msg = data.get("error", "") + raise TransportError( + f"Workflow execution failed with status {status!r}" + + (f": {error_msg}" if error_msg else "") + ) - if last_status == "success": + if status == "success": try: result_data = data.get("data", {}).get("resultData", {}) last_node = result_data.get("lastNodeExecuted", "") @@ -604,14 +548,61 @@ async def call_hook( f"Failed to extract response from last executed node: {exc}" ) from exc - if last_status in _EXECUTION_TERMINAL_STATUSES: - error_msg = data.get("error", "") - raise TransportError( - f"Workflow execution failed with status {last_status!r}" - + (f": {error_msg}" if error_msg else "") + # 4. Poll get_execution for running/new/waiting/started + execution_id = data.get("executionId") + deadline = time.monotonic() + hook.timeout + last_status = status + + while time.monotonic() < deadline: + await asyncio.sleep(_HOOK_POLL_INTERVAL) + + try: + result_str = await agw_client.call_mcp_tool( + get_exec_tool, + user_token=user_token or None, + workflowId=hook.n8n_workflow_config.workflow_id, + executionId=str(execution_id), + includeData=True, + ) + except Exception as exc: + raise TransportError( + f"AGW tool call for '{_GET_EXECUTION_TOOL_NAME}' failed: {exc}" + ) from exc + + try: + data = json.loads(result_str) + except Exception as exc: + raise TransportError(f"Could not parse hook response: {exc}") from exc + + last_status = data.get("execution", {}).get("status", "") or data.get( + "status", "" ) - raise TransportError( - f"Workflow execution timed out after {hook.timeout}s. " - f"Last status: {last_status!r}" - ) + if last_status == "success": + try: + result_data = data.get("data", {}).get("resultData", {}) + last_node = result_data.get("lastNodeExecuted", "") + response_json = ( + result_data.get("runData", {}) + .get(last_node, [{}])[0] + .get("data", {}) + .get("main", [[{}]])[0][0] + .get("json", {}) + ) + return Message(**response_json) + except (KeyError, IndexError, TypeError, ValidationError) as exc: + raise TransportError( + f"Failed to extract response from last executed node: {exc}" + ) from exc + + if last_status in _EXECUTION_TERMINAL_STATUSES: + error_msg = data.get("error", "") + raise TransportError( + f"Workflow execution failed with status {last_status!r}" + + (f": {error_msg}" if error_msg else "") + ) + + raise TransportError( + f"Workflow execution timed out after {hook.timeout}s. " + f"Last status: {last_status!r}" + ) From 0dd847e523265ccf870e1d7d1388ad16d5c791a8 Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Wed, 17 Jun 2026 15:28:07 +0530 Subject: [PATCH 05/21] remove unused import --- src/sap_cloud_sdk/extensibility/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/__init__.py b/src/sap_cloud_sdk/extensibility/__init__.py index 1b5c172c..3d896225 100644 --- a/src/sap_cloud_sdk/extensibility/__init__.py +++ b/src/sap_cloud_sdk/extensibility/__init__.py @@ -90,7 +90,7 @@ def _mock_file(name: str) -> str: ) from sap_cloud_sdk.extensibility._noop_transport import NoOpTransport from sap_cloud_sdk.extensibility._ums_transport import UmsTransport -from sap_cloud_sdk.extensibility.client import ExtensibilityClient, call_hook +from sap_cloud_sdk.extensibility.client import ExtensibilityClient from sap_cloud_sdk.extensibility.config import ExtensibilityConfig, HookConfig from sap_cloud_sdk.extensibility.exceptions import ( ClientCreationError, @@ -177,7 +177,6 @@ def create_client( # Client "create_client", "ExtensibilityClient", - "call_hook", # Local mode "LocalTransport", "CLOUD_SDK_LOCAL_EXTENSIBILITY_FILE_ENV", From 5306f3a52f1eef560c0b922967403b4494d0db1b Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Wed, 17 Jun 2026 19:06:31 +0530 Subject: [PATCH 06/21] Adding unit tests --- src/sap_cloud_sdk/extensibility/client.py | 1 + tests/extensibility/unit/test_client.py | 114 +++++++++++++++++----- 2 files changed, 93 insertions(+), 22 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index f87b35df..7a2f1c9c 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -420,6 +420,7 @@ def call_hook( Operation.EXTENSIBILITY_CALL_HOOK, ) async def call_hook( + self, hook: Hook, user_token: Optional[str] = None, message: Optional[Any] = None, diff --git a/tests/extensibility/unit/test_client.py b/tests/extensibility/unit/test_client.py index 901e3ae3..f5c449a6 100644 --- a/tests/extensibility/unit/test_client.py +++ b/tests/extensibility/unit/test_client.py @@ -5,7 +5,7 @@ import pytest -from sap_cloud_sdk.extensibility import call_hook, create_client +from sap_cloud_sdk.extensibility import create_client from sap_cloud_sdk.extensibility.client import ( ExtensibilityClient, _EXECUTE_WORKFLOW_TOOL_NAME, @@ -334,27 +334,44 @@ def _make_agw_client(tools: list, tool_responses: list) -> MagicMock: # --------------------------------------------------------------------------- -# Tests for standalone call_hook() +# Tests for ExtensibilityClient.call_hook # --------------------------------------------------------------------------- class TestCallHook: - """Tests for the standalone call_hook() module-level function.""" + """Tests for ExtensibilityClient.call_hook (async, AGW-based).""" + + def _make_client(self, agw: MagicMock) -> ExtensibilityClient: + """Build an ExtensibilityClient with a mock transport and patched AGW factory.""" + client = ExtensibilityClient(MagicMock()) + # Stash the agw on the instance for the patcher closure to return. + client._test_agw = agw # type: ignore[attr-defined] + return client @pytest.mark.asyncio async def test_execute_tool_not_found_raises(self): """Raises TransportError when execute_workflow tool is absent.""" agw = _make_agw_client(tools=[], tool_responses=[]) - with pytest.raises(TransportError, match=_EXECUTE_WORKFLOW_TOOL_NAME): - await call_hook(hook=_make_hook(), agw_client=agw) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ): + with pytest.raises(TransportError, match=_EXECUTE_WORKFLOW_TOOL_NAME): + await client.call_hook(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_get_exec_tool_not_found_raises(self): """Raises TransportError when get_execution tool is absent.""" tools = [_make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME)] agw = _make_agw_client(tools=tools, tool_responses=[]) - with pytest.raises(TransportError, match=_GET_EXECUTION_TOOL_NAME): - await call_hook(hook=_make_hook(), agw_client=agw) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ): + with pytest.raises(TransportError, match=_GET_EXECUTION_TOOL_NAME): + await client.call_hook(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_composite_key_ignores_wrong_server(self): @@ -364,8 +381,13 @@ async def test_composite_key_ignores_wrong_server(self): _make_other_server_tool(_GET_EXECUTION_TOOL_NAME), ] agw = _make_agw_client(tools=tools, tool_responses=[]) - with pytest.raises(TransportError, match=_EXECUTE_WORKFLOW_TOOL_NAME): - await call_hook(hook=_make_hook(), agw_client=agw) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ): + with pytest.raises(TransportError, match=_EXECUTE_WORKFLOW_TOOL_NAME): + await client.call_hook(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_composite_key_picks_correct_tool_among_duplicates(self): @@ -380,7 +402,12 @@ async def test_composite_key_picks_correct_tool_among_duplicates(self): tools=tools, tool_responses=[_success_payload()], ) - result = await call_hook(hook=_make_hook(), agw_client=agw) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ): + result = await client.call_hook(hook=_make_hook(), tenant_subdomain="t") assert result is not None # call_mcp_tool must have been called with the N8N tool, not the other one called_tool = agw.call_mcp_tool.call_args[0][0] @@ -397,7 +424,12 @@ async def test_success_synchronous(self): tools=tools, tool_responses=[_success_payload()], ) - result = await call_hook(hook=_make_hook(), agw_client=agw) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ): + result = await client.call_hook(hook=_make_hook(), tenant_subdomain="t") assert result is not None assert result.message_id == "msg-1" agw.call_mcp_tool.assert_called_once() @@ -413,8 +445,15 @@ async def test_success_after_polling(self): tools=tools, tool_responses=[_running_payload(), _poll_success_payload()], ) - with patch("sap_cloud_sdk.extensibility.client.asyncio.sleep", new_callable=AsyncMock): - result = await call_hook(hook=_make_hook(), agw_client=agw) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ), patch( + "sap_cloud_sdk.extensibility.client.asyncio.sleep", + new_callable=AsyncMock, + ): + result = await client.call_hook(hook=_make_hook(), tenant_subdomain="t") assert result is not None assert result.message_id == "msg-2" assert agw.call_mcp_tool.call_count == 2 @@ -428,8 +467,13 @@ async def test_terminal_status_from_execute_raises(self): ] terminal_payload = json.dumps({"status": "error", "error": "workflow crashed"}) agw = _make_agw_client(tools=tools, tool_responses=[terminal_payload]) - with pytest.raises(TransportError, match="workflow crashed"): - await call_hook(hook=_make_hook(), agw_client=agw) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ): + with pytest.raises(TransportError, match="workflow crashed"): + await client.call_hook(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_terminal_status_from_poll_raises(self): @@ -443,9 +487,16 @@ async def test_terminal_status_from_poll_raises(self): tools=tools, tool_responses=[_running_payload(), poll_terminal], ) - with patch("sap_cloud_sdk.extensibility.client.asyncio.sleep", new_callable=AsyncMock): + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ), patch( + "sap_cloud_sdk.extensibility.client.asyncio.sleep", + new_callable=AsyncMock, + ): with pytest.raises(TransportError, match="node failed"): - await call_hook(hook=_make_hook(), agw_client=agw) + await client.call_hook(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_timeout_raises(self): @@ -459,11 +510,18 @@ async def test_timeout_raises(self): tools=tools, tool_responses=[_running_payload()] + [_running_payload()] * 100, ) + client = self._make_client(agw) # Use a hook with timeout=0 so monotonic deadline is immediately exceeded hook = _make_hook(timeout=0) - with patch("sap_cloud_sdk.extensibility.client.asyncio.sleep", new_callable=AsyncMock): + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ), patch( + "sap_cloud_sdk.extensibility.client.asyncio.sleep", + new_callable=AsyncMock, + ): with pytest.raises(TransportError, match="timed out"): - await call_hook(hook=hook, agw_client=agw) + await client.call_hook(hook=hook, tenant_subdomain="t") @pytest.mark.asyncio async def test_agw_call_mcp_tool_exception_raises_transport_error(self): @@ -475,8 +533,13 @@ async def test_agw_call_mcp_tool_exception_raises_transport_error(self): agw = MagicMock() agw.list_mcp_tools = AsyncMock(return_value=tools) agw.call_mcp_tool = AsyncMock(side_effect=RuntimeError("network error")) - with pytest.raises(TransportError, match="network error"): - await call_hook(hook=_make_hook(), agw_client=agw) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ): + with pytest.raises(TransportError, match="network error"): + await client.call_hook(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_workflow_id_passed_to_execute_tool(self): @@ -486,6 +549,13 @@ async def test_workflow_id_passed_to_execute_tool(self): _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), ] agw = _make_agw_client(tools=tools, tool_responses=[_success_payload("wf-xyz")]) - await call_hook(hook=_make_hook(workflow_id="wf-xyz"), agw_client=agw) + client = self._make_client(agw) + with patch( + "sap_cloud_sdk.extensibility.client.create_agw_client", + return_value=agw, + ): + await client.call_hook( + hook=_make_hook(workflow_id="wf-xyz"), tenant_subdomain="t" + ) kwargs = agw.call_mcp_tool.call_args[1] assert kwargs["workflowId"] == "wf-xyz" From 2ff0122401dcf7fac0313c2176d9fb420b4b4148 Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Wed, 17 Jun 2026 21:43:21 +0530 Subject: [PATCH 07/21] combining arguments into object --- src/sap_cloud_sdk/extensibility/client.py | 32 ++++++++++++++--------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index 7a2f1c9c..5066f12a 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -498,20 +498,23 @@ async def call_hook( # 3. Execute workflow message_body = message.model_dump(mode="json") if message is not None else {} + execute_arguments = { + "workflowId": hook.n8n_workflow_config.workflow_id, + "inputs": { + "type": "webhook", + "webhookData": { + "method": hook.n8n_workflow_config.method, + "query": {}, + "body": message_body, + "headers": headers or {}, + }, + }, + } try: result_str = await agw_client.call_mcp_tool( execute_tool, user_token=user_token or None, - workflowId=hook.n8n_workflow_config.workflow_id, - inputs={ - "type": "webhook", - "webhookData": { - "method": hook.n8n_workflow_config.method, - "query": {}, - "body": message_body, - "headers": headers or {}, - }, - }, + **execute_arguments, ) except Exception as exc: raise TransportError( @@ -558,12 +561,15 @@ async def call_hook( await asyncio.sleep(_HOOK_POLL_INTERVAL) try: + get_execution_arguments = { + "workflowId": hook.n8n_workflow_config.workflow_id, + "executionId": str(execution_id), + "includeData": True, + } result_str = await agw_client.call_mcp_tool( get_exec_tool, user_token=user_token or None, - workflowId=hook.n8n_workflow_config.workflow_id, - executionId=str(execution_id), - includeData=True, + **get_execution_arguments, ) except Exception as exc: raise TransportError( From b14493dfac591467a6cc02cda791d74aebc75d37 Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Mon, 22 Jun 2026 12:08:17 +0530 Subject: [PATCH 08/21] fix function documentation --- src/sap_cloud_sdk/extensibility/client.py | 32 +++++++++++++++-------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index 5066f12a..fb2e1228 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -433,13 +433,20 @@ async def call_hook( ``execute_workflow``, then polls ``get_execution`` every 500 ms until the execution succeeds, fails, or ``hook.timeout`` seconds elapse. - Auth and endpoint resolution are handled internally by the AGW client — - no manual token or URL configuration is required. + Auth and endpoint resolution are handled internally by an AGW client + created from ``tenant_subdomain`` — no manual token or URL configuration + is required. Args: hook: Hook configuration (workflow ID, method, timeout). - agw_client: Configured Agent Gateway client used for tool discovery - and invocation. + user_token: Optional user token forwarded to the Agent Gateway client + for MCP tool discovery and invocation. + message: Optional A2A ``Message`` payload serialised into the webhook + body sent to the n8n workflow. + headers: Optional HTTP headers included in the webhook data passed to + the n8n workflow. + tenant_subdomain: Tenant subdomain used to instantiate the Agent + Gateway client. Pass ``None`` to use the default subdomain. Returns: Parsed ``Message`` from the last executed workflow node, or ``None`` @@ -451,15 +458,18 @@ async def call_hook( Example: ```python - from sap_cloud_sdk.extensibility import call_hook - from sap_cloud_sdk.agentgateway import create_client as create_agw_client + from sap_cloud_sdk.extensibility import create_client - agw_client = create_agw_client(tenant_subdomain="my-tenant") + client = create_client("sap.ai:agent:myAgent:v1") + impl = client.get_extension_capability_implementation(tenant="tenant-abc") - result = await call_hook( - hook=impl.hooks[0], - agw_client=agw_client, - ) + if impl.hooks: + result = await client.call_hook( + hook=impl.hooks[0], + user_token="my-user-token", + message=my_message, + tenant_subdomain="my-tenant", + ) ``` """ # 1. Create AGW client for the given tenant subdomain. From 6d070d3dfab7b0c167f87c13ffd9f88f230ce5e1 Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Mon, 22 Jun 2026 13:37:02 +0530 Subject: [PATCH 09/21] fixing job failures --- pyproject.toml | 2 +- src/sap_cloud_sdk/extensibility/client.py | 4 ++-- tests/extensibility/unit/test_client.py | 5 +---- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c41737f2..169864d6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sap-cloud-sdk" -version = "0.25.2" +version = "0.25.3" description = "SAP Cloud SDK for Python" readme = "README.md" license = "Apache-2.0" diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index fb2e1228..0927e289 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -524,7 +524,7 @@ async def call_hook( result_str = await agw_client.call_mcp_tool( execute_tool, user_token=user_token or None, - **execute_arguments, + **execute_arguments, # type: ignore[arg-type] ) except Exception as exc: raise TransportError( @@ -579,7 +579,7 @@ async def call_hook( result_str = await agw_client.call_mcp_tool( get_exec_tool, user_token=user_token or None, - **get_execution_arguments, + **get_execution_arguments, # type: ignore[arg-type] ) except Exception as exc: raise TransportError( diff --git a/tests/extensibility/unit/test_client.py b/tests/extensibility/unit/test_client.py index f5c449a6..d5951258 100644 --- a/tests/extensibility/unit/test_client.py +++ b/tests/extensibility/unit/test_client.py @@ -343,10 +343,7 @@ class TestCallHook: def _make_client(self, agw: MagicMock) -> ExtensibilityClient: """Build an ExtensibilityClient with a mock transport and patched AGW factory.""" - client = ExtensibilityClient(MagicMock()) - # Stash the agw on the instance for the patcher closure to return. - client._test_agw = agw # type: ignore[attr-defined] - return client + return ExtensibilityClient(MagicMock()) @pytest.mark.asyncio async def test_execute_tool_not_found_raises(self): From bb3a7cf53402357870d6f6fdff669c7adb7d72ec Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Mon, 22 Jun 2026 15:19:37 +0530 Subject: [PATCH 10/21] rename call_hook to call_hook_agw --- src/sap_cloud_sdk/extensibility/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index 0927e289..ad2cbb8b 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -419,7 +419,7 @@ def call_hook( Module.EXTENSIBILITY, Operation.EXTENSIBILITY_CALL_HOOK, ) - async def call_hook( + async def call_hook_agw( self, hook: Hook, user_token: Optional[str] = None, @@ -464,7 +464,7 @@ async def call_hook( impl = client.get_extension_capability_implementation(tenant="tenant-abc") if impl.hooks: - result = await client.call_hook( + result = await client.call_hook_agw( hook=impl.hooks[0], user_token="my-user-token", message=my_message, From d90d7510401811018b7826c65a83f78751fb83ce Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Mon, 22 Jun 2026 15:28:12 +0530 Subject: [PATCH 11/21] Update pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 641483d7..60d58226 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sap-cloud-sdk" -version = "0.27.1" +version = "0.28.0" description = "SAP Cloud SDK for Python" readme = "README.md" license = "Apache-2.0" From a63bdf0d27f51e6f2792dada780057693085b0d8 Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Mon, 22 Jun 2026 15:32:17 +0530 Subject: [PATCH 12/21] fix job failures --- src/sap_cloud_sdk/extensibility/client.py | 4 ++-- tests/extensibility/unit/test_client.py | 22 +++++++++++----------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index ad2cbb8b..510acdd3 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -524,7 +524,7 @@ async def call_hook_agw( result_str = await agw_client.call_mcp_tool( execute_tool, user_token=user_token or None, - **execute_arguments, # type: ignore[arg-type] + **execute_arguments, # type: ignore[arg-type] # ty: ignore[invalid-argument-type] ) except Exception as exc: raise TransportError( @@ -579,7 +579,7 @@ async def call_hook_agw( result_str = await agw_client.call_mcp_tool( get_exec_tool, user_token=user_token or None, - **get_execution_arguments, # type: ignore[arg-type] + **get_execution_arguments, # type: ignore[arg-type] # ty: ignore[invalid-argument-type] ) except Exception as exc: raise TransportError( diff --git a/tests/extensibility/unit/test_client.py b/tests/extensibility/unit/test_client.py index d5951258..8160634a 100644 --- a/tests/extensibility/unit/test_client.py +++ b/tests/extensibility/unit/test_client.py @@ -355,7 +355,7 @@ async def test_execute_tool_not_found_raises(self): return_value=agw, ): with pytest.raises(TransportError, match=_EXECUTE_WORKFLOW_TOOL_NAME): - await client.call_hook(hook=_make_hook(), tenant_subdomain="t") + await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_get_exec_tool_not_found_raises(self): @@ -368,7 +368,7 @@ async def test_get_exec_tool_not_found_raises(self): return_value=agw, ): with pytest.raises(TransportError, match=_GET_EXECUTION_TOOL_NAME): - await client.call_hook(hook=_make_hook(), tenant_subdomain="t") + await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_composite_key_ignores_wrong_server(self): @@ -384,7 +384,7 @@ async def test_composite_key_ignores_wrong_server(self): return_value=agw, ): with pytest.raises(TransportError, match=_EXECUTE_WORKFLOW_TOOL_NAME): - await client.call_hook(hook=_make_hook(), tenant_subdomain="t") + await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_composite_key_picks_correct_tool_among_duplicates(self): @@ -404,7 +404,7 @@ async def test_composite_key_picks_correct_tool_among_duplicates(self): "sap_cloud_sdk.extensibility.client.create_agw_client", return_value=agw, ): - result = await client.call_hook(hook=_make_hook(), tenant_subdomain="t") + result = await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") assert result is not None # call_mcp_tool must have been called with the N8N tool, not the other one called_tool = agw.call_mcp_tool.call_args[0][0] @@ -426,7 +426,7 @@ async def test_success_synchronous(self): "sap_cloud_sdk.extensibility.client.create_agw_client", return_value=agw, ): - result = await client.call_hook(hook=_make_hook(), tenant_subdomain="t") + result = await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") assert result is not None assert result.message_id == "msg-1" agw.call_mcp_tool.assert_called_once() @@ -450,7 +450,7 @@ async def test_success_after_polling(self): "sap_cloud_sdk.extensibility.client.asyncio.sleep", new_callable=AsyncMock, ): - result = await client.call_hook(hook=_make_hook(), tenant_subdomain="t") + result = await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") assert result is not None assert result.message_id == "msg-2" assert agw.call_mcp_tool.call_count == 2 @@ -470,7 +470,7 @@ async def test_terminal_status_from_execute_raises(self): return_value=agw, ): with pytest.raises(TransportError, match="workflow crashed"): - await client.call_hook(hook=_make_hook(), tenant_subdomain="t") + await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_terminal_status_from_poll_raises(self): @@ -493,7 +493,7 @@ async def test_terminal_status_from_poll_raises(self): new_callable=AsyncMock, ): with pytest.raises(TransportError, match="node failed"): - await client.call_hook(hook=_make_hook(), tenant_subdomain="t") + await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_timeout_raises(self): @@ -518,7 +518,7 @@ async def test_timeout_raises(self): new_callable=AsyncMock, ): with pytest.raises(TransportError, match="timed out"): - await client.call_hook(hook=hook, tenant_subdomain="t") + await client.call_hook_agw(hook=hook, tenant_subdomain="t") @pytest.mark.asyncio async def test_agw_call_mcp_tool_exception_raises_transport_error(self): @@ -536,7 +536,7 @@ async def test_agw_call_mcp_tool_exception_raises_transport_error(self): return_value=agw, ): with pytest.raises(TransportError, match="network error"): - await client.call_hook(hook=_make_hook(), tenant_subdomain="t") + await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_workflow_id_passed_to_execute_tool(self): @@ -551,7 +551,7 @@ async def test_workflow_id_passed_to_execute_tool(self): "sap_cloud_sdk.extensibility.client.create_agw_client", return_value=agw, ): - await client.call_hook( + await client.call_hook_agw( hook=_make_hook(workflow_id="wf-xyz"), tenant_subdomain="t" ) kwargs = agw.call_mcp_tool.call_args[1] From 5ecea63be8333d6fb1f2ea4e5b2704ed4965ee57 Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Mon, 22 Jun 2026 16:02:33 +0530 Subject: [PATCH 13/21] fix(extensibility): apply pre-commit formatting fixes --- src/sap_cloud_sdk/extensibility/client.py | 48 +++++++++++------------ 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index 510acdd3..352d9e31 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -223,34 +223,34 @@ def call_hook( hook_config: HookConfig, ) -> Optional[Message]: """Call a hook's MCP endpoint and poll until completion. - + Executes the workflow via ``execute-workflow``, then polls ``get-execution`` every 500 ms until the execution succeeds, fails, or ``hook.timeout`` seconds elapse. - + This method is transport-agnostic: regardless of how extension metadata was fetched (backend, local file, or no-op), the actual hook invocation is always a direct HTTP call to the URL embedded in the :class:`Hook` object. - + Args: hook: Hook configuration (workflow ID, method, timeout). hook_config: Hook invocation configuration (endpoint URL, auth token, optional payload). - + Returns: Parsed ``Message`` from the last executed workflow node, or ``None`` if the hook completed successfully but produced no message. - + Raises: TransportError: On HTTP errors, terminal execution failures, or timeout. - + Example: ```python from sap_cloud_sdk.extensibility import create_client - + client = create_client("sap.ai:agent:myAgent:v1") impl = client.get_extension_capability_implementation(tenant="tenant-abc") - + if impl.hooks: hook = impl.hooks[0] result = client.call_hook( @@ -265,13 +265,13 @@ def call_hook( """ headers = {**_JSONRPC_HEADERS} inject(headers) - + message_payload: dict[str, Any] = {} if hook_config.payload is not None: model_dump = getattr(hook_config.payload, "model_dump", None) if callable(model_dump): message_payload = cast(dict[str, Any], model_dump(exclude_none=True)) - + # 1. Execute workflow execute_workflow_arguments = { "workflowId": hook.n8n_workflow_config.workflow_id, @@ -285,7 +285,7 @@ def call_hook( }, }, } - + try: with httpx.Client( headers={"Authorization": f"Bearer {hook_config.auth_token}"}, @@ -304,16 +304,16 @@ def call_hook( raise TransportError( f"HTTP request to hook MCP endpoint failed: {exc}" ) from exc - + try: data = _extract_tool_result(_parse_response(tool_resp)) except TransportError: raise except Exception as exc: raise TransportError(f"Could not parse hook response: {exc}") from exc - + status = data.get("status", "") - + # 2. Fail fast on terminal statuses from execute-workflow if status in _EXECUTE_TERMINAL_STATUSES: error_msg = data.get("error", "") @@ -321,7 +321,7 @@ def call_hook( f"Workflow execution failed with status {status!r}" + (f": {error_msg}" if error_msg else "") ) - + # 3. Return immediately if execution completed synchronously if status == "success": try: @@ -339,7 +339,7 @@ def call_hook( raise TransportError( f"Failed to extract response from last executed node: {exc}" ) from exc - + # 4. Poll get-execution for running/new/waiting/started execution_id = data.get("executionId") get_execution_arguments = { @@ -347,12 +347,12 @@ def call_hook( "executionId": str(execution_id), "includeData": True, } - + deadline = time.monotonic() + hook.timeout last_status = status while time.monotonic() < deadline: time.sleep(_HOOK_POLL_INTERVAL) - + try: with httpx.Client( headers={"Authorization": f"Bearer {hook_config.auth_token}"}, @@ -371,18 +371,18 @@ def call_hook( raise TransportError( f"HTTP request to hook MCP endpoint failed: {exc}" ) from exc - + try: data = _extract_tool_result(_parse_response(tool_resp)) except TransportError: raise except Exception as exc: raise TransportError(f"Could not parse hook response: {exc}") from exc - + last_status = data.get("execution", {}).get("status", "") or data.get( "status", "" ) - + if last_status == "success": try: result_data = data.get("data", {}).get("resultData", {}) @@ -399,16 +399,16 @@ def call_hook( raise TransportError( f"Failed to extract response from last executed node: {exc}" ) from exc - + if last_status in _EXECUTION_TERMINAL_STATUSES: error_msg = data.get("error", "") raise TransportError( f"Workflow execution failed with status {last_status!r}" + (f": {error_msg}" if error_msg else "") ) - + # Continue polling for: running, waiting, new, unknown - + raise TransportError( f"Workflow execution timed out after {hook.timeout}s. " f"Last status: {last_status!r}" From debe427025d24068112193837d106f94b4bedc24 Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Mon, 22 Jun 2026 16:07:35 +0530 Subject: [PATCH 14/21] fix(extensibility): apply ruff formatting --- src/sap_cloud_sdk/extensibility/client.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index 352d9e31..9e5c6e38 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -414,7 +414,6 @@ def call_hook( f"Last status: {last_status!r}" ) - @record_metrics( Module.EXTENSIBILITY, Operation.EXTENSIBILITY_CALL_HOOK, @@ -425,7 +424,7 @@ async def call_hook_agw( user_token: Optional[str] = None, message: Optional[Any] = None, headers: Optional[dict] = None, - tenant_subdomain: Optional[str] = None + tenant_subdomain: Optional[str] = None, ) -> Optional[Message]: """Call a hook via Agent Gateway MCP tool invocation. @@ -482,8 +481,10 @@ async def call_hook_agw( execute_tool = next( ( - t for t in tools - if t.name == _EXECUTE_WORKFLOW_TOOL_NAME and t.server_name == _N8N_MCP_SERVER_NAME + t + for t in tools + if t.name == _EXECUTE_WORKFLOW_TOOL_NAME + and t.server_name == _N8N_MCP_SERVER_NAME ), None, ) @@ -495,8 +496,10 @@ async def call_hook_agw( get_exec_tool = next( ( - t for t in tools - if t.name == _GET_EXECUTION_TOOL_NAME and t.server_name == _N8N_MCP_SERVER_NAME + t + for t in tools + if t.name == _GET_EXECUTION_TOOL_NAME + and t.server_name == _N8N_MCP_SERVER_NAME ), None, ) From f068336acd6729fbaa18cae54e5d1fb28fdbb170 Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Mon, 22 Jun 2026 16:10:13 +0530 Subject: [PATCH 15/21] update comment --- src/sap_cloud_sdk/extensibility/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index 9e5c6e38..c0713414 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -476,7 +476,7 @@ async def call_hook_agw( agw_client = create_agw_client(tenant_subdomain) # 2. Discover MCP tools — AGW resolves N8N GTID and handles auth internally - # TODO: Cache the list of mcp tools for performance. + # TODO: cache the list of mcp tools for performance. tools = await agw_client.list_mcp_tools(user_token=user_token or None) execute_tool = next( From b4f4f7a5279c7adddf9066a619df8f55e11a3d1f Mon Sep 17 00:00:00 2001 From: "Yashmeet ." Date: Wed, 24 Jun 2026 13:24:41 +0530 Subject: [PATCH 16/21] Refactor call_hook_agw method --- src/sap_cloud_sdk/extensibility/client.py | 229 +++++++++++----------- tests/extensibility/unit/test_client.py | 35 ++-- 2 files changed, 135 insertions(+), 129 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index c0713414..f890f7b7 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -322,25 +322,7 @@ def call_hook( + (f": {error_msg}" if error_msg else "") ) - # 3. Return immediately if execution completed synchronously - if status == "success": - try: - result_data = data.get("data", {}).get("resultData", {}) - last_node = result_data.get("lastNodeExecuted", "") - response_json = ( - result_data.get("runData", {}) - .get(last_node, [{}])[0] - .get("data", {}) - .get("main", [[{}]])[0][0] - .get("json", {}) - ) - return Message(**response_json) - except (KeyError, IndexError, TypeError, ValidationError) as exc: - raise TransportError( - f"Failed to extract response from last executed node: {exc}" - ) from exc - - # 4. Poll get-execution for running/new/waiting/started + # 3. Poll get-execution for running/new/waiting/started execution_id = data.get("executionId") get_execution_arguments = { "workflowId": hook.n8n_workflow_config.workflow_id, @@ -414,68 +396,7 @@ def call_hook( f"Last status: {last_status!r}" ) - @record_metrics( - Module.EXTENSIBILITY, - Operation.EXTENSIBILITY_CALL_HOOK, - ) - async def call_hook_agw( - self, - hook: Hook, - user_token: Optional[str] = None, - message: Optional[Any] = None, - headers: Optional[dict] = None, - tenant_subdomain: Optional[str] = None, - ) -> Optional[Message]: - """Call a hook via Agent Gateway MCP tool invocation. - - Discovers the N8N MCP tools via Agent Gateway, executes the workflow via - ``execute_workflow``, then polls ``get_execution`` every 500 ms until the - execution succeeds, fails, or ``hook.timeout`` seconds elapse. - - Auth and endpoint resolution are handled internally by an AGW client - created from ``tenant_subdomain`` — no manual token or URL configuration - is required. - - Args: - hook: Hook configuration (workflow ID, method, timeout). - user_token: Optional user token forwarded to the Agent Gateway client - for MCP tool discovery and invocation. - message: Optional A2A ``Message`` payload serialised into the webhook - body sent to the n8n workflow. - headers: Optional HTTP headers included in the webhook data passed to - the n8n workflow. - tenant_subdomain: Tenant subdomain used to instantiate the Agent - Gateway client. Pass ``None`` to use the default subdomain. - - Returns: - Parsed ``Message`` from the last executed workflow node, or ``None`` - if the hook completed successfully but produced no message. - - Raises: - TransportError: On AGW tool call errors, terminal execution failures, - or timeout. - - Example: - ```python - from sap_cloud_sdk.extensibility import create_client - - client = create_client("sap.ai:agent:myAgent:v1") - impl = client.get_extension_capability_implementation(tenant="tenant-abc") - - if impl.hooks: - result = await client.call_hook_agw( - hook=impl.hooks[0], - user_token="my-user-token", - message=my_message, - tenant_subdomain="my-tenant", - ) - ``` - """ - # 1. Create AGW client for the given tenant subdomain. - agw_client = None - agw_client = create_agw_client(tenant_subdomain) - - # 2. Discover MCP tools — AGW resolves N8N GTID and handles auth internally + async def _discover_n8n_tools(self, agw_client: Any, user_token: Optional[str]) -> tuple[Any, Any]: # TODO: cache the list of mcp tools for performance. tools = await agw_client.list_mcp_tools(user_token=user_token or None) @@ -509,7 +430,17 @@ async def call_hook_agw( "not found via Agent Gateway." ) - # 3. Execute workflow + return execute_tool, get_exec_tool + + async def _execute_workflow_via_agw( + self, + agw_client: Any, + execute_tool: Any, + hook: Hook, + user_token: Optional[str], + message: Optional[Any], + headers: Optional[dict], + ) -> tuple[str, Any]: message_body = message.model_dump(mode="json") if message is not None else {} execute_arguments = { "workflowId": hook.n8n_workflow_config.workflow_id, @@ -540,7 +471,6 @@ async def call_hook_agw( raise TransportError(f"Could not parse hook response: {exc}") from exc status = data.get("status", "") - if status in _EXECUTE_TERMINAL_STATUSES: error_msg = data.get("error", "") raise TransportError( @@ -548,27 +478,38 @@ async def call_hook_agw( + (f": {error_msg}" if error_msg else "") ) - if status == "success": - try: - result_data = data.get("data", {}).get("resultData", {}) - last_node = result_data.get("lastNodeExecuted", "") - response_json = ( - result_data.get("runData", {}) - .get(last_node, [{}])[0] - .get("data", {}) - .get("main", [[{}]])[0][0] - .get("json", {}) - ) - return Message(**response_json) - except (KeyError, IndexError, TypeError, ValidationError) as exc: - raise TransportError( - f"Failed to extract response from last executed node: {exc}" - ) from exc - - # 4. Poll get_execution for running/new/waiting/started execution_id = data.get("executionId") + return str(execution_id), status + + @staticmethod + def _extract_message(data: dict) -> Message: + try: + result_data = data.get("data", {}).get("resultData", {}) + last_node = result_data.get("lastNodeExecuted", "") + response_json = ( + result_data.get("runData", {}) + .get(last_node, [{}])[0] + .get("data", {}) + .get("main", [[{}]])[0][0] + .get("json", {}) + ) + return Message(**response_json) + except (KeyError, IndexError, TypeError, ValidationError) as exc: + raise TransportError( + f"Failed to extract response from last executed node: {exc}" + ) from exc + + async def _poll_hook_execution( + self, + agw_client: Any, + get_exec_tool: Any, + hook: Hook, + execution_id: str, + user_token: Optional[str], + initial_status: str, + ) -> Optional[Message]: deadline = time.monotonic() + hook.timeout - last_status = status + last_status = initial_status while time.monotonic() < deadline: await asyncio.sleep(_HOOK_POLL_INTERVAL) @@ -576,7 +517,7 @@ async def call_hook_agw( try: get_execution_arguments = { "workflowId": hook.n8n_workflow_config.workflow_id, - "executionId": str(execution_id), + "executionId": execution_id, "includeData": True, } result_str = await agw_client.call_mcp_tool( @@ -599,21 +540,7 @@ async def call_hook_agw( ) if last_status == "success": - try: - result_data = data.get("data", {}).get("resultData", {}) - last_node = result_data.get("lastNodeExecuted", "") - response_json = ( - result_data.get("runData", {}) - .get(last_node, [{}])[0] - .get("data", {}) - .get("main", [[{}]])[0][0] - .get("json", {}) - ) - return Message(**response_json) - except (KeyError, IndexError, TypeError, ValidationError) as exc: - raise TransportError( - f"Failed to extract response from last executed node: {exc}" - ) from exc + return self._extract_message(data) if last_status in _EXECUTION_TERMINAL_STATUSES: error_msg = data.get("error", "") @@ -626,3 +553,69 @@ async def call_hook_agw( f"Workflow execution timed out after {hook.timeout}s. " f"Last status: {last_status!r}" ) + + @record_metrics( + Module.EXTENSIBILITY, + Operation.EXTENSIBILITY_CALL_HOOK, + ) + async def call_hook_agw( + self, + hook: Hook, + user_token: Optional[str] = None, + message: Optional[Any] = None, + headers: Optional[dict] = None, + tenant_subdomain: Optional[str] = None, + ) -> Optional[Message]: + """Call a hook via Agent Gateway MCP tool invocation. + + Discovers the N8N MCP tools via Agent Gateway, executes the workflow via + ``execute_workflow``, then polls ``get_execution`` every 500 ms until the + execution succeeds, fails, or ``hook.timeout`` seconds elapse. + + Auth and endpoint resolution are handled internally by an AGW client + created from ``tenant_subdomain`` — no manual token or URL configuration + is required. + + Args: + hook: Hook configuration (workflow ID, method, timeout). + user_token: Optional user token forwarded to the Agent Gateway client + for MCP tool discovery and invocation. + message: Optional A2A ``Message`` payload serialised into the webhook + body sent to the n8n workflow. + headers: Optional HTTP headers included in the webhook data passed to + the n8n workflow. + tenant_subdomain: Tenant subdomain used to instantiate the Agent + Gateway client. Pass ``None`` to use the default subdomain. + + Returns: + Parsed ``Message`` from the last executed workflow node, or ``None`` + if the hook completed successfully but produced no message. + + Raises: + TransportError: On AGW tool call errors, terminal execution failures, + or timeout. + + Example: + ```python + from sap_cloud_sdk.extensibility import create_client + + client = create_client("sap.ai:agent:myAgent:v1") + impl = client.get_extension_capability_implementation(tenant="tenant-abc") + + if impl.hooks: + result = await client.call_hook_agw( + hook=impl.hooks[0], + user_token="my-user-token", + message=my_message, + tenant_subdomain="my-tenant", + ) + ``` + """ + agw_client = create_agw_client(tenant_subdomain) + execute_tool, get_exec_tool = await self._discover_n8n_tools(agw_client, user_token) + execution_id, status = await self._execute_workflow_via_agw( + agw_client, execute_tool, hook, user_token, message, headers + ) + return await self._poll_hook_execution( + agw_client, get_exec_tool, hook, execution_id, user_token, status + ) diff --git a/tests/extensibility/unit/test_client.py b/tests/extensibility/unit/test_client.py index 8160634a..55e8ea5b 100644 --- a/tests/extensibility/unit/test_client.py +++ b/tests/extensibility/unit/test_client.py @@ -397,39 +397,45 @@ async def test_composite_key_picks_correct_tool_among_duplicates(self): ] agw = _make_agw_client( tools=tools, - tool_responses=[_success_payload()], + tool_responses=[_running_payload(), _poll_success_payload()], ) client = self._make_client(agw) with patch( "sap_cloud_sdk.extensibility.client.create_agw_client", return_value=agw, + ), patch( + "sap_cloud_sdk.extensibility.client.asyncio.sleep", + new_callable=AsyncMock, ): result = await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") assert result is not None - # call_mcp_tool must have been called with the N8N tool, not the other one - called_tool = agw.call_mcp_tool.call_args[0][0] - assert called_tool.server_name == _N8N_MCP_SERVER_NAME + # Both tool calls must have used the N8N server tools, not the other one + for call in agw.call_mcp_tool.call_args_list: + assert call[0][0].server_name == _N8N_MCP_SERVER_NAME @pytest.mark.asyncio async def test_success_synchronous(self): - """Returns a Message when execute_workflow responds with status=success.""" + """Returns a Message when get_execution responds with status=success.""" tools = [ _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), ] agw = _make_agw_client( tools=tools, - tool_responses=[_success_payload()], + tool_responses=[_running_payload(), _poll_success_payload()], ) client = self._make_client(agw) with patch( "sap_cloud_sdk.extensibility.client.create_agw_client", return_value=agw, + ), patch( + "sap_cloud_sdk.extensibility.client.asyncio.sleep", + new_callable=AsyncMock, ): result = await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") assert result is not None - assert result.message_id == "msg-1" - agw.call_mcp_tool.assert_called_once() + assert result.message_id == "msg-2" + assert agw.call_mcp_tool.call_count == 2 @pytest.mark.asyncio async def test_success_after_polling(self): @@ -545,14 +551,21 @@ async def test_workflow_id_passed_to_execute_tool(self): _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), ] - agw = _make_agw_client(tools=tools, tool_responses=[_success_payload("wf-xyz")]) + agw = _make_agw_client( + tools=tools, + tool_responses=[_running_payload(), _poll_success_payload()], + ) client = self._make_client(agw) with patch( "sap_cloud_sdk.extensibility.client.create_agw_client", return_value=agw, + ), patch( + "sap_cloud_sdk.extensibility.client.asyncio.sleep", + new_callable=AsyncMock, ): await client.call_hook_agw( hook=_make_hook(workflow_id="wf-xyz"), tenant_subdomain="t" ) - kwargs = agw.call_mcp_tool.call_args[1] - assert kwargs["workflowId"] == "wf-xyz" + # First call is execute_workflow — check workflowId was forwarded correctly + first_call_kwargs = agw.call_mcp_tool.call_args_list[0][1] + assert first_call_kwargs["workflowId"] == "wf-xyz" From eafc39d5e7ef651dc25d9a4b69e030854cfe202c Mon Sep 17 00:00:00 2001 From: "Yashmeet ." Date: Wed, 24 Jun 2026 13:28:36 +0530 Subject: [PATCH 17/21] remove unused-ignore-comment --- src/sap_cloud_sdk/extensibility/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index f890f7b7..cc40980c 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -458,7 +458,7 @@ async def _execute_workflow_via_agw( result_str = await agw_client.call_mcp_tool( execute_tool, user_token=user_token or None, - **execute_arguments, # type: ignore[arg-type] # ty: ignore[invalid-argument-type] + **execute_arguments, # type: ignore[arg-type] ) except Exception as exc: raise TransportError( @@ -523,7 +523,7 @@ async def _poll_hook_execution( result_str = await agw_client.call_mcp_tool( get_exec_tool, user_token=user_token or None, - **get_execution_arguments, # type: ignore[arg-type] # ty: ignore[invalid-argument-type] + **get_execution_arguments, # type: ignore[arg-type] ) except Exception as exc: raise TransportError( From 2c6f53cf10922119181c112ffebdf4eb4b96845c Mon Sep 17 00:00:00 2001 From: "Yashmeet ." Date: Wed, 24 Jun 2026 13:43:39 +0530 Subject: [PATCH 18/21] fixed formatting --- src/sap_cloud_sdk/extensibility/client.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index cc40980c..acb43adf 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -396,7 +396,9 @@ def call_hook( f"Last status: {last_status!r}" ) - async def _discover_n8n_tools(self, agw_client: Any, user_token: Optional[str]) -> tuple[Any, Any]: + async def _discover_n8n_tools( + self, agw_client: Any, user_token: Optional[str] + ) -> tuple[Any, Any]: # TODO: cache the list of mcp tools for performance. tools = await agw_client.list_mcp_tools(user_token=user_token or None) @@ -612,7 +614,9 @@ async def call_hook_agw( ``` """ agw_client = create_agw_client(tenant_subdomain) - execute_tool, get_exec_tool = await self._discover_n8n_tools(agw_client, user_token) + execute_tool, get_exec_tool = await self._discover_n8n_tools( + agw_client, user_token + ) execution_id, status = await self._execute_workflow_via_agw( agw_client, execute_tool, hook, user_token, message, headers ) From 0c360f41a0c60a80ab432d12e9261afd195f21cd Mon Sep 17 00:00:00 2001 From: "Yashmeet ." Date: Thu, 25 Jun 2026 09:43:00 +0530 Subject: [PATCH 19/21] Update pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 725e69cc..b297bd73 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sap-cloud-sdk" -version = "0.29.0" +version = "0.29.1" description = "SAP Cloud SDK for Python" readme = "README.md" license = "Apache-2.0" From 260ed83a287e15ffad48d7030fe6ee0567386d6d Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Fri, 26 Jun 2026 14:23:11 +0530 Subject: [PATCH 20/21] review comments --- pyproject.toml | 2 +- src/sap_cloud_sdk/extensibility/client.py | 31 ++++----- src/sap_cloud_sdk/extensibility/user-guide.md | 66 +++++++++++++++++-- 3 files changed, 76 insertions(+), 23 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b297bd73..96b9ff77 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sap-cloud-sdk" -version = "0.29.1" +version = "0.30.0" description = "SAP Cloud SDK for Python" readme = "README.md" license = "Apache-2.0" diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index acb43adf..76ce3f60 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -23,7 +23,7 @@ Hook, ) from sap_cloud_sdk.extensibility.config import HookConfig -from sap_cloud_sdk.extensibility.exceptions import TransportError +from sap_cloud_sdk.extensibility.exceptions import ExtensibilityError, TransportError if TYPE_CHECKING: from sap_cloud_sdk.extensibility._local_transport import LocalTransport @@ -95,7 +95,7 @@ def _parse_response(response: httpx.Response) -> dict[str, Any]: def _extract_tool_result(jsonrpc: dict[str, Any]) -> dict[str, Any]: if "error" in jsonrpc: msg = jsonrpc["error"].get("message", "Unknown error") - raise TransportError(f"n8n returned an error: {msg}") + raise ExtensibilityError(f"n8n returned an error: {msg}") result = jsonrpc.get("result", {}) if result.get("isError"): @@ -103,7 +103,7 @@ def _extract_tool_result(jsonrpc: dict[str, Any]) -> dict[str, Any]: error_text = next( (c.get("text", "") for c in content if c.get("type") == "text"), "" ) - raise TransportError(f"n8n tool call failed: {error_text}") + raise ExtensibilityError(f"n8n tool call failed: {error_text}") for item in result.get("content", []): if item.get("type") == "text": @@ -116,7 +116,7 @@ def _extract_tool_result(jsonrpc: dict[str, Any]) -> dict[str, Any]: if structured is not None: return structured - raise TransportError("Hook response contains no parseable content.") + raise ExtensibilityError("Hook response contains no parseable content.") class ExtensibilityClient: @@ -317,7 +317,7 @@ def call_hook( # 2. Fail fast on terminal statuses from execute-workflow if status in _EXECUTE_TERMINAL_STATUSES: error_msg = data.get("error", "") - raise TransportError( + raise ExtensibilityError( f"Workflow execution failed with status {status!r}" + (f": {error_msg}" if error_msg else "") ) @@ -378,28 +378,29 @@ def call_hook( ) return Message(**response_json) except (KeyError, IndexError, TypeError, ValidationError) as exc: - raise TransportError( + raise ExtensibilityError( f"Failed to extract response from last executed node: {exc}" ) from exc if last_status in _EXECUTION_TERMINAL_STATUSES: error_msg = data.get("error", "") - raise TransportError( + raise ExtensibilityError( f"Workflow execution failed with status {last_status!r}" + (f": {error_msg}" if error_msg else "") ) # Continue polling for: running, waiting, new, unknown - raise TransportError( + raise ExtensibilityError( f"Workflow execution timed out after {hook.timeout}s. " f"Last status: {last_status!r}" ) async def _discover_n8n_tools( - self, agw_client: Any, user_token: Optional[str] + self, + agw_client: Any, + user_token: Optional[str] ) -> tuple[Any, Any]: - # TODO: cache the list of mcp tools for performance. tools = await agw_client.list_mcp_tools(user_token=user_token or None) execute_tool = next( @@ -412,7 +413,7 @@ async def _discover_n8n_tools( None, ) if execute_tool is None: - raise TransportError( + raise ExtensibilityError( f"MCP tool '{_EXECUTE_WORKFLOW_TOOL_NAME}' on server '{_N8N_MCP_SERVER_NAME}' " "not found via Agent Gateway." ) @@ -427,7 +428,7 @@ async def _discover_n8n_tools( None, ) if get_exec_tool is None: - raise TransportError( + raise ExtensibilityError( f"MCP tool '{_GET_EXECUTION_TOOL_NAME}' on server '{_N8N_MCP_SERVER_NAME}' " "not found via Agent Gateway." ) @@ -475,7 +476,7 @@ async def _execute_workflow_via_agw( status = data.get("status", "") if status in _EXECUTE_TERMINAL_STATUSES: error_msg = data.get("error", "") - raise TransportError( + raise ExtensibilityError( f"Workflow execution failed with status {status!r}" + (f": {error_msg}" if error_msg else "") ) @@ -546,12 +547,12 @@ async def _poll_hook_execution( if last_status in _EXECUTION_TERMINAL_STATUSES: error_msg = data.get("error", "") - raise TransportError( + raise ExtensibilityError( f"Workflow execution failed with status {last_status!r}" + (f": {error_msg}" if error_msg else "") ) - raise TransportError( + raise ExtensibilityError( f"Workflow execution timed out after {hook.timeout}s. " f"Last status: {last_status!r}" ) diff --git a/src/sap_cloud_sdk/extensibility/user-guide.md b/src/sap_cloud_sdk/extensibility/user-guide.md index 2df95787..25e2e012 100644 --- a/src/sap_cloud_sdk/extensibility/user-guide.md +++ b/src/sap_cloud_sdk/extensibility/user-guide.md @@ -157,14 +157,37 @@ def call_hook( - `hook`: Hook configuration object containing workflow config (`n8n_workflow_config`), timeout, and other settings. - `hook_config`: Hook invocation configuration (`endpoint`, optional `auth_token`, and optional `payload`). - Returns the response data as a `Message` object, or `None` if no message is produced. -- Raises `TransportError` if the HTTP request fails or the response cannot be parsed as a valid `Message`. +- Raises `TransportError` if the HTTP request or SSE parsing fails. +- Raises `ExtensibilityError` if the workflow reaches a terminal failure status, times out, or n8n returns an application-level error. - The hook's `timeout` setting is used for the HTTP request timeout. - The hook HTTP method is taken from `hook.n8n_workflow_config.method`. - The workflow ID is taken from `hook.n8n_workflow_config.workflow_id`. -#### `N8nWorkflowConfig` +### `ExtensibilityClient.call_hook_agw()` -Workflow configuration embedded in each `Hook`. +Async variant of `call_hook()` that invokes a hook via the Agent Gateway MCP tool interface instead of a direct HTTP call. Auth and endpoint resolution are handled internally — no manual URL or token configuration is needed. + +```python +async def call_hook_agw( + self, + hook: Hook, + user_token: Optional[str] = None, + message: Optional[Message] = None, + headers: Optional[dict] = None, + tenant_subdomain: Optional[str] = None, +) -> Optional[Message]: ... +``` + +- `hook`: Hook configuration object containing workflow config, timeout, and other settings. +- `user_token`: Optional user token forwarded to the Agent Gateway client for MCP tool discovery and invocation. +- `message`: Optional A2A `Message` payload serialised into the webhook body sent to the n8n workflow. +- `headers`: Optional HTTP headers included in the webhook data passed to the n8n workflow. +- `tenant_subdomain`: Tenant subdomain used to instantiate the Agent Gateway client. Pass `None` to use the default subdomain. +- Returns the response data as a `Message` object, or `None` if no message is produced. +- Raises `TransportError` if the AGW tool call itself fails (network error, etc.). +- Raises `ExtensibilityError` if the n8n MCP tools are not found via AGW, the workflow reaches a terminal failure status, times out, or n8n returns an application-level error. + +#### `N8nWorkflowConfig` ```python @dataclass @@ -445,6 +468,35 @@ for hook in ext.hooks: print(f"On failure: {hook.on_failure}") ``` +### Calling Hook Endpoints via Agent Gateway + +Use `call_hook_agw()` to invoke a hook through the Agent Gateway MCP interface. No endpoint URL or auth token is required — the AGW client resolves these internally from the tenant subdomain. + +```python +from sap_cloud_sdk.extensibility import create_client, HookType +from a2a.types import Message, Role, TextPart + +client = create_client("sap.ai:agent:myAgent:v1") +ext = client.get_extension_capability_implementation(tenant=tenant_id) + +before_hooks = [h for h in ext.hooks if h.type == HookType.BEFORE] + +if before_hooks: + hook = before_hooks[0] + result = await client.call_hook_agw( + hook=hook, + user_token="my-user-token", + message=Message( + message_id="msg-001", + role=Role.user, + parts=[TextPart(text="Tool execution starting")], + ), + tenant_subdomain="my-tenant", + ) + if result: + print(f"Hook response: {result}") +``` + ### Calling Hook Endpoints Use the `call_hook()` method to execute a hook with a custom payload. Payloads use the `Message` type from `a2a.types`. @@ -561,7 +613,7 @@ Hooks can be configured with different failure behaviors via the `on_failure` fi ```python from sap_cloud_sdk.extensibility import create_client, OnFailure from sap_cloud_sdk.extensibility.config import HookConfig -from sap_cloud_sdk.extensibility.exceptions import TransportError +from sap_cloud_sdk.extensibility.exceptions import ExtensibilityError from a2a.types import Message, Role, TextPart client = create_client("sap.ai:agent:myAgent:v1") @@ -582,7 +634,7 @@ for hook in ext.hooks: response = client.call_hook(hook, hook_config) if response: print(f"Hook succeeded: {response}") - except TransportError as e: + except ExtensibilityError as e: if hook.on_failure == OnFailure.BLOCK: # Hook is configured to block on failure print(f"Critical hook failed, blocking: {e}") @@ -706,9 +758,9 @@ Validation issues produce log warnings but never prevent output generation. ### Exception Hierarchy -- `ExtensibilityError` -- Base exception for all extensibility module errors. +- `ExtensibilityError` -- Base exception for all extensibility module errors. Also raised directly for workflow-level failures: terminal execution status, timeout, n8n application-level errors (JSON-RPC errors, `isError` results), and missing MCP tools via AGW. - `ClientCreationError(ExtensibilityError)` -- Represents a client construction failure. Not raised by `create_client()` (which handles it internally), but available for use in custom client-construction logic. -- `TransportError(ExtensibilityError)` -- Raised by the transport layer on failure. Not seen when using the client, which catches all transport errors and returns an empty result. +- `TransportError(ExtensibilityError)` -- Raised by the transport layer on network-level failures: HTTP errors, SSE parsing failures, response decode errors. Not seen when using `get_extension_capability_implementation()`, which catches all errors and returns an empty result. May be raised by `call_hook()` and `call_hook_agw()` on network failures. ## Service Binding From 14af6754f7c2a3e4fe380847f09319033c4f658c Mon Sep 17 00:00:00 2001 From: Rishi Kunnath Date: Fri, 26 Jun 2026 15:50:13 +0530 Subject: [PATCH 21/21] fix scan failure --- src/sap_cloud_sdk/extensibility/client.py | 4 +--- tests/extensibility/unit/test_client.py | 24 +++++++++++------------ 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/sap_cloud_sdk/extensibility/client.py b/src/sap_cloud_sdk/extensibility/client.py index 76ce3f60..e34cfc2b 100644 --- a/src/sap_cloud_sdk/extensibility/client.py +++ b/src/sap_cloud_sdk/extensibility/client.py @@ -397,9 +397,7 @@ def call_hook( ) async def _discover_n8n_tools( - self, - agw_client: Any, - user_token: Optional[str] + self, agw_client: Any, user_token: Optional[str] ) -> tuple[Any, Any]: tools = await agw_client.list_mcp_tools(user_token=user_token or None) diff --git a/tests/extensibility/unit/test_client.py b/tests/extensibility/unit/test_client.py index 55e8ea5b..569ef73c 100644 --- a/tests/extensibility/unit/test_client.py +++ b/tests/extensibility/unit/test_client.py @@ -24,7 +24,7 @@ ) from http import HTTPMethod from sap_cloud_sdk.extensibility.config import ExtensibilityConfig -from sap_cloud_sdk.extensibility.exceptions import TransportError +from sap_cloud_sdk.extensibility.exceptions import ExtensibilityError, TransportError from sap_cloud_sdk.agentgateway._models import MCPTool @@ -347,19 +347,19 @@ def _make_client(self, agw: MagicMock) -> ExtensibilityClient: @pytest.mark.asyncio async def test_execute_tool_not_found_raises(self): - """Raises TransportError when execute_workflow tool is absent.""" + """Raises ExtensibilityError when execute_workflow tool is absent.""" agw = _make_agw_client(tools=[], tool_responses=[]) client = self._make_client(agw) with patch( "sap_cloud_sdk.extensibility.client.create_agw_client", return_value=agw, ): - with pytest.raises(TransportError, match=_EXECUTE_WORKFLOW_TOOL_NAME): + with pytest.raises(ExtensibilityError, match=_EXECUTE_WORKFLOW_TOOL_NAME): await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_get_exec_tool_not_found_raises(self): - """Raises TransportError when get_execution tool is absent.""" + """Raises ExtensibilityError when get_execution tool is absent.""" tools = [_make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME)] agw = _make_agw_client(tools=tools, tool_responses=[]) client = self._make_client(agw) @@ -367,7 +367,7 @@ async def test_get_exec_tool_not_found_raises(self): "sap_cloud_sdk.extensibility.client.create_agw_client", return_value=agw, ): - with pytest.raises(TransportError, match=_GET_EXECUTION_TOOL_NAME): + with pytest.raises(ExtensibilityError, match=_GET_EXECUTION_TOOL_NAME): await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio @@ -383,7 +383,7 @@ async def test_composite_key_ignores_wrong_server(self): "sap_cloud_sdk.extensibility.client.create_agw_client", return_value=agw, ): - with pytest.raises(TransportError, match=_EXECUTE_WORKFLOW_TOOL_NAME): + with pytest.raises(ExtensibilityError, match=_EXECUTE_WORKFLOW_TOOL_NAME): await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio @@ -463,7 +463,7 @@ async def test_success_after_polling(self): @pytest.mark.asyncio async def test_terminal_status_from_execute_raises(self): - """Raises TransportError on a terminal status from execute_workflow.""" + """Raises ExtensibilityError on a terminal status from execute_workflow.""" tools = [ _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), @@ -475,12 +475,12 @@ async def test_terminal_status_from_execute_raises(self): "sap_cloud_sdk.extensibility.client.create_agw_client", return_value=agw, ): - with pytest.raises(TransportError, match="workflow crashed"): + with pytest.raises(ExtensibilityError, match="workflow crashed"): await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_terminal_status_from_poll_raises(self): - """Raises TransportError on a terminal status from get_execution poll.""" + """Raises ExtensibilityError on a terminal status from get_execution poll.""" tools = [ _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), @@ -498,12 +498,12 @@ async def test_terminal_status_from_poll_raises(self): "sap_cloud_sdk.extensibility.client.asyncio.sleep", new_callable=AsyncMock, ): - with pytest.raises(TransportError, match="node failed"): + with pytest.raises(ExtensibilityError, match="node failed"): await client.call_hook_agw(hook=_make_hook(), tenant_subdomain="t") @pytest.mark.asyncio async def test_timeout_raises(self): - """Raises TransportError when deadline is exceeded without a success status.""" + """Raises ExtensibilityError when deadline is exceeded without a success status.""" tools = [ _make_n8n_tool(_EXECUTE_WORKFLOW_TOOL_NAME), _make_n8n_tool(_GET_EXECUTION_TOOL_NAME), @@ -523,7 +523,7 @@ async def test_timeout_raises(self): "sap_cloud_sdk.extensibility.client.asyncio.sleep", new_callable=AsyncMock, ): - with pytest.raises(TransportError, match="timed out"): + with pytest.raises(ExtensibilityError, match="timed out"): await client.call_hook_agw(hook=hook, tenant_subdomain="t") @pytest.mark.asyncio