From 57b92cf22cbab8777a56a0499363e6cfeabd795e Mon Sep 17 00:00:00 2001 From: Yorick van Pelt Date: Mon, 15 Apr 2024 19:16:48 +0200 Subject: [PATCH] update triton_templates --- triton_templates/ensemble/config.pbtxt | 6 +- triton_templates/postprocessing/1/model.py | 110 ++--- triton_templates/postprocessing/config.pbtxt | 11 +- triton_templates/preprocessing/1/model.py | 229 ++++----- triton_templates/preprocessing/config.pbtxt | 9 +- triton_templates/tensorrt_llm/config.pbtxt | 60 +++ .../tensorrt_llm_bls/1/lib/decode.py | 332 ++++++++++++++ .../tensorrt_llm_bls/1/lib/triton_decoder.py | 433 ++++++++++++++++++ triton_templates/tensorrt_llm_bls/1/model.py | 332 ++------------ .../tensorrt_llm_bls/config.pbtxt | 26 ++ 10 files changed, 1049 insertions(+), 499 deletions(-) create mode 100644 triton_templates/tensorrt_llm_bls/1/lib/decode.py create mode 100644 triton_templates/tensorrt_llm_bls/1/lib/triton_decoder.py diff --git a/triton_templates/ensemble/config.pbtxt b/triton_templates/ensemble/config.pbtxt index 0e2627b..bb521d3 100644 --- a/triton_templates/ensemble/config.pbtxt +++ b/triton_templates/ensemble/config.pbtxt @@ -173,8 +173,8 @@ input [ ] output [ { - name: "output_ids" - data_type: TYPE_INT32 + name: "text_output" + data_type: TYPE_STRING dims: [ -1 ] }, { @@ -421,7 +421,7 @@ ensemble_scheduling { } output_map { key: "OUTPUT" - value: "output_ids" + value: "text_output" } output_map { key: "OUT_OUTPUT_LOG_PROBS" diff --git a/triton_templates/postprocessing/1/model.py b/triton_templates/postprocessing/1/model.py index 12c0639..02aafad 100644 --- a/triton_templates/postprocessing/1/model.py +++ b/triton_templates/postprocessing/1/model.py @@ -28,9 +28,7 @@ import numpy as np import triton_python_backend_utils as pb_utils -from transformers import AutoTokenizer, LlamaTokenizerFast, T5Tokenizer - -import time +from transformers import AutoTokenizer class TritonPythonModel: @@ -53,32 +51,20 @@ def initialize(self, args): * model_version: Model version * model_name: Model name """ - - # Parse model configs model_config = json.loads(args['model_config']) tokenizer_dir = model_config['parameters']['tokenizer_dir'][ 'string_value'] - tokenizer_type = model_config['parameters']['tokenizer_type'][ - 'string_value'] self.skip_special_tokens = model_config['parameters'].get( 'skip_special_tokens', {'string_value': "true"})['string_value'].lower() in [ 'true', '1', 't', 'y', 'yes' ] - if tokenizer_type == 't5': - self.tokenizer = T5Tokenizer(vocab_file=tokenizer_dir, - padding_side='left') - elif tokenizer_type == 'auto': - self.tokenizer = AutoTokenizer.from_pretrained( - tokenizer_dir, padding_side='left', trust_remote_code=True) - elif tokenizer_type == 'llama': - self.tokenizer = LlamaTokenizerFast.from_pretrained( - tokenizer_dir, legacy=False, padding_side='left') - else: - raise AttributeError( - f'Unexpected tokenizer type: {tokenizer_type}') + self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_dir, + legacy=False, + padding_side='left', + trust_remote_code=True) self.tokenizer.pad_token = self.tokenizer.eos_token # Parse model output configs @@ -88,7 +74,6 @@ def initialize(self, args): # Convert Triton types to numpy types self.output_dtype = pb_utils.triton_string_to_numpy( output_config['data_type']) - def execute(self, requests): """`execute` must be implemented in every Python model. `execute` @@ -109,6 +94,7 @@ def execute(self, requests): A list of pb_utils.InferenceResponse. 
The length of this list must be the same as `requests` """ + responses = [] # Every Python backend must iterate over everyone of the requests @@ -124,19 +110,19 @@ def execute(self, requests): # Get cum log probs cum_log_probs = pb_utils.get_input_tensor_by_name( - request, 'CUM_LOG_PROBS').as_numpy() + request, 'CUM_LOG_PROBS') # Get sequence length output_log_probs = pb_utils.get_input_tensor_by_name( - request, 'OUTPUT_LOG_PROBS').as_numpy() + request, 'OUTPUT_LOG_PROBS') # Get context logits context_logits = pb_utils.get_input_tensor_by_name( - request, 'CONTEXT_LOGITS').as_numpy() + request, 'CONTEXT_LOGITS') # Get generation logits generation_logits = pb_utils.get_input_tensor_by_name( - request, 'GENERATION_LOGITS').as_numpy() + request, 'GENERATION_LOGITS') # Reshape Input # tokens_batch = tokens_batch.reshape([-1, tokens_batch.shape[0]]) @@ -147,25 +133,51 @@ def execute(self, requests): # Create output tensors. You need pb_utils.Tensor # objects to create pb_utils.InferenceResponse. - # output_tensor = pb_utils.Tensor( - # 'OUTPUT', - # np.array(outputs).astype(self.output_dtype)) - output_tensor = pb_utils.Tensor( 'OUTPUT', - tokens_batch) - - out_cum_log_probs = pb_utils.Tensor('OUT_CUM_LOG_PROBS', - cum_log_probs) - - out_output_log_probs = pb_utils.Tensor('OUT_OUTPUT_LOG_PROBS', - output_log_probs) - - out_context_logits = pb_utils.Tensor('OUT_CONTEXT_LOGITS', - context_logits) - - out_generation_logits = pb_utils.Tensor('OUT_GENERATION_LOGITS', - generation_logits) + np.array(outputs).astype(self.output_dtype)) + + outputs = [] + outputs.append(output_tensor) + + if cum_log_probs: + out_cum_log_probs = pb_utils.Tensor('OUT_CUM_LOG_PROBS', + cum_log_probs.as_numpy()) + outputs.append(out_cum_log_probs) + else: + out_cum_log_probs = pb_utils.Tensor( + 'OUT_CUM_LOG_PROBS', np.array([[0.0]], dtype=np.float32)) + outputs.append(out_cum_log_probs) + + if output_log_probs: + out_output_log_probs = pb_utils.Tensor( + 'OUT_OUTPUT_LOG_PROBS', output_log_probs.as_numpy()) + outputs.append(out_output_log_probs) + else: + out_output_log_probs = pb_utils.Tensor( + 'OUT_OUTPUT_LOG_PROBS', + np.array([[[0.0]]], dtype=np.float32)) + outputs.append(out_output_log_probs) + + if context_logits: + out_context_logits = pb_utils.Tensor('OUT_CONTEXT_LOGITS', + context_logits.as_numpy()) + outputs.append(out_context_logits) + else: + out_context_logits = pb_utils.Tensor( + 'OUT_CONTEXT_LOGITS', np.array([[[0.0]]], + dtype=np.float32)) + outputs.append(out_context_logits) + + if generation_logits: + out_generation_logits = pb_utils.Tensor( + 'OUT_GENERATION_LOGITS', generation_logits.as_numpy()) + outputs.append(out_generation_logits) + else: + out_generation_logits = pb_utils.Tensor( + 'OUT_GENERATION_LOGITS', + np.array([[[[0.0]]]], dtype=np.float32)) + outputs.append(out_generation_logits) # Create InferenceResponse. You can set an error here in case # there was a problem with handling this inference request. @@ -174,15 +186,12 @@ def execute(self, requests): # # pb_utils.InferenceResponse( # output_tensors=..., TritonError("An error occurred")) - inference_response = pb_utils.InferenceResponse(output_tensors=[ - output_tensor, out_cum_log_probs, out_output_log_probs, - out_context_logits, out_generation_logits - ]) + inference_response = pb_utils.InferenceResponse( + output_tensors=outputs) responses.append(inference_response) # You should return a list of pb_utils.InferenceResponse. Length # of this list must match the length of `requests` list. 
- return responses def finalize(self): @@ -193,17 +202,12 @@ def finalize(self): print('Cleaning up...') def _postprocessing(self, tokens_batch, sequence_lengths): - start = time.time() outputs = [] for batch_idx, beam_tokens in enumerate(tokens_batch): for beam_idx, tokens in enumerate(beam_tokens): - inner_loop_time = time.time() seq_len = sequence_lengths[batch_idx][beam_idx] - tokens_to_decode = tokens[:seq_len] - tokenizer_start_time = time.time() output = self.tokenizer.decode( - tokens_to_decode, + tokens[:seq_len], skip_special_tokens=self.skip_special_tokens) - tokenizer_output_time = time.time() outputs.append(output.encode('utf8')) - end_inner_loop = time.time() + return outputs diff --git a/triton_templates/postprocessing/config.pbtxt b/triton_templates/postprocessing/config.pbtxt index 5b5676e..60d0290 100644 --- a/triton_templates/postprocessing/config.pbtxt +++ b/triton_templates/postprocessing/config.pbtxt @@ -42,11 +42,13 @@ input [ name: "CUM_LOG_PROBS" data_type: TYPE_FP32 dims: [ -1 ] + optional: true }, { name: "OUTPUT_LOG_PROBS" data_type: TYPE_FP32 dims: [ -1, -1 ] + optional: true }, { name: "CONTEXT_LOGITS" @@ -64,7 +66,7 @@ input [ output [ { name: "OUTPUT" - data_type: TYPE_INT32 + data_type: TYPE_STRING dims: [ -1 ] }, { @@ -96,13 +98,6 @@ parameters { } } -parameters { - key: "tokenizer_type" - value: { - string_value: "${tokenizer_type}" - } -} - parameters { key: "skip_special_tokens" value: { diff --git a/triton_templates/preprocessing/1/model.py b/triton_templates/preprocessing/1/model.py index be8cfa8..62ab243 100644 --- a/triton_templates/preprocessing/1/model.py +++ b/triton_templates/preprocessing/1/model.py @@ -29,7 +29,7 @@ import numpy as np import triton_python_backend_utils as pb_utils -from transformers import AutoTokenizer, LlamaTokenizer, T5Tokenizer +from transformers import AutoTokenizer, T5Tokenizer class TritonPythonModel: @@ -53,42 +53,32 @@ def initialize(self, args): * model_name: Model name """ # Parse model configs - model_config = json.loads(args["model_config"]) - tokenizer_dir = model_config["parameters"]["tokenizer_dir"]["string_value"] - tokenizer_type = model_config["parameters"]["tokenizer_type"]["string_value"] - self.add_special_tokens = model_config["parameters"].get( - "add_special_tokens", {"string_value": "false"} - )["string_value"].lower() in ["true", "1", "t", "y", "yes"] - - if tokenizer_type == "t5": - self.tokenizer = T5Tokenizer(vocab_file=tokenizer_dir, padding_side="left") - elif tokenizer_type == "auto": - self.tokenizer = AutoTokenizer.from_pretrained( - tokenizer_dir, padding_side="left", trust_remote_code=True - ) - elif tokenizer_type == "llama": - self.tokenizer = LlamaTokenizer.from_pretrained( - tokenizer_dir, legacy=False, padding_side="left" - ) - else: - raise AttributeError(f"Unexpected tokenizer type: {tokenizer_type}") + model_config = json.loads(args['model_config']) + tokenizer_dir = model_config['parameters']['tokenizer_dir'][ + 'string_value'] + self.add_special_tokens = model_config['parameters'].get( + 'add_special_tokens', + {'string_value': "false"})['string_value'].lower() in [ + 'true', '1', 't', 'y', 'yes' + ] + + self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_dir, + legacy=False, + padding_side='left', + trust_remote_code=True) + if isinstance(self.tokenizer, T5Tokenizer): + self.tokenizer_bos_id = self.tokenizer.sp_model.bos_id() self.tokenizer.pad_token = self.tokenizer.eos_token self.tokenizer_end_id = self.tokenizer.encode( - self.tokenizer.eos_token, add_special_tokens=False 
- )[0] + self.tokenizer.eos_token, add_special_tokens=False)[0] self.tokenizer_pad_id = self.tokenizer.encode( - self.tokenizer.pad_token, add_special_tokens=False - )[0] + self.tokenizer.pad_token, add_special_tokens=False)[0] # Parse model output configs and convert Triton types to numpy types output_names = [ - "INPUT_ID", - "REQUEST_INPUT_LEN", - "BAD_WORDS_IDS", - "STOP_WORDS_IDS", - "OUT_END_ID", - "OUT_PAD_ID", + "INPUT_ID", "REQUEST_INPUT_LEN", "BAD_WORDS_IDS", "STOP_WORDS_IDS", + "OUT_END_ID", "OUT_PAD_ID" ] input_names = ["EMBEDDING_BIAS_WORDS", "EMBEDDING_BIAS_WEIGHTS"] for input_name in input_names: @@ -96,22 +86,16 @@ def initialize(self, args): self, input_name.lower() + "_dtype", pb_utils.triton_string_to_numpy( - pb_utils.get_input_config_by_name(model_config, input_name)[ - "data_type" - ] - ), - ) + pb_utils.get_input_config_by_name( + model_config, input_name)['data_type'])) for output_name in output_names: setattr( self, output_name.lower() + "_dtype", pb_utils.triton_string_to_numpy( - pb_utils.get_output_config_by_name(model_config, output_name)[ - "data_type" - ] - ), - ) + pb_utils.get_output_config_by_name( + model_config, output_name)['data_type'])) def execute(self, requests): """`execute` must be implemented in every Python model. `execute` @@ -140,52 +124,45 @@ def execute(self, requests): logger = pb_utils.Logger for idx, request in enumerate(requests): # Get input tensors - query = pb_utils.get_input_tensor_by_name(request, "QUERY").as_numpy() + query = pb_utils.get_input_tensor_by_name(request, + 'QUERY').as_numpy() batch_dim = query.shape[0] if batch_dim != 1: - err_str = ( - "Inflight batching backend expects requests with batch size of 1." - ) + err_str = "Inflight batching backend expects requests with batch size of 1." 
logger.log_error(err_str) responses.append( pb_utils.InferenceResponse( - output_tensors=[], error=pb_utils.TritonError(err_str) - ) - ) + output_tensors=[], + error=pb_utils.TritonError(err_str))) continue request_output_len = pb_utils.get_input_tensor_by_name( - request, "REQUEST_OUTPUT_LEN" - ).as_numpy() + request, 'REQUEST_OUTPUT_LEN').as_numpy() bad_words_dict = pb_utils.get_input_tensor_by_name( - request, "BAD_WORDS_DICT" - ) + request, 'BAD_WORDS_DICT') if bad_words_dict is not None: bad_words_dict = bad_words_dict.as_numpy() stop_words_dict = pb_utils.get_input_tensor_by_name( - request, "STOP_WORDS_DICT" - ) + request, 'STOP_WORDS_DICT') if stop_words_dict is not None: stop_words_dict = stop_words_dict.as_numpy() embedding_bias_words = pb_utils.get_input_tensor_by_name( - request, "EMBEDDING_BIAS_WORDS" - ) + request, 'EMBEDDING_BIAS_WORDS') if embedding_bias_words is not None: embedding_bias_words = embedding_bias_words.as_numpy() embedding_bias_weights = pb_utils.get_input_tensor_by_name( - request, "EMBEDDING_BIAS_WEIGHTS" - ) + request, 'EMBEDDING_BIAS_WEIGHTS') if embedding_bias_weights is not None: embedding_bias_weights = embedding_bias_weights.as_numpy() # Take the end_id from the input tensors # If not specified, use tokenizer to get end_id - end_id = pb_utils.get_input_tensor_by_name(request, "END_ID") + end_id = pb_utils.get_input_tensor_by_name(request, 'END_ID') if end_id is not None: end_id = end_id.as_numpy() else: @@ -193,7 +170,7 @@ def execute(self, requests): # Take the pad_id from the input tensors # If not specified, use tokenizer to get pad_id - pad_id = pb_utils.get_input_tensor_by_name(request, "PAD_ID") + pad_id = pb_utils.get_input_tensor_by_name(request, 'PAD_ID') if pad_id is not None: pad_id = pad_id.as_numpy() else: @@ -205,45 +182,33 @@ def execute(self, requests): stop_words = self._to_word_list_format(stop_words_dict) embedding_bias = self._get_embedding_bias( - embedding_bias_words, - embedding_bias_weights, - self.embedding_bias_weights_dtype, - ) + embedding_bias_words, embedding_bias_weights, + self.embedding_bias_weights_dtype) # Create output tensors. You need pb_utils.Tensor # objects to create pb_utils.InferenceResponse. 
input_id_tensor = pb_utils.Tensor( - "INPUT_ID", input_id.astype(self.input_id_dtype) - ) + 'INPUT_ID', input_id.astype(self.input_id_dtype)) request_input_len_tensor = pb_utils.Tensor( - "REQUEST_INPUT_LEN", - request_input_len.astype(self.request_input_len_dtype), - ) + 'REQUEST_INPUT_LEN', + request_input_len.astype(self.request_input_len_dtype)) request_output_len_tensor = pb_utils.Tensor( - "REQUEST_OUTPUT_LEN", request_output_len - ) - bad_words_ids_tensor = pb_utils.Tensor("BAD_WORDS_IDS", bad_words) - stop_words_ids_tensor = pb_utils.Tensor("STOP_WORDS_IDS", stop_words) - embedding_bias_tensor = pb_utils.Tensor("EMBEDDING_BIAS", embedding_bias) - end_id_tensor = pb_utils.Tensor( - "OUT_END_ID", np.array(end_id, dtype=np.int32) - ) - pad_id_tensor = pb_utils.Tensor( - "OUT_PAD_ID", np.array(pad_id, dtype=np.int32) - ) - - inference_response = pb_utils.InferenceResponse( - output_tensors=[ - input_id_tensor, - bad_words_ids_tensor, - stop_words_ids_tensor, - request_input_len_tensor, - request_output_len_tensor, - embedding_bias_tensor, - end_id_tensor, - pad_id_tensor, - ] - ) + 'REQUEST_OUTPUT_LEN', request_output_len) + bad_words_ids_tensor = pb_utils.Tensor('BAD_WORDS_IDS', bad_words) + stop_words_ids_tensor = pb_utils.Tensor('STOP_WORDS_IDS', + stop_words) + embedding_bias_tensor = pb_utils.Tensor('EMBEDDING_BIAS', + embedding_bias) + end_id_tensor = pb_utils.Tensor('OUT_END_ID', + np.array(end_id, dtype=np.int32)) + pad_id_tensor = pb_utils.Tensor('OUT_PAD_ID', + np.array(pad_id, dtype=np.int32)) + + inference_response = pb_utils.InferenceResponse(output_tensors=[ + input_id_tensor, bad_words_ids_tensor, stop_words_ids_tensor, + request_input_len_tensor, request_output_len_tensor, + embedding_bias_tensor, end_id_tensor, pad_id_tensor + ]) responses.append(inference_response) # You should return a list of pb_utils.InferenceResponse. Length @@ -255,45 +220,46 @@ def finalize(self): Implementing `finalize` function is optional. This function allows the model to perform any necessary clean ups before exit. """ - print("Cleaning up...") + print('Cleaning up...') def _create_request(self, query): """ - query : batch string (2D numpy array) + query : batch string (2D numpy array) """ - start_ids = [ - np.array( - self.tokenizer.encode( - s[0].decode(), add_special_tokens=self.add_special_tokens - ) - ).astype(int) - for s in query - ] + if isinstance(self.tokenizer, T5Tokenizer): + start_ids = [ + np.array([self.tokenizer_bos_id] + self.tokenizer.encode( + s[0].decode(), add_special_tokens=self.add_special_tokens) + ).astype(int) for s in query + ] + else: + start_ids = [ + np.array( + self.tokenizer.encode( + s[0].decode(), + add_special_tokens=self.add_special_tokens)).astype( + int) for s in query + ] start_lengths = np.array([[len(ids)] for ids in start_ids]).astype(int) max_len = 0 for seq in start_ids: max_len = max(max_len, seq.shape[0]) - start_ids = np.stack( - [ - np.pad( - seq, - (0, max_len - seq.shape[0]), - "constant", - constant_values=(0, self.tokenizer_pad_id), - ) - for seq in start_ids - ] - ) + start_ids = np.stack([ + np.pad(seq, (0, max_len - seq.shape[0]), + 'constant', + constant_values=(0, self.tokenizer_pad_id)) + for seq in start_ids + ]) return start_ids, start_lengths def _to_word_list_format(self, word_lists: List[List[str | bytes]]): - """ + ''' word_lists format: len(word_lists) == batch_size word_lists[i] means the words associated to batch item i. A "word" may actually be any string. Like "lorem" or "lorem ipsum". 
- """ + ''' assert self.tokenizer != None, "need to set tokenizer" if word_lists is None: @@ -302,11 +268,6 @@ def _to_word_list_format(self, word_lists: List[List[str | bytes]]): flat_ids = [] offsets = [] - arbitrary_start_sequence_token = "!" - arbitrary_start_sequence_id = self.tokenizer.encode( - "!", add_special_tokens=False - )[0] - for word_list in word_lists: item_flat_ids = [] item_offsets = [] @@ -315,16 +276,7 @@ def _to_word_list_format(self, word_lists: List[List[str | bytes]]): if isinstance(word, bytes): word = word.decode() - word = arbitrary_start_sequence_token + word ids = self.tokenizer.encode(word, add_special_tokens=False) - if ids[0] != arbitrary_start_sequence_id: - raise ValueError( - f"To standardize tokenizer behavior, we prepend '{arbitrary_start_sequence_token}' to the string representation of each stop sequence." - "We then strip the corresponding first token from the stop sequence IDs." - "However, the first token of the stop sequence IDs was not '{arbitrary_start_sequence_id}', which suggestions there is a problem with the tokenizer that you are using." - ) - else: - ids = ids[1:] if len(ids) == 0: continue @@ -337,14 +289,16 @@ def _to_word_list_format(self, word_lists: List[List[str | bytes]]): pad_to = max(1, max(len(ids) for ids in flat_ids)) for i, (ids, offs) in enumerate(zip(flat_ids, offsets)): - flat_ids[i] = np.pad(ids, (0, pad_to - len(ids)), constant_values=0) - offsets[i] = np.pad(offs, (0, pad_to - len(offs)), constant_values=-1) + flat_ids[i] = np.pad(ids, (0, pad_to - len(ids)), + constant_values=0) + offsets[i] = np.pad(offs, (0, pad_to - len(offs)), + constant_values=-1) - return np.array([flat_ids, offsets], dtype="int32").transpose((1, 0, 2)) + return np.array([flat_ids, offsets], dtype="int32").transpose( + (1, 0, 2)) - def _get_embedding_bias( - self, embedding_bias_words, embedding_bias_weights, bias_dtype - ): + def _get_embedding_bias(self, embedding_bias_words, embedding_bias_weights, + bias_dtype): assert self.tokenizer != None, "need to set tokenizer" @@ -352,10 +306,11 @@ def _get_embedding_bias( return np.empty([1, 0], dtype=self.embedding_bias_weights_dtype) batch_embedding_bias = [] - for words, weights in zip(embedding_bias_words, embedding_bias_weights): + for words, weights in zip(embedding_bias_words, + embedding_bias_weights): vocab_size = self.tokenizer.vocab_size - embedding_bias = [0.0] * vocab_size + embedding_bias = [0.] * vocab_size assert len(words) == len( weights diff --git a/triton_templates/preprocessing/config.pbtxt b/triton_templates/preprocessing/config.pbtxt index 41ae1ac..ca92187 100644 --- a/triton_templates/preprocessing/config.pbtxt +++ b/triton_templates/preprocessing/config.pbtxt @@ -125,17 +125,10 @@ parameters { } } -parameters { - key: "tokenizer_type" - value: { - string_value: "${tokenizer_type}" - } -} - parameters { key: "add_special_tokens" value: { - string_value: "False" + string_value: "${add_special_tokens}" } } diff --git a/triton_templates/tensorrt_llm/config.pbtxt b/triton_templates/tensorrt_llm/config.pbtxt index 4acfabc..71d2b98 100644 --- a/triton_templates/tensorrt_llm/config.pbtxt +++ b/triton_templates/tensorrt_llm/config.pbtxt @@ -62,6 +62,13 @@ input [ optional: true allow_ragged_batch: true }, + { + name: "draft_logits" + data_type: TYPE_FP32 + dims: [ -1, -1 ] + optional: true + allow_ragged_batch: true + }, { name: "end_id" data_type: TYPE_INT32 @@ -214,6 +221,17 @@ input [ reshape: { shape: [ ] } optional: true }, + # the unique task ID for the given LoRA. 
+ # To perform inference with a specific LoRA for the first time `lora_task_id` `lora_weights` and `lora_config` must all be given. + # The LoRA will be cached, so that subsequent requests for the same task only require `lora_task_id`. + # If the cache is full the oldest LoRA will be evicted to make space for new ones. An error is returned if `lora_task_id` is not cached. + { + name: "lora_task_id" + data_type: TYPE_UINT64 + dims: [ 1 ] + reshape: { shape: [ ] } + optional: true + }, # weights for a lora adapter shape [ num_lora_modules_layers, D x Hi + Ho x D ] # where the last dimension holds the in / out adapter weights for the associated module (e.g. attn_qkv) and model layer # each of the in / out tensors are first flattened and then concatenated together in the format above. @@ -368,3 +386,45 @@ parameters: { string_value: "${gpu_device_ids}" } } +parameters: { + key: "lora_cache_optimal_adapter_size" + value: { + string_value: "${lora_cache_optimal_adapter_size}" + } +} +parameters: { + key: "lora_cache_max_adapter_size" + value: { + string_value: "${lora_cache_max_adapter_size}" + } +} +parameters: { + key: "lora_cache_gpu_memory_fraction" + value: { + string_value: "${lora_cache_gpu_memory_fraction}" + } +} +parameters: { + key: "lora_cache_host_memory_bytes" + value: { + string_value: "${lora_cache_host_memory_bytes}" + } +} +parameters: { + key: "decoding_mode" + value: { + string_value: "${decoding_mode}" + } +} +parameters: { + key: "worker_path" + value: { + string_value: "/opt/tritonserver/backends/tensorrtllm/triton_tensorrtllm_worker" + } +} +parameters: { + key: "medusa_choices" + value: { + string_value: "${medusa_choices}" + } +} diff --git a/triton_templates/tensorrt_llm_bls/1/lib/decode.py b/triton_templates/tensorrt_llm_bls/1/lib/decode.py new file mode 100644 index 0000000..aa2a6d5 --- /dev/null +++ b/triton_templates/tensorrt_llm_bls/1/lib/decode.py @@ -0,0 +1,332 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
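+#
+# This module defines the dataclasses used by the tensorrt_llm_bls model
+# (Request, DraftRequest, PreprocResponse, GenerationResponse, Response)
+# and the abstract Decoder base class, whose decode() flow is:
+#   preprocess -> generate (optionally speculative, via a draft model) -> postprocess.
+# The Triton-specific subclass lives in triton_decoder.py.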
+ +from collections.abc import Generator +from dataclasses import dataclass +from typing import Optional + +import numpy as np + + +class RequestValidationError(Exception): + pass + + +def _validate_that(condition: bool, msg: str): + if not condition: + raise RequestValidationError(msg) + + +def _validate_non_empty(data, msg: str): + _validate_that(data is not None and data.size > 0, msg) + + +def _validate_single_gt_0(data, msg: str): + _validate_non_empty(data, msg) + _validate_that(data.flatten()[0] > 0, msg) + + +def _single_value(data: Optional[np.ndarray]): + if data is None: + return None + return data.flatten()[0] + + +@dataclass +class Request: + text_input: np.ndarray = np.array([]) + max_tokens: np.ndarray = np.array([]) + bad_words: Optional[np.ndarray] = None + stop_words: Optional[np.ndarray] = None + end_id: Optional[np.ndarray] = None + pad_id: Optional[np.ndarray] = None + top_k: Optional[np.ndarray] = None + top_p: Optional[np.ndarray] = None + temperature: Optional[np.ndarray] = None + length_penalty: Optional[np.ndarray] = None + repetition_penalty: Optional[np.ndarray] = None + min_length: Optional[np.ndarray] = None + return_log_probs: Optional[np.ndarray] = None + prompt_embedding_table: Optional[np.ndarray] = None + prompt_vocab_size: Optional[np.ndarray] = None + embedding_bias_words: Optional[np.ndarray] = None + embedding_bias_weights: Optional[np.ndarray] = None + num_draft_tokens: Optional[np.ndarray] = None + use_draft_logits: Optional[np.ndarray] = None + stream: Optional[np.ndarray] = None + beam_width: Optional[np.ndarray] = None + return_context_logits: Optional[np.ndarray] = None + return_generation_logits: Optional[np.ndarray] = None + random_seed: Optional[np.ndarray] = None + presence_penalty: Optional[np.ndarray] = None + frequency_penalty: Optional[np.ndarray] = None + + def validate(self): + _validate_non_empty(self.text_input, "text_input is required") + _validate_single_gt_0(self.max_tokens, + "max_tokens must be a single value > 0") + + num_draft_tokens = _single_value(self.num_draft_tokens) + stream = _single_value(self.stream) + gen_logits = _single_value(self.return_generation_logits) + context_logits = _single_value(self.return_context_logits) + + if num_draft_tokens: + _validate_that( + not stream, + "streaming is not supported with speculative decoding") + _validate_that( + not gen_logits, + "generation logits are not supported with speculative decoding" + ) + _validate_that( + not context_logits, + "context logits are not supported with speculative decoding") + + +@dataclass +class DraftRequest: + draft_input_ids: Optional[np.ndarray] = None + draft_logits: Optional[np.ndarray] = None + + +@dataclass +class PreprocResponse: + input_ids: np.ndarray = np.array([]) + input_lengths: np.ndarray = np.array([]) + bad_words_list: Optional[np.ndarray] = None + stop_words_list: Optional[np.ndarray] = None + embedding_bias: Optional[np.ndarray] = None + end_id: Optional[np.ndarray] = None + pad_id: Optional[np.ndarray] = None + + @classmethod + def with_new_inputs(cls, + other, + input_ids: Optional[np.ndarray] = None, + input_lengths: Optional[np.ndarray] = None): + return cls( + input_ids=(input_ids + if input_ids is not None else other.input_ids), + input_lengths=(input_lengths if input_lengths is not None else + other.input_lengths), + bad_words_list=other.bad_words_list, + stop_words_list=other.stop_words_list, + end_id=other.end_id, + pad_id=other.pad_id, + ) + + +@dataclass +class GenerationResponse: + output_ids: np.ndarray = np.array([]) + 
sequence_length: np.ndarray = np.array([]) + cum_log_probs: Optional[np.ndarray] = None + output_log_probs: Optional[np.ndarray] = None + context_logits: Optional[np.ndarray] = None + generation_logits: Optional[np.ndarray] = None + + +@dataclass +class Response: + text_output: np.ndarray = np.array([]) + cum_log_probs: Optional[np.ndarray] = None + output_log_probs: Optional[np.ndarray] = None + context_logits: Optional[np.ndarray] = None + generation_logits: Optional[np.ndarray] = None + + def __eq__(self, o) -> bool: + """Just for testing""" + if not isinstance(o, Response): + return False + return (np.array_equal(self.text_output, o.text_output) + and np.array_equal(self.cum_log_probs, o.cum_log_probs) + and np.array_equal(self.output_log_probs, o.output_log_probs) + and np.array_equal(self.context_logits, o.context_logits) and + np.array_equal(self.generation_logits, o.generation_logits)) + + +class Decoder: + + def __init__(self, streaming=False, accumulate=False): + self._streaming = streaming + self._accumulate = accumulate + + self._accumulated_tokens = None + + def decode(self, + request: Request, + speculative_decoding=False) -> Generator[Response, None, None]: + preproc_response = self.preprocess(request) + + if speculative_decoding: + for gen_response in self._spec_generate(preproc_response, request): + yield self.postprocess(gen_response) + else: + if not self._streaming: + gen_response = self._generate_non_streaming( + preproc_response, request) + yield self.postprocess(gen_response) + else: + for gen_response in self._generate(preproc_response, request): + yield self.postprocess(gen_response) + + def encountered_stop_words(self, input_ids, stop_words_ids): + for stop_word_ids in stop_words_ids: + if np.array_equal(input_ids[-len(stop_word_ids):], stop_word_ids): + return True + return False + + def _spec_generate( + self, preproc: PreprocResponse, + request: Request) -> Generator[GenerationResponse, None, None]: + + prompt_input_ids: np.ndarray = preproc.input_ids[0] + input_ids: np.ndarray = prompt_input_ids + output_len: int = request.max_tokens[0][0] + last_input_ids: np.ndarray = None + draft_output_ids: np.ndarray = None + draft_logits: np.ndarray = None + + target_response: GenerationResponse = None + + cur_preproc = preproc + + counter = 0 + while True: + counter += 1 + num_draft_tokens = min( + request.num_draft_tokens[0][0], + len(prompt_input_ids) + output_len - len(input_ids) - 1) + + draft_request = None + if num_draft_tokens > 0: + draft_response: GenerationResponse = self._draft_generate_non_streaming( + cur_preproc, request, num_draft_tokens) + seq_len: int = draft_response.sequence_length[0][0] + # [1, beamWidth, outputLength] -> [outputLen] + draft_output_ids = draft_response.output_ids[0][0] + # [1, beamWidth, outputLength, vocabSizePadded] -> [outputLength, vocabSizePadded] + if request.use_draft_logits is not None and request.use_draft_logits[ + 0]: + if draft_response.generation_logits is not None: + draft_logits = draft_response.generation_logits[0][0] + + input_draft_tokens = draft_output_ids[len(input_ids):seq_len] + draft_request = DraftRequest( + draft_input_ids=np.expand_dims(input_draft_tokens, 0)) + if request.use_draft_logits is not None and request.use_draft_logits[ + 0]: + draft_request.draft_logits = np.expand_dims( + draft_logits[-len(input_draft_tokens):], 0) + else: + draft_request = DraftRequest() + target_response = self._generate_non_streaming( + cur_preproc, request, draft_request) + last_input_ids = input_ids + input_ids = 
target_response.output_ids[0][0] + cur_preproc = PreprocResponse.with_new_inputs( + cur_preproc, np.expand_dims(input_ids, 0), + np.array([[len(input_ids)]], dtype=np.int32)) + + # Evaluate criteria to stop generation loop. + # If we've hit or exceeded the max output length, should stop + length_stop = (len(input_ids) >= + len(prompt_input_ids) + output_len) + if length_stop: + break + # If draft and target have same outputs, should stop. Normally target should return 1 more token. + # If they are the same length, they should differ at the last token + target_draft_equal = draft_output_ids is not None and np.array_equal( + draft_output_ids, input_ids) + if target_draft_equal: + break + # If tokens no longer change, should stop, means we have hit early stopping + last_current_equal = np.array_equal(last_input_ids, input_ids) + if last_current_equal: + break + # Need to check if stop words was encountered + hit_stop_words = self.encountered_stop_words( + input_ids, preproc.stop_words_list[0]) + if hit_stop_words: + break + + yield target_response + + def _draft_generate_non_streaming( + self, preproc: PreprocResponse, request: Request, + num_draft_tokens: int) -> GenerationResponse: + raise NotImplementedError() + + def _generate( + self, + preproc: PreprocResponse, + request: Request, + draft_request: Optional[DraftRequest] = None + ) -> Generator[GenerationResponse, None, None]: + raise NotImplementedError() + + def _generate_non_streaming( + self, + preproc: PreprocResponse, + request: Request, + draft_request: Optional[DraftRequest] = None + ) -> GenerationResponse: + raise NotImplementedError() + + def postprocess(self, gen_response: GenerationResponse) -> Response: + if self._accumulate and self._streaming: + new_tokens: np.ndarray = gen_response.output_ids + if new_tokens.ndim != 3: + raise Exception("Expected output_ids tensor to have 3 dims.") + if new_tokens.shape[0] != 1: + raise Exception("Expected batch size of 1") + if new_tokens.shape[1] != 1: + raise Exception( + "Accumulation of tokens is only implemented for beam width = 1" + ) + + self._accumulated_tokens = new_tokens if ( + self._accumulated_tokens is None) else np.concatenate( + (self._accumulated_tokens, new_tokens), axis=2) + sequence_lengths = np.array([[self._accumulated_tokens.shape[2]]], + dtype=np.int32) + return self._postprocess(self._accumulated_tokens, + sequence_lengths, gen_response) + else: + return self._postprocess(gen_response.output_ids, None, + gen_response) + + def _postprocess(self, tokens: np.ndarray, + sequence_lengths: Optional[np.ndarray], + gen_response: GenerationResponse) -> Response: + raise NotImplementedError() + + def preprocess(self, request: Request) -> PreprocResponse: + raise NotImplementedError() + + def reset_decoder(self): + self._accumulated_tokens = None diff --git a/triton_templates/tensorrt_llm_bls/1/lib/triton_decoder.py b/triton_templates/tensorrt_llm_bls/1/lib/triton_decoder.py new file mode 100644 index 0000000..f0df3b8 --- /dev/null +++ b/triton_templates/tensorrt_llm_bls/1/lib/triton_decoder.py @@ -0,0 +1,433 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. 
+# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from collections.abc import Callable +from typing import Dict, Optional + +import numpy as np +import triton_python_backend_utils as pb_utils +from lib.decode import * +from typing_extensions import override + + +class TritonDecoder(Decoder): + + def __init__(self, + streaming=False, + accumulate=False, + preproc_model_name="preprocessing", + postproc_model_name="postprocessing", + llm_model_name="tensorrt_llm", + draft_llm_model_name: Optional[str] = None): + super().__init__(streaming=streaming, accumulate=accumulate) + self.preproc_model_name = preproc_model_name + self.postproc_model_name = postproc_model_name + self.llm_model_name = llm_model_name + self.draft_llm_model_name = draft_llm_model_name + + self._preproc_outputs = [ + "INPUT_ID", + "REQUEST_INPUT_LEN", + "BAD_WORDS_IDS", + "STOP_WORDS_IDS", + "EMBEDDING_BIAS", + "OUT_PAD_ID", + "OUT_END_ID", + ] + + self._llm_outputs = [ + "output_ids", + "sequence_length", + "cum_log_probs", + "output_log_probs", + "context_logits", + "generation_logits", + ] + + self._postproc_outputs = [ + "OUTPUT", + ] + + self.input_names = [ + "text_input", + "max_tokens", + "bad_words", + "stop_words", + "end_id", + "pad_id", + "top_k", + "top_p", + "temperature", + "length_penalty", + "repetition_penalty", + "min_length", + "presence_penalty", + "frequency_penalty", + "random_seed", + "return_log_probs", + "return_context_logits", + "return_generation_logits", + "beam_width", + "stream", + "prompt_embedding_table", + "prompt_vocab_size", + "embedding_bias_words", + "embedding_bias_weights", + "num_draft_tokens", + "use_draft_logits", + ] + + self.__undo_reshape_whitelist = { + "max_tokens", + "end_id", + "pad_id", + "top_k", + "top_p", + "temperature", + "length_penalty", + "repetition_penalty", + "min_length", + "presence_penalty", + "frequency_penalty", + "random_seed", + "return_log_probs", + "return_context_logits", + "return_generation_logits", + "beam_width", + "stream", + "prompt_vocab_size", + "num_draft_tokens", + "use_draft_logits", + } + + def _exec_triton_request(self, request): + responses = request.exec(decoupled=True) + for r in responses: + if r.has_error(): + raise pb_utils.TritonModelException(r.error().message()) + yield r + + def _exec_triton_request_single(self, request): + responses = request.exec(decoupled=False) + if responses.has_error(): + raise 
pb_utils.TritonModelException(responses.error().message()) + return responses + + def create_triton_response(self, response: Response): + name_map = { + "text_output": "text_output", + "cum_log_probs": "cum_log_probs", + "output_log_probs": "output_log_probs", + "context_logits": "context_logits", + "generation_logits": "generation_logits" + } + tensors = self.create_triton_tensors(response, name_map) + return pb_utils.InferenceResponse(output_tensors=tensors) + + def convert_triton_request(self, triton_request) -> Request: + request = Request() + for triton_name in self.input_names: + tensor = pb_utils.get_input_tensor_by_name(triton_request, + triton_name) + target_name = triton_name + if tensor is None: + continue + if not hasattr(request, target_name): + raise AttributeError( + f"Request has no attribute '{target_name}'") + setattr(request, target_name, tensor.as_numpy()) + return request + + def convert_triton_response(self, + triton_response, + response_factory: Callable, + name_map=None): + response = response_factory() + for tensor in triton_response.output_tensors(): + if tensor is None: + continue + triton_name = tensor.name() + value = tensor.as_numpy() + target_name = triton_name + if name_map and triton_name in name_map: + target_name = name_map[triton_name] + if name_map and not triton_name in name_map: + continue + if target_name is None: + # explicitly ignore this triton input + continue + if not hasattr(response, target_name): + raise AttributeError( + f"response object has not attribute '{target_name}'") + setattr(response, target_name, value) + return response + + def __undo_reshape(self, x, name): + if name in self.__undo_reshape_whitelist and len(x.shape) == 1: + # handle reshapes + return np.expand_dims(x, 0) + else: + return x + + def create_triton_tensors(self, obj, name_map: dict): + tensors = [] + for name, triton_name in name_map.items(): + if triton_name is None: + continue + value = getattr(obj, name) + if value is None: + continue + t = pb_utils.Tensor(triton_name, self.__undo_reshape(value, name)) + tensors.append(t) + return tensors + + @override + def preprocess(self, request: Request) -> PreprocResponse: + input_tensors = self._get_preproc_tensors(request) + triton_req = pb_utils.InferenceRequest( + model_name=self.preproc_model_name, + inputs=input_tensors, + requested_output_names=self._preproc_outputs) + triton_output = self._exec_triton_request_single(triton_req) + return self._get_preproc_response(triton_output) + + def _get_preproc_tensors(self, request: Request): + name_map = { + "text_input": "QUERY", + "max_tokens": "REQUEST_OUTPUT_LEN", + "bad_words": "BAD_WORDS_DICT", + "stop_words": "STOP_WORDS_DICT", + "embedding_bias_words": "EMBEDDING_BIAS_WORDS", + "embedding_bias_weights": "EMBEDDING_BIAS_WEIGHTS", + "pad_id": "PAD_ID", + "end_id": "END_ID", + } + return self.create_triton_tensors(request, name_map) + + def _get_preproc_response(self, triton_output): + name_map = { + "INPUT_ID": "input_ids", + "REQUEST_INPUT_LEN": "input_lengths", + "BAD_WORDS_IDS": "bad_words_list", + "STOP_WORDS_IDS": "stop_words_list", + "EMBEDDING_BIAS": "embedding_bias", + "OUT_PAD_ID": "pad_id", + "OUT_END_ID": "end_id", + } + return self.convert_triton_response(triton_output, PreprocResponse, + name_map) + + @override + def _draft_generate_non_streaming( + self, preproc: PreprocResponse, request: Request, + num_draft_tokens: int) -> GenerationResponse: + input_tensors = self._get_llm_tensors(preproc, request, + num_draft_tokens, None, True) + triton_req = 
pb_utils.InferenceRequest( + model_name=self.draft_llm_model_name, + inputs=input_tensors, + requested_output_names=self._llm_outputs) + triton_response = self._exec_triton_request_single(triton_req) + llm_response = self._get_llm_response(triton_response) + return llm_response + + @override + def _generate( + self, + preproc: PreprocResponse, + request: Request, + draft_request: Optional[DraftRequest] = None + ) -> Generator[GenerationResponse, None, None]: + input_tensors = self._get_llm_tensors(preproc, request, None, + draft_request) + triton_req = pb_utils.InferenceRequest( + model_name=self.llm_model_name, + inputs=input_tensors, + requested_output_names=self._llm_outputs) + for r in self._exec_triton_request(triton_req): + yield self._get_llm_response(r) + + @override + def _generate_non_streaming( + self, + preproc: PreprocResponse, + request: Request, + draft_request: Optional[DraftRequest] = None + ) -> GenerationResponse: + input_tensors = self._get_llm_tensors(preproc, request, None, + draft_request) + triton_req = pb_utils.InferenceRequest( + model_name=self.llm_model_name, + inputs=input_tensors, + requested_output_names=self._llm_outputs) + r = self._exec_triton_request_single(triton_req) + return self._get_llm_response(r) + + def _get_llm_tensors(self, + preproc: PreprocResponse, + request: Request, + num_output_tokens: Optional[int] = None, + draft_request: Optional[DraftRequest] = None, + is_draft_model_request: bool = False): + tensors = [] + tensors.extend(self._get_tensors_from_preproc(preproc)) + tensors.extend( + self._get_llm_tensors_from_request(request, num_output_tokens, + draft_request, + is_draft_model_request)) + return tensors + + def _get_tensors_from_preproc(self, preproc: PreprocResponse): + name_map = { + "input_ids": "input_ids", + "input_lengths": "input_lengths", + "bad_words_list": "bad_words_list", + "stop_words_list": "stop_words_list", + "embedding_bias": "embedding_bias", + "pad_id": "pad_id", + "end_id": "end_id", + } + return self.create_triton_tensors(preproc, name_map) + + def _get_llm_tensors_from_request( + self, + request: Request, + num_output_tokens: Optional[int] = None, + draft_request: Optional[DraftRequest] = None, + is_draft_model_request: bool = False): + name_map: Dict[str, Optional[str]] = { + "beam_width": "beam_width", + "top_k": "runtime_top_k", + "top_p": "runtime_top_p", + "length_penalty": "len_penalty", + "repetition_penalty": "repetition_penalty", + "min_length": "min_length", + "presence_penalty": "presence_penalty", + "frequency_penalty": "frequency_penalty", + "random_seed": "random_seed", + "return_log_probs": "return_log_probs", + "stream": "streaming", + "prompt_embedding_table": "prompt_embedding_table", + "prompt_vocab_size": "prompt_vocab_size", + } + tensors = self.create_triton_tensors(request, name_map) + + out_len = request.max_tokens[0][0] if request.max_tokens else None + if num_output_tokens is not None: + out_len = num_output_tokens + elif draft_request: + if draft_request.draft_input_ids is not None: + out_len = len(draft_request.draft_input_ids[0]) + 1 + else: + out_len = 1 + + if out_len is None: + raise Exception("Could not determine request_output_len") + else: + tensors.append( + pb_utils.Tensor("request_output_len", + np.array([[out_len]], dtype=np.int32))) + + if draft_request: + if draft_request.draft_input_ids is not None: + tensors.append( + pb_utils.Tensor("draft_input_ids", + draft_request.draft_input_ids)) + if draft_request.draft_logits is not None and request.use_draft_logits is not None 
and request.use_draft_logits[ + 0]: + tensors.append( + pb_utils.Tensor("draft_logits", + draft_request.draft_logits)) + + return_context_logits = False + return_generation_logits = False + if draft_request is None: + if is_draft_model_request: + return_generation_logits = request.use_draft_logits[ + 0] if request.use_draft_logits is not None else False + else: + return_context_logits = request.return_context_logits[ + 0] if request.return_context_logits is not None else False + return_generation_logits = request.return_generation_logits[ + 0] if request.return_generation_logits is not None else False + + tensors.append( + pb_utils.Tensor("return_context_logits", + np.array([[return_context_logits]]))) + tensors.append( + pb_utils.Tensor("return_generation_logits", + np.array([[return_generation_logits]]))) + return tensors + + def _get_llm_response(self, triton_output): + name_map = { + "output_ids": "output_ids", + "sequence_length": "sequence_length", + "cum_log_probs": "cum_log_probs", + "output_log_probs": "output_log_probs", + "context_logits": "context_logits", + "generation_logits": "generation_logits", + } + return self.convert_triton_response(triton_output, GenerationResponse, + name_map) + + def _postprocess(self, tokens: np.ndarray, + sequence_lengths: Optional[np.ndarray], + gen_response: GenerationResponse) -> Response: + input_tensors = self._get_postproc_tensors(tokens, sequence_lengths, + gen_response) + triton_req = pb_utils.InferenceRequest( + model_name=self.postproc_model_name, + inputs=input_tensors, + requested_output_names=self._postproc_outputs) + r = self._exec_triton_request_single(triton_req) + response = self._get_response(r, gen_response) + return response + + def _get_postproc_tensors(self, tokens: np.ndarray, + sequence_lengths: Optional[np.ndarray], + gen_response: GenerationResponse): + tensors = [ + pb_utils.Tensor("TOKENS_BATCH", tokens), + pb_utils.Tensor( + "SEQUENCE_LENGTH", sequence_lengths + if sequence_lengths else gen_response.sequence_length) + ] + return tensors + + def _get_response(self, triton_output, gen_res: GenerationResponse): + tensors = triton_output.output_tensors() + t_map = {} + for named_t in tensors: + name = named_t.name() + t = named_t.as_numpy() + t_map[name] = t + response = Response(text_output=t_map["OUTPUT"], + cum_log_probs=gen_res.cum_log_probs, + output_log_probs=gen_res.output_log_probs, + context_logits=gen_res.context_logits, + generation_logits=gen_res.generation_logits) + return response diff --git a/triton_templates/tensorrt_llm_bls/1/model.py b/triton_templates/tensorrt_llm_bls/1/model.py index fcd3b4e..609e323 100644 --- a/triton_templates/tensorrt_llm_bls/1/model.py +++ b/triton_templates/tensorrt_llm_bls/1/model.py @@ -27,8 +27,8 @@ import json import traceback -import numpy as np import triton_python_backend_utils as pb_utils +from lib.triton_decoder import TritonDecoder class TritonPythonModel: @@ -53,305 +53,56 @@ def initialize(self, args): self.logger = pb_utils.Logger - self.bls_input_tensor_names = [ - "text_input", "max_tokens", "bad_words", "stop_words", "end_id", - "pad_id", "top_k", "top_p", "temperature", "length_penalty", - "repetition_penalty", "min_length", "presence_penalty", - "frequency_penalty", "random_seed", "return_log_probs", - "return_context_logits", "return_generation_logits", "beam_width", - "stream", "prompt_embedding_table", "prompt_vocab_size", - "embedding_bias_words", "embedding_bias_weights" - ] - - self.preproc_input_to_bls_input_map = { - "QUERY": "text_input", - 
"REQUEST_OUTPUT_LEN": "max_tokens", - "BAD_WORDS_DICT": "bad_words", - "STOP_WORDS_DICT": "stop_words", - "EMBEDDING_BIAS_WORDS": "embedding_bias_words", - "EMBEDDING_BIAS_WEIGHTS": "embedding_bias_weights", - "END_ID": "end_id", - "PAD_ID": "pad_id" - } - - self.preproc_output_to_trtllm_input_map = { - "INPUT_ID": "input_ids", - "REQUEST_INPUT_LEN": "input_lengths", - "REQUEST_OUTPUT_LEN": "request_output_len", - "BAD_WORDS_IDS": "bad_words_list", - "STOP_WORDS_IDS": "stop_words_list", - "EMBEDDING_BIAS": "embedding_bias", - "OUT_END_ID": "end_id", - "OUT_PAD_ID": "pad_id", - } - - self.trtllm_input_to_bls_input_map = { - "beam_width": "beam_width", - "runtime_top_k": "top_k", - "runtime_top_p": "top_p", - "len_penalty": "length_penalty", - "repetition_penalty": "repetition_penalty", - "min_length": "min_length", - "presence_penalty": "presence_penalty", - "frequency_penalty": "frequency_penalty", - "random_seed": "random_seed", - "return_log_probs": "return_log_probs", - "return_context_logits": "return_context_logits", - "return_generation_logits": "return_generation_logits", - "streaming": "stream", - "prompt_embedding_table": "prompt_embedding_table", - "prompt_vocab_size": "prompt_vocab_size", - } - - self.trtllm_output_to_postproc_input_map = { - "output_ids": "TOKENS_BATCH", - "sequence_length": "SEQUENCE_LENGTH", - "cum_log_probs": "CUM_LOG_PROBS", - "output_log_probs": "OUTPUT_LOG_PROBS", - "context_logits": "CONTEXT_LOGITS", - "generation_logits": "GENERATION_LOGITS" - } - - self.postproc_output_to_bls_output_map = { - "OUTPUT": "text_output", - "OUT_CUM_LOG_PROBS": "cum_log_probs", - "OUT_OUTPUT_LOG_PROBS": "output_log_probs", - "OUT_CONTEXT_LOGITS": "context_logits", - "OUT_GENERATION_LOGITS": "generation_logits" - } - - def _get_bls_input_tensors_map(self, request): - - bls_input_tensors_map = {} - for input_tensor_name in self.bls_input_tensor_names: - tensor = pb_utils.get_input_tensor_by_name(request, - input_tensor_name) - if tensor != None: - bls_input_tensors_map[input_tensor_name] = tensor - - return bls_input_tensors_map - - def _get_preproc_input_tensors(self, bls_input_tensors_map): - - preproc_input_tensors = [] - - for preproc_name, bls_name in self.preproc_input_to_bls_input_map.items( - ): - - if bls_name in bls_input_tensors_map: - tensor = bls_input_tensors_map[bls_name] - # Change the name to what the preprocessor expects - preproc_input_tensors.append( - pb_utils.Tensor(preproc_name, tensor.as_numpy())) - - return preproc_input_tensors - - def _get_trtllm_input_tensors(self, bls_input_tensors_map, - preproc_output_tensors): - - trtllm_input_tensors = [] - - # Set input tensors from preprocessor outputs - for preproc_output_tensor in preproc_output_tensors: - - trtllm_tensor_name = self.preproc_output_to_trtllm_input_map[ - preproc_output_tensor.name()] - trtllm_input_tensors.append( - pb_utils.Tensor(trtllm_tensor_name, - preproc_output_tensor.as_numpy())) - - # Set input tensors from bls inputs - for trtllm_name, bls_name in self.trtllm_input_to_bls_input_map.items( - ): - - if bls_name in bls_input_tensors_map: - tensor = bls_input_tensors_map[bls_name] - # Change the name to what the preprocessor expects - trtllm_input_tensors.append( - pb_utils.Tensor(trtllm_name, tensor.as_numpy())) - - return trtllm_input_tensors - - def _get_postproc_input_tensors(self, tokens, trtllm_output_tensors): - - postproc_input_tensors = [] - - for trtllm_output_tensor in trtllm_output_tensors: - - # If in decoupled mode, option to append new tokens to existing tokens before 
calling postprocessor - # This might be needed for some tokenizers - # Note that in that case, the client must overwrite previously received output text - if (self.accumulate_tokens and self.decoupled - and trtllm_output_tensor.name() == "output_ids"): - - new_tokens = trtllm_output_tensor.as_numpy() - if new_tokens.ndim != 3: - raise pb_utils.TritonModelException( - "Expected output_ids tensor to have 3 dims.") - if new_tokens.shape[0] != 1: - raise pb_utils.TritonModelException( - "Expected output_ids tensor to have batch size of 1") - if new_tokens.shape[1] != 1: - raise pb_utils.TritonModelException( - "Accumulation of tokens is only implemented for beam width = 1" - ) - - tokens = new_tokens if (tokens is None) else np.concatenate( - (tokens, new_tokens), axis=2) - - # output ids - postproc_output_ids_name = self.trtllm_output_to_postproc_input_map[ - "output_ids"] - postproc_input_tensors.append( - pb_utils.Tensor(postproc_output_ids_name, tokens)) - - # sequence length - np_seq_len_tensor = np.array([[tokens.shape[2]]], - dtype=np.int32) - postproc_seq_len_name = self.trtllm_output_to_postproc_input_map[ - "sequence_length"] - postproc_input_tensors.append( - pb_utils.Tensor(postproc_seq_len_name, np_seq_len_tensor)) - - # Set input tensors from trtllm outputs - for trtllm_output_tensor in trtllm_output_tensors: - - # output_ids and sequence_length were handled earlier - if (self.accumulate_tokens and self.decoupled - and (trtllm_output_tensor.name() == "output_ids" - or trtllm_output_tensor.name() == "sequence_length")): - continue - - postproc_tensor_name = self.trtllm_output_to_postproc_input_map[ - trtllm_output_tensor.name()] - - postproc_input_tensors.append( - pb_utils.Tensor(postproc_tensor_name, - trtllm_output_tensor.as_numpy())) - - return tokens, postproc_input_tensors - - def _get_bls_output_tensors(self, postproc_output_tensors): - - bls_output_tensors = [] - - # Set input tensors from trtllm outputs - for postproc_output_tensor in postproc_output_tensors: - - bls_tensor_name = self.postproc_output_to_bls_output_map[ - postproc_output_tensor.name()] - bls_output_tensors.append( - pb_utils.Tensor(bls_tensor_name, - postproc_output_tensor.as_numpy())) - - return bls_output_tensors + self.llm_model_name = "tensorrt_llm" + if "tensorrt_llm_model_name" in params: + self.llm_model_name = params["tensorrt_llm_model_name"][ + "string_value"] + self.draft_llm_model_name = None + if "tensorrt_llm_draft_model_name" in params: + self.draft_llm_model_name = params[ + "tensorrt_llm_draft_model_name"]["string_value"] + + self.decoder = TritonDecoder( + streaming=self.decoupled, + accumulate=self.accumulate_tokens, + preproc_model_name="preprocessing", + postproc_model_name="postprocessing", + llm_model_name=self.llm_model_name, + draft_llm_model_name=self.draft_llm_model_name) def execute(self, requests): responses = [] - bls_response_sender = None for request in requests: - - #Get the response sender for the BLS if self.decoupled: - bls_response_sender = request.get_response_sender() - + response_sender = request.get_response_sender() try: - # Get the bls input tensors - bls_input_tensors_map = self._get_bls_input_tensors_map( - request) - - #Check the batch dimension - for name, tensor in bls_input_tensors_map.items(): - batch_dim = tensor.as_numpy().shape[0] - - if batch_dim != 1: - - err_str = "Inflight batching backend expects requests with batch size of 1." 
- self.logger.log_error(err_str) - raise pb_utils.TritonModelException(err_str) - - # Create the preprocessor input tensors - preproc_input_tensors = self._get_preproc_input_tensors( - bls_input_tensors_map) - - preproc_request = pb_utils.InferenceRequest( - model_name="preprocessing", - inputs=preproc_input_tensors, - requested_output_names=list( - self.preproc_output_to_trtllm_input_map.keys())) - - #Execute preprocessor - preproc_response = preproc_request.exec() - - if preproc_response.has_error(): - raise pb_utils.TritonModelException( - preproc_response.error().message()) - # Create the trtllm input tensors - trtllm_input_tensors = self._get_trtllm_input_tensors( - bls_input_tensors_map, preproc_response.output_tensors()) - - trtllm_request = pb_utils.InferenceRequest( - model_name="tensorrt_llm", - inputs=trtllm_input_tensors, - requested_output_names=list( - self.trtllm_output_to_postproc_input_map.keys())) - - #Execute trtllm - trtllm_responses = trtllm_request.exec( - decoupled=self.decoupled) - - if not self.decoupled: - trtllm_responses = [trtllm_responses] - - tokens = None - - #Loop over the trtllm responses - for trtllm_response in trtllm_responses: - - if trtllm_response.has_error(): - raise pb_utils.TritonModelException( - trtllm_response.error().message()) - - trtllm_output_tensors = trtllm_response.output_tensors() - - tokens, postproc_input_tensors = self._get_postproc_input_tensors( - tokens, trtllm_output_tensors) - - postproc_request = pb_utils.InferenceRequest( - model_name="postprocessing", - inputs=postproc_input_tensors, - requested_output_names=list( - self.postproc_output_to_bls_output_map.keys())) - - #Execute postprocessor - postproc_response = postproc_request.exec() - - if postproc_response.has_error(): - raise pb_utils.TritonModelException( - postproc_response.error().message()) - - # Create the BLS response - bls_output_tensors = self._get_bls_output_tensors( - postproc_response.output_tensors()) - - bls_response = pb_utils.InferenceResponse( - output_tensors=bls_output_tensors) + req = self.decoder.convert_triton_request(request) + req.validate() + speculative_decode = (req.num_draft_tokens is not None + and req.num_draft_tokens[0][0] > 0) + if speculative_decode and (self.draft_llm_model_name is None + or self.draft_llm_model_name == ""): + raise Exception( + "cannot perform speculative decoding without draft model" + ) + res_gen = self.decoder.decode( + req, speculative_decoding=speculative_decode) + for res in res_gen: + triton_response = self.decoder.create_triton_response(res) if self.decoupled: - bls_response_sender.send(bls_response) + response_sender.send(triton_response) else: - responses.append(bls_response) + responses.append(triton_response) - # All responses have been sent, set final flag if self.decoupled: - bls_response_sender.send( + response_sender.send( flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL) except Exception: - self.logger.log_error(traceback.format_exc()) # If encountering an error, send a response with err msg error_response = pb_utils.InferenceResponse( @@ -359,17 +110,18 @@ def execute(self, requests): error=pb_utils.TritonError(traceback.format_exc())) if self.decoupled: - bls_response_sender.send(error_response) - bls_response_sender.send( + response_sender.send(error_response) + response_sender.send( flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL) else: responses.append(error_response) - if self.decoupled: - return None - else: - assert len(responses) == len(requests) - return responses + 
self.decoder.reset_decoder() + if self.decoupled: + return None + else: + assert len(responses) == len(requests) + return responses def finalize(self): """`finalize` is called only once when the model is being unloaded. diff --git a/triton_templates/tensorrt_llm_bls/config.pbtxt b/triton_templates/tensorrt_llm_bls/config.pbtxt index 0752626..e5aff22 100644 --- a/triton_templates/tensorrt_llm_bls/config.pbtxt +++ b/triton_templates/tensorrt_llm_bls/config.pbtxt @@ -125,6 +125,7 @@ input [ name: "return_log_probs" data_type: TYPE_BOOL dims: [ 1 ] + reshape: { shape: [ ] } optional: true }, { @@ -176,6 +177,19 @@ input [ data_type: TYPE_FP32 dims: [ -1 ] optional: true + }, + { + name: "num_draft_tokens", + data_type: TYPE_INT32, + dims: [ 1 ] + optional: true + }, + { + name: "use_draft_logits", + data_type: TYPE_BOOL, + dims: [ 1 ] + reshape: { shape: [ ] } + optional: true } ] output [ @@ -212,6 +226,18 @@ parameters: { string_value: "${accumulate_tokens}" } } +parameters: { + key: "tensorrt_llm_model_name" + value: { + string_value: "${tensorrt_llm_model_name}" + } +} +parameters: { + key: "tensorrt_llm_draft_model_name" + value: { + string_value: "${tensorrt_llm_draft_model_name}" + } +} instance_group [ {