import os
import pandas as pd
from glob import glob
import warnings
from datetime import date
from multiprocessing import freeze_support

from synthpop.census_helpers import Census
from synthpop.recipes.starter2 import Starter
from synthpop.synthesizer import synthesize_all_in_parallel, \
    synthesize_all_in_parallel_mp, \
    synthesize_all_in_parallel_full

warnings.filterwarnings('ignore')

today = str(date.today())

# Bay Area counties to synthesize. Only Santa Clara is active; uncomment
# the others to run the full nine-county region.
counties = [
    # "Alpine County",
    # "Napa County",
    "Santa Clara County",
    # "Solano County",
    # "San Mateo County",
    # "Marin County",
    # "San Francisco County",
    # "Sonoma County",
    # "Contra Costa County",
    # "Alameda County"
]

if __name__ == '__main__':

    # Required for multiprocessing on platforms that spawn (Windows).
    freeze_support()

    for county_name in counties:
        banner = '#' * 80
        print(banner)
        print(' Processing {0} '.format(county_name).center(80, '#'))
        c = Census(os.environ["CENSUS"])
        starter = Starter(os.environ["CENSUS"], "CA", county_name)
        county_dfs = synthesize_all_in_parallel_full(starter)
        print(banner)

        # NOTE(review): the original script carried a large commented-out
        # post-processing section (renaming PUMS columns, deriving
        # age_of_head / workers / tenure, writing per-county CSVs and
        # concatenating them region-wide). It was dead code and has been
        # removed; recover it from version control if needed.
