52 changes: 50 additions & 2 deletions src/agents/tracing/processors.py
@@ -28,12 +28,23 @@ def export(self, items: list[Trace | Span[Any]]) -> None:


class BackendSpanExporter(TracingExporter):
_OPENAI_TRACING_INGEST_ENDPOINT = "https://api.openai.com/v1/traces/ingest"
_OPENAI_TRACING_ALLOWED_USAGE_KEYS = frozenset(
{
"input_tokens",
"output_tokens",
"total_tokens",
"input_tokens_details",
"output_tokens_details",
}
)

def __init__(
self,
api_key: str | None = None,
organization: str | None = None,
project: str | None = None,
endpoint: str = "https://api.openai.com/v1/traces/ingest",
endpoint: str = _OPENAI_TRACING_INGEST_ENDPOINT,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
@@ -103,7 +114,14 @@ def export(self, items: list[Trace | Span[Any]]) -> None:
logger.warning("OPENAI_API_KEY is not set, skipping trace export")
continue

data = [item.export() for item in grouped if item.export()]
sanitize_for_openai = self._should_sanitize_for_openai_tracing_api()
data: list[dict[str, Any]] = []
for item in grouped:
exported = item.export()
if exported:
if sanitize_for_openai:
exported = self._sanitize_for_openai_tracing_api(exported)
data.append(exported)
payload = {"data": data}

headers = {
@@ -160,6 +178,36 @@ def export(self, items: list[Trace | Span[Any]]) -> None:
time.sleep(sleep_time)
delay = min(delay * 2, self.max_delay)

def _should_sanitize_for_openai_tracing_api(self) -> bool:
return self.endpoint.rstrip("/") == self._OPENAI_TRACING_INGEST_ENDPOINT.rstrip("/")

def _sanitize_for_openai_tracing_api(self, payload_item: dict[str, Any]) -> dict[str, Any]:
"""Drop fields known to be rejected by OpenAI tracing ingestion."""
span_data = payload_item.get("span_data")
if not isinstance(span_data, dict):
return payload_item

if span_data.get("type") != "generation":
return payload_item

usage = span_data.get("usage")
if not isinstance(usage, dict):
return payload_item

filtered_usage = {
key: value
for key, value in usage.items()
if key in self._OPENAI_TRACING_ALLOWED_USAGE_KEYS
}
if filtered_usage == usage:
return payload_item

sanitized_span_data = dict(span_data)
sanitized_span_data["usage"] = filtered_usage
sanitized_payload_item = dict(payload_item)
sanitized_payload_item["span_data"] = sanitized_span_data
return sanitized_payload_item

def close(self):
"""Close the underlying HTTP client."""
self._client.close()
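
For illustration, a minimal sketch of what the new sanitizer does to a generation span's usage payload, assuming the package in this diff is importable as `agents.tracing.processors` (matching src/agents/tracing/processors.py above); the helper only filters the copy it returns and leaves the original dict alone:

from agents.tracing.processors import BackendSpanExporter

exporter = BackendSpanExporter(api_key="test_key")  # defaults to the OpenAI ingest endpoint
payload = {
    "object": "trace.span",
    "span_data": {
        "type": "generation",
        "usage": {"requests": 1, "input_tokens": 10, "output_tokens": 5, "total_tokens": 15},
    },
}

sanitized = exporter._sanitize_for_openai_tracing_api(payload)
print(sanitized["span_data"]["usage"])  # {'input_tokens': 10, 'output_tokens': 5, 'total_tokens': 15}
print(payload["span_data"]["usage"])    # original payload untouched; 'requests' still present
exporter.close()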
144 changes: 144 additions & 0 deletions tests/test_trace_processor.py
@@ -276,3 +276,147 @@ def test_backend_span_exporter_close(mock_client):

# Ensure underlying http client is closed
mock_client.return_value.close.assert_called_once()


@patch("httpx.Client")
def test_backend_span_exporter_sanitizes_generation_usage_for_openai_tracing(mock_client):
"""Unsupported usage keys should be stripped before POSTing to OpenAI tracing."""

class DummyItem:
tracing_api_key = None

def __init__(self):
self.exported_payload = {
"object": "trace.span",
"span_data": {
"type": "generation",
"usage": {
"requests": 1,
"input_tokens": 10,
"output_tokens": 5,
"total_tokens": 15,
"input_tokens_details": {"cached_tokens": 1},
"output_tokens_details": {"reasoning_tokens": 2},
},
},
}

def export(self):
return self.exported_payload

mock_response = MagicMock()
mock_response.status_code = 200
mock_client.return_value.post.return_value = mock_response

exporter = BackendSpanExporter(api_key="test_key")
item = DummyItem()
exporter.export([item])

sent_payload = mock_client.return_value.post.call_args.kwargs["json"]["data"][0]
sent_usage = sent_payload["span_data"]["usage"]
assert "requests" not in sent_usage
assert sent_usage["input_tokens"] == 10
assert sent_usage["output_tokens"] == 5
assert sent_usage["total_tokens"] == 15
assert sent_usage["input_tokens_details"] == {"cached_tokens": 1}
assert sent_usage["output_tokens_details"] == {"reasoning_tokens": 2}

# Ensure the original exported object has not been mutated.
assert "requests" in item.exported_payload["span_data"]["usage"]
exporter.close()


@patch("httpx.Client")
def test_backend_span_exporter_keeps_generation_usage_for_custom_endpoint(mock_client):
class DummyItem:
tracing_api_key = None

def __init__(self):
self.exported_payload = {
"object": "trace.span",
"span_data": {
"type": "generation",
"usage": {
"requests": 1,
"input_tokens": 10,
"output_tokens": 5,
},
},
}

def export(self):
return self.exported_payload

mock_response = MagicMock()
mock_response.status_code = 200
mock_client.return_value.post.return_value = mock_response

exporter = BackendSpanExporter(
api_key="test_key",
endpoint="https://example.com/v1/traces/ingest",
)
exporter.export([DummyItem()])

sent_payload = mock_client.return_value.post.call_args.kwargs["json"]["data"][0]
assert sent_payload["span_data"]["usage"]["requests"] == 1
assert sent_payload["span_data"]["usage"]["input_tokens"] == 10
assert sent_payload["span_data"]["usage"]["output_tokens"] == 5
exporter.close()


@patch("httpx.Client")
def test_backend_span_exporter_does_not_modify_non_generation_usage(mock_client):
class DummyItem:
tracing_api_key = None

def export(self):
return {
"object": "trace.span",
"span_data": {
"type": "function",
"usage": {"requests": 1},
},
}

mock_response = MagicMock()
mock_response.status_code = 200
mock_client.return_value.post.return_value = mock_response

exporter = BackendSpanExporter(api_key="test_key")
exporter.export([DummyItem()])

sent_payload = mock_client.return_value.post.call_args.kwargs["json"]["data"][0]
assert sent_payload["span_data"]["usage"] == {"requests": 1}
exporter.close()


def test_sanitize_for_openai_tracing_api_keeps_allowed_generation_usage():
exporter = BackendSpanExporter(api_key="test_key")
payload = {
"object": "trace.span",
"span_data": {
"type": "generation",
"usage": {
"input_tokens": 1,
"output_tokens": 2,
"total_tokens": 3,
"input_tokens_details": {"cached_tokens": 0},
"output_tokens_details": {"reasoning_tokens": 0},
},
},
}
assert exporter._sanitize_for_openai_tracing_api(payload) is payload
exporter.close()


def test_sanitize_for_openai_tracing_api_skips_non_dict_generation_usage():
exporter = BackendSpanExporter(api_key="test_key")
payload = {
"object": "trace.span",
"span_data": {
"type": "generation",
"usage": None,
},
}
assert exporter._sanitize_for_openai_tracing_api(payload) is payload
exporter.close()
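
The endpoint gate exercised by test_backend_span_exporter_keeps_generation_usage_for_custom_endpoint can also be checked directly; a small sketch under the same import assumption as above:

from agents.tracing.processors import BackendSpanExporter

default_exporter = BackendSpanExporter(api_key="test_key")
custom_exporter = BackendSpanExporter(api_key="test_key", endpoint="https://example.com/v1/traces/ingest")

print(default_exporter._should_sanitize_for_openai_tracing_api())  # True: default OpenAI ingest endpoint
print(custom_exporter._should_sanitize_for_openai_tracing_api())   # False: custom backends get usage unchanged

default_exporter.close()
custom_exporter.close()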