Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Followup request deduplication: ignore "advanced" in payload, target_group_ids in deduplication #255

Merged
merged 4 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions kowalski/alert_brokers/alert_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,14 @@ def alert_filter__user_defined(
"allocation_id": _filter["auto_followup"][
"allocation_id"
],
"target_group_ids": [_filter["group_id"]],
"target_group_ids": list(
set(
[_filter["group_id"]]
+ _filter["auto_followup"].get(
"target_group_ids", []
)
)
),
"payload": {
**_filter["auto_followup"].get("payload", {}),
"priority": priority,
Expand Down Expand Up @@ -1440,7 +1447,7 @@ def alert_post_source(
)
if len(not_saved_group_ids) > 0:
log(
f"Source {alert['objectId']} {alert['candid']} was not saved to groups {response.json()['data']['not_saved_group_ids']}"
f"Source {alert['objectId']} {alert['candid']} was not saved to groups {not_saved_group_ids}"
)
else:
raise ValueError(response.json()["message"])
Expand Down Expand Up @@ -1866,7 +1873,7 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
existing_requests = [
r
for r in existing_requests
if r["status"] in ["completed", "submitted"]
if r["status"] in ["completed", "submitted", "deleted"]
]
# sort by priority (highest first)
existing_requests = sorted(
Expand All @@ -1879,19 +1886,19 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
existing_requests = []

for passed_filter in passed_filters_followup:
# look for existing requests with the same allocation, group, and payload
# look for existing requests with the same allocation, target groups, and payload
existing_requests_filtered = [
(i, r)
for (i, r) in enumerate(existing_requests)
if r["allocation_id"]
== passed_filter["auto_followup"]["allocation_id"]
and set([passed_filter["group_id"]]).issubset(
[g["id"] for g in r["target_groups"]]
)
== passed_filter["auto_followup"]["data"]["allocation_id"]
and not set(
passed_filter["auto_followup"]["data"]["target_group_ids"]
).isdisjoint(set([g["id"] for g in r["target_groups"]]))
and compare_dicts(
passed_filter["auto_followup"]["data"]["payload"],
r["payload"],
ignore_keys=["priority", "start_date", "end_date"],
ignore_keys=["priority", "start_date", "end_date", "advanced"],
)
is True
]
Expand Down Expand Up @@ -1930,8 +1937,11 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
]["payload"],
"target_groups": [
{
"id": passed_filter["group_id"],
"id": target_group_id,
}
for target_group_id in passed_filter[
"auto_followup"
]["data"]["target_group_ids"]
],
"status": "submitted",
}
Expand Down Expand Up @@ -1984,15 +1994,24 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
# if there is an existing request, but the priority is lower than the one we want to post,
# update the existing request with the new priority
request_to_update = existing_requests_filtered[0][1]
# if the status is completed or deleted, do not update
if request_to_update["status"] in ["completed", "deleted"]:
log(
f"Followup request for {alert['objectId']} and allocation_id {passed_filter['auto_followup']['allocation_id']} already exists on SkyPortal, but is completed or deleted, no need for update"
)
# if the status is submitted, and the new priority is higher, update
if (
passed_filter["auto_followup"]["data"]["payload"]["priority"]
request_to_update["status"] == "submitted"
and passed_filter["auto_followup"]["data"]["payload"][
"priority"
]
> request_to_update["payload"]["priority"]
):
with timer(
f"Updating priority of auto followup request for {alert['objectId']} to SkyPortal",
self.verbose > 1,
):
# to update, the api needs to get the request id, target group id, and payload
# to update, the api needs to get the request id, target group ids, and payload
# so we'll basically get that from the existing request, and simply update the priority
try:
data = {
Expand Down
10 changes: 2 additions & 8 deletions kowalski/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
"pipeline": list,
"allocation_id": str,
"payload": dict,
"target_group_ids": list,
}


Expand Down Expand Up @@ -1933,6 +1934,7 @@ async def post(self, request: web.Request) -> web.Response:
}
"auto_followup": {
"allocation_id": 1,
"target_group_ids": [1, 2],
"comment": "SEDM triggered by BTSbot",
"payload": {
"observation_type": "IFU",
Expand Down Expand Up @@ -2432,14 +2434,6 @@ async def patch(self, request: web.Request) -> web.Response:
pass
elif isinstance(value, dict) and "pipeline" not in value:
pass
# elif (
# modifiable_field == "auto_followup"
# and isinstance(value, dict)
# and "pipeline" not in value
# ):
# return self.error(
# message=f"Cannot update filter id {filter_id}: {modifiable_field} must contain a pipeline"
# )
else:
pipeline = value.get("pipeline")
if not isinstance(pipeline, str):
Expand Down
149 changes: 140 additions & 9 deletions kowalski/tests/test_alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,11 @@ def test_alert_filter__user_defined_followup_with_broker(self):
ignore_if_saved_group_id = [g for g in groups if g["name"] == "Program B"][0][
"id"
]
target_group_id = [g for g in groups if g["name"] == "Sitewide Group"][0]["id"]

assert saved_group_id is not None
assert ignore_if_saved_group_id is not None
assert target_group_id is not None

response = self.worker.api_skyportal("GET", "/api/allocation", None)
assert response.status_code == 200
Expand Down Expand Up @@ -299,6 +302,7 @@ def test_alert_filter__user_defined_followup_with_broker(self):
"payload": { # example payload for SEDM
"observation_type": "IFU",
"priority": 2,
"advanced": False,
},
}
# make a copy of that filter, but with priority 3
Expand Down Expand Up @@ -389,6 +393,51 @@ def test_alert_filter__user_defined_followup_with_broker(self):
followup_requests_updated[0]["id"] == followup_requests[0]["id"]
) # the id should be the same

