Source code for intelligence_layer.evaluation.aggregation.elo_aggregation

import random
from collections import Counter, defaultdict
from collections.abc import Iterable, Mapping, Sequence

import numpy as np
from pydantic import BaseModel

from intelligence_layer.evaluation.aggregation.accumulator import MeanAccumulator
from intelligence_layer.evaluation.aggregation.aggregator import AggregationLogic
from intelligence_layer.evaluation.evaluation.evaluator.incremental_evaluator import (
    ComparisonEvaluation,
    Matches,
    MatchOutcome,
)


class PlayerScore(BaseModel):
    elo: float
    elo_standard_error: float
    win_rate: float
    num_matches: int


class AggregatedComparison(BaseModel):
    scores: Mapping[str, PlayerScore]


class EloAggregationAdapter:
    @staticmethod
    def aggregate(evaluations: Iterable[ComparisonEvaluation]) -> AggregatedComparison:
        evaluations = list(evaluations)
        player_counter = Counter(
            player
            for comparison_evaluation in evaluations
            for player in [
                comparison_evaluation.first_player,
                comparison_evaluation.second_player,
            ]
        )

        player_counts = dict(player_counter)
        players = player_counts.keys()

        accumulators = {p: MeanAccumulator() for p in players}
        for _ in range(100):
            elo_calc = EloCalculator(players)
            random.shuffle(evaluations)
            elo_calc.calculate(evaluations)
            for p in players:
                accumulators[p].add(elo_calc.ratings[p])

        win_rate_calc = WinRateCalculator(players)
        win_rate = win_rate_calc.calculate(evaluations)

        return AggregatedComparison(
            scores={
                p: PlayerScore(
                    elo=acc.extract(),
                    elo_standard_error=acc.standard_error(),
                    win_rate=win_rate[p],
                    num_matches=player_counts[p],
                )
                for p, acc in accumulators.items()
            },
        )


class EloCalculator:
    def __init__(
        self,
        players: Iterable[str],
        k_start: float = 20.0,
        k_floor: float = 10.0,
        decay_factor: float = 0.0005,
    ) -> None:
        self.ratings: dict[str, float] = {player: 1500.0 for player in players}
        self._match_counts: dict[str, int] = defaultdict(int)
        self._k_ceiling = k_start - k_floor
        self._k_floor = k_floor
        self._decay_factor = decay_factor

    def _calc_k_factor(self, player: str) -> float:
        n = self._match_counts.get(player) or 0
        return self._k_ceiling * np.exp(-self._decay_factor * n) + self._k_floor

    def _calc_expected_win_rates(
        self, player_a: str, player_b: str
    ) -> tuple[float, float]:
        rating_a, rating_b = self.ratings[player_a], self.ratings[player_b]
        exp_a = 1 / (1 + 10 ** ((rating_b - rating_a) / 400))
        return exp_a, 1 - exp_a

    def _calc_difs(
        self, match_outcome: MatchOutcome, player_a: str, player_b: str
    ) -> tuple[float, float]:
        expected_win_rate_a, expected_win_rate_b = self._calc_expected_win_rates(
            player_a, player_b
        )
        actual_a, actual_b = match_outcome.payoff
        k_a, k_b = self._calc_k_factor(player_a), self._calc_k_factor(player_b)
        return k_a * (actual_a - expected_win_rate_a), k_b * (
            actual_b - expected_win_rate_b
        )

    def calculate(self, matches: Sequence[ComparisonEvaluation]) -> None:
        for match in matches:
            dif_a, dif_b = self._calc_difs(
                match.outcome, match.first_player, match.second_player
            )
            self.ratings[match.first_player] += dif_a
            self.ratings[match.second_player] += dif_b
            self._match_counts[match.first_player] += 1
            self._match_counts[match.second_player] += 1


class WinRateCalculator:
    def __init__(self, players: Iterable[str]) -> None:
        self.match_count: dict[str, int] = {p: 0 for p in players}
        self.win_count: dict[str, float] = {p: 0 for p in players}

    def calculate(self, matches: Sequence[ComparisonEvaluation]) -> Mapping[str, float]:
        for match in matches:
            self.match_count[match.first_player] += 1
            self.match_count[match.second_player] += 1
            self.win_count[match.first_player] += match.outcome.payoff[0]
            self.win_count[match.second_player] += match.outcome.payoff[1]

        return {
            player: self.win_count[player] / match_count
            for player, match_count in self.match_count.items()
        }


[docs] class ComparisonEvaluationAggregationLogic( AggregationLogic[ComparisonEvaluation, AggregatedComparison] ):
[docs] def aggregate( self, evaluations: Iterable[ComparisonEvaluation] ) -> AggregatedComparison: return EloAggregationAdapter.aggregate(evaluations)
[docs] class MatchesAggregationLogic(AggregationLogic[Matches, AggregatedComparison]):
[docs] def aggregate(self, evaluations: Iterable[Matches]) -> AggregatedComparison: flattened_matches = [ comparison_evaluation for match in evaluations for comparison_evaluation in match.comparison_evaluations ] return EloAggregationAdapter.aggregate(flattened_matches)