keep overflow detection in sink pending removal
xvello committed Apr 29, 2024
1 parent 63f3389 commit 23ef91e
Showing 7 changed files with 54 additions and 83 deletions.
9 changes: 8 additions & 1 deletion capture/src/api.rs
@@ -80,8 +80,15 @@ impl IntoResponse for CaptureError {
     }
 }
 
-#[derive(Clone, Default, Debug, Serialize, Eq, PartialEq)]
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+pub enum DataType {
+    AnalyticsMain,
+    AnalyticsHistorical,
+}
+#[derive(Clone, Debug, Serialize, Eq, PartialEq)]
 pub struct ProcessedEvent {
+    #[serde(skip_serializing)]
+    pub data_type: DataType,
     pub uuid: Uuid,
     pub distinct_id: String,
     pub ip: String,
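The #[serde(skip_serializing)] attribute added above is what keeps the new routing hint out of the serialized payload while it still travels with the event in memory. Below is a minimal, self-contained sketch of that pattern; the types are stand-ins rather than the crate's own, and it assumes serde (with the derive feature) and serde_json as dependencies.

use serde::Serialize;

// Stand-in types mirroring the pattern above: the enum rides along with the
// struct in process, but never reaches the serialized payload.
#[allow(dead_code)]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum DataType {
    AnalyticsMain,
    AnalyticsHistorical,
}

#[derive(Serialize)]
struct Envelope {
    #[serde(skip_serializing)]
    data_type: DataType, // skipped, so DataType needs no Serialize impl
    distinct_id: String,
}

fn main() {
    let event = Envelope {
        data_type: DataType::AnalyticsHistorical,
        distinct_id: "user-123".into(),
    };
    // The hint stays available in process for routing decisions...
    assert_eq!(event.data_type, DataType::AnalyticsHistorical);
    // ...but the wire format only contains {"distinct_id":"user-123"}.
    println!("{}", serde_json::to_string(&event).unwrap());
}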
67 changes: 25 additions & 42 deletions capture/src/sinks/kafka.rs
@@ -11,11 +11,11 @@ use tokio::task::JoinSet;
 use tracing::log::{debug, error, info};
 use tracing::{info_span, instrument, Instrument};
 
-use crate::api::{CaptureError, ProcessedEvent};
+use crate::api::{CaptureError, DataType, ProcessedEvent};
 use crate::config::KafkaConfig;
 use crate::limiters::overflow::OverflowLimiter;
 use crate::prometheus::report_dropped_events;
-use crate::sinks::{DataType, Event};
+use crate::sinks::Event;
 
 struct KafkaContext {
     liveness: HealthHandle,
@@ -139,28 +139,23 @@ impl KafkaSink {
         self.producer.flush(Duration::new(30, 0))
     }
 
-    async fn kafka_send(
-        &self,
-        event: ProcessedEvent,
-        data_type: &DataType,
-    ) -> Result<DeliveryFuture, CaptureError> {
+    async fn kafka_send(&self, event: ProcessedEvent) -> Result<DeliveryFuture, CaptureError> {
         let payload = serde_json::to_string(&event).map_err(|e| {
             error!("failed to serialize event: {}", e);
             CaptureError::NonRetryableSinkError
         })?;
 
         let event_key = event.key();
-        let (topic, partition_key): (&str, Option<&str>) = match data_type {
+        let (topic, partition_key): (&str, Option<&str>) = match &event.data_type {
+            DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
             DataType::AnalyticsMain => {
-                // TODO: move overflow up in the handler
+                // TODO: deprecate capture-led overflow or move logic in handler
                 if self.partition.is_limited(&event_key) {
-                    (&self.main_topic, None)
+                    (&self.main_topic, None) // Analytics overflow goes to the main topic without locality
                 } else {
                     (&self.main_topic, Some(event_key.as_str()))
                 }
             }
-            DataType::AnalyticsOverflow => (&self.main_topic, None), // Overflow is going on the main topic for analytics
-            DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
         };
 
         match self.producer.send_result(FutureRecord {
@@ -217,25 +212,21 @@ impl KafkaSink {
 #[async_trait]
 impl Event for KafkaSink {
     #[instrument(skip_all)]
-    async fn send(&self, data_type: DataType, event: ProcessedEvent) -> Result<(), CaptureError> {
-        let ack = self.kafka_send(event, &data_type).await?;
+    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
+        let ack = self.kafka_send(event).await?;
         histogram!("capture_event_batch_size").record(1.0);
         Self::process_ack(ack)
             .instrument(info_span!("ack_wait_one"))
             .await
     }
 
     #[instrument(skip_all)]
-    async fn send_batch(
-        &self,
-        data_type: DataType,
-        events: Vec<ProcessedEvent>,
-    ) -> Result<(), CaptureError> {
+    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
         let mut set = JoinSet::new();
         let batch_size = events.len();
         for event in events {
             // We await kafka_send to get events in the producer queue sequentially
-            let ack = self.kafka_send(event, &data_type).await?;
+            let ack = self.kafka_send(event).await?;
 
             // Then stash the returned DeliveryFuture, waiting concurrently for the write ACKs from brokers.
             set.spawn(Self::process_ack(ack));
@@ -269,11 +260,11 @@ impl Event for KafkaSink {
 
 #[cfg(test)]
 mod tests {
-    use crate::api::{CaptureError, ProcessedEvent};
+    use crate::api::{CaptureError, DataType, ProcessedEvent};
     use crate::config;
     use crate::limiters::overflow::OverflowLimiter;
    use crate::sinks::kafka::KafkaSink;
-    use crate::sinks::{DataType, Event};
+    use crate::sinks::Event;
     use crate::utils::uuid_v7;
     use health::HealthRegistry;
     use rand::distributions::Alphanumeric;
@@ -316,6 +307,7 @@ mod tests {
 
         let (cluster, sink) = start_on_mocked_sink().await;
         let event: ProcessedEvent = ProcessedEvent {
+            data_type: DataType::AnalyticsMain,
             uuid: uuid_v7(),
             distinct_id: "id1".to_string(),
             ip: "".to_string(),
@@ -327,20 +319,16 @@
 
         // Wait for producer to be healthy, to keep kafka_message_timeout_ms short and tests faster
         for _ in 0..20 {
-            if sink
-                .send(DataType::AnalyticsMain, event.clone())
-                .await
-                .is_ok()
-            {
+            if sink.send(event.clone()).await.is_ok() {
                 break;
             }
         }
 
         // Send events to confirm happy path
-        sink.send(DataType::AnalyticsMain, event.clone())
+        sink.send(event.clone())
             .await
             .expect("failed to send one initial event");
-        sink.send_batch(DataType::AnalyticsMain, vec![event.clone(), event.clone()])
+        sink.send_batch(vec![event.clone(), event.clone()])
             .await
             .expect("failed to send initial event batch");
 
@@ -351,6 +339,7 @@
             .map(char::from)
             .collect();
         let big_event: ProcessedEvent = ProcessedEvent {
+            data_type: DataType::AnalyticsMain,
             uuid: uuid_v7(),
             distinct_id: "id1".to_string(),
             ip: "".to_string(),
@@ -359,7 +348,7 @@
             sent_at: None,
             token: "token1".to_string(),
         };
-        match sink.send(DataType::AnalyticsMain, big_event).await {
+        match sink.send(big_event).await {
             Err(CaptureError::EventTooBig) => {} // Expected
             Err(err) => panic!("wrong error code {}", err),
             Ok(()) => panic!("should have errored"),
@@ -369,18 +358,15 @@
         cluster.clear_request_errors(RDKafkaApiKey::Produce);
         let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE; 1];
         cluster.request_errors(RDKafkaApiKey::Produce, &err);
-        match sink.send(DataType::AnalyticsMain, event.clone()).await {
+        match sink.send(event.clone()).await {
             Err(CaptureError::EventTooBig) => {} // Expected
             Err(err) => panic!("wrong error code {}", err),
             Ok(()) => panic!("should have errored"),
         };
         cluster.clear_request_errors(RDKafkaApiKey::Produce);
         let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_INVALID_PARTITIONS; 1];
         cluster.request_errors(RDKafkaApiKey::Produce, &err);
-        match sink
-            .send_batch(DataType::AnalyticsMain, vec![event.clone(), event.clone()])
-            .await
-        {
+        match sink.send_batch(vec![event.clone(), event.clone()]).await {
             Err(CaptureError::RetryableSinkError) => {} // Expected
             Err(err) => panic!("wrong error code {}", err),
             Ok(()) => panic!("should have errored"),
@@ -390,29 +376,26 @@
         cluster.clear_request_errors(RDKafkaApiKey::Produce);
         let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 2];
         cluster.request_errors(RDKafkaApiKey::Produce, &err);
-        sink.send(DataType::AnalyticsMain, event.clone())
+        sink.send(event.clone())
             .await
             .expect("failed to send one event after recovery");
         cluster.clear_request_errors(RDKafkaApiKey::Produce);
         let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 2];
         cluster.request_errors(RDKafkaApiKey::Produce, &err);
-        sink.send_batch(DataType::AnalyticsMain, vec![event.clone(), event.clone()])
+        sink.send_batch(vec![event.clone(), event.clone()])
             .await
             .expect("failed to send event batch after recovery");
 
         // Timeout on a sustained transient error
         cluster.clear_request_errors(RDKafkaApiKey::Produce);
         let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 50];
         cluster.request_errors(RDKafkaApiKey::Produce, &err);
-        match sink.send(DataType::AnalyticsMain, event.clone()).await {
+        match sink.send(event.clone()).await {
             Err(CaptureError::RetryableSinkError) => {} // Expected
             Err(err) => panic!("wrong error code {}", err),
             Ok(()) => panic!("should have errored"),
         };
-        match sink
-            .send_batch(DataType::AnalyticsMain, vec![event.clone(), event.clone()])
-            .await
-        {
+        match sink.send_batch(vec![event.clone(), event.clone()]).await {
             Err(CaptureError::RetryableSinkError) => {} // Expected
             Err(err) => panic!("wrong error code {}", err),
             Ok(()) => panic!("should have errored"),
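Even with DataType attached to the event, the overflow decision itself stays inside KafkaSink::kafka_send for now, as the commit title says: historical events keep their partition key and go to the historical topic, while main analytics events lose their key (and thus partition locality) once the overflow limiter flags them. A condensed, runnable model of that match follows; DataType mirrors the enum introduced in api.rs, is_limited stands in for OverflowLimiter::is_limited, and the topic names and key are made-up placeholders.

// Condensed model of the routing performed in KafkaSink::kafka_send above.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum DataType {
    AnalyticsMain,
    AnalyticsHistorical,
}

fn route<'a>(
    data_type: DataType,
    event_key: &'a str,
    is_limited: impl Fn(&str) -> bool,
    main_topic: &'a str,
    historical_topic: &'a str,
) -> (&'a str, Option<&'a str>) {
    match data_type {
        // Historical traffic never overflows and keeps partition locality.
        DataType::AnalyticsHistorical => (historical_topic, Some(event_key)),
        // A limited (hot) key is produced without a partition key so it
        // spreads across partitions instead of hammering a single one.
        DataType::AnalyticsMain if is_limited(event_key) => (main_topic, None),
        DataType::AnalyticsMain => (main_topic, Some(event_key)),
    }
}

fn main() {
    // Non-capturing closure standing in for the overflow limiter.
    let is_limited = |key: &str| key == "token1:hot_user";
    assert_eq!(
        route(DataType::AnalyticsMain, "token1:hot_user", is_limited, "main", "historical"),
        ("main", None)
    );
    assert_eq!(
        route(DataType::AnalyticsHistorical, "token1:hot_user", is_limited, "main", "historical"),
        ("historical", Some("token1:hot_user"))
    );
}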
15 changes: 2 additions & 13 deletions capture/src/sinks/mod.rs
@@ -5,19 +5,8 @@ use crate::api::{CaptureError, ProcessedEvent};
 pub mod kafka;
 pub mod print;
 
-#[derive(Debug, Copy, Clone)]
-pub enum DataType {
-    AnalyticsMain,
-    AnalyticsOverflow,
-    AnalyticsHistorical,
-}
-
 #[async_trait]
 pub trait Event {
-    async fn send(&self, data_type: DataType, event: ProcessedEvent) -> Result<(), CaptureError>;
-    async fn send_batch(
-        &self,
-        data_type: DataType,
-        events: Vec<ProcessedEvent>,
-    ) -> Result<(), CaptureError>;
+    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
+    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
 }
14 changes: 5 additions & 9 deletions capture/src/sinks/print.rs
@@ -3,30 +3,26 @@ use metrics::{counter, histogram};
 use tracing::log::info;
 
 use crate::api::{CaptureError, ProcessedEvent};
-use crate::sinks::{DataType, Event};
+use crate::sinks::Event;
 
 pub struct PrintSink {}
 
 #[async_trait]
 impl Event for PrintSink {
-    async fn send(&self, data_type: DataType, event: ProcessedEvent) -> Result<(), CaptureError> {
-        info!("single {:?} event: {:?}", data_type, event);
+    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
+        info!("single {:?} event: {:?}", event.data_type, event);
         counter!("capture_events_ingested_total").increment(1);
 
         Ok(())
     }
-    async fn send_batch(
-        &self,
-        data_type: DataType,
-        events: Vec<ProcessedEvent>,
-    ) -> Result<(), CaptureError> {
+    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
         let span = tracing::span!(tracing::Level::INFO, "batch of events");
         let _enter = span.enter();
 
         histogram!("capture_event_batch_size").record(events.len() as f64);
         counter!("capture_events_ingested_total").increment(events.len() as u64);
         for event in events {
-            info!("{:?} event: {:?}", data_type, event);
+            info!("{:?} event: {:?}", event.data_type, event);
         }
 
         Ok(())
19 changes: 10 additions & 9 deletions capture/src/v0_endpoint.rs
@@ -13,10 +13,9 @@ use tracing::instrument;
 
 use crate::limiters::billing::QuotaResource;
 use crate::prometheus::report_dropped_events;
-use crate::sinks::DataType;
 use crate::v0_request::{Compression, ProcessingContext, RawRequest};
 use crate::{
-    api::{CaptureError, CaptureResponse, CaptureResponseCode, ProcessedEvent},
+    api::{CaptureError, CaptureResponse, CaptureResponseCode, DataType, ProcessedEvent},
     router, sinks,
     utils::uuid_v7,
     v0_request::{EventFormData, EventQuery, RawEvent},
@@ -121,16 +120,12 @@ pub async fn event(
     counter!("capture_events_received_total").increment(events.len() as u64);
 
     let context = ProcessingContext {
-        data_type: if is_historical {
-            DataType::AnalyticsHistorical
-        } else {
-            DataType::AnalyticsMain
-        },
         lib_version: meta.lib_version.clone(),
         sent_at,
         token,
         now: state.timesource.current_time(),
         client_ip: ip.to_string(),
+        is_historical,
     };
 
     let billing_limited = state
@@ -180,12 +175,18 @@ pub fn process_single_event(
         return Err(CaptureError::MissingEventName);
     }
 
+    let data_type = match context.is_historical {
+        true => DataType::AnalyticsHistorical,
+        false => DataType::AnalyticsMain,
+    };
+
     let data = serde_json::to_string(&event).map_err(|e| {
         tracing::error!("failed to encode data field: {}", e);
         CaptureError::NonRetryableSinkError
     })?;
 
     Ok(ProcessedEvent {
+        data_type,
         uuid: event.uuid.unwrap_or_else(uuid_v7),
         distinct_id: event.extract_distinct_id()?,
         ip: context.client_ip.clone(),
Expand All @@ -210,8 +211,8 @@ pub async fn process_events<'a>(
tracing::debug!(events=?events, "processed {} events", events.len());

if events.len() == 1 {
sink.send(context.data_type, events[0].clone()).await
sink.send(events[0].clone()).await
} else {
sink.send_batch(context.data_type, events).await
sink.send_batch(events).await
}
}
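With data_type removed from ProcessingContext, the request handler now only records whether the batch is historical, and the per-event mapping to DataType happens inside process_single_event. A trimmed model of that flow, with stand-in types and the other context fields elided:

// Stand-ins for the types touched above; not the crate's own definitions.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum DataType {
    AnalyticsMain,
    AnalyticsHistorical,
}

struct ProcessingContext {
    is_historical: bool,
    // lib_version, sent_at, token, now, client_ip elided
}

// Mirrors the match added to process_single_event in the hunk above.
fn data_type_for(context: &ProcessingContext) -> DataType {
    match context.is_historical {
        true => DataType::AnalyticsHistorical,
        false => DataType::AnalyticsMain,
    }
}

fn main() {
    let context = ProcessingContext { is_historical: true };
    assert_eq!(data_type_for(&context), DataType::AnalyticsHistorical);
}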
3 changes: 1 addition & 2 deletions capture/src/v0_request.rs
@@ -11,7 +11,6 @@ use tracing::instrument;
 use uuid::Uuid;
 
 use crate::api::CaptureError;
-use crate::sinks::DataType;
 use crate::token::validate_token;
 
 #[derive(Deserialize, Default)]
@@ -228,12 +227,12 @@ impl RawEvent {
 
 #[derive(Debug)]
 pub struct ProcessingContext {
-    pub data_type: DataType,
     pub lib_version: Option<String>,
     pub sent_at: Option<OffsetDateTime>,
     pub token: String,
     pub now: String,
     pub client_ip: String,
+    pub is_historical: bool,
 }
 
 #[cfg(test)]
10 changes: 3 additions & 7 deletions capture/tests/django_compat.rs
@@ -8,7 +8,7 @@ use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode, Processed
 use capture::limiters::billing::BillingLimiter;
 use capture::redis::MockRedisClient;
 use capture::router::router;
-use capture::sinks::{DataType, Event};
+use capture::sinks::Event;
 use capture::time::TimeSource;
 use health::HealthRegistry;
 use serde::Deserialize;
@@ -61,16 +61,12 @@ impl MemorySink {
 
 #[async_trait]
 impl Event for MemorySink {
-    async fn send(&self, _: DataType, event: ProcessedEvent) -> Result<(), CaptureError> {
+    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
         self.events.lock().unwrap().push(event);
         Ok(())
     }
 
-    async fn send_batch(
-        &self,
-        _: DataType,
-        events: Vec<ProcessedEvent>,
-    ) -> Result<(), CaptureError> {
+    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
         self.events.lock().unwrap().extend_from_slice(&events);
         Ok(())
     }
