Skip to content

Commit

Permalink
Add progress log to alert packaging loop.
Browse files Browse the repository at this point in the history
The log ensures that DiaPipelineTask does not appear to "lock up" while
processing alerts. This is motivated in part by Prompt Processing,
which assumes that any active pod emits at least one log every
30 seconds.

The logging overhead will naturally decrease as the task speeds up; run
times shorter than 10 seconds will log only the basic milestones.
  • Loading branch information
kfindeisen committed Jul 20, 2024
1 parent 884708c commit 7de299b
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion python/lsst/ap/association/packageAlerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
__all__ = ("PackageAlertsConfig", "PackageAlertsTask")

import io
import logging
import os
import sys
import time
Expand All @@ -48,6 +49,7 @@
import lsst.pex.config as pexConfig
from lsst.pex.exceptions import InvalidParameterError
import lsst.pipe.base as pipeBase
import lsst.utils.logging
from lsst.utils.timer import timeMethod


Expand Down Expand Up @@ -247,7 +249,14 @@ def run(self,
diffImPhotoCalib = diffIm.getPhotoCalib()
calexpPhotoCalib = calexp.getPhotoCalib()
templatePhotoCalib = template.getPhotoCalib()

n_sources = len(diaSourceCat)
self.log.info("Packaging alerts for %d DiaSources.", n_sources)
# Log every 10 seconds as proof of liveness.
loop_logger = lsst.utils.logging.PeriodicLogger(self.log, interval=10.0, level=logging.DEBUG)

for srcIndex, diaSource in diaSourceCat.iterrows():
loop_logger.log("%s/%s sources have been packaged.", len(alerts), n_sources)
# Get all diaSources for the associated diaObject.
# TODO: DM-31992 skip DiaSources associated with Solar System
# Objects for now.
Expand Down Expand Up @@ -305,10 +314,13 @@ def run(self,
templateCutout))

if self.config.doProduceAlerts:
self.log.info("Producing alerts to %s.", self.kafkaTopic)
self.produceAlerts(alerts, visit, detector)

if self.config.doWriteAlerts:
with open(os.path.join(self.config.alertWriteLocation, f"{visit}_{detector}.avro"), "wb") as f:
avro_path = os.path.join(self.config.alertWriteLocation, f"{visit}_{detector}.avro")
self.log.info("Writing alerts to %s.", avro_path)
with open(avro_path, "wb") as f:
self.alertSchema.store_alerts(f, alerts)

def _patchDiaSources(self, diaSources):
Expand Down

0 comments on commit 7de299b

Please sign in to comment.