Skip to content

Tracing API

The tracing module provides observability for agent execution.

Registry Functions

zap_ai.tracing.set_tracing_provider(provider)

Set the global tracing provider for activities.

Called during worker initialization to provide activities access to the tracing provider.

Parameters:

Name Type Description Default
provider TracingProvider

TracingProvider instance.

required
Source code in src/zap_ai/tracing/__init__.py
Lines 42–53
def set_tracing_provider(provider: TracingProvider) -> None:
    """
    Set the global tracing provider for activities.

    Called during worker initialization to provide activities
    access to the tracing provider.

    Args:
        provider: TracingProvider instance.
    """
    global _tracing_provider
    _tracing_provider = provider

zap_ai.tracing.get_tracing_provider()

Get the global tracing provider.

Returns NoOpTracingProvider if not configured.

Returns:

Type Description
TracingProvider

TracingProvider instance.

Source code in src/zap_ai/tracing/__init__.py
Lines 56–68
def get_tracing_provider() -> TracingProvider:
    """
    Get the global tracing provider.

    Returns NoOpTracingProvider if not configured.

    Returns:
        TracingProvider instance.
    """
    global _tracing_provider
    if _tracing_provider is None:
        _tracing_provider = NoOpTracingProvider()
    return _tracing_provider

zap_ai.tracing.reset_tracing_provider()

Reset the global tracing provider to None.

Primarily used for testing to ensure clean state between tests.

Source code in src/zap_ai/tracing/__init__.py
Lines 71–78
def reset_tracing_provider() -> None:
    """
    Reset the global tracing provider to None.

    Primarily used for testing to ensure clean state between tests.
    """
    global _tracing_provider
    _tracing_provider = None

Protocol

zap_ai.tracing.protocol.TracingProvider

Bases: Protocol

Protocol for tracing backends.

Implementations must be async-safe and handle context propagation across Temporal boundaries.

The provider is responsible for:

- Creating traces (root spans) for tasks
- Creating child observations for iterations, tool calls, etc.
- Creating generation observations for LLM calls (with token usage)
- Tracking errors and events
- Flushing and cleanup

Source code in src/zap_ai/tracing/protocol.py
Lines 66–214
@runtime_checkable
class TracingProvider(Protocol):
    """
    Protocol for tracing backends.

    Implementations must be async-safe and handle context propagation
    across Temporal boundaries.

    The provider is responsible for:
    - Creating traces (root spans) for tasks
    - Creating child observations for iterations, tool calls, etc.
    - Creating generation observations for LLM calls (with token usage)
    - Tracking errors and events
    - Flushing and cleanup
    """

    @asynccontextmanager
    async def start_trace(
        self,
        name: str,
        session_id: str | None = None,
        user_id: str | None = None,
        metadata: dict[str, Any] | None = None,
        tags: list[str] | None = None,
    ) -> AsyncIterator[TraceContext]:
        """
        Start a new trace (root observation).

        Called at task start. Returns context for propagation.

        Args:
            name: Name of the trace (e.g., "task-AgentName-taskId").
            session_id: Optional session for grouping traces.
            user_id: Optional user identifier.
            metadata: Additional metadata to attach.
            tags: Optional tags for filtering.

        Yields:
            TraceContext for propagation to activities and child workflows.
        """
        ...

    @asynccontextmanager
    async def start_observation(
        self,
        name: str,
        observation_type: ObservationType,
        parent_context: TraceContext,
        metadata: dict[str, Any] | None = None,
        input_data: Any | None = None,
    ) -> AsyncIterator[TraceContext]:
        """
        Start a child observation within an existing trace.

        Used for iterations, tool calls, sub-agent delegation, etc.

        Args:
            name: Name of the observation (e.g., "iteration-0", "tool-search").
            observation_type: Type of observation for categorization.
            parent_context: Context from parent observation.
            metadata: Additional metadata to attach.
            input_data: Input data for the observation.

        Yields:
            TraceContext for nested observations.
        """
        ...

    async def start_generation(
        self,
        name: str,
        parent_context: TraceContext,
        model: str,
        input_messages: list[dict[str, Any]],
        metadata: dict[str, Any] | None = None,
    ) -> TraceContext:
        """
        Start an LLM generation observation.

        For tracking LLM calls with model info and usage.
        Must be explicitly ended with end_generation().

        Args:
            name: Name of the generation (e.g., "inference-AgentName").
            parent_context: Context from parent observation.
            model: LLM model identifier.
            input_messages: Input messages sent to the LLM.
            metadata: Additional metadata.

        Returns:
            TraceContext that must be passed to end_generation().
        """
        ...

    async def end_generation(
        self,
        context: TraceContext,
        output: dict[str, Any],
        usage: dict[str, int] | None = None,
    ) -> None:
        """
        End an LLM generation observation with output and usage.

        Args:
            context: Context from start_generation().
            output: LLM output (content, tool_calls, etc.).
            usage: Token usage dict (prompt_tokens, completion_tokens, total_tokens).
        """
        ...

    async def add_event(
        self,
        context: TraceContext,
        name: str,
        attributes: dict[str, Any] | None = None,
    ) -> None:
        """
        Add an event to the current observation.

        Used for logging significant occurrences (status changes, etc.).

        Args:
            context: Current trace context.
            name: Event name.
            attributes: Event attributes.
        """
        ...

    async def set_error(
        self,
        context: TraceContext,
        error: Exception,
    ) -> None:
        """
        Mark the observation as errored.

        Args:
            context: Current trace context.
            error: The exception that occurred.
        """
        ...

    async def flush(self) -> None:
        """Flush any pending trace data."""
        ...

    async def shutdown(self) -> None:
        """Cleanup tracing resources."""
        ...

start_trace(name, session_id=None, user_id=None, metadata=None, tags=None) async

Start a new trace (root observation).

Called at task start. Returns context for propagation.

Parameters:

Name Type Description Default
name str

Name of the trace (e.g., "task-AgentName-taskId").

