handle the new ZTF alert schema
Theodlz committed Oct 16, 2023
1 parent 39a918c commit b25e61d
Showing 9 changed files with 67 additions and 27 deletions.
11 changes: 9 additions & 2 deletions kowalski/alert_brokers/alert_broker.py
@@ -541,6 +541,7 @@ def alert_mongify(alert: Mapping):
- add a placeholder for ML-based classifications
- add coordinates for 2D spherical indexing and compute Galactic coordinates
- cut off the prv_candidates section
- cut off the fp_hists section (if it exists)
:param alert:
:return:
@@ -577,7 +578,13 @@ def alert_mongify(alert: Mapping):
if prv_candidates is None:
prv_candidates = []

return doc, prv_candidates
# cut off the fp_hists section, if it exists
fp_hists = deepcopy(doc.get("fp_hists", None))
doc.pop("fp_hists", None)
if fp_hists is None:
fp_hists = []

return doc, prv_candidates, fp_hists

def make_thumbnail(
self, alert: Mapping, skyportal_type: str, alert_packet_type: str
@@ -1637,7 +1644,7 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
- decide which points to post to what groups based on permissions
- post alert light curve in single PUT call to /api/photometry specifying stream_ids
:param alert: alert with a stripped-off prv_candidates section
:param alert: alert with stripped-off prv_candidates and fp_hists sections
:param prv_candidates: could be plain prv_candidates section of an alert, or extended alert history
:param passed_filters: list of filters that alert passed, with their output
:return:
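For readers unfamiliar with the pattern above, here is a minimal, self-contained Python sketch of what alert_mongify now hands back; strip_history_sections and the toy packet below are illustrative stand-ins rather than Kowalski code, and only the pop-with-empty-list-default behaviour and the (doc, prv_candidates, fp_hists) triple mirror this commit.

from copy import deepcopy

def strip_history_sections(alert: dict):
    # Illustrative sketch of the pattern above: pop the bulky history
    # sections off the alert document and always hand them back as lists.
    doc = deepcopy(alert)
    prv_candidates = doc.pop("prv_candidates", None) or []
    fp_hists = doc.pop("fp_hists", None) or []
    return doc, prv_candidates, fp_hists

# Old-schema packets simply lack fp_hists, so callers that do not need it can unpack it into `_`.
doc, prv_candidates, fp_hists = strip_history_sections(
    {"candid": 1, "candidate": {"jd": 2460200.9}, "prv_candidates": [{"jd": 2460199.9}]}
)
assert "prv_candidates" not in doc and "fp_hists" not in doc
assert fp_hists == []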
2 changes: 1 addition & 1 deletion kowalski/alert_brokers/alert_broker_pgir.py
@@ -58,7 +58,7 @@ def process_alert(alert: Mapping, topic: str):

# candid not in db, ingest decoded avro packet into db
with timer(f"Mongification of {object_id} {candid}", alert_worker.verbose > 1):
alert, prv_candidates = alert_worker.alert_mongify(alert)
alert, prv_candidates, _ = alert_worker.alert_mongify(alert)

# create alert history
all_prv_candidates = deepcopy(prv_candidates) + [deepcopy(alert["candidate"])]
2 changes: 1 addition & 1 deletion kowalski/alert_brokers/alert_broker_turbo.py
@@ -58,7 +58,7 @@ def process_alert(alert: Mapping, topic: str):

# candid not in db, ingest decoded avro packet into db
with timer(f"Mongification of {object_id} {candid}", alert_worker.verbose > 1):
alert, prv_candidates = alert_worker.alert_mongify(alert)
alert, prv_candidates, _ = alert_worker.alert_mongify(alert)

with timer(f"Ingesting {object_id} {candid}", alert_worker.verbose > 1):
alert_worker.mongo.insert_one(
2 changes: 1 addition & 1 deletion kowalski/alert_brokers/alert_broker_winter.py
@@ -80,7 +80,7 @@ def process_alert(alert: Mapping, topic: str):

# candid not in db, ingest decoded avro packet into db
with timer(f"Mongification of {object_id} {candid}"):
alert, prv_candidates = alert_worker.alert_mongify(alert)
alert, prv_candidates, _ = alert_worker.alert_mongify(alert)

# future: add ML model filtering here

11 changes: 10 additions & 1 deletion kowalski/alert_brokers/alert_broker_ztf.py
@@ -58,7 +58,7 @@ def process_alert(alert: Mapping, topic: str):

# candid not in db, ingest decoded avro packet into db
with timer(f"Mongification of {object_id} {candid}", alert_worker.verbose > 1):
alert, prv_candidates = alert_worker.alert_mongify(alert)
alert, prv_candidates, fp_hists = alert_worker.alert_mongify(alert)

# create alert history
all_prv_candidates = deepcopy(prv_candidates) + [deepcopy(alert["candidate"])]
@@ -104,6 +104,12 @@ def process_alert(alert: Mapping, topic: str):
for prv_candidate in prv_candidates
]

# fp_hists: pop nulls - save space
fp_hists = [
{kk: vv for kk, vv in fp_hist.items() if vv not in [None, -99999, -99999.0]}
for fp_hist in fp_hists
]

alert_aux, xmatches, passed_filters = None, None, None
# cross-match with external catalogs if objectId not in collection_alerts_aux:
if (
@@ -123,6 +129,7 @@ def process_alert(alert: Mapping, topic: str):
"_id": object_id,
"cross_matches": xmatches,
"prv_candidates": prv_candidates,
"fp_hists": fp_hists,
}

with timer(f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1):
@@ -139,6 +146,7 @@ def process_alert(alert: Mapping, topic: str):
)(
{"_id": object_id},
{"$addToSet": {"prv_candidates": {"$each": prv_candidates}}},
{"$addToSet": {"fp_hists": {"$each": fp_hists}}},
upsert=True,
)

@@ -160,6 +168,7 @@ def process_alert(alert: Mapping, topic: str):
del (
alert,
prv_candidates,
fp_hists,
all_prv_candidates,
scores,
xmatches,
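As a side note, the aux-collection write above boils down to a standard pymongo upsert. The sketch below assumes a local MongoDB instance and placeholder connection, collection, and field values (the broker uses its own Mongo wrapper and config); it shows the fp_hists null/-99999 filtering and both history arrays being appended set-wise through a single update document.

from pymongo import MongoClient

# Placeholder connection and names, not the broker's real configuration.
aux = MongoClient("mongodb://localhost:27017")["kowalski_sketch"]["ZTF_alerts_aux"]

object_id = "ZTF23abcdefg"
prv_candidates = [{"jd": 2460200.9, "magpsf": 18.7}]
# drop nulls and sentinel values before storing forced photometry, as in the broker above
fp_hists = [
    {k: v for k, v in row.items() if v not in [None, -99999, -99999.0]}
    for row in [{"jd": 2460200.9, "forcediffimflux": None, "field": 700}]
]

# one update document; $addToSet/$each appends both histories without duplicating entries
aux.update_one(
    {"_id": object_id},
    {
        "$addToSet": {
            "prv_candidates": {"$each": prv_candidates},
            "fp_hists": {"$each": fp_hists},
        }
    },
    upsert=True,
)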
8 changes: 4 additions & 4 deletions kowalski/tests/test_alert_broker_pgir.py
@@ -34,7 +34,7 @@ class TestAlertBrokerPGIR:

def test_alert_mongification(self):
"""Test massaging avro packet into a dict digestible by mongodb"""
alert, prv_candidates = self.worker.alert_mongify(self.alert)
alert, prv_candidates, _ = self.worker.alert_mongify(self.alert)
assert alert["candid"] == self.candid
assert len(prv_candidates) == 71

@@ -43,7 +43,7 @@ def test_make_photometry(self):
assert len(df_photometry) == 71

def test_make_thumbnails(self):
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
for ttype, istrument_type in [
("new", "Science"),
("ref", "Template"),
@@ -54,13 +54,13 @@ def test_alert_filter__ml(self):

def test_alert_filter__ml(self):
"""Test executing ML models on the alert"""
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
scores = self.worker.alert_filter__ml(alert)
log(scores)

def test_alert_filter__xmatch(self):
"""Test cross matching with external catalog"""
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
xmatches = self.worker.alert_filter__xmatch(alert)
catalogs_to_xmatch = config["database"].get("xmatch", {}).get("PGIR", {}).keys()
assert isinstance(xmatches, dict)
6 changes: 3 additions & 3 deletions kowalski/tests/test_alert_broker_turbo.py
@@ -34,7 +34,7 @@ class TestAlertBrokerTURBO:

def test_alert_mongification(self):
"""Test massaging avro packet into a dict digestible by mongodb"""
alert, prv_candidates = self.worker.alert_mongify(self.alert)
alert, prv_candidates, _ = self.worker.alert_mongify(self.alert)
assert alert["candid"] == self.candid
assert len(prv_candidates) == 0 # 71

@@ -43,7 +43,7 @@ def test_make_photometry(self):
assert len(df_photometry) == 0 # 71

def test_make_thumbnails(self):
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
for ttype, istrument_type in [
("new", "Science"),
("ref", "Template"),
@@ -54,7 +54,7 @@

def test_alert_filter__xmatch(self):
"""Test cross matching with external catalog"""
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
xmatches = self.worker.alert_filter__xmatch(alert)
catalogs_to_xmatch = (
config["database"].get("xmatch", {}).get("TURBO", {}).keys()
6 changes: 3 additions & 3 deletions kowalski/tests/test_alert_broker_wntr.py
@@ -35,7 +35,7 @@ class TestAlertBrokerWNTR:

def test_alert_mongification(self):
"""Test massaging avro packet into a dict digestible by mongodb"""
alert, prv_candidates = self.worker.alert_mongify(self.alert)
alert, prv_candidates, _ = self.worker.alert_mongify(self.alert)
assert alert["candid"] == self.candid
assert len(alert["candidate"]) > 0 # ensure cand data is not empty
assert alert["objectId"] == self.alert["objectId"]
@@ -50,7 +50,7 @@ def test_make_photometry(self):
assert df_photometry["filter"][0] == "2massj"

def test_make_thumbnails(self):
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
for ttype, istrument_type in [
("new", "Science"),
("ref", "Template"),
@@ -61,7 +61,7 @@

def test_alert_filter__xmatch(self):
"""Test cross matching with external catalog"""
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
xmatches = self.worker.alert_filter__xmatch(alert)
catalogs_to_xmatch = config["database"].get("xmatch", {}).get("WNTR", {}).keys()
assert isinstance(xmatches, dict)
46 changes: 35 additions & 11 deletions kowalski/tests/test_alert_broker_ztf.py
@@ -68,7 +68,7 @@ def filter_template(upstream):


def post_alert(worker, alert):
alert, _ = worker.alert_mongify(alert)
alert, _, _ = worker.alert_mongify(alert)
# check if it already exists
if worker.mongo.db[worker.collection_alerts].count_documents(
{"candid": alert["candid"]}
@@ -90,16 +90,40 @@ class TestAlertBrokerZTF:

def test_alert_mongification(self):
"""Test massaging avro packet into a dict digestible by mongodb"""
alert, prv_candidates = self.worker.alert_mongify(self.alert)
alert, prv_candidates, fp_hists = self.worker.alert_mongify(self.alert)
assert alert["candid"] == self.candid
assert len(prv_candidates) == 31
assert len(fp_hists) == 0

def test_alert_mongofication_with_fphists(self):
"""Test massaging avro packet into a dict digestible by mongodb, with the new ZTF alert schema"""
candid = 2475433850015010009
sample_avro = f"data/ztf_alerts/20231012/{candid}.avro"
with open(sample_avro, "rb") as f:
for record in fastavro.reader(f):
alert, prv_candidates, fp_hists = self.worker.alert_mongify(record)
assert alert["candid"] == candid
assert len(prv_candidates) == 20
assert len(fp_hists) == 21

# test the removal of nulls and bad fields (None, -99999, -99999.0)
assert "forcediffimflux" in fp_hists[0]
fp_hists = [
{
kk: vv
for kk, vv in fp_hist.items()
if vv not in [None, -99999, -99999.0]
}
for fp_hist in fp_hists
]
assert "forcediffimflux" not in fp_hists[0]

def test_make_photometry(self):
df_photometry = self.worker.make_photometry(self.alert)
assert len(df_photometry) == 32

def test_make_thumbnails(self):
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
for ttype, istrument_type in [
("new", "Science"),
("ref", "Template"),
Expand All @@ -110,15 +134,15 @@ def test_make_thumbnails(self):

def test_alert_filter__ml(self):
"""Test executing ML models on the alert"""
alert, prv_candidates = self.worker.alert_mongify(self.alert)
alert, prv_candidates, _ = self.worker.alert_mongify(self.alert)
all_prv_candidates = deepcopy(prv_candidates) + [deepcopy(alert["candidate"])]
scores = self.worker.alert_filter__ml(alert, all_prv_candidates)
assert len(scores) > 0
# print(scores)

def test_alert_filter__xmatch(self):
"""Test cross matching with external catalog"""
alert, _ = self.worker.alert_mongify(self.alert)
alert, _, _ = self.worker.alert_mongify(self.alert)
xmatches = self.worker.alert_filter__xmatch(alert)
catalogs_to_xmatch = config["database"].get("xmatch", {}).get("ZTF", {}).keys()
assert isinstance(xmatches, dict)
@@ -327,7 +351,7 @@ def test_alert_filter__user_defined_followup_with_broker(self):
)
assert passed_filters[1]["auto_followup"]["data"]["payload"]["priority"] == 3

alert, prv_candidates = self.worker.alert_mongify(self.alert)
alert, prv_candidates, _ = self.worker.alert_mongify(self.alert)
self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters)

# now fetch the follow-up request from SP
@@ -369,7 +393,7 @@ def test_alert_filter__user_defined_followup_with_broker(self):
)
assert passed_filters[1]["auto_followup"]["data"]["payload"]["priority"] == 4

alert, prv_candidates = self.worker.alert_mongify(self.alert)
alert, prv_candidates, _ = self.worker.alert_mongify(self.alert)
self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters)

# now fetch the follow-up request from SP
@@ -418,7 +442,7 @@ def test_alert_filter__user_defined_followup_with_broker(self):
passed_filters[0]["auto_followup"]["data"]["target_group_ids"]
).issubset(set([target_group_id] + [saved_group_id]))

alert, prv_candidates = self.worker.alert_mongify(self.alert)
alert, prv_candidates, _ = self.worker.alert_mongify(self.alert)
self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters)

# now fetch the follow-up request from SP
@@ -466,7 +490,7 @@ def test_alert_filter__user_defined_followup_with_broker(self):
== "IFU"
)

alert, prv_candidates = self.worker.alert_mongify(self.alert)
alert, prv_candidates, _ = self.worker.alert_mongify(self.alert)
self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters)

# now fetch the source from SP
@@ -527,7 +551,7 @@ def test_alert_filter__user_defined_followup_with_broker(self):
== "IFU"
)

alert, prv_candidates = self.worker.alert_mongify(self.alert)
alert, prv_candidates, _ = self.worker.alert_mongify(self.alert)
self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters)

# now fetch the source from SP
@@ -605,7 +629,7 @@ def test_alert_filter__user_defined_followup_with_broker(self):
== "IFU"
)

alert, prv_candidates = self.worker.alert_mongify(self.alert)
alert, prv_candidates, _ = self.worker.alert_mongify(self.alert)
self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters)

# now fetch the follow-up request from SP
