Skip to content

Streaming API

The streaming module provides event types for real-time task progress monitoring.

Event Types

All events inherit from StreamEvent and can be used with Zap.stream_task().

StreamEvent

Base class for all streaming events.

zap_ai.streaming.StreamEvent dataclass

Base class for all streaming events.

Attributes:

Name Type Description
type str

Event type identifier.

timestamp str

ISO format timestamp when event occurred.

task_id str

ID of the task this event belongs to.

seq int

Sequence number for ordering events.

Source code in src/zap_ai/streaming/events.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@dataclass
class StreamEvent:
    """
    Common envelope carried by every streaming event.

    Attributes:
        type: Identifier naming the kind of event.
        timestamp: When the event occurred, as an ISO format string.
        task_id: ID of the task that the event belongs to.
        seq: Sequence number used to order events.
    """

    type: str
    timestamp: str
    task_id: str
    seq: int

ThinkingEvent

Emitted when the LLM starts processing an iteration.

zap_ai.streaming.ThinkingEvent dataclass

Bases: StreamEvent

Emitted when the LLM starts reasoning/generating.

Attributes:

Name Type Description
iteration int

Current agentic loop iteration number.

Source code in src/zap_ai/streaming/events.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@dataclass
class ThinkingEvent(StreamEvent):
    """
    Signals that the LLM has begun reasoning/generating.

    Attributes:
        iteration: Number of the current agentic loop iteration.
    """

    iteration: int

    def __init__(
        self,
        iteration: int,
        task_id: str = "",
        seq: int = 0,
        timestamp: str | None = None,
    ) -> None:
        # The base initializer stores the shared envelope fields; a missing
        # or empty timestamp is replaced with the current UTC time.
        super().__init__(
            type="thinking",
            timestamp=timestamp or datetime.now(timezone.utc).isoformat(),
            task_id=task_id,
            seq=seq,
        )
        self.iteration = iteration

ToolCallEvent

Emitted when the agent calls a tool.

zap_ai.streaming.ToolCallEvent dataclass

Bases: StreamEvent

Emitted when a tool call is about to be executed.

Attributes:

Name Type Description
name str

Tool name being called.

arguments dict[str, Any]

Tool call arguments.

phrase str

Human-readable description of the tool call.

tool_call_id str

Unique identifier for this tool call.

Source code in src/zap_ai/streaming/events.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@dataclass
class ToolCallEvent(StreamEvent):
    """
    Signals that a tool call is about to be executed.

    Attributes:
        name: Name of the tool being called.
        arguments: Arguments passed to the tool call.
        phrase: Human-readable description of the tool call.
        tool_call_id: Unique identifier for this tool call.
    """

    name: str
    arguments: dict[str, Any]
    phrase: str
    tool_call_id: str

    def __init__(
        self,
        name: str,
        arguments: dict[str, Any],
        phrase: str,
        tool_call_id: str,
        task_id: str = "",
        seq: int = 0,
        timestamp: str | None = None,
    ) -> None:
        # The base initializer stores the shared envelope fields; a missing
        # or empty timestamp is replaced with the current UTC time.
        super().__init__(
            type="tool_call",
            timestamp=timestamp or datetime.now(timezone.utc).isoformat(),
            task_id=task_id,
            seq=seq,
        )
        self.name = name
        self.arguments = arguments
        self.phrase = phrase
        self.tool_call_id = tool_call_id

ToolResultEvent

Emitted when a tool execution completes.

zap_ai.streaming.ToolResultEvent dataclass

Bases: StreamEvent

Emitted when a tool call completes.

Attributes:

Name Type Description
name str

Tool name that was called.

result str

Tool execution result (may be truncated).

tool_call_id str

Unique identifier matching the ToolCallEvent.

success bool

Whether the tool executed successfully.

Source code in src/zap_ai/streaming/events.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
@dataclass
class ToolResultEvent(StreamEvent):
    """
    Signals that a tool call has completed.

    Attributes:
        name: Name of the tool that was called.
        result: Result of the tool execution (may be truncated).
        tool_call_id: Unique identifier matching the ToolCallEvent.
        success: Whether the tool executed successfully.
    """

    name: str
    result: str
    tool_call_id: str
    success: bool

    def __init__(
        self,
        name: str,
        result: str,
        tool_call_id: str,
        success: bool,
        task_id: str = "",
        seq: int = 0,
        timestamp: str | None = None,
    ) -> None:
        # The base initializer stores the shared envelope fields; a missing
        # or empty timestamp is replaced with the current UTC time.
        super().__init__(
            type="tool_result",
            timestamp=timestamp or datetime.now(timezone.utc).isoformat(),
            task_id=task_id,
            seq=seq,
        )
        self.name = name
        self.result = result
        self.tool_call_id = tool_call_id
        self.success = success

