Skip to content

Commit

Permalink
Followup request deduplication: ignore "advanced" in payload, target_…
Browse files Browse the repository at this point in the history
…group_ids in deduplication (#255)

* ignore "advanced" in payload

* add target group ids for requests

* don't trigger again if there is a completed or deleted request
  • Loading branch information
Theodlz authored Oct 5, 2023
1 parent bd886c7 commit c30336f
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 29 deletions.
43 changes: 31 additions & 12 deletions kowalski/alert_brokers/alert_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,14 @@ def alert_filter__user_defined(
"allocation_id": _filter["auto_followup"][
"allocation_id"
],
"target_group_ids": [_filter["group_id"]],
"target_group_ids": list(
set(
[_filter["group_id"]]
+ _filter["auto_followup"].get(
"target_group_ids", []
)
)
),
"payload": {
**_filter["auto_followup"].get("payload", {}),
"priority": priority,
Expand Down Expand Up @@ -1440,7 +1447,7 @@ def alert_post_source(
)
if len(not_saved_group_ids) > 0:
log(
f"Source {alert['objectId']} {alert['candid']} was not saved to groups {response.json()['data']['not_saved_group_ids']}"
f"Source {alert['objectId']} {alert['candid']} was not saved to groups {not_saved_group_ids}"
)
else:
raise ValueError(response.json()["message"])
Expand Down Expand Up @@ -1866,7 +1873,7 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
existing_requests = [
r
for r in existing_requests
if r["status"] in ["completed", "submitted"]
if r["status"] in ["completed", "submitted", "deleted"]
]
# sort by priority (highest first)
existing_requests = sorted(
Expand All @@ -1879,19 +1886,19 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
existing_requests = []

for passed_filter in passed_filters_followup:
# look for existing requests with the same allocation, group, and payload
# look for existing requests with the same allocation, target groups, and payload
existing_requests_filtered = [
(i, r)
for (i, r) in enumerate(existing_requests)
if r["allocation_id"]
== passed_filter["auto_followup"]["allocation_id"]
and set([passed_filter["group_id"]]).issubset(
[g["id"] for g in r["target_groups"]]
)
== passed_filter["auto_followup"]["data"]["allocation_id"]
and not set(
passed_filter["auto_followup"]["data"]["target_group_ids"]
).isdisjoint(set([g["id"] for g in r["target_groups"]]))
and compare_dicts(
passed_filter["auto_followup"]["data"]["payload"],
r["payload"],
ignore_keys=["priority", "start_date", "end_date"],
ignore_keys=["priority", "start_date", "end_date", "advanced"],
)
is True
]
Expand Down Expand Up @@ -1930,8 +1937,11 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
]["payload"],
"target_groups": [
{
"id": passed_filter["group_id"],
"id": target_group_id,
}
for target_group_id in passed_filter[
"auto_followup"
]["data"]["target_group_ids"]
],
"status": "submitted",
}
Expand Down Expand Up @@ -1984,15 +1994,24 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
# if there is an existing request, but the priority is lower than the one we want to post,
# update the existing request with the new priority
request_to_update = existing_requests_filtered[0][1]
# if the status is completed or deleted, do not update
if request_to_update["status"] in ["completed", "deleted"]:
log(
f"Followup request for {alert['objectId']} and allocation_id {passed_filter['auto_followup']['allocation_id']} already exists on SkyPortal, but is completed or deleted, no need for update"
)
# if the status is submitted, and the new priority is higher, update
if (
passed_filter["auto_followup"]["data"]["payload"]["priority"]
request_to_update["status"] == "submitted"
and passed_filter["auto_followup"]["data"]["payload"][
"priority"
]
> request_to_update["payload"]["priority"]
):
with timer(
f"Updating priority of auto followup request for {alert['objectId']} to SkyPortal",
self.verbose > 1,
):
# to update, the api needs to get the request id, target group id, and payload
# to update, the api needs to get the request id, target group ids, and payload
# so we'll basically get that from the existing request, and simply update the priority
try:
data = {
Expand Down
10 changes: 2 additions & 8 deletions kowalski/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
"pipeline": list,
"allocation_id": str,
"payload": dict,
"target_group_ids": list,
}


Expand Down Expand Up @@ -1933,6 +1934,7 @@ async def post(self, request: web.Request) -> web.Response:
}
"auto_followup": {
"allocation_id": 1,
"target_group_ids": [1, 2],
"comment": "SEDM triggered by BTSbot",
"payload": {
"observation_type": "IFU",
Expand Down Expand Up @@ -2432,14 +2434,6 @@ async def patch(self, request: web.Request) -> web.Response:
pass
elif isinstance(value, dict) and "pipeline" not in value:
pass
# elif (
# modifiable_field == "auto_followup"
# and isinstance(value, dict)
# and "pipeline" not in value
# ):
# return self.error(
# message=f"Cannot update filter id {filter_id}: {modifiable_field} must contain a pipeline"
# )
else:
pipeline = value.get("pipeline")
if not isinstance(pipeline, str):
Expand Down
149 changes: 140 additions & 9 deletions kowalski/tests/test_alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,11 @@ def test_alert_filter__user_defined_followup_with_broker(self):
ignore_if_saved_group_id = [g for g in groups if g["name"] == "Program B"][0][
"id"
]
target_group_id = [g for g in groups if g["name"] == "Sitewide Group"][0]["id"]

assert saved_group_id is not None
assert ignore_if_saved_group_id is not None
assert target_group_id is not None

response = self.worker.api_skyportal("GET", "/api/allocation", None)
assert response.status_code == 200
Expand Down Expand Up @@ -299,6 +302,7 @@ def test_alert_filter__user_defined_followup_with_broker(self):
"payload": { # example payload for SEDM
"observation_type": "IFU",
"priority": 2,
"advanced": False,
},
}
# make a copy of that filter, but with priority 3
Expand Down Expand Up @@ -389,6 +393,51 @@ def test_alert_filter__user_defined_followup_with_broker(self):
followup_requests_updated[0]["id"] == followup_requests[0]["id"]
) # the id should be the same

# now, we'll test the target_group_ids functionality for deduplication
# that is, if a filter has a target_group_ids, then it should only trigger a follow-up request
# if none of the existing requests' target groups have an overlap with the target_group_ids
# of the filter

filter_multiple_groups = deepcopy(filter)
filter_multiple_groups["auto_followup"]["target_group_ids"] = [saved_group_id]
filter_multiple_groups["group_id"] = target_group_id

passed_filters = self.worker.alert_filter__user_defined(
[filter_multiple_groups], self.alert
)

assert passed_filters is not None
assert len(passed_filters) == 1
assert "auto_followup" in passed_filters[0]
assert (
passed_filters[0]["auto_followup"]["data"]["payload"]["observation_type"]
== "IFU"
)
assert passed_filters[0]["auto_followup"]["data"]["payload"]["priority"] == 2
assert set(
passed_filters[0]["auto_followup"]["data"]["target_group_ids"]
).issubset(set([target_group_id] + [saved_group_id]))

alert, prv_candidates = self.worker.alert_mongify(self.alert)
self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters)

