Make the kafka package dependency optional to be able to use confluent_kafka #351

Closed
jankatins opened this issue May 10, 2023 · 5 comments · Fixed by #377

Comments

@jankatins
Contributor

jankatins commented May 10, 2023

Describe the bug

We use confluent_kafka in the project and it's strange that we need another kafka sdk (kafka-python) just to check that the container is running.

To Reproduce

Install testcontainers-kafka -> observe that kafka-python gets installed as well

@tillahoffmann
Collaborator

tillahoffmann commented May 10, 2023

Yes, let's explore. Cf. #336 for the same idea for the postgres container. We'd have to figure out what the default behavior should be, e.g.,

  • Which client library is used out-of-the-box?
  • Do we always require users to also install their own client library?
  • How are convenience methods to get a client connection implemented (although admittedly no such method exists for the KafkaContainer)?

@jankatins
Contributor Author

For Kafka, the client library is only needed for the connection check. So one idea would be to implement the "checks" outside the class and pick one conditionally. That would need two extras, one depending on confluent-kafka and one on kafka-python:

from testcontainers.core.waiting_utils import wait_container_is_ready

# Detect which Kafka client library is installed, preferring confluent_kafka.
KAFKA_PACKAGE: str
try:
    import confluent_kafka
    KAFKA_PACKAGE = "confluent_kafka"
except ImportError:
    try:
        from kafka import KafkaConsumer
        from kafka.errors import KafkaError, UnrecognizedBrokerVersion, NoBrokersAvailable
        KAFKA_PACKAGE = "kafka_python"
    except ImportError:
        KAFKA_PACKAGE = "MISSING"

# Define each check only if its library was imported; otherwise the decorator
# arguments would raise NameError when this module is loaded.
if KAFKA_PACKAGE == "confluent_kafka":

    @wait_container_is_ready(confluent_kafka.KafkaError, confluent_kafka.KafkaException)
    def _connect_confluent_kafka(container) -> None:
        """Condition: try to connect to the broker."""
        consumer = confluent_kafka.Consumer(
            {
                "bootstrap.servers": container.get_bootstrap_server(),
                "group.id": "test",
            }
        )
        if not consumer.list_topics(timeout=0.1):
            raise confluent_kafka.KafkaException("Unable to connect with kafka container!")

elif KAFKA_PACKAGE == "kafka_python":

    @wait_container_is_ready(UnrecognizedBrokerVersion, NoBrokersAvailable, KafkaError, ValueError)
    def _connect_kafka_python(container) -> None:
        """Condition: try to connect to the broker."""
        bootstrap_server = container.get_bootstrap_server()
        consumer = KafkaConsumer(group_id="test", bootstrap_servers=[bootstrap_server])
        if not consumer.bootstrap_connected():
            raise KafkaError("Unable to connect with kafka container!")


class KafkaContainer:
    ...
    def _connect(self):
        # The checks are module-level functions, so the container is passed explicitly.
        if KAFKA_PACKAGE == "confluent_kafka":
            _connect_confluent_kafka(self)
        elif KAFKA_PACKAGE == "kafka_python":
            _connect_kafka_python(self)
        else:
            raise NotImplementedError("No kafka sdk found, install either confluent-kafka or kafka-python")

(Written into the github window, not tested...)
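
As a side note, the detection step could also be done without importing either client. The following is only a sketch: `detect_kafka_package` and `_CANDIDATES` are hypothetical names that are not part of testcontainers, and the labels simply mirror the `KAFKA_PACKAGE` values used in the comment above. It relies on the standard library's `importlib.util.find_spec`, which returns `None` when a top-level module cannot be found.

```python
import importlib.util
from typing import Callable, Optional

# Hypothetical mapping: (label from the comment above, importable module name).
# Order expresses preference: confluent_kafka first, then kafka-python.
_CANDIDATES = (("confluent_kafka", "confluent_kafka"), ("kafka_python", "kafka"))


def detect_kafka_package(
    find_spec: Callable[[str], Optional[object]] = importlib.util.find_spec,
) -> str:
    """Return the label of the first installed client library, or "MISSING"."""
    for label, module_name in _CANDIDATES:
        # find_spec locates the module without executing it.
        if find_spec(module_name) is not None:
            return label
    return "MISSING"


KAFKA_PACKAGE = detect_kafka_package()
```

The `find_spec` parameter exists only so the lookup can be replaced in tests; in normal use the default is enough.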

@jankatins changed the title from "Make the kafka package dependecy optional to be able to use confluent_kafka" to "Make the kafka package dependency optional to be able to use confluent_kafka" on May 11, 2023
@tillahoffmann
Collaborator

Do you think it would be feasible to use wait_for_logs as discussed in #340 for postgres? Then we wouldn't have to install any additional packages.

@jankatins
Contributor Author

jankatins commented May 11, 2023

The Java testcontainers implementation seems to think so: https://github.com/testcontainers/testcontainers-java/blob/main/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java#L136-L143

    protected void configure() {
        if (this.kraftEnabled) {
            waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
            configureKraft();
        } else {
            waitingFor(Wait.forLogMessage(".*\\[KafkaServer id=\\d+\\] started.*", 1));
            configureZookeeper();
        }
    }
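
For illustration, the same two log patterns can be expressed in Python with the standard `re` module. This is a minimal sketch, not the testcontainers-python implementation: `broker_started` and the constant names are hypothetical, and the sample log text is made up purely to exercise the patterns. The regexes are the ones from the Java snippet with the Java string escaping removed.

```python
import re

# Patterns taken from the Java snippet above; a raw string replaces the
# doubled backslashes that Java string literals require.
KRAFT_READY = r".*Transitioning from RECOVERY to RUNNING.*"
ZOOKEEPER_READY = r".*\[KafkaServer id=\d+\] started.*"


def broker_started(logs: str, kraft_enabled: bool = False) -> bool:
    """Return True if the container log text contains the startup message."""
    pattern = KRAFT_READY if kraft_enabled else ZOOKEEPER_READY
    # re.search scans the whole text, so the message may be on any line.
    return re.search(pattern, logs, re.MULTILINE) is not None


# Illustrative log text (an assumption, not captured from a real broker).
sample_logs = "INFO starting\nINFO [KafkaServer id=1] started (kafka.server.KafkaServer)\n"
print(broker_started(sample_logs))
```

A readiness check built this way needs no Kafka client at all; it only has to poll the container's log output until the function returns `True` or a timeout expires.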

@tillahoffmann
Collaborator

Great, let's do that. Would be nice to bring the implementations closer, too.

alexanderankin pushed a commit that referenced this issue Mar 24, 2024
#377)

Use `wait_for_logs` to wait for startup instead of waiting for
successful connection via kafka-python. Also removes the dependency on
kafka-python.

Closes #351

---------

Co-authored-by: Gudjon Ragnar Brynjarsson <[email protected]>
bearrito pushed a commit to bearrito/testcontainers-python that referenced this issue Mar 30, 2024
testcontainers#377)

Use `wait_for_logs` to wait for startup instead of waiting for
successful connection via kafka-python. Also removes the dependency on
kafka-python.

Closes testcontainers#351

---------

Co-authored-by: Gudjon Ragnar Brynjarsson <[email protected]>