b/synthpop/census_helpers.py index 7597540..665e3fb 100644 --- a/synthpop/census_helpers.py +++ b/synthpop/census_helpers.py @@ -83,7 +83,7 @@ def chunks(l, n): """ Yield successive n-sized chunks from l. """ for i in range(0, len(l), n): - yield l[i:i+n] + yield l[i: i + n] for census_column_batch in chunks(census_columns, 45): census_column_batch = list(census_column_batch) @@ -200,12 +200,12 @@ def try_fips_lookup(self, state, county=None): if county is None: try: return getattr(us.states, state).fips - except: + except (KeyError, NameError, ValueError, AttributeError, IndexError): pass return state try: return df.loc[(state, county)] - except: + except (KeyError, NameError, ValueError, AttributeError, IndexError): pass return state, county diff --git a/synthpop/ipf/test/test_ipf.py b/synthpop/ipf/test/test_ipf.py index f30142b..33496d2 100644 --- a/synthpop/ipf/test/test_ipf.py +++ b/synthpop/ipf/test/test_ipf.py @@ -2,7 +2,7 @@ import pytest from pandas.util import testing as pdt -from .. import ipf +from synthpop.ipf import ipf def test_trivial_ipf(): diff --git a/synthpop/ipu/ipu.py b/synthpop/ipu/ipu.py index 4fdf10f..285ae84 100644 --- a/synthpop/ipu/ipu.py +++ b/synthpop/ipu/ipu.py @@ -7,6 +7,7 @@ import numpy as np import pandas as pd +import warnings def _drop_zeros(df): @@ -99,7 +100,7 @@ def iter_columns(self): The returned column contains only the non-zero elements. """ - return list(self._everything.values()) + return self._everything.values() def get_column(self, key): """ @@ -193,7 +194,7 @@ def _update_weights(column, weights, constraint): def household_weights( household_freq, person_freq, household_constraints, person_constraints, - convergence=1e-4, max_iterations=20000): + convergence=1e-4, max_iterations=20000, ignore_max_iters=False): """ Calculate the household weights that best match household and person level attributes. 
@@ -259,9 +260,18 @@ def household_weights( iterations += 1 if iterations > max_iterations: - raise RuntimeError( - 'Maximum number of iterations reached during IPU: {}'.format( - max_iterations)) + + if ignore_max_iters: + warnings.warn( + 'Maximum number of iterations reached ' + 'during IPU: {}'.format(max_iterations), UserWarning) + return ( + pd.Series(best_weights, index=household_freq.index), + best_fit_qual, iterations) + else: + raise RuntimeError( + 'Maximum number of iterations reached ' + 'during IPU: {}'.format(max_iterations)) return ( pd.Series(best_weights, index=household_freq.index), diff --git a/synthpop/ipu/test/test_ipu.py b/synthpop/ipu/test/test_ipu.py index 721e9a1..df83fd3 100644 --- a/synthpop/ipu/test/test_ipu.py +++ b/synthpop/ipu/test/test_ipu.py @@ -2,9 +2,8 @@ import numpy.testing as npt import pandas as pd import pytest -from pandas.util import testing as pdt -from .. import ipu +from synthpop.ipu import ipu @pytest.fixture(scope='module') @@ -169,10 +168,18 @@ def test_household_weights( def test_household_weights_max_iter( household_freqs, person_freqs, household_constraints, person_constraints): + + with pytest.warns(UserWarning): + ipu.household_weights( + household_freqs, person_freqs, household_constraints, + person_constraints, convergence=1e-7, max_iterations=10, + ignore_max_iters=True) + with pytest.raises(RuntimeError): ipu.household_weights( household_freqs, person_freqs, household_constraints, - person_constraints, convergence=1e-7, max_iterations=10) + person_constraints, convergence=1e-7, max_iterations=10, + ignore_max_iters=False) def test_FrequencyAndConstraints(freq_wrap): diff --git a/synthpop/recipes/starter2.py b/synthpop/recipes/starter2.py index 8008d9e..4865a6a 100644 --- a/synthpop/recipes/starter2.py +++ b/synthpop/recipes/starter2.py @@ -30,9 +30,11 @@ class Starter: Returns ------- household_marginals : DataFrame - Marginals per block group for the household data (from ACS 5-year estimates) + Marginals 
per block group for the household + data (from ACS 5-year estimates) person_marginals : DataFrame - Marginals per block group for the person data (from ACS 5-year estimates) + Marginals per block group for the person + data (from ACS 5-year estimates) household_jointdist : DataFrame joint distributions for the households (from PUMS 2010-2000), one joint distribution for each PUMA (one row per PUMA) @@ -57,7 +59,8 @@ def __init__(self, key, state, county, tract=None, acsyear=2016): income_columns = ['B19001_0%02dE' % i for i in range(1, 18)] vehicle_columns = ['B08201_0%02dE' % i for i in range(1, 7)] workers_columns = ['B08202_0%02dE' % i for i in range(1, 6)] - presence_of_children_columns = ['B11005_001E', 'B11005_002E', 'B11005_011E'] + presence_of_children_columns = [ + 'B11005_001E', 'B11005_002E', 'B11005_011E'] presence_of_seniors_columns = ['B11007_002E', 'B11007_007E'] tenure_mover_columns = ['B25038_0%02dE' % i for i in range(1, 16)] block_group_columns = ( @@ -137,7 +140,8 @@ def __init__(self, key, state, county, tract=None, acsyear=2016): female_age_columns = ['B01001_0%02dE' % i for i in range(27, 50)] all_columns = population + sex + race + male_age_columns + \ female_age_columns + hh_population + hispanic - p_acs = c.block_group_query(all_columns, state, county, tract=tract, year=acsyear) + p_acs = c.block_group_query( + all_columns, state, county, tract=tract, year=acsyear) self.p_acs = p_acs self.p_acs_cat = cat.categorize(p_acs, { ("person_age", "19 and under"): @@ -162,11 +166,11 @@ def __init__(self, key, state, county, tract=None, acsyear=2016): "B01001_043E + B01001_044E + B01001_045E + " "B01001_046E + B01001_047E + B01001_048E + " "B01001_049E) * B11002_001E*1.0/B01001_001E", - ("race", "white"): "(B02001_002E) * B11002_001E*1.0/B01001_001E", - ("race", "black"): "(B02001_003E) * B11002_001E*1.0/B01001_001E", - ("race", "asian"): "(B02001_005E) * B11002_001E*1.0/B01001_001E", - ("race", "other"): "(B02001_004E + B02001_006E + B02001_007E + 
" - "B02001_008E) * B11002_001E*1.0/B01001_001E", + ("race", "white"): "(B02001_002E) * B11002_001E*1.0/B01001_001E", + ("race", "black"): "(B02001_003E) * B11002_001E*1.0/B01001_001E", + ("race", "asian"): "(B02001_005E) * B11002_001E*1.0/B01001_001E", + ("race", "other"): "(B02001_004E + B02001_006E + B02001_007E + " + "B02001_008E) * B11002_001E*1.0/B01001_001E", ("person_sex", "male"): "(B01001_002E) * B11002_001E*1.0/B01001_001E", ("person_sex", "female"): @@ -177,13 +181,14 @@ def __init__(self, key, state, county, tract=None, acsyear=2016): "(B03003_002E) * B11002_001E*1.0/B01001_001E", }, index_cols=['state', 'county', 'tract', 'block group']) - # Put the needed PUMS variables here. These are also the PUMS variables - # that will be in the outputted synthetic population + # Put the needed PUMS variables here. These are also the PUMS + # variables that will be in the outputted synthetic population self.h_pums_cols = ('serialno', 'PUMA00', 'PUMA10', 'RT', 'NP', 'TYPE', 'R65', 'HINCP', 'VEH', 'MV', 'TEN', 'BLD', 'R18') - self.p_pums_cols = ('serialno', 'PUMA00', 'PUMA10', 'RELP', 'AGEP', - 'ESR', 'RAC1P', 'HISP', 'SEX', 'SPORDER', - 'PERNP', 'SCHL', 'WKHP', 'JWTR', 'SCH') + self.p_pums_cols = ( + 'serialno', 'SPORDER', 'PUMA00', 'PUMA10', 'RELP', 'AGEP', 'ESR', + 'SCHL', 'SCH', 'JWTR', 'PERNP', 'WKHP', 'RAC1P', 'HISP', 'SEX', + 'COW') def get_geography_name(self): # this synthesis is at the block group level for most variables diff --git a/synthpop/recipes/tests/test_starter.py b/synthpop/recipes/tests/test_starter.py index c31b1df..a3144f2 100644 --- a/synthpop/recipes/tests/test_starter.py +++ b/synthpop/recipes/tests/test_starter.py @@ -1,6 +1,8 @@ import pytest -from ...synthesizer import * -from ..starter import Starter + +from synthpop.synthesizer import * +from synthpop.recipes.starter import Starter +from synthpop.recipes.starter2 import Starter as Starter2 @pytest.fixture @@ -9,5 +11,11 @@ def key(): def test_starter(key): - st = Starter(key, "CA", 
"Napa County") + st = Starter(key, "CA", "Alpine County") + # just run it for now synthesize_all(st, num_geogs=1) + + +# no synthesizer bc it's too memory intensive for travis +def test_starter2(key): + Starter2(key, "CA", "Alpine County") diff --git a/synthpop/recipes/tests/test_starter2.py b/synthpop/recipes/tests/test_starter2.py new file mode 100644 index 0000000..6c6bf8d --- /dev/null +++ b/synthpop/recipes/tests/test_starter2.py @@ -0,0 +1,22 @@ +import pytest + +from synthpop.synthesizer import * +from synthpop.recipes.starter import Starter +from synthpop.recipes.starter2 import Starter as Starter2 + + +@pytest.fixture +def key(): + return "827402c2958dcf515e4480b7b2bb93d1025f9389" + + +def test_starter(key): + st = Starter(key, "IL", "Kendall County") + # just run it for now + synthesize_all(st, num_geogs=1) + + +# no synthesizer bc it's too memory intensive for travis +def test_starter2(key): + Starter2(key, "IL", "Kendall County") + synthesize_all_in_parallel(st, num_geogs=1) diff --git a/synthpop/synthesizer.py b/synthpop/synthesizer.py index f5532b1..95ca8c5 100644 --- a/synthpop/synthesizer.py +++ b/synthpop/synthesizer.py @@ -4,7 +4,13 @@ import numpy as np import pandas as pd -from scipy.stats import chisquare +from datetime import datetime as dt +from time import sleep, time +from tqdm import tqdm +from concurrent.futures import ProcessPoolExecutor, as_completed +import multiprocessing +from multiprocessing import Pool +from itertools import repeat from . import categorizer as cat from . 
import draw @@ -26,7 +32,7 @@ def enable_logging(): def synthesize(h_marg, p_marg, h_jd, p_jd, h_pums, p_pums, - marginal_zero_sub=.01, jd_zero_sub=.001, hh_index_start=0): + marginal_zero_sub=.01, jd_zero_sub=.001, hh_index_start=0, ignore_max_iters=False): # this is the zero marginal problem h_marg = h_marg.replace(0, marginal_zero_sub) @@ -67,23 +73,22 @@ def synthesize(h_marg, p_marg, h_jd, p_jd, h_pums, p_pums, logger.info("Running ipu") import time t1 = time.time() - best_weights, fit_quality, iterations = household_weights(household_freq, - person_freq, - h_constraint, - p_constraint) - logger.info("Time to run ipu: %.3fs" % (time.time()-t1)) + max_iterations = 20000 + best_weights, fit_quality, iterations = household_weights( + household_freq, person_freq, h_constraint, p_constraint, + max_iterations=max_iterations, ignore_max_iters=ignore_max_iters) + logger.info("Time to run ipu: %.3fs" % (time.time() - t1)) logger.debug("IPU weights:") logger.debug(best_weights.describe()) - logger.debug("Fit quality:") - logger.debug(fit_quality) - logger.debug("Number of iterations:") - logger.debug(iterations) - + logger.debug("Fit quality: {0}".format(fit_quality)) + if iterations == 20000: + logger.warning("Number of iterations: {0}".format(str(iterations))) + else: + logger.debug("Number of iterations: {0}".format(str(iterations))) num_households = int(h_marg.groupby(level=0).sum().mean()) - print("Drawing %d households" % num_households) - best_chisq = np.inf + # print("Drawing %d households" % num_households) return draw.draw_households( num_households, h_pums, p_pums, household_freq, h_constraint, @@ -91,7 +96,7 @@ def synthesize(h_marg, p_marg, h_jd, p_jd, h_pums, p_pums, def synthesize_all(recipe, num_geogs=None, indexes=None, - marginal_zero_sub=.01, jd_zero_sub=.001): + marginal_zero_sub=.01, jd_zero_sub=.001, ignore_max_iters=False): """ Returns ------- @@ -102,9 +107,6 @@ def synthesize_all(recipe, num_geogs=None, indexes=None, and ``people_p``. 
""" - print("Synthesizing at geog level: '{}' (number of geographies is {})" - .format(recipe.get_geography_name(), recipe.get_num_geographies())) - if indexes is None: indexes = recipe.get_available_geography_ids() @@ -114,9 +116,8 @@ def synthesize_all(recipe, num_geogs=None, indexes=None, fit_quality = {} hh_index_start = 0 - # TODO will parallelization work here? - for geog_id in indexes: - print("Synthesizing geog id:\n", geog_id) + for geog_id in tqdm(indexes, total=num_geogs): + # print("Synthesizing geog id:\n", geog_id) h_marg = recipe.get_household_marginal_for_geography(geog_id) logger.debug("Household marginal") @@ -139,7 +140,7 @@ def synthesize_all(recipe, num_geogs=None, indexes=None, synthesize( h_marg, p_marg, h_jd, p_jd, h_pums, p_pums, marginal_zero_sub=marginal_zero_sub, jd_zero_sub=jd_zero_sub, - hh_index_start=hh_index_start) + hh_index_start=hh_index_start, ignore_max_iters=ignore_max_iters) # Append location identifiers to the synthesized households for geog_cat in geog_id.keys(): @@ -164,3 +165,330 @@ def synthesize_all(recipe, num_geogs=None, indexes=None, all_persons = pd.concat(people_list, ignore_index=True) return (all_households, all_persons, fit_quality) + + +def geog_preprocessing(geog_id, recipe, marginal_zero_sub, jd_zero_sub, + hh_index_start, ignore_max_iters): + h_marg = recipe.get_household_marginal_for_geography(geog_id) + logger.debug("Household marginal") + logger.debug(h_marg) + + p_marg = recipe.get_person_marginal_for_geography(geog_id) + logger.debug("Person marginal") + logger.debug(p_marg) + + h_pums, h_jd = recipe.\ + get_household_joint_dist_for_geography(geog_id) + logger.debug("Household joint distribution") + logger.debug(h_jd) + + p_pums, p_jd = recipe.get_person_joint_dist_for_geography(geog_id) + logger.debug("Person joint distribution") + logger.debug(p_jd) + + return h_marg, p_marg, h_jd, p_jd, h_pums, p_pums, marginal_zero_sub,\ + jd_zero_sub, hh_index_start, ignore_max_iters + + +def synth_worker( + 
arg_tuple, hh_index_start=0, ignore_max_iters=False): + # geog_id, recipe, marginal_zero_sub, jd_zero_sub): + geog_id, recipe, marginal_zero_sub, jd_zero_sub = arg_tuple + + synth_args = geog_preprocessing( + geog_id, recipe, marginal_zero_sub, jd_zero_sub, hh_index_start, ignore_max_iters) + households, people, people_chisq, people_p = synthesize(*synth_args) + + for geog_cat in geog_id.keys(): + households[geog_cat] = geog_id[geog_cat] + + key = BlockGroupID( + geog_id['state'], geog_id['county'], geog_id['tract'], + geog_id['block group']) + + return households, people, key, people_chisq, people_p + + +def synthesize_all_in_parallel( + recipe, num_geogs=None, indexes=None, marginal_zero_sub=.01, + jd_zero_sub=.001, max_workers=None, hh_index_start=0, ignore_max_iters=False): + """ + Returns + ------- + households, people : pandas.DataFrame + fit_quality : dict of FitQuality + Keys are geographic IDs, values are namedtuples with attributes + ``.household_chisq``, ``household_p``, ``people_chisq``, + and ``people_p``. + ignore_max_iters: boolean which indicates to ignore the max iterations in the ipu. Default, False. 
+ """ + # cluster = LocalCluster() + # client = Client(cluster) + with ProcessPoolExecutor(max_workers=5) as ex: + + if indexes is None: + indexes = recipe.get_available_geography_ids() + + hh_list = [] + people_list = [] + cnt = 0 + fit_quality = {} + hh_index_start = 0 + geog_synth_args = [] + finished_args = [] + geog_ids = [] + futures = [] + + print('Submitting function args for parallel processing:') + for i, geog_id in enumerate(indexes): + geog_synth_args.append(ex.submit( + geog_preprocessing, geog_id, recipe, marginal_zero_sub, + jd_zero_sub, hh_index_start, ignore_max_iters)) + geog_ids.append(geog_id) + cnt += 1 + if num_geogs is not None and cnt >= num_geogs: + break + + print('Processing function args in parallel:') + for finished_arg in tqdm( + as_completed(geog_synth_args), total=len(geog_synth_args)): + finished_args.append(finished_arg.result()) + + print('Submitting {0} geographies for parallel processing.'.format( + len(finished_args))) + futures = [ + ex.submit(synthesize, *geog_args) for geog_args in finished_args] + + print('Beginning population synthesis in parallel:') + for f in tqdm(as_completed(futures), total=len(futures)): + pass + + print('Processing results:') + for i, future in tqdm(enumerate(futures), total=len(futures)): + try: + households, people, people_chisq, people_p = future.result() + except Exception as e: + print('Generated an exception: {0}'.format(e)) + else: + geog_id = geog_ids[i] + + # Append location identifiers to the synthesized households + for geog_cat in geog_id.keys(): + households[geog_cat] = geog_id[geog_cat] + + # update the household_ids since we can't do it in the call to + # synthesize when we execute in parallel + households.index += hh_index_start + people.hh_id += hh_index_start + + hh_list.append(households) + people_list.append(people) + key = BlockGroupID( + geog_id['state'], geog_id['county'], geog_id['tract'], + geog_id['block group']) + fit_quality[key] = FitQuality(people_chisq, people_p) + + if 
len(households) > 0: + hh_index_start = households.index.values[-1] + 1 + + all_households = pd.concat(hh_list) + all_persons = pd.concat(people_list, ignore_index=True) + + return (all_households, all_persons, fit_quality) + + +def synthesize_all_in_parallel_mp( + recipe, num_geogs=None, indexes=None, marginal_zero_sub=.01, + jd_zero_sub=.001, max_workers=None): + """ + Returns + ------- + households, people : pandas.DataFrame + fit_quality : dict of FitQuality + Keys are geographic IDs, values are namedtuples with attributes + ``.household_chisq``, ``household_p``, ``people_chisq``, + and ``people_p``. + + """ + if indexes is None: + indexes = recipe.get_available_geography_ids() + + hh_list = [] + people_list = [] + cnt = 0 + fit_quality = {} + hh_index_start = 0 + geog_ids = [] + + for i, geog_id in enumerate(indexes): + geog_ids.append(geog_id) + cnt += 1 + if num_geogs is not None and cnt >= num_geogs: + break + + with Pool(processes=max_workers) as pool: + print('{0} - Generating function args for parallel processing.'.format( + str(dt.now()))) + geog_synth_args = pool.starmap( + geog_preprocessing, zip( + geog_ids, repeat(recipe), repeat(marginal_zero_sub), + repeat(jd_zero_sub))) + pool.close() + pool.join() + print('{0} - Finished.'.format(str(dt.now()))) + + with Pool(processes=max_workers) as pool: + print( + '{0} - Submitting funtion args to synthesizers ' + 'in parallel.'.format(str(dt.now()))) + results = pool.starmap(synthesize, geog_synth_args) + pool.close() + print('{0} - Waiting for parallel synthesizer to finish.'.format( + str(dt.now()))) + pool.join() + print('{0} - Finished synthesizing geographies in parallel.'.format( + str(dt.now()))) + + print('Processing results:') + for i, result in tqdm(enumerate(results), total=len(results)): + households, people, people_chisq, people_p = result + geog_id = geog_ids[i] + + # Append location identifiers to the synthesized households + for geog_cat in geog_id.keys(): + households[geog_cat] = 
geog_id[geog_cat] + + # update the household_ids since we can't do it in the call to + # synthesize when we execute in parallel + households.index += hh_index_start + people.hh_id += hh_index_start + + hh_list.append(households) + people_list.append(people) + key = BlockGroupID( + geog_id['state'], geog_id['county'], geog_id['tract'], + geog_id['block group']) + fit_quality[key] = FitQuality(people_chisq, people_p) + + if len(households) > 0: + hh_index_start = households.index.values[-1] + 1 + + all_households = pd.concat(hh_list) + all_persons = pd.concat(people_list, ignore_index=True) + + return (all_households, all_persons, fit_quality) + + +def synthesize_all_in_parallel_full( + recipe, num_geogs=None, indexes=None, marginal_zero_sub=.01, + jd_zero_sub=.001, max_workers=None): + """ + Returns + ------- + households, people : pandas.DataFrame + fit_quality : dict of FitQuality + Keys are geographic IDs, values are namedtuples with attributes + ``.household_chisq``, ``household_p``, ``people_chisq``, + and ``people_p``. + + """ + if indexes is None: + indexes = recipe.get_available_geography_ids() + + cnt = 0 + geog_ids = [] + + for i, geog_id in enumerate(indexes): + geog_ids.append(geog_id) + cnt += 1 + if num_geogs is not None and cnt >= num_geogs: + break + + count_geogs = len(geog_ids) + finished = 0 + timeouts = 0 + hh_index_start = 0 + hh_list = [] + people_list = [] + fit_quality = {} + print('Synthesizing geographies in parallel.') + pool = Pool() + + # APPLY_ASYNC VERSION + # procs = [pool.apply_async( + # synth_worker, (geog_id, recipe, marginal_zero_sub, jd_zero_sub) + # ) for geog_id in geog_ids] + + # print('Initialized all processes. 
Now recovering results:') + # for proc in procs: + # try: + # result = proc.get(120) + # results.append(result) + # print('{0} results completed'.format(str(len(results)))) + # except multiprocessing.TimeoutError: + # timeouts.append(proc) + # print('{0} timeouts'.format(str(len(timeouts)))) + + # IMAP VERSION W TIMEOUTS + sttm = time() + arg_tuples = zip( + geog_ids, repeat(recipe), repeat(marginal_zero_sub), + repeat(jd_zero_sub)) + imap_it = pool.imap_unordered(synth_worker, arg_tuples) + # pbar = tqdm(total=len(geog_ids)) + while 1: + try: + result = imap_it.next(timeout=120) + + households, people, key, people_chisq, people_p = result + + # update the household_ids since we can't do it in the call to + # synthesize when we execute in parallel + households.index += hh_index_start + people.hh_id += hh_index_start + + hh_list.append(households) + people_list.append(people) + fit_quality[key] = FitQuality(people_chisq, people_p) + + if len(households) > 0: + hh_index_start = households.index.values[-1] + 1 + finished += 1 + # pbar.update(1) + except StopIteration: + break + except multiprocessing.TimeoutError: + timeouts += 1 + elapsed_min = np.round((time() - sttm) / 60, 1) + min_per_geog = elapsed_min / finished + min_remaining = np.round(min_per_geog * (count_geogs - finished), 1) + print( + '{0} of {1} geographies completed // {2} minutes ' + 'elapsed // {3} minutes remaining'.format( + str(finished), str(count_geogs), + str(elapsed_min), str(min_remaining))) + # pbar.close() + + # IMAP_UNORDERED VERSION + # arg_tuples = zip( + # geog_ids, repeat(recipe), repeat(marginal_zero_sub), + # repeat(jd_zero_sub)) + # for result in tqdm( + # pool.imap_unordered( + # synth_worker, arg_tuples), + # total=len(geog_ids), ncols=80): + # results.append(result) + + print('Shutting down the worker pool.') + pool.close() + pool.join() + print('Pool closed.') + + # return results, timeouts + # print('Processing results:') + # for i, result in tqdm(enumerate(results), 
total=len(results)): + + all_households = pd.concat(hh_list) + all_persons = pd.concat(people_list, ignore_index=True) + + return (all_households, all_persons, fit_quality) diff --git a/synthpop/test/test_categorizer.py b/synthpop/test/test_categorizer.py index 88ddf84..8449286 100644 --- a/synthpop/test/test_categorizer.py +++ b/synthpop/test/test_categorizer.py @@ -2,7 +2,6 @@ import numpy as np from synthpop.census_helpers import Census from synthpop import categorizer as cat -import os @pytest.fixture diff --git a/synthpop/test/test_censushelpers.py b/synthpop/test/test_censushelpers.py index e2a1b27..0b90422 100644 --- a/synthpop/test/test_censushelpers.py +++ b/synthpop/test/test_censushelpers.py @@ -1,5 +1,5 @@ import pytest -from ..census_helpers import Census +from synthpop.census_helpers import Census import numpy as np from pandas.util.testing import assert_series_equal import os @@ -74,3 +74,11 @@ def test_download_pums(c): c.download_household_pums("06", puma) c.download_population_pums("10") c.download_household_pums("10") + + +def test_read_csv_cache(c): + puma10 = "07506" + state = "06" + c.download_population_pums(state, puma10) + loc = c.pums10_population_base_url % (state, puma10) + assert loc in c.pums_cache diff --git a/synthpop/test/test_draw.py b/synthpop/test/test_draw.py index 5730f0b..9296810 100644 --- a/synthpop/test/test_draw.py +++ b/synthpop/test/test_draw.py @@ -4,8 +4,8 @@ import pytest from pandas.util import testing as pdt -from .. 
import pytest

from synthpop.synthesizer import synthesize_all_in_parallel_full
from synthpop.recipes.starter import Starter


@pytest.fixture
def key():
    # Census API key used by the starter to download marginals.
    return "827402c2958dcf515e4480b7b2bb93d1025f9389"


def test_parallel_synth(key):
    """Run the full parallel synthesizer on two Napa County block groups
    and sanity-check the geography keys and fit statistics it returns."""
    num_geogs = 2
    starter = Starter(key, "CA", "Napa County")
    _, _, fit = synthesize_all_in_parallel_full(starter, num_geogs=num_geogs)

    expected_bgs = [str(x) for x in list(range(1, num_geogs + 1))]
    for bg_named_tuple in list(fit.keys()):
        assert bg_named_tuple.state == '06'
        assert bg_named_tuple.county == '055'
        assert bg_named_tuple.tract == '200201'
        assert bg_named_tuple.block_group in expected_bgs

    for fit_named_tuple in list(fit.values()):
        assert fit_named_tuple.people_chisq > 10
        assert fit_named_tuple.people_p < 0.5