
Feat: Add Kafka integration for Parseable server #936 · #1047

Draft · wants to merge 12 commits into base: main
Conversation


@hippalus hippalus commented Dec 21, 2024

Fixes #936

Description

This pull request implements a Kafka connector for Parseable, introducing per-partition stream management, modularizing the code, and integrating Prometheus metrics. It also lays the groundwork for dynamic configuration.

How It Works

Partition Management

  • The connector dynamically manages streams for each partition using StreamState.
  • Each Kafka message is routed to the corresponding PartitionStreamReceiver for processing.

Rebalance Handling

  • On partition assignment:
    • New streams are created for untracked partitions.
  • On partition revocation:
    • Streams for revoked partitions are terminated and cleaned up.

Stream Processing

  • The StreamWorker processes messages in batches, applying backpressure through the configurable buffer_size and buffer_timeout settings.

Metrics

  • Metrics exposed via Prometheus include partition lag, consumer throughput, and other statistics provided by rdkafka.

TODO

  1. Integration Tests:
    • Add tests using test containers to simulate Kafka.
  2. Docker Improvements:
    • Add build dependencies for rdkafka to the Dockerfile
    • Add a docker-compose setup to test the distributed environment.
  3. Metrics Collector:
    • Complete KafkaConsumerMetricsCollector

This PR has:

  • been tested to ensure log ingestion and log query work.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Contributor

github-actions bot commented Dec 21, 2024

CLA Assistant Lite bot: All contributors have signed the CLA ✍️ ✅

@hippalus
Author

I have read the CLA Document and I hereby sign the CLA

nitisht added a commit to parseablehq/.github that referenced this pull request Dec 21, 2024
```rust
use tracing::{debug, info};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConfig {
```
Contributor
I wouldn't want to serialize this back into a file, but I would love to extract it directly from the env vars using clap, can we do that? refer: S3Config

Contributor

Same for all other config types that have to be extracted from env vars

@de-sh de-sh requested a review from parmesant December 21, 2024 05:40
@nitisht
Member

nitisht commented Dec 21, 2024

Unfortunately we just completed this feature in #1021 @hippalus

@nitisht
Member

nitisht commented Dec 21, 2024

Would you want to contribute something else?

@nitisht
Member

nitisht commented Dec 21, 2024

I see useful stuff like backpressure handling which we can augment to the changes added in #1021

@hippalus
Author

> Unfortunately we just completed this feature in #1021 @hippalus

@nitisht As someone with production experience working with Kafka on both the client and broker sides, I was surprised to see the previous PR merged, as it overlaps with the work I was pursuing. While I may have missed the chance to open a draft PR earlier due to time constraints, I'd still love the opportunity to enhance and refine this feature further.

My goal is to contribute to a more robust and efficient solution. I believe there’s significant room for improvement, particularly in areas like backpressure, error handling, retrying, parallel processing, consumer rebalance, protection from data loss, etc. I’d be happy to collaborate on the existing implementation to make it more solid. Let me know how I can best support these efforts.

@nitisht
Member

nitisht commented Dec 21, 2024

> My goal is to contribute to a more robust and efficient solution. I believe there’s significant room for improvement, particularly in areas like backpressure, error handling, retrying, parallel processing, consumer rebalance, protection from data loss, etc. I’d be happy to collaborate on the existing implementation to make it more solid. Let me know how I can best support these efforts.

This is great to hear @hippalus . We're more than happy to support you in this. Would you change this PR to add these features instead?

@hippalus
Author

> My goal is to contribute to a more robust and efficient solution. I believe there’s significant room for improvement, particularly in areas like backpressure, error handling, retrying, parallel processing, consumer rebalance, protection from data loss, etc. I’d be happy to collaborate on the existing implementation to make it more solid. Let me know how I can best support these efforts.

> This is great to hear @hippalus . We're more than happy to support you in this. Would you change this PR to add these features instead?

Of course! @nitisht As I mentioned in the PR description, I will implement the current TODOs and PR comments that @de-sh made. Then let's evaluate it.

@hippalus hippalus marked this pull request as draft December 21, 2024 10:54
…le for rdkafka dependencies. Implement retrying for consumer.rcv() fn to handle temporary Kafka unavailability.
Implement KafkaMetricsCollector to collect and expose Kafka client and broker metrics. Refactor ParseableServer.init(..) and connectors::init(..).
Successfully merging this pull request may close these issues.

Feat: Add native Kafka integration for Parseable server
3 participants