Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8f0dc3d
Implement call hook via Agent Gateway
yashmeet29 Jun 11, 2026
812da27
UTs
yashmeet29 Jun 12, 2026
75fc414
updating n8n call to include token, headers, message
rishikunnath2747 Jun 12, 2026
41e0808
improvements
rishikunnath2747 Jun 17, 2026
0dd847e
remove unused import
rishikunnath2747 Jun 17, 2026
5306f3a
Adding unit tests
rishikunnath2747 Jun 17, 2026
2ff0122
combining arguments into object
rishikunnath2747 Jun 17, 2026
b14493d
fix function documentation
rishikunnath2747 Jun 22, 2026
6d070d3
fixing job failures
rishikunnath2747 Jun 22, 2026
bb3a7cf
rename call_hook to call_hook_agw
rishikunnath2747 Jun 22, 2026
62a2dfc
Merge branch 'main' into callHookViaGateway
rishikunnath2747 Jun 22, 2026
d90d751
Update pyproject.toml
rishikunnath2747 Jun 22, 2026
a63bdf0
fix job failures
rishikunnath2747 Jun 22, 2026
5ecea63
fix(extensibility): apply pre-commit formatting fixes
rishikunnath2747 Jun 22, 2026
debe427
fix(extensibility): apply ruff formatting
rishikunnath2747 Jun 22, 2026
f068336
update comment
rishikunnath2747 Jun 22, 2026
b4f4f7a
Refactor call_hook_agw method
yashmeet29 Jun 24, 2026
eafc39d
remove unused-ignore-comment
yashmeet29 Jun 24, 2026
2c6f53c
fixed formatting
yashmeet29 Jun 24, 2026
61f6c54
Merge branch 'main' into callHookViaGateway
yashmeet29 Jun 25, 2026
0c360f4
Update pyproject.toml
yashmeet29 Jun 25, 2026
260ed83
review comments
rishikunnath2747 Jun 26, 2026
a38909d
Merge branch 'main' into callHookViaGateway
rishikunnath2747 Jun 26, 2026
14af675
fix scan failure
rishikunnath2747 Jun 26, 2026
004595e
Merge branch 'callHookViaGateway' of https://github.com/yashmeet29/cl…
rishikunnath2747 Jun 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "sap-cloud-sdk"
version = "0.29.1"
version = "0.30.0"
description = "SAP Cloud SDK for Python"
readme = "README.md"
license = "Apache-2.0"
Expand Down
266 changes: 239 additions & 27 deletions src/sap_cloud_sdk/extensibility/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import asyncio
import itertools
import json
import logging
Expand All @@ -15,13 +16,14 @@

from sap_cloud_sdk.core.telemetry import Module, Operation
from sap_cloud_sdk.core.telemetry.metrics_decorator import record_metrics
from sap_cloud_sdk.agentgateway import create_client as create_agw_client
from sap_cloud_sdk.extensibility._models import (
DEFAULT_EXTENSION_CAPABILITY_ID,
ExtensionCapabilityImplementation,
Hook,
)
from sap_cloud_sdk.extensibility.config import HookConfig
from sap_cloud_sdk.extensibility.exceptions import TransportError
from sap_cloud_sdk.extensibility.exceptions import ExtensibilityError, TransportError

if TYPE_CHECKING:
from sap_cloud_sdk.extensibility._local_transport import LocalTransport
Expand All @@ -38,6 +40,7 @@

_EXECUTE_WORKFLOW_TOOL_NAME = "execute_workflow"
_GET_EXECUTION_TOOL_NAME = "get_execution"
_N8N_MCP_SERVER_NAME = "sap.btpn8n:apiResource:ManagedN8nMcpServer:v1"
Comment thread
NicoleMGomes marked this conversation as resolved.

_JSONRPC_VERSION = "2.0"

Expand Down Expand Up @@ -92,15 +95,15 @@ def _parse_response(response: httpx.Response) -> dict[str, Any]:
def _extract_tool_result(jsonrpc: dict[str, Any]) -> dict[str, Any]:
if "error" in jsonrpc:
msg = jsonrpc["error"].get("message", "Unknown error")
raise TransportError(f"n8n returned an error: {msg}")
raise ExtensibilityError(f"n8n returned an error: {msg}")

result = jsonrpc.get("result", {})
if result.get("isError"):
content = result.get("content", [])
error_text = next(
(c.get("text", "") for c in content if c.get("type") == "text"), ""
)
raise TransportError(f"n8n tool call failed: {error_text}")
raise ExtensibilityError(f"n8n tool call failed: {error_text}")

