diff --git a/baseplate/sidecars/event_publisher.py b/baseplate/sidecars/event_publisher.py index 9c3dcb2eb..a98974bd1 100644 --- a/baseplate/sidecars/event_publisher.py +++ b/baseplate/sidecars/event_publisher.py @@ -5,7 +5,10 @@ import hashlib import hmac import logging +import signal +import sys +from types import FrameType from typing import Any from typing import List from typing import Optional @@ -164,6 +167,16 @@ def publish(self, payload: SerializedBatch) -> None: SERIALIZER_BY_VERSION = {"2": V2Batch, "2j": V2JBatch} +def serialize_and_publish_batch(publisher: BatchPublisher, batcher: TimeLimitedBatch) -> None: + """Serializes batch, publishes it using the publisher, and then resets the batch for more messages.""" + serialized_batch = batcher.serialize() + try: + publisher.publish(serialized_batch) + except Exception: + logger.exception("Events publishing failed.") + batcher.reset() + + def publish_events() -> None: arg_parser = argparse.ArgumentParser() arg_parser.add_argument( @@ -214,6 +227,33 @@ def publish_events() -> None: batcher = TimeLimitedBatch(serializer, MAX_BATCH_AGE) publisher = BatchPublisher(metrics_client, cfg) + def flush_queue_signal_handler(_signo: int, _frame: FrameType) -> None: + """Signal handler for flushing messages from the queue and publishing them.""" + message: Optional[bytes] + logger.info("Shutdown signal received. Flushing events...") + + while True: + try: + message = event_queue.get(timeout=0.2) + except TimedOutError: + if len(batcher.serialize()) > 0: + serialize_and_publish_batch(publisher, batcher) + break + + try: + batcher.add(message) + continue + except BatchFull: + pass + + serialize_and_publish_batch(publisher, batcher) + batcher.add(message) + sys.exit(0) + + for sig in (signal.SIGINT, signal.SIGTERM): + signal.signal(sig, flush_queue_signal_handler) + signal.siginterrupt(sig, False) + while True: message: Optional[bytes] @@ -228,12 +268,7 @@ def publish_events() -> None: except BatchFull: pass - serialized = batcher.serialize() - try: - publisher.publish(serialized) - except Exception: - logger.exception("Events publishing failed.") - batcher.reset() + serialize_and_publish_batch(publisher, batcher) batcher.add(message)