Source code for intelligence_layer.evaluation.aggregation.elo_aggregation

import random
from collections import Counter, defaultdict
from collections.abc import Iterable, Mapping, Sequence

import numpy as np
from pydantic import BaseModel

from intelligence_layer.evaluation.aggregation.accumulator import MeanAccumulator
from intelligence_layer.evaluation.aggregation.aggregator import AggregationLogic
from intelligence_layer.evaluation.evaluation.evaluator.incremental_evaluator import (
    ComparisonEvaluation,
    Matches,
    MatchOutcome,
)


class PlayerScore(BaseModel):
    """Aggregated result for a single player across all of its comparisons.

    The field notes below describe how ``EloAggregationAdapter.aggregate`` in
    this module fills each value.
    """

    # Value extracted from a MeanAccumulator that was fed the player's final
    # Elo rating from each of the repeated, shuffled Elo runs.
    elo: float
    # Standard error reported by that same accumulator.
    elo_standard_error: float
    # Summed payoff of the player divided by the number of matches it played.
    win_rate: float
    # Number of comparisons the player took part in, as first or second player.
    num_matches: int


class AggregatedComparison(BaseModel):
    """Result of aggregating comparisons: one ``PlayerScore`` per player name."""

    # Keyed by player name.
    scores: Mapping[str, PlayerScore]
class EloAggregationAdapter:
    """Turns pairwise comparison results into per-player Elo and win-rate scores."""

    @staticmethod
    def aggregate(
        evaluations: Iterable[ComparisonEvaluation],
    ) -> AggregatedComparison:
        """Aggregate comparisons into one ``PlayerScore`` per player.

        Elo ratings depend on the order in which matches are processed, so the
        rating is computed 100 times, each time on a freshly shuffled match
        order, and the per-player results are collected in a
        ``MeanAccumulator``. Win rates and match counts are order-independent
        and computed once.

        Args:
            evaluations: The comparisons to aggregate. The iterable is copied,
                so the caller's collection is never reordered.

        Returns:
            An ``AggregatedComparison`` with a score for every player that
            appears in at least one comparison (empty if there are none).
        """
        # Materialize a private list: it is iterated several times and
        # shuffled in place below.
        evaluations = list(evaluations)
        player_counter = Counter(
            player
            for comparison_evaluation in evaluations
            for player in [
                comparison_evaluation.first_player,
                comparison_evaluation.second_player,
            ]
        )
        player_counts = dict(player_counter)
        players = player_counts.keys()

        accumulators = {p: MeanAccumulator() for p in players}
        for _ in range(100):
            # Fresh calculator per run so every run starts from the initial
            # rating for all players.
            elo_calc = EloCalculator(players)
            random.shuffle(evaluations)
            elo_calc.calculate(evaluations)
            for p in players:
                accumulators[p].add(elo_calc.ratings[p])

        win_rate_calc = WinRateCalculator(players)
        win_rate = win_rate_calc.calculate(evaluations)

        return AggregatedComparison(
            scores={
                p: PlayerScore(
                    elo=acc.extract(),
                    elo_standard_error=acc.standard_error(),
                    win_rate=win_rate[p],
                    num_matches=player_counts[p],
                )
                for p, acc in accumulators.items()
            },
        )


