Skip to content

Commit

Permalink
Refactored both interpolation (resampling) stages
Browse files Browse the repository at this point in the history
  • Loading branch information
jl-marin committed Jan 27, 2025
1 parent f240a7e commit b87cf97
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 113 deletions.
21 changes: 9 additions & 12 deletions src/dgcv/curves/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,18 +284,15 @@ def apply_signal_processing(
# csv_reference_curves = csv_calculated_curves

t_com = config.get_float("GridCode", "t_com", 0.002)
cutoff = config.get_float("GridCode", "cutoff", 15.0)
sanity_checks.check_sampling_interval(t_com, cutoff)
f_cutoff = config.get_float("GridCode", "cutoff", 15.0)
sanity_checks.check_sampling_interval(t_com, f_cutoff)

resampling_fs = 1 / t_com
# Reference signals should be converted to RMS PS (if they are EMT)
reference_curves = sigpro.ensure_rms_signals(csv_reference_curves, fs)

# First resampling: ensure a constant time-step signal
# TODO: preserve the finest time-resolution instead of using a fixed t_com here
calculated_curves = sigpro.resampling_signal(csv_calculated_curves, resampling_fs)

reference_curves = sigpro.ensure_rms_signals(csv_reference_curves, fs)
# TODO: preserve the finest time-resolution instead of using a fixed t_com here
reference_curves = sigpro.resampling_signal(reference_curves, resampling_fs)
calculated_curves = sigpro.resample_to_fixed_step(csv_calculated_curves)
reference_curves = sigpro.resample_to_fixed_step(reference_curves)

calc_time_values = list(calculated_curves["time"])
self._windows["calculated"] = signal_windows.calculate(
Expand All @@ -312,13 +309,13 @@ def apply_signal_processing(
)

# After their respective resampling and after windowing, we can apply the filters
calculated_curves = sigpro.filter_signal(calculated_curves, cutoff, resampling_fs)
reference_curves = sigpro.filter_signal(reference_curves, cutoff, resampling_fs)
calculated_curves = sigpro.filter_curves(calculated_curves, f_cutoff)
reference_curves = sigpro.filter_curves(reference_curves, f_cutoff)

# Second resampling: ensure both signals are on the same time grid
# Note it doesn't matter if this is a downsampling for any of the two sets, because the
# low-pass filter has already been applied (therefore, no aliasing is produced).
calculated_curves, reference_curves = sigpro.interpolate_same_time_grid(
calculated_curves, reference_curves = sigpro.resample_to_common_tgrid(
calculated_curves, reference_curves
)
self._curves["calculated"] = calculated_curves
Expand Down
224 changes: 123 additions & 101 deletions src/dgcv/sigpro/sigpro.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,109 @@ def abc_to_psrms(abc, fs):
return (1 / np.sqrt(2)) * np.abs(ps)


def resample(x, t, fs=1000):
start_time = t[0]
end_time = t[-1]
N_r = int((end_time - start_time) * fs)
t_r = np.linspace(start_time, end_time, N_r)
return t_r, np.interp(t_r, t, x)
def ensure_rms_signals(curves, fs):
processed_curve_dict = {}
abc_items, rms_items = find_abc_signal(curves)
for abc_item in abc_items:
a = curves[abc_item + "_a"]
b = curves[abc_item + "_b"]
c = curves[abc_item + "_c"]
ps_rms = abc_to_psrms([a, b, c], fs)
processed_curve_dict[abc_item] = ps_rms

for rms_item in rms_items:
processed_curve_dict[rms_item] = curves[rms_item]

return pd.DataFrame.from_dict(processed_curve_dict, orient="columns")


def find_abc_signal(curves):
abc_items = []
rms_items = []
for col in curves.columns:
if col.endswith("_c"):
continue
if col.endswith("_b"):
continue
if col.endswith("_a"):
abc_items.append(col[:-2])
else:
rms_items.append(col)

return abc_items, rms_items


def resample_to_fixed_step(curves: pd.DataFrame, fs_max=1000):
"""
Resamples a set of curves to ensure they have a fixed time step.
Simulated signals typically have a variable time step, and this functions converts them to
a fixed time step via mathematical interpolation (of the monotone kind, to avoid overshooting
artifacts). The actual step is not prescribed, but calculated from the time grid of the
original signal, to preserve as much as possible the signal's bandwidth.
"""
# Simulations may have repeated time points. Get rid of them using unique().
orig_tgrid = curves["time"].to_numpy()
uniq_idx = np.unique(orig_tgrid, return_index=True)[1]
uniq_tgrid = orig_tgrid[uniq_idx]

# Calculate the new fixed time step.
curve_tsteps = np.diff(uniq_tgrid)
min_tstep = np.min(curve_tsteps)
new_tstep = max(min_tstep, 1 / fs_max)

# Construct the new time grid
t_start, t_end = uniq_tgrid[0], uniq_tgrid[-1]
new_tgrid = np.arange(t_start, t_end, step=new_tstep)

# Resample using a monotone interpolator (PCHIP)
resampled_curve_dict = {}
resampled_curve_dict["time"] = new_tgrid
for col in curves.columns:
if "time" == col:
continue
uniq_curve = curves[col][uniq_idx]
resampled_curve_dict[col] = PchipInterpolator(uniq_tgrid, uniq_curve)(new_tgrid)

return pd.DataFrame.from_dict(resampled_curve_dict, orient="columns")


def resample_to_common_tgrid(sim_curves, ref_curves):
"""
Resamples TWO sets of curves to a common fixed time step, t_com.
To be able to compare point-wise the waveforms of the Simulated curves (either obtained from
Dynawo simulation or user-provided) curves versus the Reference ones, both sets must be
resampled so that they share the same time grid. The new sampling rate (fs = 1 / t_com) is a
configurable value, but obviously it must be *higher* than twice the cutoff frequency used in
the low-pass filtering stage (which must have happened before we reach this function).
"""
t_com = config.get_float("GridCode", "t_com", 0.002)

sim_times = sim_curves["time"].to_numpy()
sim_uniq_idx = np.unique(sim_times, return_index=True)[1]
sim_times = sim_times[sim_uniq_idx]
ref_times = ref_curves["time"].to_numpy()
ref_uniq_idx = np.unique(ref_times, return_index=True)[1]
ref_times = ref_times[ref_uniq_idx]
t_start = max(sim_times[0], ref_times[0])
t_end = min(sim_times[-1], ref_times[-1])
new_tgrid = np.arange(t_start, t_end, step=t_com)

# warnings.filterwarnings("ignore", module="scipy")
rs_sim_curves = dict()
rs_ref_curves = dict()
rs_sim_curves["time"] = new_tgrid
rs_ref_curves["time"] = new_tgrid
for col in ref_curves:
if "time" == col or col not in sim_curves:
continue
sim_values = sim_curves[col][sim_uniq_idx]
ref_values = ref_curves[col][ref_uniq_idx]
rs_sim_curves[col] = PchipInterpolator(sim_times, sim_values)(new_tgrid)
rs_ref_curves[col] = PchipInterpolator(ref_times, ref_values)(new_tgrid)

return pd.DataFrame(rs_sim_curves), pd.DataFrame(rs_ref_curves)


def lowpass_filter(signal, fc=15, fs=1000, filter="critdamped", padding_method="gust"):
Expand All @@ -54,13 +151,8 @@ def lowpass_filter(signal, fc=15, fs=1000, filter="critdamped", padding_method="
filter: str
Name of the filter to use. One of: {critdamped, bessel, butter, cheby1}
padding_method: str
Method used to treat the signal boundaries in filtfilt. One of: {
"gust",
"odd_padding",
"even_padding",
"constant_padding",
"no_padding",
}
Method used to treat the signal boundaries in filtfilt. One of: {"gust", "odd_padding",
"even_padding", "constant_padding", "no_padding"}.
Returns:
output_signal: npt.ArrayLike
Expand Down Expand Up @@ -92,109 +184,39 @@ def lowpass_filter(signal, fc=15, fs=1000, filter="critdamped", padding_method="
return lp_filters.apply_filtfilt(b, a, signal, padding_method)


def ensure_rms_signals(curves, fs):
processed_curve_dict = {}
abc_items, rms_items = find_abc_signal(curves)
for abc_item in abc_items:
a = curves[abc_item + "_a"]
b = curves[abc_item + "_b"]
c = curves[abc_item + "_c"]
ps_rms = abc_to_psrms([a, b, c], fs)
processed_curve_dict[abc_item] = ps_rms

for rms_item in rms_items:
processed_curve_dict[rms_item] = curves[rms_item]

return pd.DataFrame.from_dict(processed_curve_dict, orient="columns")


def find_abc_signal(curves):
abc_items = []
rms_items = []
for col in curves.columns:
if col.endswith("_c"):
continue
if col.endswith("_b"):
continue
if col.endswith("_a"):
abc_items.append(col[:-2])
else:
rms_items.append(col)

return abc_items, rms_items


def resampling_signal(curves, fs=1000):
resampled_curve_dict = {}
for col in curves.columns:
if "time" in col:
continue

t_r, r = resample(curves[col], curves["time"].values.tolist(), fs)
resampled_curve_dict[col] = r

resampled_curve_dict["time"] = t_r
return pd.DataFrame.from_dict(resampled_curve_dict, orient="columns")

def filter_curves(curves, f_cutoff=15, filter_name="critdamped"):
"""
This function applies a low-pass filter to a set of curves, with these options:
* filters each window separately (default) or the whole signal
* filtering can be disabled altogether via user config
* cutoff frequency f_c (default: IEC's 15 Hz)
* choice of filter (default: IEC's 2nd-order critically damped)
"""
# Obtain the actual sampling rate of these curves, which is needed to invoke the filter
time_step = np.mean(np.diff(curves["time"].to_numpy()))
fs = 1 / time_step

def filter_signal(curves, cutoff=15, fs=1000, filter_name="critdamped"):
# This function applies a low-pass filter to the signal, with these options:
# * filters each window separately (default) or the whole signal
# * filtering can be disabled altogether via user config
# * cutoff frequency f_c (default: IEC's 15 Hz)
# * choice of filter (default: IEC's 2nd-order critically damped)
# Filter
lowpass_curve_dict = {}
for col in curves.columns:
if "time" in col:
continue

# Disable filtering altogether when so configured, but also when the signal is a constant
# because LP filters produce artifacts, potentially affecting our sanity check for flat curves
# that takes place later on (at report building time).
# because LP filters produce artifacts, potentially affecting our sanity check for flat
# curves that takes place later on (at report building time).
c = curves[col]
if config.get_boolean("Debug", "disable_LP_filtering", False) or max(list(c)) == min(
list(c)
):
c_filt = c
else:
c_filt = lowpass_filter(c, cutoff, fs, filter_name)
# For avoiding overflows in PChipInterpolator (used in resampling later on)
# TODO: this won't be necessary anymore once we filter the signals *after* the two
# resamplings in CurvesManager.apply_signal_processing()
c_filt[abs(c_filt) < ZERO_THRESHOLD] = 0.0
c_filt = lowpass_filter(c, f_cutoff, fs, filter_name)
# For avoiding overflows in PChipInterpolator (used in the 2nd resampling later on)
# TODO: double-check if this is still necessary
# c_filt[abs(c_filt) < ZERO_THRESHOLD] = 0.0

lowpass_curve_dict[col] = c_filt

lowpass_curve_dict["time"] = curves["time"]
return pd.DataFrame.from_dict(lowpass_curve_dict, orient="columns")


def interpolate_same_time_grid(curves_final, curves_ref, fs=1000):
final_times = curves_final["time"].to_numpy()
final_unique_indices = np.unique(final_times, return_index=True)[1]
final_times = final_times[final_unique_indices]
ref_times = curves_ref["time"].to_numpy()
ref_unique_indices = np.unique(ref_times, return_index=True)[1]
ref_times = ref_times[ref_unique_indices]
t_start = max(final_times[0], ref_times[0])
t_end = min(final_times[-1], ref_times[-1])
itime = np.arange(t_start, t_end, step=1 / fs)

# warnings.filterwarnings("ignore", module="scipy")
icurves_final = dict()
icurves_ref = dict()
for key in curves_ref:
if "time" in key:
continue

if key not in curves_final:
continue

final_values = curves_final[key][final_unique_indices]
ref_values = curves_ref[key][ref_unique_indices]
icurves_final[key] = PchipInterpolator(final_times, final_values)(itime)
icurves_ref[key] = PchipInterpolator(ref_times, ref_values)(itime)

icurves_final["time"] = itime
icurves_ref["time"] = itime
return pd.DataFrame(icurves_final), pd.DataFrame(icurves_ref)

0 comments on commit b87cf97

Please sign in to comment.