Add producer to ap_association and update unit tests
Add confluent_kafka to ap_association so that alerts can be sent to the alert stream when doProduceAlerts is set. Additionally, update the alert-writing step so alerts are written to disk only when doWriteAlerts is set. Add unit tests covering the new functionality.
bsmartradio committed Feb 28, 2024
1 parent 2c0a737 commit 2393433
Showing 3 changed files with 446 additions and 23 deletions.
20 changes: 14 additions & 6 deletions python/lsst/ap/association/diaPipe.py
@@ -34,6 +34,7 @@

import numpy as np
import pandas as pd
import logging

from lsst.daf.base import DateTime
import lsst.dax.apdb as daxApdb
@@ -50,6 +51,9 @@
PackageAlertsTask)
from lsst.ap.association.ssoAssociation import SolarSystemAssociationTask

_log = logging.getLogger("lsst." + __name__)
_log.setLevel(logging.DEBUG)


class DiaPipelineConnections(
pipeBase.PipelineTaskConnections,
@@ -541,12 +545,16 @@ def run(self,
["diaObjectId", "diaForcedSourceId"],
drop=False,
inplace=True)
self.alertPackager.run(associatedDiaSources,
diaCalResult.diaObjectCat,
loaderResult.diaSources,
diaForcedSources,
diffIm,
template)
try:
self.alertPackager.run(associatedDiaSources,
diaCalResult.diaObjectCat,
loaderResult.diaSources,
diaForcedSources,
diffIm,
template)
except ValueError as err:
# Continue processing even if alert sending fails
_log.error(err)

return pipeBase.Struct(apdbMarker=self.config.apdb.value,
associatedDiaSources=associatedDiaSources,
209 changes: 204 additions & 5 deletions python/lsst/ap/association/packageAlerts.py
@@ -23,11 +23,14 @@

import io
import os
import sys
import logging

from astropy import wcs
import astropy.units as u
from astropy.nddata import CCDData, VarianceUncertainty
import pandas as pd
import struct

import lsst.alert.packet as alertPack
import lsst.afw.geom as afwGeom
@@ -36,11 +39,14 @@
from lsst.pex.exceptions import InvalidParameterError
import lsst.pipe.base as pipeBase
from lsst.utils.timer import timeMethod

import fastavro

"""Methods for packaging Apdb and Pipelines data into Avro alerts.
"""

_log = logging.getLogger("lsst." + __name__)
_log.setLevel(logging.DEBUG)


class PackageAlertsConfig(pexConfig.Config):
"""Config class for AssociationTask.
@@ -63,6 +69,18 @@ class PackageAlertsConfig(pexConfig.Config):
default=os.path.join(os.getcwd(), "alerts"),
)

doProduceAlerts = pexConfig.Field(
dtype=bool,
doc="Turn on alert production to kafka if true. Set to false by default",
default=False,
)

doWriteAlerts = pexConfig.Field(
dtype=bool,
doc="Write alerts to disk if true. Set to true by default",
default=True,
)
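
# A minimal usage sketch (hypothetical standalone use; PackageAlertsConfig is
# the config class defined above):
#
#     config = PackageAlertsConfig()
#     config.doProduceAlerts = True    # send alerts to the Kafka stream
#     config.doWriteAlerts = False     # skip writing per-visit .avro files
#
# At least one of the two flags must be set, or run() raises an error.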


class PackageAlertsTask(pipeBase.Task):
"""Tasks for packaging Dia and Pipelines data into Avro alert packages.
@@ -77,6 +95,53 @@ def __init__(self, **kwargs):
self.alertSchema = alertPack.Schema.from_uri(self.config.schemaFile)
os.makedirs(self.config.alertWriteLocation, exist_ok=True)

if self.config.doProduceAlerts:

self.password = os.getenv("AP_KAFKA_PRODUCER_PASSWORD")
self.username = os.getenv("AP_KAFKA_PRODUCER_USERNAME")
self.server = os.getenv("AP_KAFKA_SERVER")
self.kafka_topic = os.getenv("AP_KAFKA_TOPIC")
# confluent_kafka configures all of its classes with dictionaries. This one
# sets up the bare minimum that is needed.
self.kafka_config = {
# This is the URL to use to connect to the Kafka cluster.
"bootstrap.servers": self.server,
# These next two properties tell the Kafka client about the specific
# authentication and authorization protocols that should be used when
# connecting.
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanisms": "SCRAM-SHA-512",
# The sasl.username and sasl.password are passed through over
# SCRAM-SHA-512 auth to connect to the cluster. The username is not
# sensitive, but the password is (of course) a secret value which
# should never be committed to source code.
"sasl.username": self.username,
"sasl.password": self.password,
# Batch size limits the largest size of a Kafka alert that can be sent.
# We set the batch size to 2 MB.
"batch.size": 2097152,
"linger.ms": 5,
}

if not self.password:
raise ValueError("Kafka password environment variable was not set.")
if not self.username:
raise ValueError("Kafka username environment variable was not set.")
if not self.server:
raise ValueError("Kafka server environment variable was not set.")
if not self.kafka_topic:
raise ValueError("Kafka topic environment variable was not set.")

try:
import confluent_kafka
from confluent_kafka import KafkaException
self.kafka_exception = KafkaException
# Create the producer inside the try block so a missing
# confluent_kafka package cannot raise a NameError here.
self.producer = confluent_kafka.Producer(**self.kafka_config)
except ImportError as error:
error.add_note("Could not import confluent_kafka. Alerts will not be sent "
"to the alert stream.")
_log.error(error)
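
# Hypothetical setup sketch: the four environment variables read above must
# be defined before the task is constructed, e.g. in a unit test (the values
# here are placeholders, not real endpoints):
#
#     import os
#     os.environ["AP_KAFKA_PRODUCER_USERNAME"] = "kafka-producer"
#     os.environ["AP_KAFKA_PRODUCER_PASSWORD"] = "<secret>"
#     os.environ["AP_KAFKA_SERVER"] = "kafka.example.com:9092"
#     os.environ["AP_KAFKA_TOPIC"] = "alert-stream-test"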

@timeMethod
def run(self,
diaSourceCat,
@@ -157,10 +222,21 @@ def run(self,
objDiaForcedSources,
diffImCutout,
templateCutout))
with open(os.path.join(self.config.alertWriteLocation,
f"{ccdVisitId}.avro"),
"wb") as f:
self.alertSchema.store_alerts(f, alerts)

if self.config.doProduceAlerts and "confluent_kafka" in sys.modules:
self.produceAlerts(alerts, ccdVisitId)

elif self.config.doProduceAlerts and "confluent_kafka" not in sys.modules:
raise RuntimeError("doProduceAlerts is set but confluent_kafka is not in the environment.")

if self.config.doWriteAlerts:
with open(os.path.join(self.config.alertWriteLocation,
f"{ccdVisitId}.avro"),
"wb") as f:
self.alertSchema.store_alerts(f, alerts)

if not self.config.doProduceAlerts and not self.config.doWriteAlerts:
raise RuntimeError("Neither doProduceAlerts nor doWriteAlerts is set.")

def _patchDiaSources(self, diaSources):
"""Add the ``programId`` column to the data.
@@ -193,6 +269,23 @@ def createDiaSourceExtent(self, bboxSize):
extent = geom.Extent2I(bboxSize, bboxSize)
return extent

def produceAlerts(self, alerts, ccdVisitId):
"""Serialize alerts and send them to the alert stream.

Parameters
----------
alerts : `list` of `dict`
Alert packages to send to the alert stream.
ccdVisitId : `int`
The ccdVisitId of the exposure, used to name the fallback files
written for alerts that fail to send.
"""
for alert in alerts:
alert_bytes = self._serialize_alert(alert, schema=self.alertSchema.definition, schema_id=1)
try:
self.producer.produce(self.kafka_topic, alert_bytes, callback=self._delivery_callback)
self.producer.flush()

except self.kafka_exception as e:
_log.error('Kafka error: {}, message was {} bytes'.format(e, sys.getsizeof(alert_bytes)))

# Write the failed alert to disk so it is not lost.
with open(os.path.join(self.config.alertWriteLocation,
f"{ccdVisitId}_{alert['alertId']}.avro"), "wb") as f:
f.write(alert_bytes)

self.producer.flush()

def createCcdDataCutout(self, image, skyCenter, extent, photoCalib, srcId):
"""Grab an image as a cutout and return a calibrated CCDData image.
@@ -372,3 +465,109 @@ def streamCcdDataToBytes(self, cutout):
cutout.write(streamer, format="fits")
cutoutBytes = streamer.getvalue()
return cutoutBytes

def _serialize_alert(self, alert, schema=None, schema_id=0):
"""Serialize an alert to a byte sequence for sending to Kafka.
Parameters
----------
alert : `dict`
An alert payload to be serialized.
schema : `dict`, optional
An Avro schema definition describing how to encode `alert`. By default,
the schema is None, which sets it to the latest schema available.
schema_id : `int`, optional
The Confluent Schema Registry ID of the schema. By default, 0 (an
invalid ID) is used, indicating that the schema is not registered.
Returns
-------
serialized : `bytes`
The byte sequence describing the alert, including the Confluent Wire
Format prefix.
"""
if schema is None:
schema = self.alertSchema.definition

buf = io.BytesIO()
# TODO: Use a proper schema versioning system (DM-42606)
buf.write(self._serialize_confluent_wire_header(schema_id))
fastavro.schemaless_writer(buf, schema, alert)
return buf.getvalue()

def _deserialize_alert(self, alert_bytes, schema=None):
"""Deserialize an alert message from Kafka.
Parameters
----------
alert_bytes : `bytes`
Binary-encoding serialized Avro alert, including Confluent Wire
Format prefix.
schema : `dict`, optional
An Avro schema definition describing how to encode `alert`. By default,
the latest schema is used.
Returns
-------
alert : `dict`
An alert payload.
"""
if schema is None:
schema = self.alertSchema.definition

header_bytes = alert_bytes[:5]
version = self._deserialize_confluent_wire_header(header_bytes)
assert version == 0
content_bytes = io.BytesIO(alert_bytes[5:])
return fastavro.schemaless_reader(content_bytes, schema)
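
# A round-trip sketch (assumes ``alert`` is a dict conforming to the current
# alert schema and ``task`` is a constructed PackageAlertsTask):
#
#     payload = task._serialize_alert(alert)  # schema_id defaults to 0
#     assert task._deserialize_alert(payload) == alert
#
# Note that _deserialize_alert asserts a header schema ID of 0, so alerts
# serialized with schema_id=1 (as in produceAlerts) cannot be read back
# with it as-is.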

@staticmethod
def _serialize_confluent_wire_header(schema_version):
"""Returns the byte prefix for Confluent Wire Format-style Kafka messages.
Parameters
----------
schema_version : `int`
A version number which indicates the Confluent Schema Registry ID
number of the Avro schema used to encode the message that follows this
header.
Returns
-------
header : `bytes`
The 5-byte encoded message prefix.
Notes
-----
The Confluent Wire Format is described more fully here:
https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format
"""
ConfluentWireFormatHeader = struct.Struct(">bi")
return ConfluentWireFormatHeader.pack(0, schema_version)

@staticmethod
def _deserialize_confluent_wire_header(raw):
"""Parses the byte prefix for Confluent Wire Format-style Kafka messages.
Parameters
----------
raw : `bytes`
The 5-byte encoded message prefix.
Returns
-------
schema_version : `int`
A version number which indicates the Confluent Schema Registry ID
number of the Avro schema used to encode the message that follows this
header.
"""
ConfluentWireFormatHeader = struct.Struct(">bi")
_, version = ConfluentWireFormatHeader.unpack(raw)
return version
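
# Worked example: the header is struct.pack(">bi", 0, schema_version), i.e. a
# magic byte of 0 followed by a big-endian 4-byte schema ID, so
#
#     _serialize_confluent_wire_header(1) == b"\x00\x00\x00\x00\x01"
#     _deserialize_confluent_wire_header(b"\x00\x00\x00\x00\x01") == 1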

def _delivery_callback(self, err, msg):
"""Log whether a Kafka message was delivered.

``err`` is a `confluent_kafka.KafkaError` (or None on success) and
``msg`` is the `confluent_kafka.Message` that was produced.
"""
if err:
_log.debug('%% Message failed delivery: %s\n' % err)
else:
_log.debug('%% Message delivered to %s [%d] @ %d\n' %
(msg.topic(), msg.partition(), msg.offset()))