Performance metrics comparison (RW vs Flink) #11

Open · wants to merge 17 commits into base: main
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
.DS_Store
**/.DS_Store
10 changes: 10 additions & 0 deletions 04-solution-demos/compare-metrics/flink/Dockerfile
@@ -0,0 +1,10 @@
FROM flink:1.16.3-scala_2.12

# Download the Kafka SQL connector and the Prometheus reporter JARs into /opt/flink/lib
RUN wget -P /opt/flink/lib \
      https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.3/flink-sql-connector-kafka-1.16.3.jar && \
    wget -P /opt/flink/lib \
      https://repo.maven.apache.org/maven2/org/apache/flink/flink-metrics-prometheus/1.16.3/flink-metrics-prometheus-1.16.3.jar && \
    chown -R flink:flink /opt/flink/lib

# Enable the Prometheus metrics reporter
RUN echo "metrics.reporters: prom" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
    echo "metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory" >> "$FLINK_HOME/conf/flink-conf.yaml"
125 changes: 125 additions & 0 deletions 04-solution-demos/compare-metrics/flink/docker-compose.yml
@@ -0,0 +1,125 @@
version: '3.8'
services:

  # zookeeper
  zookeeper:
    #platform: linux/amd64
    image: bitnami/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      - kafka-net

  # kafka
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
      - "29093:29093"
    expose:
      - "9093"
      - "29093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:29093,LISTENER_DOCKER_EXTERNAL://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_AUTO_OFFSET_RESET: earliest
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - kafka-net

  # kafka producer
  kafka-producer:
    build:
      context: .
      dockerfile: flink_producer/Dockerfile
    depends_on:
      - kafka
      - zookeeper
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:29093
    restart: always
    networks:
      - kafka-net

  jobmanager:
    build: .
    depends_on:
      - kafka-producer
    ports:
      - "8082:8081"
      - "9249:9249"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        execution.checkpointing.mode: EXACTLY_ONCE
        execution.checkpointing.interval: 60s
        rest.flamegraph.enabled: true
    networks:
      - kafka-net

  taskmanager:
    build: .
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 5
    networks:
      - kafka-net

  sql-client:
    build: .
    command: bin/sql-client.sh
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager
    networks:
      - kafka-net

  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yaml:/etc/prometheus/prometheus.yml
    networks:
      - kafka-net

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_AUTH_ANONYMOUS_ENABLED=true
      - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
    volumes:
      - "./grafana_configs/datasource.yaml:/etc/grafana/provisioning/datasources/datasource.yaml"
      - "./grafana_configs/dashboard.yaml:/etc/grafana/provisioning/dashboards/main.yaml"
      - "./grafana_configs:/var/lib/grafana/dashboards"
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: bridge
    name: kafka-net
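Note: the prometheus service above mounts ./prometheus.yaml, which is not included in this diff. A minimal scrape config matching the reporter port configured in this PR (a sketch, not the author's actual file; the job name is illustrative) could look like:

global:
  scrape_interval: 5s        # how often Prometheus polls the metric endpoints

scrape_configs:
  - job_name: 'flink'        # illustrative job name
    static_configs:
      # the Flink Prometheus reporter listens on port 9249 in both containers
      - targets: ['jobmanager:9249', 'taskmanager:9249']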
7 changes: 7 additions & 0 deletions 04-solution-demos/compare-metrics/flink/flink-conf.yml
@@ -0,0 +1,7 @@
execution.checkpointing.interval: 60s
state.backend: filesystem
state.checkpoints.dir: file:///flink_checkpoint
state.savepoints.dir: file:///flink_checkpoint

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249 # Port for Prometheus metrics
14 changes: 14 additions & 0 deletions 04-solution-demos/compare-metrics/flink/flink_producer/Dockerfile
@@ -0,0 +1,14 @@
FROM python:3.8

RUN pip install kafka-python

# Create a directory for the Python files
WORKDIR /app

# Copy Python files into the container
COPY flink_producer/run_producers.py .
COPY flink_producer/constant_data.py .
COPY flink_producer/varying_data.py .

# Run the Python script
CMD ["python", "run_producers.py"]
77 changes: 77 additions & 0 deletions 04-solution-demos/compare-metrics/flink/flink_producer/constant_data.py
@@ -0,0 +1,77 @@
import random
import json
import datetime
import time
import string
from kafka import KafkaProducer

rate_per_second = 5

# Check if the broker is still reachable
def is_broker_available():
    global producer
    try:
        # bootstrap_connected() (kafka-python >= 2.0) reports whether the
        # client has an active connection to at least one bootstrap broker
        return producer.bootstrap_connected()
    except Exception as e:
        print(f"Broker not available: {e}")
        return False

# Generate a random order ID
def generate_order_id():
    return ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))

# Generate a random customer ID
def generate_customer_id():
    return ''.join(random.choices(string.digits, k=3))

# Generate a random product ID
def generate_product_id():
    return ''.join(random.choices(string.ascii_uppercase + string.digits, k=2))

# Generate a random purchase event
def generate_purchase_event():
    order_id = generate_order_id()
    customer_id = generate_customer_id()
    product = generate_product_id()
    quantity = random.randint(1, 5)
    timestamp = datetime.datetime.now().isoformat()
    total_amount = round(random.uniform(10, 100) * quantity, 2)  # Random total amount
    return {
        "order_id": order_id,
        "customer_id": customer_id,
        "prod": product,
        "quant_in": quantity,
        "ts": timestamp,
        "tot_amnt_in": total_amount
    }

# Kafka topic to produce messages to
topic = 'purchase_constant'

kafka_config = {
    'bootstrap_servers': ['kafka:29093']
}

# Give the broker a moment to come up before connecting
time.sleep(3)

# Kafka producer
producer = KafkaProducer(**kafka_config)

if __name__ == "__main__":
    try:
        # Produce messages to the Kafka topic at a steady rate
        while is_broker_available():
            message = generate_purchase_event()
            message_str = json.dumps(message).encode('utf-8')

            producer.send(topic, message_str)

            time.sleep(1 / rate_per_second)
    finally:
        print('Producer closed')

        # Wait for any outstanding messages to be delivered
        producer.flush()
        producer.close()
20 changes: 20 additions & 0 deletions 04-solution-demos/compare-metrics/flink/flink_producer/run_producers.py
@@ -0,0 +1,20 @@
import subprocess
import os

# Get the current directory
current_directory = os.path.dirname(os.path.realpath(__file__))

# Paths to the two producer scripts
constant_path = os.path.join(current_directory, 'constant_data.py')
varying_path = os.path.join(current_directory, 'varying_data.py')

# Start each producer in a separate process
process1 = subprocess.Popen(['python3', constant_path])
process2 = subprocess.Popen(['python3', varying_path])

# Wait for both processes to finish (they run indefinitely in this demo)
process1.wait()
process2.wait()
94 changes: 94 additions & 0 deletions 04-solution-demos/compare-metrics/flink/flink_producer/varying_data.py
@@ -0,0 +1,94 @@
import random
import json
import datetime
import time
import string
from kafka import KafkaProducer

rate_per_second = 5

# Check if the broker is still reachable
def is_broker_available():
    global producer
    try:
        # bootstrap_connected() (kafka-python >= 2.0) reports whether the
        # client has an active connection to at least one bootstrap broker
        return producer.bootstrap_connected()
    except Exception as e:
        print(f"Broker not available: {e}")
        return False

# Pause the producer until the next half-minute boundary
def wait_until():
    current_time = datetime.datetime.now().second

    if current_time <= 30:
        wait = 30 - current_time
    else:
        wait = 60 - current_time

    print(current_time, wait)  # debug output: current second and sleep duration
    # Sleep until the target time is reached
    time.sleep(wait)

# Generate a random order ID
def generate_order_id():
    return ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))

# Generate a random customer ID
def generate_customer_id():
    return ''.join(random.choices(string.digits, k=3))

# Generate a random product ID
def generate_product_id():
    return ''.join(random.choices(string.ascii_uppercase + string.digits, k=2))

# Generate a random purchase event
def generate_purchase_event():
    order_id = generate_order_id()
    customer_id = generate_customer_id()
    product = generate_product_id()
    quantity = random.randint(1, 5)
    timestamp = datetime.datetime.now().isoformat()
    total_amount = round(random.uniform(10, 100) * quantity, 2)  # Random total amount
    return {
        "order_id": order_id,
        "customer_id": customer_id,
        "prod": product,
        "quant_out": quantity,
        "ts": timestamp,
        "tot_amnt_out": total_amount
    }

# Kafka topic to produce messages to
topic = 'purchase_varying'

kafka_config = {
    'bootstrap_servers': ['kafka:29093']
}

# Give the broker a moment to come up before connecting
time.sleep(3)

# Kafka producer
producer = KafkaProducer(**kafka_config)

if __name__ == "__main__":
    try:
        # Produce in bursts: 15 seconds on, 15 seconds off
        while is_broker_available():
            second = datetime.datetime.now().second
            if 0 <= second < 15 or 30 <= second < 45:
                message = generate_purchase_event()
                message_str = json.dumps(message).encode('utf-8')
                # Produce the message to the topic asynchronously
                producer.send(topic, message_str)
                time.sleep(1 / rate_per_second)
            else:
                wait_until()
    finally:
        print('Producer closed')

        # Wait for any outstanding messages to be delivered
        producer.flush()
        producer.close()
13 changes: 13 additions & 0 deletions 04-solution-demos/compare-metrics/flink/grafana_configs/dashboard.yaml
@@ -0,0 +1,13 @@
apiVersion: 1

providers:
  - name: 'Flink Metrics'
    orgId: 1
    folder: 'General'
    type: file
    disableDeletion: false
    updateIntervalSeconds: 3
    allowUiUpdates: false
    options:
      path: /var/lib/grafana/dashboards
      foldersFromFilesStructure: true
14 changes: 14 additions & 0 deletions 04-solution-demos/compare-metrics/flink/grafana_configs/datasource.yaml
@@ -0,0 +1,14 @@
apiVersion: 1

deleteDatasources:
  - name: flink-prometheus

datasources:
  - name: flink-prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    withCredentials: false
    isDefault: true
    tlsAuth: false
    tlsAuthWithCACert: false
    version: 1
    editable: true