Source code for pref_voting.probabilistic_methods

'''
    File: voting_methods.py
    Author: Wes Holliday (wesholliday@berkeley.edu) and Eric Pacuit (epacuit@umd.edu)
    Date: Nove 21, 2024
    
    Implementations of probabilistic voting methods.
'''

from pref_voting.prob_voting_method import  *
from pref_voting.weighted_majority_graphs import  MajorityGraph, MarginGraph
from scipy.optimize import linprog

import random
import nashpy as nash

[docs] @pvm(name="Random Dictator") def random_dictator(profile, curr_cands = None): '''Returns lottery over the candidates that is proportional to the Plurality scores. Args: profile (Profile): A Profile object. curr_cands (list): A list of candidates to restrict the ranking to. If ``None``, then the ranking is over the entire domain of the profile. Returns: dict: A dictionary mapping candidates to probabilities. ''' plurality_scores = profile.plurality_scores(curr_cands = curr_cands) total_plurality_scores = sum(list(plurality_scores.values())) return {c: plurality_scores[c] / total_plurality_scores for c in plurality_scores.keys()}
[docs] @pvm(name="Random Dictator on the Beta-Uncovered Set") def RaDiUS(profile, curr_cands = None, beta = 0.5): """ Runs the Random Dictator method on the profile restricted to the beta-uncovered set, as proposed by Charikar et al. (https://arxiv.org/abs/2306.17838). Args: profile (Profile): An anonymous profile of linear orders curr_cands (List[int], optional): Candidates to consider. Defaults to all candidates if not provided. Returns: dict: Maps each candidate to their probability of winning under the RaDiUS method. """ from pref_voting.margin_based_methods import beta_uncovered_set curr_cands = profile.candidates if curr_cands is None else curr_cands rd_dist = random_dictator(profile, curr_cands = beta_uncovered_set(profile, curr_cands = curr_cands, beta = beta)) rd_dist.update({c:0 for c in curr_cands if c not in rd_dist.keys()}) return rd_dist
[docs] @pvm(name="Proportional Borda") def pr_borda(profile, curr_cands=None): '''Returns lottery over the candidates that is proportional to the Borda scores. Args: profile (Profile): A Profile object. curr_cands (list): A list of candidates to restrict the ranking to. If ``None``, then the ranking is over the entire domain of the profile. Returns: dict: A dictionary mapping candidates to probabilities. ''' borda_scores = profile.borda_scores(curr_cands=curr_cands) total_borda_scores = sum(list(borda_scores.values())) return {c: borda_scores[c] / total_borda_scores for c in borda_scores.keys()}
def clean_and_normalize(probs, threshold=1e-10): # Set negative or small positive values to zero probs = np.where(probs < threshold, 0, probs) # Renormalize to ensure the probabilities sum to 1 total = np.sum(probs) if total > 0: probs /= total return probs def _maximal_lottery_enumeration(edata, curr_cands=None, margin_transformation=lambda x: x): ''' Implementation of maximal lotteries. See http://dss.in.tum.de/files/brandt-research/fishburn_slides.pdf Returns a randomly chosen maximal lottery. ''' candidates = edata.candidates if curr_cands is None else curr_cands m_matrix, cand_to_cidx = edata.strength_matrix(curr_cands=candidates) A = np.array([[margin_transformation(m) for m in row] for row in m_matrix]) # Create the game game = nash.Game(A) equilibria = [] try: equilibria = list(game.vertex_enumeration()) #print("Vertex Enumeration found equilibria.") except Exception as e: print(f"Vertex Enumeration failed: {e}") # Backup method 1: Support Enumeration if not equilibria: try: equilibria = list(game.support_enumeration()) #print("Support Enumeration found equilibria.") except Exception as e: print(f"Support Enumeration failed: {e}") if len(equilibria) == 0: return {c: 1 / len(candidates) for c in candidates} else: # average the equilibria component-wise to get a single equilibrium eq_probs = [np.mean([eq[0][idx] for eq in equilibria]) for idx in range(len(candidates))] eq_probs = clean_and_normalize(np.array(eq_probs)) # Return the result as a dictionary return {c: eq_probs[cand_to_cidx(c)] for c in candidates} def _maximal_lottery_lp(edata, curr_cands=None, margin_transformation=lambda x: x): ''' Implementation of maximal lotteries using linear programming. ''' candidates = edata.candidates if curr_cands is None else curr_cands m_matrix, cand_to_cidx = edata.strength_matrix(curr_cands=candidates) A = np.array([[margin_transformation(m) for m in row] for row in m_matrix]) num_cands = len(candidates) c = np.zeros(num_cands + 1) c[-1] = -1 # Coefficient for v in the objective function (maximize v) # Inequalities: A^T p - v * 1 >= 0 => A^T p - v * 1 - s = 0 (s >= 0) # We need to convert this to the form: A_ub x <= b_ub A_ub = np.hstack([-A.T, np.ones((num_cands, 1))]) b_ub = np.zeros(num_cands) # Equalities: sum p_i = 1 A_eq = np.zeros((1, num_cands + 1)) A_eq[0, :num_cands] = 1 b_eq = np.array([1]) bounds = [(0, None)] * num_cands + [(None, None)] # p_i >= 0, v free res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method='highs') if res.success: eq_probs = res.x[:num_cands] # Normalize to account for numerical errors eq_probs = np.maximum(eq_probs, 0) eq_probs /= np.sum(eq_probs) return {c: eq_probs[cand_to_cidx(c)] for c in candidates} else: # If LP fails, default to uniform distribution return {c: 1 / len(candidates) for c in candidates}
[docs] @pvm(name="C1 Maximal Lottery") def c1_maximal_lottery(edata, curr_cands=None, algorithm='enumeration'): '''Returns the C1 maximal lottery over the candidates. See http://dss.in.tum.de/files/brandt-research/fishburn_slides.pdf. Args: edata (Profile, MarginGraph): A Profile object. curr_cands (list): A list of candidates to restrict the ranking to. If ``None``, then the ranking is over the entire domain of the profile. algorithm (str): The algorithm to use. Either 'enumeration' or 'lp'. Defaults to 'enumeration'. Returns: dict: A dictionary mapping candidates to probabilities. .. note:: The 'enumeration' algorithm averages over the extremal maximal lotteries. The 'lp' is faster, but only returns a single maximal lottery (not necessarily the average) ''' if type(edata) == MajorityGraph: # if edata is a MajorityGraph, we need to add margins for the following code to work. The margins do not matter when finding the c1 maximal lottery. candidates = edata.candidates if curr_cands is None else curr_cands edata = MarginGraph(candidates, [(c1, c2, 1) for c1, c2 in edata.edges if (c1 in candidates and c2 in candidates)]) if algorithm == 'enumeration': return _maximal_lottery_enumeration(edata, curr_cands=curr_cands, margin_transformation = np.sign) elif algorithm == 'lp': return _maximal_lottery_lp(edata, curr_cands=curr_cands, margin_transformation = np.sign) else: raise ValueError(f"Unknown algorithm: {algorithm}")
[docs] @pvm(name="Maximal Lottery") def maximal_lottery(edata, curr_cands=None, algorithm='lp'): '''Returns the maximal lottery over the candidates. See http://dss.in.tum.de/files/brandt-research/fishburn_slides.pdf. Args: edata (Profile, MarginGraph): A Profile object. curr_cands (list): A list of candidates to restrict the ranking to. If ``None``, then the ranking is over the entire domain of the profile. algorithm (str): The algorithm to use. Either 'enumeration' or 'lp'. Defaults to 'enumeration'. Returns: dict: A dictionary mapping candidates to probabilities. .. note:: The 'enumeration' algorithm averages over the extremal maximal lotteries. The 'lp' is faster, but only returns a single maximal lottery (not necessarily the average) ''' if algorithm == 'enumeration': return _maximal_lottery_enumeration(edata, curr_cands=curr_cands, margin_transformation = lambda x: x) elif algorithm == 'lp': return _maximal_lottery_lp(edata, curr_cands=curr_cands, margin_transformation = lambda x: x) else: raise ValueError(f"Unknown algorithm: {algorithm}")
[docs] @pvm(name="Random Consensus Builder") def random_consensus_builder(profile, curr_cands=None, beta=0.5): """Random Consensus Builder (RCB) voting method due to Charikar et al. (https://arxiv.org/abs/2306.17838). For each ranking type in the profile, runs the deterministic Consensus Builder voting method using that ranking as the consensus building ranking. The probability of a candidate winning is proportional to the number of voters with rankings that would make that candidate win when used as the consensus building ranking. Args: profile (Profile): An anonymous profile of linear orders curr_cands (List[int], optional): Candidates to consider. Defaults to all candidates if not provided. beta (float): Threshold for elimination (default 0.5). When processing candidate i, eliminates a candidate j above i in the consensus building ranking if the proportion of voters preferring i to j is >= beta Returns: dict: Maps each candidate to their probability of winning under the RCB method .. seealso:: :meth:`pref_voting.iterative_methods.consensus_builder` :meth:`pref_voting.stochastic_methods.random_consensus_builder_st` """ from pref_voting.iterative_methods import consensus_builder if curr_cands is None: curr_cands = profile.candidates # Count how many times each ranking type produces each winner winner_counts = {c: 0 for c in curr_cands} # Process each unique ranking type for ranking_type in profile.ranking_types: # Count number of voters with this ranking type num_rankings_with_type = len([r for r in profile.rankings if r == ranking_type]) winner = consensus_builder(profile, curr_cands=curr_cands,consensus_building_ranking=ranking_type, beta=beta)[0] winner_counts[winner] += num_rankings_with_type total_count += num_rankings_with_type # Convert counts to probabilities return {c: count/profile.num_voters for c, count in winner_counts.items()}
def create_probabilistic_method(vm): """ Create a probabilistic voting method from a voting method. """ from pref_voting.voting_method import VotingMethod if type(vm) != VotingMethod: raise TypeError("vm must be a VotingMethod object") def _pvm(profile, curr_cands=None, **kwargs): return vm.prob(profile, curr_cands=curr_cands, **kwargs) return ProbVotingMethod(_pvm, name=f'{vm.name} with Even Chance Tiebreaking') def mixture(pvm1, pvm2, alpha): """ Mixture of the two probabilistic voting methods pvm1 and pvm2 with mixing parameter alpha. With probability alpha, the output is the output of pvm1, and with probability 1-alpha, the output is the output of pvm2. """ def _mixture(profile, curr_cands=None, **kwargs): return {c: alpha * pvm1(profile, curr_cands=curr_cands, **kwargs)[c] + (1-alpha) * pvm2(profile, curr_cands=curr_cands, **kwargs)[c] for c in profile.candidates} return ProbVotingMethod(_mixture, name=f'Mixture of {pvm1.name} and {pvm2.name} with alpha={alpha}')