From 7de299b79bb6fe953b1ad900ca073a71430dfb1f Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 19 Jul 2024 17:26:26 -0700 Subject: [PATCH] Add progress log to alert packaging loop. The log ensures that DiaPipelineTask does not appear to "lock up" while processing alerts. This is motivated in part by Prompt Processing, which assumes that any active pod emits at least one log every 30 seconds. The logging overhead will naturally decrease as the task speeds up; run times shorter than 10 seconds will log only the basic milestones. --- python/lsst/ap/association/packageAlerts.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/python/lsst/ap/association/packageAlerts.py b/python/lsst/ap/association/packageAlerts.py index b282c8f1..7109e75a 100644 --- a/python/lsst/ap/association/packageAlerts.py +++ b/python/lsst/ap/association/packageAlerts.py @@ -22,6 +22,7 @@ __all__ = ("PackageAlertsConfig", "PackageAlertsTask") import io +import logging import os import sys import time @@ -48,6 +49,7 @@ import lsst.pex.config as pexConfig from lsst.pex.exceptions import InvalidParameterError import lsst.pipe.base as pipeBase +import lsst.utils.logging from lsst.utils.timer import timeMethod @@ -247,7 +249,14 @@ def run(self, diffImPhotoCalib = diffIm.getPhotoCalib() calexpPhotoCalib = calexp.getPhotoCalib() templatePhotoCalib = template.getPhotoCalib() + + n_sources = len(diaSourceCat) + self.log.info("Packaging alerts for %d DiaSources.", n_sources) + # Log every 10 seconds as proof of liveness. + loop_logger = lsst.utils.logging.PeriodicLogger(self.log, interval=10.0, level=logging.DEBUG) + for srcIndex, diaSource in diaSourceCat.iterrows(): + loop_logger.log("%s/%s sources have been packaged.", len(alerts), n_sources) # Get all diaSources for the associated diaObject. # TODO: DM-31992 skip DiaSources associated with Solar System # Objects for now. @@ -305,10 +314,13 @@ def run(self, templateCutout)) if self.config.doProduceAlerts: + self.log.info("Producing alerts to %s.", self.kafkaTopic) self.produceAlerts(alerts, visit, detector) if self.config.doWriteAlerts: - with open(os.path.join(self.config.alertWriteLocation, f"{visit}_{detector}.avro"), "wb") as f: + avro_path = os.path.join(self.config.alertWriteLocation, f"{visit}_{detector}.avro") + self.log.info("Writing alerts to %s.", avro_path) + with open(avro_path, "wb") as f: self.alertSchema.store_alerts(f, alerts) def _patchDiaSources(self, diaSources):