required
session_id str | None

Optional session for grouping traces.

None
user_id str | None

Optional user identifier.

None
metadata dict[str, Any] | None

Additional metadata to attach.

None
tags list[str] | None

Optional tags for filtering.

None

Yields:

Type Description
AsyncIterator[TraceContext]

TraceContext for propagation to activities and child workflows.

Source code in src/zap_ai/tracing/protocol.py
Lines 82–106
@asynccontextmanager
async def start_trace(
    self,
    name: str,
    session_id: str | None = None,
    user_id: str | None = None,
    metadata: dict[str, Any] | None = None,
    tags: list[str] | None = None,
) -> AsyncIterator[TraceContext]:
    """
    Start a new trace (root observation).

    Called at task start. Returns context for propagation.

    Args:
        name: Name of the trace (e.g., "task-AgentName-taskId").
        session_id: Optional session for grouping traces.
        user_id: Optional user identifier.
        metadata: Additional metadata to attach.
        tags: Optional tags for filtering.

    Yields:
        TraceContext for propagation to activities and child workflows.
    """
    ...

start_observation(name, observation_type, parent_context, metadata=None, input_data=None) async

Start a child observation within an existing trace.

Used for iterations, tool calls, sub-agent delegation, etc.

Parameters:

Name Type Description Default
name str

Name of the observation (e.g., "iteration-0", "tool-search").

required
observation_type ObservationType

Type of observation for categorization.

required
parent_context TraceContext

Context from parent observation.

required
metadata dict[str, Any] | None

Additional metadata to attach.

None
input_data Any | None

Input data for the observation.

None

Yields:

Type Description
AsyncIterator[TraceContext]

TraceContext for nested observations.

Source code in src/zap_ai/tracing/protocol.py
Lines 108–132
@asynccontextmanager
async def start_observation(
    self,
    name: str,
    observation_type: ObservationType,
    parent_context: TraceContext,
    metadata: dict[str, Any] | None = None,
    input_data: Any | None = None,
) -> AsyncIterator[TraceContext]:
    """
    Start a child observation within an existing trace.

    Used for iterations, tool calls, sub-agent delegation, etc.

    Args:
        name: Name of the observation (e.g., "iteration-0", "tool-search").
        observation_type: Type of observation for categorization.
        parent_context: Context from parent observation.
        metadata: Additional metadata to attach.
        input_data: Input data for the observation.

    Yields:
        TraceContext for nested observations.
    """
    ...

start_generation(name, parent_context, model, input_messages, metadata=None) async

Start an LLM generation observation.

For tracking LLM calls with model info and usage. Must be explicitly ended with end_generation().

Parameters:

Name Type Description Default
name str

Name of the generation (e.g., "inference-AgentName").

required
parent_context TraceContext

Context from parent observation.

required
model str

LLM model identifier.

required
input_messages list[dict[str, Any]]

Input messages sent to the LLM.

required
metadata dict[str, Any] | None

Additional metadata.

None

Returns:

Type Description
TraceContext

TraceContext that must be passed to end_generation().

Source code in src/zap_ai/tracing/protocol.py
Lines 134–158
async def start_generation(
    self,
    name: str,
    parent_context: TraceContext,
    model: str,
    input_messages: list[dict[str, Any]],
    metadata: dict[str, Any] | None = None,
) -> TraceContext:
    """
    Start an LLM generation observation.

    For tracking LLM calls with model info and usage.
    Must be explicitly ended with end_generation().

    Args:
        name: Name of the generation (e.g., "inference-AgentName").
        parent_context: Context from parent observation.
        model: LLM model identifier.
        input_messages: Input messages sent to the LLM.
        metadata: Additional metadata.

    Returns:
        TraceContext that must be passed to end_generation().
    """
    ...

end_generation(context, output, usage=None) async

End an LLM generation observation with output and usage.

Parameters:

Name Type Description Default
context TraceContext

Context from start_generation().

required
output dict[str, Any]

LLM output (content, tool_calls, etc.).

required
usage dict[str, int] | None

Token usage dict (prompt_tokens, completion_tokens, total_tokens).

None
Source code in src/zap_ai/tracing/protocol.py
Lines 160–174
async def end_generation(
    self,
    context: TraceContext,
    output: dict[str, Any],
    usage: dict[str, int] | None = None,
) -> None:
    """
    End an LLM generation observation with output and usage.

    Args:
        context: Context from start_generation().
        output: LLM output (content, tool_calls, etc.).
        usage: Token usage dict (prompt_tokens, completion_tokens, total_tokens).
    """
    ...

add_event(context, name, attributes=None) async

Add an event to the current observation.

Used for logging significant occurrences (status changes, etc.).

Parameters:

Name Type Description Default
context TraceContext

Current trace context.

required
name str

Event name.

required
attributes dict[str, Any] | None

Event attributes.

None
Source code in src/zap_ai/tracing/protocol.py
Lines 176–192
async def add_event(
    self,
    context: TraceContext,
    name: str,
    attributes: dict[str, Any] | None = None,
) -> None:
    """
    Add an event to the current observation.

    Used for logging significant occurrences (status changes, etc.).

    Args:
        context: Current trace context.
        name: Event name.
        attributes: Event attributes.
    """
    ...

set_error(context, error) async

Mark the observation as errored.

Parameters:

Name Type Description Default
context TraceContext

Current trace context.

required
error Exception

The exception that occurred.

required
Source code in src/zap_ai/tracing/protocol.py
Lines 194–206
async def set_error(
    self,
    context: TraceContext,
    error: Exception,
) -> None:
    """
    Mark the observation as errored.

    Args:
        context: Current trace context.
        error: The exception that occurred.
    """
    ...

flush() async

Flush any pending trace data.

Source code in src/zap_ai/tracing/protocol.py
Lines 208–210
async def flush(self) -> None:
    """Flush any pending trace data."""
    ...

shutdown() async

Cleanup tracing resources.

