Skip to content

Commit

Permalink
update kafka-sink service to set Errored status on events
Browse files Browse the repository at this point in the history
currently all kafka errors result it a "Rejected" event, which for the
HTTP sink returns a 400 to the user.

however most errors should be "Errored", which implies an error happened
and not a bad event that can't be sent to the sink
  • Loading branch information
frankh committed Aug 9, 2024
1 parent 16d2300 commit 66d2ddf
Showing 1 changed file with 28 additions and 3 deletions.
31 changes: 28 additions & 3 deletions src/sinks/kafka/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use rdkafka::{
producer::{FutureProducer, FutureRecord},
types::RDKafkaErrorCode,
};
use vector_lib::config;

use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*};

Expand All @@ -34,11 +35,12 @@ pub struct KafkaRequestMetadata {
pub struct KafkaResponse {
event_byte_size: GroupedCountByteSize,
raw_byte_size: usize,
event_status: EventStatus,
}

impl DriverResponse for KafkaResponse {
fn event_status(&self) -> EventStatus {
EventStatus::Delivered
self.event_status
}

fn events_sent(&self) -> &GroupedCountByteSize {
Expand Down Expand Up @@ -153,6 +155,7 @@ impl Service<KafkaRequest> for KafkaService {
.map(|_| KafkaResponse {
event_byte_size,
raw_byte_size,
event_status: EventStatus::Delivered,
})
.map_err(|(err, _)| err);
}
Expand All @@ -168,8 +171,30 @@ impl Service<KafkaRequest> for KafkaService {
record = original_record;
tokio::time::sleep(Duration::from_millis(100)).await;
}
// A different error occurred.
Err((err, _)) => return Err(err),
// A final/unretriable error occurred.

Check failure on line 174 in src/sinks/kafka/service.rs

View workflow job for this annotation

GitHub Actions / Check Spelling

`unretriable` is not a recognized word. (unrecognized-spelling)
Err((
err @ KafkaError::MessageProduction(
RDKafkaErrorCode::InvalidMessage |
RDKafkaErrorCode::InvalidMessageSize |
RDKafkaErrorCode::MessageSizeTooLarge |
RDKafkaErrorCode::UnknownTopicOrPartition |
RDKafkaErrorCode::InvalidRecord |
RDKafkaErrorCode::InvalidRequiredAcks |
RDKafkaErrorCode::TopicAuthorizationFailed |
RDKafkaErrorCode::UnsupportedForMessageFormat |
RDKafkaErrorCode::ClusterAuthorizationFailed
),
_,
)) => return Err(err),

// A different error occurred. Set event status to Errored not Rejected.
Err(_) => {
return Ok(KafkaResponse {
event_byte_size: config::telemetry().create_request_count_byte_size(),
raw_byte_size: 0,
event_status: EventStatus::Errored,
})
}
};
}
})
Expand Down

0 comments on commit 66d2ddf

Please sign in to comment.