class EloCalculator:
    """Sequential Elo rating with a K-factor that decays per player.

    Every player starts at a rating of 1500. The K-factor of a player is
    ``(k_start - k_floor) * exp(-decay_factor * n) + k_floor`` where ``n`` is
    the number of matches already recorded for that player, i.e. it starts at
    ``k_start`` and approaches ``k_floor`` as the player accumulates matches.
    """

    def __init__(
        self,
        players: Iterable[str],
        k_start: float = 20.0,
        k_floor: float = 10.0,
        decay_factor: float = 0.0005,
    ) -> None:
        """Initialize all ratings and store the K-factor parameters.

        Args:
            players: Names of the players to rate.
            k_start: K-factor applied to a player without recorded matches.
            k_floor: Lower limit the K-factor decays towards.
            decay_factor: Exponential decay rate per recorded match.
        """
        self.ratings: dict[str, float] = {player: 1500.0 for player in players}
        self._match_counts: dict[str, int] = defaultdict(int)
        # Only the part above the floor decays; the floor is added back in
        # `_calc_k_factor`.
        self._k_ceiling = k_start - k_floor
        self._k_floor = k_floor
        self._decay_factor = decay_factor

    def _calc_k_factor(self, player: str) -> float:
        """Return the current K-factor of `player` based on its match count."""
        # `.get` (instead of indexing) avoids inserting keys into the
        # defaultdict for players that have not played yet.
        n = self._match_counts.get(player, 0)
        return self._k_ceiling * np.exp(-self._decay_factor * n) + self._k_floor

    def _calc_expected_win_rates(
        self, player_a: str, player_b: str
    ) -> tuple[float, float]:
        """Return the expected scores of both players from their ratings.

        Uses the logistic Elo formula with base 10 and a scale of 400; the two
        values sum to 1.
        """
        rating_a, rating_b = self.ratings[player_a], self.ratings[player_b]
        exp_a = 1 / (1 + 10 ** ((rating_b - rating_a) / 400))
        return exp_a, 1 - exp_a

    def _calc_difs(
        self, match_outcome: MatchOutcome, player_a: str, player_b: str
    ) -> tuple[float, float]:
        """Return the rating changes for both players for one match.

        Each change is the player's K-factor times the difference between its
        actual score (taken from ``match_outcome.payoff``) and expected score.
        """
        expected_win_rate_a, expected_win_rate_b = self._calc_expected_win_rates(
            player_a, player_b
        )
        actual_a, actual_b = match_outcome.payoff
        k_a, k_b = self._calc_k_factor(player_a), self._calc_k_factor(player_b)
        return k_a * (actual_a - expected_win_rate_a), k_b * (
            actual_b - expected_win_rate_b
        )

    def calculate(self, matches: Sequence[ComparisonEvaluation]) -> None:
        """Update ``self.ratings`` in place by processing `matches` in order.

        Both rating changes of a match are computed from the pre-match ratings
        and match counts before either is applied.
        """
        for match in matches:
            dif_a, dif_b = self._calc_difs(
                match.outcome, match.first_player, match.second_player
            )
            self.ratings[match.first_player] += dif_a
            self.ratings[match.second_player] += dif_b
            self._match_counts[match.first_player] += 1
            self._match_counts[match.second_player] += 1


class WinRateCalculator:
    """Computes the average payoff per match for every registered player."""

    def __init__(self, players: Iterable[str]) -> None:
        """Register `players` with zero matches and zero accumulated payoff."""
        self.match_count: dict[str, int] = {p: 0 for p in players}
        self.win_count: dict[str, float] = {p: 0 for p in players}

    def calculate(self, matches: Sequence[ComparisonEvaluation]) -> Mapping[str, float]:
        """Add `matches` to the running totals and return the win rates.

        The totals live on the instance, so repeated calls accumulate.

        Args:
            matches: Comparisons whose players were all registered in
                ``__init__``.

        Returns:
            Mapping from player name to accumulated payoff divided by number
            of matches. A registered player without any match gets ``0.0``
            instead of triggering a division by zero.
        """
        for match in matches:
            self.match_count[match.first_player] += 1
            self.match_count[match.second_player] += 1
            self.win_count[match.first_player] += match.outcome.payoff[0]
            self.win_count[match.second_player] += match.outcome.payoff[1]

        return {
            # Guard against players that never appeared in a match.
            player: (self.win_count[player] / match_count if match_count else 0.0)
            for player, match_count in self.match_count.items()
        }
class ComparisonEvaluationAggregationLogic(
    AggregationLogic[ComparisonEvaluation, AggregatedComparison]
):
    """Aggregation logic for evaluations that are single comparisons."""

    def aggregate(
        self, evaluations: Iterable[ComparisonEvaluation]
    ) -> AggregatedComparison:
        """Delegate directly to ``EloAggregationAdapter.aggregate``.

        Args:
            evaluations: The comparisons to aggregate.

        Returns:
            The per-player scores produced by the adapter.
        """
        aggregated = EloAggregationAdapter.aggregate(evaluations)
        return aggregated
class MatchesAggregationLogic(AggregationLogic[Matches, AggregatedComparison]):
    """Aggregation logic for evaluations that each bundle several comparisons."""

    def aggregate(self, evaluations: Iterable[Matches]) -> AggregatedComparison:
        """Flatten all bundled comparisons and aggregate them together.

        Args:
            evaluations: Items whose ``comparison_evaluations`` are
                concatenated in iteration order.

        Returns:
            The per-player scores produced by
            ``EloAggregationAdapter.aggregate`` for the flattened list.
        """
        all_comparisons = []
        for matches in evaluations:
            all_comparisons.extend(matches.comparison_evaluations)
        return EloAggregationAdapter.aggregate(all_comparisons)