# now, we'll test the target_group_ids functionality for deduplication
# that is, if a filter has a target_group_ids, then it should only trigger a follow-up request
# if none of the existing requests' target groups have an overlap with the target_group_ids
# of the filter

filter_multiple_groups = deepcopy(filter)
filter_multiple_groups["auto_followup"]["target_group_ids"] = [saved_group_id]
filter_multiple_groups["group_id"] = target_group_id

passed_filters = self.worker.alert_filter__user_defined(
[filter_multiple_groups], self.alert
)

assert passed_filters is not None
assert len(passed_filters) == 1
assert "auto_followup" in passed_filters[0]
assert (
passed_filters[0]["auto_followup"]["data"]["payload"]["observation_type"]
== "IFU"
)
assert passed_filters[0]["auto_followup"]["data"]["payload"]["priority"] == 2
assert set(
passed_filters[0]["auto_followup"]["data"]["target_group_ids"]
).issubset(set([target_group_id] + [saved_group_id]))

alert, prv_candidates = self.worker.alert_mongify(self.alert)
self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters)

# now fetch the follow-up request from SP
# we should still have just one follow-up request, the exact same as before
response = self.worker.api_skyportal(
"GET", f"/api/followup_request?sourceID={alert['objectId']}", None
)
assert response.status_code == 200
followup_requests_updated = response.json()["data"].get("followup_requests", [])
followup_requests_updated = [
f
for f in followup_requests_updated
if (f["allocation_id"] == allocation_id and f["status"] == "submitted")
]
assert len(followup_requests_updated) == 1
assert followup_requests_updated[0]["payload"]["observation_type"] == "IFU"
assert followup_requests_updated[0]["payload"]["priority"] == 4
assert followup_requests_updated[0]["id"] == followup_requests[0]["id"]