# now fetch the follow-up request from SP
# we should still have just one follow-up request, the exact same as before
response = self.worker.api_skyportal(
"GET", f"/api/followup_request?sourceID={alert['objectId']}", None
)
assert response.status_code == 200
followup_requests_updated = response.json()["data"].get("followup_requests", [])
followup_requests_updated = [
f
for f in followup_requests_updated
if (f["allocation_id"] == allocation_id and f["status"] == "submitted")
]
assert len(followup_requests_updated) == 1
assert followup_requests_updated[0]["payload"]["observation_type"] == "IFU"
assert followup_requests_updated[0]["payload"]["priority"] == 4
assert followup_requests_updated[0]["id"] == followup_requests[0]["id"]

# delete the follow-up request
response = self.worker.api_skyportal(
"DELETE", f"/api/followup_request/{followup_requests[0]['id']}", None
Expand Down Expand Up @@ -427,9 +476,10 @@ def test_alert_filter__user_defined_followup_with_broker(self):
assert response.status_code == 200
source = response.json()["data"]
assert source["id"] == "ZTF20aajcbhr"
assert len(source["groups"]) == 1
# should only be saved to the group of the first filter
assert source["groups"][0]["id"] == saved_group_id
# should be saved to Program A and Sitewide Group, but not Program B
assert any([g["id"] == saved_group_id for g in source["groups"]])
assert any([g["id"] == target_group_id for g in source["groups"]])
assert not any([g["id"] == ignore_if_saved_group_id for g in source["groups"]])

# verify that there isn't a follow-up request
response = self.worker.api_skyportal(
Expand All @@ -444,8 +494,8 @@ def test_alert_filter__user_defined_followup_with_broker(self):
]
assert len(followup_requests) == 0

# rerun the first filter, but with the ignore_if_saved_group_id
# this time we are testing that it does not trigger a follow-up request
# rerun the first filter, but not with the ignore_if_saved_group_id
# this time, we are testing that it does not trigger a follow-up request
# if the source is already classified

# first post a classification
Expand Down Expand Up @@ -487,11 +537,12 @@ def test_alert_filter__user_defined_followup_with_broker(self):
assert response.status_code == 200
source = response.json()["data"]
assert source["id"] == "ZTF20aajcbhr"
assert len(source["groups"]) == 1
# should only be saved to the group of the first filter
assert source["groups"][0]["id"] == saved_group_id

# verify that there is a follow-up request
# should only be saved to Program A and Sitewide Group
assert any([g["id"] == saved_group_id for g in source["groups"]])
assert any([g["id"] == target_group_id for g in source["groups"]])

# verify that there is no follow-up request
response = self.worker.api_skyportal(
"GET", f"/api/followup_request?sourceID={alert['objectId']}", None
)
Expand All @@ -509,6 +560,86 @@ def test_alert_filter__user_defined_followup_with_broker(self):
"DELETE", f"/api/classification/{classification_id}", None
)

