Source code for intelligence_layer.evaluation.run.file_run_repository

import warnings
from collections.abc import Iterable, Sequence
from pathlib import Path
from typing import Optional

from fsspec import AbstractFileSystem  # type: ignore
from fsspec.implementations.local import LocalFileSystem  # type: ignore

from intelligence_layer.core import FileTracer, InMemoryTracer, JsonSerializer, Output
from intelligence_layer.core.tracer.tracer import Tracer
from intelligence_layer.evaluation.infrastructure.file_system_based_repository import (
    FileSystemBasedRepository,
)
from intelligence_layer.evaluation.run.domain import ExampleOutput, RunOverview
from intelligence_layer.evaluation.run.run_repository import RecoveryData, RunRepository


class FileSystemRunRepository(RunRepository, FileSystemBasedRepository):
    TMP_FILE_TYPE: str = "tmp"

    def __init__(self, file_system: AbstractFileSystem, root_directory: Path) -> None:
        FileSystemBasedRepository.__init__(self, file_system, root_directory)
        RunRepository.__init__(self)

    def store_run_overview(self, overview: RunOverview) -> None:
        self.write_utf8(
            self._run_overview_path(overview.id),
            overview.model_dump_json(indent=2),
            create_parents=True,
        )
        # create empty folder just in case no examples are ever saved
        self.mkdir(self._run_directory(overview.id))

    def _tmp_file_path(self, tmp_hash: str) -> Path:
        return self._run_directory(tmp_hash + "." + self.TMP_FILE_TYPE)

    def _create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None:
        self.write_utf8(
            self._tmp_file_path(tmp_hash),
            RecoveryData(run_id=run_id).model_dump_json(),
            create_parents=True,
        )

    def _delete_temporary_run_data(self, tmp_hash: str) -> None:
        self.remove_file(self._tmp_file_path(tmp_hash))

    def _temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None:
        data = RecoveryData.model_validate_json(
            self.read_utf8(self._tmp_file_path(tmp_hash))
        )
        data.finished_examples.append(example_id)
        self.write_utf8(
            self._tmp_file_path(tmp_hash),
            data.model_dump_json(),
            create_parents=True,
        )

    def finished_examples(self, tmp_hash: str) -> Optional[RecoveryData]:
        try:
            return RecoveryData.model_validate_json(
                self.read_utf8(self._tmp_file_path(tmp_hash))
            )
        except FileNotFoundError:
            return None

    def run_overview(self, run_id: str) -> Optional[RunOverview]:
        file_path = self._run_overview_path(run_id)
        if not self.exists(file_path):
            return None

        content = self.read_utf8(file_path)
        return RunOverview.model_validate_json(content)

    def run_overview_ids(self) -> Sequence[str]:
        return sorted(self.file_names(self._run_root_directory()))

    def store_example_output(self, example_output: ExampleOutput[Output]) -> None:
        serialized_result = JsonSerializer(root=example_output)
        self.write_utf8(
            self._example_output_path(example_output.run_id, example_output.example_id),
            serialized_result.model_dump_json(indent=2),
            create_parents=True,
        )

    def example_output(
        self, run_id: str, example_id: str, output_type: type[Output]
    ) -> Optional[ExampleOutput[Output]]:
        file_path = self._example_output_path(run_id, example_id)
        if not self.exists(file_path):
            warnings.warn(
                f'Repository does not contain a run with id: "{run_id}"', UserWarning
            )
            return None
        content = self.read_utf8(file_path)
        # mypy does not accept dynamic types
        return ExampleOutput[output_type].model_validate_json(  # type: ignore
            json_data=content
        )

    def example_outputs(
        self, run_id: str, output_type: type[Output]
    ) -> Iterable[ExampleOutput[Output]]:
        path = self._run_output_directory(run_id)
        if not self.exists(path):
            warnings.warn(
                f'Repository does not contain a run with id: "{run_id}"', UserWarning
            )
            return []

        example_outputs = []
        for file_name in self.file_names(path):
            output = self.example_output(run_id, file_name, output_type)
            if output is not None:
                example_outputs.append(output)

        return sorted(
            example_outputs,
            key=lambda _example_output: _example_output.example_id,
        )

    def example_output_ids(self, run_id: str) -> Sequence[str]:
        return sorted(self.file_names(self._run_output_directory(run_id)))

    def _run_root_directory(self) -> Path:
        path = self._root_directory / "runs"
        return path

    def _run_directory(self, run_id: str) -> Path:
        path = self._run_root_directory() / run_id
        return path

    def _run_output_directory(self, run_id: str) -> Path:
        path = self._run_directory(run_id) / "output"
        return path

    def _run_overview_path(self, run_id: str) -> Path:
        return self._run_directory(run_id).with_suffix(".json")

    def example_tracer(self, run_id: str, example_id: str) -> Optional[Tracer]:
        file_path = self._example_trace_path(run_id, example_id)
        if not self.exists(file_path):
            return None
        return self._parse_log(file_path)

    def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer:
        file_path = self._example_trace_path(run_id, example_id)
        return FileTracer(file_path)

    def _trace_directory(self, run_id: str) -> Path:
        path = self._run_directory(run_id) / "trace"
        return path

    def _example_trace_path(self, run_id: str, example_id: str) -> Path:
        return (self._trace_directory(run_id) / example_id).with_suffix(".jsonl")

    @staticmethod
    def _parse_log(log_path: Path) -> InMemoryTracer:
        return FileTracer(log_path).traces()

    def _example_output_path(self, run_id: str, example_id: str) -> Path:
        return (self._run_output_directory(run_id) / example_id).with_suffix(".json")


[docs] class FileRunRepository(FileSystemRunRepository): def __init__(self, root_directory: Path) -> None: super().__init__(LocalFileSystem(), root_directory)
[docs] @staticmethod def path_to_str(path: Path) -> str: return str(path)