Feat: Add Kafka integration for Parseable server #936 (#1047)

Draft
wants to merge 13 commits into base: main
470 changes: 440 additions & 30 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions Cargo.toml
@@ -30,6 +30,11 @@ actix-web-prometheus = { version = "0.1" }
actix-web-static-files = "4.0"
mime = "0.3.17"

### connectors dependencies
rdkafka = { version = "0.37", features = ["cmake-build", "tracing", "sasl", "ssl", "libz-static"] }
testcontainers = "0.23"
testcontainers-modules = { version = "0.11", features = ["kafka"] }

### other dependencies
anyhow = { version = "1.0", features = ["backtrace"] }
argon2 = "0.5.0"
@@ -66,7 +71,6 @@ num_cpus = "1.15"
once_cell = "1.17.1"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8.5"
rdkafka = {version = "0.36.2", default-features = false, features = ["tokio"]}
regex = "1.7.3"
relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
@@ -82,13 +86,15 @@ static-files = "0.2"
sysinfo = "0.31.4"
thiserror = "1.0.64"
thread-priority = "1.0.0"
tokio = { version = "1.28", default-features = false, features = [
tokio = { version = "1.42", default-features = false, features = [
"sync",
"macros",
"fs",
"rt-multi-thread"
] }
tokio-stream = { version = "0.1", features = ["fs"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tokio-util = "0.7"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] }
ulid = { version = "1.0", features = ["serde"] }
uptime_lib = "0.3.0"
xxhash-rust = { version = "0.8", features = ["xxh3"] }
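For reviewers less familiar with rdkafka, the sketch below illustrates the kind of consumer the new dependency enables: a StreamConsumer built from a ClientConfig, subscribed to a topic, and polled on the Tokio runtime. It is a minimal illustration only, not the code added in this PR; the broker address, group id, and topic name are placeholder values.

// Minimal sketch of what the new rdkafka dependency enables; not part of this diff.
// "localhost:9092", "parseable-demo", and "log-stream" are placeholders.
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "parseable-demo")
        .set("auto.offset.reset", "earliest")
        .create()?;

    consumer.subscribe(&["log-stream"])?;

    loop {
        // recv() yields one message at a time on the Tokio runtime
        let msg = consumer.recv().await?;
        if let Some(Ok(payload)) = msg.payload_view::<str>() {
            println!("partition {} offset {}: {}", msg.partition(), msg.offset(), payload);
        }
    }
}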
123 changes: 123 additions & 0 deletions scripts/kafka_log_stream_generator.py
@@ -0,0 +1,123 @@
import json
import time
from datetime import datetime, timezone
from random import choice, randint
from uuid import uuid4
from confluent_kafka import Producer

# Configuration
config = {
    "kafka_broker": "localhost:9092",  # Replace with your Kafka broker address
    "kafka_topic": "log-stream",  # Replace with your Kafka topic name
    "log_rate": 500,  # Logs per second
    "log_template": {
        "timestamp": "",  # Timestamp will be added dynamically
        "correlation_id": "",  # Unique identifier for tracing requests
        "level": "INFO",  # Log level (e.g., INFO, ERROR, DEBUG)
        "message": "",  # Main log message to be dynamically set
        "pod": {
            "name": "example-pod",  # Kubernetes pod name
            "namespace": "default",  # Kubernetes namespace
            "node": "node-01"  # Kubernetes node name
        },
        "request": {
            "method": "",  # HTTP method
            "path": "",  # HTTP request path
            "remote_address": ""  # IP address of the client
        },
        "response": {
            "status_code": 200,  # HTTP response status code
            "latency_ms": 0  # Latency in milliseconds
        },
        "metadata": {
            "container_id": "",  # Container ID
            "image": "example/image:1.0",  # Docker image
            "environment": "prod"  # Environment (e.g., dev, staging, prod)
        }
    }
}

producer = Producer({"bootstrap.servers": config["kafka_broker"]})


def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed for message {msg.key()}: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


def generate_log():
    log = config["log_template"].copy()
    log["timestamp"] = datetime.now(timezone.utc).isoformat()
    log["correlation_id"] = str(uuid4())

    levels = ["INFO", "WARNING", "ERROR", "DEBUG"]
    messages = [
        "Received incoming HTTP request",
        "Processed request successfully",
        "Failed to process request",
        "Request timeout encountered",
        "Service unavailable"
    ]
    log["level"] = choice(levels)
    log["message"] = choice(messages)

    # Populate request fields
    methods = ["GET", "POST", "PUT", "DELETE"]
    paths = ["/api/resource", "/api/login", "/api/logout", "/api/data"]
    log["request"] = {
        "method": choice(methods),
        "path": choice(paths),
        "remote_address": f"192.168.1.{randint(1, 255)}"
    }

    # Populate response fields
    log["response"] = {
        "status_code": choice([200, 201, 400, 401, 403, 404, 500]),
        "latency_ms": randint(10, 1000)
    }

    # Populate pod and metadata fields
    log["pod"] = {
        "name": f"pod-{randint(1, 100)}",
        "namespace": choice(["default", "kube-system", "production", "staging"]),
        "node": f"node-{randint(1, 10)}"
    }

    log["metadata"] = {
        "container_id": f"container-{randint(1000, 9999)}",
        "image": f"example/image:{randint(1, 5)}.0",
        "environment": choice(["dev", "staging", "prod"])
    }

    return log


def main():
    try:
        while True:
            # Generate log message
            log_message = generate_log()
            log_json = json.dumps(log_message)

            # Send to Kafka
            producer.produce(
                config["kafka_topic"],
                value=log_json,
                callback=delivery_report
            )

            # Flush the producer to ensure delivery
            producer.flush()

            # Wait based on the log rate
            time.sleep(1 / config["log_rate"])
    except KeyboardInterrupt:
        print("Stopped log generation.")
    finally:
        producer.flush()


if __name__ == "__main__":
    main()
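Running the generator requires the confluent-kafka Python package (pip install confluent-kafka) and a broker reachable at the configured address. On the consuming side, each payload is a small set of nested JSON records; the structs below are a hedged sketch of how such a payload could be deserialized with serde. The type names and numeric widths are illustrative assumptions and do not appear in this PR.

// Hedged sketch: field names mirror the generator's log_template above;
// LogEvent and the nested struct names are illustrative, not from this diff.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct LogEvent {
    timestamp: String,
    correlation_id: String,
    level: String,
    message: String,
    pod: Pod,
    request: Request,
    response: Response,
    metadata: Metadata,
}

#[derive(Debug, Deserialize)]
struct Pod { name: String, namespace: String, node: String }

#[derive(Debug, Deserialize)]
struct Request { method: String, path: String, remote_address: String }

#[derive(Debug, Deserialize)]
struct Response { status_code: u16, latency_ms: u64 }

#[derive(Debug, Deserialize)]
struct Metadata { container_id: String, image: String, environment: String }

// Parse one Kafka message payload into a typed event.
fn parse_event(payload: &[u8]) -> serde_json::Result<LogEvent> {
    serde_json::from_slice(payload)
}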