Source code in src/zap_ai/tracing/protocol.py
Lines 212–214
async def shutdown(self) -> None:
    """Cleanup tracing resources."""
    ...

Abstract Base Class

zap_ai.tracing.base.BaseTracingProvider

Bases: ABC

Abstract base class for tracing providers.

Subclass this to implement custom tracing backends. You must implement:

- _start_trace_impl(): Core trace creation logic
- _start_observation_impl(): Core observation creation logic
- start_generation(): LLM generation tracking
- end_generation(): Complete LLM generation

Optional overrides (default to no-op):

- add_event(): Add events to observations
- set_error(): Mark observations as errored
- flush(): Flush pending data
- shutdown(): Cleanup resources

Example

class MyTracingProvider(BaseTracingProvider):
    async def _start_trace_impl(self, name, **kwargs):
        ctx = self._create_context()
        return ctx, None  # No cleanup needed

    async def _start_observation_impl(
        self, name, observation_type, parent_context, **kwargs
    ):
        ctx = self._create_child_context(parent_context)
        return ctx, None

    async def start_generation(
        self, name, parent_context, model, input_messages, **kwargs
    ):
        return self._create_child_context(parent_context)

    async def end_generation(self, context, output, usage=None):
        pass  # No-op for simple implementation
Source code in src/zap_ai/tracing/base.py
Lines 18–361
class BaseTracingProvider(ABC):
    """
    Abstract base class for tracing providers.

    Subclass this to implement custom tracing backends. You must implement:
    - _start_trace_impl(): Core trace creation logic
    - _start_observation_impl(): Core observation creation logic
    - start_generation(): LLM generation tracking
    - end_generation(): Complete LLM generation

    Optional overrides (default to no-op):
    - add_event(): Add events to observations
    - set_error(): Mark observations as errored
    - flush(): Flush pending data
    - shutdown(): Cleanup resources

    Example:
        class MyTracingProvider(BaseTracingProvider):
            async def _start_trace_impl(self, name, **kwargs):
                ctx = self._create_context()
                return ctx, None  # No cleanup needed

            async def _start_observation_impl(
                self, name, observation_type, parent_context, **kwargs
            ):
                ctx = self._create_child_context(parent_context)
                return ctx, None

            async def start_generation(
                self, name, parent_context, model, input_messages, **kwargs
            ):
                return self._create_child_context(parent_context)

            async def end_generation(self, context, output, usage=None):
                pass  # No-op for simple implementation
    """

    # --- Utility Methods ---

    def _generate_trace_id(self) -> str:
        """Generate a unique trace ID."""
        return uuid4().hex

    def _generate_span_id(self, w3c_format: bool = False) -> str:
        """
        Generate a unique span ID.

        Args:
            w3c_format: If True, return 16 hex chars (W3C trace context format).
                       If False, return full 32 hex chars.
        """
        span_id = uuid4().hex
        return span_id[:16] if w3c_format else span_id

    def _create_context(
        self,
        trace_id: str | None = None,
        span_id: str | None = None,
        provider_data: dict[str, Any] | None = None,
    ) -> TraceContext:
        """
        Create a new TraceContext with generated IDs if not provided.

        Args:
            trace_id: Optional trace ID (generated if None).
            span_id: Optional span ID (generated if None).
            provider_data: Optional provider-specific data.

        Returns:
            New TraceContext instance.
        """
        return TraceContext(
            trace_id=trace_id or self._generate_trace_id(),
            span_id=span_id or self._generate_span_id(),
            provider_data=provider_data,
        )

    def _create_child_context(
        self,
        parent: TraceContext,
        span_id: str | None = None,
        provider_data: dict[str, Any] | None = None,
    ) -> TraceContext:
        """
        Create a child context preserving the parent's trace_id.

        Args:
            parent: Parent trace context.
            span_id: Optional span ID (generated if None).
            provider_data: Optional provider-specific data.

        Returns:
            New TraceContext with same trace_id as parent.
        """
        return TraceContext(
            trace_id=parent.trace_id,
            span_id=span_id or self._generate_span_id(),
            provider_data=provider_data,
        )

    # --- Abstract Methods (Template Pattern) ---

    @abstractmethod
    async def _start_trace_impl(
        self,
        name: str,
        session_id: str | None = None,
        user_id: str | None = None,
        metadata: dict[str, Any] | None = None,
        tags: list[str] | None = None,
    ) -> tuple[TraceContext, Any | None]:
        """
        Implementation of trace creation.

        Args:
            name: Name of the trace.
            session_id: Optional session for grouping traces.
            user_id: Optional user identifier.
            metadata: Additional metadata to attach.
            tags: Optional tags for filtering.

        Returns:
            Tuple of (TraceContext, cleanup_data).
            cleanup_data is passed to _end_trace_cleanup if provided.
        """
        ...

    async def _end_trace_cleanup(self, context: TraceContext, cleanup_data: Any) -> None:
        """
        Optional cleanup when trace context manager exits.

        Override this to perform cleanup operations when a trace ends.
        Default implementation does nothing.

        Args:
            context: The trace context that is ending.
            cleanup_data: Data returned from _start_trace_impl.
        """
        pass

    @abstractmethod
    async def _start_observation_impl(
        self,
        name: str,
        observation_type: ObservationType,
        parent_context: TraceContext,
        metadata: dict[str, Any] | None = None,
        input_data: Any | None = None,
    ) -> tuple[TraceContext, Any | None]:
        """
        Implementation of observation creation.

        Args:
            name: Name of the observation.
            observation_type: Type of observation for categorization.
            parent_context: Context from parent observation.
            metadata: Additional metadata to attach.
            input_data: Input data for the observation.

        Returns:
            Tuple of (TraceContext, cleanup_data).
            cleanup_data is passed to _end_observation_cleanup if provided.
        """
        ...

    async def _end_observation_cleanup(self, context: TraceContext, cleanup_data: Any) -> None:
        """
        Optional cleanup when observation context manager exits.

        Override this to perform cleanup operations when an observation ends.
        Default implementation does nothing.

        Args:
            context: The observation context that is ending.
            cleanup_data: Data returned from _start_observation_impl.
        """
        pass

    # --- Concrete Context Manager Wrappers ---

    @asynccontextmanager
    async def start_trace(
        self,
        name: str,
        session_id: str | None = None,
        user_id: str | None = None,
        metadata: dict[str, Any] | None = None,
        tags: list[str] | None = None,
    ) -> AsyncIterator[TraceContext]:
        """
        Start a new trace (root observation).

        Called at task start. Returns context for propagation.

        Args:
            name: Name of the trace.
            session_id: Optional session for grouping traces.
            user_id: Optional user identifier.
            metadata: Additional metadata to attach.
            tags: Optional tags for filtering.

        Yields:
            TraceContext for propagation to activities and child workflows.
        """
        context, cleanup_data = await self._start_trace_impl(
            name=name,
            session_id=session_id,
            user_id=user_id,
            metadata=metadata,
            tags=tags,
        )
        try:
            yield context
        finally:
            await self._end_trace_cleanup(context, cleanup_data)

    @asynccontextmanager
    async def start_observation(
        self,
        name: str,
        observation_type: ObservationType,
        parent_context: TraceContext,
        metadata: dict[str, Any] | None = None,
        input_data: Any | None = None,
    ) -> AsyncIterator[TraceContext]:
        """
        Start a child observation within an existing trace.

        Args:
            name: Name of the observation.
            observation_type: Type of observation for categorization.
            parent_context: Context from parent observation.
            metadata: Additional metadata to attach.
            input_data: Input data for the observation.

        Yields:
            TraceContext for nested observations.
        """
        context, cleanup_data = await self._start_observation_impl(
            name=name,
            observation_type=observation_type,
            parent_context=parent_context,
            metadata=metadata,
            input_data=input_data,
        )
        try:
            yield context
        finally:
            await self._end_observation_cleanup(context, cleanup_data)

    # --- Abstract Methods (Must Implement) ---

    @abstractmethod
    async def start_generation(
        self,
        name: str,
        parent_context: TraceContext,
        model: str,
        input_messages: list[dict[str, Any]],
        metadata: dict[str, Any] | None = None,
    ) -> TraceContext:
        """
        Start an LLM generation observation.

        Args:
            name: Name of the generation.
            parent_context: Context from parent observation.
            model: LLM model identifier.
            input_messages: Input messages sent to the LLM.
            metadata: Additional metadata.

        Returns:
            TraceContext that must be passed to end_generation().
        """
        ...

    @abstractmethod
    async def end_generation(
        self,
        context: TraceContext,
        output: dict[str, Any],
        usage: dict[str, int] | None = None,
    ) -> None:
        """
        End an LLM generation observation with output and usage.

        Args:
            context: Context from start_generation().
            output: LLM output (content, tool_calls, etc.).
            usage: Token usage dict (prompt_tokens, completion_tokens, total_tokens).
        """
        ...

    # --- Optional Methods (Default No-Op) ---

    async def add_event(
        self,
        context: TraceContext,
        name: str,
        attributes: dict[str, Any] | None = None,
    ) -> None:
        """
        Add an event to the current observation.

        Default implementation does nothing. Override to implement.

        Args:
            context: Current trace context.
            name: Event name.
            attributes: Event attributes.
        """
        pass

    async def set_error(
        self,
        context: TraceContext,
        error: Exception,
    ) -> None:
        """
        Mark the observation as errored.

        Default implementation does nothing. Override to implement.

        Args:
            context: Current trace context.
            error: The exception that occurred.
        """
        pass

    async def flush(self) -> None:
        """
        Flush any pending trace data.

        Default implementation does nothing. Override if buffering is used.
        """
        pass

    async def shutdown(self) -> None:
        """
        Cleanup tracing resources.

        Default implementation does nothing. Override if cleanup is needed.
        """
        pass

