DM-40414: Write PipelineTask to transmit alerts to Kafka #191

Merged (3 commits, Mar 1, 2024)
171 changes: 162 additions & 9 deletions python/lsst/ap/association/packageAlerts.py
@@ -23,11 +23,21 @@

import io
import os
import sys

from astropy import wcs
import astropy.units as u
from astropy.nddata import CCDData, VarianceUncertainty
import pandas as pd
import struct
import fastavro
# confluent_kafka is not in the standard Rubin environment as it is a
# third-party package and is only needed when producing alerts.
try:
import confluent_kafka
from confluent_kafka import KafkaException
except ImportError:
confluent_kafka = None

import lsst.alert.packet as alertPack
import lsst.afw.geom as afwGeom
Expand All @@ -38,10 +48,6 @@
from lsst.utils.timer import timeMethod


"""Methods for packaging Apdb and Pipelines data into Avro alerts.
"""


class PackageAlertsConfig(pexConfig.Config):
"""Config class for AssociationTask.
"""
@@ -63,6 +69,18 @@ class PackageAlertsConfig(pexConfig.Config):
default=os.path.join(os.getcwd(), "alerts"),
)

Review discussion (resolved):

Contributor: Oh dear, I just noticed this: we definitely don't want it to default to writing to the current working directory, especially since we're using it as the dumping ground for alerts that fail to be sent via Kafka! I think this path should either have no default, or be a tempdir.

Contributor: I view this behavior as only meant for debugging, for which the cwd is a reasonable default. There's no sensible choice in production, as the PP pod will get blown away. (And for that reason we should figure out a more useful scheme on a future ticket.)
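For reference, a minimal sketch of the tempdir alternative raised in the discussion above; the ``tempfile`` usage and field wording are illustrative assumptions, not part of this PR:

```python
import tempfile

import lsst.pex.config as pexConfig

# Hypothetical alternative default: a per-process temporary directory
# rather than the current working directory. Note mkdtemp() runs (and
# creates the directory) at config-definition time; a setDefaults()
# hook may be preferable in practice.
alertWriteLocation = pexConfig.Field(
    dtype=str,
    doc="Location on disk to write alerts.",
    default=tempfile.mkdtemp(prefix="failed-alerts-"),
)
```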

doProduceAlerts = pexConfig.Field(
dtype=bool,
doc="Turn on alert production to kafka if true and if confluent_kafka is in the environment.",
default=False,
)

doWriteAlerts = pexConfig.Field(
dtype=bool,
doc="Write alerts to disk if true.",
default=False,
)
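The two flags are independent and can be combined. A hypothetical pipeline-config override might look like:

```python
# Hypothetical config override (e.g. in a pipeline configuration file);
# field names match the definitions above.
config.doProduceAlerts = True   # send alerts to the Kafka stream
config.doWriteAlerts = True     # also persist alerts to disk
```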


class PackageAlertsTask(pipeBase.Task):
"""Tasks for packaging Dia and Pipelines data into Avro alert packages.
@@ -77,6 +95,48 @@ def __init__(self, **kwargs):
self.alertSchema = alertPack.Schema.from_uri(self.config.schemaFile)
os.makedirs(self.config.alertWriteLocation, exist_ok=True)

if self.config.doProduceAlerts:
if confluent_kafka is not None:
self.password = os.getenv("AP_KAFKA_PRODUCER_PASSWORD")
if not self.password:
raise ValueError("Kafka password environment variable was not set.")
self.username = os.getenv("AP_KAFKA_PRODUCER_USERNAME")
if not self.username:
raise ValueError("Kafka username environment variable was not set.")
self.server = os.getenv("AP_KAFKA_SERVER")
if not self.server:
raise ValueError("Kafka server environment variable was not set.")
self.kafkaTopic = os.getenv("AP_KAFKA_TOPIC")
if not self.kafkaTopic:
raise ValueError("Kafka topic environment variable was not set.")

# confluent_kafka configures all of its classes with dictionaries. This one
# sets up the bare minimum that is needed.
self.kafkaConfig = {
# This is the URL to use to connect to the Kafka cluster.
"bootstrap.servers": self.server,
# These next two properties tell the Kafka client about the specific
# authentication and authorization protocols that should be used when
# connecting.
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanisms": "SCRAM-SHA-512",
# The sasl.username and sasl.password are passed through over
# SCRAM-SHA-512 auth to connect to the cluster. The username is not
# sensitive, but the password is (of course) a secret value which
# should never be committed to source code.
"sasl.username": self.username,
"sasl.password": self.password,
# Batch size limits the largest size of a Kafka alert that can be sent.
# We set the batch size to 2 MB.
"batch.size": 2097152,
"linger.ms": 5,
}
self.producer = confluent_kafka.Producer(**self.kafkaConfig)

else:
raise RuntimeError("Produce alerts is set but confluent_kafka is not present in "
"the environment. Alerts will not be sent to the alert stream.")

@timeMethod
def run(self,
diaSourceCat,
@@ -88,6 +148,10 @@
):
"""Package DiaSources/Object and exposure data into Avro alerts.

Alerts can be sent to the alert stream if ``doProduceAlerts`` is set
and written to disk if ``doWriteAlerts`` is set. Both can be set at the
same time, and are independent of one another.

Writes Avro alerts to a location determined by the
``alertWriteLocation`` configurable.

@@ -157,10 +221,13 @@
objDiaForcedSources,
diffImCutout,
templateCutout))
with open(os.path.join(self.config.alertWriteLocation,
f"{ccdVisitId}.avro"),
"wb") as f:
self.alertSchema.store_alerts(f, alerts)

if self.config.doProduceAlerts:
self.produceAlerts(alerts, ccdVisitId)

if self.config.doWriteAlerts:
with open(os.path.join(self.config.alertWriteLocation, f"{ccdVisitId}.avro"), "wb") as f:
self.alertSchema.store_alerts(f, alerts)

def _patchDiaSources(self, diaSources):
"""Add the ``programId`` column to the data.
@@ -173,7 +240,7 @@ def _patchDiaSources(self, diaSources):
diaSources["programId"] = 0

def createDiaSourceExtent(self, bboxSize):
"""Create a extent for a box for the cutouts given the size of the
"""Create an extent for a box for the cutouts given the size of the
square BBox that covers the source footprint.

Parameters
@@ -193,6 +260,33 @@ def createDiaSourceExtent(self, bboxSize):
extent = geom.Extent2I(bboxSize, bboxSize)
return extent

def produceAlerts(self, alerts, ccdVisitId):
"""Serialize alerts and send them to the alert stream using
confluent_kafka's producer.

Parameters
----------
alerts : `list` of `dict`
List of alerts to be sent to the alert stream.
ccdVisitId : `int`
ccdVisitId of the alerts sent to the alert stream. Used to write
out alerts which fail to be sent to the alert stream.
"""
for alert in alerts:
alertBytes = self._serializeAlert(alert, schema=self.alertSchema.definition, schema_id=1)
try:
self.producer.produce(self.kafkaTopic, alertBytes, callback=self._delivery_callback)
self.producer.flush()

except KafkaException as e:
self.log.warning('Kafka error: %s, message was %s bytes', e, sys.getsizeof(alertBytes))

with open(os.path.join(self.config.alertWriteLocation,
f"{ccdVisitId}_{alert['alertId']}.avro"), "wb") as f:
f.write(alertBytes)

self.producer.flush()

def createCcdDataCutout(self, image, skyCenter, extent, photoCalib, srcId):
"""Grab an image as a cutout and return a calibrated CCDData image.

@@ -372,3 +466,62 @@ def streamCcdDataToBytes(self, cutout):
cutout.write(streamer, format="fits")
cutoutBytes = streamer.getvalue()
return cutoutBytes

def _serializeAlert(self, alert, schema=None, schema_id=0):
"""Serialize an alert to a byte sequence for sending to Kafka.

Parameters
----------
alert : `dict`
An alert payload to be serialized.
schema : `dict`, optional
An Avro schema definition describing how to encode ``alert``. By
default None, which falls back to the schema configured for this
task (``self.alertSchema.definition``).
schema_id : `int`, optional
The Confluent Schema Registry ID of the schema. By default, 0 (an
invalid ID) is used, indicating that the schema is not registered.

Returns
-------
serialized : `bytes`
The byte sequence describing the alert, including the Confluent Wire
Format prefix.
"""
if schema is None:
schema = self.alertSchema.definition

buf = io.BytesIO()
# TODO: Use a proper schema versioning system (DM-42606)
buf.write(self._serializeConfluentWireHeader(schema_id))
fastavro.schemaless_writer(buf, schema, alert)
return buf.getvalue()

@staticmethod
def _serializeConfluentWireHeader(schema_version):
"""Returns the byte prefix for Confluent Wire Format-style Kafka messages.

Parameters
----------
schema_version : `int`
A version number which indicates the Confluent Schema Registry ID
number of the Avro schema used to encode the message that follows this
header.

Returns
-------
header : `bytes`
The 5-byte encoded message prefix.

Notes
-----
The Confluent Wire Format is described more fully here:
https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format
"""
ConfluentWireFormatHeader = struct.Struct(">bi")
return ConfluentWireFormatHeader.pack(0, schema_version)
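A worked example of the header, plus a matching read path; ``deserialize_alert`` is a hypothetical helper, not part of this PR:

```python
import io
import struct

import fastavro

# Magic byte 0 followed by the big-endian 4-byte schema ID:
assert struct.Struct(">bi").pack(0, 1) == b"\x00\x00\x00\x00\x01"

def deserialize_alert(payload, schema):
    """Invert _serializeAlert: strip the 5-byte header, decode the rest."""
    magic, schema_id = struct.Struct(">bi").unpack(payload[:5])
    return fastavro.schemaless_reader(io.BytesIO(payload[5:]), schema)
```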

def _delivery_callback(self, err, msg):
if err:
self.log.warning('Message failed delivery: %s', err)
else:
self.log.debug('Message delivered to %s [%d] @ %d', msg.topic(), msg.partition(), msg.offset())
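For context, a standalone sketch of how confluent_kafka invokes delivery callbacks; the broker address and topic are placeholders:

```python
from confluent_kafka import Producer

def on_delivery(err, msg):
    # Called from flush()/poll() once the broker acks (or rejects) the message.
    if err:
        print(f"delivery failed: {err}")
    else:
        print(f"delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

p = Producer({"bootstrap.servers": "localhost:9092"})
p.produce("test-alerts", b"payload", callback=on_delivery)
p.flush()  # blocks until outstanding messages are delivered; fires callbacks
```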