From a2e14ce7f264d486a039fbc19bb926dda081a991 Mon Sep 17 00:00:00 2001 From: Noah Yoshida Date: Fri, 1 Nov 2024 16:50:25 -0700 Subject: [PATCH 1/4] optimize healthcheck --- router/src/health.rs | 107 ++++++++++++++++++++++++++++--------------- router/src/server.rs | 103 ++++------------------------------------- 2 files changed, 81 insertions(+), 129 deletions(-) diff --git a/router/src/health.rs b/router/src/health.rs index 1c6aaed2d..8d576b99e 100644 --- a/router/src/health.rs +++ b/router/src/health.rs @@ -43,51 +43,86 @@ impl Health { self.client.health().await.is_ok() } else { // Generation is unhealthy or have not sent any generation request yet + // Default to generation + let mut liveness_request = Request { + id: LIVENESS_ID, + inputs: "liveness".to_string(), + tokenized_inputs: None, + truncate: 10, + prefill_logprobs: false, + parameters: Some(NextTokenChooserParameters { + temperature: 1.0, + top_k: 0, + top_p: 1.0, + typical_p: 1.0, + do_sample: false, + seed: 0, + repetition_penalty: 1.0, + watermark: false, + adapter_id: "".to_string(), + schema: None, + return_k_alternatives: 0, + }), + stopping_parameters: Some(StoppingCriteriaParameters { + max_new_tokens: 1, + stop_sequences: vec![], + ignore_eos_token: false, + }), + adapter_index: 0, + // Block 0 is reserved for health checks + blocks: vec![0], + slots: (0..16).collect(), + cache_len: 0, + chunk_len: None, + }; + // Create different requestas based on the type of model this is + if self.shard_info().supports_embeddings { + liveness_request = EmbedRequest { + inputs: "San Francisco".to_string(), + parameters: EmbedParameters { + adapter_id: None, + adapter_source: None, + adapter_parameters: None, + api_token: None, + }, + } + }; + + if self.shard_info().supports_classification { + liveness_request = ClassifyRequest { + inputs: "San Francisco".to_string(), + }; + } // Dummy batch of 1 token and 1 generated token - let liveness_request = Request { - id: LIVENESS_ID, - inputs: "liveness".to_string(), - tokenized_inputs: None, - truncate: 10, - prefill_logprobs: false, - parameters: Some(NextTokenChooserParameters { - temperature: 1.0, - top_k: 0, - top_p: 1.0, - typical_p: 1.0, - do_sample: false, - seed: 0, - repetition_penalty: 1.0, - watermark: false, - adapter_id: "".to_string(), - schema: None, - return_k_alternatives: 0, - }), - stopping_parameters: Some(StoppingCriteriaParameters { - max_new_tokens: 1, - stop_sequences: vec![], - ignore_eos_token: false, - }), - adapter_index: 0, - // Block 0 is reserved for health checks - blocks: vec![0], - slots: (0..16).collect(), - cache_len: 0, - chunk_len: None, - }; let batch = Batch { id: BATCH_ID, requests: vec![liveness_request], size: 1, max_tokens: 2, - max_blocks: 1, }; + // Skips the queue - let value = self.client.prefill(batch, None).await.is_ok(); - // Update generation health - self.generation_health.store(value, Ordering::SeqCst); - value + if self.shard_info().supports_generation { + let value = self.client.prefill(batch, None).await.is_ok(); + // Update generation health + self.generation_health.store(value, Ordering::SeqCst); + return value + } + + if self.shard_info().supports_embeddings { + let value = self.client.embed(batch).await.is_ok(); + // Update generation health + self.generation_health.store(value, Ordering::SeqCst); + return value + } + + if self.shard_info().supports_classification { + let value = self.client.classify(batch).await.is_ok(); + // Update generation health + self.generation_health.store(value, Ordering::SeqCst); + return value + } } 
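The hunk above gates the expensive liveness probe behind the shared `generation_health` flag: once a real request has come back successfully, `check()` only needs a cheap gRPC health call, and the full dummy-inference probe (dispatched on whether the shard supports generation, embeddings, or classification) only runs when that flag is unset. A minimal standalone sketch of that caching pattern follows; `HealthProbe` and the closure parameters are illustrative names only, not the project's `Health` struct or `ShardedClient` API, and the real calls are async rather than plain closures.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for the router's health type; closures model the gRPC calls.
struct HealthProbe {
    generation_health: Arc<AtomicBool>,
}

impl HealthProbe {
    // Cheap path: if the last round-trip succeeded, a lightweight liveness call is
    // enough. Otherwise run a full dummy request and record the outcome so later
    // checks can take the cheap path.
    fn check(
        &self,
        liveness_call: impl Fn() -> bool,
        dummy_inference: impl Fn() -> bool,
    ) -> bool {
        if self.generation_health.load(Ordering::SeqCst) {
            liveness_call()
        } else {
            let ok = dummy_inference();
            self.generation_health.store(ok, Ordering::SeqCst);
            ok
        }
    }
}

fn main() {
    let probe = HealthProbe {
        generation_health: Arc::new(AtomicBool::new(false)),
    };
    // No prior signal yet, so the expensive probe runs and sets the flag.
    assert!(probe.check(|| true, || true));
    // Flag is now set; only the cheap liveness call is needed.
    assert!(probe.check(|| true, || -> bool { unreachable!("cheap path expected") }));
    println!("both health checks passed");
}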
} } diff --git a/router/src/server.rs b/router/src/server.rs index 3370114e9..7e5c9e32a 100644 --- a/router/src/server.rs +++ b/router/src/server.rs @@ -553,99 +553,16 @@ async fn health( infer: Extension, health: Extension, ) -> Result<(), (StatusCode, Json)> { - if health.shard_info().supports_classification { - let classify_request = ClassifyRequest { - inputs: "San Francisco".to_string(), - }; - match infer.classify(classify_request).await { - Ok(_) => {} - Err(error) => { - return Err(( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ErrorResponse { - error: error.to_string(), - error_type: error.error_type().to_string(), - }), - )); - } - } - } - if health.shard_info().supports_embeddings { - let embed_request = EmbedRequest { - inputs: "San Francisco".to_string(), - parameters: EmbedParameters { - adapter_id: None, - adapter_source: None, - adapter_parameters: None, - api_token: None, - }, - }; - match infer.embed(embed_request).await { - Ok(_) => {} - Err(error) => { - return Err(( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ErrorResponse { - error: error.to_string(), - error_type: error.error_type().to_string(), - }), - )); - } - } - } - if health.shard_info().supports_generation { - let generate_request = GenerateRequest { - inputs: "Who?".to_string(), - parameters: GenerateParameters { - adapter_id: None, - adapter_source: None, - adapter_parameters: None, - api_token: None, - best_of: None, - temperature: None, - top_k: None, - top_p: None, - typical_p: None, - do_sample: false, - seed: None, - repetition_penalty: None, - watermark: false, - return_full_text: None, - stop: vec![], - truncate: None, - details: false, - decoder_input_details: false, - return_k_alternatives: None, - apply_chat_template: false, - response_format: None, - max_new_tokens: Some(1), - ignore_eos_token: false, - }, - }; - match infer.generate(generate_request).await { - Ok(response) => { - if response.generated_text.text.len() == 0 { - return Err(( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ErrorResponse { - error: "Empty generation".to_string(), - error_type: "failed healthcheck".to_string(), - }), - )); - } - } - Err(error) => { - return Err(( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ErrorResponse { - error: error.to_string(), - error_type: error.error_type().to_string(), - }), - )); - } - } - } - Ok(()) + match health.check().await { + true => Ok(()), + false => Err(( + StatusCode::SERVICE_UNAVAILABLE, + Json(ErrorResponse { + error: "unhealthy".to_string(), + error_type: "healthcheck".to_string(), + }), + )), + } } /// Generate tokens From a4183eb6d2ecba69229888c552f6771cbaa0a121 Mon Sep 17 00:00:00 2001 From: Noah Yoshida Date: Fri, 8 Nov 2024 10:47:36 -0800 Subject: [PATCH 2/4] fix --- router/src/health.rs | 9 +++++++-- router/src/server.rs | 3 +-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/router/src/health.rs b/router/src/health.rs index 8d576b99e..842d92d61 100644 --- a/router/src/health.rs +++ b/router/src/health.rs @@ -1,7 +1,10 @@ use lorax_client::{ Batch, NextTokenChooserParameters, Request, ShardInfo, ShardedClient, - StoppingCriteriaParameters, + StoppingCriteriaParameters, Cl }; +use crate::{ + ClassifyRequest, EmbedRequest +} use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -123,6 +126,8 @@ impl Health { self.generation_health.store(value, Ordering::SeqCst); return value } + // Return false - need to implement that shard type. 
+ return false + } } - } } diff --git a/router/src/server.rs b/router/src/server.rs index 7e5c9e32a..111c46b8f 100644 --- a/router/src/server.rs +++ b/router/src/server.rs @@ -550,8 +550,7 @@ example = json ! ({"error": "unhealthy", "error_type": "healthcheck"})), )] /// Health check method async fn health( - infer: Extension, - health: Extension, + mut health: Extension, ) -> Result<(), (StatusCode, Json)> { match health.check().await { true => Ok(()), From bc27a0394b2c9c75fa409616c5bbb3e34b96e002 Mon Sep 17 00:00:00 2001 From: Noah Yoshida Date: Fri, 8 Nov 2024 10:59:41 -0800 Subject: [PATCH 3/4] fix build --- router/src/health.rs | 161 ++++++++++++++++++++++--------------------- router/src/server.rs | 2 +- 2 files changed, 82 insertions(+), 81 deletions(-) diff --git a/router/src/health.rs b/router/src/health.rs index 842d92d61..4aeada493 100644 --- a/router/src/health.rs +++ b/router/src/health.rs @@ -1,10 +1,8 @@ +use crate::{ClassifyRequest, EmbedParameters, EmbedRequest}; use lorax_client::{ Batch, NextTokenChooserParameters, Request, ShardInfo, ShardedClient, - StoppingCriteriaParameters, Cl + StoppingCriteriaParameters, }; -use crate::{ - ClassifyRequest, EmbedRequest -} use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -45,89 +43,92 @@ impl Health { // Generation is healthy, we only check that the shards are answering gRPC calls self.client.health().await.is_ok() } else { - // Generation is unhealthy or have not sent any generation request yet - // Default to generation - let mut liveness_request = Request { - id: LIVENESS_ID, - inputs: "liveness".to_string(), - tokenized_inputs: None, - truncate: 10, - prefill_logprobs: false, - parameters: Some(NextTokenChooserParameters { - temperature: 1.0, - top_k: 0, - top_p: 1.0, - typical_p: 1.0, - do_sample: false, - seed: 0, - repetition_penalty: 1.0, - watermark: false, - adapter_id: "".to_string(), - schema: None, - return_k_alternatives: 0, - }), - stopping_parameters: Some(StoppingCriteriaParameters { - max_new_tokens: 1, - stop_sequences: vec![], - ignore_eos_token: false, - }), - adapter_index: 0, - // Block 0 is reserved for health checks - blocks: vec![0], - slots: (0..16).collect(), - cache_len: 0, - chunk_len: None, - }; - // Create different requestas based on the type of model this is - if self.shard_info().supports_embeddings { - liveness_request = EmbedRequest { - inputs: "San Francisco".to_string(), - parameters: EmbedParameters { - adapter_id: None, - adapter_source: None, - adapter_parameters: None, - api_token: None, - }, - } - }; - - if self.shard_info().supports_classification { - liveness_request = ClassifyRequest { - inputs: "San Francisco".to_string(), - }; - } - - // Dummy batch of 1 token and 1 generated token - let batch = Batch { - id: BATCH_ID, - requests: vec![liveness_request], - size: 1, - max_tokens: 2, - }; - - // Skips the queue if self.shard_info().supports_generation { - let value = self.client.prefill(batch, None).await.is_ok(); - // Update generation health - self.generation_health.store(value, Ordering::SeqCst); - return value + let mut liveness_request = Request { + id: LIVENESS_ID, + inputs: "liveness".to_string(), + tokenized_inputs: None, + truncate: 10, + prefill_logprobs: false, + parameters: Some(NextTokenChooserParameters { + temperature: 1.0, + top_k: 0, + top_p: 1.0, + typical_p: 1.0, + do_sample: false, + seed: 0, + repetition_penalty: 1.0, + watermark: false, + adapter_id: "".to_string(), + schema: None, + return_k_alternatives: 0, + }), + stopping_parameters: 
Some(StoppingCriteriaParameters { + max_new_tokens: 1, + stop_sequences: vec![], + ignore_eos_token: false, + }), + adapter_index: 0, + // Block 0 is reserved for health checks + blocks: vec![0], + slots: (0..16).collect(), + cache_len: 0, + chunk_len: None, + }; + // Dummy batch of 1 token and 1 generated token + let batch = Batch { + id: BATCH_ID, + requests: vec![liveness_request], + size: 1, + max_tokens: 2, + }; + let value = self.client.prefill(batch, None).await.is_ok(); + // Update generation health + self.generation_health.store(value, Ordering::SeqCst); + return value; } + // Create different requestas based on the type of model this is if self.shard_info().supports_embeddings { - let value = self.client.embed(batch).await.is_ok(); - // Update generation health - self.generation_health.store(value, Ordering::SeqCst); - return value - } + let liveness_request = EmbedRequest { + inputs: "San Francisco".to_string(), + parameters: EmbedParameters { + adapter_id: None, + adapter_source: None, + adapter_parameters: None, + api_token: None, + }, + }; + let batch = Batch { + id: BATCH_ID, + requests: vec![liveness_request], + size: 1, + max_tokens: 2, + }; + let value = self.client.embed(batch).await.is_ok(); + // Update generation health + self.generation_health.store(value, Ordering::SeqCst); + return value; + }; if self.shard_info().supports_classification { - let value = self.client.classify(batch).await.is_ok(); - // Update generation health - self.generation_health.store(value, Ordering::SeqCst); - return value + let liveness_request = ClassifyRequest { + inputs: "San Francisco".to_string(), + }; + let batch = Batch { + id: BATCH_ID, + requests: vec![liveness_request], + size: 1, + max_tokens: 2, + }; + let value = self.client.classify(batch).await.is_ok(); + // Update generation health + self.generation_health.store(value, Ordering::SeqCst); + return value; } + // Return false - need to implement that shard type. 
- return false - } + return false; } + } } diff --git a/router/src/server.rs b/router/src/server.rs index 111c46b8f..8ef5b9b75 100644 --- a/router/src/server.rs +++ b/router/src/server.rs @@ -11,7 +11,7 @@ use crate::{ ChatCompletionResponseChoice, ChatCompletionStreamResponse, ChatCompletionStreamResponseChoice, ChatMessage, ClassifyRequest, CompatGenerateRequest, CompletionFinishReason, CompletionRequest, CompletionResponse, CompletionResponseChoice, CompletionResponseStreamChoice, - CompletionStreamResponse, Details, EmbedParameters, EmbedRequest, EmbedResponse, Entity, + CompletionStreamResponse, Details, EmbedRequest, EmbedResponse, Entity, ErrorResponse, FinishReason, FunctionDefinition, GenerateParameters, GenerateRequest, GenerateResponse, HubModelInfo, Infer, Info, JsonSchema, LogProbs, Message, OpenAiResponseFormat, PrefillToken, ResponseFormat, ResponseFormatType, SimpleToken, From 61597d7de5d882f9e9fb7b3e7f5e9c29e3b3384a Mon Sep 17 00:00:00 2001 From: Noah Yoshida Date: Fri, 8 Nov 2024 11:32:42 -0800 Subject: [PATCH 4/4] fix build --- router/src/health.rs | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/router/src/health.rs b/router/src/health.rs index 4aeada493..2ce6844b8 100644 --- a/router/src/health.rs +++ b/router/src/health.rs @@ -1,4 +1,3 @@ -use crate::{ClassifyRequest, EmbedParameters, EmbedRequest}; use lorax_client::{ Batch, NextTokenChooserParameters, Request, ShardInfo, ShardedClient, StoppingCriteriaParameters, }; @@ -39,6 +38,9 @@ impl Health { #[allow(dead_code)] pub(crate) async fn check(&mut self) -> bool { + // The server will put data into self.generation_health whenever we get something back from the model. + // We fail the health check if there were failures coming back from the model server. + // The "else" branch is only taken before the router has received any traffic. if self.generation_health.load(Ordering::SeqCst) { // Generation is healthy, we only check that the shards are answering gRPC calls self.client.health().await.is_ok() } else { @@ -90,14 +92,20 @@ impl Health { // Create different requests based on the type of model this is if self.shard_info().supports_embeddings { - let liveness_request = EmbedRequest { + let liveness_request = Request { + id: LIVENESS_ID, + prefill_logprobs: false, inputs: "San Francisco".to_string(), - parameters: EmbedParameters { - adapter_id: None, - adapter_source: None, - adapter_parameters: None, - api_token: None, - }, + tokenized_inputs: None, // Tokenization happens on the model server instead + truncate: 0, + parameters: None, + stopping_parameters: None, + adapter_index: 0, + // Block 0 is reserved for health checks + blocks: vec![0], + slots: (0..16).collect(), + cache_len: 0, + chunk_len: None, }; let batch = Batch { id: BATCH_ID, requests: vec![liveness_request], size: 1, max_tokens: 2, }; @@ -112,8 +120,20 @@ impl Health { }; if self.shard_info().supports_classification { - let liveness_request = ClassifyRequest { + let liveness_request = Request { + id: LIVENESS_ID, + prefill_logprobs: false, inputs: "San Francisco".to_string(), + tokenized_inputs: None, // Tokenization happens on the model server instead + truncate: 0, + parameters: None, + stopping_parameters: None, + adapter_index: 0, + // Block 0 is reserved for health checks + blocks: vec![0], + slots: (0..16).collect(), + cache_len: 0, + chunk_len: None, }; let batch = Batch { id: BATCH_ID,