 import os
 import time
 import traceback
+import random
 from datetime import datetime, timedelta
 from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union, cast, overload

@@ -769,7 +770,12 @@ async def _commit_spend_updates_to_db(  # noqa: PLR0915
                         proxy_logging_obj=proxy_logging_obj,
                     )
                 # Optionally, sleep for a bit before retrying
-                await asyncio.sleep(2**i)  # Exponential backoff
+                # Sleep a random amount to avoid retrying and deadlocking again: when two transactions deadlock they are
+                # cancelled basically at the same time, so if they wait the same time they will also retry at the same time
+                # and thus they are more likely to deadlock again.
+                # Instead, we sleep a random amount so that they retry at slightly different times, lowering the chance of
+                # repeated deadlocks, and therefore of exceeding the retry limit.
+                await asyncio.sleep(random.uniform(2**i, 2 ** (i + 1)))
             except Exception as e:
                 _raise_failed_update_spend_exception(
                     e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
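As an aside, the retry pattern added above can be shown standalone. The sketch below is not part of this commit; `run_transaction` and `MAX_RETRIES` are made-up names for illustration. It shows the same idea: on a failure, back off for a random duration in [2**i, 2**(i + 1)) instead of a fixed 2**i, so two transactions that were cancelled together are unlikely to collide on the next attempt.

import asyncio
import random

MAX_RETRIES = 3  # hypothetical retry budget, plays the role of n_retry_times


async def run_with_jittered_backoff(run_transaction) -> None:
    """Retry an awaitable DB write with randomized exponential backoff."""
    for i in range(MAX_RETRIES + 1):
        try:
            await run_transaction()
            return
        except Exception:
            if i >= MAX_RETRIES:
                raise
            # A random point in [2**i, 2**(i + 1)): the window still grows
            # exponentially, but two concurrent retriers rarely pick the same
            # instant, so they are unlikely to deadlock against each other again.
            await asyncio.sleep(random.uniform(2**i, 2 ** (i + 1)))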
@@ -849,8 +855,27 @@ async def _update_daily_spend(
     try:
         for i in range(n_retry_times + 1):
             try:
+                # Sort the transactions to minimize the probability of deadlocks by reducing the chance of concurrent
+                # transactions locking the same rows/ranges in different orders.
                 transactions_to_process = dict(
-                    list(daily_spend_transactions.items())[:BATCH_SIZE]
+                    sorted(
+                        daily_spend_transactions.items(),
+                        # Normally to avoid deadlocks we would sort by the index, but since we have sprinkled indexes
+                        # on our schema like we're discount Salt Bae, we just sort by all fields that have an index,
+                        # in an ad-hoc (but hopefully sensible) order of indexes. The actual ordering matters less than
+                        # ensuring that all concurrent transactions sort in the same order.
+                        # We could in theory use the dict key, as it contains basically the same fields, but this is more
+                        # robust to future changes in the key format.
+                        # If _update_daily_spend ever gets the ability to write to multiple tables at once, the sorting
+                        # should sort by the table first.
+                        key=lambda x: (
+                            x[1]["date"],
+                            x[1].get(entity_id_field) or "",  # fall back to "" so the sort never compares None to str
+                            x[1]["api_key"],
+                            x[1]["model"],
+                            x[1]["custom_llm_provider"],
+                        ),
+                    )[:BATCH_SIZE]
                 )

                 if len(transactions_to_process) == 0:
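The consistent ordering matters more than the specific fields. As a hedged illustration (not part of this commit; the transaction shape, key format, `entity_id_field` value, and `BATCH_SIZE` below are assumed for the example), any two writers that order their batches by the same tuple lock overlapping rows in the same sequence, so one simply waits for the other instead of each holding a lock the other needs:

# Toy daily-spend transactions, keyed roughly the way the real dict is.
entity_id_field = "user_id"  # assumed entity column for this sketch
BATCH_SIZE = 100

daily_spend_transactions = {
    "2024-06-02_u1_sk-b_gpt-4o": {
        "date": "2024-06-02",
        "user_id": "u1",
        "api_key": "sk-b",
        "model": "gpt-4o",
        "custom_llm_provider": "openai",
    },
    "2024-06-01_u2_sk-a_claude-3": {
        "date": "2024-06-01",
        "user_id": "u2",
        "api_key": "sk-a",
        "model": "claude-3",
        "custom_llm_provider": "anthropic",
    },
}

# Every concurrent writer applies the same ordering rule, so row locks are
# acquired in one global order regardless of dict insertion order.
batch = dict(
    sorted(
        daily_spend_transactions.items(),
        key=lambda x: (
            x[1]["date"],
            x[1].get(entity_id_field) or "",
            x[1]["api_key"],
            x[1]["model"],
            x[1]["custom_llm_provider"],
        ),
    )[:BATCH_SIZE]
)
print(list(batch))  # the 2024-06-01 row always comes first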
@@ -893,7 +918,8 @@ async def _update_daily_spend(
                         "model_group": transaction.get("model_group"),
                         "mcp_namespaced_tool_name": transaction.get(
                             "mcp_namespaced_tool_name"
-                        ) or "",
+                        )
+                        or "",
                         "custom_llm_provider": transaction.get(
                             "custom_llm_provider"
                         ),
@@ -909,13 +935,13 @@ async def _update_daily_spend(

                     # Add cache-related fields if they exist
                     if "cache_read_input_tokens" in transaction:
-                        common_data["cache_read_input_tokens"] = (
-                            transaction.get("cache_read_input_tokens", 0)
-                        )
+                        common_data[
+                            "cache_read_input_tokens"
+                        ] = transaction.get("cache_read_input_tokens", 0)
                     if "cache_creation_input_tokens" in transaction:
-                        common_data["cache_creation_input_tokens"] = (
-                            transaction.get("cache_creation_input_tokens", 0)
-                        )
+                        common_data[
+                            "cache_creation_input_tokens"
+                        ] = transaction.get("cache_creation_input_tokens", 0)

                     # Create update data structure
                     update_data = {
@@ -976,7 +1002,14 @@ async def _update_daily_spend(
                         start_time=start_time,
                         proxy_logging_obj=proxy_logging_obj,
                     )
-                await asyncio.sleep(2**i)
+                await asyncio.sleep(
+                    # Sleep a random amount to avoid retrying and deadlocking again: when two transactions deadlock they are
+                    # cancelled basically at the same time, so if they wait the same time they will also retry at the same time
+                    # and thus they are more likely to deadlock again.
+                    # Instead, we sleep a random amount so that they retry at slightly different times, lowering the chance of
+                    # repeated deadlocks, and therefore of exceeding the retry limit.
+                    random.uniform(2**i, 2 ** (i + 1))
+                )

     except Exception as e:
         if "transactions_to_process" in locals():