Singleton Pulsar Producer Using Python Client Not Working #214

Open
@usmandap

I'm struggling with creating a singleton Apache Pulsar producer in a Django app. I have a Django application that needs to produce hundreds of messages every second. Instead of creating a new producer every time, I want to reuse the same producer that was created the first time.

The problem I'm facing is that the producer is created successfully (I can see it in the logs), but when I try to send a message with it, the call hangs without any error. Below is my code for creating the producer.

import logging
import os
import threading

import pulsar

from sastaticketpk.settings.config import PULSAR_ENABLED

logger = logging.getLogger(__name__)

PULSAR_ENV = os.environ.get("PULSAR_CONTAINER")


class PulsarClient:
    __client = None  # Kept so the connection can be closed on shutdown
    __producer = None
    __lock = threading.Lock()  # Guards one-time initialization of the singleton producer

    def __init__(self):
        """Virtually private constructor."""
        raise RuntimeError("Call get_producer() instead")

    @classmethod
    def initialize_producer(cls):
        if not (PULSAR_ENABLED and PULSAR_ENV):
            # Log why no producer is created; get_producer() then returns None.
            logger.warning(
                "Pulsar producer not created: PULSAR_ENABLED=%s, PULSAR_CONTAINER=%s",
                PULSAR_ENABLED,
                PULSAR_ENV,
            )
            return
        logger.info("Creating pulsar producer")
        try:
            cls.__client = pulsar.Client(
                "pulsar://k8s-tooling-pulsarpr-7.elb.ap-southeast-1.amazonaws.com:6650"
            )
            cls.__producer = cls.__client.create_producer(
                "persistent://public/default/gaf",
                send_timeout_millis=1000,
            )
            logger.info("Producer created successfully")
        except Exception:
            # Covers TopicNotFound and every other client error; logs the traceback.
            logger.exception("Error creating producer")

    @classmethod
    def get_producer(cls):
        logger.info("Producer value %s", cls.__producer)
        if cls.__producer is None:
            with cls.__lock:
                if cls.__producer is None:  # Re-check after acquiring the lock
                    cls.initialize_producer()
        logger.info("Returning producer %s", cls.__producer)
        return cls.__producer

I import it in one of the utility files that run at application startup. Below is the calling code:

PULSAR_PRODUCER = PulsarClient.get_producer()

The above code works fine on my local setup, but when the same code is deployed to production (AWS ECS), it doesn't work. If I SSH into the Docker container, start a Python shell, and import PULSAR_PRODUCER, it is None. Please suggest what I might be missing; I'm stuck trying to understand this.
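
One possible explanation, not confirmed anywhere in this report, is that the production application server forks worker processes after the module-level PULSAR_PRODUCER has been created, so the client's background threads are missing in the workers. A minimal sketch, assuming that is what happens, defers creation until first use and rebuilds the producer when the process ID changes. LazyPerProcess, make_producer, and the PULSAR_SERVICE_URL environment variable are hypothetical names used only for illustration; they are not part of the original code or the Pulsar client.

```python
import os
import threading


class LazyPerProcess:
    """Builds a resource on first use and rebuilds it in a new process."""

    def __init__(self, factory):
        self._factory = factory
        self._lock = threading.Lock()
        self._value = None
        self._pid = None  # PID of the process that built the current value

    def get(self):
        pid = os.getpid()
        if self._value is None or self._pid != pid:
            with self._lock:
                # Re-check after acquiring the lock (double-checked locking).
                if self._value is None or self._pid != pid:
                    self._value = self._factory()
                    self._pid = pid
        return self._value


def make_producer():
    # Imported here so the client is only loaded in the process that uses it.
    import pulsar

    # PULSAR_SERVICE_URL is an assumed variable name, e.g. "pulsar://host:6650".
    client = pulsar.Client(os.environ["PULSAR_SERVICE_URL"])
    return client.create_producer(
        "persistent://public/default/gaf",
        send_timeout_millis=1000,
    )


# Nothing connects at import time; the first get() in each process does.
PRODUCER = LazyPerProcess(make_producer)
```

With this arrangement, call sites would use PRODUCER.get().send(payload) rather than importing a producer object that was built at startup.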

I have also posted this question on Stack Overflow:
https://stackoverflow.com/questions/78526293/singleton-pulsar-producer-using-python-client-not-working
