Source code for pref_voting.io.readers

"""
    File: readers.py
    Author: Wes Holliday (wesholliday@berkeley.edu) and Eric Pacuit (epacuit@umd.edu)
    Date: March 17, 2024

    Functions to write election data to a file.
"""

from pref_voting.profiles import Profile
from pref_voting.profiles_with_ties import ProfileWithTies
from pref_voting.spatial_profiles import SpatialProfile
from preflibtools.instances import OrdinalInstance
import os
import csv
import pandas as pd
import json

[docs] def abif_to_profile(filename): """ Open filename in the abif format and return a Profile object. Args: filename: The name of the file to read the profile from. Returns: A Profile object. """ with open(filename, mode='r') as file: lines = list(file.readlines()) cmap = {} cand_to_indices = {} cindx = 0 # create a candidate map for line in lines: if line.startswith("="): _, cname = line[1:].strip().split(":") cmap[cindx] = cname.strip().strip("[]") cand_to_indices[cname.strip().strip("[]")] = cindx cindx += 1 rankings = [] rcounts = [] for line in lines: if line.startswith("#"): # comment continue elif line.startswith("="): # candidate line continue elif line.startswith("{"): # metadata continue else: # ranking line count, ranking = line.strip().split(":") count = int(count) ranking = ranking.split(">") assert not any(["=" in cs or "," in cs for cs in ranking]), "The election must contain linear orders on the candidates to create a Profile." if len(cmap) == 0: # no candidate map provided, so need to create one from the rankings cmap = {cidx: str(sorted(ranking)[cidx].strip()) for cidx in range(len(sorted(ranking)))} cand_to_indices = {c: i for i, c in cmap.items()} r = list() assert len(cmap) > 0 and len(ranking) == len(cmap), "The election must contain linear orders on the candidates to create a Profile." for c in ranking: assert len(cmap) > 0 and c in cand_to_indices.keys(), "Candidate found that is not in the candidate map." r.append(cand_to_indices[c.strip()]) rankings.append(r) rcounts.append(count) return Profile( rankings, rcounts=rcounts, cmap=cmap)
[docs] def abif_to_profile_with_ties(filename, cand_type=None): """ Open filename in the abif format and return a ProfileWithTies object. Args: filename: The name of the file to read the profile from. Returns: A ProfileWithTies object. """ import re with open(filename, mode='r') as file: lines = list(file.readlines()) rankings = [] rcounts = [] cmap = {} for line in lines: if line.startswith("#"): # comment continue elif line.startswith("="): # candidate line cidx, cname = line[1:].strip().split(":") cmap[cand_type(cidx.strip()) if cand_type is not None else cidx.strip()] = cname.strip().strip("[]") elif line.startswith("{"): # metadata continue else: # ranking line count, ranking = line.strip().split(":") count = int(count) ranking = ranking.split(">") r = dict() for ridx, cs in enumerate(ranking): cands = re.split(r'[=,]', cs) for c in cands: if cand_type is not None: r[cand_type(c.strip())] = ridx + 1 else: r[c.strip()] = ridx + 1 rankings.append(r) rcounts.append(count) if len(cmap) == 0: return ProfileWithTies( rankings, rcounts=rcounts) else: return ProfileWithTies( rankings, rcounts=rcounts, candidates = sorted(list(cmap.keys())), cmap = cmap)
[docs] def preflib_to_profile( instance_or_preflib_file, include_cmap=False, use_cand_names=False, as_linear_profile=False): """ Read a profile from an OrdinalInstance or a .soc, .soi, .toc, or .toi file used by PrefLib (https://www.preflib.org/format#types). This function uses the ``OrdinalInstance`` class from the ``preflibtools`` package to read the profile from the file (see https://preflib.github.io/preflibtools/usage.html#ordinal-preferences). Args: preflib_file (str): the path to the file include_cmap (bool): if True, then include the candidate map. Defaults to False. use_cand_names (bool): if True, then use the candidate map as the candidate names. Defaults to False. as_linear_profile (bool): if True, then return a Profile object. Defaults to False. If False, then return a ProfileWithTies object. Returns: Profile or ProfileWithTies: the profile read from the file """ assert type(instance_or_preflib_file) == OrdinalInstance or type(instance_or_preflib_file) == str, "The argument must be an instance of OrdinalInstance or a string." if type(instance_or_preflib_file) == str: preflib_file = instance_or_preflib_file assert preflib_file.endswith(".soc") or preflib_file.endswith(".soi") or preflib_file.endswith(".toc") or preflib_file.endswith(".toi"), f"The file must be one of the file types from preflib: https://www.preflib.org/format#types, not {preflib_file}." assert os.path.exists(preflib_file), f"The file {preflib_file} does not exist." instance = OrdinalInstance() instance.parse_file(preflib_file) else: instance = instance_or_preflib_file rankings = [] rcounts = [] cmap = {c:str(c) for c in range(instance.num_alternatives)} if not as_linear_profile: for order in instance.orders: rank = dict() for r,cs in enumerate(order): for c in cs: if not use_cand_names: rank[c] = r + 1 else: rank[instance.alternatives_name[c]] = r + 1 if include_cmap: if use_cand_names: cmap[instance.alternatives_name[c]] = instance.alternatives_name[c] else: cmap[c] = instance.alternatives_name[c] rankings.append(rank) rcounts.append(instance.multiplicity[order]) return ProfileWithTies(rankings, rcounts=rcounts, cmap=cmap) elif as_linear_profile: cand_to_cidx = {c:cidx for cidx,c in enumerate(sorted(list(instance.alternatives_name.keys())))} for order in instance.orders: rank = list() cmap = {c:str(c) for c in range(instance.num_alternatives)} for _,cs in enumerate(order): for c in cs: rank.append(cand_to_cidx[c]) if include_cmap: cmap[cand_to_cidx[c]] = instance.alternatives_name[c] rankings.append(rank) rcounts.append(instance.multiplicity[order]) return Profile(rankings, rcounts=rcounts, cmap=cmap)
[docs] def csv_to_profile( filename, csv_format="candidate_columns", as_linear_profile=False, items_to_skip=None, cand_type=None): """ Read a profile from a csv file. Args: filename (str): the path to the file csv_format (str): the format of the csv file. Defaults to "candidate_columns". The other option is "rank_columns". as_linear_profile (bool): if True, then return a Profile object. Defaults to False. If False, then return a ProfileWithTies object. items_to_skip (list[str]): a list of items to skip. Defaults to None. Items in this list are not included in the profile. Only relevant for "rank_columns" csv format. Returns: Profile or ProfileWithTies: the profile read from the file Note: There are two formats for the csv file: "rank_columns" and "candidate_columns". The "rank_columns" format is used when the csv file contains a column for each rank and the rows are the candidates at that rank (or "skipped" if the ranked is skipped). The "candidate_columns" format is used when the csv file contains a column for each candidate and the rows are the rank of the candidates (or the empty string if the candidate is not ranked). """ if csv_format == "rank_columns": df = pd.read_csv(filename) items_to_skip = items_to_skip if items_to_skip is not None else ["skipped"] ranks = [] rank_columns = [col for col in df.columns if col.startswith('rank') or col.startswith('Rank')] # Get unique values from these columns, excluding 'skipped' cand_names = pd.unique(df[rank_columns].values.ravel('K')) cand_names = [str(value) for value in cand_names if value not in items_to_skip] if 'writein' in cand_names: cands = list(set([c for c in sorted(cand_names) if c != 'writein'])) + ['writein'] else: cands = sorted(list(set(cand_names))) if len(cands) == 0: print("No candidates found in file", filename) cmap = {cidx: c for cidx,c in enumerate(cands)} cand_to_cidx = {c:cidx for cidx,c in enumerate(cands)} rank_str_to_rank = lambda rank_str: int(rank_str[4:].strip()) for _, row in df.iterrows(): ballot_dict = {} for rank in rank_columns: candidate = str(row[rank]) if candidate not in items_to_skip: ballot_dict[cand_to_cidx[candidate]] = rank_str_to_rank(rank) ballot_dict = {cand_type(c) if cand_type is not None else c:r for c,r in ballot_dict.items()} ranks.append(ballot_dict) cmap = {cand_to_cidx[c]:str(c) for c in cands} prof = ProfileWithTies(ranks, cmap=cmap) if as_linear_profile: prof = prof.to_linear_profile() assert prof is not None, "The profile could not be converted to a Profile." return prof elif csv_format == "candidate_columns": with open(filename, mode='r') as file: reader = csv.reader(file) header = next(reader) candidates = header[:-1] rankings = list() rcounts = list() for row in reader: ranks = [int(r) if r != "" else None for r in row[:-1]] count = int(row[-1]) ranking = {cand_type(c) if cand_type is not None else c:r for c,r in zip(candidates, ranks) if r is not None} rankings.append(ranking) rcounts.append(count) prof = ProfileWithTies(rankings, rcounts=rcounts, cmap={cand_type(c) if cand_type is not None else str(c):str(c) for c in candidates}) if as_linear_profile: prof = prof.to_linear_profile() assert prof is not None, "The profile could not be converted to a Profile." return prof
# helper function for json_to_profile def _convert_key_type(key, lst): for c in lst: try: # Attempt to convert the key to the same type as the candidate if type(c)(key) == c: return type(c)(key) except ValueError: continue # Return the original key if no conversion is successful return key
[docs] def json_to_profile(filename, cand_type=None, as_linear_profile=False): """ Read a profile from a json file. Args: filename (str): the path to the file cand_type (type): the type of the candidates. Defaults to None. If not None, then the candidates are converted to this type. as_linear_profile (bool): if True, then return a Profile object. Defaults to False. If False, then return a ProfileWithTies object. Returns: Profile or ProfileWithTies: the profile read from the file """ with open(filename, mode='r') as file: data = json.load(file) candidates = data["candidates"] cmap = {_convert_key_type(c, candidates): c_str for c, c_str in data["cmap"].items()} if cand_type is not None: cmap = {cand_type(c):str(c_str) for c,c_str in cmap.items()} candidates = [cand_type(c) for c in candidates] rankings = [] rcounts = [] for r_data in data["rankings"]: rank = {cand_type(c) if cand_type is not None else _convert_key_type(c, candidates):int(r) for c,r in r_data["ranking"].items()} rankings.append(rank) rcounts.append(int(r_data["count"])) if as_linear_profile: prof = ProfileWithTies(rankings, rcounts=rcounts, candidates=candidates, cmap=cmap) prof = prof.to_linear_profile() assert prof is not None, "The profile could not be converted to a Profile." else: prof = ProfileWithTies(rankings, rcounts=rcounts, candidates=candidates, cmap=cmap) return prof
[docs] def read(filename, file_format, as_linear_profile=False, cand_type=None, csv_format="candidate_columns", items_to_skip=None): """ Read election data from ``filename`` in the format ``file_format``. Args: filename (str): the path to the file file_format (str): the format of the file. The options are "preflib", "json", "csv", and "abif". as_linear_profile (bool): if True, then return a Profile object. Defaults to False. If False, then return a ProfileWithTies object. cand_type (type): the type of the candidates. Defaults to None. If not None, then the candidates are converted to this type. csv_format (str): the format of the csv file. Defaults to "candidate_columns". The other option is "rank_columns". items_to_skip (list[str]): a list of items to skip. Defaults to None. Items in this list are not included in the profile. Only relevant for "rank_columns" csv format. Returns: Profile or ProfileWithTies: the profile read from the file """ if file_format == "abif": if as_linear_profile: return abif_to_profile( filename) else: return abif_to_profile_with_ties( filename, cand_type=cand_type) elif file_format == "json": return json_to_profile( filename, cand_type=cand_type, as_linear_profile=as_linear_profile) elif file_format == "csv": return csv_to_profile( filename, as_linear_profile=as_linear_profile, cand_type=cand_type, csv_format=csv_format, items_to_skip=items_to_skip) elif file_format == "preflib": return preflib_to_profile(filename, as_linear_profile=as_linear_profile) else: raise ValueError(f"File format {file_format} not recognized.")
[docs] def json_to_spatial_profile(filename): """ Load a spatial profile from a JSON file. Args: filename (str): the path to the file Returns: SpatialProfile: the spatial profile read from the file """ with open(filename, "r") as f: spatial_profile_dict = json.load(f) candidates = spatial_profile_dict["cand_names"] voters = spatial_profile_dict["voter_names"] return SpatialProfile( {_convert_key_type(c, candidates):c_pos for c,c_pos in spatial_profile_dict["candidates"].items()}, {_convert_key_type(v, voters):v_pos for v,v_pos in spatial_profile_dict["voters"].items()} )