Core API

The core module contains the main classes for building and running agents.

Zap

The main orchestrator class that manages agents and Temporal connections.

zap_ai.Zap dataclass

Bases: Generic[TContext]

Main orchestrator for the Zap AI agent platform.

Zap manages a collection of agents and provides methods to execute tasks against them. It supports a generic context type that can be passed to agents with dynamic prompts. It handles:

- Agent configuration validation at build time
- Temporal client connection management
- Task execution via Temporal workflows
- Task status queries
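The build-time validation can be pictured with a small standalone sketch. It is not the library's implementation: `AgentSpec`, `ConfigError`, `validate_agents`, and the `sub_agents` field are hypothetical stand-ins, chosen only to mirror the checks that `__post_init__` documents (duplicate names, sub-agent references, circular dependencies, and the name lookup map).

```python
from dataclasses import dataclass, field


@dataclass
class AgentSpec:
    """Hypothetical stand-in for ZapAgent: a name plus its sub-agent names."""

    name: str
    sub_agents: list[str] = field(default_factory=list)


class ConfigError(ValueError):
    """Hypothetical stand-in for ZapConfigurationError."""


def validate_agents(agents: list[AgentSpec]) -> dict[str, AgentSpec]:
    """Run the three checks and return a name -> agent lookup map."""
    # 1. Duplicate names
    agent_map: dict[str, AgentSpec] = {}
    for agent in agents:
        if agent.name in agent_map:
            raise ConfigError(f"Duplicate agent name: {agent.name!r}")
        agent_map[agent.name] = agent

    # 2. Every sub-agent reference must point at a known agent
    for agent in agents:
        for sub in agent.sub_agents:
            if sub not in agent_map:
                raise ConfigError(f"{agent.name!r} references unknown sub-agent {sub!r}")

    # 3. Cycle detection: depth-first search with three visit states
    unvisited, in_progress, done = 0, 1, 2
    state = {name: unvisited for name in agent_map}

    def visit(name: str, path: list[str]) -> None:
        state[name] = in_progress
        for sub in agent_map[name].sub_agents:
            if state[sub] == in_progress:
                cycle = " -> ".join(path + [name, sub])
                raise ConfigError(f"Circular dependency: {cycle}")
            if state[sub] == unvisited:
                visit(sub, path + [name])
        state[name] = done

    for name in agent_map:
        if state[name] == unvisited:
            visit(name, [])

    return agent_map
```

Running the checks in the constructor means a misconfigured agent graph fails when the orchestrator is built, before any Temporal connection is attempted.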

Example
from zap_ai import Zap, ZapAgent

# Simple usage with static prompts
agents = [
    ZapAgent(name="MainAgent", prompt="You are helpful..."),
    ZapAgent(name="HelperAgent", prompt="You assist with..."),
]

zap = Zap(agents=agents)
await zap.start()

task = await zap.execute_task(
    agent_name="MainAgent",
    task="Help me with something",
)

# With typed context and dynamic prompts
from dataclasses import dataclass

@dataclass
class MyContext:
    user_name: str
    company: str

agent = ZapAgent[MyContext](
    name="Helper",
    prompt=lambda ctx: f"You assist {ctx.user_name} from {ctx.company}.",
)

zap: Zap[MyContext] = Zap(agents=[agent])
await zap.start()

task = await zap.execute_task(
    agent_name="Helper",
    task="Help me with something",
    context=MyContext(user_name="Alice", company="Acme"),
)
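To make the difference between static and dynamic prompts concrete, here is a minimal sketch of how a prompt that is either a string or a callable might be resolved. `PromptHolder` is a hypothetical stand-in, not the real `ZapAgent`; only the method names `is_dynamic_prompt` and `resolve_prompt` come from the source listing, and their bodies here are assumptions.

```python
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar, Union

TContext = TypeVar("TContext")


@dataclass
class PromptHolder(Generic[TContext]):
    """Hypothetical stand-in for the prompt-related part of ZapAgent."""

    prompt: Union[str, Callable[[TContext], str]]

    def is_dynamic_prompt(self) -> bool:
        # A callable prompt is "dynamic"; a plain string is "static".
        return callable(self.prompt)

    def resolve_prompt(self, context: TContext) -> str:
        # Assumed behavior: call the prompt with the context, or return it as-is.
        if callable(self.prompt):
            return self.prompt(context)
        return self.prompt


@dataclass
class MyContext:
    user_name: str
    company: str


static = PromptHolder[MyContext](prompt="You are helpful...")
dynamic = PromptHolder[MyContext](
    prompt=lambda ctx: f"You assist {ctx.user_name} from {ctx.company}.",
)

print(dynamic.resolve_prompt(MyContext(user_name="Alice", company="Acme")))
# prints "You assist Alice from Acme."
```

If resolution works as assumed here, a prompt written with attribute access (`ctx.user_name`) would fail on the empty dict that `execute_task` substitutes when no context is given, which is presumably why that case triggers a warning.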

Attributes:

agents (list[ZapAgent[TContext]]): List of ZapAgent configurations. Validated at instantiation.

temporal_client (Client | None): Optional pre-configured Temporal client. If None, a default connection to localhost:7233 is created in start().

task_queue (str): Temporal task queue name for agent workflows. Default is "zap-agents".

Source code in src/zap_ai/core/zap.py
@dataclass
class Zap(Generic[TContext]):
    """
    Main orchestrator for the Zap AI agent platform.

    Zap manages a collection of agents and provides methods to execute
    tasks against them. It supports a generic context type that can be
    passed to agents with dynamic prompts. It handles:
    - Agent configuration validation at build time
    - Temporal client connection management
    - Task execution via Temporal workflows
    - Task status queries

    Example:
        ```python
        from zap_ai import Zap, ZapAgent

        # Simple usage with static prompts
        agents = [
            ZapAgent(name="MainAgent", prompt="You are helpful..."),
            ZapAgent(name="HelperAgent", prompt="You assist with..."),
        ]

        zap = Zap(agents=agents)
        await zap.start()

        task = await zap.execute_task(
            agent_name="MainAgent",
            task="Help me with something",
        )

        # With typed context and dynamic prompts
        from dataclasses import dataclass

        @dataclass
        class MyContext:
            user_name: str
            company: str

        agent = ZapAgent[MyContext](
            name="Helper",
            prompt=lambda ctx: f"You assist {ctx.user_name} from {ctx.company}.",
        )

        zap: Zap[MyContext] = Zap(agents=[agent])
        await zap.start()

        task = await zap.execute_task(
            agent_name="Helper",
            task="Help me with something",
            context=MyContext(user_name="Alice", company="Acme"),
        )
        ```

    Attributes:
        agents: List of ZapAgent configurations. Validated at instantiation.
        temporal_client: Optional pre-configured Temporal client. If None,
            a default connection to localhost:7233 is created in start().
        task_queue: Temporal task queue name for agent workflows.
            Default is "zap-agents".
    """

    # Configuration (set at init)
    agents: list[ZapAgent[TContext]]
    temporal_client: TemporalClient | None = None
    task_queue: str = "zap-agents"
    tracing_provider: TracingProvider | None = None

    # Internal state (populated after init/start)
    _agent_map: dict[str, ZapAgent[TContext]] = field(default_factory=dict, init=False, repr=False)
    _started: bool = field(default=False, init=False, repr=False)
    _tool_registry: ToolRegistry | None = field(default=None, init=False, repr=False)
    _owns_temporal_client: bool = field(default=False, init=False, repr=False)
    _tracing: TracingProvider = field(init=False, repr=False)

    def __post_init__(self) -> None:
        """
        Validate configuration at build time.

        Called automatically after dataclass initialization. Performs:
        1. Duplicate agent name detection
        2. Sub-agent reference validation
        3. Circular dependency detection
        4. Builds internal agent lookup map
        5. Initializes tracing provider

        Raises:
            ZapConfigurationError: If any validation fails.
        """
        validate_no_duplicate_names(self.agents)
        self._agent_map = build_agent_map(self.agents)
        validate_sub_agent_references(self.agents, self._agent_map)
        validate_no_circular_dependencies(self.agents, self._agent_map)

        # Initialize tracing (use NoOp if not configured)
        self._tracing = self.tracing_provider or NoOpTracingProvider()

    def get_agent(self, name: str) -> ZapAgent[TContext]:
        """
        Get an agent by name.

        Args:
            name: The agent name to look up.

        Returns:
            The ZapAgent with the given name.

        Raises:
            AgentNotFoundError: If no agent with that name exists.
        """
        if name not in self._agent_map:
            raise AgentNotFoundError(
                f"Agent '{name}' not found. Available agents: {sorted(self._agent_map.keys())}"
            )
        return self._agent_map[name]

    def list_agents(self) -> list[str]:
        """Return list of all agent names."""
        return list(self._agent_map.keys())

    async def get_agent_tools(self, agent_name: str) -> list[str]:
        """
        Get list of tool names available to an agent.

        Useful for validating approval patterns before execution.

        Args:
            agent_name: Name of the agent.

        Returns:
            List of tool names available to the agent.

        Raises:
            ZapNotStartedError: If start() hasn't been called.
            AgentNotFoundError: If agent doesn't exist.

        Example:
            ```python
            tools = await zap.get_agent_tools("financial_agent")
            # ['transfer_funds', 'check_balance', 'delete_transaction']

            # Validate approval patterns
            rules = ApprovalRules(patterns=["transfer_*"])
            print(rules.preview_matches(tools))
            # {'transfer_*': ['transfer_funds']}
            ```
        """
        self._ensure_started()

        # Validate agent exists
        self.get_agent(agent_name)

        if not self._tool_registry:
            return []

        return self._tool_registry.get_tool_names(agent_name)

    async def start(self) -> None:
        """
        Initialize Temporal connection and MCP clients.

        Must be called before execute_task() or get_task(). This method:
        1. Connects to Temporal (if client not provided)
        2. Initializes the tool registry
        3. Pre-connects all MCP clients and discovers tools

        Raises:
            RuntimeError: If start() has already been called.
            ConnectionError: If Temporal connection fails.

        Example:
            ```python
            zap = Zap(agents=[...])
            await zap.start()  # Must call before using
            ```
        """
        if self._started:
            raise RuntimeError("Zap has already been started. Cannot call start() twice.")

        # Connect to Temporal if client not provided
        if self.temporal_client is None:
            from temporalio.client import Client

            self.temporal_client = await Client.connect("localhost:7233")
            self._owns_temporal_client = True

        # Initialize tool registry
        from zap_ai.mcp import ToolRegistry

        self._tool_registry = ToolRegistry()

        # Register all agents (connects MCP clients, discovers tools)
        await self._tool_registry.register_agents(self.agents, self._agent_map)

        # Set registry for activities
        from zap_ai.activities.tool_execution import set_tool_registry

        set_tool_registry(self._tool_registry)

        self._started = True

    def _ensure_started(self) -> None:
        """Raise if start() hasn't been called."""
        if not self._started:
            raise ZapNotStartedError("Zap has not been started. Call 'await zap.start()' first.")

    @property
    def tool_registry(self) -> "ToolRegistry | None":
        """
        Get the tool registry for worker setup.

        Returns the internal ToolRegistry instance for use with `create_worker`.
        Returns None if Zap hasn't been started.

        Example:
            ```python
            zap = Zap(agents=[...])
            await zap.start()

            worker = await create_worker(
                zap.temporal_client,
                task_queue=zap.task_queue,
                tool_registry=zap.tool_registry,
            )
            ```
        """
        return self._tool_registry

    def _serialize_context_with_metadata(self, context: Any) -> dict[str, Any]:
        """
        Serialize context with type metadata for reconstruction.

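        Automatically handles:
        - Pydantic models (via model_dump)
        - Dataclasses (via dataclasses.asdict)
        - Plain dicts (passthrough)

        Returns dict with __zap_context_type__ and __zap_context_version__
        metadata for non-dict types. Returns {} if context is None.

        Raises:
            ValueError: If the context is not a dict, Pydantic model, or dataclass.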
        """
        if context is None:
            return {}

        # Plain dict - no metadata needed
        if isinstance(context, dict):
            return context

        # Determine type info
        context_type = type(context)
        type_id = f"{context_type.__module__}.{context_type.__qualname__}"

        # Serialize data
        if hasattr(context, "model_dump"):  # Pydantic model
            data = context.model_dump()
        elif hasattr(context, "__dataclass_fields__"):  # Dataclass
            from dataclasses import asdict

            data = asdict(context)
        else:
            raise ValueError(f"Cannot serialize context of type {type(context)}")

        # Add metadata (inline)
        data["__zap_context_type__"] = type_id
        data["__zap_context_version__"] = "1"

        return data

    async def execute_task(
        self,
        agent_name: str | None = None,
        task: str | None = None,
        task_content: MessageContent | None = None,
        follow_up_on_task: str | None = None,
        context: TContext | None = None,
        approval_rules: "ApprovalRules | None" = None,
    ) -> Task:
        """
        Execute a new task or follow up on an existing one.

        For new tasks, starts a Temporal workflow for the specified agent.
        For follow-ups, sends a signal to the existing workflow.

        Args:
            agent_name: Name of the agent to execute the task. Required for
                new tasks, ignored for follow-ups (uses original agent).
            task: The task description/prompt to send to the agent (text only).
                Either task or task_content is required.
            task_content: Multimodal task content (text + images). If provided,
                takes precedence over `task`. Use this for vision tasks.
            follow_up_on_task: If provided, sends the task as a follow-up
                message to an existing task instead of starting a new one.
            context: Optional context to pass to agents with dynamic prompts
                and MCP tools. For dynamic prompts, this is used to resolve
                the prompt at runtime. For MCP tools, this is passed via the
                `meta` parameter and can be accessed using `CurrentZapContext()`.
                Defaults to {} if not provided.
            approval_rules: Optional rules for human-in-the-loop approval.
                When provided, tool calls matching the patterns will pause
                for human approval before execution.

        Returns:
            Task object with initial state. Use get_task() to poll for updates.

        Raises:
            ZapNotStartedError: If start() hasn't been called.
            AgentNotFoundError: If agent_name doesn't exist (new tasks only).
            TaskNotFoundError: If follow_up_on_task doesn't exist.
            VisionNotSupportedError: If task contains images but model doesn't
                support vision.
            ValueError: If required arguments are missing.

        Example (new task):
            ```python
            task = await zap.execute_task(
                agent_name="MyAgent",
                task="Analyze this data and summarize findings",
            )
            ```

        Example (with images - multimodal):
            ```python
            from zap_ai import TextContent, ImageContent

            task = await zap.execute_task(
                agent_name="VisionAgent",
                task_content=[
                    TextContent(text="What's in this image?"),
                    ImageContent.from_url("https://example.com/photo.jpg"),
                ],
            )
            ```

        Example (with context):
            ```python
            task = await zap.execute_task(
                agent_name="Helper",
                task="Help me with something",
                context=MyContext(user_name="Alice", company="Acme"),
            )
            ```

        Example (with approval rules):
            ```python
            from zap_ai import ApprovalRules

            task = await zap.execute_task(
                agent_name="FinancialAgent",
                task="Transfer $50,000 to vendor",
                approval_rules=ApprovalRules(patterns=["transfer_*", "delete_*"]),
            )
            # Later, check for pending approvals
            pending = await task.get_pending_approvals()
            await task.approve(pending[0].id)
            ```

        Example (follow-up):
            ```python
            task = await zap.execute_task(
                follow_up_on_task=task.id,
                task="Now export the summary to PDF",
            )
            ```
        """
        self._ensure_started()

        # Determine effective content
        effective_content: MessageContent
        if task_content is not None:
            effective_content = task_content
        elif task is not None:
            effective_content = task
        else:
            raise ValueError("Either 'task' or 'task_content' argument is required")

        if follow_up_on_task is not None:
            return await self._follow_up_task(follow_up_on_task, effective_content)

        # New task
        if agent_name is None:
            raise ValueError("agent_name is required for new tasks")

        # Validate agent exists and get agent config
        agent = self.get_agent(agent_name)

        # Validate vision support if content contains images
        if content_has_images(effective_content):
            from zap_ai.llm.provider import supports_vision

            if not supports_vision(agent.model):
                raise VisionNotSupportedError(
                    f"Model '{agent.model}' does not support vision. "
                    f"Cannot process task with images."
                )

        # Use default empty dict if no context provided
        effective_context: TContext = context if context is not None else {}  # type: ignore[assignment]

        # Warn if agent has dynamic prompt but no context provided
        if agent.is_dynamic_prompt() and context is None:
            warnings.warn(
                f"Agent '{agent_name}' has a dynamic prompt but no context was provided. "
                "The prompt will be called with an empty dict. "
                "Consider providing context via execute_task(context=...).",
                UserWarning,
                stacklevel=2,
            )

        # Resolve the prompt with context
        resolved_prompt = agent.resolve_prompt(effective_context)

        # Serialize context for MCP tools
        serialized_context: dict[str, Any] | None = None
        if context is not None:
            serialized_context = self._serialize_context_with_metadata(context)

        # Generate task ID
        task_id = f"{agent_name}-{uuid4().hex[:12]}"

        # Get tools for this agent
        tools: list[dict[str, Any]] = []
        tool_descriptions: dict[str, str] = {}
        if self._tool_registry:
            tools = self._tool_registry.get_tools_for_agent(agent_name)
            tool_descriptions = self._tool_registry.get_tool_descriptions(agent_name)

        # Validate approval rules if provided
        if approval_rules:
            tool_names = await self.get_agent_tools(agent_name)
            unmatched = approval_rules.get_unmatched_patterns(tool_names)
            if unmatched:
                warnings.warn(
                    f"Approval patterns don't match any tools: {unmatched}. "
                    f"Available tools: {tool_names}",
                    UserWarning,
                    stacklevel=2,
                )

        # Format content for workflow (convert ContentPart objects to dicts)
        initial_task = _format_content_for_workflow(effective_content)

        # Start Temporal workflow
        from zap_ai.workflows.agent_workflow import AgentWorkflow
        from zap_ai.workflows.models import AgentWorkflowInput

        await self.temporal_client.start_workflow(  # type: ignore[union-attr]
            AgentWorkflow.run,
            AgentWorkflowInput(
                agent_name=agent_name,
                initial_task=initial_task,
                system_prompt=resolved_prompt,
                model=agent.model,
                tools=tools,
                max_iterations=agent.max_iterations,
                temperature=agent.temperature,
                max_tokens=agent.max_tokens,
                approval_rules=approval_rules.to_dict() if approval_rules else None,
                tool_descriptions=tool_descriptions,
                context=serialized_context,
            ),
            id=task_id,
            task_queue=self.task_queue,
        )

        return Task(
            id=task_id,
            agent_name=agent_name,
            status=TaskStatus.PENDING,
        )

    async def stream_task(
        self,
        agent_name: str | None = None,
        task: str | None = None,
        task_content: MessageContent | None = None,
        context: TContext | None = None,
        approval_rules: "ApprovalRules | None" = None,
        *,
        poll_interval: float = 0.1,
        include_thinking: bool = True,
        include_tool_events: bool = True,
    ) -> "AsyncIterator[Event]":
        """
        Execute a task and stream events as an async generator.

        This method starts a task and yields streaming events as they occur,
        allowing real-time progress monitoring. Events are polled from the
        workflow via Temporal queries at the specified interval.

        Args:
            agent_name: Name of the agent to execute the task.
            task: The task description/prompt (text only).
            task_content: Multimodal task content (text + images).
            context: Optional context for agents with dynamic prompts.
            approval_rules: Optional rules for human-in-the-loop approval.
            poll_interval: How often to poll for new events (seconds). Default 0.1.
            include_thinking: Whether to yield ThinkingEvent. Default True.
            include_tool_events: Whether to yield ToolCallEvent and ToolResultEvent.
                Default True.

        Yields:
            Event objects in order: ThinkingEvent, ToolCallEvent, ToolResultEvent,
            and finally CompletedEvent or ErrorEvent.

        Raises:
            ZapNotStartedError: If start() hasn't been called.
            AgentNotFoundError: If agent_name doesn't exist.
            VisionNotSupportedError: If task contains images but model doesn't
                support vision.
            ValueError: If required arguments are missing.

        Example:
            ```python
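            from zap_ai.streaming.events import (
                CompletedEvent,
                ErrorEvent,
                ThinkingEvent,
                ToolCallEvent,
                ToolResultEvent,
            )

            async for event in zap.stream_task(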
                agent_name="MainAgent",
                task="What's the weather in Paris?"
            ):
                match event:
                    case ThinkingEvent(iteration=n):
                        print(f"Thinking (iteration {n})...")
                    case ToolCallEvent(phrase=phrase):
                        print(phrase)  # "Getting weather for Paris..."
                    case ToolResultEvent(name=name, success=success):
                        print(f"Tool {name}: {'OK' if success else 'FAILED'}")
                    case CompletedEvent(result=result):
                        print(f"Done: {result}")
                    case ErrorEvent(error=error):
                        print(f"Error: {error}")
            ```
        """
        from zap_ai.streaming.events import (
            CompletedEvent,
            ErrorEvent,
            ThinkingEvent,
            ToolCallEvent,
            ToolResultEvent,
            parse_event,
        )
        from zap_ai.workflows.agent_workflow import AgentWorkflow

        # Start the task
        task_obj = await self.execute_task(
            agent_name=agent_name,
            task=task,
            task_content=task_content,
            context=context,
            approval_rules=approval_rules,
        )

        # Get workflow handle for querying events
        handle = self.temporal_client.get_workflow_handle(task_obj.id)  # type: ignore[union-attr]
        last_seq = 0

        while True:
            await asyncio.sleep(poll_interval)

            try:
                events = await handle.query(AgentWorkflow.get_events, last_seq)
            except Exception:
                # Workflow may have completed - check for final state
                try:
                    status_str = await handle.query(AgentWorkflow.get_status)
                    if status_str == TaskStatus.COMPLETED.value:
                        result = await handle.query(AgentWorkflow.get_result)
                        yield CompletedEvent(result=result or "", task_id=task_obj.id)
                        return
                    elif status_str == TaskStatus.FAILED.value:
                        error = await handle.query(AgentWorkflow.get_error)
                        yield ErrorEvent(error=error or "Unknown error", task_id=task_obj.id)
                        return
                except Exception:
                    pass
                continue

            for event_data in events:
                last_seq = event_data["seq"]
                event = parse_event(event_data, task_obj.id)

                # Apply filters
                if not include_thinking and isinstance(event, ThinkingEvent):
                    continue
                if not include_tool_events and isinstance(event, (ToolCallEvent, ToolResultEvent)):
                    continue

                yield event

                # Terminal events end the loop
                if isinstance(event, (CompletedEvent, ErrorEvent)):
                    return

    async def _follow_up_task(self, task_id: str, message: MessageContent) -> Task:
        """
        Send a follow-up message to an existing task.

        Args:
            task_id: The task ID to send the message to.
            message: The follow-up message (text or multimodal).

        Returns:
            Updated Task object.

        Raises:
            TaskNotFoundError: If the task doesn't exist.
            VisionNotSupportedError: If message contains images but the
                task's agent model doesn't support vision.
        """
        from zap_ai.workflows.agent_workflow import AgentWorkflow

        # Validate vision support if message contains images
        # Extract agent name from task_id (format: "{agent_name}-{uuid}")
        if content_has_images(message):
            from zap_ai.llm.provider import supports_vision

            # Split from the right so agent names containing hyphens are preserved
            agent_name = task_id.rsplit("-", 1)[0]
            try:
                agent = self.get_agent(agent_name)
                if not supports_vision(agent.model):
                    raise VisionNotSupportedError(
                        f"Model '{agent.model}' does not support vision. "
                        f"Cannot send follow-up message with images."
                    )
            except AgentNotFoundError:
                # Agent name extraction failed - let the workflow handle it
                pass

        try:
            handle = self.temporal_client.get_workflow_handle(task_id)  # type: ignore[union-attr]

            # Format content for workflow
            formatted_message = _format_content_for_workflow(message)

            # Send signal
            await handle.signal(AgentWorkflow.add_message, formatted_message)

            # Query current state
            status_str = await handle.query(AgentWorkflow.get_status)
            history = await handle.query(AgentWorkflow.get_history)

            # Parse agent name from task ID (split from the right so agent
            # names containing hyphens are preserved)
            agent_name = task_id.rsplit("-", 1)[0]

            return Task(
                id=task_id,
                agent_name=agent_name,
                status=TaskStatus(status_str),
                history=history,
            )

        except Exception as e:
            raise TaskNotFoundError(f"Task '{task_id}' not found: {e}") from e

    def _create_task_fetcher(self) -> "Callable[[str], Awaitable[Task]]":
        """Create a task fetcher callback bound to this Zap instance."""

        async def fetcher(task_id: str) -> Task:
            return await self.get_task(task_id)

        return fetcher

    def _create_approval_fetcher(
        self, task_id: str
    ) -> "Callable[[], Awaitable[list[ApprovalRequest]]]":
        """Create an approval fetcher callback for a specific task."""
        from zap_ai.workflows.models import ApprovalRequest

        async def fetcher() -> "list[ApprovalRequest]":
            from zap_ai.workflows.agent_workflow import AgentWorkflow

            handle = self.temporal_client.get_workflow_handle(task_id)  # type: ignore[union-attr]
            pending_dicts = await handle.query(AgentWorkflow.get_pending_approvals)
            return [ApprovalRequest.from_dict(d) for d in pending_dicts]

        return fetcher

    def _create_approval_sender(
        self, task_id: str
    ) -> "Callable[[str, bool, str | None], Awaitable[None]]":
        """Create an approval sender callback for a specific task."""

        async def sender(approval_id: str, approved: bool, reason: str | None) -> None:
            from zap_ai.workflows.agent_workflow import AgentWorkflow

            handle = self.temporal_client.get_workflow_handle(task_id)  # type: ignore[union-attr]
            await handle.signal(
                AgentWorkflow.approve_execution, args=[approval_id, approved, reason]
            )

        return sender

    async def get_task(self, task_id: str) -> Task:
        """
        Get the current state of a task.

        Queries the Temporal workflow for current status, result,
        conversation history, and sub-task information.

        The returned Task object includes:
        - Full conversation history via `task.history`
        - Sub-task IDs via `task.sub_tasks`
        - Ability to fetch sub-task Task objects via `await task.get_sub_tasks()`
        - Convenience methods: `get_text_content()`, `get_tool_calls()`,
          `get_turn()`, `get_turns()`, `turn_count()`

        Args:
            task_id: The task ID returned from execute_task().

        Returns:
            Task object with current state and conversation access.

        Raises:
            ZapNotStartedError: If start() hasn't been called.
            TaskNotFoundError: If no task with that ID exists.

        Example:
            ```python
            task = await zap.get_task(task_id)
            print(f"Status: {task.status}")
            if task.is_complete():
                print(f"Result: {task.result}")

            # Access conversation
            print(task.get_text_content())
            for tool_call in task.get_tool_calls():
                print(f"Called: {tool_call.name}")

            # Access sub-tasks
            sub_tasks = await task.get_sub_tasks()
            for sub in sub_tasks:
                print(f"Sub-task: {sub.id}")
            ```
        """
        self._ensure_started()

        from zap_ai.workflows.agent_workflow import AgentWorkflow

        try:
            handle = self.temporal_client.get_workflow_handle(task_id)  # type: ignore[union-attr]

            # Query workflow state
            status_str = await handle.query(AgentWorkflow.get_status)
            result = await handle.query(AgentWorkflow.get_result)
            error = await handle.query(AgentWorkflow.get_error)
            history = await handle.query(AgentWorkflow.get_history)

            # Query sub-agent conversations to get sub-task IDs
            sub_agent_convs = await handle.query(AgentWorkflow.get_sub_agent_conversations)
            sub_task_ids = [conv["conversation_id"] for conv in sub_agent_convs.values()]

            # Parse agent name from task ID
            agent_name = task_id.split("-")[0]

            return Task(
                id=task_id,
                agent_name=agent_name,
                status=TaskStatus(status_str),
                result=result,
                error=error,
                history=history,
                sub_tasks=sub_task_ids,
                _task_fetcher=self._create_task_fetcher(),
                _approval_fetcher=self._create_approval_fetcher(task_id),
                _approval_sender=self._create_approval_sender(task_id),
            )

        except Exception as e:
            raise TaskNotFoundError(f"Task '{task_id}' not found: {e}") from e

    async def cancel_task(self, task_id: str) -> None:
        """
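        Cancel a running task.

        Not yet implemented: after the started check, this method currently
        raises NotImplementedError. The intended behavior is to send a
        cancellation request to the Temporal workflow, after which the task
        transitions to FAILED status with a cancellation error.

        Args:
            task_id: The task ID to cancel.

        Raises:
            ZapNotStartedError: If start() hasn't been called.
            NotImplementedError: Always, until cancellation is implemented.
            TaskNotFoundError: If no task with that ID exists (intended behavior).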
        """
        self._ensure_started()

        # TODO: Phase 7 (optional) - Cancel Temporal workflow
        raise NotImplementedError("cancel_task not yet implemented")

    async def stop(self) -> None:
        """
        Gracefully shut down Zap.

        Disconnects MCP clients. The Temporal client is left open, even if
        Zap created it; the caller is responsible for its lifecycle.
        Does not cancel running tasks.
        """
        if not self._started:
            return

        # Shutdown tool registry (disconnects MCP clients)
        if self._tool_registry:
            await self._tool_registry.shutdown()
            self._tool_registry = None

        # Note: We don't close the Temporal client even if we created it,
        # as it may be reused or there may be running workflows.
        # The caller is responsible for client lifecycle.

        self._started = False

start() async

Initialize Temporal connection and MCP clients.

Must be called before execute_task() or get_task(). This method:

1. Connects to Temporal (if client not provided)
2. Initializes the tool registry
3. Pre-connects all MCP clients and discovers tools

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If start() has already been called. |
| ConnectionError | If Temporal connection fails. |

Example
zap = Zap(agents=[...])
await zap.start()  # Must call before using
Source code in src/zap_ai/core/zap.py
async def start(self) -> None:
    """
    Initialize Temporal connection and MCP clients.

    Must be called before execute_task() or get_task(). This method:
    1. Connects to Temporal (if client not provided)
    2. Initializes the tool registry
    3. Pre-connects all MCP clients and discovers tools

    Raises:
        RuntimeError: If start() has already been called.
        ConnectionError: If Temporal connection fails.

    Example:
        ```python
        zap = Zap(agents=[...])
        await zap.start()  # Must call before using
        ```
    """
    if self._started:
        raise RuntimeError("Zap has already been started. Cannot call start() twice.")

    # Connect to Temporal if client not provided
    if self.temporal_client is None:
        from temporalio.client import Client

        self.temporal_client = await Client.connect("localhost:7233")
        self._owns_temporal_client = True

    # Initialize tool registry
    from zap_ai.mcp import ToolRegistry

    self._tool_registry = ToolRegistry()

    # Register all agents (connects MCP clients, discovers tools)
    await self._tool_registry.register_agents(self.agents, self._agent_map)

    # Set registry for activities
    from zap_ai.activities.tool_execution import set_tool_registry

    set_tool_registry(self._tool_registry)

    self._started = True

stop() async

Gracefully shut down Zap.

Disconnects MCP clients. The Temporal client is left open, even if Zap created it; the caller is responsible for its lifecycle. Does not cancel running tasks.

Source code in src/zap_ai/core/zap.py
async def stop(self) -> None:
    """
    Gracefully shut down Zap.

    Disconnects MCP clients. The Temporal client is left open, even if
    Zap created it; the caller is responsible for its lifecycle.
    Does not cancel running tasks.
    """
    if not self._started:
        return

    # Shutdown tool registry (disconnects MCP clients)
    if self._tool_registry:
        await self._tool_registry.shutdown()
        self._tool_registry = None

    # Note: We don't close the Temporal client even if we created it,
    # as it may be reused or there may be running workflows.
    # The caller is responsible for client lifecycle.

    self._started = False
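
The lifecycle rules shown in start() and stop() (a second start() raises RuntimeError, stop() is a no-op when Zap is not started) can be exercised with a small stand-in. This is only a sketch, not the zap_ai implementation: `FakeZap` and `run_with_lifecycle` are hypothetical names, and the stub mirrors nothing but the `_started` flag so the pattern runs without Temporal or MCP servers.

```python
import asyncio


class FakeZap:
    """Hypothetical stand-in that mirrors only Zap's started/stopped flag."""

    def __init__(self) -> None:
        self._started = False

    async def start(self) -> None:
        if self._started:
            raise RuntimeError("Zap has already been started. Cannot call start() twice.")
        self._started = True

    async def stop(self) -> None:
        # No-op when not started, so calling stop() twice is safe.
        if not self._started:
            return
        self._started = False


async def run_with_lifecycle(zap: FakeZap) -> bool:
    """Start, do some work, and always stop, even if the work raises."""
    await zap.start()
    try:
        return zap._started  # placeholder for execute_task()/get_task() calls
    finally:
        await zap.stop()


if __name__ == "__main__":
    asyncio.run(run_with_lifecycle(FakeZap()))
```

With the real class, the same try/finally shape would apply. Because stop() leaves the Temporal client open, a client passed in via temporal_client would presumably still need to be managed by the caller.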

execute_task(agent_name=None, task=None, task_content=None, follow_up_on_task=None, context=None, approval_rules=None) async

Execute a new task or follow up on an existing one.

For new tasks, starts a Temporal workflow for the specified agent. For follow-ups, sends a signal to the existing workflow.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| agent_name | str \| None | Name of the agent to execute the task. Required for new tasks, ignored for follow-ups (uses original agent). | None |
| task | str \| None | The task description/prompt to send to the agent (text only). Either task or task_content is required. | None |
| task_content | MessageContent \| None | Multimodal task content (text + images). If provided, takes precedence over task. Use this for vision tasks. | None |
| follow_up_on_task | str \| None | If provided, sends the task as a follow-up message to an existing task instead of starting a new one. | None |
| context | TContext \| None | Optional context to pass to agents with dynamic prompts and MCP tools. For dynamic prompts, this is used to resolve the prompt at runtime. For MCP tools, this is passed via the meta parameter and can be accessed using CurrentZapContext(). Defaults to {} if not provided. | None |
| approval_rules | 'ApprovalRules \| None' | Optional rules for human-in-the-loop approval. When provided, tool calls matching the patterns will pause for human approval before execution. | None |

Returns:

| Type | Description |
| --- | --- |
| Task | Task object with initial state. Use get_task() to poll for updates. |

Raises:

| Type | Description |
| --- | --- |
| ZapNotStartedError | If start() hasn't been called. |
| AgentNotFoundError | If agent_name doesn't exist (new tasks only). |
| TaskNotFoundError | If follow_up_on_task doesn't exist. |
| VisionNotSupportedError | If task contains images but model doesn't support vision. |
| ValueError | If required arguments are missing. |
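
The argument rules above (task_content wins over task, one of the two is required, agent_name is required only for new tasks) can be summarized in a few lines. The sketch below follows the validation order in the execute_task() source, but `plan_call` and its return shape are hypothetical and exist only for illustration.

```python
from typing import Optional, Tuple


def plan_call(
    agent_name: Optional[str] = None,
    task: Optional[str] = None,
    task_content: Optional[list] = None,
    follow_up_on_task: Optional[str] = None,
) -> Tuple[str, object]:
    """Return ("follow_up" or "new", effective_content), or raise ValueError."""
    if task_content is not None:
        content: object = task_content  # multimodal content wins over plain text
    elif task is not None:
        content = task
    else:
        raise ValueError("Either 'task' or 'task_content' argument is required")

    if follow_up_on_task is not None:
        return "follow_up", content  # agent_name is ignored for follow-ups
    if agent_name is None:
        raise ValueError("agent_name is required for new tasks")
    return "new", content
```

Note that the content check runs first, so a follow-up without task or task_content fails with ValueError before the existing task is ever contacted.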

Example (new task):

task = await zap.execute_task(
    agent_name="MyAgent",
    task="Analyze this data and summarize findings",
)

Example (with images - multimodal):

from zap_ai import TextContent, ImageContent

task = await zap.execute_task(
    agent_name="VisionAgent",
    task_content=[
        TextContent(text="What's in this image?"),
        ImageContent.from_url("https://example.com/photo.jpg"),
    ],
)

Example (with context):

task = await zap.execute_task(
    agent_name="Helper",
    task="Help me with something",
    context=MyContext(user_name="Alice", company="Acme"),
)

Example (with approval rules):

from zap_ai import ApprovalRules

task = await zap.execute_task(
    agent_name="FinancialAgent",
    task="Transfer $50,000 to vendor",
    approval_rules=ApprovalRules(patterns=["transfer_*", "delete_*"]),
)
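# Later, re-fetch the task (get_task() attaches the approval fetcher and
# sender), then check for pending approvals
task = await zap.get_task(task.id)
pending = await task.get_pending_approvals()
await task.approve(pending[0].id)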

Example (follow-up):

task = await zap.execute_task(
    follow_up_on_task=task.id,
    task="Now export the summary to PDF",
)

Source code in src/zap_ai/core/zap.py
async def execute_task(
    self,
    agent_name: str | None = None,
    task: str | None = None,
    task_content: MessageContent | None = None,
    follow_up_on_task: str | None = None,
    context: TContext | None = None,
    approval_rules: "ApprovalRules | None" = None,
) -> Task:
    """
    Execute a new task or follow up on an existing one.

    For new tasks, starts a Temporal workflow for the specified agent.
    For follow-ups, sends a signal to the existing workflow.

    Args:
        agent_name: Name of the agent to execute the task. Required for
            new tasks, ignored for follow-ups (uses original agent).
        task: The task description/prompt to send to the agent (text only).
            Either task or task_content is required.
        task_content: Multimodal task content (text + images). If provided,
            takes precedence over `task`. Use this for vision tasks.
        follow_up_on_task: If provided, sends the task as a follow-up
            message to an existing task instead of starting a new one.
        context: Optional context to pass to agents with dynamic prompts
            and MCP tools. For dynamic prompts, this is used to resolve
            the prompt at runtime. For MCP tools, this is passed via the
            `meta` parameter and can be accessed using `CurrentZapContext()`.
            Defaults to {} if not provided.
        approval_rules: Optional rules for human-in-the-loop approval.
            When provided, tool calls matching the patterns will pause
            for human approval before execution.

    Returns:
        Task object with initial state. Use get_task() to poll for updates.

    Raises:
        ZapNotStartedError: If start() hasn't been called.
        AgentNotFoundError: If agent_name doesn't exist (new tasks only).
        TaskNotFoundError: If follow_up_on_task doesn't exist.
        VisionNotSupportedError: If task contains images but model doesn't
            support vision.
        ValueError: If required arguments are missing.

    Example (new task):
        ```python
        task = await zap.execute_task(
            agent_name="MyAgent",
            task="Analyze this data and summarize findings",
        )
        ```

    Example (with images - multimodal):
        ```python
        from zap_ai import TextContent, ImageContent

        task = await zap.execute_task(
            agent_name="VisionAgent",
            task_content=[
                TextContent(text="What's in this image?"),
                ImageContent.from_url("https://example.com/photo.jpg"),
            ],
        )
        ```

    Example (with context):
        ```python
        task = await zap.execute_task(
            agent_name="Helper",
            task="Help me with something",
            context=MyContext(user_name="Alice", company="Acme"),
        )
        ```

    Example (with approval rules):
        ```python
        from zap_ai import ApprovalRules

        task = await zap.execute_task(
            agent_name="FinancialAgent",
            task="Transfer $50,000 to vendor",
            approval_rules=ApprovalRules(patterns=["transfer_*", "delete_*"]),
        )
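        # Later, re-fetch the task (get_task() attaches the approval fetcher
        # and sender), then check for pending approvals
        task = await zap.get_task(task.id)
        pending = await task.get_pending_approvals()
        await task.approve(pending[0].id)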
        ```

    Example (follow-up):
        ```python
        task = await zap.execute_task(
            follow_up_on_task=task.id,
            task="Now export the summary to PDF",
        )
        ```
    """
    self._ensure_started()

    # Determine effective content
    effective_content: MessageContent
    if task_content is not None:
        effective_content = task_content
    elif task is not None:
        effective_content = task
    else:
        raise ValueError("Either 'task' or 'task_content' argument is required")

    if follow_up_on_task is not None:
        return await self._follow_up_task(follow_up_on_task, effective_content)

    # New task
    if agent_name is None:
        raise ValueError("agent_name is required for new tasks")

    # Validate agent exists and get agent config
    agent = self.get_agent(agent_name)

    # Validate vision support if content contains images
    if content_has_images(effective_content):
        from zap_ai.llm.provider import supports_vision

        if not supports_vision(agent.model):
            raise VisionNotSupportedError(
                f"Model '{agent.model}' does not support vision. "
                f"Cannot process task with images."
            )

    # Use default empty dict if no context provided
    effective_context: TContext = context if context is not None else {}  # type: ignore[assignment]

    # Warn if agent has dynamic prompt but no context provided
    if agent.is_dynamic_prompt() and context is None:
        warnings.warn(
            f"Agent '{agent_name}' has a dynamic prompt but no context was provided. "
            "The prompt will be called with an empty dict. "
            "Consider providing context via execute_task(context=...).",
            UserWarning,
            stacklevel=2,
        )

    # Resolve the prompt with context
    resolved_prompt = agent.resolve_prompt(effective_context)

    # Serialize context for MCP tools
    serialized_context: dict[str, Any] | None = None
    if context is not None:
        serialized_context = self._serialize_context_with_metadata(context)

    # Generate task ID
    task_id = f"{agent_name}-{uuid4().hex[:12]}"

    # Get tools for this agent
    tools: list[dict[str, Any]] = []
    tool_descriptions: dict[str, str] = {}
    if self._tool_registry:
        tools = self._tool_registry.get_tools_for_agent(agent_name)
        tool_descriptions = self._tool_registry.get_tool_descriptions(agent_name)

    # Validate approval rules if provided
    if approval_rules:
        tool_names = await self.get_agent_tools(agent_name)
        unmatched = approval_rules.get_unmatched_patterns(tool_names)
        if unmatched:
            warnings.warn(
                f"Approval patterns don't match any tools: {unmatched}. "
                f"Available tools: {tool_names}",
                UserWarning,
                stacklevel=2,
            )

    # Format content for workflow (convert ContentPart objects to dicts)
    initial_task = _format_content_for_workflow(effective_content)

    # Start Temporal workflow
    from zap_ai.workflows.agent_workflow import AgentWorkflow
    from zap_ai.workflows.models import AgentWorkflowInput

    await self.temporal_client.start_workflow(  # type: ignore[union-attr]
        AgentWorkflow.run,
        AgentWorkflowInput(
            agent_name=agent_name,
            initial_task=initial_task,
            system_prompt=resolved_prompt,
            model=agent.model,
            tools=tools,
            max_iterations=agent.max_iterations,
            temperature=agent.temperature,
            max_tokens=agent.max_tokens,
            approval_rules=approval_rules.to_dict() if approval_rules else None,
            tool_descriptions=tool_descriptions,
            context=serialized_context,
        ),
        id=task_id,
        task_queue=self.task_queue,
    )

    return Task(
        id=task_id,
        agent_name=agent_name,
        status=TaskStatus.PENDING,
    )
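
Since execute_task() returns a Task in PENDING state and points to get_task() for updates, a caller typically needs a polling loop. The following is a minimal sketch under stated assumptions: `wait_for_task`, `interval`, and `timeout` are hypothetical names, and the helper relies only on `get_task(task_id)` and `Task.is_complete()` as used in the examples on this page.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def wait_for_task(
    get_task: Callable[[str], Awaitable[Any]],
    task_id: str,
    interval: float = 0.5,
    timeout: float = 60.0,
) -> Any:
    """Poll get_task(task_id) until task.is_complete() or the timeout elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        task = await get_task(task_id)
        if task.is_complete():
            return task
        if loop.time() >= deadline:
            raise TimeoutError(f"Task {task_id!r} did not complete within {timeout}s")
        await asyncio.sleep(interval)
```

Usage would look like `task = await wait_for_task(zap.get_task, task.id)`. Whether is_complete() also returns True for failed tasks is not shown here, so checking task.status and task.error afterwards is a reasonable precaution.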

stream_task(agent_name=None, task=None, task_content=None, context=None, approval_rules=None, *, poll_interval=0.1, include_thinking=True, include_tool_events=True) async

Execute a task and stream events as an async generator.

This method starts a task and yields streaming events as they occur, allowing real-time progress monitoring. Events are polled from the workflow via Temporal queries at the specified interval.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| agent_name | str \| None | Name of the agent to execute the task. | None |
| task | str \| None | The task description/prompt (text only). | None |
| task_content | MessageContent \| None | Multimodal task content (text + images). | None |
| context | TContext \| None | Optional context for agents with dynamic prompts. | None |
| approval_rules | 'ApprovalRules \| None' | Optional rules for human-in-the-loop approval. | None |
| poll_interval | float | How often to poll for new events (seconds). Default 0.1. | 0.1 |
| include_thinking | bool | Whether to yield ThinkingEvent. Default True. | True |
| include_tool_events | bool | Whether to yield ToolCallEvent and ToolResultEvent. Default True. | True |

Yields:

| Type | Description |
| --- | --- |
| 'AsyncIterator[Event]' | Event objects in order: ThinkingEvent, ToolCallEvent, ToolResultEvent, and finally CompletedEvent or ErrorEvent. |

Raises:

| Type | Description |
| --- | --- |
| ZapNotStartedError | If start() hasn't been called. |
| AgentNotFoundError | If agent_name doesn't exist. |
| VisionNotSupportedError | If task contains images but model doesn't support vision. |
| ValueError | If required arguments are missing. |

Example
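# match statements require Python 3.10+
from zap_ai.streaming.events import (
    CompletedEvent,
    ErrorEvent,
    ThinkingEvent,
    ToolCallEvent,
    ToolResultEvent,
)

async for event in zap.stream_task(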
    agent_name="MainAgent",
    task="What's the weather in Paris?"
):
    match event:
        case ThinkingEvent(iteration=n):
            print(f"Thinking (iteration {n})...")
        case ToolCallEvent(phrase=phrase):
            print(phrase)  # "Getting weather for Paris..."
        case ToolResultEvent(name=name, success=success):
            print(f"Tool {name}: {'OK' if success else 'FAILED'}")
        case CompletedEvent(result=result):
            print(f"Done: {result}")
        case ErrorEvent(error=error):
            print(f"Error: {error}")
Source code in src/zap_ai/core/zap.py
async def stream_task(
    self,
    agent_name: str | None = None,
    task: str | None = None,
    task_content: MessageContent | None = None,
    context: TContext | None = None,
    approval_rules: "ApprovalRules | None" = None,
    *,
    poll_interval: float = 0.1,
    include_thinking: bool = True,
    include_tool_events: bool = True,
) -> "AsyncIterator[Event]":
    """
    Execute a task and stream events as an async generator.

    This method starts a task and yields streaming events as they occur,
    allowing real-time progress monitoring. Events are polled from the
    workflow via Temporal queries at the specified interval.

    Args:
        agent_name: Name of the agent to execute the task.
        task: The task description/prompt (text only).
        task_content: Multimodal task content (text + images).
        context: Optional context for agents with dynamic prompts.
        approval_rules: Optional rules for human-in-the-loop approval.
        poll_interval: How often to poll for new events (seconds). Default 0.1.
        include_thinking: Whether to yield ThinkingEvent. Default True.
        include_tool_events: Whether to yield ToolCallEvent and ToolResultEvent.
            Default True.

    Yields:
        Event objects in order: ThinkingEvent, ToolCallEvent, ToolResultEvent,
        and finally CompletedEvent or ErrorEvent.

    Raises:
        ZapNotStartedError: If start() hasn't been called.
        AgentNotFoundError: If agent_name doesn't exist.
        VisionNotSupportedError: If task contains images but model doesn't
            support vision.
        ValueError: If required arguments are missing.

    Example:
        ```python
        async for event in zap.stream_task(
            agent_name="MainAgent",
            task="What's the weather in Paris?"
        ):
            match event:
                case ThinkingEvent(iteration=n):
                    print(f"Thinking (iteration {n})...")
                case ToolCallEvent(phrase=phrase):
                    print(phrase)  # "Getting weather for Paris..."
                case ToolResultEvent(name=name, success=success):
                    print(f"Tool {name}: {'OK' if success else 'FAILED'}")
                case CompletedEvent(result=result):
                    print(f"Done: {result}")
                case ErrorEvent(error=error):
                    print(f"Error: {error}")
        ```
    """
    from zap_ai.streaming.events import (
        CompletedEvent,
        ErrorEvent,
        ThinkingEvent,
        ToolCallEvent,
        ToolResultEvent,
        parse_event,
    )
    from zap_ai.workflows.agent_workflow import AgentWorkflow

    # Start the task
    task_obj = await self.execute_task(
        agent_name=agent_name,
        task=task,
        task_content=task_content,
        context=context,
        approval_rules=approval_rules,
    )

    # Get workflow handle for querying events
    handle = self.temporal_client.get_workflow_handle(task_obj.id)  # type: ignore[union-attr]
    last_seq = 0

    while True:
        await asyncio.sleep(poll_interval)

        try:
            events = await handle.query(AgentWorkflow.get_events, last_seq)
        except Exception:
            # Workflow may have completed - check for final state
            try:
                status_str = await handle.query(AgentWorkflow.get_status)
                if status_str == TaskStatus.COMPLETED.value:
                    result = await handle.query(AgentWorkflow.get_result)
                    yield CompletedEvent(result=result or "", task_id=task_obj.id)
                    return
                elif status_str == TaskStatus.FAILED.value:
                    error = await handle.query(AgentWorkflow.get_error)
                    yield ErrorEvent(error=error or "Unknown error", task_id=task_obj.id)
                    return
            except Exception:
                pass
            continue

        for event_data in events:
            last_seq = event_data["seq"]
            event = parse_event(event_data, task_obj.id)

            # Apply filters
            if not include_thinking and isinstance(event, ThinkingEvent):
                continue
            if not include_tool_events and isinstance(event, (ToolCallEvent, ToolResultEvent)):
                continue

            yield event

            # Terminal events end the loop
            if isinstance(event, (CompletedEvent, ErrorEvent)):
                return
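
The loop above keeps a `last_seq` cursor and passes it to the `get_events` query on every poll. The sketch below isolates that cursor pattern with an in-memory log. It assumes that `get_events(last_seq)` returns, in order, only events whose `seq` is greater than the cursor; the workflow side is not shown on this page, so that is an assumption. `EventLog`, `drain`, and the event type strings are hypothetical.

```python
from typing import Any


class EventLog:
    """Hypothetical in-memory stand-in for the workflow's event buffer."""

    def __init__(self) -> None:
        self._events: list[dict[str, Any]] = []

    def append(self, kind: str) -> None:
        # Sequence numbers start at 1 so a cursor of 0 means "nothing seen yet".
        self._events.append({"seq": len(self._events) + 1, "type": kind})

    def get_events(self, last_seq: int) -> list[dict[str, Any]]:
        # Assumed semantics: return only events newer than the cursor.
        return [e for e in self._events if e["seq"] > last_seq]


def drain(log: EventLog, last_seq: int) -> tuple[list[str], int]:
    """One polling step: fetch new events and advance the cursor."""
    kinds = []
    for event in log.get_events(last_seq):
        last_seq = event["seq"]
        kinds.append(event["type"])
    return kinds, last_seq
```

Because the cursor advances even for events that the include_thinking and include_tool_events filters skip, filtered events are never fetched twice.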

get_task(task_id) async

Get the current state of a task.

Queries the Temporal workflow for current status, result, conversation history, and sub-task information.

The returned Task object includes:

- Full conversation history via task.history
- Sub-task IDs via task.sub_tasks
- Ability to fetch sub-task Task objects via await task.get_sub_tasks()
- Convenience methods: get_text_content(), get_tool_calls(), get_turn(), get_turns(), turn_count()

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| task_id | str | The task ID returned from execute_task(). | required |

Returns:

| Type | Description |
| --- | --- |
| Task | Task object with current state and conversation access. |

Raises:

| Type | Description |
| --- | --- |
| ZapNotStartedError | If start() hasn't been called. |
| TaskNotFoundError | If no task with that ID exists. |

Example
task = await zap.get_task(task_id)
print(f"Status: {task.status}")
if task.is_complete():
    print(f"Result: {task.result}")

# Access conversation
print(task.get_text_content())
for tool_call in task.get_tool_calls():
    print(f"Called: {tool_call.name}")

# Access sub-tasks
sub_tasks = await task.get_sub_tasks()
for sub in sub_tasks:
    print(f"Sub-task: {sub.id}")
Source code in src/zap_ai/core/zap.py
async def get_task(self, task_id: str) -> Task:
    """
    Get the current state of a task.

    Queries the Temporal workflow for current status, result,
    conversation history, and sub-task information.

    The returned Task object includes:
    - Full conversation history via `task.history`
    - Sub-task IDs via `task.sub_tasks`
    - Ability to fetch sub-task Task objects via `await task.get_sub_tasks()`
    - Convenience methods: `get_text_content()`, `get_tool_calls()`,
      `get_turn()`, `get_turns()`, `turn_count()`

    Args:
        task_id: The task ID returned from execute_task().

    Returns:
        Task object with current state and conversation access.

    Raises:
        ZapNotStartedError: If start() hasn't been called.
        TaskNotFoundError: If no task with that ID exists.

    Example:
        ```python
        task = await zap.get_task(task_id)
        print(f"Status: {task.status}")
        if task.is_complete():
            print(f"Result: {task.result}")

        # Access conversation
        print(task.get_text_content())
        for tool_call in task.get_tool_calls():
            print(f"Called: {tool_call.name}")

        # Access sub-tasks
        sub_tasks = await task.get_sub_tasks()
        for sub in sub_tasks:
            print(f"Sub-task: {sub.id}")
        ```
    """
    self._ensure_started()

    from zap_ai.workflows.agent_workflow import AgentWorkflow

    try:
        handle = self.temporal_client.get_workflow_handle(task_id)  # type: ignore[union-attr]

        # Query workflow state
        status_str = await handle.query(AgentWorkflow.get_status)
        result = await handle.query(AgentWorkflow.get_result)
        error = await handle.query(AgentWorkflow.get_error)
        history = await handle.query(AgentWorkflow.get_history)

        # Query sub-agent conversations to get sub-task IDs
        sub_agent_convs = await handle.query(AgentWorkflow.get_sub_agent_conversations)
        sub_task_ids = [conv["conversation_id"] for conv in sub_agent_convs.values()]

        # Parse agent name from task ID
        agent_name = task_id.split("-")[0]

        return Task(
            id=task_id,
            agent_name=agent_name,
            status=TaskStatus(status_str),
            result=result,
            error=error,
            history=history,
            sub_tasks=sub_task_ids,
            _task_fetcher=self._create_task_fetcher(),
            _approval_fetcher=self._create_approval_fetcher(task_id),
            _approval_sender=self._create_approval_sender(task_id),
        )

    except Exception as e:
        raise TaskNotFoundError(f"Task '{task_id}' not found: {e}") from e
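
get_task() recovers the agent name by taking the text before the first hyphen of the task ID, while execute_task() builds IDs as the agent name, a hyphen, and 12 hex characters. The sketch below reproduces those two expressions to show the round trip and one caveat. `make_task_id` and `parse_agent_name` are hypothetical helper names, not part of zap_ai.

```python
from uuid import uuid4


def make_task_id(agent_name: str) -> str:
    # Same expression execute_task() uses to build a workflow ID.
    return f"{agent_name}-{uuid4().hex[:12]}"


def parse_agent_name(task_id: str) -> str:
    # Same expression get_task() uses to recover the agent name.
    return task_id.split("-")[0]
```

The round trip holds for hyphen-free names such as "MainAgent". For a name like "main-agent", parsing yields "main", so with the code as shown hyphen-free agent names are the safe choice.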

get_agent(name)

Get an agent by name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | The agent name to look up. | required |

Returns:

| Type | Description |
| --- | --- |
| ZapAgent[TContext] | The ZapAgent with the given name. |

Raises:

| Type | Description |
| --- | --- |
| AgentNotFoundError | If no agent with that name exists. |

Source code in src/zap_ai/core/zap.py
def get_agent(self, name: str) -> ZapAgent[TContext]:
    """
    Get an agent by name.

    Args:
        name: The agent name to look up.

    Returns:
        The ZapAgent with the given name.

    Raises:
        AgentNotFoundError: If no agent with that name exists.
    """
    if name not in self._agent_map:
        raise AgentNotFoundError(
            f"Agent '{name}' not found. Available agents: {sorted(self._agent_map.keys())}"
        )
    return self._agent_map[name]

get_agent_tools(agent_name) async

Get list of tool names available to an agent.

Useful for validating approval patterns before execution.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| agent_name | str | Name of the agent. | required |

Returns:

| Type | Description |
| --- | --- |
| list[str] | List of tool names available to the agent. |

Raises:

| Type | Description |
| --- | --- |
| ZapNotStartedError | If start() hasn't been called. |
| AgentNotFoundError | If agent doesn't exist. |

Example
tools = await zap.get_agent_tools("financial_agent")
# ['transfer_funds', 'check_balance', 'delete_transaction']

# Validate approval patterns
rules = ApprovalRules(patterns=["transfer_*"])
print(rules.preview_matches(tools))
# {'transfer_*': ['transfer_funds']}
Source code in src/zap_ai/core/zap.py
async def get_agent_tools(self, agent_name: str) -> list[str]:
    """
    Get list of tool names available to an agent.

    Useful for validating approval patterns before execution.

    Args:
        agent_name: Name of the agent.

    Returns:
        List of tool names available to the agent.

    Raises:
        ZapNotStartedError: If start() hasn't been called.
        AgentNotFoundError: If agent doesn't exist.

    Example:
        ```python
        tools = await zap.get_agent_tools("financial_agent")
        # ['transfer_funds', 'check_balance', 'delete_transaction']

        # Validate approval patterns
        rules = ApprovalRules(patterns=["transfer_*"])
        print(rules.preview_matches(tools))
        # {'transfer_*': ['transfer_funds']}
        ```
    """
    self._ensure_started()

    # Validate agent exists
    self.get_agent(agent_name)

    if not self._tool_registry:
        return []

    return self._tool_registry.get_tool_names(agent_name)
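
The docstring example suggests that approval patterns such as "transfer_*" behave like shell-style globs. The sketch below shows how such matching could work using the standard library's fnmatch module. The actual matching logic inside ApprovalRules is not shown on this page, so glob semantics are an assumption, and `preview_matches` and `unmatched_patterns` are written here as free functions purely for illustration.

```python
from fnmatch import fnmatchcase


def preview_matches(patterns: list[str], tool_names: list[str]) -> dict[str, list[str]]:
    """Map each pattern to the tool names it matches (case-sensitive globs)."""
    return {p: [t for t in tool_names if fnmatchcase(t, p)] for p in patterns}


def unmatched_patterns(patterns: list[str], tool_names: list[str]) -> list[str]:
    """Patterns that match no tool, which are likely typos worth a warning."""
    matches = preview_matches(patterns, tool_names)
    return [p for p in patterns if not matches[p]]
```

Checking patterns against get_agent_tools() before calling execute_task() surfaces the same kind of mismatch that execute_task() reports as a UserWarning.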

ZapAgent

Configuration for an AI agent.

zap_ai.ZapAgent

Bases: BaseModel, Generic[TContext]

Configuration for an AI agent within the Zap platform.

ZapAgent defines all the properties needed to run an agent, including its system prompt (static or dynamic), LLM model, available tools (via MCP clients), and which other agents it can delegate to.

The prompt can be either:

- A static string: "You are a helpful assistant."
- A callable that receives context: lambda ctx: f"You assist {ctx['user_name']}."

Example
from zap_ai import ZapAgent
from fastmcp import Client

# Static prompt
agent = ZapAgent(
    name="ResearchAgent",
    prompt="You are a research assistant...",
    model="gpt-4o",
    mcp_clients=[Client("./tools.py")],
    sub_agents=["WriterAgent"],
)

# Dynamic prompt with context
agent = ZapAgent[MyContext](
    name="PersonalAgent",
    prompt=lambda ctx: f"You are {ctx.user_name}'s assistant.",
)
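
How a static or dynamic prompt turns into the final system prompt can be sketched in a few lines. ZapAgent.resolve_prompt() itself is not shown on this page, so the free function `resolve_prompt` below is only an assumed equivalent, and `MyContext` is a hypothetical context type.

```python
from dataclasses import dataclass
from typing import Callable, Union


@dataclass
class MyContext:
    user_name: str


Prompt = Union[str, Callable[[MyContext], str]]


def resolve_prompt(prompt: Prompt, context: MyContext) -> str:
    """Return a static prompt unchanged; call a dynamic prompt with the context."""
    return prompt(context) if callable(prompt) else prompt
```

Zap.execute_task() substitutes an empty dict when no context is passed, so an attribute-style lambda such as `lambda ctx: ctx.user_name` would raise AttributeError in that case. That is the situation the "dynamic prompt but no context" warning points at.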

Attributes:

Name Type Description
name str

Unique identifier for the agent. Used as workflow ID prefix. Cannot contain spaces or special characters that would be invalid in a Temporal workflow ID.

prompt str | Callable[[Any], str]

System prompt that defines the agent's behavior and personality. Can be a string or a callable that takes context and returns a string. This is sent as the first message in every conversation.

model str

LiteLLM model identifier (e.g., "gpt-4o", "claude-3-opus-20240229", "anthropic/claude-3-sonnet"). See LiteLLM docs for full list.

mcp_clients list[Any]

List of FastMCP Client instances that provide tools to this agent. Clients are connected during Zap.start().

sub_agents list[str]

List of agent names that this agent can delegate to. A special "message_agent" tool is automatically added when this list is non-empty. Referenced agents must exist in the Zap instance.

discovery_prompt str | None

Description shown to parent agents when they can delegate to this agent. Used in the message_agent tool description. If None, the agent won't appear in the message_agent tool options.

max_iterations int

Maximum number of agentic loop iterations before forcing completion. Prevents infinite loops. Each iteration is one LLM call + optional tool execution.

temperature float

Sampling temperature for LLM responses (0.0-2.0). Lower values (e.g., 0.2) make output more focused and deterministic. Higher values (e.g., 0.8) make output more random and creative. Defaults to 0.7.

max_tokens int | None

Maximum number of tokens to generate in the response. If None (default), uses the model's default limit.
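
The `name` rules can be checked before constructing an agent. The following is a minimal sketch, assuming the rules are exactly those stated by the validator and field limits in the source listing (1 to 100 characters; ASCII letters, digits, underscores, and hyphens only). `is_valid_agent_name` is a hypothetical helper, not part of `zap_ai`.

```python
import re

# Assumed equivalent of the documented rules: 1-100 characters drawn from
# ASCII letters, digits, underscore, and hyphen.
_AGENT_NAME_RE = re.compile(r"[A-Za-z0-9_-]{1,100}")


def is_valid_agent_name(name: str) -> bool:
    """Return True if `name` satisfies the documented naming rules."""
    return _AGENT_NAME_RE.fullmatch(name) is not None


checks = {
    "ResearchAgent": is_valid_agent_name("ResearchAgent"),
    "helper-agent_2": is_valid_agent_name("helper-agent_2"),
    "Research Agent": is_valid_agent_name("Research Agent"),  # contains a space
    "team/agent": is_valid_agent_name("team/agent"),  # contains a slash
}
```

Because spaces and slashes are already outside the allowed character set, a single pattern covers all three checks the validator performs separately. The validator's separate checks exist to give more specific error messages.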

Source code in src/zap_ai/core/agent.py
class ZapAgent(BaseModel, Generic[TContext]):
    """
    Configuration for an AI agent within the Zap platform.

    ZapAgent defines all the properties needed to run an agent, including
    its system prompt (static or dynamic), LLM model, available tools
    (via MCP clients), and which other agents it can delegate to.

    The prompt can be either:
    - A static string: "You are a helpful assistant."
    - A callable that receives context: lambda ctx: f"You assist {ctx['user_name']}."

    Example:
        ```python
        from zap_ai import ZapAgent
        from fastmcp import Client

        # Static prompt
        agent = ZapAgent(
            name="ResearchAgent",
            prompt="You are a research assistant...",
            model="gpt-4o",
            mcp_clients=[Client("./tools.py")],
            sub_agents=["WriterAgent"],
        )

        # Dynamic prompt with context
        agent = ZapAgent[MyContext](
            name="PersonalAgent",
            prompt=lambda ctx: f"You are {ctx.user_name}'s assistant.",
        )
        ```

    Attributes:
        name: Unique identifier for the agent. Used as workflow ID prefix.
            Cannot contain spaces or special characters that would be
            invalid in a Temporal workflow ID.
        prompt: System prompt that defines the agent's behavior and personality.
            Can be a string or a callable that takes context and returns a string.
            This is sent as the first message in every conversation.
        model: LiteLLM model identifier (e.g., "gpt-4o", "claude-3-opus-20240229",
            "anthropic/claude-3-sonnet"). See LiteLLM docs for full list.
        mcp_clients: List of FastMCP Client instances that provide tools to
            this agent. Clients are connected during Zap.start().
        sub_agents: List of agent names that this agent can delegate to.
            A special "message_agent" tool is automatically added when
            this list is non-empty. Referenced agents must exist in the
            Zap instance.
        discovery_prompt: Description shown to parent agents when they can
            delegate to this agent. Used in the message_agent tool
            description. If None, agent won't appear in message_agent tool options.
        max_iterations: Maximum number of agentic loop iterations before
            forcing completion. Prevents infinite loops. Each iteration
            is one LLM call + optional tool execution.
        temperature: Sampling temperature for LLM responses (0.0-2.0).
            Lower values (e.g., 0.2) make output more focused and deterministic.
            Higher values (e.g., 0.8) make output more random and creative.
            Defaults to 0.7.
        max_tokens: Maximum number of tokens to generate in the response.
            If None (default), uses the model's default limit.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Required fields
    name: str = Field(
        ...,
        min_length=1,
        max_length=100,
        description="Unique identifier for the agent (no spaces allowed)",
    )
    prompt: str | Callable[[Any], str] = Field(
        ...,
        description="System prompt - static string or callable(context) -> str",
    )

    # Optional fields with defaults
    model: str = Field(
        default="gpt-4o",
        min_length=1,
        description="LiteLLM model identifier",
    )
    # Note: Using Any instead of Client due to Temporal sandbox restrictions.
    # Importing fastmcp.Client at runtime causes beartype circular import issues
    # when Temporal validates workflows. The Client type is available for static
    # type checking via TYPE_CHECKING import above.
    mcp_clients: list[Any] = Field(
        default_factory=list,
        description="FastMCP clients providing tools to this agent",
    )
    sub_agents: list[str] = Field(
        default_factory=list,
        description="Names of agents this agent can delegate to",
    )
    discovery_prompt: str | None = Field(
        default=None,
        description="Description for parent agents (shown in message_agent tool)",
    )
    max_iterations: int = Field(
        default=50,
        ge=1,
        le=500,
        description="Maximum agentic loop iterations",
    )
    temperature: float = Field(
        default=0.7,
        ge=0.0,
        le=2.0,
        description="Sampling temperature for LLM responses (0.0-2.0)",
    )
    max_tokens: int | None = Field(
        default=None,
        ge=1,
        description="Maximum tokens to generate (None for model default)",
    )

    @field_validator("name")
    @classmethod
    def validate_name_format(cls, v: str) -> str:
        """
        Validate that name is suitable for use as a Temporal workflow ID prefix.

        Rules:
        - No spaces (would break workflow ID parsing)
        - No forward slashes (used as delimiter in some contexts)
        - Must be alphanumeric with underscores/hyphens only

        Raises:
            ValueError: If name contains invalid characters.
        """
        if " " in v:
            raise ValueError(
                f"Agent name cannot contain spaces: '{v}'. Use underscores or hyphens instead."
            )
        if "/" in v:
            raise ValueError(f"Agent name cannot contain forward slashes: '{v}'.")
        # Allow alphanumeric, underscore, hyphen
        allowed_chars = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-")
        invalid_chars = set(v) - allowed_chars
        if invalid_chars:
            raise ValueError(
                f"Agent name contains invalid characters: {invalid_chars}. "
                "Only alphanumeric, underscore, and hyphen are allowed."
            )
        return v

    @field_validator("prompt")
    @classmethod
    def validate_prompt(cls, v: str | Callable[[Any], str]) -> str | Callable[[Any], str]:
        """Validate prompt - string must be non-empty, callable must be callable."""
        if isinstance(v, str):
            if not v:
                raise ValueError("Prompt string cannot be empty")
            return v
        if callable(v):
            return v
        raise ValueError("Prompt must be a string or callable")

    @field_validator("sub_agents")
    @classmethod
    def validate_sub_agents_no_duplicates(cls, v: list[str]) -> list[str]:
        """Ensure no duplicate sub-agent references."""
        if len(v) != len(set(v)):
            duplicates = [name for name in v if v.count(name) > 1]
            raise ValueError(f"Duplicate sub-agent references: {set(duplicates)}")
        return v

    def is_dynamic_prompt(self) -> bool:
        """Check if this agent uses a dynamic (callable) prompt."""
        return callable(self.prompt)

    def resolve_prompt(self, context: TContext) -> str:
        """
        Resolve the prompt with the given context.

        Args:
            context: The context to pass to a dynamic prompt.

        Returns:
            The resolved system prompt string.

        Raises:
            TypeError: If prompt resolution fails.
        """
        if callable(self.prompt):
            return self.prompt(context)
        return self.prompt

is_dynamic_prompt()

Check if this agent uses a dynamic (callable) prompt.

Source code in src/zap_ai/core/agent.py
def is_dynamic_prompt(self) -> bool:
    """Check if this agent uses a dynamic (callable) prompt."""
    return callable(self.prompt)

resolve_prompt(context)

Resolve the prompt with the given context.

Parameters:

Name Type Description Default
context TContext

The context to pass to a dynamic prompt.

required

Returns:

Type Description
str

The resolved system prompt string.

Raises:

Type Description
TypeError

If prompt resolution fails.

Source code in src/zap_ai/core/agent.py
def resolve_prompt(self, context: TContext) -> str:
    """
    Resolve the prompt with the given context.

    Args:
        context: The context to pass to a dynamic prompt.

    Returns:
        The resolved system prompt string.

    Raises:
        TypeError: If prompt resolution fails.
    """
    if callable(self.prompt):
        return self.prompt(context)
    return self.prompt
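
The resolution logic is small enough to try in isolation. The sketch below reproduces it as a free function, with an illustrative `MyContext` dataclass standing in for whatever context type an application defines. It does not import `zap_ai`.

```python
from dataclasses import dataclass
from typing import Callable, Union


@dataclass
class MyContext:
    user_name: str
    company: str


Prompt = Union[str, Callable[[MyContext], str]]


def resolve_prompt(prompt: Prompt, context: MyContext) -> str:
    """Call a dynamic prompt with the context; return a static prompt unchanged."""
    if callable(prompt):
        return prompt(context)
    return prompt


ctx = MyContext(user_name="Alice", company="Acme")
static = resolve_prompt("You are a helpful assistant.", ctx)
dynamic = resolve_prompt(lambda c: f"You assist {c.user_name} from {c.company}.", ctx)
```

A static prompt ignores the context entirely, so the same call site works for both kinds of agent.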

Task

Represents a task execution.

zap_ai.Task dataclass

Represents a task execution within the Zap platform.

A Task is created when you call zap.execute_task() and tracks the full lifecycle of that execution. Use zap.get_task(task_id) to retrieve updated task state.

Example
task = await zap.execute_task(agent_name="MyAgent", task="Do something")
print(f"Task ID: {task.id}")

# Poll for completion
while not task.status.is_terminal():
    await asyncio.sleep(1)
    task = await zap.get_task(task.id)

if task.status == TaskStatus.COMPLETED:
    print(f"Result: {task.result}")
else:
    print(f"Failed: {task.error}")
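
The polling loop above has no upper bound on how long it waits. One way to add one is sketched below, using a generic `wait_until` helper (a hypothetical name, not part of `zap_ai`) and stub poll functions in place of real task lookups.

```python
import asyncio
from typing import Awaitable, Callable, Tuple


async def wait_until(
    poll: Callable[[], Awaitable[bool]],
    timeout: float,
    interval: float = 0.01,
) -> bool:
    """Call `poll` every `interval` seconds until it returns True or `timeout` elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        if await poll():
            return True
        if loop.time() >= deadline:
            return False
        await asyncio.sleep(interval)


async def demo() -> Tuple[bool, bool]:
    calls = 0

    async def done_after_three_polls() -> bool:
        # Stub: pretends the task becomes terminal on the third poll.
        nonlocal calls
        calls += 1
        return calls >= 3

    async def never_done() -> bool:
        # Stub: a task that never reaches a terminal state.
        return False

    finished = await wait_until(done_after_three_polls, timeout=1.0)
    timed_out = await wait_until(never_done, timeout=0.05)
    return finished, timed_out


finished, timed_out = asyncio.run(demo())
```

With a real Zap instance, the poll function would presumably wrap `zap.get_task(task.id)` and return `task.status.is_terminal()`. The short intervals here only keep the demo fast.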

Attributes:

Name Type Description
id str

Unique identifier for this task. Format: "{agent_name}-{uuid}". Used as the Temporal workflow ID.

agent_name str

Name of the agent executing this task.

status TaskStatus

Current execution status. See TaskStatus for details.

result str | None

Final result string if completed, None otherwise.

history list[dict[str, Any]]

List of conversation messages in LiteLLM format. Each message is a dict with "role" and "content" keys. May include tool calls and tool results.

sub_tasks list[str]

List of child task IDs spawned for sub-agent delegation.

error str | None

Error message if failed, None otherwise.

created_at datetime

Timestamp when task was created.

updated_at datetime

Timestamp of last status update.
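
Since agent names may contain hyphens, a task ID in the `"{agent_name}-{uuid}"` format cannot be split on the first hyphen. The sketch below assumes the UUID part is in the canonical 36-character form produced by `str(uuid.uuid4())`. That is an assumption about the format, and both helpers are hypothetical.

```python
import uuid
from typing import Tuple

UUID_LENGTH = 36  # canonical form, e.g. 123e4567-e89b-12d3-a456-426614174000


def make_task_id(agent_name: str) -> str:
    """Build an ID in the documented "{agent_name}-{uuid}" shape."""
    return f"{agent_name}-{uuid.uuid4()}"


def split_task_id(task_id: str) -> Tuple[str, str]:
    """Split from the right, since agent names may themselves contain hyphens."""
    agent_name = task_id[: -(UUID_LENGTH + 1)]
    task_uuid = task_id[-UUID_LENGTH:]
    uuid.UUID(task_uuid)  # raises ValueError if the suffix is not a UUID
    return agent_name, task_uuid


task_id = make_task_id("research-agent")
agent_name, task_uuid = split_task_id(task_id)
```

In practice the `agent_name` attribute on the task already carries this information, so parsing the ID is only needed when nothing but the ID string is available.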

Source code in src/zap_ai/core/task.py
@dataclass
class Task:
    """
    Represents a task execution within the Zap platform.

    A Task is created when you call `zap.execute_task()` and tracks the
    full lifecycle of that execution. Use `zap.get_task(task_id)` to
    retrieve updated task state.

    Example:
        ```python
        task = await zap.execute_task(agent_name="MyAgent", task="Do something")
        print(f"Task ID: {task.id}")

        # Poll for completion
        while not task.status.is_terminal():
            await asyncio.sleep(1)
            task = await zap.get_task(task.id)

        if task.status == TaskStatus.COMPLETED:
            print(f"Result: {task.result}")
        else:
            print(f"Failed: {task.error}")
        ```

    Attributes:
        id: Unique identifier for this task. Format: "{agent_name}-{uuid}".
            Used as the Temporal workflow ID.
        agent_name: Name of the agent executing this task.
        status: Current execution status. See TaskStatus for details.
        result: Final result string if completed, None otherwise.
        history: List of conversation messages in LiteLLM format.
            Each message is a dict with "role" and "content" keys.
            May include tool calls and tool results.
        sub_tasks: List of child task IDs spawned for sub-agent delegation.
        error: Error message if failed, None otherwise.
        created_at: Timestamp when task was created.
        updated_at: Timestamp of last status update.
    """

    # Required fields (set at creation)
    id: str
    agent_name: str

    # Status tracking
    status: TaskStatus = TaskStatus.PENDING
    result: str | None = None
    error: str | None = None

    # Conversation history (list of LiteLLM message dicts)
    history: list[dict[str, Any]] = field(default_factory=list)

    # Sub-task tracking (child workflow IDs)
    sub_tasks: list[str] = field(default_factory=list)

    # Timestamps
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    # Private: callback to fetch sub-tasks (injected by Zap)
    _task_fetcher: Callable[[str], Awaitable["Task"]] | None = field(
        default=None, repr=False, compare=False
    )

    # Private: callbacks for approval operations (injected by Zap)
    _approval_fetcher: Callable[[], Awaitable[list["ApprovalRequest"]]] | None = field(
        default=None, repr=False, compare=False
    )
    _approval_sender: Callable[[str, bool, str | None], Awaitable[None]] | None = field(
        default=None, repr=False, compare=False
    )

    def is_complete(self) -> bool:
        """Return True if task has reached a terminal state."""
        return self.status.is_terminal()

    def is_successful(self) -> bool:
        """Return True if task completed successfully."""
        return self.status == TaskStatus.COMPLETED

    def get_last_message(self) -> dict[str, Any] | None:
        """Return the most recent message in history, or None if empty."""
        if not self.history:
            return None
        return self.history[-1]

    def get_assistant_messages(self) -> list[dict[str, Any]]:
        """Return all assistant messages from history."""
        return [msg for msg in self.history if msg.get("role") == "assistant"]

    def get_tool_calls_count(self) -> int:
        """Return total number of tool calls made during this task."""
        count = 0
        for msg in self.history:
            if msg.get("role") == "assistant" and msg.get("tool_calls"):
                count += len(msg["tool_calls"])
        return count

    def get_text_content(self) -> str:
        """
        Extract all text content from conversation history.

        Returns concatenated text from user and assistant messages,
        excluding tool calls and tool results.

        Returns:
            Combined text content as a single string, with messages
            separated by double newlines.
        """
        from zap_ai.conversation import get_text_content

        return get_text_content(self.history)

    def get_tool_calls(self) -> list["ToolCallInfo"]:
        """
        Get all tool calls with their results.

        Returns:
            List of ToolCallInfo objects containing tool name, arguments,
            and results (if available).
        """
        from zap_ai.conversation import get_tool_calls

        return get_tool_calls(self.history)

    def get_turns(self) -> list["ConversationTurn"]:
        """
        Get all conversation turns.

        A turn is defined as a user message (or system prompt for turn 0),
        followed by all assistant responses and tool interactions until
        the next user message.

        Returns:
            List of ConversationTurn objects, one per turn.
        """
        from zap_ai.conversation import get_turns

        return get_turns(self.history)

    def get_turn(self, turn_num: int) -> "ConversationTurn | None":
        """
        Get messages for a specific conversation turn.

        Args:
            turn_num: Turn number (0-indexed). Turn 0 may contain system prompt.

        Returns:
            ConversationTurn with the turn's messages, or None if turn doesn't exist.
        """
        from zap_ai.conversation import get_turn

        return get_turn(self.history, turn_num)

    def turn_count(self) -> int:
        """Return the number of conversation turns."""
        from zap_ai.conversation import turn_count

        return turn_count(self.history)

    async def get_sub_tasks(self) -> list["Task"]:
        """
        Fetch full Task objects for all sub-tasks.

        This method requires the Task to have been created via `zap.get_task()`,
        which injects the necessary callback for fetching sub-task data.

        Returns:
            List of Task objects for each sub-task spawned by this task.

        Raises:
            RuntimeError: If Task was not created via Zap.get_task().

        Example:
            ```python
            task = await zap.get_task(task_id)
            sub_tasks = await task.get_sub_tasks()
            for sub in sub_tasks:
                print(f"Sub-task {sub.id}: {sub.status}")
            ```
        """
        if not self._task_fetcher:
            raise RuntimeError(
                "Cannot fetch sub-tasks: Task was not created via Zap.get_task(). "
                "Use zap.get_task(task_id) to get a Task with sub-task access."
            )

        if not self.sub_tasks:
            return []

        # Fetch all sub-tasks concurrently
        tasks = [self._task_fetcher(sub_id) for sub_id in self.sub_tasks]
        return list(await asyncio.gather(*tasks))

    async def get_pending_approvals(self) -> list[dict[str, Any]]:
        """
        Get all pending approval requests for this task.

        This method requires the Task to have been created via `zap.get_task()`,
        which injects the necessary callback for fetching approval data.

        Returns:
            List of dicts with approval request data:
            - id: Unique approval ID
            - tool_name: Name of the tool requiring approval
            - tool_args: Arguments passed to the tool
            - requested_at: ISO timestamp of when approval was requested
            - timeout_at: ISO timestamp of when approval will timeout
            - context: Additional context (agent_name, workflow_id)

        Raises:
            RuntimeError: If Task was not created via Zap.get_task().

        Example:
            ```python
            task = await zap.get_task(task_id)
            pending = await task.get_pending_approvals()
            for req in pending:
                print(f"Tool: {req['tool_name']}, Args: {req['tool_args']}")
                await task.approve(req['id'])
            ```
        """
        if not self._approval_fetcher:
            raise RuntimeError(
                "Cannot fetch approvals: Task was not created via Zap.get_task(). "
                "Use zap.get_task(task_id) to get a Task with approval access."
            )

        requests = await self._approval_fetcher()
        return [r.to_dict() for r in requests]

    async def approve(self, approval_id: str) -> None:
        """
        Approve a pending tool execution.

        Args:
            approval_id: ID of the approval request to approve.

        Raises:
            RuntimeError: If Task was not created via Zap.get_task().

        Example:
            ```python
            task = await zap.get_task(task_id)
            pending = await task.get_pending_approvals()
            await task.approve(pending[0]['id'])
            ```
        """
        if not self._approval_sender:
            raise RuntimeError(
                "Cannot send approval: Task was not created via Zap.get_task(). "
                "Use zap.get_task(task_id) to get a Task with approval access."
            )

        await self._approval_sender(approval_id, True, None)

    async def reject(self, approval_id: str, reason: str | None = None) -> None:
        """
        Reject a pending tool execution.

        Args:
            approval_id: ID of the approval request to reject.
            reason: Optional reason for rejection.

        Raises:
            RuntimeError: If Task was not created via Zap.get_task().

        Example:
            ```python
            task = await zap.get_task(task_id)
            pending = await task.get_pending_approvals()
            await task.reject(pending[0]['id'], reason="Amount exceeds limit")
            ```
        """
        if not self._approval_sender:
            raise RuntimeError(
                "Cannot send rejection: Task was not created via Zap.get_task(). "
                "Use zap.get_task(task_id) to get a Task with approval access."
            )

        await self._approval_sender(approval_id, False, reason)

is_complete()

Return True if task has reached a terminal state.

Source code in src/zap_ai/core/task.py
def is_complete(self) -> bool:
    """Return True if task has reached a terminal state."""
    return self.status.is_terminal()

is_successful()

Return True if task completed successfully.

Source code in src/zap_ai/core/task.py
def is_successful(self) -> bool:
    """Return True if task completed successfully."""
    return self.status == TaskStatus.COMPLETED

get_last_message()

Return the most recent message in history, or None if empty.

Source code in src/zap_ai/core/task.py
def get_last_message(self) -> dict[str, Any] | None:
    """Return the most recent message in history, or None if empty."""
    if not self.history:
        return None
    return self.history[-1]

get_assistant_messages()

Return all assistant messages from history.

Source code in src/zap_ai/core/task.py
def get_assistant_messages(self) -> list[dict[str, Any]]:
    """Return all assistant messages from history."""
    return [msg for msg in self.history if msg.get("role") == "assistant"]

get_tool_calls_count()

Return total number of tool calls made during this task.

Source code in src/zap_ai/core/task.py
def get_tool_calls_count(self) -> int:
    """Return total number of tool calls made during this task."""
    count = 0
    for msg in self.history:
        if msg.get("role") == "assistant" and msg.get("tool_calls"):
            count += len(msg["tool_calls"])
    return count
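
To see what the count covers, here is the same logic as a free function applied to an invented history. The message shapes follow the common chat-completions layout and are illustrative only. Real histories may carry additional keys.

```python
from typing import Any, Dict, List


def count_tool_calls(history: List[Dict[str, Any]]) -> int:
    """Sum the tool calls attached to assistant messages."""
    return sum(
        len(msg["tool_calls"])
        for msg in history
        if msg.get("role") == "assistant" and msg.get("tool_calls")
    )


history = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Check my balance, then transfer 10."},
    {
        "role": "assistant",
        "content": None,
        "tool_calls": [
            {"id": "call_1", "type": "function",
             "function": {"name": "check_balance", "arguments": "{}"}},
            {"id": "call_2", "type": "function",
             "function": {"name": "transfer_funds", "arguments": '{"amount": 10}'}},
        ],
    },
    {"role": "tool", "tool_call_id": "call_1", "content": "100"},
    {"role": "tool", "tool_call_id": "call_2", "content": "ok"},
    {"role": "assistant", "content": "Done."},
]

total = count_tool_calls(history)
```

The count reflects calls the assistant requested, not tool result messages, so the two `tool` entries do not add to it.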

get_text_content()

Extract all text content from conversation history.

Returns concatenated text from user and assistant messages, excluding tool calls and tool results.

Returns:

Type Description
str

Combined text content as a single string, with messages separated by double newlines.

Source code in src/zap_ai/core/task.py
def get_text_content(self) -> str:
    """
    Extract all text content from conversation history.

    Returns concatenated text from user and assistant messages,
    excluding tool calls and tool results.

    Returns:
        Combined text content as a single string, with messages
        separated by double newlines.
    """
    from zap_ai.conversation import get_text_content

    return get_text_content(self.history)

get_tool_calls()

Get all tool calls with their results.

Returns:

Type Description
list['ToolCallInfo']

List of ToolCallInfo objects containing tool name, arguments, and results (if available).

Source code in src/zap_ai/core/task.py
def get_tool_calls(self) -> list["ToolCallInfo"]:
    """
    Get all tool calls with their results.

    Returns:
        List of ToolCallInfo objects containing tool name, arguments,
        and results (if available).
    """
    from zap_ai.conversation import get_tool_calls

    return get_tool_calls(self.history)

get_turns()

Get all conversation turns.

A turn is defined as a user message (or system prompt for turn 0), followed by all assistant responses and tool interactions until the next user message.

Returns:

Type Description
list['ConversationTurn']

List of ConversationTurn objects, one per turn.

Source code in src/zap_ai/core/task.py
def get_turns(self) -> list["ConversationTurn"]:
    """
    Get all conversation turns.

    A turn is defined as a user message (or system prompt for turn 0),
    followed by all assistant responses and tool interactions until
    the next user message.

    Returns:
        List of ConversationTurn objects, one per turn.
    """
    from zap_ai.conversation import get_turns

    return get_turns(self.history)
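
The turn definition above can be read in more than one way. The sketch below implements one reading: a new turn starts at each user message, and any leading system message is folded into turn 0 together with the first user message. The actual `zap_ai.conversation.get_turns` may differ; `group_into_turns` is a hypothetical helper that returns plain lists rather than `ConversationTurn` objects.

```python
from typing import Any, Dict, List

Message = Dict[str, Any]


def group_into_turns(history: List[Message]) -> List[List[Message]]:
    """Start a new turn at each user message; leading system messages join turn 0."""
    turns: List[List[Message]] = []
    for msg in history:
        # True while the open turn holds nothing but system messages.
        current_is_system_only = bool(turns) and all(
            m.get("role") == "system" for m in turns[-1]
        )
        if not turns or (msg.get("role") == "user" and not current_is_system_only):
            turns.append([])
        turns[-1].append(msg)
    return turns


history = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is my balance?"},
    {"role": "assistant", "content": None, "tool_calls": [{"id": "call_1"}]},
    {"role": "tool", "tool_call_id": "call_1", "content": "100"},
    {"role": "assistant", "content": "Your balance is 100."},
    {"role": "user", "content": "Thanks."},
    {"role": "assistant", "content": "You're welcome."},
]

turns = group_into_turns(history)
turn_roles = [[m["role"] for m in turn] for turn in turns]
```

Under this reading the sample history yields two turns, with the system prompt, first question, tool round-trip, and answer all in turn 0.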

get_turn(turn_num)

Get messages for a specific conversation turn.

Parameters:

Name Type Description Default
turn_num int

Turn number (0-indexed). Turn 0 may contain system prompt.

required

Returns:

Type Description
'ConversationTurn | None'

ConversationTurn with the turn's messages, or None if turn doesn't exist.

Source code in src/zap_ai/core/task.py
def get_turn(self, turn_num: int) -> "ConversationTurn | None":
    """
    Get messages for a specific conversation turn.

    Args:
        turn_num: Turn number (0-indexed). Turn 0 may contain system prompt.

    Returns:
        ConversationTurn with the turn's messages, or None if turn doesn't exist.
    """
    from zap_ai.conversation import get_turn

    return get_turn(self.history, turn_num)

turn_count()

Return the number of conversation turns.

Source code in src/zap_ai/core/task.py
def turn_count(self) -> int:
    """Return the number of conversation turns."""
    from zap_ai.conversation import turn_count

    return turn_count(self.history)

get_sub_tasks() async

Fetch full Task objects for all sub-tasks.

This method requires the Task to have been created via zap.get_task(), which injects the necessary callback for fetching sub-task data.

Returns:

Type Description
list['Task']

List of Task objects for each sub-task spawned by this task.

Raises:

Type Description
RuntimeError

If Task was not created via Zap.get_task().

Example
task = await zap.get_task(task_id)
sub_tasks = await task.get_sub_tasks()
for sub in sub_tasks:
    print(f"Sub-task {sub.id}: {sub.status}")
Source code in src/zap_ai/core/task.py
async def get_sub_tasks(self) -> list["Task"]:
    """
    Fetch full Task objects for all sub-tasks.

    This method requires the Task to have been created via `zap.get_task()`,
    which injects the necessary callback for fetching sub-task data.

    Returns:
        List of Task objects for each sub-task spawned by this task.

    Raises:
        RuntimeError: If Task was not created via Zap.get_task().

    Example:
        ```python
        task = await zap.get_task(task_id)
        sub_tasks = await task.get_sub_tasks()
        for sub in sub_tasks:
            print(f"Sub-task {sub.id}: {sub.status}")
        ```
    """
    if not self._task_fetcher:
        raise RuntimeError(
            "Cannot fetch sub-tasks: Task was not created via Zap.get_task(). "
            "Use zap.get_task(task_id) to get a Task with sub-task access."
        )

    if not self.sub_tasks:
        return []

    # Fetch all sub-tasks concurrently
    tasks = [self._task_fetcher(sub_id) for sub_id in self.sub_tasks]
    return list(await asyncio.gather(*tasks))
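
The concurrent fetch relies on `asyncio.gather` returning results in the order the awaitables were passed, regardless of which finishes first. The sketch below demonstrates that property with a stub fetcher; `fetch_status` and its delays are invented for the demo.

```python
import asyncio
from typing import List


async def fetch_status(sub_id: str) -> str:
    """Stand-in for a per-task fetch; later IDs finish sooner to show ordering."""
    delay = {"sub-a": 0.03, "sub-b": 0.02, "sub-c": 0.01}[sub_id]
    await asyncio.sleep(delay)
    return f"{sub_id}:done"


async def fetch_all(sub_ids: List[str]) -> List[str]:
    """Run all fetches concurrently; gather returns results in input order."""
    if not sub_ids:
        return []
    return list(await asyncio.gather(*(fetch_status(s) for s in sub_ids)))


results = asyncio.run(fetch_all(["sub-a", "sub-b", "sub-c"]))
empty = asyncio.run(fetch_all([]))
```

By default `gather` propagates the first exception raised by any awaitable. Passing `return_exceptions=True` would instead return exceptions alongside successful results, which may be preferable when one failed sub-task should not hide the others.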

get_pending_approvals() async

Get all pending approval requests for this task.

This method requires the Task to have been created via zap.get_task(), which injects the necessary callback for fetching approval data.

Returns:

Type Description
list[dict[str, Any]]

List of dicts with approval request data:

  • id: Unique approval ID
  • tool_name: Name of the tool requiring approval
  • tool_args: Arguments passed to the tool
  • requested_at: ISO timestamp of when approval was requested
  • timeout_at: ISO timestamp of when approval will timeout
  • context: Additional context (agent_name, workflow_id)

Raises:

Type Description
RuntimeError

If Task was not created via Zap.get_task().

Example
task = await zap.get_task(task_id)
pending = await task.get_pending_approvals()
for req in pending:
    print(f"Tool: {req['tool_name']}, Args: {req['tool_args']}")
    await task.approve(req['id'])
Source code in src/zap_ai/core/task.py
async def get_pending_approvals(self) -> list[dict[str, Any]]:
    """
    Get all pending approval requests for this task.

    This method requires the Task to have been created via `zap.get_task()`,
    which injects the necessary callback for fetching approval data.

    Returns:
        List of dicts with approval request data:
        - id: Unique approval ID
        - tool_name: Name of the tool requiring approval
        - tool_args: Arguments passed to the tool
        - requested_at: ISO timestamp of when approval was requested
        - timeout_at: ISO timestamp of when approval will timeout
        - context: Additional context (agent_name, workflow_id)

    Raises:
        RuntimeError: If Task was not created via Zap.get_task().

    Example:
        ```python
        task = await zap.get_task(task_id)
        pending = await task.get_pending_approvals()
        for req in pending:
            print(f"Tool: {req['tool_name']}, Args: {req['tool_args']}")
            await task.approve(req['id'])
        ```
    """
    if not self._approval_fetcher:
        raise RuntimeError(
            "Cannot fetch approvals: Task was not created via Zap.get_task(). "
            "Use zap.get_task(task_id) to get a Task with approval access."
        )

    requests = await self._approval_fetcher()
    return [r.to_dict() for r in requests]
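The RuntimeError described above follows from a callback-injection pattern: the method only works when a fetcher has been attached to the Task. The sketch below reproduces that pattern in isolation. `SketchTask`, `SketchApproval`, and the fixed data returned by `fetch` are hypothetical stand-ins, not the library's classes; in the real library the fetcher is injected by `zap.get_task()`.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class SketchApproval:
    """Hypothetical stand-in for the library's approval request object."""

    id: str
    tool_name: str
    tool_args: dict[str, Any]

    def to_dict(self) -> dict[str, Any]:
        return {"id": self.id, "tool_name": self.tool_name, "tool_args": self.tool_args}


@dataclass
class SketchTask:
    """Hypothetical stand-in for Task; only the approval fetcher is modeled."""

    id: str
    _approval_fetcher: Optional[Callable[[], Awaitable[list[SketchApproval]]]] = None

    async def get_pending_approvals(self) -> list[dict[str, Any]]:
        # Same guard as the method above: no injected fetcher, no approval access.
        if not self._approval_fetcher:
            raise RuntimeError("Task was not created via Zap.get_task().")
        requests = await self._approval_fetcher()
        return [r.to_dict() for r in requests]


async def demo() -> tuple[list[dict[str, Any]], str]:
    async def fetch() -> list[SketchApproval]:
        # A real fetcher would ask the running workflow; this returns fixed data.
        return [SketchApproval("a1", "transfer_funds", {"amount": 50})]

    wired = SketchTask(id="t1", _approval_fetcher=fetch)
    pending = await wired.get_pending_approvals()

    bare = SketchTask(id="t2")  # constructed directly, so no fetcher is attached
    try:
        await bare.get_pending_approvals()
        error = ""
    except RuntimeError as exc:
        error = str(exc)
    return pending, error


pending, error = asyncio.run(demo())
```

The directly constructed task fails fast with a clear message instead of silently returning an empty list, which is presumably why the method raises rather than defaulting.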

approve(approval_id) async

Approve a pending tool execution.

Parameters:

Name Type Description Default
approval_id str

ID of the approval request to approve.

required

Raises:

Type Description
RuntimeError

If Task was not created via Zap.get_task().

Example
task = await zap.get_task(task_id)
pending = await task.get_pending_approvals()
await task.approve(pending[0]['id'])
Source code in src/zap_ai/core/task.py
async def approve(self, approval_id: str) -> None:
    """
    Approve a pending tool execution.

    Args:
        approval_id: ID of the approval request to approve.

    Raises:
        RuntimeError: If Task was not created via Zap.get_task().

    Example:
        ```python
        task = await zap.get_task(task_id)
        pending = await task.get_pending_approvals()
        await task.approve(pending[0]['id'])
        ```
    """
    if not self._approval_sender:
        raise RuntimeError(
            "Cannot send approval: Task was not created via Zap.get_task(). "
            "Use zap.get_task(task_id) to get a Task with approval access."
        )

    await self._approval_sender(approval_id, True, None)

reject(approval_id, reason=None) async

Reject a pending tool execution.

Parameters:

Name Type Description Default
approval_id str

ID of the approval request to reject.

required
reason str | None

Optional reason for rejection.

None

Raises:

Type Description
RuntimeError

If Task was not created via Zap.get_task().

Example
task = await zap.get_task(task_id)
pending = await task.get_pending_approvals()
await task.reject(pending[0]['id'], reason="Amount exceeds limit")
Source code in src/zap_ai/core/task.py
async def reject(self, approval_id: str, reason: str | None = None) -> None:
    """
    Reject a pending tool execution.

    Args:
        approval_id: ID of the approval request to reject.
        reason: Optional reason for rejection.

    Raises:
        RuntimeError: If Task was not created via Zap.get_task().

    Example:
        ```python
        task = await zap.get_task(task_id)
        pending = await task.get_pending_approvals()
        await task.reject(pending[0]['id'], reason="Amount exceeds limit")
        ```
    """
    if not self._approval_sender:
        raise RuntimeError(
            "Cannot send rejection: Task was not created via Zap.get_task(). "
            "Use zap.get_task(task_id) to get a Task with approval access."
        )

    await self._approval_sender(approval_id, False, reason)
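One way to combine `get_pending_approvals()`, `approve()`, and `reject()` is to keep the decision in a plain function over the approval dict and let a small loop send the result. This is a minimal sketch, not a recommended policy: `MAX_AMOUNT`, `decide`, `review`, and the `amount` key inside `tool_args` are all hypothetical names chosen for illustration.

```python
from typing import Any, Optional

# Hypothetical policy threshold; not part of zap_ai.
MAX_AMOUNT = 100


def decide(request: dict[str, Any]) -> tuple[bool, Optional[str]]:
    """Return (approved, reason) for one pending approval dict."""
    # Assumes the tool takes an "amount" argument; a missing value counts as 0.
    amount = request["tool_args"].get("amount", 0)
    if amount > MAX_AMOUNT:
        return False, "Amount exceeds limit"
    return True, None


async def review(task: Any) -> None:
    """Approve or reject every pending request on a Task from zap.get_task()."""
    for req in await task.get_pending_approvals():
        approved, reason = decide(req)
        if approved:
            await task.approve(req["id"])
        else:
            await task.reject(req["id"], reason=reason)
```

Keeping `decide` free of I/O means the policy can be unit-tested against sample dicts without a running workflow.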

TaskStatus

Enum for task execution states.

zap_ai.TaskStatus

Bases: str, Enum

Status of a task execution.

The lifecycle of a task typically follows: PENDING -> THINKING -> (AWAITING_TOOL <-> THINKING)* -> COMPLETED

With approval rules enabled, the lifecycle may include: THINKING -> AWAITING_APPROVAL -> (approved) -> AWAITING_TOOL

At any point, a task can transition to FAILED if an unrecoverable error occurs.

Attributes:

Name Type Description
PENDING

Task has been created but workflow hasn't started yet.

THINKING

Agent is thinking (LLM inference in progress).

AWAITING_TOOL

Waiting for one or more tool executions to complete. Includes sub-agent delegation via message_agent tool.

AWAITING_APPROVAL

Tool call requires human approval before execution. Use Task.get_pending_approvals() to see pending requests, and Task.approve() or Task.reject() to respond.

COMPLETED

Task finished successfully. Result is available.

FAILED

Task failed with an error. Error details available in Task.error field.
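The lifecycle described above can be read as a small set of allowed transitions. The sketch below encodes only the transitions the description names and checks a status history against them. `SketchStatus`, `DOCUMENTED`, and `follows_documented_lifecycle` are hypothetical names, and two points are assumptions rather than documented behavior: FAILED is treated as reachable from any non-terminal status, and the status after a rejected approval is not modeled because the description does not state it.

```python
from enum import Enum


class SketchStatus(str, Enum):
    """Local mirror of the documented TaskStatus values, for illustration."""

    PENDING = "pending"
    THINKING = "thinking"
    AWAITING_TOOL = "awaiting_tool"
    AWAITING_APPROVAL = "awaiting_approval"
    COMPLETED = "completed"
    FAILED = "failed"


S = SketchStatus

# Transitions named in the lifecycle description; FAILED is handled separately.
DOCUMENTED = {
    (S.PENDING, S.THINKING),
    (S.THINKING, S.AWAITING_TOOL),
    (S.AWAITING_TOOL, S.THINKING),
    (S.THINKING, S.AWAITING_APPROVAL),
    (S.AWAITING_APPROVAL, S.AWAITING_TOOL),
    (S.THINKING, S.COMPLETED),
}


def follows_documented_lifecycle(path: list[SketchStatus]) -> bool:
    """Return True if each consecutive pair in path is a described transition."""
    for current, nxt in zip(path, path[1:]):
        # Assumption: any non-terminal status may move to FAILED.
        if nxt is S.FAILED and current not in (S.COMPLETED, S.FAILED):
            continue
        if (current, nxt) not in DOCUMENTED:
            return False
    return True
```

A check like this may be useful in tests that record status changes, but the library itself is the authority on which transitions actually occur.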

Source code in src/zap_ai/core/task.py
class TaskStatus(str, Enum):
    """
    Status of a task execution.

    The lifecycle of a task typically follows:
    PENDING -> THINKING -> (AWAITING_TOOL <-> THINKING)* -> COMPLETED

    With approval rules enabled, the lifecycle may include:
    THINKING -> AWAITING_APPROVAL -> (approved) -> AWAITING_TOOL

    At any point, a task can transition to FAILED if an unrecoverable
    error occurs.

    Attributes:
        PENDING: Task has been created but workflow hasn't started yet.
        THINKING: Agent is thinking (LLM inference in progress).
        AWAITING_TOOL: Waiting for one or more tool executions to complete.
            Includes sub-agent delegation via message_agent tool.
        AWAITING_APPROVAL: Tool call requires human approval before execution.
            Use Task.get_pending_approvals() to see pending requests, and
            Task.approve() or Task.reject() to respond.
        COMPLETED: Task finished successfully. Result is available.
        FAILED: Task failed with an error. Error details available in
            Task.error field.
    """

    PENDING = "pending"
    THINKING = "thinking"
    AWAITING_TOOL = "awaiting_tool"
    AWAITING_APPROVAL = "awaiting_approval"
    COMPLETED = "completed"
    FAILED = "failed"

    def is_terminal(self) -> bool:
        """Return True if this is a terminal (final) status."""
        return self in (TaskStatus.COMPLETED, TaskStatus.FAILED)

    def is_active(self) -> bool:
        """Return True if the task is actively being processed."""
        return self in (TaskStatus.THINKING, TaskStatus.AWAITING_TOOL, TaskStatus.AWAITING_APPROVAL)

is_terminal()

Return True if this is a terminal (final) status.

Source code in src/zap_ai/core/task.py
def is_terminal(self) -> bool:
    """Return True if this is a terminal (final) status."""
    return self in (TaskStatus.COMPLETED, TaskStatus.FAILED)

is_active()

Return True if the task is actively being processed.

Source code in src/zap_ai/core/task.py
def is_active(self) -> bool:
    """Return True if the task is actively being processed."""
    return self in (TaskStatus.THINKING, TaskStatus.AWAITING_TOOL, TaskStatus.AWAITING_APPROVAL)
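A typical use of `is_terminal()` is as the stop condition of a polling loop. The sketch below is self-contained, so it mirrors the enum locally instead of importing it. `SketchStatus`, `wait_until_terminal`, and `fake_get_status` are hypothetical; in real code the status would come from a refreshed Task rather than a scripted sequence.

```python
import asyncio
from enum import Enum
from typing import Awaitable, Callable


class SketchStatus(str, Enum):
    """Local mirror of TaskStatus with the two helper methods shown above."""

    PENDING = "pending"
    THINKING = "thinking"
    AWAITING_TOOL = "awaiting_tool"
    AWAITING_APPROVAL = "awaiting_approval"
    COMPLETED = "completed"
    FAILED = "failed"

    def is_terminal(self) -> bool:
        return self in (SketchStatus.COMPLETED, SketchStatus.FAILED)

    def is_active(self) -> bool:
        return self in (
            SketchStatus.THINKING,
            SketchStatus.AWAITING_TOOL,
            SketchStatus.AWAITING_APPROVAL,
        )


async def wait_until_terminal(
    get_status: Callable[[], Awaitable[SketchStatus]],
    interval: float = 1.0,
) -> SketchStatus:
    """Poll get_status until it reports COMPLETED or FAILED."""
    while True:
        status = await get_status()
        if status.is_terminal():
            return status
        await asyncio.sleep(interval)


async def demo() -> SketchStatus:
    # Scripted status sequence standing in for a live task.
    statuses = iter(
        [
            SketchStatus.PENDING,
            SketchStatus.THINKING,
            SketchStatus.AWAITING_TOOL,
            SketchStatus.THINKING,
            SketchStatus.COMPLETED,
        ]
    )

    async def fake_get_status() -> SketchStatus:
        return next(statuses)

    return await wait_until_terminal(fake_get_status, interval=0.0)


final = asyncio.run(demo())
```

Note that PENDING is neither active nor terminal under these definitions, so code that branches on `is_active()` alone should handle that case explicitly.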