Source code for intelligence_layer.core.tracer.open_telemetry_tracer

from collections.abc import Sequence
from datetime import datetime
from typing import Optional

from opentelemetry.context import attach, detach
from opentelemetry.trace import Span as OpenTSpan
from opentelemetry.trace import StatusCode, set_span_in_context
from opentelemetry.trace import Tracer as OpenTTracer
from pydantic import BaseModel, SerializeAsAny

from intelligence_layer.core.tracer.tracer import (
    Context,
    ExportedSpan,
    JsonSerializer,
    PydanticSerializable,
    Span,
    SpanStatus,
    SpanType,
    TaskSpan,
    Tracer,
)


class OpenTelemetryTracer(Tracer):
    """A `Tracer` that uses open telemetry.

    Spans are started on the wrapped OpenTelemetry tracer. For each new span
    a context holding it is attached, and the resulting token is handed to
    the returned span object together with this tracer's ``context``.
    """

    def __init__(self, tracer: OpenTTracer) -> None:
        """Store the OpenTelemetry tracer used to start spans.

        Args:
            tracer: The OpenTelemetry tracer that every span is started on.
        """
        self._tracer = tracer

    def span(
        self,
        name: str,
        timestamp: Optional[datetime] = None,
    ) -> "OpenTelemetrySpan":
        """Start a plain span and wrap it in an `OpenTelemetrySpan`.

        Args:
            name: Name given to the OpenTelemetry span.
            timestamp: Optional start time; when omitted, ``None`` is passed
                as ``start_time`` to the OpenTelemetry tracer.

        Returns:
            The wrapper around the freshly started span.
        """
        start_time = _open_telemetry_timestamp(timestamp) if timestamp else None
        otel_span = self._tracer.start_span(
            name,
            attributes={"type": SpanType.SPAN.value},
            start_time=start_time,
        )
        # The token is kept by the span wrapper, which receives it as an argument.
        context_token = attach(set_span_in_context(otel_span))
        return OpenTelemetrySpan(
            otel_span, self._tracer, context_token, self.context
        )

    def task_span(
        self,
        task_name: str,
        input: PydanticSerializable,
        timestamp: Optional[datetime] = None,
    ) -> "OpenTelemetryTaskSpan":
        """Start a task span and wrap it in an `OpenTelemetryTaskSpan`.

        The serialized task input is recorded as the ``input`` attribute of
        the span.

        Args:
            task_name: Name given to the OpenTelemetry span.
            input: Task input; serialized to JSON before being attached.
            timestamp: Optional start time; when omitted, ``None`` is passed
                as ``start_time`` to the OpenTelemetry tracer.

        Returns:
            The wrapper around the freshly started task span.
        """
        span_attributes = {
            "input": _serialize(input),
            "type": SpanType.TASK_SPAN.value,
        }
        start_time = _open_telemetry_timestamp(timestamp) if timestamp else None
        otel_span = self._tracer.start_span(
            task_name,
            attributes=span_attributes,
            start_time=start_time,
        )
        # The token is kept by the span wrapper, which receives it as an argument.
        context_token = attach(set_span_in_context(otel_span))
        return OpenTelemetryTaskSpan(
            otel_span, self._tracer, context_token, self.context
        )

    def export_for_viewing(self) -> Sequence[ExportedSpan]:
        """Always raise, because exporting is not supported by this tracer.

        Raises:
            NotImplementedError: On every call.
        """
        message = (
            "The OpenTelemetryTracer does not support export for viewing, "
            "as it can not access its own traces."
        )
        raise NotImplementedError(message)
class OpenTelemetrySpan(Span, OpenTelemetryTracer):
    """A `Span` created by `OpenTelemetryTracer.span`."""

    end_timestamp: Optional[datetime] = None

    def __init__(
        self,
        span: OpenTSpan,
        tracer: OpenTTracer,
        token: object,
        context: Optional[Context] = None,
    ) -> None:
        """Wrap an already started OpenTelemetry span.

        Args:
            span: The OpenTelemetry span that events and status are written to.
            tracer: The OpenTelemetry tracer, used for starting child spans.
            token: Token returned by ``attach`` when the span's context was
                attached; it is passed to ``detach`` in `end`.
            context: Optional tracing context forwarded to ``Span.__init__``.
        """
        # Both bases are initialised explicitly because they take different arguments.
        OpenTelemetryTracer.__init__(self, tracer)
        Span.__init__(self, context=context)
        self.open_ts_span = span
        self._token = token

    def log(
        self,
        message: str,
        value: PydanticSerializable,
        timestamp: Optional[datetime] = None,
    ) -> None:
        """Record a log entry as an event on the underlying span.

        Args:
            message: Used as the event name.
            value: Serialized to JSON and stored under the ``value`` attribute.
            timestamp: Optional event time; ``None`` is passed on when omitted.
        """
        self.open_ts_span.add_event(
            message,
            {"value": _serialize(value)},
            _open_telemetry_timestamp(timestamp) if timestamp is not None else None,
        )

    def end(self, timestamp: Optional[datetime] = None) -> None:
        """End the span, set its status, and detach the context token.

        Args:
            timestamp: Optional end time; ``None`` is passed on when omitted.
        """
        super().end(timestamp)
        # Anything other than SpanStatus.OK is reported as an error.
        self.open_ts_span.set_status(
            StatusCode.OK if self.status_code == SpanStatus.OK else StatusCode.ERROR
        )
        detach(self._token)
        self.open_ts_span.end(
            _open_telemetry_timestamp(timestamp) if timestamp is not None else None
        )


class OpenTelemetryTaskSpan(TaskSpan, OpenTelemetrySpan):
    """A `TaskSpan` created by `OpenTelemetryTracer.task_span`."""

    output: Optional[PydanticSerializable] = None

    def record_output(self, output: PydanticSerializable) -> None:
        """Store the serialized task output as the span's ``output`` attribute.

        Args:
            output: Task output; serialized to JSON before being attached.
        """
        self.open_ts_span.set_attribute("output", _serialize(output))


def _open_telemetry_timestamp(t: datetime) -> int:
    """Convert a datetime to integer nanoseconds since the Unix epoch.

    Open telemetry expects *nanoseconds* since epoch. The value is computed
    with integer arithmetic: multiplying the float from ``t.timestamp()`` by
    1e9 exceeds double precision for present-day dates and yields results
    that are off by up to a few hundred nanoseconds.

    Args:
        t: The point in time to convert. It is interpreted the same way
            ``datetime.timestamp()`` interprets it (naive values as local time).

    Returns:
        Nanoseconds since the epoch, exact to the microsecond stored in ``t``.
    """
    # With the microseconds stripped, timestamp() is an integer-valued float,
    # so converting it to int loses nothing.
    whole_seconds = int(t.replace(microsecond=0).timestamp())
    return whole_seconds * 1_000_000_000 + t.microsecond * 1_000


def _serialize(s: SerializeAsAny[PydanticSerializable]) -> str:
    """Serialize a value to a JSON string.

    Args:
        s: A pydantic model, or any other value, which is wrapped in
            `JsonSerializer` before dumping.

    Returns:
        The JSON produced by ``model_dump_json``.
    """
    value = s if isinstance(s, BaseModel) else JsonSerializer(root=s)
    return value.model_dump_json()