From f651493083d597a12e2d4678d00bbaba7becb296 Mon Sep 17 00:00:00 2001 From: Brianna Smart Date: Tue, 9 Apr 2024 15:18:56 -0700 Subject: [PATCH] Add delivery timeout config --- python/lsst/ap/association/packageAlerts.py | 9 ++++++++- tests/test_packageAlerts.py | 4 ++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/python/lsst/ap/association/packageAlerts.py b/python/lsst/ap/association/packageAlerts.py index 40dc98f6..be72f105 100644 --- a/python/lsst/ap/association/packageAlerts.py +++ b/python/lsst/ap/association/packageAlerts.py @@ -96,6 +96,13 @@ class PackageAlertsConfig(pexConfig.Config): default=15.0, ) + deliveryTimeout = pexConfig.Field( + dtype=float, + doc="Sets the time to wait for the producer to wait to deliver an " + "alert in milliseconds.", + default=1200.0, + ) + class PackageAlertsTask(pipeBase.Task): """Tasks for packaging Dia and Pipelines data into Avro alert packages. @@ -145,6 +152,7 @@ def __init__(self, **kwargs): # We set the batch size to 2 Mb. "batch.size": 2097152, "linger.ms": 5, + "delivery.timeout.ms": self.config.deliveryTimeout, } self.kafkaAdminConfig = { # This is the URL to use to connect to the Kafka cluster. @@ -325,7 +333,6 @@ def produceAlerts(self, alerts, ccdVisitId): ccdVisitId of the alerts sent to the alert stream. Used to write out alerts which fail to be sent to the alert stream. """ - self._server_check() for alert in alerts: alertBytes = self._serializeAlert(alert, schema=self.alertSchema.definition, schema_id=1) try: diff --git a/tests/test_packageAlerts.py b/tests/test_packageAlerts.py index 8f35d70f..e9a362b0 100644 --- a/tests/test_packageAlerts.py +++ b/tests/test_packageAlerts.py @@ -412,7 +412,7 @@ def test_produceAlerts_success(self, mock_server_check, mock_producer): producer_instance.flush = Mock() packageAlerts.produceAlerts(alerts, ccdVisitId) - self.assertEqual(mock_server_check.call_count, 2) + self.assertEqual(mock_server_check.call_count, 1) self.assertEqual(producer_instance.produce.call_count, len(alerts)) self.assertEqual(producer_instance.flush.call_count, len(alerts)+1) @@ -447,7 +447,7 @@ def mock_produce(*args, **kwargs): packageAlerts.produceAlerts(alerts, ccdVisitId) - self.assertEqual(mock_server_check.call_count, 2) + self.assertEqual(mock_server_check.call_count, 1) self.assertEqual(producer_instance.produce.call_count, len(alerts)) self.assertEqual(patch_open.call_count, 1) self.assertIn("123_2.avro", patch_open.call_args.args[0])