Worker API
Utilities for running the Temporal worker that executes agent workflows.
run_worker
async zap_ai.worker.run_worker(temporal_address='localhost:7233', task_queue='zap-agents', tool_registry=None, tracing_provider=None)
Run a Temporal worker for Zap agents.
This coroutine does not return until the worker is interrupted, so awaiting it (or passing it to asyncio.run) blocks the caller for the lifetime of the worker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| temporal_address | str | Temporal server address. | 'localhost:7233' |
| task_queue | str | Task queue name to listen on. | 'zap-agents' |
| tool_registry | Any \| None | Optional ToolRegistry for activities. If None, stub activities will work but real tool execution will fail. | None |
| tracing_provider | TracingProvider \| None | Optional TracingProvider for observability. If None, tracing is disabled (NoOpTracingProvider used). | None |
Example
import asyncio
from zap_ai.worker import run_worker
# Run with defaults (stubs only)
asyncio.run(run_worker())
# Alternatively, run with custom configuration
asyncio.run(run_worker(
    temporal_address="temporal.example.com:7233",
    task_queue="my-agents",
))
Source code in src/zap_ai/worker/worker.py
create_worker
async zap_ai.worker.create_worker(client, task_queue, tool_registry=None, tracing_provider=None)
Create a Temporal worker for Zap agents.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | Client | Temporal client instance. | required |
| task_queue | str | Task queue name to listen on. | required |
| tool_registry | Any \| None | Optional ToolRegistry for activities. If None, stub activities will work but real tool execution will fail. | None |
| tracing_provider | TracingProvider \| None | Optional TracingProvider for observability. If None, tracing is disabled (NoOpTracingProvider used). | None |
Returns:
| Type | Description |
|---|---|
| Worker | Configured Worker instance (not started). |
Example
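import asyncio
from temporalio.client import Client
from zap_ai.worker import create_worker
async def main():
    client = await Client.connect("localhost:7233")
    worker = await create_worker(client, "zap-agents")
    # The worker runs while the context is open and shuts down on exit
    async with worker:
        # Wait indefinitely; the event is never set
        await asyncio.Event().wait()
asyncio.run(main())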
Source code in src/zap_ai/worker/worker.py
Usage
Simple Worker
For most use cases, use run_worker():
import asyncio
from zap_ai.worker import run_worker
asyncio.run(run_worker())
Save the script as worker.py and run it:
python worker.py
Custom Worker
For more control, use create_worker():
import asyncio
from temporalio.client import Client
from zap_ai.worker import create_worker
async def main():
    # Connect to Temporal
    client = await Client.connect("localhost:7233")
    # Create worker with custom task queue
    worker = await create_worker(
        client,
        task_queue="my-custom-queue",
    )
    # Run the worker
    await worker.run()
asyncio.run(main())
Inline Worker (for examples)
Run the worker in the same process as your application:
import asyncio
from temporalio.client import Client
from zap_ai import Zap, ZapAgent
from zap_ai.worker import create_worker
async def main():
    agent = ZapAgent(name="MyAgent", prompt="...")
    zap = Zap(agents=[agent])
    # Connect to Temporal
    temporal_client = await Client.connect("localhost:7233")
    await zap.start()
    # Create worker with tool registry from Zap
    worker = await create_worker(
        temporal_client,
        task_queue=zap.task_queue,
        tool_registry=zap._tool_registry,
    )
    # Run worker in background
    worker_task = asyncio.create_task(worker.run())
    try:
        # Execute tasks
        task = await zap.execute_task(
            agent_name="MyAgent",
            task="Hello!",
        )
        # ... handle task ...
    finally:
        worker_task.cancel()
        await zap.stop()
asyncio.run(main())
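In the inline example, worker_task.cancel() only requests cancellation; the task unwinds the next time the event loop schedules it. The sketch below shows one way to cancel a background task and wait for its cleanup to finish before moving on. It uses a stand-in coroutine, fake_worker_run (hypothetical, in place of worker.run()), so it runs without a Temporal server. Whether the real worker needs this extra await is an assumption to check against the Temporal SDK documentation.

```python
import asyncio
import contextlib


async def fake_worker_run(log: list[str]) -> None:
    """Hypothetical stand-in for worker.run(): loops until cancelled."""
    try:
        while True:
            await asyncio.sleep(0.01)
    finally:
        # Cleanup runs while the CancelledError propagates.
        log.append("worker stopped")


async def run_with_background_worker() -> list[str]:
    log: list[str] = []
    worker_task = asyncio.create_task(fake_worker_run(log))
    try:
        await asyncio.sleep(0.03)  # placeholder for application work
        log.append("work done")
    finally:
        worker_task.cancel()
        # Await the task so its cleanup finishes before we return.
        with contextlib.suppress(asyncio.CancelledError):
            await worker_task
    return log


result = asyncio.run(run_with_background_worker())
```

Awaiting the cancelled task inside the finally block keeps the shutdown order explicit: the background task has fully stopped before any later teardown step runs.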
Note
Running the worker inline is useful for examples and testing but not recommended for production. In production, run workers as separate processes for better resource management and fault isolation.
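For a worker that runs as its own process, connection settings are often read from the environment instead of being hard-coded. The following is a minimal sketch under that assumption. The variable names ZAP_TEMPORAL_ADDRESS and ZAP_TASK_QUEUE and the helper worker_kwargs_from_env are hypothetical and not part of zap_ai; only the run_worker keyword names and their defaults come from the reference on this page.

```python
import os
from typing import Mapping


def worker_kwargs_from_env(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Build run_worker() keyword arguments from environment variables.

    The variable names are hypothetical; the fallbacks mirror the
    documented run_worker defaults.
    """
    return {
        "temporal_address": env.get("ZAP_TEMPORAL_ADDRESS", "localhost:7233"),
        "task_queue": env.get("ZAP_TASK_QUEUE", "zap-agents"),
    }


def main() -> None:
    # Imported here so the helper above can be used without zap_ai installed.
    import asyncio
    from zap_ai.worker import run_worker

    asyncio.run(run_worker(**worker_kwargs_from_env()))
```

Calling main() at the bottom of the script starts the worker. Keeping the environment lookup in a separate function makes it possible to check the configuration logic without connecting to Temporal.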