Source code for intelligence_layer.evaluation.infrastructure.repository_navigator

import itertools
from collections.abc import Iterable, Sequence
from typing import Generic, Optional

import pandas as pd
import rich
from rich.tree import Tree

from intelligence_layer.core import Tracer
from intelligence_layer.core.task import Input, Output
from intelligence_layer.evaluation.aggregation.domain import (
    AggregatedEvaluation,
    AggregationOverview,
)
from intelligence_layer.evaluation.dataset.dataset_repository import DatasetRepository
from intelligence_layer.evaluation.dataset.domain import Example, ExpectedOutput
from intelligence_layer.evaluation.evaluation.domain import (
    Evaluation,
    ExampleEvaluation,
)
from intelligence_layer.evaluation.evaluation.evaluation_repository import (
    EvaluationRepository,
)
from intelligence_layer.evaluation.run.domain import ExampleOutput
from intelligence_layer.evaluation.run.run_repository import RunRepository


class RunLineage(Generic[Input, ExpectedOutput, Output]):
    """Bundles one example with the output a run produced for it.

    Attributes are plain instance attributes set by the constructor; an
    optional tracer can be attached alongside the example and its output.
    """

    # The dataset example the run was executed on.
    example: Example[Input, ExpectedOutput]
    # The output recorded for that example.
    output: ExampleOutput[Output]
    # Tracer associated with the output; `None` when none was supplied.
    tracer: Optional[Tracer]

    def __init__(
        self,
        example: Example[Input, ExpectedOutput],
        output: ExampleOutput[Output],
        tracer: Optional[Tracer] = None,
    ) -> None:
        """Stores the given example, output and (optional) tracer.

        Args:
            example: The example belonging to this lineage.
            output: The output belonging to this lineage.
            tracer: Optional tracer for the output. Defaults to `None`.
        """
        self.example, self.output, self.tracer = example, output, tracer

    def _rich_render(self) -> Tree:
        """Builds a `rich` tree with the example and the output as children.

        Returns:
            A `Tree` rooted at "Run Lineage". The tracer is not rendered.
        """
        lineage_tree = Tree("Run Lineage")
        example_node = self.example._rich_render()
        lineage_tree.add(example_node)
        # The example node is already part of the tree, so the output node
        # is asked to leave out its example id.
        output_node = self.output._rich_render(skip_example_id=True)
        lineage_tree.add(output_node)
        return lineage_tree

    def _ipython_display_(self) -> None:
        """Prints the rendered tree via `rich` (IPython display hook)."""
        rendered = self._rich_render()
        rich.print(rendered)


def run_lineages_to_pandas(
    run_lineages: Sequence[RunLineage[Input, ExpectedOutput, Output]],
) -> pd.DataFrame:
    """Converts a sequence of `RunLineage` objects to a pandas `DataFrame`.

    The `RunLineage` objects are stored in the column `"lineage"`.
    The `DataFrame` is indexed by `(example_id, run_id)`.

    Args:
        run_lineages: The lineages to convert. May be empty, in which case an
            empty `DataFrame` with the same index names and a `"lineage"`
            column is returned.

    Returns:
        A pandas `DataFrame` with the data contained in the `run_lineages`.
    """
    index_columns = ["example_id", "run_id"]

    # One row per lineage: the attributes of the example, then those of the
    # output (later keys win on a name clash), plus the lineage object itself.
    rows = [
        vars(lineage.example) | vars(lineage.output) | {"lineage": lineage}
        for lineage in run_lineages
    ]

    if not rows:
        # A frame built from no rows has no columns at all, so dropping "id"
        # or setting the index below would raise a KeyError.
        return pd.DataFrame(columns=[*index_columns, "lineage"]).set_index(
            index_columns
        )

    df = pd.DataFrame(rows)
    df = df.drop(columns="id")
    return df.set_index(index_columns)