for item in result.get("content", []):
if item.get("type") == "text":
Expand All @@ -113,7 +116,7 @@ def _extract_tool_result(jsonrpc: dict[str, Any]) -> dict[str, Any]:
if structured is not None:
return structured

raise TransportError("Hook response contains no parseable content.")
raise ExtensibilityError("Hook response contains no parseable content.")


class ExtensibilityClient:
Expand Down Expand Up @@ -314,30 +317,12 @@ def call_hook(
# 2. Fail fast on terminal statuses from execute-workflow
if status in _EXECUTE_TERMINAL_STATUSES:
error_msg = data.get("error", "")
raise TransportError(
raise ExtensibilityError(
f"Workflow execution failed with status {status!r}"
+ (f": {error_msg}" if error_msg else "")
)

# 3. Return immediately if execution completed synchronously
if status == "success":
try:
result_data = data.get("data", {}).get("resultData", {})
last_node = result_data.get("lastNodeExecuted", "")
response_json = (
result_data.get("runData", {})
.get(last_node, [{}])[0]
.get("data", {})
.get("main", [[{}]])[0][0]
.get("json", {})
)
return Message(**response_json)
except (KeyError, IndexError, TypeError, ValidationError) as exc:
raise TransportError(
f"Failed to extract response from last executed node: {exc}"
) from exc

# 4. Poll get-execution for running/new/waiting/started
# 3. Poll get-execution for running/new/waiting/started
execution_id = data.get("executionId")
get_execution_arguments = {
"workflowId": hook.n8n_workflow_config.workflow_id,
Expand Down Expand Up @@ -393,20 +378,247 @@ def call_hook(
)
return Message(**response_json)
except (KeyError, IndexError, TypeError, ValidationError) as exc:
raise TransportError(
raise ExtensibilityError(
f"Failed to extract response from last executed node: {exc}"
) from exc

if last_status in _EXECUTION_TERMINAL_STATUSES:
error_msg = data.get("error", "")
raise TransportError(
raise ExtensibilityError(
f"Workflow execution failed with status {last_status!r}"
+ (f": {error_msg}" if error_msg else "")
)

# Continue polling for: running, waiting, new, unknown

raise TransportError(
raise ExtensibilityError(
f"Workflow execution timed out after {hook.timeout}s. "
f"Last status: {last_status!r}"
)

async def _discover_n8n_tools(
self, agw_client: Any, user_token: Optional[str]
) -> tuple[Any, Any]:
tools = await agw_client.list_mcp_tools(user_token=user_token or None)

execute_tool = next(
(
t
for t in tools
if t.name == _EXECUTE_WORKFLOW_TOOL_NAME
and t.server_name == _N8N_MCP_SERVER_NAME
),
None,
)
if execute_tool is None:
Comment thread
NicoleMGomes marked this conversation as resolved.
raise ExtensibilityError(
f"MCP tool '{_EXECUTE_WORKFLOW_TOOL_NAME}' on server '{_N8N_MCP_SERVER_NAME}' "
"not found via Agent Gateway."
)

get_exec_tool = next(
(
t
for t in tools
if t.name == _GET_EXECUTION_TOOL_NAME
and t.server_name == _N8N_MCP_SERVER_NAME
),
None,
)
if get_exec_tool is None:
raise ExtensibilityError(
f"MCP tool '{_GET_EXECUTION_TOOL_NAME}' on server '{_N8N_MCP_SERVER_NAME}' "
"not found via Agent Gateway."
)

return execute_tool, get_exec_tool

async def _execute_workflow_via_agw(
self,
agw_client: Any,
execute_tool: Any,
hook: Hook,
user_token: Optional[str],
message: Optional[Any],
headers: Optional[dict],
) -> tuple[str, Any]:
message_body = message.model_dump(mode="json") if message is not None else {}
execute_arguments = {
"workflowId": hook.n8n_workflow_config.workflow_id,
"inputs": {
"type": "webhook",
"webhookData": {
"method": hook.n8n_workflow_config.method,
"query": {},
"body": message_body,
"headers": headers or {},
},
},
}
try:
result_str = await agw_client.call_mcp_tool(
execute_tool,
user_token=user_token or None,
**execute_arguments, # type: ignore[arg-type]
)
except Exception as exc:
raise TransportError(
f"AGW tool call for '{_EXECUTE_WORKFLOW_TOOL_NAME}' failed: {exc}"
) from exc

try:
data = json.loads(result_str)
except Exception as exc:
raise TransportError(f"Could not parse hook response: {exc}") from exc

status = data.get("status", "")
if status in _EXECUTE_TERMINAL_STATUSES:
error_msg = data.get("error", "")
raise ExtensibilityError(
f"Workflow execution failed with status {status!r}"
+ (f": {error_msg}" if error_msg else "")
)

execution_id = data.get("executionId")
return str(execution_id), status

@staticmethod
def _extract_message(data: dict) -> Message:
try:
result_data = data.get("data", {}).get("resultData", {})
last_node = result_data.get("lastNodeExecuted", "")
response_json = (
result_data.get("runData", {})
.get(last_node, [{}])[0]
.get("data", {})
.get("main", [[{}]])[0][0]
.get("json", {})
)
return Message(**response_json)
except (KeyError, IndexError, TypeError, ValidationError) as exc:
raise TransportError(
f"Failed to extract response from last executed node: {exc}"
) from exc

async def _poll_hook_execution(
self,
agw_client: Any,
get_exec_tool: Any,
hook: Hook,
execution_id: str,
user_token: Optional[str],
initial_status: str,
) -> Optional[Message]:
deadline = time.monotonic() + hook.timeout
last_status = initial_status

while time.monotonic() < deadline:
await asyncio.sleep(_HOOK_POLL_INTERVAL)

try:
get_execution_arguments = {
"workflowId": hook.n8n_workflow_config.workflow_id,
"executionId": execution_id,
"includeData": True,
}
result_str = await agw_client.call_mcp_tool(
get_exec_tool,
user_token=user_token or None,
**get_execution_arguments, # type: ignore[arg-type]
)
except Exception as exc:
raise TransportError(
f"AGW tool call for '{_GET_EXECUTION_TOOL_NAME}' failed: {exc}"
) from exc

try:
data = json.loads(result_str)
except Exception as exc:
raise TransportError(f"Could not parse hook response: {exc}") from exc

last_status = data.get("execution", {}).get("status", "") or data.get(
"status", ""
)

if last_status == "success":
return self._extract_message(data)

if last_status in _EXECUTION_TERMINAL_STATUSES:
error_msg = data.get("error", "")
raise ExtensibilityError(
f"Workflow execution failed with status {last_status!r}"
+ (f": {error_msg}" if error_msg else "")
)

raise ExtensibilityError(
f"Workflow execution timed out after {hook.timeout}s. "
f"Last status: {last_status!r}"
)

@record_metrics(
Module.EXTENSIBILITY,
Operation.EXTENSIBILITY_CALL_HOOK,
)
async def call_hook_agw(
self,
hook: Hook,
user_token: Optional[str] = None,
message: Optional[Any] = None,
headers: Optional[dict] = None,
tenant_subdomain: Optional[str] = None,
) -> Optional[Message]:
"""Call a hook via Agent Gateway MCP tool invocation.

Discovers the N8N MCP tools via Agent Gateway, executes the workflow via
``execute_workflow``, then polls ``get_execution`` every 500 ms until the
execution succeeds, fails, or ``hook.timeout`` seconds elapse.

Auth and endpoint resolution are handled internally by an AGW client
created from ``tenant_subdomain`` — no manual token or URL configuration
is required.

Args:
hook: Hook configuration (workflow ID, method, timeout).
user_token: Optional user token forwarded to the Agent Gateway client
for MCP tool discovery and invocation.
message: Optional A2A ``Message`` payload serialised into the webhook
body sent to the n8n workflow.
headers: Optional HTTP headers included in the webhook data passed to
the n8n workflow.
tenant_subdomain: Tenant subdomain used to instantiate the Agent
Gateway client. Pass ``None`` to use the default subdomain.

Returns:
Parsed ``Message`` from the last executed workflow node, or ``None``
if the hook completed successfully but produced no message.

Raises:
TransportError: On AGW tool call errors, terminal execution failures,
or timeout.

Example:
```python
from sap_cloud_sdk.extensibility import create_client

client = create_client("sap.ai:agent:myAgent:v1")
impl = client.get_extension_capability_implementation(tenant="tenant-abc")

if impl.hooks:
result = await client.call_hook_agw(
hook=impl.hooks[0],
user_token="my-user-token",
message=my_message,
tenant_subdomain="my-tenant",
)
```
"""
agw_client = create_agw_client(tenant_subdomain)
execute_tool, get_exec_tool = await self._discover_n8n_tools(
agw_client, user_token
)
execution_id, status = await self._execute_workflow_via_agw(
agw_client, execute_tool, hook, user_token, message, headers
)
return await self._poll_hook_execution(
agw_client, get_exec_tool, hook, execution_id, user_token, status
)
Loading
Loading