# Streaming Events
Zap provides real-time streaming of events during task execution via an async generator API. This allows you to display progress to users, log events, or build reactive UIs.
## Basic Usage

Use `stream_task()` instead of `execute_task()` to receive events as they occur:
```python
import asyncio

from zap_ai import Zap, ZapAgent
from zap_ai.streaming import (
    CompletedEvent,
    ErrorEvent,
    ThinkingEvent,
    ToolCallEvent,
    ToolResultEvent,
)

agent = ZapAgent(
    name="Assistant",
    prompt="You are a helpful assistant.",
    model="gpt-4o",
    mcp_clients=[...],
)

zap = Zap(agents=[agent])

async def main():
    await zap.start()
    async for event in zap.stream_task(agent_name="Assistant", task="What's the weather?"):
        match event:
            case ThinkingEvent(iteration=i):
                print(f"Thinking (iteration {i})...")
            case ToolCallEvent(name=name, phrase=phrase):
                print(phrase)  # e.g., "Getting weather for London..."
            case ToolResultEvent(name=name, success=success):
                status = "✓" if success else "✗"
                print(f"{status} {name} completed")
            case CompletedEvent(result=result):
                print(f"\nDone: {result}")
            case ErrorEvent(error=error):
                print(f"\nError: {error}")

asyncio.run(main())
```
## Event Types

All events inherit from `StreamEvent` and include:

- `type` - Event type string (e.g., `"thinking"`, `"tool_call"`)
- `timestamp` - ISO 8601 timestamp
- `task_id` - ID of the task
- `seq` - Sequence number for ordering
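The base class itself is not shown in these docs; a minimal sketch of its assumed shape, based only on the fields listed above:

```python
from dataclasses import dataclass

@dataclass
class StreamEvent:
    """Assumed shape of the common event base (a sketch, not the library source)."""
    type: str       # Event type string, e.g. "thinking", "tool_call"
    timestamp: str  # ISO 8601 timestamp
    task_id: str    # ID of the task that produced the event
    seq: int        # Sequence number for ordering

# Since `seq` is a monotonic counter, events can always be re-ordered by it:
events = [
    StreamEvent("tool_call", "2024-01-01T00:00:01Z", "t1", 1),
    StreamEvent("thinking", "2024-01-01T00:00:00Z", "t1", 0),
]
events.sort(key=lambda e: e.seq)
```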
### ThinkingEvent

Emitted when the LLM starts processing an iteration.

```python
@dataclass
class ThinkingEvent(StreamEvent):
    iteration: int  # Loop iteration number (0-indexed)
```
### ToolCallEvent

Emitted when the agent calls a tool.

```python
@dataclass
class ToolCallEvent(StreamEvent):
    name: str           # Tool name
    arguments: dict     # Tool arguments
    phrase: str         # Human-readable description
    tool_call_id: str   # Unique ID for this call
```
The `phrase` field provides a user-friendly description generated from the tool's MCP description. For example:

- Tool description: `"Get weather for a city"`
- Arguments: `{"city": "London"}`
- Phrase: `"Getting weather for London..."`
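The docs don't specify how the phrase is derived, so as a rough illustration only, here is a hypothetical `make_phrase` helper that gerund-ifies the leading verb (via a tiny hand-made lookup table) and splices in the first argument value; the library's real logic may be entirely different:

```python
# Hypothetical sketch of phrase generation -- NOT the library's actual algorithm.
GERUNDS = {"Get": "Getting", "Send": "Sending", "Run": "Running"}

def make_phrase(description: str, arguments: dict) -> str:
    words = description.split()
    verb = GERUNDS.get(words[0], words[0] + "ing")
    value = next(iter(arguments.values()), None)
    if value is not None and "for" in words:
        # Keep the object of the verb, swap the generic noun for the argument:
        # "Get weather for a city" + {"city": "London"} -> "Getting weather for London..."
        head = " ".join(words[1:words.index("for")])
        return f"{verb} {head} for {value}..."
    return f"{verb} {' '.join(words[1:])}..."
```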
### ToolResultEvent

Emitted when a tool execution completes.

```python
@dataclass
class ToolResultEvent(StreamEvent):
    name: str           # Tool name
    result: str         # Tool result (stringified)
    tool_call_id: str   # Matches the ToolCallEvent
    success: bool       # Whether execution succeeded
```
### CompletedEvent

Emitted when the task completes successfully.

```python
@dataclass
class CompletedEvent(StreamEvent):
    result: str  # Final result from the agent
```
### ErrorEvent

Emitted when the task fails.

```python
@dataclass
class ErrorEvent(StreamEvent):
    error: str  # Error message
```
### TokenEvent

Reserved for future token-level streaming (Phase 2).

```python
@dataclass
class TokenEvent(StreamEvent):
    token: str  # Individual token
    index: int  # Token index in response
```
## Configuration Options

`stream_task()` accepts the same parameters as `execute_task()`, plus:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `poll_interval` | `float` | `0.1` | Seconds between polling for events |
| `include_thinking` | `bool` | `True` | Include `ThinkingEvent` in stream |
| `include_tool_events` | `bool` | `True` | Include `ToolCallEvent`/`ToolResultEvent` |
Example with filtering:

```python
# Only receive completion events (skip thinking and tool events)
async for event in zap.stream_task(
    agent_name="Assistant",
    task="Calculate something",
    include_thinking=False,
    include_tool_events=False,
):
    if isinstance(event, CompletedEvent):
        print(f"Result: {event.result}")
```
## Architecture

Streaming uses Temporal query polling:

- The workflow emits events to an internal buffer as it runs
- `stream_task()` polls the workflow via the `get_events` query
- Events are yielded to the async generator as they're discovered
- The buffer is bounded to prevent memory issues (last 500 events kept)

This approach provides ~100 ms latency (configurable via `poll_interval`) and works reliably with Temporal's deterministic workflow model.
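In simplified form, the polling loop behind `stream_task()` might look like the following sketch. The real implementation talks to a Temporal workflow handle via the `get_events` query; here those interactions are abstracted into two injected callables so the loop itself stays self-contained:

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable

async def poll_events(
    query_events: Callable[[int], Awaitable[list[dict]]],
    is_done: Callable[[], Awaitable[bool]],
    poll_interval: float = 0.1,
) -> AsyncIterator[dict]:
    """Sketch of a query-polling loop: repeatedly ask for events newer than
    the last-seen sequence number, yield them in order, and stop once the
    task reports completion."""
    last_seq = -1
    while True:
        for ev in await query_events(last_seq):  # stands in for the `get_events` query
            last_seq = ev["seq"]
            yield ev
        if await is_done():
            return
        await asyncio.sleep(poll_interval)
```

Tracking `last_seq` makes each poll idempotent: re-querying the bounded buffer never yields an event twice, and latency is governed entirely by `poll_interval`.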
## Example: Progress Display

```python
import sys

from zap_ai.streaming import CompletedEvent, ThinkingEvent, ToolCallEvent, ToolResultEvent

async def run_with_progress(zap, task: str):
    tool_count = 0
    async for event in zap.stream_task(agent_name="Assistant", task=task):
        match event:
            case ThinkingEvent():
                sys.stdout.write(".")
                sys.stdout.flush()
            case ToolCallEvent(phrase=phrase):
                tool_count += 1
                print(f"\n[{tool_count}] {phrase}")
            case ToolResultEvent(success=success):
                print(" └─ " + ("Done" if success else "Failed"))
            case CompletedEvent(result=result):
                print(f"\n\nCompleted: {result}")
                return result

# Usage (inside an async context):
result = await run_with_progress(zap, "What's the weather in Paris?")
```