class EvaluationLineage(Generic[Input, ExpectedOutput, Output, Evaluation]):
    """Bundles one example with its run outputs and the evaluation of them.

    `outputs` and `tracers` are parallel sequences:
    `evaluation_lineages_to_pandas` pairs `tracers[i]` with `outputs[i]`.
    """

    # The dataset example that was evaluated.
    example: Example[Input, ExpectedOutput]
    # The outputs recorded for the example.
    outputs: Sequence[ExampleOutput[Output]]
    # The evaluation result for the example.
    evaluation: ExampleEvaluation[Evaluation]
    # One entry per output, in the same order; an entry may be `None`.
    tracers: Sequence[Optional[Tracer]]

    def __init__(
        self,
        example: Example[Input, ExpectedOutput],
        outputs: Sequence[ExampleOutput[Output]],
        evaluation: ExampleEvaluation[Evaluation],
        tracers: Sequence[Optional[Tracer]],
    ) -> None:
        """Stores the given example, outputs, evaluation and tracers.

        Args:
            example: The example belonging to this lineage.
            outputs: The outputs belonging to this lineage.
            evaluation: The evaluation of the example.
            tracers: The tracers for `outputs`, in matching order.
        """
        self.example = example
        self.outputs = outputs
        self.evaluation = evaluation
        self.tracers = tracers

    def _rich_render(self) -> Tree:
        """Builds a `rich` tree of the example, its outputs and the evaluation.

        Returns:
            A `Tree` with the example, an "Outputs" subtree holding one node
            per output, and the evaluation. Tracers are not rendered.
        """
        # The root label used to read "Run Lineage" (copied from `RunLineage`),
        # which made both lineage kinds indistinguishable when displayed.
        tree = Tree("Evaluation Lineage")
        tree.add(self.example._rich_render())

        output_tree = Tree("Outputs")
        for output in self.outputs:
            output_tree.add(output._rich_render(skip_example_id=True))
        tree.add(output_tree)

        tree.add(self.evaluation._rich_render(skip_example_id=True))
        return tree

    def _ipython_display_(self) -> None:
        """Prints the rendered tree via `rich` (IPython display hook)."""
        rich.print(self._rich_render())
def evaluation_lineages_to_pandas(
    evaluation_lineages: Sequence[
        EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]
    ],
) -> pd.DataFrame:
    """Converts a sequence of `EvaluationLineage` objects to a pandas `DataFrame`.

    The `EvaluationLineage` objects are stored in the column `"lineage"`.
    The `DataFrame` is indexed by `(example_id, evaluation_id, run_id)`.
    Each `output` of every lineage will contribute one row in the `DataFrame`.

    Args:
        evaluation_lineages: The lineages to convert. May be empty (or contain
            only lineages without outputs), in which case an empty `DataFrame`
            with the same index names and the `"tracer"` and `"lineage"`
            columns is returned.

    Returns:
        A pandas `DataFrame` with the data contained in the `evaluation_lineages`.
    """
    index_columns = ["example_id", "evaluation_id", "run_id"]

    # One row per (lineage, output) pair. Attributes are merged in the order
    # example -> output -> evaluation, so later keys win on a name clash.
    # `tracers` is read positionally and is expected to line up with `outputs`.
    rows = [
        vars(lineage.example)
        | vars(output)
        | vars(lineage.evaluation)
        | {"tracer": lineage.tracers[index]}
        | {"lineage": lineage}
        for lineage in evaluation_lineages
        for index, output in enumerate(lineage.outputs)
    ]

    if not rows:
        # A frame built from no rows has no columns at all, so dropping "id"
        # or setting the index below would raise a KeyError.
        return pd.DataFrame(
            columns=[*index_columns, "tracer", "lineage"]
        ).set_index(index_columns)

    df = pd.DataFrame(rows)
    df = df.drop(columns="id")
    return df.set_index(index_columns)
def aggregation_overviews_to_pandas(
    aggregation_overviews: Sequence[AggregationOverview[AggregatedEvaluation]],
    unwrap_statistics: bool = True,
    strict: bool = True,
    unwrap_metadata: bool = True,
) -> pd.DataFrame:
    """Converts aggregation overviews to a pandas table for easier comparison.

    Args:
        aggregation_overviews: Overviews to convert.
        unwrap_statistics: Unwrap the `statistics` field in the overviews into separate columns.
            Defaults to True.
        strict: Allow only overviews with exactly equal `statistics` types. Defaults to True.
        unwrap_metadata: Unwrap the `metadata` field in the overviews into separate columns.
            Defaults to True.

    Returns:
        A pandas :class:`DataFrame` containing an overview per row with fields as columns.

    Raises:
        ValueError: If `strict` is True and the overviews do not all share the
            same `statistics` class.
    """
    # Materialize once and use this list for everything below, so the strict
    # check and the frame construction always see the same elements (the
    # frame used to be built from the original argument, which yields nothing
    # if a one-shot iterable was passed and already consumed here).
    overviews = list(aggregation_overviews)

    if strict and len(overviews) > 1:
        first_type = overviews[0].statistics.__class__
        if any(
            overview.statistics.__class__ != first_type for overview in overviews[1:]
        ):
            raise ValueError(
                "Aggregation overviews contain different types, which is not allowed with strict=True"
            )

    df = pd.DataFrame([model.model_dump(mode="json") for model in overviews])

    # The column checks keep the empty-input case (a frame without columns)
    # from failing on a missing "statistics"/"metadata" column.
    if unwrap_statistics and "statistics" in df.columns:
        df = df.join(pd.DataFrame(df["statistics"].to_list())).drop(
            columns=["statistics"]
        )
    if unwrap_metadata and "metadata" in df.columns:
        df = pd.concat([df, pd.json_normalize(df["metadata"])], axis=1).drop(  # type: ignore
            columns=["metadata"]
        )

    return df
