Skip to content

Commit

Permalink
use wait_for_logs for kafka container and remove kafka-python dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
Gudjon Ragnar Brynjarsson committed Mar 3, 2024
1 parent 7358b49 commit a18c1b1
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 31 deletions.
15 changes: 3 additions & 12 deletions modules/kafka/testcontainers/kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
from io import BytesIO
from textwrap import dedent

from kafka import KafkaConsumer
from kafka.errors import KafkaError, NoBrokersAvailable, UnrecognizedBrokerVersion
from testcontainers.core.container import DockerContainer
from testcontainers.core.utils import raise_for_deprecated_parameter
from testcontainers.core.waiting_utils import wait_container_is_ready
from testcontainers.core.waiting_utils import wait_for_logs


class KafkaContainer(DockerContainer):
Expand Down Expand Up @@ -47,13 +45,6 @@ def get_bootstrap_server(self) -> str:
port = self.get_exposed_port(self.port)
return f"{host}:{port}"

@wait_container_is_ready(UnrecognizedBrokerVersion, NoBrokersAvailable, KafkaError, ValueError)
def _connect(self) -> None:
bootstrap_server = self.get_bootstrap_server()
consumer = KafkaConsumer(group_id="test", bootstrap_servers=[bootstrap_server])
if not consumer.bootstrap_connected():
raise KafkaError("Unable to connect with kafka container!")

def tc_start(self) -> None:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.port)
Expand All @@ -78,13 +69,13 @@ def tc_start(self) -> None:
)
self.create_file(data, KafkaContainer.TC_START_SCRIPT)

def start(self) -> "KafkaContainer":
def start(self, timeout=30) -> "KafkaContainer":
script = KafkaContainer.TC_START_SCRIPT
command = f'sh -c "while [ ! -f {script} ]; do sleep 0.1; done; sh {script}"'
self.with_command(command)
super().start()
self.tc_start()
self._connect()
wait_for_logs(self, r".*\[KafkaServer id=\d+\] started.*", timeout=timeout)
return self

def create_file(self, content: bytes, path: str) -> None:
Expand Down
23 changes: 6 additions & 17 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ clickhouse-driver = { version = "*", optional = true }
google-cloud-pubsub = { version = ">=2", optional = true }
kubernetes = { version = "*", optional = true }
pyyaml = { version = "*", optional = true }
kafka-python = { version = "*", optional = true }
python-keycloak = { version = "*", optional = true }
boto3 = { version = "*", optional = true }
minio = { version = "*", optional = true }
Expand All @@ -91,7 +90,7 @@ clickhouse = ["clickhouse-driver"]
elasticsearch = []
google = ["google-cloud-pubsub"]
k3s = ["kubernetes", "pyyaml"]
kafka = ["kafka-python"]
kafka = []
keycloak = ["python-keycloak"]
localstack = ["boto3"]
minio = ["minio"]
Expand Down

0 comments on commit a18c1b1

Please sign in to comment.