ZTF Alert FP - deduplication logic, flux to mag space (#261)
* v2 forced photometry: only ingest for new objects, deduplicate by alert_mag, and add mag space data
Theodlz authored Dec 4, 2023
1 parent 69cd8fd commit a100fb1
Showing 5 changed files with 413 additions and 70 deletions.
49 changes: 34 additions & 15 deletions kowalski/alert_brokers/alert_broker.py
@@ -1364,6 +1364,15 @@ def alert_filter__user_defined(
},
}

if not isinstance(_filter.get("autosave", False), bool):
passed_filter["auto_followup"]["data"][
"ignore_source_group_ids"
] = [
_filter.get("autosave", {}).get(
"ignore_group_ids", []
)
]

passed_filters.append(passed_filter)

except Exception as e:
@@ -1927,9 +1936,7 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
response = self.api_skyportal(
"POST",
"/api/followup_request",
passed_filter["auto_followup"][
"data"
], # already contains the optional ignore_group_ids
passed_filter["auto_followup"]["data"],
)
if (
response.json()["status"] == "success"
@@ -1939,7 +1946,7 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
is False
):
log(
f"Posted followup request for {alert['objectId']} to SkyPortal"
f"Posted followup request successfully for {alert['objectId']} to SkyPortal"
)
# add it to the existing requests
existing_requests.append(
@@ -1971,7 +1978,14 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
"text": passed_filter["auto_followup"][
"comment"
],
"group_ids": [passed_filter["group_id"]],
"group_ids": list(
set(
[passed_filter["group_id"]]
+ passed_filter.get("auto_followup", {})
.get("data", {})
.get("target_group_ids", [])
)
),
}
with timer(
f"Posting followup comment {comment['text']} for {alert['objectId']} to SkyPortal",
@@ -1989,23 +2003,28 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
.get("data", {})
.get(
"message",
"unknow error posting comment",
"unknown error posting comment",
)
)
except Exception as e:
log(
f"Failed to post followup comment {comment['text']} for {alert['objectId']} to SkyPortal: {e}"
)
else:
error_message = response.json().get(
"message",
response.json()
.get("data", {})
.get(
"message",
"unknow error posting followup request",
),
)
try:
error_message = response.json().get(
"message",
response.json()
.get("data", {})
.get(
"message",
"unknown error posting followup request",
),
)
except Exception:
error_message = (
"unknown error posting followup request"
)
raise ValueError(error_message)
except Exception as e:
log(
@@ -2079,7 +2098,7 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
raise ValueError(
response.json().get(
"message",
"unknow error updating followup request",
"unknown error updating followup request",
)
)
except Exception as e:
221 changes: 176 additions & 45 deletions kowalski/alert_brokers/alert_broker_ztf.py
@@ -7,6 +7,7 @@
import threading
import time
import traceback
import numpy as np
from abc import ABC
from copy import deepcopy
from typing import Mapping, Sequence
@@ -43,7 +44,7 @@ def process_alert(alert: Mapping, topic: str):

# get worker running current task
worker = dask.distributed.get_worker()
alert_worker = worker.plugins["worker-init"].alert_worker
alert_worker: ZTFAlertWorker = worker.plugins["worker-init"].alert_worker

log(f"{topic} {object_id} {candid} {worker.address}")

@@ -129,9 +130,12 @@ def process_alert(alert: Mapping, topic: str):
"_id": object_id,
"cross_matches": xmatches,
"prv_candidates": prv_candidates,
"fp_hists": fp_hists,
}

# only add the fp_hists if it's a brand new object, not just if there is no entry there
if alert["candidate"]["ndethist"] <= 1:
alert_aux["fp_hists"] = alert_worker.format_fp_hists(alert, fp_hists)

with timer(f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1):
retry(alert_worker.mongo.insert_one)(
collection=alert_worker.collection_alerts_aux, document=alert_aux
@@ -150,34 +154,24 @@ def process_alert(alert: Mapping, topic: str):
upsert=True,
)

# FOR NOW: we decided to only store the forced photometry for the very first alert we get for an object
# so, no need to update anything here

# update fp_hists
# existing_fp_hists = retry(
# alert_worker.mongo.db[alert_worker.collection_alerts_aux].find_one
# )({"_id": object_id}, {"fp_hists": 1})
# if existing_fp_hists is not None:
# existing_fp_hists = existing_fp_hists.get("fp_hists", [])
# if len(existing_fp_hists) > 0:
# new_fp_hists = alert_worker.deduplicate_fp_hists(
# existing_fp_hists, fp_hists
# )
# else:
# new_fp_hists = fp_hists
# else:
# new_fp_hists = fp_hists
# retry(
# alert_worker.mongo.db[alert_worker.collection_alerts_aux].update_one
# )(
# {"_id": object_id},
# {
# "$set": {
# "fp_hists": new_fp_hists,
# }
# },
# upsert=True,
# )
# if there is no fp_hists for this object, we don't update anything
# the idea is that we start accumulating FP only for new objects, to avoid
# having some objects with incomplete FP history, which would be confusing for the filters
# either there is full FP, or there isn't any
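# (note: count_documents with limit=1 below is just a cheap existence check:
# it returns 1 iff the aux document already has a fp_hists field)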
if (
retry(
alert_worker.mongo.db[
alert_worker.collection_alerts_aux
].count_documents
)(
{"_id": alert["objectId"], "fp_hists": {"$exists": True}},
limit=1,
)
== 1
):
alert_worker.update_fp_hists(
alert, alert_worker.format_fp_hists(alert, fp_hists)
)

if config["misc"]["broker"]:
# execute user-defined alert filters
@@ -495,25 +489,162 @@ def alert_put_photometry(self, alert):
)
continue

def deduplicate_fp_hists(self, existing_fp=[], latest_fp=[]):
# for the forced photometry (fp_hists) unfortunately it's not as simple as deduplicating with a set
# the fp_hists of each candidate of an object is recomputed everytime, so datapoints
# at the same jd can be different, so we grab the existing fp_hists aggregate, and build a new one.

# first find the oldest jd in the latest fp_hists
oldest_jd_in_latest = min([fp["jd"] for fp in latest_fp])
# get all the datapoints in the existing fp_hists that are older than the oldest jd in the latest fp_hists
older_datapoints = [fp for fp in existing_fp if fp["jd"] < oldest_jd_in_latest]

return older_datapoints + latest_fp
def flux_to_mag(self, flux, fluxerr, zp):
"""Convert flux to magnitude and calculate SNR
:param flux: forced-photometry difference-image flux (forcediffimflux)
:param fluxerr: uncertainty on the flux (forcediffimfluxunc)
:param zp: photometric zeropoint (magzpsci)
:return: dict with snr, mag, magerr, limmag3sig, limmag5sig (NaN fields dropped)
"""
# make sure it's all numpy floats or NaNs
values = np.array([flux, fluxerr, zp], dtype=np.float64)
snr = values[0] / values[1]
mag = -2.5 * np.log10(values[0]) + values[2]
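# 1.0857 ≈ 2.5 / ln(10), the usual first-order flux-to-magnitude error propagation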
magerr = 1.0857 * (values[1] / values[0])
limmag3sig = -2.5 * np.log10(3 * values[1]) + values[2]
limmag5sig = -2.5 * np.log10(5 * values[1]) + values[2]
if np.isnan(snr):
return {}
if snr < 0:
return {
"snr": snr,
}
mag_data = {
"mag": mag,
"magerr": magerr,
"snr": snr,
"limmag3sig": limmag3sig,
"limmag5sig": limmag5sig,
}
# remove all NaNs fields
mag_data = {k: v for k, v in mag_data.items() if not np.isnan(v)}
return mag_data
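
As a quick sanity check of the conversion above, a minimal standalone sketch (input values are invented, not from a real alert):

import numpy as np

# forcediffimflux, forcediffimfluxunc, magzpsci
flux, fluxerr, zp = 1000.0, 100.0, 26.0
snr = flux / fluxerr  # 10.0
mag = -2.5 * np.log10(flux) + zp  # 18.5
magerr = 1.0857 * (fluxerr / flux)  # ~0.109
limmag3sig = -2.5 * np.log10(3 * fluxerr) + zp  # ~19.81, 3-sigma limiting magnitude
limmag5sig = -2.5 * np.log10(5 * fluxerr) + zp  # ~19.25, 5-sigma limiting magnitude
print(snr, mag, magerr, limmag3sig, limmag5sig)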

def format_fp_hists(self, alert, fp_hists):
if len(fp_hists) == 0:
return []
# sort by jd
fp_hists = sorted(fp_hists, key=lambda x: x["jd"])

# add the "alert_mag" field to the new fp_hist
# as well as alert_ra, alert_dec
for i, fp in enumerate(fp_hists):
fp_hists[i] = {
**fp,
**self.flux_to_mag(
flux=fp.get("forcediffimflux", np.nan),
fluxerr=fp.get("forcediffimfluxunc", np.nan),
zp=fp["magzpsci"],
),
"alert_mag": alert["candidate"]["magpsf"],
"alert_ra": alert["candidate"]["ra"],
"alert_dec": alert["candidate"]["dec"],
}

return fp_hists
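
For intuition, a single datapoint roughly goes through the following transformation (all values invented for illustration):

# hypothetical raw entry from the alert packet's fp_hists
fp = {"jd": 2460282.7, "forcediffimflux": 1000.0, "forcediffimfluxunc": 100.0, "magzpsci": 26.0}
# after format_fp_hists, the same entry additionally carries the mag-space fields
# from flux_to_mag (mag, magerr, snr, limmag3sig, limmag5sig) plus the alert
# context fields alert_mag, alert_ra, alert_dec taken from alert["candidate"]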

def update_fp_hists(self, alert, formatted_fp_hists):
# update the existing fp_hists with the new ones
# instead of treating it as a set,
# if some entries have the same jd, keep the one with the brightest (lowest) alert_mag

# TODO: implement a better logic here. Could be based on:
# - SNR (better SNR datapoints might be better)
# - position (centroid, closer to the avg position might be better)
# - mag (if 1 sigma brighter or dimmer than the current datapoints, use the newer ones)
# make sure this is an aggregate pipeline in mongodb
if len(formatted_fp_hists) == 0:
return

# for now, just append the latest fp_hists to the older ones,
# to prioritize newer datapoints which might come from an updated pipeline
with timer(
f"Updating fp_hists of {alert['objectId']} {alert['candid']}",
self.verbose > 1,
):
update_pipeline = [
# 1. concat the new fp_hists with the existing ones
{
"$project": {
"all_fp_hists": {
"$concatArrays": [
{"$ifNull": ["$fp_hists", []]},
formatted_fp_hists,
]
}
}
},
# 2. unwind the resulting array to get one document per fp_hist
{"$unwind": "$all_fp_hists"},
# 3. default any missing alert_mag to -99999.0, so those datapoints sort first and win the deduplication
{
"$set": {
"all_fp_hists.alert_mag": {
"$cond": {
"if": {
"$eq": [
{"$type": "$all_fp_hists.alert_mag"},
"missing",
]
},
"then": -99999.0,
"else": "$all_fp_hists.alert_mag",
}
}
}
},
# 4. sort by jd and alert_mag
{
"$sort": {
"all_fp_hists.jd": 1,
"all_fp_hists.alert_mag": 1,
}
},
# 5. group by jd, keeping only the first document for each jd (the brightest at that jd)
{
"$group": {
"_id": "$all_fp_hists.jd",
"fp_hist": {"$first": "$$ROOT.all_fp_hists"},
}
},
# 6. sort by jd again
{"$sort": {"fp_hist.jd": 1}},
# 7. group all the fp_hists documents back into a single array
{"$group": {"_id": None, "fp_hists": {"$push": "$fp_hist"}}},
# 8. project only the new fp_hists array
{"$project": {"fp_hists": 1, "_id": 0}},
]
n_retries = 0
while True:
# run the pipeline and then update the document
new_fp_hists = (
self.mongo.db[self.collection_alerts_aux]
.aggregate(
update_pipeline,
)
.next()
.get("fp_hists", [])
)

# update the document, but only if the DB does not already have more datapoints than the new fp_hists.
# Otherwise, rerun the pipeline. This is to help a little bit with concurrency issues
result = self.mongo.db[self.collection_alerts_aux].find_one_and_update(
{
"_id": alert["objectId"],
f"fp_hists.{len(new_fp_hists)}": {"$exists": False},
},
{"$set": {"fp_hists": new_fp_hists}},
)
if result is None:
n_retries += 1
if n_retries > 10:
log(
f"Failed to update fp_hists of {alert['objectId']} {alert['candid']}"
)
break
else:
log(
f"Retrying to update fp_hists of {alert['objectId']} {alert['candid']}"
)
time.sleep(1)
else:
break
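
Conceptually, the pipeline above is a per-jd "keep the brightest" merge. A rough pure-Python equivalent, for illustration only (the helper name and signature are invented, not part of the commit):

def merge_fp_hists(existing, new):
    # keep one datapoint per jd: the lowest (brightest) alert_mag wins,
    # and datapoints without alert_mag (treated as -99999.0) win over all others
    best = {}
    for fp in existing + new:
        jd = fp["jd"]
        mag = fp.get("alert_mag", -99999.0)
        if jd not in best or mag < best[jd].get("alert_mag", -99999.0):
            best[jd] = fp
    # return the deduplicated datapoints sorted by jd
    return [best[jd] for jd in sorted(best)]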


class WorkerInitializer(dask.distributed.WorkerPlugin):