start_trace(name, session_id=None, user_id=None, metadata=None, tags=None) async

Start a new trace (root observation).

Called at task start. Returns context for propagation.

Parameters:

Name Type Description Default
name str

Name of the trace.

required
session_id str | None

Optional session for grouping traces.

None
user_id str | None

Optional user identifier.

None
metadata dict[str, Any] | None

Additional metadata to attach.

None
tags list[str] | None

Optional tags for filtering.

None

Yields:

Type Description
AsyncIterator[TraceContext]

TraceContext for propagation to activities and child workflows.

Source code in src/zap_ai/tracing/base.py
Lines 198–232
@asynccontextmanager
async def start_trace(
    self,
    name: str,
    session_id: str | None = None,
    user_id: str | None = None,
    metadata: dict[str, Any] | None = None,
    tags: list[str] | None = None,
) -> AsyncIterator[TraceContext]:
    """
    Start a new trace (root observation).

    Called at task start. Returns context for propagation.

    Args:
        name: Name of the trace.
        session_id: Optional session for grouping traces.
        user_id: Optional user identifier.
        metadata: Additional metadata to attach.
        tags: Optional tags for filtering.

    Yields:
        TraceContext for propagation to activities and child workflows.
    """
    context, cleanup_data = await self._start_trace_impl(
        name=name,
        session_id=session_id,
        user_id=user_id,
        metadata=metadata,
        tags=tags,
    )
    try:
        yield context
    finally:
        await self._end_trace_cleanup(context, cleanup_data)

start_observation(name, observation_type, parent_context, metadata=None, input_data=None) async

Start a child observation within an existing trace.

Parameters:

Name Type Description Default
name str

Name of the observation.

required
observation_type ObservationType

Type of observation for categorization.

required
parent_context TraceContext

Context from parent observation.

required
metadata dict[str, Any] | None

Additional metadata to attach.

None
input_data Any | None

Input data for the observation.

None

Yields:

Type Description
AsyncIterator[TraceContext]

TraceContext for nested observations.

