Merge with main #253

Merged: 5 commits, Oct 5, 2023
16 changes: 8 additions & 8 deletions config.defaults.yaml
@@ -1118,7 +1118,7 @@ kowalski:
ml:
ZTF:
# instruments need to have a list of allowed features (as tuple), and a list of models
allowed_features: ('drb', 'diffmaglim', 'ra', 'dec', 'magpsf', 'sigmapsf', 'chipsf', 'fwhm', 'sky', 'chinr', 'sharpnr', 'sgscore1', 'distpsnr1', 'sgscore2', 'distpsnr2', 'sgscore3', 'distpsnr3', 'ndethist', 'ncovhist', 'scorr', 'nmtchps', 'clrcoeff', 'clrcounc', 'neargaia', 'neargaiabright', 'classtar', 'peakmag', 'age')
allowed_features: ('drb', 'diffmaglim', 'ra', 'dec', 'magpsf', 'sigmapsf', 'chipsf', 'fwhm', 'sky', 'chinr', 'sharpnr', 'sgscore1', 'distpsnr1', 'sgscore2', 'distpsnr2', 'sgscore3', 'distpsnr3', 'ndethist', 'ncovhist', 'scorr', 'nmtchps', 'clrcoeff', 'clrcounc', 'neargaia', 'neargaiabright', 'classtar', 'peakmag_so_far', 'maxmag_so_far', 'days_since_peak', 'days_to_peak', 'nnondet', 'age')
models:
# models need: a version (string, e.g. "v1"), a triplet (bool), and feature_names (bool, or list of feature names as tuple)
# if feature_names is True, all features from allowed_features are used
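The two comments above fully specify the contract between `allowed_features` and each model entry. A minimal sketch of a validator enforcing that contract, assuming the tuples have already been parsed from their YAML string form (the helper name is hypothetical, not Kowalski's actual loader):

```python
def resolve_feature_names(instrument_cfg: dict, model_cfg: dict) -> tuple:
    """Resolve a model's feature_names against the instrument's allowed_features."""
    allowed = tuple(instrument_cfg["allowed_features"])
    feature_names = model_cfg.get("feature_names", True)
    # feature_names: True means "use every allowed feature"
    if feature_names is True:
        return allowed
    unknown = set(feature_names) - set(allowed)
    if unknown:
        raise ValueError(f"model uses features outside allowed_features: {unknown}")
    return tuple(feature_names)
```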
@@ -1152,13 +1152,13 @@ kowalski:
feature_names: ('drb', 'diffmaglim', 'ra', 'dec', 'magpsf', 'sigmapsf', 'chipsf', 'fwhm', 'sky', 'chinr', 'sharpnr', 'sgscore1', 'distpsnr1', 'sgscore2', 'distpsnr2', 'sgscore3', 'distpsnr3', 'ndethist', 'ncovhist', 'scorr', 'nmtchps', 'clrcoeff', 'clrcounc', 'neargaia', 'neargaiabright')
version: "d1_dnn_20201130"
url: "https://github.com/dmitryduev/acai/raw/master/models/acai_b.d1_dnn_20201130.h5"
# bts:
# triplet: True
# feature_names: ('sgscore1','distpsnr1','sgscore2','distpsnr2','fwhm','magpsf','sigmapsf','ra','dec','diffmaglim','ndethist','nmtchps','age','peakmag')
# version: "v03"
# format: "pb"
# order: ["triplet", "features"]
# url: "https://raw.githubusercontent.com/nabeelre/BNB-models/main/v03.tar.gz"
bts:
triplet: True
feature_names: ('sgscore1', 'distpsnr1', 'sgscore2', 'distpsnr2', 'fwhm', 'magpsf', 'sigmapsf', 'chipsf', 'ra', 'dec', 'diffmaglim', 'ndethist', 'nmtchps', 'age', 'days_since_peak', 'days_to_peak', 'peakmag_so_far', 'drb', 'ncovhist', 'nnondet', 'chinr', 'sharpnr', 'scorr', 'sky', 'maxmag_so_far')
version: "v1"
format: "pb"
order: ["triplet", "features"]
url: "https://raw.githubusercontent.com/nabeelre/BTSbot/main/production_models/v1.tar.gz"


skyportal:
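The new entries in `allowed_features` (`peakmag_so_far`, `maxmag_so_far`, `days_since_peak`, `days_to_peak`, `nnondet`) are light-curve summary statistics rather than raw alert-packet fields. A hedged sketch of how such features can be derived from an alert plus its history; the exact definitions used by the BTS model may differ, and the helper below is illustrative only:

```python
import numpy as np

def lightcurve_features(candidate: dict, prv_candidates: list) -> dict:
    """Peak/age summary features from the current candidate plus its history."""
    detections = [p for p in prv_candidates if p.get("magpsf") is not None]
    detections.append(candidate)
    jds = np.array([d["jd"] for d in detections])
    mags = np.array([d["magpsf"] for d in detections])
    i_peak = int(np.argmin(mags))  # brightest detection = smallest magnitude
    return {
        "peakmag_so_far": float(mags.min()),
        "maxmag_so_far": float(mags.max()),  # faintest detection so far
        "days_since_peak": float(candidate["jd"] - jds[i_peak]),
        "days_to_peak": float(jds[i_peak] - jds.min()),  # rise time from first detection
        "age": float(candidate["jd"] - jds.min()),
        # upper limits carry a diffmaglim but no magpsf
        "nnondet": sum(1 for p in prv_candidates if p.get("magpsf") is None),
    }
```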
180 changes: 162 additions & 18 deletions kowalski/alert_brokers/alert_broker.py
@@ -44,7 +44,11 @@
retry,
time_stamp,
timer,
compare_dicts,
)
from warnings import simplefilter

simplefilter(action="ignore", category=pd.errors.PerformanceWarning)

# TensorFlow is currently problematic on Macs, so we add an option to disable it
USE_TENSORFLOW = os.environ.get("USE_TENSORFLOW", True) in [
@@ -1336,7 +1340,10 @@ def alert_filter__user_defined(
).strftime("%Y-%m-%dT%H:%M:%S.%f"),
# one week validity window
},
# constraints
"source_group_ids": [_filter["group_id"]],
"not_if_classified": True,
"not_if_spectra_exist": True,
},
}
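This hunk adds constraints to the auto-followup payload: the request is tied to the filter's group and is skipped server-side if the source is already classified or already has spectra. A sketch of the overall request shape implied by the fragments above, with the one-week validity window computed explicitly (`_filter` is the in-scope filter document from the surrounding code; keys outside this hunk are assumptions):

```python
from datetime import datetime, timedelta

now = datetime.utcnow()
followup_request = {
    "payload": {
        "priority": _filter["auto_followup"]["data"]["payload"]["priority"],
        "start_date": now.strftime("%Y-%m-%dT%H:%M:%S.%f"),
        # one week validity window
        "end_date": (now + timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%S.%f"),
    },
    # constraints
    "source_group_ids": [_filter["group_id"]],
    "not_if_classified": True,  # skip if a classification already exists
    "not_if_spectra_exist": True,  # skip if spectra were already taken
}
```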

@@ -1677,6 +1684,37 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
log(e)
alert["prv_candidates"] = prv_candidates

# also get all the alerts for this object, to make sure we have all the detections
try:
all_alerts = list(
retry(self.mongo.db[self.collection_alerts].find)(
{
"objectId": alert["objectId"],
"candid": {"$ne": alert["candid"]},
},
{
"candidate": 1,
},
)
)
all_alerts = [
{**a["candidate"]} for a in all_alerts if "candidate" in a
]
# add to prv_candidates the detections that are not already in there
# use the jd and the fid to match
for a in all_alerts:
if not any(
[
(a["jd"] == p["jd"]) and (a["fid"] == p["fid"])
for p in alert["prv_candidates"]
]
):
alert["prv_candidates"].append(a)
del all_alerts
except Exception as e:
# this should never happen, but just in case
log(f"Failed to get all alerts for {alert['objectId']}: {e}")

self.alert_put_photometry(alert)

# post thumbnails
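The merge loop in the hunk above matches on (jd, fid) with a linear scan of prv_candidates for every archival alert, which is quadratic in the history length. A set-based variant with the same semantics, offered as a possible refinement rather than the PR's code:

```python
seen = {(p["jd"], p["fid"]) for p in alert["prv_candidates"]}
for a in all_alerts:
    key = (a["jd"], a["fid"])
    if key not in seen:
        alert["prv_candidates"].append(a)
        seen.add(key)
```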
@@ -1806,7 +1844,14 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
]

if len(passed_filters_followup) > 0:
# first fetch the followup requests on SkyPortal for this alert
# first sort all the filters by priority (highest first)
passed_filters_followup = sorted(
passed_filters_followup,
key=lambda f: f["auto_followup"]["data"]["payload"]["priority"],
reverse=True,
)

# then, fetch the existing followup requests on SkyPortal for this alert
with timer(
f"Getting followup requests for {alert['objectId']} from SkyPortal",
self.verbose > 1,
@@ -1823,23 +1868,35 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
for r in existing_requests
if r["status"] in ["completed", "submitted"]
]
# sort by priority (highest first)
existing_requests = sorted(
existing_requests,
key=lambda r: r["payload"]["priority"],
reverse=True,
)
else:
log(f"Failed to get followup requests for {alert['objectId']}")
existing_requests = []

for passed_filter in passed_filters_followup:
# post a followup request with the payload and allocation_id
# if there isn't already a pending request for this alert and this allocation_id
if (
len(
[
r
for r in existing_requests
if r["allocation_id"]
== passed_filter["auto_followup"]["allocation_id"]
]
# look for existing requests with the same allocation, group, and payload
existing_requests_filtered = [
(i, r)
for (i, r) in enumerate(existing_requests)
if r["allocation_id"]
== passed_filter["auto_followup"]["allocation_id"]
and set([passed_filter["group_id"]]).issubset(
[g["id"] for g in r["target_groups"]]
)
== 0
):
and compare_dicts(
passed_filter["auto_followup"]["data"]["payload"],
r["payload"],
ignore_keys=["priority", "start_date", "end_date"],
)
is True
]
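The filter above treats two requests as duplicates when they share an allocation, target group, and payload, ignoring priority and the date window. compare_dicts is imported from Kowalski's utils at the top of this file; a minimal sketch of the behavior these call sites rely on (the real implementation may differ):

```python
def compare_dicts(d1: dict, d2: dict, ignore_keys=None) -> bool:
    """True if d1 and d2 agree on every key not listed in ignore_keys."""
    ignore = set(ignore_keys or [])
    keys = (set(d1) | set(d2)) - ignore
    return all(d1.get(k) == d2.get(k) for k in keys)
```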
if len(existing_requests_filtered) == 0:
# if no existing request, post a new one
with timer(
f"Posting auto followup request for {alert['objectId']} to SkyPortal",
self.verbose > 1,
@@ -1862,6 +1919,24 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
log(
f"Posted followup request for {alert['objectId']} to SkyPortal"
)
# add it to the existing requests
existing_requests.append(
{
"allocation_id": passed_filter["auto_followup"][
"allocation_id"
],
"payload": passed_filter["auto_followup"][
"data"
]["payload"],
"target_groups": [
{
"id": passed_filter["group_id"],
}
],
"status": "submitted",
}
)

if (
passed_filter["auto_followup"].get("comment", None)
is not None
@@ -1885,19 +1960,88 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
)
if response.json()["status"] != "success":
raise ValueError(
response.json()["message"]
response.json().get(
"message",
"unknow error posting comment",
)
)
except Exception as e:
log(
f"Failed to post followup comment {comment['text']} for {alert['objectId']} to SkyPortal: {e}"
)
else:
raise ValueError(response.json()["message"])
raise ValueError(
response.json().get(
"message",
"unknow error posting followup request",
)
)
except Exception as e:
log(
f"Failed to post followup request for {alert['objectId']} to SkyPortal: {e}"
)
else:
log(
f"Pending Followup request for {alert['objectId']} and allocation_id {passed_filter['auto_followup']['allocation_id']} already exists on SkyPortal"
)
# if there is an existing request whose priority is lower than the one we want to post,
# update the existing request with the new priority
request_to_update = existing_requests_filtered[0][1]
if (
passed_filter["auto_followup"]["data"]["payload"]["priority"]
> request_to_update["payload"]["priority"]
):
with timer(
f"Updating priority of auto followup request for {alert['objectId']} to SkyPortal",
self.verbose > 1,
):
# to update, the api needs to get the request id, target group id, and payload
# so we'll basically get that from the existing request, and simply update the priority
try:
data = {
"payload": {
**request_to_update["payload"],
"priority": passed_filter["auto_followup"][
"data"
]["payload"]["priority"],
},
"obj_id": alert["objectId"],
"allocation_id": request_to_update["allocation_id"],
}
response = self.api_skyportal(
"PUT",
f"/api/followup_request/{request_to_update['id']}",
data,
)
if (
response.json()["status"] == "success"
and response.json()
.get("data", {})
.get("ignored", False)
is False
):
log(
f"Updated priority of followup request for {alert['objectId']} to SkyPortal"
)
# update the existing_requests list
existing_requests[existing_requests_filtered[0][0]]["payload"][
"priority"
] = passed_filter["auto_followup"]["data"]["payload"]["priority"]

# TODO: post a comment to the source to mention the update
else:
raise ValueError(
response.json().get(
"message",
"unknow error updating followup request",
)
)
except Exception as e:
log(
f"Failed to update priority of followup request for {alert['objectId']} to SkyPortal: {e}"
)
else:
log(
f"Pending Followup request for {alert['objectId']} and allocation_id {passed_filter['auto_followup']['allocation_id']} already exists on SkyPortal, no need for update"
)
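Taken together, the branch above implements a simple reconciliation policy: post a new request only when no equivalent one exists, and otherwise bump the existing request's priority if the incoming filter asks for more. A condensed sketch of that flow (api stands in for the broker's api_skyportal helper, and the POST path mirrors the PUT path shown above; both are assumptions here):

```python
def sync_followup_request(api, alert, passed_filter, existing_requests):
    wanted = passed_filter["auto_followup"]["data"]["payload"]
    matches = [
        r for r in existing_requests
        if r["allocation_id"] == passed_filter["auto_followup"]["allocation_id"]
        and passed_filter["group_id"] in [g["id"] for g in r["target_groups"]]
        and compare_dicts(wanted, r["payload"],
                          ignore_keys=["priority", "start_date", "end_date"])
    ]
    if not matches:  # no duplicate: submit a fresh request
        api("POST", "/api/followup_request", passed_filter["auto_followup"]["data"])
    elif wanted["priority"] > matches[0]["payload"]["priority"]:
        # duplicate exists but at lower priority: raise it in place
        api(
            "PUT",
            f"/api/followup_request/{matches[0]['id']}",
            {
                "payload": {**matches[0]["payload"], "priority": wanted["priority"]},
                "obj_id": alert["objectId"],
                "allocation_id": matches[0]["allocation_id"],
            },
        )
```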
13 changes: 12 additions & 1 deletion kowalski/alert_brokers/alert_broker_ztf.py
@@ -75,7 +75,18 @@ def process_alert(alert: Mapping, topic: str):
and len(existing_aux.get("prv_candidates", [])) > 0
):
all_prv_candidates += existing_aux["prv_candidates"]
del existing_aux

# get all alerts for this objectId:
existing_alerts = list(
alert_worker.mongo.db[alert_worker.collection_alerts].find(
{"objectId": object_id}, {"candidate": 1}
)
)
if len(existing_alerts) > 0:
all_prv_candidates += [
existing_alert["candidate"] for existing_alert in existing_alerts
]
del existing_aux, existing_alerts

# ML models:
with timer(f"MLing of {object_id} {candid}", alert_worker.verbose > 1):