Skip to content

Commit

Permalink
init commit
Browse files Browse the repository at this point in the history
  • Loading branch information
JanikBuecher committed Mar 23, 2021
0 parents commit eec8d5e
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 0 deletions.
Empty file added .gitignore
Empty file.
5 changes: 5 additions & 0 deletions Consumer/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
*.log
.git
.gitignore
.env
Dockerfile
Empty file added Consumer/.gitignore
Empty file.
8 changes: 8 additions & 0 deletions Consumer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM python:3.7.5-slim


RUN python -m pip install \
kafka-python

ADD stream.py /
ENTRYPOINT [ "python", "stream.py" ]
35 changes: 35 additions & 0 deletions Consumer/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from kafka import KafkaConsumer
from json import loads
import logging
import time
from sys import argv
#logging.basicConfig(level=logging.DEBUG)

def consume():
print('\n<Consuming>')
consumer = KafkaConsumer(
'Test',
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id=None,
#value_deserializer=lambda m: loads(m.decode('utf-8')),
bootstrap_servers=[bootstrap_server])


for m in consumer:
print(m.value)

try:
bs=argv[1]
print('\n🥾 bootstrap server: {}'.format(bs))
bootstrap_server=bs
except:
# no bs X-D
bootstrap_server='localhost:19092'
print('⚠️ No bootstrap server defined, defaulting to {}\n'.format(bootstrap_server))

try:
consume()

except Exception as e:
print("❌ (uncaught exception in consume): ", e)
5 changes: 5 additions & 0 deletions Producer/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
*.log
.git
.gitignore
.env
Dockerfile
8 changes: 8 additions & 0 deletions Producer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM python:3.7.5-slim


RUN python -m pip install \
kafka-python

ADD stream.py /
ENTRYPOINT [ "python", "stream.py" ]
29 changes: 29 additions & 0 deletions Producer/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from kafka import KafkaProducer
from json import dumps
import logging
import time
from sys import argv
#logging.basicConfig(level=logging.DEBUG)
def produce():
# write to the topic
print('\n<Producing>')
producer = KafkaProducer(bootstrap_servers=[bootstrap_server])
for i in range(20):
producer.send('Test', ('Message: ' + str(i)).encode() )
producer.flush()
producer.close(timeout=2)

try:
bs=argv[1]
print('\n🥾 bootstrap server: {}'.format(bs))
bootstrap_server=bs
except:
# no bs X-D
bootstrap_server='localhost:19092'
print('⚠️ No bootstrap server defined, defaulting to {}\n'.format(bootstrap_server))

try:
produce()

except Exception as e:
print("❌ (uncaught exception in produce): ", e)
Empty file added README.md
Empty file.
Binary file added Seminararbeit/DWH.docx
Binary file not shown.
Binary file added Seminararbeit/DWH.pdf
Binary file not shown.
60 changes: 60 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
version: '2'
networks:
rmoff_kafka:
name: rmoff_kafka
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.0
container_name: zookeeper
networks:
- rmoff_kafka
environment:
ZOOKEEPER_CLIENT_PORT: 2181

broker:
image: confluentinc/cp-kafka:5.5.0
container_name: broker
networks:
- rmoff_kafka
ports:
- '19092:19092'
depends_on:
- zookeeper
environment:
#KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,CONNECTIONS_FROM_HOST://localhost:19092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS: 'Test:1:1'
producer:
image: janikbuecheribm/producer # Own docker image of producer can be replaced here
container_name: producer
depends_on:
- broker
networks:
- rmoff_kafka
entrypoint:
- bash
- -c
- |
echo 'Giving Kafka a bit of time to start up…'
sleep 30
# Run the client code
python /stream.py broker:9092
consumer:
image: janikbuecheribm/consumer # Own docker image of consumer can be replaced here
container_name: consumer
depends_on:
- broker
networks:
- rmoff_kafka
entrypoint:
- bash
- -c
- |
echo 'Giving Kafka a bit of time to start up…'
sleep 40
echo 'Starting consume…'
# Run the client code
python -u /stream.py broker:9092

0 comments on commit eec8d5e

Please sign in to comment.