TokenEvent

Reserved for future token-level streaming.

zap_ai.streaming.TokenEvent dataclass

Bases: StreamEvent

Emitted for individual tokens during streaming (Phase 2 only).

Attributes:

Name Type Description
token str

The token text.

index int

Token index in the current generation.

Source code in src/zap_ai/streaming/events.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
@dataclass
class TokenEvent(StreamEvent):
    """
    Carries a single token during streaming (Phase 2 only).

    Attributes:
        token: Text of the token.
        index: Position of the token in the current generation.
    """

    token: str
    index: int

    def __init__(
        self,
        token: str,
        index: int,
        task_id: str = "",
        seq: int = 0,
        timestamp: str | None = None,
    ) -> None:
        # The base initializer stores the shared envelope fields; a missing
        # or empty timestamp is replaced with the current UTC time.
        super().__init__(
            type="token",
            timestamp=timestamp or datetime.now(timezone.utc).isoformat(),
            task_id=task_id,
            seq=seq,
        )
        self.token = token
        self.index = index

CompletedEvent

Emitted when the task completes successfully.

zap_ai.streaming.CompletedEvent dataclass

Bases: StreamEvent

Emitted when the task completes successfully.

Attributes:

Name Type Description
result str

Final result string from the agent.

Source code in src/zap_ai/streaming/events.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
@dataclass
class CompletedEvent(StreamEvent):
    """
    Signals that the task finished successfully.

    Attributes:
        result: Final result string produced by the agent.
    """

    result: str

    def __init__(
        self,
        result: str,
        task_id: str = "",
        seq: int = 0,
        timestamp: str | None = None,
    ) -> None:
        # The base initializer stores the shared envelope fields; a missing
        # or empty timestamp is replaced with the current UTC time.
        super().__init__(
            type="completed",
            timestamp=timestamp or datetime.now(timezone.utc).isoformat(),
            task_id=task_id,
            seq=seq,
        )
        self.result = result

ErrorEvent

Emitted when the task fails.

zap_ai.streaming.ErrorEvent dataclass

Bases: StreamEvent

Emitted when the task fails.

Attributes:

Name Type Description
error str

Error message describing the failure.

Source code in src/zap_ai/streaming/events.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
@dataclass
class ErrorEvent(StreamEvent):
    """
    Signals that the task failed.

    Attributes:
        error: Message describing the failure.
    """

    error: str

    def __init__(
        self,
        error: str,
        task_id: str = "",
        seq: int = 0,
        timestamp: str | None = None,
    ) -> None:
        # The base initializer stores the shared envelope fields; a missing
        # or empty timestamp is replaced with the current UTC time.
        super().__init__(
            type="error",
            timestamp=timestamp or datetime.now(timezone.utc).isoformat(),
            task_id=task_id,
            seq=seq,
        )
        self.error = error

Utility Functions

generate_tool_phrase

zap_ai.streaming.generate_tool_phrase(tool_name, description, arguments)

Generate a human-readable phrase for a tool call.

Converts tool descriptions to present participle form and substitutes argument values for a user-friendly status message.

Parameters:

Name Type Description Default
tool_name str

Name of the tool being called.

required
description str | None

Tool description from MCP registry.

required
arguments dict[str, Any]

Arguments being passed to the tool.

required

Returns:

Type Description
str

Human-readable phrase like "Getting weather for London..."

Example

>>> generate_tool_phrase("get_weather", "Get weather for a city", {"city": "London"})
'Getting weather for London...'

