Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: Tests

on:
push:
branches: [main]
pull_request:

jobs:
test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.12"]
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
- name: Install package and test tooling
run: |
python -m pip install --upgrade pip
pip install -e .
pip install --group dev
- name: Run tests with coverage
# No --cov-fail-under yet: establish the baseline first, then ratchet
# the gate up instead of picking an aspirational number that blocks CI.
run: |
pytest -q --cov=agentkit --cov-report=term --cov-report=xml
- name: Upload coverage report
uses: actions/upload-artifact@v4
with:
name: coverage-${{ matrix.python-version }}
path: coverage.xml
10 changes: 9 additions & 1 deletion agentkit/apps/a2a_app/a2a_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,21 @@ async def wrapper(*args, **kwargs):

with telemetry.tracer.start_as_current_span(name="a2a_invocation") as span:
exception = None
# Initialize before try: if execute_func raises, the finally block
# below would otherwise hit UnboundLocalError and mask the original
# exception (mcp_app's equivalent wrapper already does this).
result = None
try:
result = await execute_func(
executor_instance, context=context, event_queue=event_queue
)

except Exception as e:
logger.error("Invoke agent execute function failed: %s", e)
logger.exception(
"Invoke agent execute function failed (context_id=%s): %s",
context.context_id,
e,
)
exception = e
raise e
finally:
Expand Down
23 changes: 17 additions & 6 deletions agentkit/apps/a2a_app/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
from opentelemetry.metrics import get_meter
from opentelemetry.trace.span import Span
from a2a.server.agent_execution.context import RequestContext
from google.adk.a2a.converters.request_converter import _get_user_id

from agentkit.apps.utils import safe_serialize_to_json_string
from agentkit.apps.utils import dont_throw, safe_serialize_to_json_string

_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [
0.01,
Expand All @@ -47,6 +46,15 @@
logger = logging.getLogger("agentkit." + __name__)


def _get_user_id(request: RequestContext) -> str:
# Inlined from google-adk's private a2a request converter helper: a2a_app
# must not import google-adk (an undeclared dependency for pure-A2A apps).
call_context = getattr(request, "call_context", None)
if call_context and call_context.user and call_context.user.user_name:
return call_context.user.user_name
return f"A2A_USER_{request.context_id}"


class Telemetry:
def __init__(self):
self.tracer = get_tracer("agentkit.a2a_app")
Expand All @@ -58,6 +66,7 @@ def __init__(self):
explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS,
)