# delete the follow-up request
response = self.worker.api_skyportal(
"DELETE", f"/api/followup_request/{followup_requests[0]['id']}", None
Expand Down Expand Up @@ -427,9 +476,10 @@ def test_alert_filter__user_defined_followup_with_broker(self):
assert response.status_code == 200
source = response.json()["data"]
assert source["id"] == "ZTF20aajcbhr"
assert len(source["groups"]) == 1
# should only be saved to the group of the first filter
assert source["groups"][0]["id"] == saved_group_id
# should be saved to Program A and Sitewide Group, but not Program B
assert any([g["id"] == saved_group_id for g in source["groups"]])
assert any([g["id"] == target_group_id for g in source["groups"]])
assert not any([g["id"] == ignore_if_saved_group_id for g in source["groups"]])

# verify that there isn't a follow-up request
response = self.worker.api_skyportal(
Expand All @@ -444,8 +494,8 @@ def test_alert_filter__user_defined_followup_with_broker(self):
]
assert len(followup_requests) == 0

# rerun the first filter, but with the ignore_if_saved_group_id
# this time we are testing that it does not trigger a follow-up request
# rerun the first filter, but not with the ignore_if_saved_group_id
# this time, we are testing that it does not trigger a follow-up request
# if the source is already classified

# first post a classification
Expand Down Expand Up @@ -487,11 +537,12 @@ def test_alert_filter__user_defined_followup_with_broker(self):
assert response.status_code == 200
source = response.json()["data"]
assert source["id"] == "ZTF20aajcbhr"
assert len(source["groups"]) == 1
# should only be saved to the group of the first filter
assert source["groups"][0]["id"] == saved_group_id

# verify that there is a follow-up request
# should only be saved to Program A and Sitewide Group
assert any([g["id"] == saved_group_id for g in source["groups"]])
assert any([g["id"] == target_group_id for g in source["groups"]])

# verify that there is no follow-up request
response = self.worker.api_skyportal(
"GET", f"/api/followup_request?sourceID={alert['objectId']}", None
)
Expand All @@ -509,6 +560,86 @@ def test_alert_filter__user_defined_followup_with_broker(self):
"DELETE", f"/api/classification/{classification_id}", None
)

# unsave the source from the group
response = self.worker.api_skyportal(
"POST",
"/api/source_groups",
{
"objId": alert["objectId"],
"unsaveGroupIds": [saved_group_id, target_group_id],
},
)
assert response.status_code == 200

# last but not least, we verify that we don't trigger the same request again, if a request has been submitted but then deleted, or is completed already
filter = filter_template(self.worker.collection_alerts)
filter["group_id"] = saved_group_id
filter["autosave"] = {
"active": True,
"comment": "Saved to BTS by BTSbot.",
}
filter["auto_followup"] = {
"active": True,
"pipeline": [
{
"$match": {
"candidate.drb": {"$gt": 0.5},
}
}
],
"allocation_id": allocation_id,
"payload": { # example payload for SEDM
"observation_type": "IFU",
"priority": 2,
"advanced": False,
},
"comment": "SEDM triggered by BTSbot",
}

passed_filters = self.worker.alert_filter__user_defined([filter], self.alert)
assert passed_filters is not None
assert len(passed_filters) == 1
assert "auto_followup" in passed_filters[0]
assert (
passed_filters[0]["auto_followup"]["data"]["payload"]["observation_type"]
== "IFU"
)

alert, prv_candidates = self.worker.alert_mongify(self.alert)
self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters)

# now fetch the follow-up request from SP
# we should not have any submitted follow-up requests
response = self.worker.api_skyportal(
"GET", f"/api/followup_request?sourceID={alert['objectId']}", None
)
assert response.status_code == 200
followup_requests = response.json()["data"].get("followup_requests", [])
submitted = [
f
for f in followup_requests
if (f["allocation_id"] == allocation_id and f["status"] == "submitted")
]
deleted = [
f
for f in followup_requests
if (f["allocation_id"] == allocation_id and f["status"] == "deleted")
]
assert len(submitted) == 0
assert len(deleted) > 0

# verify that it was save still
response = self.worker.api_skyportal(
"GET", f"/api/sources/{alert['objectId']}", None
)
assert response.status_code == 200
source = response.json()["data"]
assert source["id"] == "ZTF20aajcbhr"
# should be saved to Program A and Sitewide Group, but not Program B
assert any([g["id"] == saved_group_id for g in source["groups"]])
assert not any([g["id"] == target_group_id for g in source["groups"]])
assert not any([g["id"] == ignore_if_saved_group_id for g in source["groups"]])

# unsave the source from the group
response = self.worker.api_skyportal(
"POST",
Expand Down
Loading