diff --git a/hendrics/efsearch.py b/hendrics/efsearch.py index cbdcaedc..c55ef7ae 100644 --- a/hendrics/efsearch.py +++ b/hendrics/efsearch.py @@ -9,7 +9,7 @@ import numpy as np from scipy.ndimage import gaussian_filter from astropy import log -from astropy.table import Table +from astropy.table import Table, vstack from astropy.logger import AstropyUserWarning from .io import get_file_type from stingray.pulse.search import ( @@ -1238,7 +1238,7 @@ def get_boundaries_from_level(x, y, level, x0): return min_x, max_x -def analyze_qffa_results(fname): +def _analyze_qffa_results(input_ef_periodogram, fname=None): """Search best candidates in a quasi-fast-folding search. This function searches the (typically) 2-d search plane from @@ -1247,39 +1247,50 @@ def analyze_qffa_results(fname): Parameters ---------- - fname : str - File containing the folding search results + input_ef_periodogram : :class:`EFPeriodogram` + Epoch folding periodogram object """ - ef = load_folding(fname) - if not hasattr(ef, "M") or ef.M is None: - ef.M = 1 + if not hasattr(input_ef_periodogram, "M") or input_ef_periodogram.M is None: + input_ef_periodogram.M = 1 - ntrial = ef.stat.size - if hasattr(ef, "oversample") and ef.oversample is not None: - ntrial /= ef.oversample + ntrial = input_ef_periodogram.stat.size + if ( + hasattr(input_ef_periodogram, "oversample") + and input_ef_periodogram.oversample is not None + ): + ntrial /= input_ef_periodogram.oversample ntrial = int(ntrial) - if ef.kind == "Z2n": - ndof = ef.N - 1 + if input_ef_periodogram.kind == "Z2n": + ndof = input_ef_periodogram.N - 1 detlev = z2_n_detection_level( epsilon=0.001, - n=int(ef.N), + n=int(input_ef_periodogram.N), ntrial=ntrial, - n_summed_spectra=int(ef.M), + n_summed_spectra=int(input_ef_periodogram.M), + ) + nbin = max( + 16, + input_ef_periodogram.N * 8, + input_ef_periodogram.nbin if input_ef_periodogram.nbin is not None else 1, ) - nbin = max(16, ef.N * 8, ef.nbin if ef.nbin is not None else 1) - label = "$" + "Z^2_{" + f"{ef.N}" + "}$" + label = "$" + "Z^2_{" + f"{input_ef_periodogram.N}" + "}$" else: - ndof = ef.nbin - detlev = fold_detection_level(nbin=int(ef.nbin), epsilon=0.001, ntrial=ntrial) - nbin = max(16, ef.nbin) + ndof = input_ef_periodogram.nbin + detlev = fold_detection_level( + nbin=int(input_ef_periodogram.nbin), epsilon=0.001, ntrial=ntrial + ) + nbin = max(16, input_ef_periodogram.nbin) label = rf"$\chi^2_{ndof}$ Stat" n_cands = 5 - best_cands = find_peaks_in_image(ef.stat, n=n_cands) + best_cands = find_peaks_in_image(input_ef_periodogram.stat, n=n_cands) fddot = 0 - if hasattr(ef, "fddots") and ef.fddots is not None: - fddot = ef.fddots + if ( + hasattr(input_ef_periodogram, "fddots") + and input_ef_periodogram.fddots is not None + ): + fddot = input_ef_periodogram.fddots best_cand_table = Table( names=[ @@ -1337,49 +1348,68 @@ def analyze_qffa_results(fname): for i, idx in enumerate(best_cands): f_idx = fdot_idx = fddot_idx = 0 - if len(ef.stat.shape) > 1 and ef.stat.shape[0] > 1: + if ( + len(input_ef_periodogram.stat.shape) > 1 + and input_ef_periodogram.stat.shape[0] > 1 + ): f_idx, fdot_idx = idx - allfreqs = ef.freq[f_idx, :] - allfdots = ef.freq[:, fdot_idx] - allstats_f = ef.stat[f_idx, :] - allstats_fdot = ef.stat[:, fdot_idx] - f, fdot = ef.freq[f_idx, fdot_idx], ef.fdots[f_idx, fdot_idx] - max_stat = ef.stat[f_idx, fdot_idx] - sig_e1_m, sig_e1 = power_confidence_limits(max_stat, c=0.68, n=ef.N) + allfreqs = input_ef_periodogram.freq[f_idx, :] + allfdots = input_ef_periodogram.freq[:, fdot_idx] + allstats_f = input_ef_periodogram.stat[f_idx, :] + allstats_fdot = input_ef_periodogram.stat[:, fdot_idx] + f, fdot = ( + input_ef_periodogram.freq[f_idx, fdot_idx], + input_ef_periodogram.fdots[f_idx, fdot_idx], + ) + max_stat = input_ef_periodogram.stat[f_idx, fdot_idx] + sig_e1_m, sig_e1 = power_confidence_limits( + max_stat, c=0.68, n=input_ef_periodogram.N + ) fmin, fmax, fdotmin, fdotmax = get_xy_boundaries_from_level( - ef.freq, ef.fdots, ef.stat, sig_e1_m, f, fdot + input_ef_periodogram.freq, + input_ef_periodogram.fdots, + input_ef_periodogram.stat, + sig_e1_m, + f, + fdot, ) - elif len(ef.stat.shape) == 1: + elif len(input_ef_periodogram.stat.shape) == 1: f_idx = idx - allfreqs = ef.freq - allstats_f = ef.stat - f = ef.freq[f_idx] - max_stat = ef.stat[f_idx] - sig_e1_m, sig_e1 = power_confidence_limits(max_stat, c=0.68, n=ef.N) - fmin, fmax = get_boundaries_from_level(ef.freq, ef.stat, sig_e1_m, f) + allfreqs = input_ef_periodogram.freq + allstats_f = input_ef_periodogram.stat + f = input_ef_periodogram.freq[f_idx] + max_stat = input_ef_periodogram.stat[f_idx] + sig_e1_m, sig_e1 = power_confidence_limits( + max_stat, c=0.68, n=input_ef_periodogram.N + ) + fmin, fmax = get_boundaries_from_level( + input_ef_periodogram.freq, input_ef_periodogram.stat, sig_e1_m, f + ) fdot = fdotmin = fdotmax = 0 allfdots = None allstats_fdot = None else: raise ValueError("Did not understand stats shape.") - if ef.ncounts is None: + if input_ef_periodogram.ncounts is None: continue - sig_0, sig_1 = power_confidence_limits(max_stat, c=0.90, n=ef.N) + sig_0, sig_1 = power_confidence_limits( + max_stat, c=0.90, n=input_ef_periodogram.N + ) amp = amp_err = amp_ul = amp_1 = amp_0 = np.nan if max_stat < detlev: - amp_ul = a_from_ssig(sig_1, ef.ncounts) * 100 + amp_ul = a_from_ssig(sig_1, input_ef_periodogram.ncounts) * 100 else: - amp = a_from_ssig(max_stat, ef.ncounts) * 100 - amp_err = a_from_ssig(sig_e1, ef.ncounts) * 100 - amp - amp_0 = a_from_ssig(sig_0, ef.ncounts) * 100 - amp_1 = a_from_ssig(sig_1, ef.ncounts) * 100 + amp = a_from_ssig(max_stat, input_ef_periodogram.ncounts) * 100 + amp_err = a_from_ssig(sig_e1, input_ef_periodogram.ncounts) * 100 - amp + amp_0 = a_from_ssig(sig_0, input_ef_periodogram.ncounts) * 100 + amp_1 = a_from_ssig(sig_1, input_ef_periodogram.ncounts) * 100 best_cand_table.add_row( [ - ef.filename, - ef.pepoch, + input_ef_periodogram.filename, + input_ef_periodogram.pepoch, max_stat, f, fmin - f, @@ -1403,32 +1433,52 @@ def analyze_qffa_results(fname): # Only add one candidate continue - Table({"freq": allfreqs, "stat": allstats_f}).write( - f'{fname.replace(HEN_FILE_EXTENSION, "")}' - f"_cand_{n_cands - i - 1}_fdot{fdot}.csv", - overwrite=True, - format="ascii", - ) + if fname is not None: + Table({"freq": allfreqs, "stat": allstats_f}).write( + f'{fname.replace(HEN_FILE_EXTENSION, "")}' + f"_cand_{n_cands - i - 1}_fdot{fdot}.csv", + overwrite=True, + format="ascii", + ) if allfdots is None: continue - Table({"fdot": allfdots, "stat": allstats_fdot}).write( - f'{fname.replace(HEN_FILE_EXTENSION, "")}' - f"_cand_{n_cands - i - 1}_f{f}.dat", - overwrite=True, - format="ascii", - ) + if fname is not None: + Table({"fdot": allfdots, "stat": allstats_fdot}).write( + f'{fname.replace(HEN_FILE_EXTENSION, "")}' + f"_cand_{n_cands - i - 1}_f{f}.dat", + overwrite=True, + format="ascii", + ) print_qffa_results(best_cand_table) best_cand_table.meta.update( dict(nbin=nbin, ndof=ndof, label=label, filename=None, detlev=detlev) ) if ( - hasattr(ef, "filename") - and ef.filename is not None - and os.path.exists(ef.filename) + hasattr(input_ef_periodogram, "filename") + and input_ef_periodogram.filename is not None + and os.path.exists(input_ef_periodogram.filename) ): - best_cand_table.meta["filename"] = ef.filename + best_cand_table.meta["filename"] = input_ef_periodogram.filename + return best_cand_table + + +def analyze_qffa_results(fname): + """Search best candidates in a quasi-fast-folding search. + + This function searches the (typically) 2-d search plane from + a QFFA search and finds the best five candidates. + For the best candidate, it calculates + + Parameters + ---------- + fname : str or :class:`EFPeriodogram` + File containing the folding search results, or epoch folding periodogram object + """ + ef = load_folding(fname) + + best_cand_table = _analyze_qffa_results(ef, fname=fname) best_cand_table.write(fname + "_best_cands.csv", overwrite=True) return ef, best_cand_table