From af799f3e6ec775ecfad5b90505eb1c1d4524a9e2 Mon Sep 17 00:00:00 2001 From: ryansingman Date: Thu, 30 May 2024 11:56:23 -0700 Subject: [PATCH 1/6] throttle tlm requests on partial success (deberta not succeeding) --- cleanlab_studio/errors.py | 4 ++++ cleanlab_studio/internal/api/api.py | 4 ++++ cleanlab_studio/internal/tlm/concurrency.py | 6 +++++- 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/cleanlab_studio/errors.py b/cleanlab_studio/errors.py index 23c21507..12ca5370 100644 --- a/cleanlab_studio/errors.py +++ b/cleanlab_studio/errors.py @@ -86,6 +86,10 @@ def __init__(self, message: str, status_code: int) -> None: self.status_code = status_code +class TlmPartialSuccess(APIError): + pass + + class UnsupportedVersionError(HandledError): def __init__(self) -> None: super().__init__( diff --git a/cleanlab_studio/internal/api/api.py b/cleanlab_studio/internal/api/api.py index 73eab2c4..3c48dd83 100644 --- a/cleanlab_studio/internal/api/api.py +++ b/cleanlab_studio/internal/api/api.py @@ -10,6 +10,7 @@ InvalidProjectConfiguration, RateLimitError, TlmBadRequest, + TlmPartialSuccess, TlmServerError, ) from cleanlab_studio.internal.tlm.concurrency import TlmRateHandler @@ -651,6 +652,9 @@ async def tlm_prompt( await handle_tlm_client_error_from_resp(res, batch_index) await handle_tlm_api_error_from_resp(res, batch_index) + if not res_json.get("nli_deberta_success", True): + raise TlmPartialSuccess("Partial failure on deberta call -- slowdown request rate.") + finally: if local_scoped_client: await client_session.close() diff --git a/cleanlab_studio/internal/tlm/concurrency.py b/cleanlab_studio/internal/tlm/concurrency.py index 14065a7e..5180106a 100644 --- a/cleanlab_studio/internal/tlm/concurrency.py +++ b/cleanlab_studio/internal/tlm/concurrency.py @@ -2,7 +2,7 @@ from types import TracebackType from typing import Optional, Type -from cleanlab_studio.errors import RateLimitError, TlmServerError +from cleanlab_studio.errors import RateLimitError, TlmPartialSuccess, TlmServerError class TlmRateHandler: @@ -60,6 +60,10 @@ async def __aexit__( ): await self._decrease_congestion_window() + elif isinstance(exc, TlmPartialSuccess): + print("Partial success, decreasing congestion window.") + await self._decrease_congestion_window() + # release acquired send semaphore from aenter self._send_semaphore.release() From e2fe4dcaabe06ab5f2276f611448ddc62114ce14 Mon Sep 17 00:00:00 2001 From: ryansingman Date: Thu, 30 May 2024 11:56:40 -0700 Subject: [PATCH 2/6] Bump version (v2.0.6) --- cleanlab_studio/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cleanlab_studio/version.py b/cleanlab_studio/version.py index b2003686..3527e3d4 100644 --- a/cleanlab_studio/version.py +++ b/cleanlab_studio/version.py @@ -1,7 +1,7 @@ # Note to developers: # Consider if backend's MIN_CLI_VERSION needs updating when pushing any changes to this file. -__version__ = "2.0.5" +__version__ = "2.0.6" SCHEMA_VERSION = "0.2.0" MIN_SCHEMA_VERSION = "0.1.0" From 953825bbfba248db0ff91cb5cc6f57e7cdb0ad6e Mon Sep 17 00:00:00 2001 From: ryansingman Date: Thu, 30 May 2024 11:57:36 -0700 Subject: [PATCH 3/6] remove debug print --- cleanlab_studio/internal/tlm/concurrency.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cleanlab_studio/internal/tlm/concurrency.py b/cleanlab_studio/internal/tlm/concurrency.py index 5180106a..2636723a 100644 --- a/cleanlab_studio/internal/tlm/concurrency.py +++ b/cleanlab_studio/internal/tlm/concurrency.py @@ -61,7 +61,6 @@ async def __aexit__( await self._decrease_congestion_window() elif isinstance(exc, TlmPartialSuccess): - print("Partial success, decreasing congestion window.") await self._decrease_congestion_window() # release acquired send semaphore from aenter From fc1593327b3d75ace86e7f4e5c61bfe37dee8ca2 Mon Sep 17 00:00:00 2001 From: ryansingman Date: Thu, 30 May 2024 12:32:59 -0700 Subject: [PATCH 4/6] docstring --- cleanlab_studio/errors.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cleanlab_studio/errors.py b/cleanlab_studio/errors.py index 12ca5370..448cdacf 100644 --- a/cleanlab_studio/errors.py +++ b/cleanlab_studio/errors.py @@ -87,6 +87,8 @@ def __init__(self, message: str, status_code: int) -> None: class TlmPartialSuccess(APIError): + """TLM request partially succeeded. Still return result to user.""" + pass From eecf627fda7e24c18b8378edc883f5053e15f141 Mon Sep 17 00:00:00 2001 From: ryansingman Date: Thu, 30 May 2024 12:36:37 -0700 Subject: [PATCH 5/6] docstring --- cleanlab_studio/errors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cleanlab_studio/errors.py b/cleanlab_studio/errors.py index 448cdacf..3a8716be 100644 --- a/cleanlab_studio/errors.py +++ b/cleanlab_studio/errors.py @@ -87,7 +87,7 @@ def __init__(self, message: str, status_code: int) -> None: class TlmPartialSuccess(APIError): - """TLM request partially succeeded. Still return result to user.""" + """TLM request partially succeeded. Still returns result to user.""" pass From 4ba2643ef5a7a6a9d73a80c14027348271cfe7d0 Mon Sep 17 00:00:00 2001 From: ryansingman Date: Thu, 30 May 2024 14:33:40 -0700 Subject: [PATCH 6/6] fix response key name, clarify exception swallowing in decorator --- cleanlab_studio/internal/api/api.py | 2 +- cleanlab_studio/internal/tlm/concurrency.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/cleanlab_studio/internal/api/api.py b/cleanlab_studio/internal/api/api.py index 3c48dd83..85f566ac 100644 --- a/cleanlab_studio/internal/api/api.py +++ b/cleanlab_studio/internal/api/api.py @@ -652,7 +652,7 @@ async def tlm_prompt( await handle_tlm_client_error_from_resp(res, batch_index) await handle_tlm_api_error_from_resp(res, batch_index) - if not res_json.get("nli_deberta_success", True): + if not res_json.get("deberta_success", True): raise TlmPartialSuccess("Partial failure on deberta call -- slowdown request rate.") finally: diff --git a/cleanlab_studio/internal/tlm/concurrency.py b/cleanlab_studio/internal/tlm/concurrency.py index 2636723a..117c9cb4 100644 --- a/cleanlab_studio/internal/tlm/concurrency.py +++ b/cleanlab_studio/internal/tlm/concurrency.py @@ -50,6 +50,8 @@ async def __aexit__( If request failed due to 503, decrease congestion window. Else if request failed for other reason, don't change congestion window, just exit. """ + swallow_exception: bool = False + if exc_type is None: await self._increase_congestion_window() @@ -62,11 +64,12 @@ async def __aexit__( elif isinstance(exc, TlmPartialSuccess): await self._decrease_congestion_window() + swallow_exception = True # release acquired send semaphore from aenter self._send_semaphore.release() - return False + return swallow_exception async def _increase_congestion_window( self,