From c30336febb9be9b695f3f0140b0430cb7633bd72 Mon Sep 17 00:00:00 2001 From: Theophile du Laz Date: Thu, 5 Oct 2023 14:00:23 -0700 Subject: [PATCH] Followup request deduplication: ignore "advanced" in payload, target_group_ids in deduplication (#255) * ignore "advanced" in payload * add target group ids for requests * don't trigger again if there is a completed or deleted request --- kowalski/alert_brokers/alert_broker.py | 43 +++++-- kowalski/api/api.py | 10 +- kowalski/tests/test_alert_broker_ztf.py | 149 ++++++++++++++++++++++-- 3 files changed, 173 insertions(+), 29 deletions(-) diff --git a/kowalski/alert_brokers/alert_broker.py b/kowalski/alert_brokers/alert_broker.py index 3b75138c..f3039d1a 100644 --- a/kowalski/alert_brokers/alert_broker.py +++ b/kowalski/alert_brokers/alert_broker.py @@ -1327,7 +1327,14 @@ def alert_filter__user_defined( "allocation_id": _filter["auto_followup"][ "allocation_id" ], - "target_group_ids": [_filter["group_id"]], + "target_group_ids": list( + set( + [_filter["group_id"]] + + _filter["auto_followup"].get( + "target_group_ids", [] + ) + ) + ), "payload": { **_filter["auto_followup"].get("payload", {}), "priority": priority, @@ -1440,7 +1447,7 @@ def alert_post_source( ) if len(not_saved_group_ids) > 0: log( - f"Source {alert['objectId']} {alert['candid']} was not saved to groups {response.json()['data']['not_saved_group_ids']}" + f"Source {alert['objectId']} {alert['candid']} was not saved to groups {not_saved_group_ids}" ) else: raise ValueError(response.json()["message"]) @@ -1866,7 +1873,7 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters): existing_requests = [ r for r in existing_requests - if r["status"] in ["completed", "submitted"] + if r["status"] in ["completed", "submitted", "deleted"] ] # sort by priority (highest first) existing_requests = sorted( @@ -1879,19 +1886,19 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters): existing_requests = [] for passed_filter in 
passed_filters_followup: - # look for existing requests with the same allocation, group, and payload + # look for existing requests with the same allocation, target groups, and payload existing_requests_filtered = [ (i, r) for (i, r) in enumerate(existing_requests) if r["allocation_id"] - == passed_filter["auto_followup"]["allocation_id"] - and set([passed_filter["group_id"]]).issubset( - [g["id"] for g in r["target_groups"]] - ) + == passed_filter["auto_followup"]["data"]["allocation_id"] + and not set( + passed_filter["auto_followup"]["data"]["target_group_ids"] + ).isdisjoint(set([g["id"] for g in r["target_groups"]])) and compare_dicts( passed_filter["auto_followup"]["data"]["payload"], r["payload"], - ignore_keys=["priority", "start_date", "end_date"], + ignore_keys=["priority", "start_date", "end_date", "advanced"], ) is True ] @@ -1930,8 +1937,11 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters): ]["payload"], "target_groups": [ { - "id": passed_filter["group_id"], + "id": target_group_id, } + for target_group_id in passed_filter[ + "auto_followup" + ]["data"]["target_group_ids"] ], "status": "submitted", } @@ -1984,15 +1994,24 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters): # if there is an existing request, but the priority is lower than the one we want to post, # update the existing request with the new priority request_to_update = existing_requests_filtered[0][1] + # if the status is completed or deleted, do not update + if request_to_update["status"] in ["completed", "deleted"]: + log( + f"Followup request for {alert['objectId']} and allocation_id {passed_filter['auto_followup']['allocation_id']} already exists on SkyPortal, but is completed or deleted, no need for update" + ) + # if the status is submitted, and the new priority is higher, update if ( - passed_filter["auto_followup"]["data"]["payload"]["priority"] + request_to_update["status"] == "submitted" + and 
passed_filter["auto_followup"]["data"]["payload"][ + "priority" + ] > request_to_update["payload"]["priority"] ): with timer( f"Updating priority of auto followup request for {alert['objectId']} to SkyPortal", self.verbose > 1, ): - # to update, the api needs to get the request id, target group id, and payload + # to update, the api needs to get the request id, target group ids, and payload # so we'll basically get that from the existing request, and simply update the priority try: data = { diff --git a/kowalski/api/api.py b/kowalski/api/api.py index 28760a33..e438d2b9 100644 --- a/kowalski/api/api.py +++ b/kowalski/api/api.py @@ -82,6 +82,7 @@ "pipeline": list, "allocation_id": str, "payload": dict, + "target_group_ids": list, } @@ -1933,6 +1934,7 @@ async def post(self, request: web.Request) -> web.Response: } "auto_followup": { "allocation_id": 1, + "target_group_ids": [1, 2], "comment": "SEDM triggered by BTSbot", "payload": { "observation_type": "IFU", @@ -2432,14 +2434,6 @@ async def patch(self, request: web.Request) -> web.Response: pass elif isinstance(value, dict) and "pipeline" not in value: pass - # elif ( - # modifiable_field == "auto_followup" - # and isinstance(value, dict) - # and "pipeline" not in value - # ): - # return self.error( - # message=f"Cannot update filter id {filter_id}: {modifiable_field} must contain a pipeline" - # ) else: pipeline = value.get("pipeline") if not isinstance(pipeline, str): diff --git a/kowalski/tests/test_alert_broker_ztf.py b/kowalski/tests/test_alert_broker_ztf.py index 2e2bcd50..45bf7f60 100644 --- a/kowalski/tests/test_alert_broker_ztf.py +++ b/kowalski/tests/test_alert_broker_ztf.py @@ -244,8 +244,11 @@ def test_alert_filter__user_defined_followup_with_broker(self): ignore_if_saved_group_id = [g for g in groups if g["name"] == "Program B"][0][ "id" ] + target_group_id = [g for g in groups if g["name"] == "Sitewide Group"][0]["id"] + assert saved_group_id is not None assert ignore_if_saved_group_id is not None + 
assert target_group_id is not None response = self.worker.api_skyportal("GET", "/api/allocation", None) assert response.status_code == 200 @@ -299,6 +302,7 @@ def test_alert_filter__user_defined_followup_with_broker(self): "payload": { # example payload for SEDM "observation_type": "IFU", "priority": 2, + "advanced": False, }, } # make a copy of that filter, but with priority 3 @@ -389,6 +393,51 @@ def test_alert_filter__user_defined_followup_with_broker(self): followup_requests_updated[0]["id"] == followup_requests[0]["id"] ) # the id should be the same + # now, we'll test the target_group_ids functionality for deduplication + # that is, if a filter has a target_group_ids, then it should only trigger a follow-up request + # if none of the existing requests' target groups have an overlap with the target_group_ids + # of the filter + + filter_multiple_groups = deepcopy(filter) + filter_multiple_groups["auto_followup"]["target_group_ids"] = [saved_group_id] + filter_multiple_groups["group_id"] = target_group_id + + passed_filters = self.worker.alert_filter__user_defined( + [filter_multiple_groups], self.alert + ) + + assert passed_filters is not None + assert len(passed_filters) == 1 + assert "auto_followup" in passed_filters[0] + assert ( + passed_filters[0]["auto_followup"]["data"]["payload"]["observation_type"] + == "IFU" + ) + assert passed_filters[0]["auto_followup"]["data"]["payload"]["priority"] == 2 + assert set( + passed_filters[0]["auto_followup"]["data"]["target_group_ids"] + ).issubset(set([target_group_id] + [saved_group_id])) + + alert, prv_candidates = self.worker.alert_mongify(self.alert) + self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters) + + # now fetch the follow-up request from SP + # we should still have just one follow-up request, the exact same as before + response = self.worker.api_skyportal( + "GET", f"/api/followup_request?sourceID={alert['objectId']}", None + ) + assert response.status_code == 200 + 
followup_requests_updated = response.json()["data"].get("followup_requests", []) + followup_requests_updated = [ + f + for f in followup_requests_updated + if (f["allocation_id"] == allocation_id and f["status"] == "submitted") + ] + assert len(followup_requests_updated) == 1 + assert followup_requests_updated[0]["payload"]["observation_type"] == "IFU" + assert followup_requests_updated[0]["payload"]["priority"] == 4 + assert followup_requests_updated[0]["id"] == followup_requests[0]["id"] + # delete the follow-up request response = self.worker.api_skyportal( "DELETE", f"/api/followup_request/{followup_requests[0]['id']}", None @@ -427,9 +476,10 @@ def test_alert_filter__user_defined_followup_with_broker(self): assert response.status_code == 200 source = response.json()["data"] assert source["id"] == "ZTF20aajcbhr" - assert len(source["groups"]) == 1 - # should only be saved to the group of the first filter - assert source["groups"][0]["id"] == saved_group_id + # should be saved to Program A and Sitewide Group, but not Program B + assert any([g["id"] == saved_group_id for g in source["groups"]]) + assert any([g["id"] == target_group_id for g in source["groups"]]) + assert not any([g["id"] == ignore_if_saved_group_id for g in source["groups"]]) # verify that there isn't a follow-up request response = self.worker.api_skyportal( @@ -444,8 +494,8 @@ def test_alert_filter__user_defined_followup_with_broker(self): ] assert len(followup_requests) == 0 - # rerun the first filter, but with the ignore_if_saved_group_id - # this time we are testing that it does not trigger a follow-up request + # rerun the first filter, but not with the ignore_if_saved_group_id + # this time, we are testing that it does not trigger a follow-up request # if the source is already classified # first post a classification @@ -487,11 +537,12 @@ def test_alert_filter__user_defined_followup_with_broker(self): assert response.status_code == 200 source = response.json()["data"] assert source["id"] == 
"ZTF20aajcbhr" - assert len(source["groups"]) == 1 - # should only be saved to the group of the first filter - assert source["groups"][0]["id"] == saved_group_id - # verify that there is a follow-up request + # should only be saved to Program A and Sitewide Group + assert any([g["id"] == saved_group_id for g in source["groups"]]) + assert any([g["id"] == target_group_id for g in source["groups"]]) + + # verify that there is no follow-up request response = self.worker.api_skyportal( "GET", f"/api/followup_request?sourceID={alert['objectId']}", None ) @@ -509,6 +560,86 @@ def test_alert_filter__user_defined_followup_with_broker(self): "DELETE", f"/api/classification/{classification_id}", None ) + # unsave the source from the group + response = self.worker.api_skyportal( + "POST", + "/api/source_groups", + { + "objId": alert["objectId"], + "unsaveGroupIds": [saved_group_id, target_group_id], + }, + ) + assert response.status_code == 200 + + # last but not least, we verify that we don't trigger the same request again, if a request has been submitted but then deleted, or is completed already + filter = filter_template(self.worker.collection_alerts) + filter["group_id"] = saved_group_id + filter["autosave"] = { + "active": True, + "comment": "Saved to BTS by BTSbot.", + } + filter["auto_followup"] = { + "active": True, + "pipeline": [ + { + "$match": { + "candidate.drb": {"$gt": 0.5}, + } + } + ], + "allocation_id": allocation_id, + "payload": { # example payload for SEDM + "observation_type": "IFU", + "priority": 2, + "advanced": False, + }, + "comment": "SEDM triggered by BTSbot", + } + + passed_filters = self.worker.alert_filter__user_defined([filter], self.alert) + assert passed_filters is not None + assert len(passed_filters) == 1 + assert "auto_followup" in passed_filters[0] + assert ( + passed_filters[0]["auto_followup"]["data"]["payload"]["observation_type"] + == "IFU" + ) + + alert, prv_candidates = self.worker.alert_mongify(self.alert) + 
self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters) + + # now fetch the follow-up request from SP + # we should not have any submitted follow-up requests + response = self.worker.api_skyportal( + "GET", f"/api/followup_request?sourceID={alert['objectId']}", None + ) + assert response.status_code == 200 + followup_requests = response.json()["data"].get("followup_requests", []) + submitted = [ + f + for f in followup_requests + if (f["allocation_id"] == allocation_id and f["status"] == "submitted") + ] + deleted = [ + f + for f in followup_requests + if (f["allocation_id"] == allocation_id and f["status"] == "deleted") + ] + assert len(submitted) == 0 + assert len(deleted) > 0 + + # verify that the source is still saved + response = self.worker.api_skyportal( + "GET", f"/api/sources/{alert['objectId']}", None + ) + assert response.status_code == 200 + source = response.json()["data"] + assert source["id"] == "ZTF20aajcbhr" + # should be saved to Program A, but not to Sitewide Group or Program B + assert any([g["id"] == saved_group_id for g in source["groups"]]) + assert not any([g["id"] == target_group_id for g in source["groups"]]) + assert not any([g["id"] == ignore_if_saved_group_id for g in source["groups"]]) + # unsave the source from the group response = self.worker.api_skyportal( "POST",