class RepositoryNavigator:
    """The `RepositoryNavigator` is used to retrieve coupled data from multiple repositories."""

    def __init__(
        self,
        dataset_repository: DatasetRepository,
        run_repository: RunRepository,
        evaluation_repository: EvaluationRepository | None = None,
    ) -> None:
        """Stores the repositories the navigator reads from.

        Args:
            dataset_repository: Repository the examples are read from.
            run_repository: Repository the run overviews, outputs and tracers are read from.
            evaluation_repository: Repository the evaluations are read from. Only required
                by the evaluation-lineage methods. Defaults to `None`.
        """
        self._dataset_repository = dataset_repository
        self._run_repository = run_repository
        self._eval_repository = evaluation_repository

    def run_lineages(
        self,
        run_id: str,
        input_type: type[Input],
        expected_output_type: type[ExpectedOutput],
        output_type: type[Output],
    ) -> Iterable[RunLineage[Input, ExpectedOutput, Output]]:
        """Retrieves all :class:`RunLineage`s for the run with id `run_id`.

        This is a generator: nothing is read (and no error is raised) until
        the result is iterated.

        Args:
            run_id: The id of the run
            input_type: The type of the input as defined by the :class:`Example`
            expected_output_type: The type of the expected output as defined by the :class:`Example`
            output_type: The type of the run output as defined by the :class:`Output`

        Yields:
            An iterator over all :class:`RunLineage`s for the given run id.

        Raises:
            ValueError: If the run repository has no overview for `run_id`.
        """
        run_overview = self._run_repository.run_overview(run_id)
        if run_overview is None:
            raise ValueError(f"Run repository does not contain a run with id {run_id}.")

        # Both sides are read completely before the first lineage is yielded.
        examples = list(
            self._dataset_repository.examples(
                run_overview.dataset_id,
                input_type,
                expected_output_type,
            )
        )

        # Index the outputs by example id so the join below is a dictionary
        # lookup per example instead of a scan over every (example, output)
        # pair. Lists keep the outputs' original order, so lineages come out
        # in the same order as with the pairwise comparison.
        outputs_by_example_id: dict[str, list[ExampleOutput[Output]]] = {}
        for example_output in self._run_repository.example_outputs(
            run_id, output_type
        ):
            outputs_by_example_id.setdefault(example_output.example_id, []).append(
                example_output
            )

        # join
        for example in examples:
            for example_output in outputs_by_example_id.get(example.id, ()):
                yield RunLineage(
                    example=example,
                    output=example_output,
                    tracer=self._run_repository.example_tracer(
                        run_id=run_id, example_id=example.id
                    ),
                )

    def evaluation_lineages(
        self,
        evaluation_id: str,
        input_type: type[Input],
        expected_output_type: type[ExpectedOutput],
        output_type: type[Output],
        evaluation_type: type[Evaluation],
    ) -> Iterable[EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]]:
        """Retrieves all :class:`EvaluationLineage`s for the evaluation with id `evaluation_id`.

        This is a generator: nothing is read (and no error is raised) until
        the result is iterated.

        Args:
            evaluation_id: The id of the evaluation
            input_type: The type of the input as defined by the :class:`Example`
            expected_output_type: The type of the expected output as defined by the :class:`Example`
            output_type: The type of the run output as defined by the :class:`Output`
            evaluation_type: The type of the evaluation as defined by the :class:`Evaluation`

        Yields:
            All :class:`EvaluationLineage`s for the given evaluation id.

        Raises:
            ValueError: If no evaluation repository was configured, if it has no
                overview for `evaluation_id`, or if one of the referenced runs
                is missing from the run repository.
        """
        if self._eval_repository is None:
            raise ValueError("Evaluation Repository is not set, but required.")

        eval_overview = self._eval_repository.evaluation_overview(evaluation_id)
        if eval_overview is None:
            raise ValueError(
                f"Evaluation repository does not contain an evaluation with id {evaluation_id}."
            )

        evaluations = list(
            self._eval_repository.example_evaluations(evaluation_id, evaluation_type)
        )

        # Group the lineages of all evaluated runs by example id, keeping the
        # order in which the runs produced them, so each evaluation finds its
        # lineages with one lookup instead of a scan over all of them.
        lineages_by_example_id: dict[
            str, list[RunLineage[Input, ExpectedOutput, Output]]
        ] = {}
        for run_lineage in itertools.chain.from_iterable(
            self.run_lineages(
                overview.id, input_type, expected_output_type, output_type
            )
            for overview in eval_overview.run_overviews
        ):
            lineages_by_example_id.setdefault(run_lineage.example.id, []).append(
                run_lineage
            )

        # join
        for evaluation in evaluations:
            matching = lineages_by_example_id.get(evaluation.example_id)
            if not matching:
                # No run produced an output for this example: nothing to yield.
                continue
            yield EvaluationLineage(
                # the evaluation has only one example
                # and all relevant run lineages contain the same example
                example=matching[0].example,
                outputs=[lineage.output for lineage in matching],
                evaluation=evaluation,
                # Each run lineage already carries the tracer for its output
                # (fetched in `run_lineages`), so it is reused here rather
                # than being requested from the repository a second time.
                tracers=[lineage.tracer for lineage in matching],
            )

    def run_lineage(
        self,
        run_id: str,
        example_id: str,
        input_type: type[Input],
        expected_output_type: type[ExpectedOutput],
        output_type: type[Output],
    ) -> RunLineage[Input, ExpectedOutput, Output] | None:
        """Retrieves the :class:`RunLineage` for the run with id `run_id` and example with id `example_id`.

        Args:
            run_id: The id of the run
            example_id: The id of the example
            input_type: The type of the input as defined by the :class:`Example`
            expected_output_type: The type of the expected output as defined by the :class:`Example`
            output_type: The type of the run output as defined by the :class:`Output`

        Returns:
            The :class:`RunLineage` for the given run id and example id, `None` if the example or an output for the example does not exist.

        Raises:
            ValueError: If the run repository has no overview for `run_id`.
        """
        run_overview = self._run_repository.run_overview(run_id)
        if run_overview is None:
            raise ValueError(f"Run repository does not contain a run with id {run_id}.")

        example = self._dataset_repository.example(
            run_overview.dataset_id, example_id, input_type, expected_output_type
        )
        if example is None:
            return None

        example_output = self._run_repository.example_output(
            run_id, example_id, output_type
        )
        if example_output is None:
            return None

        return RunLineage(
            example=example,
            output=example_output,
            tracer=self._run_repository.example_tracer(run_id, example_id),
        )

    def evaluation_lineage(
        self,
        evaluation_id: str,
        example_id: str,
        input_type: type[Input],
        expected_output_type: type[ExpectedOutput],
        output_type: type[Output],
        evaluation_type: type[Evaluation],
    ) -> EvaluationLineage[Input, ExpectedOutput, Output, Evaluation] | None:
        """Retrieves the :class:`EvaluationLineage` for the evaluation with id `evaluation_id` and example with id `example_id`.

        Args:
            evaluation_id: The id of the evaluation
            example_id: The id of the example of interest
            input_type: The type of the input as defined by the :class:`Example`
            expected_output_type: The type of the expected output as defined by the :class:`Example`
            output_type: The type of the run output as defined by the :class:`Output`
            evaluation_type: The type of the evaluation as defined by the :class:`Evaluation`

        Returns:
            The :class:`EvaluationLineage` for the given evaluation id and example id.
            Returns `None` if the lineage is not complete because either an example, a run, or an evaluation does not exist.

        Raises:
            ValueError: If no evaluation repository was configured, if it has no
                overview for `evaluation_id`, or if one of the referenced runs
                is missing from the run repository.
        """
        if self._eval_repository is None:
            raise ValueError("Evaluation Repository is not set, but required.")

        eval_overview = self._eval_repository.evaluation_overview(evaluation_id)
        if eval_overview is None:
            raise ValueError(
                f"Evaluation repository does not contain an evaluation with id {evaluation_id}."
            )

        run_lineages = [
            self.run_lineage(
                overview.id, example_id, input_type, expected_output_type, output_type
            )
            for overview in eval_overview.run_overviews
        ]
        # Runs without the example (or without an output for it) give `None`.
        existing_run_lineages = [
            lineage for lineage in run_lineages if lineage is not None
        ]
        if not existing_run_lineages:
            return None

        example_evaluation = self._eval_repository.example_evaluation(
            evaluation_id, example_id, evaluation_type
        )
        if example_evaluation is None:
            return None

        return EvaluationLineage(
            # Every remaining lineage was looked up with the same example id,
            # so the first one's example stands for all of them.
            example=existing_run_lineages[0].example,
            outputs=[lineage.output for lineage in existing_run_lineages],
            evaluation=example_evaluation,
            tracers=[lineage.tracer for lineage in existing_run_lineages],
        )