fix(cost_calculator.py): fixes tgai unmapped model pricing
Fixes an error where the Together AI (tgai) helper function returned None. Enforces stronger type hints, refactors code, and adds more unit tests.
krrishdholakia committed Jun 9, 2024
1 parent dddd4a7 commit b26c3c7
Showing 4 changed files with 329 additions and 261 deletions.
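
For context on the fix: the Together AI pricing helper buckets a model by its parameter count, and per the commit message it previously could return None for sizes without a bucket, breaking the downstream cost lookup. A minimal sketch of the behavior after this change (illustrative model names; the helper and bucket names come from the cost_calculator.py diff below):

from litellm.cost_calculator import get_model_params_and_category

# a mapped parameter size resolves to a pricing bucket from model_prices_and_context_window.json
print(get_model_params_and_category("together_ai/togethercomputer/llama-2-7b"))
# -> "together-ai-3.1b-7b"

# an unmapped size (e.g. 72B) now falls back to the (lower-cased) model name instead of None
print(get_model_params_and_category("together_ai/qwen/Qwen2-72B-Instruct"))
# -> "together_ai/qwen/qwen2-72b-instruct"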
2 changes: 1 addition & 1 deletion litellm/__init__.py
@@ -709,6 +709,7 @@ def identify(event_details):
openai_image_generation_models = ["dall-e-2", "dall-e-3"]

from .timeout import timeout
from .cost_calculator import completion_cost
from .utils import (
client,
exception_type,
@@ -718,7 +719,6 @@ def identify(event_details):
create_pretrained_tokenizer,
create_tokenizer,
cost_per_token,
completion_cost,
supports_function_calling,
supports_parallel_function_calling,
supports_vision,
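The __init__.py change moves the completion_cost re-export from litellm.utils to the new litellm.cost_calculator module, so the public import path is unchanged. A quick sanity check (illustrative):

import litellm
from litellm import completion_cost  # same public path, now backed by litellm.cost_calculator

assert litellm.completion_cost is completion_cost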
280 changes: 276 additions & 4 deletions litellm/cost_calculator.py
@@ -1,17 +1,287 @@
# What is this?
## File for 'response_cost' calculation in Logging
from typing import Optional, Union, Literal
from typing import Optional, Union, Literal, List
import litellm._logging
from litellm.utils import (
ModelResponse,
EmbeddingResponse,
ImageResponse,
TranscriptionResponse,
TextCompletionResponse,
CallTypes,
completion_cost,
cost_per_token,
print_verbose,
CostPerToken,
token_counter,
)
import time  # needed by get_replicate_completion_pricing's timing fallback
import litellm
from litellm import verbose_logger


# Extract the number of billion parameters from the model name
# only used for togethercomputer / together_ai LLMs
def get_model_params_and_category(model_name) -> str:
"""
    Helper function for calculating Together AI pricing.
    Returns
    - str - the model pricing category if mapped, else the received model name
"""
import re

model_name = model_name.lower()
re_params_match = re.search(
r"(\d+b)", model_name
) # catch all decimals like 3b, 70b, etc
category = None
if re_params_match is not None:
params_match = str(re_params_match.group(1))
params_match = params_match.replace("b", "")
if params_match is not None:
params_billion = float(params_match)
else:
return model_name
# Determine the category based on the number of parameters
if params_billion <= 3.0:
category = "together-ai-up-to-3b"
elif params_billion <= 7.0:
category = "together-ai-3.1b-7b"
elif params_billion <= 20.0:
category = "together-ai-7.1b-20b"
elif params_billion <= 40.0:
category = "together-ai-20.1b-40b"
elif params_billion <= 70.0:
category = "together-ai-40.1b-70b"
if category is not None:
return category

return model_name


def get_replicate_completion_pricing(completion_response=None, total_time=0.0):
# see https://replicate.com/pricing
# for all litellm currently supported LLMs, almost all requests go to a100_80gb
a100_80gb_price_per_second_public = (
0.001400 # assume all calls sent to A100 80GB for now
)
if total_time == 0.0: # total time is in ms
start_time = completion_response["created"]
end_time = getattr(completion_response, "ended", time.time())
total_time = end_time - start_time

return a100_80gb_price_per_second_public * total_time / 1000
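
# Illustrative arithmetic for the helper above (not part of this diff): a request with
# total_time=2000.0 ms is billed at the assumed A100 80GB rate, i.e. 0.001400 * 2000.0 / 1000 = $0.0028.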


def completion_cost(
completion_response=None,
model: Optional[str] = None,
prompt="",
messages: List = [],
completion="",
total_time=0.0, # used for replicate, sagemaker
call_type: Literal[
"embedding",
"aembedding",
"completion",
"acompletion",
"atext_completion",
"text_completion",
"image_generation",
"aimage_generation",
"moderation",
"amoderation",
"atranscription",
"transcription",
"aspeech",
"speech",
] = "completion",
### REGION ###
custom_llm_provider=None,
region_name=None, # used for bedrock pricing
### IMAGE GEN ###
size=None,
quality=None,
n=None, # number of images
### CUSTOM PRICING ###
custom_cost_per_token: Optional[CostPerToken] = None,
custom_cost_per_second: Optional[float] = None,
) -> float:
"""
    Calculate the cost of a given completion call for GPT-3.5-turbo, llama2, or any other LiteLLM-supported LLM.
Parameters:
        completion_response (litellm.ModelResponse): [Required] The response received from a LiteLLM completion request.
[OPTIONAL PARAMS]
model (str): Optional. The name of the language model used in the completion calls
prompt (str): Optional. The input prompt passed to the llm
completion (str): Optional. The output completion text from the llm
total_time (float): Optional. (Only used for Replicate LLMs) The total time used for the request in seconds
custom_cost_per_token: Optional[CostPerToken]: the cost per input + output token for the llm api call.
custom_cost_per_second: Optional[float]: the cost per second for the llm api call.
Returns:
        float: The cost in USD for the completion based on the provided parameters.
    Exceptions:
        Raises an exception if the model is not in the litellm model cost map. Register the model via custom pricing or a PR - https://github.com/BerriAI/litellm/blob/main/model_prices_and_context_window.json
Note:
- If completion_response is provided, the function extracts token information and the model name from it.
- If completion_response is not provided, the function calculates token counts based on the model and input text.
- The cost is calculated based on the model, prompt tokens, and completion tokens.
- For certain models containing "togethercomputer" in the name, prices are based on the model size.
- For un-mapped Replicate models, the cost is calculated based on the total time used for the request.
"""
try:
if (
(call_type == "aimage_generation" or call_type == "image_generation")
and model is not None
and isinstance(model, str)
and len(model) == 0
and custom_llm_provider == "azure"
):
model = "dall-e-2" # for dall-e-2, azure expects an empty model name
# Handle Inputs to completion_cost
prompt_tokens = 0
completion_tokens = 0
custom_llm_provider = None
if completion_response is not None:
# get input/output tokens from completion_response
prompt_tokens = completion_response.get("usage", {}).get("prompt_tokens", 0)
completion_tokens = completion_response.get("usage", {}).get(
"completion_tokens", 0
)
total_time = completion_response.get("_response_ms", 0)
verbose_logger.debug(
f"completion_response response ms: {completion_response.get('_response_ms')} "
)
model = model or completion_response.get(
"model", None
) # check if user passed an override for model, if it's none check completion_response['model']
if hasattr(completion_response, "_hidden_params"):
if (
completion_response._hidden_params.get("model", None) is not None
and len(completion_response._hidden_params["model"]) > 0
):
model = completion_response._hidden_params.get("model", model)
custom_llm_provider = completion_response._hidden_params.get(
"custom_llm_provider", ""
)
region_name = completion_response._hidden_params.get(
"region_name", region_name
)
size = completion_response._hidden_params.get(
"optional_params", {}
).get(
"size", "1024-x-1024"
) # openai default
quality = completion_response._hidden_params.get(
"optional_params", {}
).get(
"quality", "standard"
) # openai default
n = completion_response._hidden_params.get("optional_params", {}).get(
"n", 1
) # openai default
else:
if len(messages) > 0:
prompt_tokens = token_counter(model=model, messages=messages)
elif len(prompt) > 0:
prompt_tokens = token_counter(model=model, text=prompt)
completion_tokens = token_counter(model=model, text=completion)
if model is None:
raise ValueError(
f"Model is None and does not exist in passed completion_response. Passed completion_response={completion_response}, model={model}"
)

if (
call_type == CallTypes.image_generation.value
or call_type == CallTypes.aimage_generation.value
):
### IMAGE GENERATION COST CALCULATION ###
if custom_llm_provider == "vertex_ai":
# https://cloud.google.com/vertex-ai/generative-ai/pricing
# Vertex Charges Flat $0.20 per image
return 0.020

# fix size to match naming convention
if "x" in size and "-x-" not in size:
size = size.replace("x", "-x-")
image_gen_model_name = f"{size}/{model}"
image_gen_model_name_with_quality = image_gen_model_name
if quality is not None:
image_gen_model_name_with_quality = f"{quality}/{image_gen_model_name}"
size = size.split("-x-")
height = int(size[0]) # if it's 1024-x-1024 vs. 1024x1024
width = int(size[1])
verbose_logger.debug(f"image_gen_model_name: {image_gen_model_name}")
verbose_logger.debug(
f"image_gen_model_name_with_quality: {image_gen_model_name_with_quality}"
)
if image_gen_model_name in litellm.model_cost:
return (
litellm.model_cost[image_gen_model_name]["input_cost_per_pixel"]
* height
* width
* n
)
elif image_gen_model_name_with_quality in litellm.model_cost:
return (
litellm.model_cost[image_gen_model_name_with_quality][
"input_cost_per_pixel"
]
* height
* width
* n
)
else:
raise Exception(
f"Model={image_gen_model_name} not found in completion cost model map"
)
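        # Illustrative arithmetic for the image-generation branch above (not part of this
        # diff): a mapped 1024-x-1024 model with n=2 returns input_cost_per_pixel * 1024 * 1024 * 2.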
# Calculate cost based on prompt_tokens, completion_tokens
if (
"togethercomputer" in model
or "together_ai" in model
or custom_llm_provider == "together_ai"
):
# together ai prices based on size of llm
            # get_model_params_and_category takes a model name and returns the LLM-size pricing category it falls into in model_prices_and_context_window.json
model = get_model_params_and_category(model)
        # replicate llms are calculated based on the time the request takes to run
# see https://replicate.com/pricing
elif (
model in litellm.replicate_models or "replicate" in model
) and model not in litellm.model_cost:
# for unmapped replicate model, default to replicate's time tracking logic
return get_replicate_completion_pricing(completion_response, total_time)

if model is None:
raise ValueError(
f"Model is None and does not exist in passed completion_response. Passed completion_response={completion_response}, model={model}"
)

(
prompt_tokens_cost_usd_dollar,
completion_tokens_cost_usd_dollar,
) = cost_per_token(
model=model,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
custom_llm_provider=custom_llm_provider,
response_time_ms=total_time,
region_name=region_name,
custom_cost_per_second=custom_cost_per_second,
custom_cost_per_token=custom_cost_per_token,
)
_final_cost = prompt_tokens_cost_usd_dollar + completion_tokens_cost_usd_dollar
print_verbose(
f"final cost: {_final_cost}; prompt_tokens_cost_usd_dollar: {prompt_tokens_cost_usd_dollar}; completion_tokens_cost_usd_dollar: {completion_tokens_cost_usd_dollar}"
)
return _final_cost
except Exception as e:
raise e


def response_cost_calculator(
@@ -47,7 +317,7 @@ def response_cost_calculator(
) -> Optional[float]:
try:
response_cost: float = 0.0
if cache_hit is not None and cache_hit == True:
if cache_hit is not None and cache_hit is True:
response_cost = 0.0
else:
response_object._hidden_params["optional_params"] = optional_params
@@ -62,9 +332,11 @@ def response_cost_calculator(
if (
model in litellm.model_cost
and custom_pricing is not None
and custom_llm_provider == True
and custom_llm_provider is True
): # override defaults if custom pricing is set
base_model = model
elif base_model is None:
base_model = model
# base_model defaults to None if not set on model_info
response_cost = completion_cost(
completion_response=response_object,
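As the completion_cost docstring above notes, an unmapped model raises unless pricing is registered; one workaround is per-call custom pricing. A minimal sketch, assuming custom_cost_per_token (with input_cost_per_token / output_cost_per_token keys) takes precedence over the model cost map, and using hypothetical rates:

import litellm

cost = litellm.completion_cost(
    model="together_ai/qwen/Qwen2-72B-Instruct",  # unmapped in the cost map at this commit
    prompt="Who are you?",
    completion="I am Qwen, a large language model created by Alibaba Cloud.",
    custom_cost_per_token={
        "input_cost_per_token": 9e-7,   # hypothetical per-token rate
        "output_cost_per_token": 9e-7,  # hypothetical per-token rate
    },
)
print(f"response_cost: {cost}")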
49 changes: 49 additions & 0 deletions litellm/tests/test_completion_cost.py
@@ -517,3 +517,52 @@ def test_groq_response_cost_tracking(is_streaming):
assert response_cost > 0.0

print(f"response_cost: {response_cost}")


def test_together_ai_qwen_completion_cost():
input_kwargs = {
"completion_response": litellm.ModelResponse(
**{
"id": "890db0c33c4ef94b-SJC",
"choices": [
{
"finish_reason": "eos",
"index": 0,
"message": {
"content": "I am Qwen, a large language model created by Alibaba Cloud.",
"role": "assistant",
},
}
],
"created": 1717900130,
"model": "together_ai/qwen/Qwen2-72B-Instruct",
"object": "chat.completion",
"system_fingerprint": None,
"usage": {
"completion_tokens": 15,
"prompt_tokens": 23,
"total_tokens": 38,
},
}
),
"model": "qwen/Qwen2-72B-Instruct",
"prompt": "",
"messages": [],
"completion": "",
"total_time": 0.0,
"call_type": "completion",
"custom_llm_provider": "together_ai",
"region_name": None,
"size": None,
"quality": None,
"n": None,
"custom_cost_per_token": None,
"custom_cost_per_second": None,
}

try:
litellm.completion_cost(**input_kwargs)
except litellm.NotFoundError:
pass
except Exception:
pytest.fail("This should have returned a 'not found error'")
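
The new case can be run on its own with pytest, e.g. pytest litellm/tests/test_completion_cost.py -k test_together_ai_qwen_completion_cost.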
