Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Python sampling downselect #320

Draft
wants to merge 2 commits into
base: python_sampling
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 62 additions & 5 deletions buildstockbatch/sampler/residential_sampler/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import multiprocessing
import click
import pathlib
import json
from .sampling_utils import get_param2tsv, get_samples, TSVTuple, get_all_tsv_issues, get_all_tsv_max_errors


Expand All @@ -27,10 +28,60 @@ def get_topological_generations(param2dep: dict[str, list[str]]) -> list[tuple[i
param2dep_graph = get_param_graph(param2dep)
return list(enumerate(nx.topological_generations(param2dep_graph)))

def apply_downselect(param2tsv: dict[str, TSVTuple], param2dep: dict[str, list[str]], downselect: dict) -> dict[str, TSVTuple]:
    """Restrict the sampling distributions in ``param2tsv`` to the options named in ``downselect``.

    Walks the dependency chain upward from the downselected parameter and rewrites
    the probability rows of the ancestor TSVs so that only dependency combinations
    consistent with the selected options can be sampled.  Mutates ``param2tsv`` in
    place and also returns it.

    :param param2tsv: parameter name -> (group2probs, dep_cols, opt_cols, samp_probs).
    :param param2dep: parameter name -> list of its dependency parameter names.
    :param downselect: dict of {parameter: [allowed option names]}; only the first
        entry is used (see note below).
    :return: the (mutated) ``param2tsv`` mapping.
    """
    # assume 1 parameter to start — only the first downselect entry is honored
    param = list(downselect.keys())[0]
    tsv_data, dep_cols, opt_cols, samp_probs = param2tsv[param]

    # Get dict of dependency structure: child -> parent chain toward the root
    # (dependency-free) parameter, via DFS on the reversed dependency graph.
    G = get_param_graph(param2dep)
    dep_structure = nx.dfs_predecessors(G.reverse(), source=param)
    # NOTE(review): relies on the insertion order of dfs_predecessors to identify
    # the root (last key) — confirm this holds for multi-branch dependency graphs.
    root_node = list(dep_structure.items())[-1][0]
    second_node = dep_structure[root_node]

    # Downselect node dependencies: column indices of the kept options, then the
    # dependency rows that give the first kept option nonzero probability.
    down_s_idx = [opt_cols.index(option) for option in list(downselect.values())[0]]
    # NOTE(review): only down_s_idx[0] is checked — rows whose probability mass is
    # entirely on later downselected options would be dropped; confirm intended.
    first_dep_options = [dep for dep, vals in tsv_data.items() if vals[down_s_idx[0]]>0]
    previous_deps = first_dep_options

    # Iterate dependency structure tsvs
    for tsv, pre_dep in dep_structure.items(): # starts with tsv=County and PUMA
        tsv_data, dep_cols, opt_cols, samp_probs = param2tsv[tsv]
        # option[0]: previous_deps holds dependency-value tuples; the first element
        # is assumed to be the matching option name in this TSV — TODO confirm for
        # TSVs with more than one dependency column.
        opt_idxs = [opt_cols.index(option[0]) for option in previous_deps] # relevant options that were dependencies in previous node
        dep_options = [dep for dep, vals in tsv_data.items() if vals[opt_idxs[0]]>0]

        # Write new data for node preceding root node using global probabilities
        if tsv == second_node:
            # Scale each surviving row by its sampling probability to get global
            # (joint) probabilities, keep only the surviving option columns, then
            # renormalize each row to sum to 1.
            global_probs = {dep:[i*samp_probs[dep] for i in tsv_data[dep]] for dep in dep_options}
            global_probs_slice = {dep: [vals[i] for i in opt_idxs if vals[i]>0] for dep, vals in global_probs.items()}
            new_data = {dep: [val/sum(vals) for val in vals] for dep, vals in global_probs_slice.items()}
            # Per-row total surviving probability mass; reused for the root node below.
            collapse_global_prob = {dep: sum(val) for dep, val in global_probs_slice.items()}

            # Zero out each affected row, then write back the renormalized values.
            for dep, vals in new_data.items():
                param2tsv[tsv][0][dep] = [0.0]*len(param2tsv[tsv][0][dep])
                for i,val in zip(opt_idxs, vals): param2tsv[tsv][0][dep][i] = val

        # Write data for root node using succeeding global probabilities
        if tsv == root_node:
            # NOTE(review): collapse_global_prob is only assigned in the
            # second_node branch above — if the traversal reaches the root without
            # having visited second_node this raises NameError; confirm ordering.
            param2tsv[tsv][0][()] = [0.0]*len(param2tsv[tsv][0][()])
            for i,val in zip(opt_idxs, collapse_global_prob.values()): param2tsv[tsv][0][()][i] = val/sum(collapse_global_prob.values())
        previous_deps = dep_options

    # NEW METHODS NEEDED:
    # normalize_by_row(dependencies, tsv2param)
    # normalize_by_column()

    # QUESTIONS/FIXMEs:
    # What would happen if the first tsv is not a mapping tsv? Probably need to consider global probability starting at the downselect tsv
    # Allow for downselect of tsvs with > 1 dependency
    # Does not work for downselect of tsv with 0 dependencies

    return(param2tsv)


def sample_param(param_tuple: TSVTuple, sample_df: pd.DataFrame, param: str, num_samples: int) -> list[str]:
start_time = time.time()
group2values, dep_cols, opt_cols = param_tuple
group2values, dep_cols, opt_cols, prob_cols = param_tuple
if not dep_cols:
probs = group2values[()]
samples = get_samples(probs, opt_cols, num_samples)
Expand All @@ -55,9 +106,13 @@ def sample_param(param_tuple: TSVTuple, sample_df: pd.DataFrame, param: str, num
return samples


def sample_all(project_path, num_samples) -> pd.DataFrame:
def sample_all(project_path, num_samples, downselect=None) -> pd.DataFrame:
param2tsv = get_param2tsv(project_path)
param2dep = {param: tsv_tuple[1] for (param, tsv_tuple) in param2tsv.items()}
if downselect:
downselect = json.loads(downselect)
print(downselect)
param2tsv = apply_downselect(param2tsv, param2dep, downselect)
sample_df = pd.DataFrame()
sample_df.loc[:, "Building"] = list(range(1, num_samples+1))
s_time = time.time()
Expand All @@ -66,7 +121,7 @@ def sample_all(project_path, num_samples) -> pd.DataFrame:
print(f"Sampling {len(params)} params in a batch at level {level}")
results = []
for param in params:
_, dep_cols, _ = param2tsv[param]
_, dep_cols, _, _ = param2tsv[param]
res = pool.apply_async(sample_param, (param2tsv[param], sample_df[dep_cols], param, num_samples))
results.append(res)
st = time.time()
Expand Down Expand Up @@ -95,12 +150,14 @@ def cli():
help="The number of datapoints to sample.")
@click.option("-o", "--output", type=str, required=True,
help="The output filename for samples.")
def sample(project: str, num_datapoints: int, output: str) -> None:
@click.option("-d", "--downselect", type=str, required=False,
help='Downselect parameter and options. \'{\"parameter\": [\"option 1\", \"option 2\", ...]}\'')
def sample(project: str, num_datapoints: int, output: str, downselect: str = None) -> None:
    """Performs sampling for project and writes output csv file.

    :param project: path to the project directory containing the characteristics TSVs.
    :param num_datapoints: number of buildings (rows) to sample.
    :param output: filename for the samples CSV.
    :param downselect: optional JSON string of {parameter: [option, ...]} used to
        restrict sampling; passed through to ``sample_all``.
    """
    start_time = time.time()
    # NOTE(review): debug print of the raw CLI arguments — consider removing or
    # switching to click.echo before merge.
    print(project, num_datapoints, output)
    sample_df = sample_all(pathlib.Path(project), num_datapoints, downselect)
    click.echo("Writing CSV")
    sample_df.to_csv(output, index=False)
    click.echo(f"Completed sampling in {time.time() - start_time:.2f} seconds")
Expand Down
10 changes: 6 additions & 4 deletions buildstockbatch/sampler/residential_sampler/sampling_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
random.seed(42)


TSVTuple = tuple[dict[tuple[str, ...], list[float]], list[str], list[str]]
# One parsed characteristics TSV: (group2probs, dep_cols, opt_cols, prob_cols).
# - group2probs: dependency-value tuple -> the row's option probabilities
#   (with the row's sampling probability appended at the end by read_char_tsv)
# - dep_cols: names of the dependency columns
# - opt_cols: names of the option columns
# - prob_cols: dependency-value tuple -> the row's last-column probability.
#   read_char_tsv keys this dict by dep_val (the same tuple key as group2probs),
#   so the key type is tuple[str, ...], not str.
TSVTuple = tuple[dict[tuple[str, ...], list[float]], list[str], list[str], dict[tuple[str, ...], float]]


def read_char_tsv(file_path: pathlib.Path) -> TSVTuple:
dep_cols = []
opt_cols = []
sampling_col = None
prob_cols = {}
group2probs = {}
with open(file_path) as file:
for line_num, line in enumerate(file):
Expand All @@ -38,8 +39,9 @@ def read_char_tsv(file_path: pathlib.Path) -> TSVTuple:
opt_val = [float(v) for v in line_array[len(dep_cols): len(dep_cols) + len(opt_cols)]]
sampling_prob = float(line_array[sampling_col]) if sampling_col else 1
group2probs[dep_val] = opt_val + [sampling_prob] # append sampling probability at the end
prob_cols[dep_val] = float(line_array[-1])

return group2probs, dep_cols, opt_cols
return group2probs, dep_cols, opt_cols, prob_cols


@cache
Expand Down Expand Up @@ -204,7 +206,7 @@ def get_all_tsv_issues(sample_df: pd.DataFrame, project_dir: pathlib.Path) -> di
results = []
with multiprocessing.Pool(processes=max(multiprocessing.cpu_count() - 2, 1)) as pool:
for param in all_params:
_, dep_cols, _ = param2tsv[param]
_, dep_cols, _, _ = param2tsv[param]
res = pool.apply_async(get_tsv_issues, (param, param2tsv[param], sample_df[dep_cols + [param]]))
results.append(res)
all_issues = {param: res_val.get() for param, res_val in zip(all_params, results)}
Expand Down Expand Up @@ -274,7 +276,7 @@ def get_all_tsv_max_errors(sample_df: pd.DataFrame, project_dir: pathlib.Path) -
results = []
with multiprocessing.Pool(processes=max(multiprocessing.cpu_count() - 2, 1)) as pool:
for param in all_params:
_, dep_cols, _ = param2tsv[param]
_, dep_cols, _, _ = param2tsv[param]
res = pool.apply_async(get_tsv_max_sampling_errors,
(param, param2tsv[param], sample_df[dep_cols + [param]]))
# res = test_sample(param, param2tsv[param], sample_df[dep_cols + [param]])
Expand Down