diff --git a/MDANSE/Src/MDANSE/Mathematics/Signal.py b/MDANSE/Src/MDANSE/Mathematics/Signal.py index 667ff0431..bab2a1e4d 100644 --- a/MDANSE/Src/MDANSE/Mathematics/Signal.py +++ b/MDANSE/Src/MDANSE/Mathematics/Signal.py @@ -20,6 +20,7 @@ from functools import partial import scipy.signal +from MDANSE.Mathematics.Arithmetic import weight from scipy import signal, fftpack from MDANSE.Core.Error import Error @@ -816,6 +817,84 @@ def __init__(self, **kwargs): filter_map = {filter_class.__name__: filter_class for filter_class in FILTERS} +def power_spectrum( + trajectory, + frames, + projection, + atom_selection, + weights, + instrument_resolution, + n_steps +): + """ + + """ + trajectory = trajectory["instance"] + sorted_atoms = trajectory.chemical_system.atom_list + + output = dict() + output["romega"] = instrument_resolution["romega"] + + + + for element in atom_selection["unique_names"]: + output["pacf_%s" % element] = np.zeros(np.array(range(frames["n_frames"])).shape) + output["pps_%s" % element] = np.zeros(output["romega"].shape) + + output["pacf_total"] = np.zeros(np.array(range(frames["n_frames"])).shape) + output["pps_total"] = np.zeros(output["romega"].shape) + + for index in range(n_steps): + indexes = atom_selection["indexes"][index] + atoms = [sorted_atoms[idx] for idx in indexes] + series = trajectory.read_com_trajectory( + atoms, + first=frames["first"], + last=frames["last"] + 1, + step=frames["step"], + ) + + series = series - np.average(series, axis=0) + series = projection["projector"](series) + + n_configs = frames["n_configs"] + atomicPACF = signal.correlate(series, series[:n_configs], mode="valid") / ( + 3 * n_configs + ) + + output["pacf_%s" % atom_selection["names"][index]] += np.array([x[0] for x in atomicPACF]) + + nAtomsPerElement = atom_selection.get_natoms() + for element, number in nAtomsPerElement.items(): + output["pacf_%s" % element][:] /= number + output["pps_%s" % element][:] = get_spectrum( + output["pacf_%s" % element], + instrument_resolution["time_window"], + instrument_resolution["time_step"], + fft="rfft" + ) + + weights = weights.get_weights() + + output["pacf_total"][:] = weight( + weights, + output, + nAtomsPerElement, + 1, + "pacf_%s", + update_partials=True, + ) + output["pps_total"][:] = weight( + weights, + output, + nAtomsPerElement, + 1, + "pps_%s", + update_partials=True, + ) + + # Adjust to atom selection + return (output["romega"], output["pps_total"]) if __name__=="__main__": filter_class = filter_map["ChebyshevTypeI"]