From 2deff3c031a6644386ff1f7cb9d319564d74e2d9 Mon Sep 17 00:00:00 2001
From: Amin Torabi <140189926+AminTorabi-NOAA@users.noreply.github.com>
Date: Fri, 6 Sep 2024 15:19:41 -0400
Subject: [PATCH] Speeding up writing output (#841)

---
 src/troute-network/troute/nhd_io.py | 62 +++++++++++++++++++----------
 1 file changed, 41 insertions(+), 21 deletions(-)

diff --git a/src/troute-network/troute/nhd_io.py b/src/troute-network/troute/nhd_io.py
index a30f3d1cf..381810c84 100644
--- a/src/troute-network/troute/nhd_io.py
+++ b/src/troute-network/troute/nhd_io.py
@@ -2289,31 +2289,51 @@ def create_mask(ids):
 
     merge_flowveldepth_reset = merge_flowveldepth_reset.dropna(subset=['nex'])
     merge_flowveldepth_reset['nex'] = merge_flowveldepth_reset['nex'].astype(int)
-    # Define custom aggregation functions
-    def sum_q_columns(group):
-        q_columns = [col for col in group.columns if col[1] == 'q']
-        return group[q_columns].sum()
-
-    def custom_v(group):
-        v_columns = [col for col in group.columns if col[1] == 'v']
-        v_values = group[v_columns]
-        if len(v_values) > 1:
-            return pd.Series({col: np.nan for col in v_values.columns})
-        else:
-            return v_values.iloc[0]
+
+    def optimized_v_data(merge_flowveldepth_reset, v_columns):
+        # Ensure v_columns is a list
+        if isinstance(v_columns, tuple):
+            v_columns = [v_columns]
+
+        # Calculate the size of each group by 'nex', to find out the duplicate
+        group_sizes = merge_flowveldepth_reset.groupby('nex').size()
+
+        unique_nex = group_sizes[group_sizes == 1].index
+        non_unique_nex = group_sizes[group_sizes > 1].index
+        filtered_unique_df = merge_flowveldepth_reset[merge_flowveldepth_reset['nex'].isin(unique_nex)]
+        filtered_non_unique_df = merge_flowveldepth_reset[merge_flowveldepth_reset['nex'].isin(non_unique_nex)]
+        filtered_non_unique_df.loc[:, v_columns] = np.nan
+        filtered_non_unique_df = filtered_non_unique_df.drop_duplicates(subset=['nex'])
+
+        combined_df = pd.concat([filtered_unique_df, filtered_non_unique_df])
+        v_data = combined_df.set_index('nex')[v_columns]
+
+        return v_data
+
+
+    def optimized_agg(merge_flowveldepth_reset):
+        # Identify columns for each operation
+        q_columns = [col for col in merge_flowveldepth_reset.columns if col[1] == 'q']
+        v_columns = [col for col in merge_flowveldepth_reset.columns if col[1] == 'v']
+        d_columns = [col for col in merge_flowveldepth_reset.columns if col[1] == 'd']
+
+        # Sum of flowrate columns
+        sum_q = merge_flowveldepth_reset.groupby('nex')[q_columns].sum()
+
+        # Handle velocity in a way that if there are 2 segments coming into one put NaN value
+        v_data = optimized_v_data(merge_flowveldepth_reset, v_columns)
+
+        # Average depth columns
+        avg_d = merge_flowveldepth_reset.groupby('nex')[d_columns].mean()
 
-    def avg_d_columns(group):
-        d_columns = [col for col in group.columns if col[1] == 'd']
-        return group[d_columns].mean()
+        # Combine all results into a single DataFrame
+        all_nex_data = pd.concat([sum_q, v_data, avg_d], axis=1)
 
-    # Apply the groupby with the custom aggregation functions
-    def custom_agg(group):
-        result = pd.concat([sum_q_columns(group), custom_v(group), avg_d_columns(group)])
-        return result
+        return all_nex_data
 
-    all_nex_data = merge_flowveldepth_reset.groupby('nex').apply(custom_agg)
+    all_nex_data = optimized_agg(merge_flowveldepth_reset)
     all_nex_data.index = all_nex_data.index.rename('featureID')
-    all_nex_data['Type'] = 'nex'
+    all_nex_data['Type'] = 'nex'
     # Set the new 'Type' column as an index
     all_nex_data = all_nex_data.set_index('Type', append=True)
 
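For reference, a minimal standalone sketch of the aggregation that the new optimized_agg / optimized_v_data helpers perform, assuming a table with (timestep, variable) tuple columns like flowveldepth and a 'nex' column naming each segment's downstream nexus. The DataFrame toy and the names q_cols, v_cols, d_cols, v_data below are hypothetical illustrations, not code from nhd_io.py: flow (q) is summed per nexus, velocity (v) is blanked to NaN when more than one segment merges, and depth (d) is averaged.

import numpy as np
import pandas as pd

# Hypothetical toy table: one row per stream segment, tuple columns keyed by
# (timestep, variable), plus a 'nex' column. Two segments feed nexus 10.
toy = pd.DataFrame(
    [[1.0, 0.5, 0.2, 10],
     [2.0, 0.7, 0.4, 10],
     [3.0, 0.9, 0.6, 20]],
    columns=[('t0', 'q'), ('t0', 'v'), ('t0', 'd'), 'nex'],
)

q_cols = [c for c in toy.columns if isinstance(c, tuple) and c[1] == 'q']
v_cols = [c for c in toy.columns if isinstance(c, tuple) and c[1] == 'v']
d_cols = [c for c in toy.columns if isinstance(c, tuple) and c[1] == 'd']

# Flow is additive at a nexus; depth is averaged.
sum_q = toy.groupby('nex')[q_cols].sum()
avg_d = toy.groupby('nex')[d_cols].mean()

# Velocity keeps the single upstream value, or becomes NaN when two or more
# segments merge (same idea as optimized_v_data in the patch).
sizes = toy.groupby('nex').size()
single = toy[toy['nex'].isin(sizes[sizes == 1].index)]
multi = toy[toy['nex'].isin(sizes[sizes > 1].index)].copy()
multi.loc[:, v_cols] = np.nan
v_data = (pd.concat([single, multi.drop_duplicates(subset=['nex'])])
            .set_index('nex')[v_cols])

result = pd.concat([sum_q, v_data, avg_d], axis=1)
print(result)
# nexus 10: q = 3.0, v = NaN (two inflows), d = 0.3; nexus 20 keeps its own values

The speedup comes from replacing the row-wise groupby('nex').apply(custom_agg) with these column-wise groupby aggregations, which keep the work vectorized inside pandas.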