# Source code for intelligence_layer.evaluation.run.run_repository

from abc import ABC, abstractmethod
from collections.abc import Iterable, Sequence
from multiprocessing import Lock as lock
from multiprocessing.synchronize import Lock
from typing import Optional, final

from pydantic import BaseModel

from intelligence_layer.core import Output, Tracer
from intelligence_layer.evaluation.run.domain import (
    ExampleOutput,
    FailedExampleRun,
    RunOverview,
)


class RecoveryData(BaseModel):
    """Record returned by :meth:`RunRepository.finished_examples`.

    NOTE(review): judging by the names, this tracks which examples of a run
    have already finished so that the run can be picked up again later --
    confirm against the callers.
    """

    # ID of the run this record belongs to.
    run_id: str
    # Presumably the IDs of the examples that already finished (TODO confirm);
    # defaults to an empty list.
    finished_examples: list[str] = []


class RunRepository(ABC):
    """Base run repository interface.

    Provides methods to store and load run results: :class:`RunOverview` and
    :class:`ExampleOutput`. A :class:`RunOverview` is created from and is linked
    (by its ID) to multiple :class:`ExampleOutput`s representing results of a
    dataset.
    """

    def __init__(self) -> None:
        # One lock per temporary-run hash. Entries are added by
        # `create_temporary_run_data` and removed by `delete_temporary_run_data`.
        self.locks: dict[str, Lock] = {}

    @abstractmethod
    def store_run_overview(self, overview: RunOverview) -> None:
        """Stores a :class:`RunOverview`.

        Args:
            overview: The overview to be persisted.
        """
        ...

    @abstractmethod
    def _create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None:
        """Subclass hook invoked by :meth:`create_temporary_run_data`.

        Args:
            tmp_hash: Key identifying the temporary run data.
            run_id: ID of the run the temporary data belongs to.
        """
        ...

    @abstractmethod
    def _delete_temporary_run_data(self, tmp_hash: str) -> None:
        """Subclass hook invoked by :meth:`delete_temporary_run_data`.

        Args:
            tmp_hash: Key identifying the temporary run data.
        """
        ...

    @abstractmethod
    def _temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None:
        """Subclass hook invoked by :meth:`temp_store_finished_example`.

        It is only called while the lock registered for `tmp_hash` is held.

        Args:
            tmp_hash: Key identifying the temporary run data.
            example_id: ID of the example to record.
        """
        ...

    @abstractmethod
    def finished_examples(self, tmp_hash: str) -> Optional[RecoveryData]:
        """Returns the :class:`RecoveryData` for the given hash.

        Args:
            tmp_hash: Key identifying the temporary run data.

        Returns:
            :class:`RecoveryData`, or `None` (presumably when nothing is
            stored for `tmp_hash` -- the exact condition is up to the
            implementation).
        """
        ...

    @final
    def create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None:
        """Registers a fresh lock for `tmp_hash` and creates its temporary run data.

        An already registered lock for the same hash is replaced.

        Args:
            tmp_hash: Key identifying the temporary run data.
            run_id: ID of the run the temporary data belongs to.
        """
        self.locks[tmp_hash] = lock()
        self._create_temporary_run_data(tmp_hash, run_id)

    @final
    def delete_temporary_run_data(self, tmp_hash: str) -> None:
        """Drops the lock for `tmp_hash` (if any) and deletes its temporary run data.

        Args:
            tmp_hash: Key identifying the temporary run data.
        """
        # `pop` with a default rather than `del`: the locks only live in this
        # instance's memory, so a missing entry must not raise `KeyError` and
        # thereby prevent the subclass from cleaning up its temporary data.
        self.locks.pop(tmp_hash, None)
        self._delete_temporary_run_data(tmp_hash)

    @final
    def temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None:
        """Records an example via the subclass hook while holding the hash's lock.

        Args:
            tmp_hash: Key identifying the temporary run data.
            example_id: ID of the example to record.

        Raises:
            KeyError: If no lock is registered for `tmp_hash`, i.e.
                :meth:`create_temporary_run_data` was not called for it on
                this instance (or the data was already deleted).
        """
        with self.locks[tmp_hash]:
            self._temp_store_finished_example(tmp_hash, example_id)

    @abstractmethod
    def run_overview(self, run_id: str) -> Optional[RunOverview]:
        """Returns a :class:`RunOverview` for the given ID.

        Args:
            run_id: ID of the run overview to retrieve.

        Returns:
            :class:`RunOverview` if it was found, `None` otherwise.
        """
        ...

    def run_overviews(self) -> Iterable[RunOverview]:
        """Returns all :class:`RunOverview`s sorted by their ID.

        IDs for which :meth:`run_overview` returns `None` are skipped.

        Yields:
            :class:`Iterable` of :class:`RunOverview`s.
        """
        for run_id in self.run_overview_ids():
            run_overview = self.run_overview(run_id)
            if run_overview is not None:
                yield run_overview

    @abstractmethod
    def run_overview_ids(self) -> Sequence[str]:
        """Returns sorted IDs of all stored :class:`RunOverview`s.

        Returns:
            A :class:`Sequence` of the :class:`RunOverview` IDs.
        """
        ...

    @abstractmethod
    def store_example_output(self, example_output: ExampleOutput[Output]) -> None:
        """Stores an :class:`ExampleOutput`.

        Args:
            example_output: The example output to be persisted.
        """
        ...

    @final
    def store_example_output_parallel(
        self, tmp_hash: str, example_output: ExampleOutput[Output]
    ) -> None:
        """Stores an :class:`ExampleOutput` while holding the lock for `tmp_hash`.

        Args:
            tmp_hash: Key identifying the temporary run data whose lock
                serializes the store call.
            example_output: The example output to be persisted.

        Raises:
            KeyError: If no lock is registered for `tmp_hash`, i.e.
                :meth:`create_temporary_run_data` was not called for it on
                this instance (or the data was already deleted).
        """
        with self.locks[tmp_hash]:
            self.store_example_output(example_output)

    @abstractmethod
    def example_output(
        self, run_id: str, example_id: str, output_type: type[Output]
    ) -> Optional[ExampleOutput[Output]]:
        """Returns :class:`ExampleOutput` for the given run ID and example ID.

        Args:
            run_id: The ID of the linked run overview.
            example_id: ID of the example to retrieve.
            output_type: Type of output that the `Task` returned in :func:`Task.do_run`

        Returns:
            :class:`ExampleOutput` if it was found, `None` otherwise.
        """
        ...

    @abstractmethod
    def example_outputs(
        self, run_id: str, output_type: type[Output]
    ) -> Iterable[ExampleOutput[Output]]:
        """Returns all :class:`ExampleOutput` for a given run ID sorted by their example ID.

        Args:
            run_id: The ID of the run overview.
            output_type: Type of output that the `Task` returned in :func:`Task.do_run`

        Returns:
            :class:`Iterable` of :class:`ExampleOutput`s.
        """
        ...

    @abstractmethod
    def example_output_ids(self, run_id: str) -> Sequence[str]:
        """Returns the sorted IDs of all :class:`ExampleOutput`s for a given run ID.

        Args:
            run_id: The ID of the run overview.

        Returns:
            A :class:`Sequence` of all :class:`ExampleOutput` IDs.
        """
        ...

    def successful_example_outputs(
        self, run_id: str, output_type: type[Output]
    ) -> Iterable[ExampleOutput[Output]]:
        """Returns all :class:`ExampleOutput` for successful example runs with a given run-overview ID sorted by their example ID.

        An example run counts as successful when its output is not a
        :class:`FailedExampleRun`.

        Args:
            run_id: The ID of the run overview.
            output_type: Type of output that the `Task` returned in :func:`Task.do_run`

        Returns:
            :class:`Iterable` of :class:`ExampleOutput`s.
        """
        results = self.example_outputs(run_id, output_type)
        return (r for r in results if not isinstance(r.output, FailedExampleRun))

    def failed_example_outputs(
        self, run_id: str, output_type: type[Output]
    ) -> Iterable[ExampleOutput[Output]]:
        """Returns all :class:`ExampleOutput` for failed example runs with a given run-overview ID sorted by their example ID.

        An example run counts as failed when its output is a
        :class:`FailedExampleRun`.

        Args:
            run_id: The ID of the run overview.
            output_type: Type of output that the `Task` returned in :func:`Task.do_run`

        Returns:
            :class:`Iterable` of :class:`ExampleOutput`s.
        """
        results = self.example_outputs(run_id, output_type)
        return (r for r in results if isinstance(r.output, FailedExampleRun))

    @abstractmethod
    def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]:
        """Returns an :class:`Optional[Tracer]` for the given run ID and example ID.

        Args:
            run_id: The ID of the linked run overview.
            example_id: ID of the example whose :class:`Tracer` should be retrieved.

        Returns:
            A :class:`Tracer` if it was found, `None` otherwise.
        """
        ...

    @abstractmethod
    def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer:
        """Creates and returns a :class:`Tracer` for the given run ID and example ID.

        Args:
            run_id: The ID of the linked run overview.
            example_id: ID of the example whose :class:`Tracer` should be retrieved.

        Returns:
            A :class:`Tracer`.
        """
        ...