Source code for pref_voting.analysis

"""
    File: analysis.py
    Author: Wes Holliday (wesholliday@berkeley.edu) and Eric Pacuit (epacuit@umd.edu)
    Date: August 9, 2022
    Updated: May 9, 2023

    Functions to analyze voting methods
"""

from pref_voting.generate_profiles import generate_profile
from functools import partial
from multiprocess import Pool, cpu_count
import pandas as pd
import numpy as np

[docs] def find_profiles_with_different_winners( vms, numbers_of_candidates=[3, 4, 5], numbers_of_voters=[5, 25, 50, 100], all_unique_winners=False, show_profiles=True, show_margin_graphs=True, show_winning_sets=True, show_rankings_counts=False, return_multiple_profiles=True, probmod="IC", num_trials=10000, ): """ Given a list of voting methods, search for profiles with different winning sets. Args: vms (list(functions)): A list of voting methods, numbers_of_candidates (list(int), default = [3, 4, 5]): The numbers of candidates to check. numbers_of_voters (list(int), default = [5, 25, 50, 100]): The numbers of voters to check. all_unique_winners (bool, default = False): If True, only return profiles in which each voting method has a unique winner. show_profiles (bool, default=True): If True, show profiles with different winning sets for the voting methods when discovered. show_margin_graphs (bool, default=True): If True, show margin graphs of the profiles with different winning sets for the voting methods when discovered. show_winning_sets (bool, default=True): If True, show the different winning sets for the voting methods when discovered. show_rankings_counts (bool, default=True): If True, show the rankings and counts of the profiles with different winning sets for the voting methods. return_multiple_profiles (bool, default=True): If True, return all profiles that are found. probmod (str, default="IC"): The probability model to be passed to the ``generate_profile`` method num_trials (int, default=10000): The number of profiles to check for different winning sets. """ profiles = list() for num_cands in numbers_of_candidates: for num_voters in numbers_of_voters: print(f"{num_cands} candidates, {num_voters} voters") for t in range(num_trials): prof = generate_profile(num_cands, num_voters, probmod=probmod) winning_sets = {vm.name: vm(prof) for vm in vms} wss = [tuple(ws) for ws in list(winning_sets.values())] if ( not all_unique_winners or all([len(ws) == 1 for ws in wss]) ) and list(set(wss)) == list(wss): if show_profiles: prof.display() if show_margin_graphs: prof.display_margin_graph() if show_winning_sets: for vm in vms: vm.display(prof) if show_rankings_counts: print(prof.rankings_counts) if not return_multiple_profiles: return prof else: profiles.append(prof) print(f"Found {len(profiles)} profiles with different winning sets") return profiles
def record_condorcet_efficiency_data(vms, num_cands, num_voters, probmod, probmod_param, t): prof = generate_profile(num_cands, num_voters, probmod=probmod, probmod_param=probmod_param) cw = prof.condorcet_winner() return { "has_cw": cw is not None, "cw_winner": {vm.name: cw is not None and [cw] == vm(prof) for vm in vms}, }
[docs] def condorcet_efficiency_data( vms, numbers_of_candidates=[3, 4, 5], numbers_of_voters=[4, 10, 20, 50, 100, 500, 1000], probmods=["IC"], probmod_params=None, num_trials=10000, use_parallel=True, num_cpus=12, ): """ Returns a Pandas DataFrame with the Condorcet efficiency of a list of voting methods. Args: vms (list(functions)): A list of voting methods, numbers_of_candidates (list(int), default = [3, 4, 5]): The numbers of candidates to check. numbers_of_voters (list(int), default = [5, 25, 50, 100]): The numbers of voters to check. probmod (str, default="IC"): The probability model to be passed to the ``generate_profile`` method num_trials (int, default=10000): The number of profiles to check for different winning sets. use_parallel (bool, default=True): If True, then use parallel processing. num_cpus (int, default=12): The number of (virtual) cpus to use if using parallel processing. """ probmod_params_list = [None]*len(probmods) if probmod_params is None else probmod_params assert len(probmod_params_list) == len(probmods), "probmod_params must be a list of the same length as probmods" if use_parallel: pool = Pool(num_cpus) data_for_df = { "vm": list(), "num_cands": list(), "num_voters": list(), "probmod": list(), "probmod_param":list(), "num_trials": list(), "percent_condorcet_winners": list(), "condorcet_efficiency": list(), } for probmod,probmod_param in zip(probmods, probmod_params_list): for num_cands in numbers_of_candidates: for num_voters in numbers_of_voters: print(f"{num_cands} candidates, {num_voters} voters") get_data = partial( record_condorcet_efficiency_data, vms, num_cands, num_voters, probmod, probmod_param ) if use_parallel: data = pool.map(get_data, range(num_trials)) else: data = list(map(get_data, range(num_trials))) num_cw = 0 num_choose_cw = {vm.name: 0 for vm in vms} for d in data: if d["has_cw"]: num_cw += 1 for vm in vms: num_choose_cw[vm.name] += int(d["cw_winner"][vm.name]) for vm in vms: data_for_df["vm"].append(vm.name) data_for_df["num_cands"].append(num_cands) data_for_df["num_voters"].append(num_voters) data_for_df["probmod"].append(probmod) data_for_df["probmod_param"].append(probmod_param) data_for_df["num_trials"].append(num_trials) data_for_df["percent_condorcet_winners"].append( num_cw / num_trials ) data_for_df["condorcet_efficiency"].append( num_choose_cw[vm.name] / num_cw ) return pd.DataFrame(data_for_df)
# helper function for axiom_violations_data def record_axiom_violation_data( axioms, vms, num_cands, num_voters, probmod, verbose, t ): prof = generate_profile(num_cands, num_voters, probmod=probmod) return {ax.name: {vm.name: ax.has_violation(prof, vm, verbose=verbose) for vm in vms} for ax in axioms}
[docs] def axiom_violations_data( axioms, vms, numbers_of_candidates=[3, 4, 5], numbers_of_voters=[4, 5, 10, 11, 20, 21, 50, 51, 100, 101, 500, 501, 1000, 1001], probmods=["IC"], num_trials=10000, verbose=False, use_parallel=True, num_cpus=12, ): """ Returns a Pandas DataFrame with the Condorcet efficiency of a list of voting methods. Args: vms (list(functions)): A list of voting methods, numbers_of_candidates (list(int), default = [3, 4, 5]): The numbers of candidates to check. numbers_of_voters (list(int), default = [5, 25, 50, 100]): The numbers of voters to check. probmod (str, default="IC"): The probability model to be passed to the ``generate_profile`` method num_trials (int, default=10000): The number of profiles to check for different winning sets. use_parallel (bool, default=True): If True, then use parallel processing. num_cpus (int, default=12): The number of (virtual) cpus to use if using parallel processing. """ if use_parallel: pool = Pool(num_cpus) data_for_df = { "axiom": list(), "vm": list(), "num_cands": list(), "num_voters": list(), "probmod": list(), "num_trials": list(), "num_violations": list(), } for probmod in probmods: print(f"{probmod} probability model") for num_cands in numbers_of_candidates: for num_voters in numbers_of_voters: #print(f"{num_cands} candidates, {num_voters} voters") _verbose = verbose if not use_parallel else False get_data = partial( record_axiom_violation_data, axioms, vms, num_cands, num_voters, probmod, _verbose ) if use_parallel: data = pool.map(get_data, range(num_trials)) else: data = list(map(get_data, range(num_trials))) for ax in axioms: for vm in vms: data_for_df["axiom"].append(ax.name) data_for_df["vm"].append(vm.name) data_for_df["num_cands"].append(num_cands) data_for_df["num_voters"].append(num_voters) data_for_df["probmod"].append(probmod) data_for_df["num_trials"].append(num_trials) data_for_df["num_violations"].append(sum([d[ax.name][vm.name] for d in data])) print("Done.") return pd.DataFrame(data_for_df)
def estimated_variance_of_sampling_dist( values_for_each_experiment, mean_for_each_experiment=None): # values_for_each_vm is a 2d numpy array mean_for_each_experiment = np.nanmean(values_for_each_experiment, axis=1) if mean_for_each_experiment is not None else mean_for_each_experiment num_val_for_each_exp = np.sum(~np.isnan(values_for_each_experiment), axis=1) row_means_reshaped = mean_for_each_experiment[:, np.newaxis] return np.where( num_val_for_each_exp * (num_val_for_each_exp - 1) != 0.0, (1 / (num_val_for_each_exp * (num_val_for_each_exp - 1))) * np.nansum( (values_for_each_experiment - row_means_reshaped) ** 2, axis=1), np.nan ) def estimated_std_error(values_for_each_experiment, mean_for_each_experiment=None): # values_for_each_vm is a 2d numpy array return np.sqrt(estimated_variance_of_sampling_dist(values_for_each_experiment, mean_for_each_experiment=mean_for_each_experiment))
[docs] def means_with_estimated_standard_error( generate_samples, max_std_error, initial_trials=1000, step_trials=1000, min_num_trials=10_000, max_num_trials=None, verbose=False ): """ For each list of numbers produced by generate_samples, returns the means, the estimated standard error (https://en.wikipedia.org/wiki/Standard_error) of the means, the variance of the samples, and the total number of trials. Uses the estimated_variance_of_sampling_dist (as described in https://berkeley-stat243.github.io/stat243-fall-2023/units/unit9-sim.html) and estimated_std_error functions. Args: generate_samples (function): A function that a 2d numpy array of samples. It should take two arguments: num_samples and step (only used if samples are drawn from a pre-computed source in order to ensure that we get new samples during the while loop below). max_std_error (float): The desired estimated standard error for the mean of each sample. initial_trials (int, default=1000): The number of samples to initially generate. step_trials (int, default=1000): The number of samples to generate in each step. min_num_trials (int, default=10000): The minimum number of trials to run. max_num_trials (int, default=None): If not None, then the maximum number of trials to run. verbose (bool, default=False): If True, then print progress information. Returns: A tuple (means, est_std_errors, variances, num_trials) where means is an array of the means of the samples, est_std_errors is an array of estimated standard errors of the samples, variances is an array of the variances of the samples, and num_trials is the total number of trials. """ # samples is a 2d numpy array step = 0 samples = generate_samples(num_samples = initial_trials, step = step) means = np.nanmean(samples, axis=1) variances = np.nanvar(samples, axis=1) est_std_errors = estimated_std_error( samples, mean_for_each_experiment=means) if verbose: print("Initial number of trials:", initial_trials) print(f"Remaining estimated standard errors greater than {max_std_error}:", np.sum(est_std_errors > max_std_error)) print(f"Estimated standard errors that are still greater than {max_std_error}:\n",est_std_errors[est_std_errors > max_std_error]) num_trials = initial_trials while (np.isnan(est_std_errors).any() or np.any(est_std_errors > max_std_error) or (num_trials < min_num_trials)) and (max_num_trials is None or num_trials < max_num_trials): if verbose: print("Number of trials:", num_trials) print(f"Remaining estimated standard errors greater than {max_std_error}:", np.sum(est_std_errors > max_std_error)) print(f"Estimated standard errors that are still greater than {max_std_error}:\n",est_std_errors[est_std_errors > max_std_error]) step += 1 new_samples = generate_samples(num_samples=step_trials, step=step) samples = np.concatenate((samples, new_samples), axis=1) num_trials += step_trials means = np.nanmean(samples, axis=1) variances = np.nanvar(samples, axis=1) est_std_errors = estimated_std_error( samples, mean_for_each_experiment=means) return means, est_std_errors, variances, num_trials
#### Bootstrap confidence interval analysis (to be removed) #### def bootstrap_cia( generate_samples, process_samples, precision, averaging_axis = 0, confidence=0.95, initial_trials=10000, step_trials=1000, bootstrap_samples=1000, verbose = False, max_num_trials = None ): """ Applies the percentile bootstrap confidence interval analysis using the functions generate_samples and process_samples. Args: generate_samples (function): A function that generates samples. process_samples (function): A function that processes the samples generated by generate_samples. precision (float): The desired precision of the confidence interval. averaging_axis (int, default=0): The axis along which to average the samples. confidence (float, default=0.95): The desired confidence level of the confidence interval. initial_trials (int, default=10000): The number of samples to initially generate. step_trials (int, default=1000): The number of samples to generate in each step. bootstrap_samples (int, default=1000): The number of bootstrap samples to select. verbose (bool, default=False): If True, then print progress information. max_num_trials (int, default=None): If not None, then the maximum number of trials to run. Returns: A tuple (means, half_widths, variance_bootstrap_means, variance_of_values, num_trials) where means is an array of the means of the bootstrap means, half_widths is an array of the half widths of the confidence intervals, variance_bootstrap_means is an array of the variances of the bootstrap means, and variance_of_values is an array of the variances of the values returned by process_samples, and num_trials is the total number of trials. """ num_gen_samples = 0 # Generate initial samples samples = generate_samples(num_samples = initial_trials, num_gen_samples = num_gen_samples) # Process the results values = process_samples(samples) # Generate bootstrap samples and calculate the confidence interval for values bootstrap_means = np.array([ [np.mean(np.random.choice(s, size=len(s))) for s in values] for _ in range(bootstrap_samples) ]) means = np.mean(bootstrap_means, axis=0) lowers, uppers = np.percentile(bootstrap_means, [(1-confidence)/2*100, (1+confidence)/2*100], axis=0) half_widths = (uppers - lowers) / 2 variance_bootstrap_means = np.var(bootstrap_means, axis=0) # Compute variance of each array in values variance_of_values = np.array([np.var(arr) for arr in values]) # Continue running simulations until the confidence interval is narrow enough num_trials = initial_trials num_gen_samples += 1 while np.any(half_widths > precision): if verbose: print("Number of trials:", num_trials) print(f"Remaining half widths greater than {precision}:", np.sum(half_widths > precision)) print(f"Half widths that are still greater than {precision}:\n",half_widths[half_widths > precision]) new_samples = generate_samples(num_samples = step_trials, num_gen_samples = num_gen_samples) num_gen_samples += 1 samples = np.concatenate((samples, new_samples), axis=averaging_axis) num_trials += step_trials values = process_samples(samples) bootstrap_means = np.array([ [np.mean(np.random.choice(s, size=len(s))) for s in values] for _ in range(bootstrap_samples) ]) means = np.mean(bootstrap_means, axis=0) lowers, uppers = np.percentile(bootstrap_means, [(1-confidence)/2*100, (1+confidence)/2*100], axis=0) half_widths = (uppers - lowers) / 2 variance_bootstrap_means = np.var(bootstrap_means, axis=0) variance_of_values = np.array([np.var(arr) for arr in values]) if max_num_trials is not None and num_trials > max_num_trials: break return means, half_widths, variance_bootstrap_means, variance_of_values, num_trials