Source code in src/zap_ai/tracing/base.py
Lines 234–266
@asynccontextmanager
async def start_observation(
    self,
    name: str,
    observation_type: ObservationType,
    parent_context: TraceContext,
    metadata: dict[str, Any] | None = None,
    input_data: Any | None = None,
) -> AsyncIterator[TraceContext]:
    """
    Start a child observation within an existing trace.

    Args:
        name: Name of the observation.
        observation_type: Type of observation for categorization.
        parent_context: Context from parent observation.
        metadata: Additional metadata to attach.
        input_data: Input data for the observation.

    Yields:
        TraceContext for nested observations.
    """
    context, cleanup_data = await self._start_observation_impl(
        name=name,
        observation_type=observation_type,
        parent_context=parent_context,
        metadata=metadata,
        input_data=input_data,
    )
    try:
        yield context
    finally:
        await self._end_observation_cleanup(context, cleanup_data)

start_generation(name, parent_context, model, input_messages, metadata=None) abstractmethod async

Start an LLM generation observation.

Parameters:

Name Type Description Default
name str

Name of the generation.

required
parent_context TraceContext

Context from parent observation.

required
model str

LLM model identifier.

required
input_messages list[dict[str, Any]]

Input messages sent to the LLM.

required
metadata dict[str, Any] | None

Additional metadata.

None

Returns:

Type Description
TraceContext

TraceContext that must be passed to end_generation().

Source code in src/zap_ai/tracing/base.py
Lines 270–292
@abstractmethod
async def start_generation(
    self,
    name: str,
    parent_context: TraceContext,
    model: str,
    input_messages: list[dict[str, Any]],
    metadata: dict[str, Any] | None = None,
) -> TraceContext:
    """
    Start an LLM generation observation.

    Args:
        name: Name of the generation.
        parent_context: Context from parent observation.
        model: LLM model identifier.
        input_messages: Input messages sent to the LLM.
        metadata: Additional metadata.

    Returns:
        TraceContext that must be passed to end_generation().
    """
    ...

end_generation(context, output, usage=None) abstractmethod async

End an LLM generation observation with output and usage.

Parameters:

Name Type Description Default
context TraceContext

Context from start_generation().

required
output dict[str, Any]

LLM output (content, tool_calls, etc.).

required
usage dict[str, int] | None

Token usage dict (prompt_tokens, completion_tokens, total_tokens).

None
Source code in src/zap_ai/tracing/base.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
@abstractmethod
async def end_generation(
    self,
    context: TraceContext,
    output: dict[str, Any],
    usage: dict[str, int] | None = None,
) -> None:
    """
    End an LLM generation observation with output and usage.

    Args:
        context: Context from start_generation().
        output: LLM output (content, tool_calls, etc.).
        usage: Token usage dict (prompt_tokens, completion_tokens, total_tokens).
    """
    ...

add_event(context, name, attributes=None) async

Add an event to the current observation.

Default implementation does nothing. Override to implement.

Parameters:

Name Type Description Default
context TraceContext

Current trace context.

required
name str

Event name.

required
attributes dict[str, Any] | None

Event attributes.

None
Source code in src/zap_ai/tracing/base.py
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
async def add_event(
    self,
    context: TraceContext,
    name: str,
    attributes: dict[str, Any] | None = None,
) -> None:
    """
    Add an event to the current observation.

    Default implementation does nothing. Override to implement.

    Args:
        context: Current trace context.
        name: Event name.
        attributes: Event attributes.
    """
    pass

set_error(context, error) async

Mark the observation as errored.

Default implementation does nothing. Override to implement.

Parameters:

Name Type Description Default
context TraceContext

Current trace context.

required
error Exception

The exception that occurred.

required
Source code in src/zap_ai/tracing/base.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
async def set_error(
    self,
    context: TraceContext,
    error: Exception,
) -> None:
    """
    Mark the observation as errored.

    Default implementation does nothing. Override to implement.

    Args:
        context: Current trace context.
        error: The exception that occurred.
    """
    pass

flush() async

Flush any pending trace data.

Default implementation does nothing. Override if buffering is used.

Source code in src/zap_ai/tracing/base.py
347
348
349
350
351
352
353
async def flush(self) -> None:
    """
    Flush any pending trace data.

    Default implementation does nothing. Override if buffering is used.
    """
    pass

shutdown() async

Clean up tracing resources.

Default implementation does nothing. Override if cleanup is needed.

Source code in src/zap_ai/tracing/base.py
355
356
357
358
359
360
361
async def shutdown(self) -> None:
    """
    Cleanup tracing resources.

    Default implementation does nothing. Override if cleanup is needed.
    """
    pass

zap_ai.tracing.protocol.TraceContext dataclass

Serializable trace context for propagation across Temporal boundaries.

This context is passed through activity inputs to maintain trace continuity across process boundaries. Must be JSON-serializable for Temporal.

Attributes:

Name Type Description
trace_id str

Unique identifier for the trace.

span_id str

Unique identifier for the current span/observation.

provider_data dict[str, Any] | None

Provider-specific data (e.g., Langfuse observation ID).

