Skip to content
62 changes: 62 additions & 0 deletions contributing/samples/agent_tool_event_streaming/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# AgentTool Event Streaming Demo

This sample demonstrates the AgentTool event streaming feature (Issue #3984).


**Before the fix:**
- When a coordinator agent delegates to a sub-agent via AgentTool, the sub-agent's execution acts as a "black box"
- No events are yielded during sub-agent execution
- Frontend appears unresponsive for the duration of sub-agent execution
- Only the final result is returned after sub-agent completes

**After the fix:**
- Sub-agent events are streamed in real-time to the parent Runner
- Frontend receives immediate feedback about sub-agent progress
- Users can see intermediate steps, tool calls, and responses as they happen
- Much better UX for hierarchical multi-agent systems

## Running the Demo

```bash
cd contributing/samples
adk web .
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The command to run the demo appears to be incorrect. The PR description suggests running adk web . from within the agent_tool_event_streaming directory. To align with that and ensure the sample runs correctly, this path should be updated.

Suggested change
cd contributing/samples
adk web .
cd contributing/samples/agent_tool_event_streaming
adk web .


```

Then in the web UI, select agent_tool_event_streaming from the dropdown
1. Ask: "Research the history of artificial intelligence"
2. Watch the events stream in real-time - you'll see:
- Coordinator agent's function call
- Research agent's step-by-step progress
- Research agent's intermediate responses
- Final summary


## Expected Behavior

With event streaming enabled, you should see:

1. **Coordinator events:**
- Function call to `research_agent`

2. **Research agent events (streamed in real-time):**
- "Step 1: Acknowledging task..."
- "Step 2: Researching topic..."
- "Step 3: Analyzing findings..."
- "Final summary: ..."

3. **Coordinator final response:**
- Summary of the research

All events should appear progressively, not all at once at the end.

## Before/After Comparison

To see the difference:

1. **Before fix:** Run on a branch without the event streaming feature
- You'll see: Coordinator call → (long pause) → Final result

2. **After fix:** Run on this branch
- You'll see: Coordinator call → Research steps streaming → Final result

17 changes: 17 additions & 0 deletions contributing/samples/agent_tool_event_streaming/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .agent import root_agent

__all__ = ['root_agent']
66 changes: 66 additions & 0 deletions contributing/samples/agent_tool_event_streaming/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Sample demonstrating AgentTool event streaming.

This sample shows how events from sub-agents wrapped in AgentTool are
streamed to the parent Runner in real-time, providing visibility into
sub-agent execution progress.

Before the fix: Sub-agent events are buffered until completion, making
the frontend appear unresponsive during long-running sub-agent tasks.

After the fix: Sub-agent events are streamed immediately, providing
real-time feedback to the frontend.
"""

from google.adk import Agent
from google.adk.tools import AgentTool

# Sub-agent that performs a multi-step task
research_agent = Agent(
name='research_agent',
model='gemini-2.5-flash-lite',
description='A research agent that performs multi-step research tasks',
instruction="""
You are a research assistant. When given a research task, break it down
into steps and report your progress as you work:

1. First, acknowledge the task and outline your approach
2. Then, perform the research (simulate by thinking through the steps)
3. Finally, provide a comprehensive summary

Always be verbose about your progress so the user can see what you're doing.
""",
)

# Coordinator agent that delegates to the research agent
coordinator_agent = Agent(
name='coordinator_agent',
model='gemini-2.5-flash-lite',
description='A coordinator that delegates research tasks',
instruction="""
You are a coordinator agent. When users ask research questions, delegate
them to the research_agent tool. Always use the research_agent tool for
any research-related queries.
""",
tools=[
AgentTool(
agent=research_agent,
skip_summarization=True,
)
],
)

root_agent = coordinator_agent
33 changes: 18 additions & 15 deletions src/google/adk/flows/llm_flows/base_llm_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,24 +678,27 @@ async def _postprocess_handle_function_calls_async(
function_call_event: Event,
llm_request: LlmRequest,
) -> AsyncGenerator[Event, None]:
if function_response_event := await functions.handle_function_calls_async(
# Handle function calls with AgentTool event streaming (handles both AgentTool and regular calls)
function_response_event = None
async for (
event
) in functions.handle_function_calls_async_with_agent_tool_streaming(
invocation_context, function_call_event, llm_request.tools_dict
):
auth_event = functions.generate_auth_event(
invocation_context, function_response_event
)
if auth_event:
yield auth_event

tool_confirmation_event = functions.generate_request_confirmation_event(
invocation_context, function_call_event, function_response_event
)
if tool_confirmation_event:
yield tool_confirmation_event

# Always yield the function response event first
yield function_response_event
# Track the function response event for post-processing
if (
event.content
and event.content.parts
and any(
part.function_response
for part in event.content.parts
if part.function_response
)
):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This condition to check for a function response can be simplified by using the event.get_function_responses() helper method. This will make the code more concise and readable.

      if event.get_function_responses():

function_response_event = event
yield event

if function_response_event:
# Check if this is a set_model_response function response
if json_response := _output_schema_processor.get_structured_model_response(
function_response_event
Expand Down
140 changes: 140 additions & 0 deletions src/google/adk/flows/llm_flows/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,146 @@ def generate_request_confirmation_event(
)


async def handle_function_calls_async_with_agent_tool_streaming(
invocation_context: InvocationContext,
function_call_event: Event,
tools_dict: dict[str, BaseTool],
) -> AsyncGenerator[Event, None]:
"""Handles function calls with event streaming for AgentTool.

Yields events from AgentTool sub-agents as they are generated, then
yields the final function response event.
"""
from ...agents.llm_agent import LlmAgent
from ...tools.agent_tool import AgentTool

function_calls = function_call_event.get_function_calls()
if not function_calls:
return

agent_tool_calls = []
regular_calls = []

# Separate AgentTool calls from regular calls
for function_call in function_calls:
tool = tools_dict.get(function_call.name)
if isinstance(tool, AgentTool):
agent_tool_calls.append((function_call, tool))
else:
regular_calls.append(function_call)

# If no AgentTool calls, use normal flow
if not agent_tool_calls:
function_response_event = await handle_function_calls_async(
invocation_context, function_call_event, tools_dict
)
if function_response_event:
auth_event = generate_auth_event(
invocation_context, function_response_event
)
if auth_event:
yield auth_event
tool_confirmation_event = generate_request_confirmation_event(
invocation_context, function_call_event, function_response_event
)
if tool_confirmation_event:
yield tool_confirmation_event
yield function_response_event
return
Comment on lines +217 to +233
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There's some code duplication between this block for handling non-AgentTool calls and the block at the end of the function (lines 308-320) that handles AgentTool call responses. Both blocks generate and yield auth_event, tool_confirmation_event, and the final function_response_event.

To improve maintainability and reduce redundancy, consider extracting this common logic into a helper async generator function. This would centralize the event generation and yielding process.


# Stream events from AgentTool sub-agents
agent_tool_results = {}
for function_call, agent_tool in agent_tool_calls:
tool_context = _create_tool_context(invocation_context, function_call, None)
last_content = None

async for event in agent_tool.run_async_with_events(
args=function_call.args or {}, tool_context=tool_context
):
yield event
if event.content:
last_content = event.content

# Build final result from last content
if last_content:
merged_text = '\n'.join(p.text for p in last_content.parts if p.text)
if (
isinstance(agent_tool.agent, LlmAgent)
and agent_tool.agent.output_schema
):
tool_result = agent_tool.agent.output_schema.model_validate_json(
merged_text
).model_dump(exclude_none=True)
else:
tool_result = merged_text
if not isinstance(tool_result, dict):
tool_result = {'result': tool_result}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This check to ensure tool_result is a dictionary is redundant. The __build_response_event function, which is called later with this result, already performs the same check. Removing this duplicated validation will simplify the code.

agent_tool_results[function_call.id] = tool_result
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic to wrap tool_result in a dictionary is redundant here. The __build_response_event function, which is called later with this result, already handles wrapping non-dictionary results. Removing this logic will make the code cleaner and more consistent with the non-streaming tool execution path.

      agent_tool_results[function_call.id] = tool_result


# Handle regular calls if any
regular_response_event = None
if regular_calls:
regular_call_event = Event(
invocation_id=function_call_event.invocation_id,
author=function_call_event.author,
content=types.Content(
role='user',
parts=[
part
for part in (function_call_event.content.parts or [])
if part.function_call
and part.function_call.name
not in [fc.name for fc, _ in agent_tool_calls]
],
),
branch=function_call_event.branch,
)
regular_response_event = await handle_function_calls_async(
invocation_context, regular_call_event, tools_dict
)

# Build AgentTool response events
agent_tool_response_events = []
for function_call, agent_tool in agent_tool_calls:
if function_call.id in agent_tool_results:
tool_context = _create_tool_context(
invocation_context, function_call, None
)
response_event = __build_response_event(
agent_tool,
agent_tool_results[function_call.id],
tool_context,
invocation_context,
)
agent_tool_response_events.append(response_event)

# Merge all response events
all_events = []
if regular_response_event:
all_events.append(regular_response_event)
all_events.extend(agent_tool_response_events)

if all_events:
if len(all_events) == 1:
final_response_event = all_events[0]
else:
final_response_event = merge_parallel_function_response_events(all_events)

# Yield auth and confirmation events
auth_event = generate_auth_event(invocation_context, final_response_event)
if auth_event:
yield auth_event

tool_confirmation_event = generate_request_confirmation_event(
invocation_context, function_call_event, final_response_event
)
if tool_confirmation_event:
yield tool_confirmation_event

# Yield the final function response event
yield final_response_event


async def handle_function_calls_async(
invocation_context: InvocationContext,
function_call_event: Event,
Expand Down
Loading