Feat: Add Kafka integration for Parseable server parseablehq#936.
hippalus committed Dec 21, 2024
1 parent 6d3632a commit 4cc28cd
Showing 22 changed files with 3,313 additions and 356 deletions.
517 changes: 489 additions & 28 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 9 additions & 2 deletions Cargo.toml
@@ -30,6 +30,11 @@ actix-web-prometheus = { version = "0.1" }
actix-web-static-files = "4.0"
mime = "0.3.17"

### connectors dependencies
rdkafka = { version = "0.37", features = ["cmake-build", "tracing", "sasl", "ssl", "libz-static"] }
testcontainers = "0.23"
testcontainers-modules = { version = "0.11", features = ["kafka"] }

### other dependencies
anyhow = { version = "1.0", features = ["backtrace"] }
argon2 = "0.5.0"
@@ -81,13 +86,15 @@ static-files = "0.2"
sysinfo = "0.31.4"
thiserror = "1.0.64"
thread-priority = "1.0.0"
tokio = { version = "1.28", default-features = false, features = [
tokio = { version = "1.42", default-features = false, features = [
"sync",
"macros",
"fs",
"rt-multi-thread"
] }
tokio-stream = { version = "0.1", features = ["fs"] }
-tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+tokio-util = "0.7"
+tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] }
ulid = { version = "1.0", features = ["serde"] }
uptime_lib = "0.3.0"
xxhash-rust = { version = "0.8", features = ["xxh3"] }
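The Rust modules that actually consume these new dependencies live elsewhere in the changeset and are not shown in this excerpt. As a rough illustration of what the `rdkafka` dependency enables, here is a minimal `StreamConsumer` sketch; the broker address, topic name, and group id are assumptions chosen to mirror the Python generator below, not the commit's real configuration:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Hypothetical broker/topic/group values; the real server wires these
    // through its own configuration, which is not part of this excerpt.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "parseable-demo")
        .set("auto.offset.reset", "earliest")
        .create()?;

    consumer.subscribe(&["log-stream"])?;

    loop {
        // recv() awaits the next message from any assigned partition.
        let msg = consumer.recv().await?;
        if let Some(Ok(payload)) = msg.payload_view::<str>() {
            println!("{} [{}]: {}", msg.topic(), msg.partition(), payload);
        }
    }
}
```

In the actual integration the consumed payloads would presumably be handed to Parseable's ingestion path rather than printed.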
123 changes: 123 additions & 0 deletions scripts/kafka_log_stream_generator.py
@@ -0,0 +1,123 @@
import json
import time
from datetime import datetime, timezone
from random import choice, randint
from uuid import uuid4
from confluent_kafka import Producer

# Configuration
config = {
    "kafka_broker": "localhost:9092",  # Replace with your Kafka broker address
    "kafka_topic": "log-stream",  # Replace with your Kafka topic name
    "log_rate": 500,  # Logs per second
    "log_template": {
        "timestamp": "",  # Timestamp will be added dynamically
        "correlation_id": "",  # Unique identifier for tracing requests
        "level": "INFO",  # Log level (e.g., INFO, ERROR, DEBUG)
        "message": "",  # Main log message to be dynamically set
        "pod": {
            "name": "example-pod",  # Kubernetes pod name
            "namespace": "default",  # Kubernetes namespace
            "node": "node-01"  # Kubernetes node name
        },
        "request": {
            "method": "",  # HTTP method
            "path": "",  # HTTP request path
            "remote_address": ""  # IP address of the client
        },
        "response": {
            "status_code": 200,  # HTTP response status code
            "latency_ms": 0  # Latency in milliseconds
        },
        "metadata": {
            "container_id": "",  # Container ID
            "image": "example/image:1.0",  # Docker image
            "environment": "prod"  # Environment (e.g., dev, staging, prod)
        }
    }
}

producer = Producer({"bootstrap.servers": config["kafka_broker"]})


def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed for message {msg.key()}: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


def generate_log():
    log = config["log_template"].copy()
    log["timestamp"] = datetime.now(timezone.utc).isoformat()
    log["correlation_id"] = str(uuid4())

    levels = ["INFO", "WARNING", "ERROR", "DEBUG"]
    messages = [
        "Received incoming HTTP request",
        "Processed request successfully",
        "Failed to process request",
        "Request timeout encountered",
        "Service unavailable"
    ]
    log["level"] = choice(levels)
    log["message"] = choice(messages)

    # Populate request fields
    methods = ["GET", "POST", "PUT", "DELETE"]
    paths = ["/api/resource", "/api/login", "/api/logout", "/api/data"]
    log["request"] = {
        "method": choice(methods),
        "path": choice(paths),
        "remote_address": f"192.168.1.{randint(1, 255)}"
    }

    # Populate response fields
    log["response"] = {
        "status_code": choice([200, 201, 400, 401, 403, 404, 500]),
        "latency_ms": randint(10, 1000)
    }

    # Populate pod and metadata fields
    log["pod"] = {
        "name": f"pod-{randint(1, 100)}",
        "namespace": choice(["default", "kube-system", "production", "staging"]),
        "node": f"node-{randint(1, 10)}"
    }

    log["metadata"] = {
        "container_id": f"container-{randint(1000, 9999)}",
        "image": f"example/image:{randint(1, 5)}.0",
        "environment": choice(["dev", "staging", "prod"])
    }

    return log


def main():
    try:
        while True:
            # Generate log message
            log_message = generate_log()
            log_json = json.dumps(log_message)

            # Send to Kafka
            producer.produce(
                config["kafka_topic"],
                value=log_json,
                callback=delivery_report
            )

            # Flush the producer to ensure delivery
            producer.flush()

            # Wait based on the log rate
            time.sleep(1 / config["log_rate"])
    except KeyboardInterrupt:
        print("Stopped log generation.")
    finally:
        producer.flush()


if __name__ == "__main__":
    main()
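To exercise a pipeline like this one without a long-running local broker, the `testcontainers` and `testcontainers-modules` dependencies added to Cargo.toml point at container-backed integration tests. The commit's actual tests are not shown in this excerpt; the following is a minimal sketch under the assumption that the stock Kafka module from `testcontainers-modules` (feature `"kafka"`, in-container broker port 9093) is used:

```rust
use testcontainers_modules::kafka::Kafka;
use testcontainers_modules::testcontainers::runners::AsyncRunner;

#[tokio::test]
async fn starts_a_disposable_kafka_broker() -> Result<(), Box<dyn std::error::Error>> {
    // Start a throwaway Kafka container; it is torn down when `node` is dropped.
    let node = Kafka::default().start().await?;

    // Ask testcontainers which host port the broker's in-container port
    // (assumed 9093 for this module) was mapped to.
    let port = node.get_host_port_ipv4(9093).await?;
    let bootstrap_servers = format!("127.0.0.1:{port}");

    // A producer/consumer built against `bootstrap_servers` (e.g. with rdkafka)
    // could now exercise the server's Kafka connector end to end.
    assert!(!bootstrap_servers.is_empty());
    Ok(())
}
```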