Source code in src/zap_ai/tracing/protocol.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@dataclass
class TraceContext:
    """
    Serializable trace context for propagation across Temporal boundaries.

    This context is passed through activity inputs to maintain trace continuity
    across process boundaries. Must be JSON-serializable for Temporal.

    Attributes:
        trace_id: Unique identifier for the trace.
        span_id: Unique identifier for the current span/observation.
        provider_data: Provider-specific data (e.g., Langfuse observation ID).
    """

    trace_id: str
    span_id: str
    provider_data: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize for Temporal activity input."""
        return {
            "trace_id": self.trace_id,
            "span_id": self.span_id,
            "provider_data": self.provider_data,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> TraceContext:
        """Deserialize from Temporal activity input."""
        return cls(
            trace_id=data["trace_id"],
            span_id=data["span_id"],
            provider_data=data.get("provider_data"),
        )

to_dict()

Serialize for Temporal activity input.

Source code in src/zap_ai/tracing/protocol.py
48
49
50
51
52
53
54
def to_dict(self) -> dict[str, Any]:
    """Serialize for Temporal activity input."""
    return {
        "trace_id": self.trace_id,
        "span_id": self.span_id,
        "provider_data": self.provider_data,
    }

from_dict(data) classmethod

Deserialize from Temporal activity input.

Source code in src/zap_ai/tracing/protocol.py
56
57
58
59
60
61
62
63
@classmethod
def from_dict(cls, data: dict[str, Any]) -> TraceContext:
    """Deserialize from Temporal activity input."""
    return cls(
        trace_id=data["trace_id"],
        span_id=data["span_id"],
        provider_data=data.get("provider_data"),
    )

zap_ai.tracing.protocol.ObservationType

Bases: str, Enum

Observation types aligned with Langfuse's native types.

These types provide semantic context for different kinds of operations being traced in the agent workflow.

Source code in src/zap_ai/tracing/protocol.py
16
17
18
19
20
21
22
23
24
25
26
27
class ObservationType(str, Enum):
    """
    Observation types aligned with Langfuse's native types.

    These types provide semantic context for different kinds of operations
    being traced in the agent workflow.
    """

    SPAN = "span"  # Generic span for workflow steps, iterations
    GENERATION = "generation"  # LLM inference calls
    TOOL = "tool"  # Tool/function calls
    AGENT = "agent"  # Sub-agent delegations

Providers

LangfuseTracingProvider

zap_ai.tracing.langfuse_provider.LangfuseTracingProvider

Bases: BaseTracingProvider

Langfuse implementation of BaseTracingProvider.

Uses Langfuse's v3 SDK for async-compatible tracing with native observation types (generation, tool, agent, span).

Attributes:

Name Type Description
public_key

Langfuse public key (or set LANGFUSE_PUBLIC_KEY env var).

secret_key

Langfuse secret key (or set LANGFUSE_SECRET_KEY env var).

host

Langfuse host URL (defaults to cloud).

Source code in src/zap_ai/tracing/langfuse_provider.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
class LangfuseTracingProvider(BaseTracingProvider):
    """
    Langfuse implementation of BaseTracingProvider.

    Uses Langfuse's v3 SDK for async-compatible tracing with
    native observation types (generation, tool, agent, span).

    Attributes:
        public_key: Langfuse public key (or set LANGFUSE_PUBLIC_KEY env var).
        secret_key: Langfuse secret key (or set LANGFUSE_SECRET_KEY env var).
        host: Langfuse host URL (defaults to cloud).
    """

    def __init__(
        self,
        public_key: str | None = None,
        secret_key: str | None = None,
        host: str | None = None,
    ):
        """
        Initialize Langfuse tracing provider.

        Args:
            public_key: Langfuse public key (or use LANGFUSE_PUBLIC_KEY env var).
            secret_key: Langfuse secret key (or use LANGFUSE_SECRET_KEY env var).
            host: Langfuse host URL (defaults to https://cloud.langfuse.com).
        """
        self._langfuse = Langfuse(
            public_key=public_key,
            secret_key=secret_key,
            host=host,
        )
        # Track active observations for ending them later
        self._active_observations: dict[str, Any] = {}

    def _make_langfuse_trace_context(
        self, trace_id: str, parent_span_id: str | None = None
    ) -> LangfuseTraceContext:
        """Create a Langfuse TraceContext dict."""
        ctx: LangfuseTraceContext = {"trace_id": trace_id}
        if parent_span_id:
            ctx["parent_span_id"] = parent_span_id
        return ctx

    def _observation_type_to_langfuse(self, obs_type: ObservationType) -> str:
        """Map our observation types to Langfuse's type strings."""
        mapping = {
            ObservationType.SPAN: "span",
            ObservationType.GENERATION: "generation",
            ObservationType.TOOL: "tool",
            ObservationType.AGENT: "agent",
        }
        return mapping.get(obs_type, "span")

    async def _start_trace_impl(
        self,
        name: str,
        session_id: str | None = None,
        user_id: str | None = None,
        metadata: dict[str, Any] | None = None,
        tags: list[str] | None = None,
    ) -> tuple[TraceContext, Any]:
        """Start a new Langfuse trace.

        In v3, the trace is implicitly created by the first span. We create a
        root span to represent the trace and update it with trace-level metadata.
        """
        trace_id = self._langfuse.create_trace_id()
        span_id = self._generate_span_id(w3c_format=True)

        trace_context = self._make_langfuse_trace_context(trace_id)

        # Create a root span that represents the trace
        root_span = self._langfuse.start_span(
            name=name,
            trace_context=trace_context,
            metadata=metadata,
        )

        # Update the trace with additional metadata
        root_span.update_trace(
            session_id=session_id,
            user_id=user_id,
            tags=tags,
        )

        self._active_observations[trace_id] = root_span

        context = TraceContext(
            trace_id=trace_id,
            span_id=span_id,
            provider_data={
                "langfuse_trace_id": trace_id,
                "langfuse_root_span_id": root_span.id,
            },
        )

        return context, root_span

    async def _end_trace_cleanup(self, context: TraceContext, cleanup_data: Any) -> None:
        """End the root span when trace exits."""
        root_span = cleanup_data
        if root_span:
            root_span.end()
            self._active_observations.pop(context.trace_id, None)

    async def _start_observation_impl(
        self,
        name: str,
        observation_type: ObservationType,
        parent_context: TraceContext,
        metadata: dict[str, Any] | None = None,
        input_data: Any | None = None,
    ) -> tuple[TraceContext, Any]:
        """Start a child observation in Langfuse with the appropriate type."""
        span_id = self._generate_span_id(w3c_format=True)

        # Get parent span ID
        parent_span_id = None
        if parent_context.provider_data:
            parent_span_id = parent_context.provider_data.get(
                "langfuse_observation_id"
            ) or parent_context.provider_data.get("langfuse_root_span_id")

        trace_context = self._make_langfuse_trace_context(parent_context.trace_id, parent_span_id)

        # Get the Langfuse type string and add to metadata
        langfuse_type = self._observation_type_to_langfuse(observation_type)
        obs_metadata = {
            "observation_type": langfuse_type,
            **(metadata or {}),
        }

        # Create span with appropriate type
        span = self._langfuse.start_span(
            name=name,
            trace_context=trace_context,
            input=input_data,
            metadata=obs_metadata,
        )

        self._active_observations[span_id] = span

        context = TraceContext(
            trace_id=parent_context.trace_id,
            span_id=span_id,
            provider_data={
                "langfuse_trace_id": parent_context.trace_id,
                "langfuse_observation_id": span.id,
            },
        )

        return context, span

    async def _end_observation_cleanup(self, context: TraceContext, cleanup_data: Any) -> None:
        """End the span when observation exits."""
        span = cleanup_data
        if span:
            span.end()
            self._active_observations.pop(context.span_id, None)

    async def start_generation(
        self,
        name: str,
        parent_context: TraceContext,
        model: str,
        input_messages: list[dict[str, Any]],
        metadata: dict[str, Any] | None = None,
    ) -> TraceContext:
        """Start a Langfuse generation for LLM calls."""
        span_id = self._generate_span_id(w3c_format=True)

        # Get parent span ID
        parent_span_id = None
        if parent_context.provider_data:
            parent_span_id = parent_context.provider_data.get(
                "langfuse_observation_id"
            ) or parent_context.provider_data.get("langfuse_root_span_id")

        trace_context = self._make_langfuse_trace_context(parent_context.trace_id, parent_span_id)

        # Use start_observation with as_type='generation' (v3 preferred API)
        generation = self._langfuse.start_observation(
            name=name,
            trace_context=trace_context,
            as_type="generation",
            model=model,
            input=input_messages,
            metadata=metadata,
        )

        self._active_observations[span_id] = generation

        return TraceContext(
            trace_id=parent_context.trace_id,
            span_id=span_id,
            provider_data={
                "langfuse_trace_id": parent_context.trace_id,
                "langfuse_observation_id": generation.id,
                "langfuse_generation_id": generation.id,
            },
        )

    async def end_generation(
        self,
        context: TraceContext,
        output: dict[str, Any],
        usage: dict[str, int] | None = None,
    ) -> None:
        """End a Langfuse generation with output and usage."""
        generation = self._active_observations.get(context.span_id)
        if not generation:
            return

        # Convert usage to usage_details format expected by v3
        usage_details = None
        if usage:
            usage_details = {
                "input": usage.get("prompt_tokens", 0),
                "output": usage.get("completion_tokens", 0),
            }

        generation.update(output=output, usage_details=usage_details)
        generation.end()
        self._active_observations.pop(context.span_id, None)

    async def add_event(
        self,
        context: TraceContext,
        name: str,
        attributes: dict[str, Any] | None = None,
    ) -> None:
        """Add an event to the Langfuse observation."""
        # Get the parent observation or use the root span
        parent_span_id = None
        if context.provider_data:
            parent_span_id = context.provider_data.get(
                "langfuse_observation_id"
            ) or context.provider_data.get("langfuse_root_span_id")

        # Get the observation to add the event to
        obs = None
        for key, observation in self._active_observations.items():
            if hasattr(observation, "id") and observation.id == parent_span_id:
                obs = observation
                break

        if obs:
            obs.create_event(name=name, metadata=attributes)

    async def set_error(
        self,
        context: TraceContext,
        error: Exception,
    ) -> None:
        """Mark the Langfuse observation as errored."""
        observation = self._active_observations.get(context.span_id)
        if observation:
            observation.update(
                level="ERROR",
                status_message=str(error),
            )

    async def flush(self) -> None:
        """Flush pending Langfuse data."""
        self._langfuse.flush()

    async def shutdown(self) -> None:
        """Shutdown Langfuse client."""
        self._langfuse.shutdown()

__init__(public_key=None, secret_key=None, host=None)

Initialize Langfuse tracing provider.

Parameters:

Name Type Description Default
public_key str | None

Langfuse public key (or use LANGFUSE_PUBLIC_KEY env var).

None
secret_key str | None

Langfuse secret key (or use LANGFUSE_SECRET_KEY env var).

None
host str | None

Langfuse host URL (defaults to https://cloud.langfuse.com).

None
Source code in src/zap_ai/tracing/langfuse_provider.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def __init__(
    self,
    public_key: str | None = None,
    secret_key: str | None = None,
    host: str | None = None,
):
    """
    Initialize Langfuse tracing provider.

    Args:
        public_key: Langfuse public key (or use LANGFUSE_PUBLIC_KEY env var).
        secret_key: Langfuse secret key (or use LANGFUSE_SECRET_KEY env var).
        host: Langfuse host URL (defaults to https://cloud.langfuse.com).
    """
    self._langfuse = Langfuse(
        public_key=public_key,
        secret_key=secret_key,
        host=host,
    )
    # Track active observations for ending them later
    self._active_observations: dict[str, Any] = {}

start_generation(name, parent_context, model, input_messages, metadata=None) async

Start a Langfuse generation for LLM calls.

Source code in src/zap_ai/tracing/langfuse_provider.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
async def start_generation(
    self,
    name: str,
    parent_context: TraceContext,
    model: str,
    input_messages: list[dict[str, Any]],
    metadata: dict[str, Any] | None = None,
) -> TraceContext:
    """Start a Langfuse generation for LLM calls."""
    span_id = self._generate_span_id(w3c_format=True)

    # Get parent span ID
    parent_span_id = None
    if parent_context.provider_data:
        parent_span_id = parent_context.provider_data.get(
            "langfuse_observation_id"
        ) or parent_context.provider_data.get("langfuse_root_span_id")

    trace_context = self._make_langfuse_trace_context(parent_context.trace_id, parent_span_id)

    # Use start_observation with as_type='generation' (v3 preferred API)
    generation = self._langfuse.start_observation(
        name=name,
        trace_context=trace_context,
        as_type="generation",
        model=model,
        input=input_messages,
        metadata=metadata,
    )

    self._active_observations[span_id] = generation

    return TraceContext(
        trace_id=parent_context.trace_id,
        span_id=span_id,
        provider_data={
            "langfuse_trace_id": parent_context.trace_id,
            "langfuse_observation_id": generation.id,
            "langfuse_generation_id": generation.id,
        },
    )

end_generation(context, output, usage=None) async

End a Langfuse generation with output and usage.

Source code in src/zap_ai/tracing/langfuse_provider.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
async def end_generation(
    self,
    context: TraceContext,
    output: dict[str, Any],
    usage: dict[str, int] | None = None,
) -> None:
    """End a Langfuse generation with output and usage."""
    generation = self._active_observations.get(context.span_id)
    if not generation:
        return

    # Convert usage to usage_details format expected by v3
    usage_details = None
    if usage:
        usage_details = {
            "input": usage.get("prompt_tokens", 0),
            "output": usage.get("completion_tokens", 0),
        }

    generation.update(output=output, usage_details=usage_details)
    generation.end()
    self._active_observations.pop(context.span_id, None)

add_event(context, name, attributes=None) async

Add an event to the Langfuse observation.

Source code in src/zap_ai/tracing/langfuse_provider.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
async def add_event(
    self,
    context: TraceContext,
    name: str,
    attributes: dict[str, Any] | None = None,
) -> None:
    """Add an event to the Langfuse observation."""
    # Get the parent observation or use the root span
    parent_span_id = None
    if context.provider_data:
        parent_span_id = context.provider_data.get(
            "langfuse_observation_id"
        ) or context.provider_data.get("langfuse_root_span_id")

    # Get the observation to add the event to
    obs = None
    for key, observation in self._active_observations.items():
        if hasattr(observation, "id") and observation.id == parent_span_id:
            obs = observation
            break

    if obs:
        obs.create_event(name=name, metadata=attributes)

set_error(context, error) async

Mark the Langfuse observation as errored.

Source code in src/zap_ai/tracing/langfuse_provider.py
274
275
276
277
278
279
280
281
282
283
284
285
async def set_error(
    self,
    context: TraceContext,
    error: Exception,
) -> None:
    """Mark the Langfuse observation as errored."""
    observation = self._active_observations.get(context.span_id)
    if observation:
        observation.update(
            level="ERROR",
            status_message=str(error),
        )

flush() async

Flush pending Langfuse data.

Source code in src/zap_ai/tracing/langfuse_provider.py
287
288
289
async def flush(self) -> None:
    """Flush pending Langfuse data."""
    self._langfuse.flush()

shutdown() async

Shut down the Langfuse client.

Source code in src/zap_ai/tracing/langfuse_provider.py
291
292
293
async def shutdown(self) -> None:
    """Shutdown Langfuse client."""
    self._langfuse.shutdown()

NoOpTracingProvider

zap_ai.tracing.noop_provider.NoOpTracingProvider

Bases: BaseTracingProvider

No-operation tracing provider.

Used when tracing is not configured. All operations are no-ops but return valid contexts for code compatibility.

Source code in src/zap_ai/tracing/noop_provider.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class NoOpTracingProvider(BaseTracingProvider):
    """
    No-operation tracing provider.

    Used when tracing is not configured. All operations are no-ops
    but return valid contexts for code compatibility.
    """

    async def _start_trace_impl(
        self,
        name: str,
        session_id: str | None = None,
        user_id: str | None = None,
        metadata: dict[str, Any] | None = None,
        tags: list[str] | None = None,
    ) -> tuple[TraceContext, None]:
        """Return a dummy context, no cleanup needed."""
        return self._create_context(), None

    async def _start_observation_impl(
        self,
        name: str,
        observation_type: ObservationType,
        parent_context: TraceContext,
        metadata: dict[str, Any] | None = None,
        input_data: Any | None = None,
    ) -> tuple[TraceContext, None]:
        """Return a dummy context with same trace_id, no cleanup needed."""
        return self._create_child_context(parent_context), None

    async def start_generation(
        self,
        name: str,
        parent_context: TraceContext,
        model: str,
        input_messages: list[dict[str, Any]],
        metadata: dict[str, Any] | None = None,
    ) -> TraceContext:
        """Return a dummy context."""
        return self._create_child_context(parent_context)

    async def end_generation(
        self,
        context: TraceContext,
        output: dict[str, Any],
        usage: dict[str, int] | None = None,
    ) -> None:
        """No-op."""
        pass

start_generation(name, parent_context, model, input_messages, metadata=None) async

Return a dummy context.

Source code in src/zap_ai/tracing/noop_provider.py
45
46
47
48
49
50
51
52
53
54
async def start_generation(
    self,
    name: str,
    parent_context: TraceContext,
    model: str,
    input_messages: list[dict[str, Any]],
    metadata: dict[str, Any] | None = None,
) -> TraceContext:
    """Return a dummy context."""
    return self._create_child_context(parent_context)

end_generation(context, output, usage=None) async

No-op.

Source code in src/zap_ai/tracing/noop_provider.py
56
57
58
59
60
61
62
63
async def end_generation(
    self,
    context: TraceContext,
    output: dict[str, Any],
    usage: dict[str, int] | None = None,
) -> None:
    """No-op."""
    pass

Usage Example

from zap_ai.tracing import set_tracing_provider, reset_tracing_provider
from zap_ai.tracing.langfuse_provider import LangfuseTracingProvider

# Enable Langfuse tracing
provider = LangfuseTracingProvider()
set_tracing_provider(provider)

# ... run your agents ...

# Flush before shutdown
await provider.flush()

# Optionally reset to no-op
reset_tracing_provider()