Skip to content

Commit

Permalink
Merge pull request #206 from lsst/tickets/DM-43501
Browse files Browse the repository at this point in the history
DM-43501: Add delivery timeout config
  • Loading branch information
bsmartradio committed Apr 9, 2024
2 parents c972d45 + f651493 commit 0796242
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
9 changes: 8 additions & 1 deletion python/lsst/ap/association/packageAlerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ class PackageAlertsConfig(pexConfig.Config):
default=15.0,
)

deliveryTimeout = pexConfig.Field(
dtype=float,
doc="Sets the time to wait for the producer to wait to deliver an "
"alert in milliseconds.",
default=1200.0,
)


class PackageAlertsTask(pipeBase.Task):
"""Tasks for packaging Dia and Pipelines data into Avro alert packages.
Expand Down Expand Up @@ -145,6 +152,7 @@ def __init__(self, **kwargs):
# We set the batch size to 2 Mb.
"batch.size": 2097152,
"linger.ms": 5,
"delivery.timeout.ms": self.config.deliveryTimeout,
}
self.kafkaAdminConfig = {
# This is the URL to use to connect to the Kafka cluster.
Expand Down Expand Up @@ -325,7 +333,6 @@ def produceAlerts(self, alerts, ccdVisitId):
ccdVisitId of the alerts sent to the alert stream. Used to write
out alerts which fail to be sent to the alert stream.
"""
self._server_check()
for alert in alerts:
alertBytes = self._serializeAlert(alert, schema=self.alertSchema.definition, schema_id=1)
try:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_packageAlerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ def test_produceAlerts_success(self, mock_server_check, mock_producer):
producer_instance.flush = Mock()
packageAlerts.produceAlerts(alerts, ccdVisitId)

self.assertEqual(mock_server_check.call_count, 2)
self.assertEqual(mock_server_check.call_count, 1)
self.assertEqual(producer_instance.produce.call_count, len(alerts))
self.assertEqual(producer_instance.flush.call_count, len(alerts)+1)

Expand Down Expand Up @@ -447,7 +447,7 @@ def mock_produce(*args, **kwargs):

packageAlerts.produceAlerts(alerts, ccdVisitId)

self.assertEqual(mock_server_check.call_count, 2)
self.assertEqual(mock_server_check.call_count, 1)
self.assertEqual(producer_instance.produce.call_count, len(alerts))
self.assertEqual(patch_open.call_count, 1)
self.assertIn("123_2.avro", patch_open.call_args.args[0])
Expand Down

0 comments on commit 0796242

Please sign in to comment.