Commit 2deff3c

Speeding up writing output (NOAA-OWP#841)
AminTorabi-NOAA authored and taddyb committed Sep 9, 2024
1 parent 2ecd279 commit 2deff3c
Showing 1 changed file with 41 additions and 21 deletions.
62 changes: 41 additions & 21 deletions src/troute-network/troute/nhd_io.py
@@ -2289,31 +2289,51 @@ def create_mask(ids):
 
     merge_flowveldepth_reset = merge_flowveldepth_reset.dropna(subset=['nex'])
     merge_flowveldepth_reset['nex'] = merge_flowveldepth_reset['nex'].astype(int)
-    # Define custom aggregation functions
-    def sum_q_columns(group):
-        q_columns = [col for col in group.columns if col[1] == 'q']
-        return group[q_columns].sum()
-
-    def custom_v(group):
-        v_columns = [col for col in group.columns if col[1] == 'v']
-        v_values = group[v_columns]
-        if len(v_values) > 1:
-            return pd.Series({col: np.nan for col in v_values.columns})
-        else:
-            return v_values.iloc[0]
-
+    def optimized_v_data(merge_flowveldepth_reset, v_columns):
+        # Ensure v_columns is a list
+        if isinstance(v_columns, tuple):
+            v_columns = [v_columns]
+
+        # Count the rows in each 'nex' group to find duplicate nexus IDs
+        group_sizes = merge_flowveldepth_reset.groupby('nex').size()
+
+        unique_nex = group_sizes[group_sizes == 1].index
+        non_unique_nex = group_sizes[group_sizes > 1].index
+        filtered_unique_df = merge_flowveldepth_reset[merge_flowveldepth_reset['nex'].isin(unique_nex)]
+        filtered_non_unique_df = merge_flowveldepth_reset[merge_flowveldepth_reset['nex'].isin(non_unique_nex)]
+        filtered_non_unique_df.loc[:, v_columns] = np.nan
+        filtered_non_unique_df = filtered_non_unique_df.drop_duplicates(subset=['nex'])
+
+        combined_df = pd.concat([filtered_unique_df, filtered_non_unique_df])
+        v_data = combined_df.set_index('nex')[v_columns]
+
+        return v_data
+
+
+    def optimized_agg(merge_flowveldepth_reset):
+        # Identify the columns for each operation
+        q_columns = [col for col in merge_flowveldepth_reset.columns if col[1] == 'q']
+        v_columns = [col for col in merge_flowveldepth_reset.columns if col[1] == 'v']
+        d_columns = [col for col in merge_flowveldepth_reset.columns if col[1] == 'd']
+
+        # Sum the flowrate columns
+        sum_q = merge_flowveldepth_reset.groupby('nex')[q_columns].sum()
+
+        # Velocity: set NaN wherever two or more segments flow into one nexus
+        v_data = optimized_v_data(merge_flowveldepth_reset, v_columns)
+
+        # Average the depth columns
+        avg_d = merge_flowveldepth_reset.groupby('nex')[d_columns].mean()
+
-    def avg_d_columns(group):
-        d_columns = [col for col in group.columns if col[1] == 'd']
-        return group[d_columns].mean()
+        # Combine all results into a single DataFrame
+        all_nex_data = pd.concat([sum_q, v_data, avg_d], axis=1)
+
-    # Apply the groupby with the custom aggregation functions
-    def custom_agg(group):
-        result = pd.concat([sum_q_columns(group), custom_v(group), avg_d_columns(group)])
-        return result
+        return all_nex_data
-    all_nex_data = merge_flowveldepth_reset.groupby('nex').apply(custom_agg)
+    all_nex_data = optimized_agg(merge_flowveldepth_reset)
     all_nex_data.index = all_nex_data.index.rename('featureID')
     all_nex_data['Type'] = 'nex'
     # Set the new 'Type' column as an index
     all_nex_data = all_nex_data.set_index('Type', append=True)
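
To illustrate what the new aggregation produces, here is a minimal sketch (not part of the commit). It assumes optimized_v_data and optimized_agg from the diff above are in scope, and it assumes the column layout implied by the col[1] checks: tuple labels of the form (timestep, variable) with variable in {'q', 'v', 'd'}, alongside a plain 'nex' column.

    import pandas as pd

    # Three segment rows: two drain to nexus 1, one drains to nexus 2.
    toy = pd.DataFrame(
        [[1.0, 0.5, 0.2, 1],
         [2.0, 0.7, 0.4, 1],
         [5.0, 0.9, 0.6, 2]],
        columns=pd.Index([('t0', 'q'), ('t0', 'v'), ('t0', 'd'), 'nex'],
                         tupleize_cols=False),
    )

    print(optimized_agg(toy))
    # Expected: nexus 1 sums q (3.0), blanks v with NaN (two inflows), and
    # averages d (0.3); nexus 2 passes its single segment's values through.
    # (The .loc slice assignment in optimized_v_data may emit a
    # SettingWithCopyWarning; it is harmless here.)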

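The commit title claims a speedup, and a hypothetical harness for checking it might look like the sketch below. It assumes both the removed helpers (custom_agg, sum_q_columns, custom_v, avg_d_columns) and the new optimized_agg are defined; no timing numbers are implied, run it to measure on your own data.

    import timeit

    import numpy as np
    import pandas as pd

    rng = np.random.default_rng(0)
    n = 10_000
    frame = pd.DataFrame(
        rng.random((n, 3)),
        columns=pd.Index([('t0', 'q'), ('t0', 'v'), ('t0', 'd')],
                         tupleize_cols=False),
    )
    frame['nex'] = rng.integers(0, n // 2, size=n)

    old = timeit.timeit(lambda: frame.groupby('nex').apply(custom_agg), number=3)
    new = timeit.timeit(lambda: optimized_agg(frame), number=3)
    print(f'groupby().apply(): {old:.2f}s   vectorized: {new:.2f}s')
    # groupby().apply() runs Python-level aggregation once per nexus group,
    # while the vectorized path issues a few whole-frame operations; that
    # per-group overhead is what this commit removes.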