@dont_throw
def trace_a2a_agent(
self,
func: Callable,
Expand Down Expand Up @@ -89,10 +98,12 @@ def trace_a2a_agent(
if user_id:
span.set_attribute(key="gen_ai.user.id", value=user_id)

span.set_attribute(
key="gen_ai.input",
value=safe_serialize_to_json_string(request.message.parts),
)
message = getattr(request, "message", None)
if message is not None:
span.set_attribute(
key="gen_ai.input",
value=safe_serialize_to_json_string(message.parts),
)

span.set_attribute(key="gen_ai.span.kind", value="a2a_agent")
span.set_attribute(key="gen_ai.operation.name", value="invoke_agent")
Expand Down
104 changes: 29 additions & 75 deletions agentkit/apps/agent_server_app/agent_server_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
)
from agentkit.apps.agent_server_app.telemetry import telemetry
from agentkit.apps.base_app import BaseAgentkitApp
from agentkit.apps.utils import SENSITIVE_HEADERS

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -261,7 +262,19 @@ async def event_generator():
telemetry.trace_agent_server_finish(
path="/run_sse", func_result="", exception=e
)
yield f"data: {json.dumps({'error': str(e)})}\n\n"
# Do not echo internal exception details (paths, backend
# errors) to the client; full detail stays in server logs.
yield (
"data: "
+ json.dumps(
{
"error": "internal error while running agent; "
"see server logs",
"error_type": type(e).__name__,
}
)
+ "\n\n"
)
# Returns a streaming response with the proper media type for SSE

return StreamingResponse(
Expand All @@ -280,78 +293,6 @@ async def event_generator():
routes.insert(0, routes.pop(i))
break

@self.app.post("/run_sse")
async def run_agent_sse(req: RunAgentRequest) -> StreamingResponse:
print("my run sse !!!")
# SSE endpoint
session = await self.server.session_service.get_session(
app_name=req.app_name,
user_id=req.user_id,
session_id=req.session_id,
)
if not session:
raise HTTPException(status_code=404, detail="Session not found")

# Convert the events to properly formatted SSE
async def event_generator():
try:
stream_mode = (
StreamingMode.SSE
if req.streaming
else StreamingMode.NONE
)
runner = await self.server.get_runner_async(req.app_name)
async with Aclosing(
runner.run_async(
user_id=req.user_id,
session_id=req.session_id,
new_message=req.new_message,
state_delta=req.state_delta,
run_config=RunConfig(streaming_mode=stream_mode),
invocation_id=req.invocation_id,
)
) as agen:
async for event in agen:
# ADK Web renders artifacts from `actions.artifactDelta`
# during part processing *and* during action processing
# 1) the original event with `artifactDelta` cleared (content)
# 2) a content-less "action-only" event carrying `artifactDelta`
events_to_stream = [event]
if (
event.actions.artifact_delta
and event.content
and event.content.parts
):
content_event = event.model_copy(deep=True)
content_event.actions.artifact_delta = {}
artifact_event = event.model_copy(deep=True)
artifact_event.content = None
events_to_stream = [
content_event,
artifact_event,
]

for event_to_stream in events_to_stream:
sse_event = event_to_stream.model_dump_json(
exclude_none=True,
by_alias=True,
)
logger.debug(
"Generated event in agent run streaming: %s",
sse_event,
)
yield f"data: {sse_event}\n\n"
except Exception as e:
logger.exception("Error in event_generator: %s", e)
yield f"data: {json.dumps({'error': str(e)})}\n\n"

# Returns a streaming response with the proper media type for SSE

return StreamingResponse(
event_generator(),
media_type="text/event-stream",
)

# Attach ASGI middleware for unified telemetry across all routes
self.app.add_middleware(AgentkitTelemetryHTTPMiddleware)

Expand All @@ -364,7 +305,7 @@ async def _invoke_compat(request: Request):
telemetry_headers = {
k: v
for k, v in dict(headers).items()
if k.lower() not in {"authorization", "token"}
if k.lower() not in SENSITIVE_HEADERS
}
# trace request attributes on current span
telemetry.trace_agent_server(
Expand Down Expand Up @@ -443,10 +384,23 @@ async def event_generator():
# finish span on successful end of stream handled by middleware
pass
except Exception as e:
logger.exception("Error in /invoke event_generator: %s", e)
telemetry.trace_agent_server_finish(
path="/invoke", func_result="", exception=e
)
yield f'data: {{"error": "{str(e)}"}}\n\n'
# Do not echo internal exception details to the client;
# keep parity with the /run_sse error frame above.
yield (
"data: "
+ json.dumps(
{
"error": "internal error while running agent; "
"see server logs",
"error_type": type(e).__name__,
}
)
+ "\n\n"
)

return StreamingResponse(
event_generator(),
Expand Down
3 changes: 2 additions & 1 deletion agentkit/apps/agent_server_app/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
from opentelemetry import context as context_api

from agentkit.apps.agent_server_app.telemetry import telemetry
from agentkit.apps.utils import SENSITIVE_HEADERS

_EXCLUDED_HEADERS = {"authorization", "token"}
_EXCLUDED_HEADERS = SENSITIVE_HEADERS


class AgentkitTelemetryHTTPMiddleware:
Expand Down
5 changes: 4 additions & 1 deletion agentkit/apps/simple_app/simple_app_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ async def _process_invoke(self, request: Request) -> tuple[dict, dict, Any]:
if asyncio.iscoroutinefunction(self.func):
return payload, headers, await self.func(*args)
else:
return payload, headers, self.func(*args)
# Run sync entrypoints in a worker thread: executing them inline
# would block the event loop and stall all concurrent requests if
# the entrypoint does blocking IO or heavy computation.
return payload, headers, await asyncio.to_thread(self.func, *args)

def _convert_to_sse(self, obj) -> bytes:
"""Convert object to Server-Sent Events format using safe serialization.
Expand Down
31 changes: 10 additions & 21 deletions agentkit/apps/simple_app/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@

import logging
import time
import traceback
from typing import Callable

from opentelemetry import trace
from opentelemetry.trace import get_tracer
from opentelemetry.metrics import get_meter
from opentelemetry.trace.span import Span

from agentkit.apps.utils import safe_serialize_to_json_string
from agentkit.apps.utils import (
SENSITIVE_HEADERS,
dont_throw,
safe_serialize_to_json_string,
)

_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [
0.01,
Expand All @@ -46,26 +49,11 @@

logger = logging.getLogger("agentkit." + __name__)

_EXCLUDED_HEADERS = SENSITIVE_HEADERS

def dont_throw(func):
"""
A decorator that wraps the passed in function and logs exceptions instead of throwing them.

@param func: The function to wrap
@return: The wrapper function
"""

def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception:
logger.error(
"Agentkit failed to trace in %s, error: %s",
func.__name__,
traceback.format_exc(),
)

return wrapper
def _redact_headers(headers: dict) -> dict:
return {k: v for k, v in headers.items() if k.lower() not in _EXCLUDED_HEADERS}


class Telemetry:
Expand Down Expand Up @@ -105,7 +93,8 @@ def trace_agent(
span.set_attribute(key="gen_ai.func_name", value=func.__name__)

span.set_attribute(
key="gen_ai.request.headers", value=safe_serialize_to_json_string(headers)
key="gen_ai.request.headers",
value=safe_serialize_to_json_string(_redact_headers(headers)),
)
session_id = headers.get("session_id")
if session_id:
Expand Down
38 changes: 38 additions & 0 deletions agentkit/apps/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,47 @@

import json
import logging
import traceback

logger = logging.getLogger("agentkit." + __name__)

# Credential-bearing headers that must never be recorded on telemetry spans:
# span attributes bypass the logging redaction filter. Single source of truth
# for every app's telemetry/middleware header filtering.
SENSITIVE_HEADERS = frozenset(
{
"authorization",
"proxy-authorization",
"token",
"x-security-token", # STS credentials (see agentkit.auth._sigv4)
"x-api-key",
"api-key",
"cookie",
"set-cookie",
}
)


def dont_throw(func):
"""
A decorator that wraps the passed in function and logs exceptions instead of throwing them.

@param func: The function to wrap
@return: The wrapper function
"""

def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception:
logger.error(
"Agentkit failed to trace in %s, error: %s",
func.__name__,
traceback.format_exc(),
)

return wrapper


def safe_serialize_to_json_string(obj):
"""Safely serialize object directly to JSON string with progressive fallback handling.
Expand Down
Loading
Loading