log improvements and some code refactor (#379)
adilhafeez authored Jan 31, 2025
1 parent e79d16e commit 39266b5
Showing 10 changed files with 159 additions and 133 deletions.
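
The pattern across the files below is a log-level downgrade: per-request and per-chunk diagnostics move from debug! to trace!, and a few messages are consolidated or reworded. A minimal, illustrative sketch of that convention with the log facade (not code from this repository; it assumes the log and env_logger crates, and the function name and values are made up):

use log::{debug, trace};

fn handle_request(model: &str, token_count: usize) {
    // One line per request stays readable at debug level.
    debug!("request received: model={}", model);

    // High-frequency detail (token counts, streaming chunks) drops to trace,
    // so RUST_LOG=debug output stays quiet under load.
    trace!("getting token count model={}, tokens={}", model, token_count);
}

fn main() {
    // env_logger reads RUST_LOG, e.g. RUST_LOG=trace shows both levels.
    env_logger::init();
    handle_request("gpt-4", 21);
}

With RUST_LOG=debug only the first message is emitted; RUST_LOG=trace shows both.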
3 changes: 0 additions & 3 deletions crates/common/src/routing.rs
@@ -2,7 +2,6 @@ use std::rc::Rc;

use crate::{configuration, llm_providers::LlmProviders};
use configuration::LlmProvider;
-use log::debug;
use rand::{seq::IteratorRandom, thread_rng};

#[derive(Debug)]
@@ -35,11 +34,9 @@ pub fn get_llm_provider(
}

if llm_providers.default().is_some() {
-debug!("no llm provider found for hint, using default llm provider");
return llm_providers.default().unwrap();
}

-debug!("no default llm found, using random llm provider");
let mut rng = thread_rng();
llm_providers
.iter()
4 changes: 2 additions & 2 deletions crates/common/src/tokenizer.rs
@@ -1,4 +1,4 @@
-use log::debug;
+use log::trace;

#[derive(thiserror::Error, Debug, PartialEq, Eq)]
#[allow(dead_code)]
@@ -9,7 +9,7 @@ pub enum Error {

#[allow(dead_code)]
pub fn token_count(model_name: &str, text: &str) -> Result<usize, Error> {
-debug!("getting token count model={}", model_name);
+trace!("getting token count model={}", model_name);
// Consideration: is it more expensive to instantiate the BPE object every time, or to contend the singleton?
let bpe = tiktoken_rs::get_bpe_from_model(model_name).map_err(|_| Error::UnknownModel {
model_name: model_name.to_string(),
12 changes: 5 additions & 7 deletions crates/llm_gateway/src/filter_context.rs
@@ -9,7 +9,7 @@ use common::llm_providers::LlmProviders;
use common::ratelimit;
use common::stats::Gauge;
use common::tracing::TraceData;
-use log::debug;
+use log::trace;
use log::warn;
use proxy_wasm::traits::*;
use proxy_wasm::types::*;
@@ -79,7 +79,7 @@ impl RootContext for FilterContext {
}

fn create_http_context(&self, context_id: u32) -> Option<Box<dyn HttpContext>> {
-debug!(
+trace!(
"||| create_http_context called with context_id: {:?} |||",
context_id
);
@@ -108,10 +108,8 @@ impl RootContext for FilterContext {
fn on_tick(&mut self) {
let _ = self.traces_queue.try_lock().map(|mut traces_queue| {
while let Some(trace) = traces_queue.pop_front() {
-debug!("trace received: {:?}", trace);
-
let trace_str = serde_json::to_string(&trace).unwrap();
-debug!("trace: {}", trace_str);
+trace!("trace details: {}", trace_str);
let call_args = CallArgs::new(
OTEL_COLLECTOR_HTTP,
OTEL_POST_PATH,
@@ -144,7 +142,7 @@ impl Context for FilterContext {
_body_size: usize,
_num_trailers: usize,
) {
-debug!(
+trace!(
"||| on_http_call_response called with token_id: {:?} |||",
token_id
);
@@ -156,7 +154,7 @@
.expect("invalid token_id");

if let Some(status) = self.get_http_call_response_header(":status") {
-debug!("trace response status: {:?}", status);
+trace!("trace response status: {:?}", status);
};
}
}
63 changes: 36 additions & 27 deletions crates/llm_gateway/src/stream_context.rs
@@ -10,7 +10,6 @@ use common::consts::{
};
use common::errors::ServerError;
use common::llm_providers::LlmProviders;
-use common::pii::obfuscate_auth_header;
use common::ratelimit::Header;
use common::stats::{IncrementingMetric, RecordingMetric};
use common::tracing::{Event, Span, TraceData, Traceparent};
@@ -82,12 +81,16 @@ impl StreamContext {
.get_http_request_header(ARCH_PROVIDER_HINT_HEADER)
.map(|llm_name| llm_name.into());

-debug!("llm provider hint: {:?}", provider_hint);
self.llm_provider = Some(routing::get_llm_provider(
&self.llm_providers,
provider_hint,
));
-debug!("selected llm: {}", self.llm_provider.as_ref().unwrap().name);
+
+debug!(
+"request received: llm provider hint: {:?}, selected llm: {}",
+self.get_http_request_header(ARCH_PROVIDER_HINT_HEADER),
+self.llm_provider.as_ref().unwrap().name
+);
}

fn modify_auth_headers(&mut self) -> Result<(), ServerError> {
@@ -150,7 +153,7 @@ impl StreamContext {
self.metrics
.input_sequence_length
.record(token_count as u64);
-log::debug!("Recorded input token count: {}", token_count);
+trace!("Recorded input token count: {}", token_count);

// Check if rate limiting needs to be applied.
if let Some(selector) = self.ratelimit_selector.take() {
@@ -161,7 +164,7 @@
NonZero::new(token_count as u32).unwrap(),
)?;
} else {
-log::debug!("No rate limit applied for model: {}", model);
+trace!("No rate limit applied for model: {}", model);
}

Ok(())
@@ -197,12 +200,6 @@ impl HttpContext for StreamContext {
self.is_chat_completions_request =
self.get_http_request_header(":path").unwrap_or_default() == CHAT_COMPLETIONS_PATH;

-debug!(
-"on_http_request_headers S[{}] req_headers={:?}",
-self.context_id,
-obfuscate_auth_header(&mut self.get_http_request_headers())
-);
-
self.request_id = self.get_http_request_header(REQUEST_ID_HEADER);
self.traceparent = self.get_http_request_header(TRACE_PARENT_HEADER);

@@ -310,9 +307,10 @@
}

fn on_http_response_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action {
-debug!(
+trace!(
"on_http_response_headers [S={}] end_stream={}",
-self.context_id, _end_of_stream
+self.context_id,
+_end_of_stream
);

self.set_property(
@@ -324,9 +322,11 @@
}

fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
-debug!(
+trace!(
"on_http_response_body [S={}] bytes={} end_stream={}",
-self.context_id, body_size, end_of_stream
+self.context_id,
+body_size,
+end_of_stream
);

if !self.is_chat_completions_request {
@@ -342,19 +342,22 @@
Ok(duration) => {
// Convert the duration to milliseconds
let duration_ms = duration.as_millis();
-debug!("Total latency: {} milliseconds", duration_ms);
+debug!("request latency: {}ms", duration_ms);
// Record the latency to the latency histogram
self.metrics.request_latency.record(duration_ms as u64);

if self.response_tokens > 0 {
// Compute the time per output token
let tpot = duration_ms as u64 / self.response_tokens as u64;

-debug!("Time per output token: {} milliseconds", tpot);
// Record the time per output token
self.metrics.time_per_output_token.record(tpot);

-debug!("Tokens per second: {}", 1000 / tpot);
+trace!(
+"time per token: {}ms, tokens per second: {}",
+tpot,
+1000 / tpot
+);
// Record the tokens per second
self.metrics.tokens_per_second.record(1000 / tpot);
}
@@ -414,9 +417,10 @@
let body = if self.streaming_response {
let chunk_start = 0;
let chunk_size = body_size;
-debug!(
+trace!(
"streaming response reading, {}..{}",
-chunk_start, chunk_size
+chunk_start,
+chunk_size
);
let streaming_chunk = match self.get_http_response_body(0, chunk_size) {
Some(chunk) => chunk,
@@ -438,7 +442,7 @@
}
streaming_chunk
} else {
-debug!("non streaming response bytes read: 0:{}", body_size);
+trace!("non streaming response bytes read: 0:{}", body_size);
match self.get_http_response_body(0, body_size) {
Some(body) => body,
None => {
@@ -510,7 +514,7 @@
match current_time.duration_since(self.start_time) {
Ok(duration) => {
let duration_ms = duration.as_millis();
-debug!("Time to First Token (TTFT): {} milliseconds", duration_ms);
+debug!("time to first token: {}ms", duration_ms);
self.ttft_duration = Some(duration);
self.metrics.time_to_first_token.record(duration_ms as u64);
}
@@ -520,12 +524,15 @@
}
}
} else {
-debug!("non streaming response");
+trace!("non streaming response");
let chat_completions_response: ChatCompletionsResponse =
match serde_json::from_str(body_utf8.as_str()) {
Ok(de) => de,
-Err(_e) => {
-debug!("invalid response: {}", body_utf8);
+Err(err) => {
+debug!(
+"non chat-completion compliant response received err: {}, body: {}",
+err, body_utf8
+);
return Action::Continue;
}
};
@@ -539,9 +546,11 @@
}
}

-debug!(
+trace!(
"recv [S={}] total_tokens={} end_stream={}",
-self.context_id, self.response_tokens, end_of_stream
+self.context_id,
+self.response_tokens,
+end_of_stream
);

Action::Continue
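
The stream_context.rs hunks above derive per-token latency as duration_ms / response_tokens and tokens per second as 1000 / tpot. A standalone sketch of that arithmetic (illustrative only; the function name and the extra guard against tpot == 0 are not part of the diff):

fn output_token_metrics(duration_ms: u64, response_tokens: u64) -> Option<(u64, u64)> {
    if response_tokens == 0 {
        return None; // mirrors the diff's `if self.response_tokens > 0` guard
    }
    let tpot = duration_ms / response_tokens; // milliseconds per output token
    if tpot == 0 {
        return None; // sub-millisecond tokens would make 1000 / tpot divide by zero
    }
    let tokens_per_second = 1000 / tpot;
    Some((tpot, tokens_per_second))
}

fn main() {
    // 1200 ms for 60 tokens -> 20 ms per token, 50 tokens per second
    assert_eq!(output_token_metrics(1200, 60), Some((20, 50)));
}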
36 changes: 19 additions & 17 deletions crates/llm_gateway/tests/integration.rs
@@ -22,12 +22,8 @@ fn request_headers_expectations(module: &mut Tester, http_context: i32) {
Some(MapType::HttpRequestHeaders),
Some("x-arch-llm-provider-hint"),
)
-.returning(Some("default"))
-.expect_log(
-Some(LogLevel::Debug),
-Some("llm provider hint: Some(Default)"),
-)
-.expect_log(Some(LogLevel::Debug), Some("selected llm: open-ai-gpt-4"))
+.returning(None)
+.expect_log(Some(LogLevel::Debug), Some("request received: llm provider hint: Some(\"default\"), selected llm: open-ai-gpt-4"))
.expect_add_header_map_value(
Some(MapType::HttpRequestHeaders),
Some("x-arch-llm-provider"),
@@ -38,7 +34,11 @@
Some("Authorization"),
Some("Bearer secret_key"),
)
-.expect_remove_header_map_value(Some(MapType::HttpRequestHeaders), Some("content-length"))
+.expect_get_header_map_value(
+Some(MapType::HttpRequestHeaders),
+Some("x-arch-llm-provider-hint"),
+)
+.returning(Some("default"))
.expect_get_header_map_value(
Some(MapType::HttpRequestHeaders),
Some("x-arch-ratelimit-selector"),
@@ -50,7 +50,6 @@
.returning(None)
.expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some(":path"))
.returning(Some("/v1/chat/completions"))
-.expect_log(Some(LogLevel::Debug), None)
.expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("x-request-id"))
.returning(None)
.expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("traceparent"))
@@ -62,7 +61,7 @@
fn normal_flow(module: &mut Tester, filter_context: i32, http_context: i32) {
module
.call_proxy_on_context_create(http_context, filter_context)
-.expect_log(Some(LogLevel::Debug), None)
+.expect_log(Some(LogLevel::Trace), None)
.execute_and_expect(ReturnType::None)
.unwrap();

@@ -187,7 +186,10 @@

module
.call_proxy_on_context_create(http_context, filter_context)
-.expect_log(Some(LogLevel::Debug), None)
+.expect_log(
+Some(LogLevel::Trace),
+Some("||| create_http_context called with context_id: 2 |||"),
+)
.execute_and_expect(ReturnType::None)
.unwrap();

@@ -218,9 +220,9 @@
.expect_get_buffer_bytes(Some(BufferType::HttpRequestBody))
.returning(Some(chat_completions_request_body))
.expect_log(Some(LogLevel::Trace), None)
-.expect_log(Some(LogLevel::Debug), None)
+.expect_log(Some(LogLevel::Trace), None)
.expect_metric_record("input_sequence_length", 21)
-.expect_log(Some(LogLevel::Debug), None)
+.expect_log(Some(LogLevel::Trace), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_set_buffer_bytes(Some(BufferType::HttpRequestBody), None)
@@ -251,7 +253,7 @@

module
.call_proxy_on_context_create(http_context, filter_context)
-.expect_log(Some(LogLevel::Debug), None)
+.expect_log(Some(LogLevel::Trace), None)
.execute_and_expect(ReturnType::None)
.unwrap();

@@ -339,9 +341,9 @@
.returning(Some(chat_completions_request_body))
// The actual call is not important in this test, we just need to grab the token_id
.expect_log(Some(LogLevel::Trace), None)
-.expect_log(Some(LogLevel::Debug), None)
+.expect_log(Some(LogLevel::Trace), None)
.expect_metric_record("input_sequence_length", 107)
-.expect_log(Some(LogLevel::Debug), None)
+.expect_log(Some(LogLevel::Trace), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
@@ -405,9 +407,9 @@
.returning(Some(chat_completions_request_body))
// The actual call is not important in this test, we just need to grab the token_id
.expect_log(Some(LogLevel::Trace), None)
-.expect_log(Some(LogLevel::Debug), None)
+.expect_log(Some(LogLevel::Trace), None)
.expect_metric_record("input_sequence_length", 29)
-.expect_log(Some(LogLevel::Debug), None)
+.expect_log(Some(LogLevel::Trace), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_set_buffer_bytes(Some(BufferType::HttpRequestBody), None)