diff --git a/kowalski/alert_brokers/alert_broker.py b/kowalski/alert_brokers/alert_broker.py index b0291eac..11d2c909 100644 --- a/kowalski/alert_brokers/alert_broker.py +++ b/kowalski/alert_brokers/alert_broker.py @@ -541,6 +541,7 @@ def alert_mongify(alert: Mapping): - add a placeholder for ML-based classifications - add coordinates for 2D spherical indexing and compute Galactic coordinates - cut off the prv_candidates section + - cut off the fp_hists section (if it exists) :param alert: :return: @@ -577,7 +578,13 @@ def alert_mongify(alert: Mapping): if prv_candidates is None: prv_candidates = [] - return doc, prv_candidates + # cut off the fp_hists section, if it exists + fp_hists = deepcopy(doc.get("fp_hists", None)) + doc.pop("fp_hists", None) + if fp_hists is None: + fp_hists = [] + + return doc, prv_candidates, fp_hists def make_thumbnail( self, alert: Mapping, skyportal_type: str, alert_packet_type: str @@ -1637,7 +1644,7 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters): - decide which points to post to what groups based on permissions - post alert light curve in single PUT call to /api/photometry specifying stream_ids - :param alert: alert with a stripped-off prv_candidates section + :param alert: alert with a stripped-off prv_candidates section and fp_hists sections :param prv_candidates: could be plain prv_candidates section of an alert, or extended alert history :param passed_filters: list of filters that alert passed, with their output :return: diff --git a/kowalski/alert_brokers/alert_broker_pgir.py b/kowalski/alert_brokers/alert_broker_pgir.py index 1a71b97e..2bdc4ea5 100644 --- a/kowalski/alert_brokers/alert_broker_pgir.py +++ b/kowalski/alert_brokers/alert_broker_pgir.py @@ -58,7 +58,7 @@ def process_alert(alert: Mapping, topic: str): # candid not in db, ingest decoded avro packet into db with timer(f"Mongification of {object_id} {candid}", alert_worker.verbose > 1): - alert, prv_candidates = alert_worker.alert_mongify(alert) + alert, prv_candidates, _ = alert_worker.alert_mongify(alert) # create alert history all_prv_candidates = deepcopy(prv_candidates) + [deepcopy(alert["candidate"])] diff --git a/kowalski/alert_brokers/alert_broker_turbo.py b/kowalski/alert_brokers/alert_broker_turbo.py index 8d6627e5..9878529d 100644 --- a/kowalski/alert_brokers/alert_broker_turbo.py +++ b/kowalski/alert_brokers/alert_broker_turbo.py @@ -58,7 +58,7 @@ def process_alert(alert: Mapping, topic: str): # candid not in db, ingest decoded avro packet into db with timer(f"Mongification of {object_id} {candid}", alert_worker.verbose > 1): - alert, prv_candidates = alert_worker.alert_mongify(alert) + alert, prv_candidates, _ = alert_worker.alert_mongify(alert) with timer(f"Ingesting {object_id} {candid}", alert_worker.verbose > 1): alert_worker.mongo.insert_one( diff --git a/kowalski/alert_brokers/alert_broker_winter.py b/kowalski/alert_brokers/alert_broker_winter.py index 16336e24..85961ef4 100644 --- a/kowalski/alert_brokers/alert_broker_winter.py +++ b/kowalski/alert_brokers/alert_broker_winter.py @@ -80,7 +80,7 @@ def process_alert(alert: Mapping, topic: str): # candid not in db, ingest decoded avro packet into db with timer(f"Mongification of {object_id} {candid}"): - alert, prv_candidates = alert_worker.alert_mongify(alert) + alert, prv_candidates, _ = alert_worker.alert_mongify(alert) # future: add ML model filtering here diff --git a/kowalski/alert_brokers/alert_broker_ztf.py b/kowalski/alert_brokers/alert_broker_ztf.py index d355d6ad..ee4783e2 100644 --- a/kowalski/alert_brokers/alert_broker_ztf.py +++ b/kowalski/alert_brokers/alert_broker_ztf.py @@ -58,7 +58,7 @@ def process_alert(alert: Mapping, topic: str): # candid not in db, ingest decoded avro packet into db with timer(f"Mongification of {object_id} {candid}", alert_worker.verbose > 1): - alert, prv_candidates = alert_worker.alert_mongify(alert) + alert, prv_candidates, fp_hists = alert_worker.alert_mongify(alert) # create alert history all_prv_candidates = deepcopy(prv_candidates) + [deepcopy(alert["candidate"])] @@ -104,6 +104,12 @@ def process_alert(alert: Mapping, topic: str): for prv_candidate in prv_candidates ] + # fp_hists: pop nulls - save space + fp_hists = [ + {kk: vv for kk, vv in fp_hist.items() if vv not in [None, -99999, -99999.0]} + for fp_hist in fp_hists + ] + alert_aux, xmatches, passed_filters = None, None, None # cross-match with external catalogs if objectId not in collection_alerts_aux: if ( @@ -123,6 +129,7 @@ def process_alert(alert: Mapping, topic: str): "_id": object_id, "cross_matches": xmatches, "prv_candidates": prv_candidates, + "fp_hists": fp_hists, } with timer(f"Aux ingesting {object_id} {candid}", alert_worker.verbose > 1): @@ -139,6 +146,7 @@ def process_alert(alert: Mapping, topic: str): )( {"_id": object_id}, {"$addToSet": {"prv_candidates": {"$each": prv_candidates}}}, + {"$addToSet": {"fp_hists": {"$each": fp_hists}}}, upsert=True, ) @@ -160,6 +168,7 @@ def process_alert(alert: Mapping, topic: str): del ( alert, prv_candidates, + fp_hists, all_prv_candidates, scores, xmatches, diff --git a/kowalski/tests/test_alert_broker_pgir.py b/kowalski/tests/test_alert_broker_pgir.py index 82513f41..6cf5283a 100644 --- a/kowalski/tests/test_alert_broker_pgir.py +++ b/kowalski/tests/test_alert_broker_pgir.py @@ -34,7 +34,7 @@ class TestAlertBrokerPGIR: def test_alert_mongification(self): """Test massaging avro packet into a dict digestible by mongodb""" - alert, prv_candidates = self.worker.alert_mongify(self.alert) + alert, prv_candidates, _ = self.worker.alert_mongify(self.alert) assert alert["candid"] == self.candid assert len(prv_candidates) == 71 @@ -43,7 +43,7 @@ def test_make_photometry(self): assert len(df_photometry) == 71 def test_make_thumbnails(self): - alert, _ = self.worker.alert_mongify(self.alert) + alert, _, _ = self.worker.alert_mongify(self.alert) for ttype, istrument_type in [ ("new", "Science"), ("ref", "Template"), @@ -54,13 +54,13 @@ def test_make_thumbnails(self): def test_alert_filter__ml(self): """Test executing ML models on the alert""" - alert, _ = self.worker.alert_mongify(self.alert) + alert, _, _ = self.worker.alert_mongify(self.alert) scores = self.worker.alert_filter__ml(alert) log(scores) def test_alert_filter__xmatch(self): """Test cross matching with external catalog""" - alert, _ = self.worker.alert_mongify(self.alert) + alert, _, _ = self.worker.alert_mongify(self.alert) xmatches = self.worker.alert_filter__xmatch(alert) catalogs_to_xmatch = config["database"].get("xmatch", {}).get("PGIR", {}).keys() assert isinstance(xmatches, dict) diff --git a/kowalski/tests/test_alert_broker_turbo.py b/kowalski/tests/test_alert_broker_turbo.py index f9c16bf6..2b70ee8e 100644 --- a/kowalski/tests/test_alert_broker_turbo.py +++ b/kowalski/tests/test_alert_broker_turbo.py @@ -34,7 +34,7 @@ class TestAlertBrokerTURBO: def test_alert_mongification(self): """Test massaging avro packet into a dict digestible by mongodb""" - alert, prv_candidates = self.worker.alert_mongify(self.alert) + alert, prv_candidates, _ = self.worker.alert_mongify(self.alert) assert alert["candid"] == self.candid assert len(prv_candidates) == 0 # 71 @@ -43,7 +43,7 @@ def test_make_photometry(self): assert len(df_photometry) == 0 # 71 def test_make_thumbnails(self): - alert, _ = self.worker.alert_mongify(self.alert) + alert, _, _ = self.worker.alert_mongify(self.alert) for ttype, istrument_type in [ ("new", "Science"), ("ref", "Template"), @@ -54,7 +54,7 @@ def test_make_thumbnails(self): def test_alert_filter__xmatch(self): """Test cross matching with external catalog""" - alert, _ = self.worker.alert_mongify(self.alert) + alert, _, _ = self.worker.alert_mongify(self.alert) xmatches = self.worker.alert_filter__xmatch(alert) catalogs_to_xmatch = ( config["database"].get("xmatch", {}).get("TURBO", {}).keys() diff --git a/kowalski/tests/test_alert_broker_wntr.py b/kowalski/tests/test_alert_broker_wntr.py index eb5e012e..0b90ed5f 100644 --- a/kowalski/tests/test_alert_broker_wntr.py +++ b/kowalski/tests/test_alert_broker_wntr.py @@ -35,7 +35,7 @@ class TestAlertBrokerWNTR: def test_alert_mongification(self): """Test massaging avro packet into a dict digestible by mongodb""" - alert, prv_candidates = self.worker.alert_mongify(self.alert) + alert, prv_candidates, _ = self.worker.alert_mongify(self.alert) assert alert["candid"] == self.candid assert len(alert["candidate"]) > 0 # ensure cand data is not empty assert alert["objectId"] == self.alert["objectId"] @@ -50,7 +50,7 @@ def test_make_photometry(self): assert df_photometry["filter"][0] == "2massj" def test_make_thumbnails(self): - alert, _ = self.worker.alert_mongify(self.alert) + alert, _, _ = self.worker.alert_mongify(self.alert) for ttype, istrument_type in [ ("new", "Science"), ("ref", "Template"), @@ -61,7 +61,7 @@ def test_make_thumbnails(self): def test_alert_filter__xmatch(self): """Test cross matching with external catalog""" - alert, _ = self.worker.alert_mongify(self.alert) + alert, _, _ = self.worker.alert_mongify(self.alert) xmatches = self.worker.alert_filter__xmatch(alert) catalogs_to_xmatch = config["database"].get("xmatch", {}).get("WNTR", {}).keys() assert isinstance(xmatches, dict) diff --git a/kowalski/tests/test_alert_broker_ztf.py b/kowalski/tests/test_alert_broker_ztf.py index 45bf7f60..ca204b10 100644 --- a/kowalski/tests/test_alert_broker_ztf.py +++ b/kowalski/tests/test_alert_broker_ztf.py @@ -68,7 +68,7 @@ def filter_template(upstream): def post_alert(worker, alert): - alert, _ = worker.alert_mongify(alert) + alert, _, _ = worker.alert_mongify(alert) # check if it already exists if worker.mongo.db[worker.collection_alerts].count_documents( {"candid": alert["candid"]} @@ -90,16 +90,40 @@ class TestAlertBrokerZTF: def test_alert_mongification(self): """Test massaging avro packet into a dict digestible by mongodb""" - alert, prv_candidates = self.worker.alert_mongify(self.alert) + alert, prv_candidates, fp_hists = self.worker.alert_mongify(self.alert) assert alert["candid"] == self.candid assert len(prv_candidates) == 31 + assert len(fp_hists) == 0 + + def test_alert_mongofication_with_fphists(self): + """Test massaging avro packet into a dict digestible by mongodb, with the new ZTF alert schema""" + candid = 2475433850015010009 + sample_avro = f"data/ztf_alerts/20231012/{candid}.avro" + with open(sample_avro, "rb") as f: + for record in fastavro.reader(f): + alert, prv_candidates, fp_hists = self.worker.alert_mongify(record) + assert alert["candid"] == candid + assert len(prv_candidates) == 20 + assert len(fp_hists) == 21 + + # test the removal of nulls and bad fields (None, -99999, -99999.0) + assert "forcediffimflux" in fp_hists[0] + fp_hists = [ + { + kk: vv + for kk, vv in fp_hist.items() + if vv not in [None, -99999, -99999.0] + } + for fp_hist in fp_hists + ] + assert "forcediffimflux" not in fp_hists[0] def test_make_photometry(self): df_photometry = self.worker.make_photometry(self.alert) assert len(df_photometry) == 32 def test_make_thumbnails(self): - alert, _ = self.worker.alert_mongify(self.alert) + alert, _, _ = self.worker.alert_mongify(self.alert) for ttype, istrument_type in [ ("new", "Science"), ("ref", "Template"), @@ -110,7 +134,7 @@ def test_make_thumbnails(self): def test_alert_filter__ml(self): """Test executing ML models on the alert""" - alert, prv_candidates = self.worker.alert_mongify(self.alert) + alert, prv_candidates, _ = self.worker.alert_mongify(self.alert) all_prv_candidates = deepcopy(prv_candidates) + [deepcopy(alert["candidate"])] scores = self.worker.alert_filter__ml(alert, all_prv_candidates) assert len(scores) > 0 @@ -118,7 +142,7 @@ def test_alert_filter__ml(self): def test_alert_filter__xmatch(self): """Test cross matching with external catalog""" - alert, _ = self.worker.alert_mongify(self.alert) + alert, _, _ = self.worker.alert_mongify(self.alert) xmatches = self.worker.alert_filter__xmatch(alert) catalogs_to_xmatch = config["database"].get("xmatch", {}).get("ZTF", {}).keys() assert isinstance(xmatches, dict) @@ -327,7 +351,7 @@ def test_alert_filter__user_defined_followup_with_broker(self): ) assert passed_filters[1]["auto_followup"]["data"]["payload"]["priority"] == 3 - alert, prv_candidates = self.worker.alert_mongify(self.alert) + alert, prv_candidates, _ = self.worker.alert_mongify(self.alert) self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters) # now fetch the follow-up request from SP @@ -369,7 +393,7 @@ def test_alert_filter__user_defined_followup_with_broker(self): ) assert passed_filters[1]["auto_followup"]["data"]["payload"]["priority"] == 4 - alert, prv_candidates = self.worker.alert_mongify(self.alert) + alert, prv_candidates, _ = self.worker.alert_mongify(self.alert) self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters) # now fetch the follow-up request from SP @@ -418,7 +442,7 @@ def test_alert_filter__user_defined_followup_with_broker(self): passed_filters[0]["auto_followup"]["data"]["target_group_ids"] ).issubset(set([target_group_id] + [saved_group_id])) - alert, prv_candidates = self.worker.alert_mongify(self.alert) + alert, prv_candidates, _ = self.worker.alert_mongify(self.alert) self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters) # now fetch the follow-up request from SP @@ -466,7 +490,7 @@ def test_alert_filter__user_defined_followup_with_broker(self): == "IFU" ) - alert, prv_candidates = self.worker.alert_mongify(self.alert) + alert, prv_candidates, _ = self.worker.alert_mongify(self.alert) self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters) # now fetch the source from SP @@ -527,7 +551,7 @@ def test_alert_filter__user_defined_followup_with_broker(self): == "IFU" ) - alert, prv_candidates = self.worker.alert_mongify(self.alert) + alert, prv_candidates, _ = self.worker.alert_mongify(self.alert) self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters) # now fetch the source from SP @@ -605,7 +629,7 @@ def test_alert_filter__user_defined_followup_with_broker(self): == "IFU" ) - alert, prv_candidates = self.worker.alert_mongify(self.alert) + alert, prv_candidates, _ = self.worker.alert_mongify(self.alert) self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters) # now fetch the follow-up request from SP