Skip to content

Worker API

Utilities for running the Temporal worker that executes agent workflows.

run_worker

zap_ai.worker.run_worker(temporal_address='localhost:7233', task_queue='zap-agents', tool_registry=None, tracing_provider=None) async

Run a Temporal worker for Zap agents.

This is a blocking function that runs the worker until interrupted.

Parameters:

Name Type Description Default
temporal_address str

Temporal server address.

'localhost:7233'
task_queue str

Task queue name to listen on.

'zap-agents'
tool_registry Any | None

Optional ToolRegistry for activities. If None, stub activities will work but real tool execution will fail.

None
tracing_provider TracingProvider | None

Optional TracingProvider for observability. If None, tracing is disabled (NoOpTracingProvider used).

None
Example
import asyncio
from zap_ai.worker import run_worker

# Run with defaults (stubs only)
asyncio.run(run_worker())

# Run with custom configuration
asyncio.run(run_worker(
    temporal_address="temporal.example.com:7233",
    task_queue="my-agents",
))
Source code in src/zap_ai/worker/worker.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
async def run_worker(
    temporal_address: str = "localhost:7233",
    task_queue: str = "zap-agents",
    tool_registry: Any | None = None,
    tracing_provider: TracingProvider | None = None,
) -> None:
    """
    Run a Temporal worker for Zap agents.

    This is a blocking function that runs the worker until interrupted.

    Args:
        temporal_address: Temporal server address.
        task_queue: Task queue name to listen on.
        tool_registry: Optional ToolRegistry for activities.
            If None, stub activities will work but real tool
            execution will fail.
        tracing_provider: Optional TracingProvider for observability.
            If None, tracing is disabled (NoOpTracingProvider used).

    Example:
        ```python
        import asyncio
        from zap_ai.worker import run_worker

        # Run with defaults (stubs only)
        asyncio.run(run_worker())

        # Run with custom configuration
        asyncio.run(run_worker(
            temporal_address="temporal.example.com:7233",
            task_queue="my-agents",
        ))
        ```
    """
    # Connect to Temporal
    client = await Client.connect(temporal_address)

    # Create worker
    worker = await create_worker(client, task_queue, tool_registry, tracing_provider)

    print(f"Starting Zap worker on task queue '{task_queue}'...")
    print(f"Connected to Temporal at {temporal_address}")

    async with worker:
        print("Worker running. Press Ctrl+C to stop.")
        # Run forever until interrupted
        await asyncio.Event().wait()

create_worker

zap_ai.worker.create_worker(client, task_queue, tool_registry=None, tracing_provider=None) async

Create a Temporal worker for Zap agents.

Parameters:

Name Type Description Default
client Client

Temporal client instance.

required
task_queue str

Task queue name to listen on.

required
tool_registry Any | None

Optional ToolRegistry for activities. If None, stub activities will work but real tool execution will fail.

None
tracing_provider TracingProvider | None

Optional TracingProvider for observability. If None, tracing is disabled (NoOpTracingProvider used).

None

Returns:

Type Description
Worker

Configured Worker instance (not started).

Example
from temporalio.client import Client
from zap_ai.worker import create_worker

client = await Client.connect("localhost:7233")
worker = await create_worker(client, "zap-agents")

async with worker:
    await asyncio.Event().wait()
Source code in src/zap_ai/worker/worker.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
async def create_worker(
    client: Client,
    task_queue: str,
    tool_registry: Any | None = None,
    tracing_provider: TracingProvider | None = None,
) -> Worker:
    """
    Create a Temporal worker for Zap agents.

    Args:
        client: Temporal client instance.
        task_queue: Task queue name to listen on.
        tool_registry: Optional ToolRegistry for activities.
            If None, stub activities will work but real tool
            execution will fail.
        tracing_provider: Optional TracingProvider for observability.
            If None, tracing is disabled (NoOpTracingProvider used).

    Returns:
        Configured Worker instance (not started).

    Example:
        ```python
        from temporalio.client import Client
        from zap_ai.worker import create_worker

        client = await Client.connect("localhost:7233")
        worker = await create_worker(client, "zap-agents")

        async with worker:
            await asyncio.Event().wait()
        ```
    """
    # Set global registry for activities (None is OK for stubs)
    set_tool_registry(tool_registry)

    # Set global tracing provider for activities
    if tracing_provider:
        set_tracing_provider(tracing_provider)

    return Worker(
        client,
        task_queue=task_queue,
        workflows=[AgentWorkflow],
        activities=[
            inference_activity,
            tool_execution_activity,
            get_agent_config_activity,
        ],
        workflow_runner=create_production_runner(),
    )

Usage

Simple Worker

For most use cases, use run_worker():

worker.py
import asyncio
from zap_ai.worker import run_worker

asyncio.run(run_worker())

Run it:

python worker.py

Custom Worker

For more control, use create_worker():

import asyncio
from temporalio.client import Client
from zap_ai.worker import create_worker

async def main():
    # Connect to Temporal
    client = await Client.connect("localhost:7233")

    # Create worker with custom task queue
    worker = await create_worker(
        client,
        task_queue="my-custom-queue",
    )

    # Run the worker
    await worker.run()

asyncio.run(main())

Inline Worker (for examples)

Run the worker in the same process as your application:

import asyncio
from temporalio.client import Client
from zap_ai import Zap, ZapAgent
from zap_ai.worker import create_worker

async def main():
    agent = ZapAgent(name="MyAgent", prompt="...")
    zap = Zap(agents=[agent])

    # Connect to Temporal
    temporal_client = await Client.connect("localhost:7233")

    await zap.start()

    # Create worker with tool registry from Zap
    worker = await create_worker(
        temporal_client,
        task_queue=zap.task_queue,
        tool_registry=zap._tool_registry,
    )

    # Run worker in background
    worker_task = asyncio.create_task(worker.run())

    try:
        # Execute tasks
        task = await zap.execute_task(
            agent_name="MyAgent",
            task="Hello!",
        )
        # ... handle task ...
    finally:
        worker_task.cancel()
        await zap.stop()

asyncio.run(main())

Note

Running the worker inline is useful for examples and testing but not recommended for production. In production, run workers as separate processes for better resource management and fault isolation.