Skip to content

Commit

Permalink
Merge pull request #191 from lsst/tickets/DM-40414
Browse files Browse the repository at this point in the history
DM-40414: Write PipelineTask to transmit alerts to Kafka
  • Loading branch information
bsmartradio authored Mar 1, 2024
2 parents 2c0a737 + 3861bca commit 61a7c33
Show file tree
Hide file tree
Showing 2 changed files with 392 additions and 23 deletions.
171 changes: 162 additions & 9 deletions python/lsst/ap/association/packageAlerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,21 @@

import io
import os
import sys

from astropy import wcs
import astropy.units as u
from astropy.nddata import CCDData, VarianceUncertainty
import pandas as pd
import struct
import fastavro
# confluent_kafka is not in the standard Rubin environment as it is a third
# party package and is only needed when producing alerts.
try:
import confluent_kafka
from confluent_kafka import KafkaException
except ImportError:
confluent_kafka = None

import lsst.alert.packet as alertPack
import lsst.afw.geom as afwGeom
Expand All @@ -38,10 +48,6 @@
from lsst.utils.timer import timeMethod


"""Methods for packaging Apdb and Pipelines data into Avro alerts.
"""


class PackageAlertsConfig(pexConfig.Config):
"""Config class for AssociationTask.
"""
Expand All @@ -63,6 +69,18 @@ class PackageAlertsConfig(pexConfig.Config):
default=os.path.join(os.getcwd(), "alerts"),
)

# Gates Kafka publishing: ``__init__`` only reads the Kafka environment
# variables and builds the producer when this is set, and ``run`` only calls
# ``produceAlerts`` when this is set.
# NOTE(review): despite the wording of ``doc``, ``__init__`` appears to raise
# RuntimeError (rather than silently skipping production) when this is true
# and confluent_kafka could not be imported -- confirm the intended behavior.
doProduceAlerts = pexConfig.Field(
dtype=bool,
doc="Turn on alert production to kafka if true and if confluent_kafka is in the environment.",
default=False,
)

# Gates the per-ccdVisit Avro file that ``run`` stores under
# ``alertWriteLocation``. It is checked independently of ``doProduceAlerts``,
# so either, both, or neither output may be enabled.
doWriteAlerts = pexConfig.Field(
dtype=bool,
doc="Write alerts to disk if true.",
default=False,
)


class PackageAlertsTask(pipeBase.Task):
"""Tasks for packaging Dia and Pipelines data into Avro alert packages.
Expand All @@ -77,6 +95,48 @@ def __init__(self, **kwargs):
self.alertSchema = alertPack.Schema.from_uri(self.config.schemaFile)
os.makedirs(self.config.alertWriteLocation, exist_ok=True)

if self.config.doProduceAlerts:
if confluent_kafka is not None:
self.password = os.getenv("AP_KAFKA_PRODUCER_PASSWORD")
if not self.password:
raise ValueError("Kafka password environment variable was not set.")
self.username = os.getenv("AP_KAFKA_PRODUCER_USERNAME")
if not self.username:
raise ValueError("Kafka username environment variable was not set.")
self.server = os.getenv("AP_KAFKA_SERVER")
if not self.server:
raise ValueError("Kafka server environment variable was not set.")
self.kafkaTopic = os.getenv("AP_KAFKA_TOPIC")
if not self.kafkaTopic:
raise ValueError("Kafka topic environment variable was not set.")

# confluent_kafka configures all of its classes with dictionaries. This one
# sets up the bare minimum that is needed.
self.kafkaConfig = {
# This is the URL to use to connect to the Kafka cluster.
"bootstrap.servers": self.server,
# These next two properties tell the Kafka client about the specific
# authentication and authorization protocols that should be used when
# connecting.
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanisms": "SCRAM-SHA-512",
# The sasl.username and sasl.password are passed through over
# SCRAM-SHA-512 auth to connect to the cluster. The username is not
# sensitive, but the password is (of course) a secret value which
# should never be committed to source code.
"sasl.username": self.username,
"sasl.password": self.password,
# Batch size limits the largest size of a kafka alert that can be sent.
# We set the batch size to 2 Mb.
"batch.size": 2097152,
"linger.ms": 5,
}
self.producer = confluent_kafka.Producer(**self.kafkaConfig)

else:
raise RuntimeError("Produce alerts is set but confluent_kafka is not present in "
"the environment. Alerts will not be sent to the alert stream.")

@timeMethod
def run(self,
diaSourceCat,
Expand All @@ -88,6 +148,10 @@ def run(self,
):
"""Package DiaSources/Object and exposure data into Avro alerts.
Alerts can be sent to the alert stream if ``doProduceAlerts`` is set
and written to disk if ``doWriteAlerts`` is set. Both can be set at the
same time, and are independent of one another.
Writes Avro alerts to a location determined by the
``alertWriteLocation`` configurable.
Expand Down Expand Up @@ -157,10 +221,13 @@ def run(self,
objDiaForcedSources,
diffImCutout,
templateCutout))
with open(os.path.join(self.config.alertWriteLocation,
f"{ccdVisitId}.avro"),
"wb") as f:
self.alertSchema.store_alerts(f, alerts)

if self.config.doProduceAlerts:
self.produceAlerts(alerts, ccdVisitId)

