Update unit tests and import handling
bsmartradio committed Feb 20, 2024
1 parent 6451895 commit 60552ea
Showing 3 changed files with 144 additions and 175 deletions.
6 changes: 1 addition & 5 deletions python/lsst/ap/association/diaPipe.py
@@ -34,7 +34,6 @@

import numpy as np
import pandas as pd
import logging

from lsst.daf.base import DateTime
import lsst.dax.apdb as daxApdb
@@ -51,9 +50,6 @@
PackageAlertsTask)
from lsst.ap.association.ssoAssociation import SolarSystemAssociationTask

_log = logging.getLogger("lsst." + __name__)
_log.setLevel(logging.DEBUG)


class DiaPipelineConnections(
pipeBase.PipelineTaskConnections,
@@ -554,7 +550,7 @@ def run(self,
template)
except ValueError as err:
# Continue processing even if alert sending fails
_log.error(err)
self.log.error(err)

return pipeBase.Struct(apdbMarker=self.config.apdb.value,
associatedDiaSources=associatedDiaSources,
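The diaPipe.py change above drops the hand-configured module logger and routes alert-sending failures through the task's own logger, letting processing continue. A minimal, self-contained sketch of that pattern, using a toy class in place of an lsst.pipe.base.Task (only the self.log attribute matters here; real tasks already provide it):

import logging


class ToyTask:
    """Stand-in for an lsst.pipe.base.Task; real tasks supply self.log already."""

    def __init__(self):
        # A plain logging.Logger stands in for the task's pre-configured logger.
        self.log = logging.getLogger("lsst.toyTask")

    def run(self):
        try:
            raise ValueError("alert sending failed")  # stand-in for a failed send
        except ValueError as err:
            # Continue processing even if alert sending fails.
            self.log.error(err)


ToyTask().run()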
195 changes: 71 additions & 124 deletions python/lsst/ap/association/packageAlerts.py
@@ -24,13 +24,18 @@
import io
import os
import sys
import logging

from astropy import wcs
import astropy.units as u
from astropy.nddata import CCDData, VarianceUncertainty
import pandas as pd
import struct
import fastavro
try:
import confluent_kafka
from confluent_kafka import KafkaException
except ImportError:
confluent_kafka = None

import lsst.alert.packet as alertPack
import lsst.afw.geom as afwGeom
@@ -39,13 +44,6 @@
from lsst.pex.exceptions import InvalidParameterError
import lsst.pipe.base as pipeBase
from lsst.utils.timer import timeMethod
import fastavro

"""Methods for packaging Apdb and Pipelines data into Avro alerts.
"""

_log = logging.getLogger("lsst." + __name__)
_log.setLevel(logging.DEBUG)


class PackageAlertsConfig(pexConfig.Config):
@@ -71,14 +69,14 @@ class PackageAlertsConfig(pexConfig.Config):

doProduceAlerts = pexConfig.Field(
dtype=bool,
doc="Turn on alert production to kafka if true. Set to false by default",
doc="Turn on alert production to kafka if true and if confluent_kafka is in the environment.",
default=False,
)

doWriteAlerts = pexConfig.Field(
doPackageAlerts = pexConfig.Field(
dtype=bool,
doc="Write alerts to disk if true. Set to true by default",
default=True,
doc="Write alerts to disk if true.",
default=False,
)


@@ -97,50 +95,49 @@ def __init__(self, **kwargs):

if self.config.doProduceAlerts:

self.password = os.getenv("AP_KAFKA_PRODUCER_PASSWORD")
self.username = os.getenv("AP_KAFKA_PRODUCER_USERNAME")
self.server = os.getenv("AP_KAFKA_SERVER")
self.kafka_topic = os.getenv("AP_KAFKA_TOPIC")
# confluent_kafka configures all of its classes with dictionaries. This one
# sets up the bare minimum that is needed.
self.kafka_config = {
# This is the URL to use to connect to the Kafka cluster.
"bootstrap.servers": self.server,
# These next two properties tell the Kafka client about the specific
# authentication and authorization protocols that should be used when
# connecting.
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanisms": "SCRAM-SHA-512",
# The sasl.username and sasl.password are passed through over
# SCRAM-SHA-512 auth to connect to the cluster. The username is not
# sensitive, but the password is (of course) a secret value which
# should never be committed to source code.
"sasl.username": self.username,
"sasl.password": self.password,
# Batch size limits the largest size of a kafka alert that can be sent.
# We set the batch size to 2 Mb.
"batch.size": 2097152,
"linger.ms": 5,
}
if confluent_kafka is not None:

self.password = os.getenv("AP_KAFKA_PRODUCER_PASSWORD")
self.username = os.getenv("AP_KAFKA_PRODUCER_USERNAME")
self.server = os.getenv("AP_KAFKA_SERVER")
self.kafkaTopic = os.getenv("AP_KAFKA_TOPIC")

if not self.password:
raise ValueError("Kafka password environment variable was not set.")
if not self.username:
raise ValueError("Kafka username environment variable was not set.")
if not self.server:
raise ValueError("Kafka server environment variable was not set.")
if not self.kafkaTopic:
raise ValueError("Kafka topic environment variable was not set.")

# confluent_kafka configures all of its classes with dictionaries. This one
# sets up the bare minimum that is needed.
self.kafkaConfig = {
# This is the URL to use to connect to the Kafka cluster.
"bootstrap.servers": self.server,
# These next two properties tell the Kafka client about the specific
# authentication and authorization protocols that should be used when
# connecting.
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanisms": "SCRAM-SHA-512",
# The sasl.username and sasl.password are passed through over
# SCRAM-SHA-512 auth to connect to the cluster. The username is not
# sensitive, but the password is (of course) a secret value which
# should never be committed to source code.
"sasl.username": self.username,
"sasl.password": self.password,
# Batch size limits the largest size of a kafka alert that can be sent.
# We set the batch size to 2 Mb.
"batch.size": 2097152,
"linger.ms": 5,
}

self.producer = confluent_kafka.Producer(**self.kafkaConfig)

try:
from confluent_kafka import KafkaException
self.kafka_exception = KafkaException
import confluent_kafka
except ImportError as error:
error.add_note("Could not import confluent_kafka. Alerts will not be sent "
"to the alert stream")
_log.error(error)

if not self.password:
raise ValueError("Kafka password environment variable was not set.")
if not self.username:
raise ValueError("Kafka username environment variable was not set.")
if not self.server:
raise ValueError("Kafka server environment variable was not set.")
if not self.kafka_topic:
raise ValueError("Kafka topic environment variable was not set.")
self.producer = confluent_kafka.Producer(**self.kafka_config)
else:
self.log.error("Produce alerts is set but confluent_kafka is not present in "
"the environment. Alerts will not be sent to the alert stream.")

@timeMethod
def run(self,
@@ -153,6 +150,10 @@ def run(self,
):
"""Package DiaSources/Object and exposure data into Avro alerts.
Alerts can be sent to the alert stream if ``doProduceAlerts`` is set
and written to disk if ``doPackageAlerts`` is set. Both can be set at the
same time, and are independent of one another.
Writes Avro alerts to a location determined by the
``alertWriteLocation`` configurable.
@@ -223,21 +224,13 @@
diffImCutout,
templateCutout))

if self.config.doProduceAlerts and "confluent_kafka" in sys.modules:
if self.config.doProduceAlerts and confluent_kafka is not None:
self.produceAlerts(alerts, ccdVisitId)

elif self.config.doProduceAlerts and "confluent_kafka" not in sys.modules:
raise Exception("Produce alerts is set but confluent_kafka is not in the environment.")

if self.config.doWriteAlerts:
with open(os.path.join(self.config.alertWriteLocation,
f"{ccdVisitId}.avro"),
"wb") as f:
if self.config.doPackageAlerts:
with open(os.path.join(self.config.alertWriteLocation, f"{ccdVisitId}.avro"), "wb") as f:
self.alertSchema.store_alerts(f, alerts)

if not self.config.doProduceAlerts and not self.config.doWriteAlerts:
raise Exception("Neither produce alerts nor write alerts is set.")

def _patchDiaSources(self, diaSources):
"""Add the ``programId`` column to the data.
@@ -249,7 +242,7 @@ def _patchDiaSources(self, diaSources):
diaSources["programId"] = 0

def createDiaSourceExtent(self, bboxSize):
"""Create a extent for a box for the cutouts given the size of the
"""Create an extent for a box for the cutouts given the size of the
square BBox that covers the source footprint.
Parameters
@@ -272,17 +265,17 @@ def createDiaSourceExtent(self, bboxSize):
def produceAlerts(self, alerts, ccdVisitId):

for alert in alerts:
alert_bytes = self._serialize_alert(alert, schema=self.alertSchema.definition, schema_id=1)
alertBytes = self._serializeAlert(alert, schema=self.alertSchema.definition, schema_id=1)
try:
self.producer.produce(self.kafka_topic, alert_bytes, callback=self._delivery_callback)
self.producer.produce(self.kafkaTopic, alertBytes, callback=self._delivery_callback)
self.producer.flush()

except self.kafka_exception as e:
_log.error('Kafka error: {}, message was {} bytes'.format(e, sys.getsizeof(alert_bytes)))
except KafkaException as e:
self.log.warning('Kafka error: {}, message was {} bytes'.format(e, sys.getsizeof(alertBytes)))

with open(os.path.join(self.config.alertWriteLocation,
f"{ccdVisitId}_{alert['alertId']}.avro"), "wb") as f:
f.write(alert_bytes)
f.write(alertBytes)

self.producer.flush()

@@ -466,7 +459,7 @@ def streamCcdDataToBytes(self, cutout):
cutoutBytes = streamer.getvalue()
return cutoutBytes

def _serialize_alert(self, alert, schema=None, schema_id=0):
def _serializeAlert(self, alert, schema=None, schema_id=0):
"""Serialize an alert to a byte sequence for sending to Kafka.
Parameters
@@ -491,38 +484,12 @@ def _serialize_alert(self, alert, schema=None, schema_id=0):

buf = io.BytesIO()
# TODO: Use a proper schema versioning system (DM-42606)
buf.write(self._serialize_confluent_wire_header(schema_id))
buf.write(self._serializeConfluentWireHeader(schema_id))
fastavro.schemaless_writer(buf, schema, alert)
return buf.getvalue()

def _deserialize_alert(self, alert_bytes, schema=None):
"""Deserialize an alert message from Kafka.
Parameters
----------
alert_bytes : `bytes`
Binary-encoding serialized Avro alert, including Confluent Wire
Format prefix.
schema : `dict`, optional
An Avro schema definition describing how to encode `alert`. By default,
the latest schema is used.
Returns
-------
alert : `dict`
An alert payload.
"""
if schema is None:
schema = self.alertSchema.definition

header_bytes = alert_bytes[:5]
version = self._deserialize_confluent_wire_header(header_bytes)
assert version == 0
content_bytes = io.BytesIO(alert_bytes[5:])
return fastavro.schemaless_reader(content_bytes, schema)

@staticmethod
def _serialize_confluent_wire_header(schema_version):
def _serializeConfluentWireHeader(schema_version):
"""Returns the byte prefix for Confluent Wire Format-style Kafka messages.
Parameters
@@ -545,29 +512,9 @@ def _serialize_confluent_wire_header(schema_version):
ConfluentWireFormatHeader = struct.Struct(">bi")
return ConfluentWireFormatHeader.pack(0, schema_version)

@staticmethod
def _deserialize_confluent_wire_header(raw):
"""Parses the byte prefix for Confluent Wire Format-style Kafka messages.
Parameters
----------
raw : `bytes`
The 5-byte encoded message prefix.
Returns
-------
schema_version : `int`
A version number which indicates the Confluent Schema Registry ID
number of the Avro schema used to encode the message that follows this
header.
"""
ConfluentWireFormatHeader = struct.Struct(">bi")
_, version = ConfluentWireFormatHeader.unpack(raw)
return version

def _delivery_callback(self, err, msg):
if err:
_log.debug('%% Message failed delivery: %s\n' % err)
self.log.debug('%% Message failed delivery: %s\n' % err)
else:
_log.debug('%% Message delivered to %s [%d] @ %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
self.log.debug('%% Message delivered to %s [%d] @ %d\n'
% (msg.topic(), msg.partition(), msg.offset()))
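
The serializer above prefixes every Avro payload with a five-byte Confluent Wire Format header: a zero magic byte followed by a big-endian four-byte schema ID, packed with struct.Struct(">bi"). A standalone sketch of that framing and its inverse, using a toy schema and record purely for illustration:

import io
import struct

import fastavro

schema = fastavro.parse_schema({
    "type": "record",
    "name": "ToyAlert",
    "fields": [{"name": "alertId", "type": "long"}],
})
record = {"alertId": 42}

header = struct.Struct(">bi")  # magic byte, then 4-byte schema ID
buf = io.BytesIO()
buf.write(header.pack(0, 1))  # magic 0, schema_id 1
fastavro.schemaless_writer(buf, schema, record)
payload = buf.getvalue()

# Reading it back: split off the 5-byte header, check the magic byte,
# then decode the rest with the same schema.
magic, schema_id = header.unpack(payload[:5])
assert magic == 0 and schema_id == 1
decoded = fastavro.schemaless_reader(io.BytesIO(payload[5:]), schema)
assert decoded == record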