# unsave the source from the group
response = self.worker.api_skyportal(
"POST",
"/api/source_groups",
{
"objId": alert["objectId"],
"unsaveGroupIds": [saved_group_id, target_group_id],
},
)
assert response.status_code == 200

# last but not least, we verify that we don't trigger the same request again, if a request has been submitted but then deleted, or is completed already
filter = filter_template(self.worker.collection_alerts)
filter["group_id"] = saved_group_id
filter["autosave"] = {
"active": True,
"comment": "Saved to BTS by BTSbot.",
}
filter["auto_followup"] = {
"active": True,
"pipeline": [
{
"$match": {
"candidate.drb": {"$gt": 0.5},
}
}
],
"allocation_id": allocation_id,
"payload": { # example payload for SEDM
"observation_type": "IFU",
"priority": 2,
"advanced": False,
},
"comment": "SEDM triggered by BTSbot",
}

passed_filters = self.worker.alert_filter__user_defined([filter], self.alert)
assert passed_filters is not None
assert len(passed_filters) == 1
assert "auto_followup" in passed_filters[0]
assert (
passed_filters[0]["auto_followup"]["data"]["payload"]["observation_type"]
== "IFU"
)

alert, prv_candidates = self.worker.alert_mongify(self.alert)
self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters)

# now fetch the follow-up request from SP
# we should not have any submitted follow-up requests
response = self.worker.api_skyportal(
"GET", f"/api/followup_request?sourceID={alert['objectId']}", None
)
assert response.status_code == 200
followup_requests = response.json()["data"].get("followup_requests", [])
submitted = [
f
for f in followup_requests
if (f["allocation_id"] == allocation_id and f["status"] == "submitted")
]
deleted = [
f
for f in followup_requests
if (f["allocation_id"] == allocation_id and f["status"] == "deleted")
]
assert len(submitted) == 0
assert len(deleted) > 0

# verify that it was save still
response = self.worker.api_skyportal(
"GET", f"/api/sources/{alert['objectId']}", None
)
assert response.status_code == 200
source = response.json()["data"]
assert source["id"] == "ZTF20aajcbhr"
# should be saved to Program A and Sitewide Group, but not Program B
assert any([g["id"] == saved_group_id for g in source["groups"]])
assert not any([g["id"] == target_group_id for g in source["groups"]])
assert not any([g["id"] == ignore_if_saved_group_id for g in source["groups"]])

# unsave the source from the group
response = self.worker.api_skyportal(
"POST",
Expand Down

0 comments on commit c30336f

Please sign in to comment.