if self.config.doWriteAlerts:
with open(os.path.join(self.config.alertWriteLocation, f"{ccdVisitId}.avro"), "wb") as f:
self.alertSchema.store_alerts(f, alerts)

def _patchDiaSources(self, diaSources):
"""Add the ``programId`` column to the data.
Expand All @@ -173,7 +240,7 @@ def _patchDiaSources(self, diaSources):
diaSources["programId"] = 0

def createDiaSourceExtent(self, bboxSize):
"""Create a extent for a box for the cutouts given the size of the
"""Create an extent for a box for the cutouts given the size of the
square BBox that covers the source footprint.
Parameters
Expand All @@ -193,6 +260,33 @@ def createDiaSourceExtent(self, bboxSize):
extent = geom.Extent2I(bboxSize, bboxSize)
return extent

def produceAlerts(self, alerts, ccdVisitId):
    """Serialize alerts and send them to the alert stream using
    confluent_kafka's producer.

    Parameters
    ----------
    alerts : iterable of `dict`
        Alert payloads to be sent to the alert stream. Each payload must
        contain an ``alertId`` entry, which is used to name the fallback
        file written when the alert cannot be sent.
    ccdVisitId : `int`
        ccdVisitId of the alerts sent to the alert stream. Used to write
        out alerts which fail to be sent to the alert stream.

    Notes
    -----
    A `KafkaException` raised while producing or flushing an alert is
    logged as a warning and the serialized alert is written to
    ``<alertWriteLocation>/<ccdVisitId>_<alertId>.avro`` instead; the
    remaining alerts are still processed.
    """
    for alert in alerts:
        alertBytes = self._serializeAlert(alert, schema=self.alertSchema.definition, schema_id=1)
        try:
            self.producer.produce(self.kafkaTopic, alertBytes, callback=self._delivery_callback)
            # Flush per alert so that a failure is raised while this alert
            # is still in hand and can be saved to disk below.
            self.producer.flush()
        except KafkaException as e:
            # Report the payload length: sys.getsizeof would also count the
            # interpreter's per-object overhead, which is not what is sent.
            self.log.warning('Kafka error: %s, message was %d bytes', e, len(alertBytes))

            fallbackPath = os.path.join(self.config.alertWriteLocation,
                                        f"{ccdVisitId}_{alert['alertId']}.avro")
            with open(fallbackPath, "wb") as f:
                f.write(alertBytes)

    # Final flush in case anything is still queued (e.g. after a failure).
    self.producer.flush()

def createCcdDataCutout(self, image, skyCenter, extent, photoCalib, srcId):
"""Grab an image as a cutout and return a calibrated CCDData image.
Expand Down Expand Up @@ -372,3 +466,62 @@ def streamCcdDataToBytes(self, cutout):
cutout.write(streamer, format="fits")
cutoutBytes = streamer.getvalue()
return cutoutBytes

def _serializeAlert(self, alert, schema=None, schema_id=0):
    """Encode a single alert as the byte string that is sent to Kafka.

    Parameters
    ----------
    alert : `dict`
        The alert payload to encode.
    schema : `dict`, optional
        Avro schema definition used to encode ``alert``. When `None`
        (the default), ``self.alertSchema.definition`` is used.
    schema_id : `int`, optional
        The Confluent Schema Registry ID written into the message header.
        The default of 0 (an invalid ID) indicates that the schema is not
        registered.

    Returns
    -------
    serialized : `bytes`
        The Confluent Wire Format prefix followed by the schemaless
        Avro encoding of ``alert``.
    """
    avroSchema = self.alertSchema.definition if schema is None else schema

    with io.BytesIO() as stream:
        # TODO: Use a proper schema versioning system (DM-42606)
        header = self._serializeConfluentWireHeader(schema_id)
        stream.write(header)
        fastavro.schemaless_writer(stream, avroSchema, alert)
        # Must be read before the stream is closed on leaving the block.
        return stream.getvalue()

@staticmethod
def _serializeConfluentWireHeader(schema_version):
    """Build the prefix that starts a Confluent Wire Format Kafka message.

    Parameters
    ----------
    schema_version : `int`
        A version number which indicates the Confluent Schema Registry ID
        number of the Avro schema used to encode the message that follows
        this header.

    Returns
    -------
    header : `bytes`
        The 5-byte prefix: a single zero byte followed by
        ``schema_version`` packed as a big-endian 4-byte signed integer.

    Notes
    -----
    The Confluent Wire Format is described more fully here:
    https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format
    """
    # The leading byte of the header is always written as 0.
    leadingByte = 0
    return struct.pack(">bi", leadingByte, schema_version)

def _delivery_callback(self, err, msg):
    """Log the outcome of a Kafka delivery attempt.

    Passed as the ``callback`` argument when an alert is produced.

    Parameters
    ----------
    err : `object`
        Delivery error. Any falsy value is treated as a successful
        delivery; otherwise it is formatted into a warning.
    msg : `object`
        The message that was produced. On success its ``topic()``,
        ``partition()`` and ``offset()`` are logged at debug level.
    """
    if not err:
        self.log.debug('Message delivered to %s [%d] @ %d', msg.topic(), msg.partition(), msg.offset())
        return

    self.log.warning('Message failed delivery: %s\n' % err)
Loading

0 comments on commit 61a7c33

Please sign in to comment.