Skip to content

Commit

Permalink
feat(decoding): Return concrete error types in chunked gelf
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgehermo9 committed Jul 28, 2024
1 parent 8c66ea2 commit 09fb7bc
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 101 deletions.
247 changes: 157 additions & 90 deletions lib/codecs/src/decoding/framing/chunked_gelf.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::BytesDecoder;
use crate::{BytesDecoder, StreamDecodingError};

use super::BoxedFramingError;
use super::{BoxedFramingError, FramingError};
use bytes::{Buf, Bytes, BytesMut};
use derivative::Derivative;
use snafu::{ensure, Snafu};
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
Expand Down Expand Up @@ -50,7 +52,6 @@ impl ChunkedGelfDecoderConfig {
pub struct ChunkedGelfDecoderOptions {
/// The timeout, in milliseconds, for a message to be fully received. If the timeout is reached, the
/// decoder drops all the received chunks of the incomplete message and starts over.
/// The default value is 5 seconds.
#[serde(
default = "default_timeout_millis",
skip_serializing_if = "vector_core::serde::is_default"
Expand All @@ -59,7 +60,6 @@ pub struct ChunkedGelfDecoderOptions {

/// The maximum number of pending incomplete messages. If this limit is reached, the decoder starts
/// dropping chunks of new messages. This limit ensures the memory usage of the decoder's state is bounded.
/// The default value is 1000.
#[serde(
default = "default_pending_messages_limit",
skip_serializing_if = "vector_core::serde::is_default"
Expand Down Expand Up @@ -128,6 +128,49 @@ impl MessageState {
}
}

/// Errors produced while decoding a single chunk of a chunked GELF message.
///
/// Each variant carries the header fields that were available when the chunk
/// was rejected, so the rendered message identifies the offending chunk.
#[derive(Debug, Snafu)]
pub enum ChunkedGelfDecoderError {
    /// The chunk has fewer than the 10 bytes needed to read the message id,
    /// sequence number and total chunks fields. `header` holds the bytes received.
    #[snafu(display("Invalid chunk header with less than 10 bytes: {header:?}"))]
    InvalidChunkHeader { header: Bytes },
    /// The total chunks field is zero or exceeds `GELF_MAX_TOTAL_CHUNKS`.
    #[snafu(display("Invalid total chunks value {total_chunks} for message with id {message_id} and sequence number {sequence_number}"))]
    InvalidTotalChunks {
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
    },
    /// The sequence number is not strictly lower than the total chunks value.
    // The decoder rejects `sequence_number >= total_chunks`, so the message has to
    // cover the equality case as well.
    #[snafu(display("Sequence number {sequence_number} is greater than or equal to the total chunks value {total_chunks} for message with id {message_id}"))]
    InvalidSequenceNumber {
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
    },
    /// The number of pending (incomplete) messages already reached the configured
    /// limit when this chunk arrived.
    #[snafu(display("Pending messages limit of {pending_messages_limit} reached while processing message with id {message_id} and sequence number {sequence_number}"))]
    PendingMessagesLimitReached {
        message_id: u64,
        sequence_number: u8,
        pending_messages_limit: usize,
    },
    /// The chunk declares a total chunks value different from the one stored for
    /// the same message id.
    #[snafu(display("Received message with id {message_id} and sequence number {sequence_number} has different total chunks values: original total chunks value {original_total_chunks}, received total chunks value {received_total_chunks}"))]
    TotalChunksMismatch {
        message_id: u64,
        sequence_number: u8,
        original_total_chunks: u8,
        received_total_chunks: u8,
    },
}

impl StreamDecodingError for ChunkedGelfDecoderError {
    /// Always returns `true`, regardless of the variant.
    // NOTE(review): presumably this tells the caller the stream stays usable after a
    // rejected chunk; confirm against the `StreamDecodingError` trait documentation.
    fn can_continue(&self) -> bool {
        true
    }
}

impl FramingError for ChunkedGelfDecoderError {
    /// Exposes this error as `&dyn Any` so it can be downcast back to
    /// `ChunkedGelfDecoderError` from a type-erased framing error.
    fn as_any(&self) -> &dyn Any {
        // The return type drives the unsize coercion; no explicit cast is needed.
        self
    }
}

/// A codec for handling GELF messages that may be chunked. The implementation is based on [Graylog's GELF documentation](https://go2docs.graylog.org/5-0/getting_in_log_data/gelf.html#GELFviaUDP)
/// and [Graylog's go-gelf library](https://github.com/Graylog2/go-gelf/blob/v1/gelf/reader.go).
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -155,7 +198,10 @@ impl ChunkedGelfDecoder {
}

/// Decode a GELF chunk
pub fn decode_chunk(&mut self, mut chunk: Bytes) -> Result<Option<Bytes>, BoxedFramingError> {
pub fn decode_chunk(
&mut self,
mut chunk: Bytes,
) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
// Encoding scheme:
//
// +------------+-----------------+--------------+----------------------+
Expand All @@ -170,53 +216,53 @@ impl ChunkedGelfDecoder {
// and return the complete payload.

// We need 10 bytes to read the message id, sequence number and total chunks
if chunk.remaining() < 10 {
let src_display = format!("{chunk:?}");
warn!(message = "Received malformed chunk headers (message ID, sequence number and total chunks) with less than 10 bytes. Ignoring it.",
src = src_display,
remaining = chunk.remaining(),
internal_log_rate_limit = true
);
return Ok(None);
}
// if chunk.remaining() < 10 {
// let src_display = format!("{chunk:?}");
// warn!(message = "Received malformed chunk headers (message ID, sequence number and total chunks) with less than 10 bytes. Ignoring it.",
// src = src_display,
// remaining = chunk.remaining(),
// internal_log_rate_limit = true
// );
// return Ok(None);
// }

ensure!(
chunk.remaining() >= 10,
InvalidChunkHeaderSnafu { header: chunk }
);

let message_id = chunk.get_u64();
let sequence_number = chunk.get_u8();
let total_chunks = chunk.get_u8();

if total_chunks == 0 || total_chunks > GELF_MAX_TOTAL_CHUNKS {
warn!(
message = "Received a chunk with an invalid total chunks value. Ignoring it.",
message_id = message_id,
sequence_number = sequence_number,
total_chunks = total_chunks,
internal_log_rate_limit = true
);
return Ok(None);
}
ensure!(
total_chunks > 0 && total_chunks <= GELF_MAX_TOTAL_CHUNKS,
InvalidTotalChunksSnafu {
message_id,
sequence_number,
total_chunks
}
);

if sequence_number >= total_chunks {
warn!(
message = "Received a chunk with a sequence number greater than total chunks. Ignoring it.",
message_id = message_id,
sequence_number = sequence_number,
total_chunks = total_chunks,
internal_log_rate_limit = true
);
return Ok(None);
}
ensure!(
sequence_number < total_chunks,
InvalidSequenceNumberSnafu {
message_id,
sequence_number,
total_chunks
}
);

let mut state_lock = self.state.lock().expect("poisoned lock");

if state_lock.len() >= self.pending_messages_limit {
warn!(
message = "Received a chunk but reached the pending messages limit. Ignoring it.",
message_id = message_id,
sequence_number = sequence_number,
pending_messages_limit = self.pending_messages_limit,
internal_log_rate_limit = true
);
return Ok(None);
}
ensure!(
state_lock.len() < self.pending_messages_limit,
PendingMessagesLimitReachedSnafu {
message_id,
sequence_number,
pending_messages_limit: self.pending_messages_limit
}
);

let message_state = state_lock.entry(message_id).or_insert_with(|| {
// We need to spawn a task that will clear the message state after a certain time
Expand All @@ -240,15 +286,15 @@ impl ChunkedGelfDecoder {
MessageState::new(total_chunks, timeout_handle)
});

if message_state.total_chunks != total_chunks {
warn!(message_id = "Received a chunk with a different total chunks than the original. Ignoring it.",
message_id = message_id,
original_total_chunks = message_state.total_chunks,
received_total_chunks = total_chunks,
internal_log_rate_limit = true
);
return Ok(None);
}
ensure!(
message_state.total_chunks == total_chunks,
TotalChunksMismatchSnafu {
message_id,
sequence_number,
original_total_chunks: message_state.total_chunks,
received_total_chunks: total_chunks
}
);

if message_state.is_chunk_present(sequence_number) {
info!(
Expand All @@ -273,7 +319,10 @@ impl ChunkedGelfDecoder {
/// Decode a GELF message that may be chunked or not. The source bytes are expected to be
/// datagram-based (or message-based), so it must not contain multiple GELF messages
/// delimited by '\0', such as it would be in a stream-based protocol.
pub fn decode_message(&mut self, mut src: Bytes) -> Result<Option<Bytes>, BoxedFramingError> {
pub fn decode_message(
&mut self,
mut src: Bytes,
) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
let magic = src.get(0..2);
if magic.is_some_and(|magic| magic == GELF_MAGIC) {
src.advance(2);
Expand All @@ -300,29 +349,28 @@ impl Decoder for ChunkedGelfDecoder {
return Ok(None);
}

// TODO: add a PR comment here stating that this will never call the decode_message since
// the bytes decoder will always return a Ok(None) in this method, but leaving this
// here for consistency. Would be better to add a unreachable/panic here if the inner decoder returns
// the Some variant?
self.bytes_decoder
Ok(self
.bytes_decoder
.decode(src)?
.and_then(|frame| self.decode_message(frame).transpose())
.transpose()
.transpose()?)
}
fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if buf.is_empty() {
return Ok(None);
}

self.bytes_decoder
Ok(self
.bytes_decoder
.decode_eof(buf)?
.and_then(|frame| self.decode_message(frame).transpose())
.transpose()
.transpose()?)
}
}

#[cfg(test)]
mod tests {

use super::*;
use bytes::{BufMut, BytesMut};
use rstest::{fixture, rstest};
Expand Down Expand Up @@ -416,6 +464,13 @@ mod tests {
)
}

/// Test helper: recovers the concrete `ChunkedGelfDecoderError` from a boxed
/// framing error, panicking if the error is of any other type.
fn downcast_framing_error(error: &Box<dyn FramingError>) -> &ChunkedGelfDecoderError {
    let erased = error.as_any();
    erased
        .downcast_ref::<ChunkedGelfDecoderError>()
        .expect("Expected ChunkedGelfDecoderError to be downcasted")
}

#[rstest]
#[tokio::test]
async fn decode_chunked(two_chunks_message: ([BytesMut; 2], String)) {
Expand Down Expand Up @@ -513,7 +568,7 @@ mod tests {
tokio::time::sleep(Duration::from_millis(DEFAULT_TIMEOUT_MILLIS + 1)).await;
assert!(decoder.state.lock().unwrap().is_empty());
assert!(logs_contain(
"Message was not fully received within the timeout window. Discarding it."
"Message was not fully received within the timeout window of 5000ms. Discarding it."
));

let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
Expand All @@ -537,17 +592,21 @@ mod tests {

#[tokio::test]
#[traced_test]
async fn decode_chunk_with_malformed_header() {
async fn decode_chunk_with_invalid_header() {
let mut src = BytesMut::new();
src.extend_from_slice(&GELF_MAGIC);
// Malformed chunk header with less than 10 bytes
let malformed_chunk = [0x12, 0x34];
src.extend_from_slice(&malformed_chunk);
// Invalid chunk header with less than 10 bytes
let invalid_chunk = [0x12, 0x34];
src.extend_from_slice(&invalid_chunk);
let mut decoder = ChunkedGelfDecoder::default();
let frame = decoder.decode_eof(&mut src);

let frame = decoder.decode_eof(&mut src).unwrap();
assert!(frame.is_none());
assert!(logs_contain("Received malformed chunk headers (message ID, sequence number and total chunks) with less than 10 bytes. Ignoring it."));
let error = frame.expect_err("Expected an error");
let downcasted_error = downcast_framing_error(&error);
matches!(
downcasted_error,
ChunkedGelfDecoderError::InvalidChunkHeader { .. }
);
}

#[tokio::test]
Expand All @@ -560,11 +619,13 @@ mod tests {
let mut chunk = create_chunk(message_id, sequence_number, invalid_total_chunks, payload);
let mut decoder = ChunkedGelfDecoder::default();

let frame = decoder.decode_eof(&mut chunk).unwrap();
assert!(frame.is_none());
assert!(logs_contain(
"Received a chunk with an invalid total chunks value. Ignoring it."
));
let frame = decoder.decode_eof(&mut chunk);
let error = frame.expect_err("Expected an error");
let downcasted_error = downcast_framing_error(&error);
matches!(
downcasted_error,
ChunkedGelfDecoderError::InvalidTotalChunks { .. }
);
}

#[tokio::test]
Expand All @@ -577,11 +638,13 @@ mod tests {
let mut chunk = create_chunk(message_id, invalid_sequence_number, total_chunks, payload);
let mut decoder = ChunkedGelfDecoder::default();

let frame = decoder.decode_eof(&mut chunk).unwrap();
assert!(frame.is_none());
assert!(logs_contain(
"Received a chunk with a sequence number greater than total chunks. Ignoring it."
));
let frame = decoder.decode_eof(&mut chunk);
let error = frame.expect_err("Expected an error");
let downcasted_error = downcast_framing_error(&error);
matches!(
downcasted_error,
ChunkedGelfDecoderError::InvalidSequenceNumber { .. }
);
}

#[rstest]
Expand All @@ -600,12 +663,14 @@ mod tests {
assert!(frame.is_none());
assert!(decoder.state.lock().unwrap().len() == 1);

let frame = decoder.decode_eof(&mut three_chunks[0]).unwrap();
assert!(frame.is_none());
let frame = decoder.decode_eof(&mut three_chunks[0]);
let error = frame.expect_err("Expected an error");
let downcasted_error = downcast_framing_error(&error);
matches!(
downcasted_error,
ChunkedGelfDecoderError::PendingMessagesLimitReached { .. }
);
assert!(decoder.state.lock().unwrap().len() == 1);
assert!(logs_contain(
"Received a chunk but reached the pending messages limit. Ignoring it."
));
}

#[rstest]
Expand All @@ -624,11 +689,13 @@ mod tests {
let frame = decoder.decode_eof(&mut first_chunk).unwrap();
assert!(frame.is_none());

let frame = decoder.decode_eof(&mut second_chunk).unwrap();
assert!(frame.is_none());
assert!(logs_contain(
"Received a chunk with a different total chunks than the original. Ignoring it."
));
let frame = decoder.decode_eof(&mut second_chunk);
let error = frame.expect_err("Expected an error");
let downcasted_error = downcast_framing_error(&error);
matches!(
downcasted_error,
ChunkedGelfDecoderError::TotalChunksMismatch { .. }
);
}

#[rstest]
Expand Down
Loading

0 comments on commit 09fb7bc

Please sign in to comment.