Source code in src/zap_ai/streaming/events.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def generate_tool_phrase(tool_name: str, description: str | None, arguments: dict[str, Any]) -> str:
    """
    Generate a human-readable phrase for a tool call.

    Converts tool descriptions to present participle form and substitutes
    argument values for a user-friendly status message.

    Args:
        tool_name: Name of the tool being called.
        description: Tool description from MCP registry.
        arguments: Arguments being passed to the tool.

    Returns:
        Human-readable phrase like "Getting weather for London..."

    Example:
        >>> generate_tool_phrase("get_weather", "Get weather for a city", {"city": "London"})
        "Getting weather for London..."
    """
    if not description:
        return f"Calling {tool_name}..."

    phrase = description

    # Convert to -ing form for common verbs
    verb_mappings = {
        "Get ": "Getting ",
        "Search ": "Searching ",
        "Create ": "Creating ",
        "Update ": "Updating ",
        "Delete ": "Deleting ",
        "Send ": "Sending ",
        "Fetch ": "Fetching ",
        "Find ": "Finding ",
        "Check ": "Checking ",
        "Validate ": "Validating ",
        "Process ": "Processing ",
        "Generate ": "Generating ",
        "Calculate ": "Calculating ",
        "Analyze ": "Analyzing ",
        "Read ": "Reading ",
        "Write ": "Writing ",
        "List ": "Listing ",
        "Query ": "Querying ",
        "Execute ": "Executing ",
        "Run ": "Running ",
    }

    for verb, replacement in verb_mappings.items():
        if phrase.startswith(verb):
            phrase = replacement + phrase[len(verb) :]
            break

    # Substitute argument values into the phrase
    for key, value in arguments.items():
        str_value = str(value)
        # Try various placeholder patterns
        phrase = phrase.replace(f"{{{key}}}", str_value)
        phrase = phrase.replace(f"a {key}", str_value)
        phrase = phrase.replace(f"the {key}", str_value)
        # Also try with underscores converted to spaces
        key_spaced = key.replace("_", " ")
        phrase = phrase.replace(f"a {key_spaced}", str_value)
        phrase = phrase.replace(f"the {key_spaced}", str_value)

    # Ensure phrase ends with ellipsis
    phrase = phrase.rstrip(".")
    if not phrase.endswith("..."):
        phrase += "..."

    return phrase

parse_event

zap_ai.streaming.parse_event(event_data, task_id)

Parse a raw event dictionary into a typed Event object.

Parameters:

Name Type Description Default
event_data dict[str, Any]

Raw event data from workflow query.

required
task_id str

ID of the task this event belongs to.

required

Returns:

Type Description
Event

Typed event object.

Raises:

Type Description
ValueError

If event type is unknown.

Source code in src/zap_ai/streaming/events.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def parse_event(event_data: dict[str, Any], task_id: str) -> Event:
    """
    Build the typed event object described by a raw event dictionary.

    The ``type`` entry of ``event_data`` selects the event class. Fields
    missing from ``event_data`` fall back to neutral defaults: an empty
    string, ``0``, an empty dict, or ``True`` for ``success``.

    Args:
        event_data: Raw event data from workflow query.
        task_id: ID of the task this event belongs to.

    Returns:
        Typed event object.

    Raises:
        ValueError: If event type is unknown.
    """
    kind = event_data.get("type")

    # Envelope fields passed unchanged to whichever event class is built.
    envelope = {
        "task_id": task_id,
        "seq": event_data.get("seq", 0),
        "timestamp": event_data.get("timestamp", ""),
    }

    if kind == "thinking":
        return ThinkingEvent(iteration=event_data.get("iteration", 0), **envelope)

    if kind == "tool_call":
        return ToolCallEvent(
            name=event_data.get("name", ""),
            arguments=event_data.get("arguments", {}),
            phrase=event_data.get("phrase", ""),
            tool_call_id=event_data.get("tool_call_id", ""),
            **envelope,
        )

    if kind == "tool_result":
        return ToolResultEvent(
            name=event_data.get("name", ""),
            result=event_data.get("result", ""),
            tool_call_id=event_data.get("tool_call_id", ""),
            success=event_data.get("success", True),
            **envelope,
        )

    if kind == "token":
        return TokenEvent(
            token=event_data.get("token", ""),
            index=event_data.get("index", 0),
            **envelope,
        )

    if kind == "completed":
        return CompletedEvent(result=event_data.get("result", ""), **envelope)

    if kind == "error":
        return ErrorEvent(error=event_data.get("error", ""), **envelope)

    raise ValueError(f"Unknown event type: {kind}")