Skip to content

Commit

Permalink
[NA] Make evaluate functions work stably when event loops are used in…
Browse files Browse the repository at this point in the history
…side (#1171)

* Make evaluate functions work stably when event loops are used inside worker threads with httpx.AsyncClients. Fix ascore call in LiteLLMChatCompletion to call acompletion under the hood

* Added comment
  • Loading branch information
alexkuzmik authored Jan 30, 2025
1 parent 6fd37cb commit f032513
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 32 deletions.
43 changes: 43 additions & 0 deletions sdks/python/src/opik/evaluation/asyncio_support.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import httpcore
import functools
import contextlib

from typing import Iterator, Callable


@contextlib.contextmanager
def async_http_connections_expire_immediately() -> Iterator[None]:
    """
    Temporarily force every new httpcore async connection to expire immediately.

    This patching addresses the issue of httpx.AsyncClient not working
    correctly when it is used by multiple event loops. A connection that was
    added to the connection pool under one event loop may later be picked up
    by a request processed on another event loop. Asyncio doesn't support
    that, and a RuntimeError is raised.

    While the context is active, `httpcore.AsyncHTTPConnection.__init__` is
    replaced with a wrapper that overrides `keepalive_expiry` with 0, so
    async connections expire immediately and cannot be reused by a different
    event loop. The original `__init__` is restored on exit, including when
    the managed block raises.

    NOTE: the patch is applied to the class attribute itself, so it affects
    every AsyncHTTPConnection constructed anywhere in the process while the
    context is active, not only the ones created by the managed block.

    Related issues:
        https://github.com/comet-ml/opik/issues/1132
        https://github.com/encode/httpx/discussions/2959

    TODO: this function might require extra logic for handling the case
    when an async connection pool with already opened connections exists
    before the context is entered, but it is out of scope for now.
    """
    # Capture the original *before* entering the try block: if this lookup
    # fails, the error must propagate as-is instead of being masked by an
    # UnboundLocalError raised from the `finally` clause below.
    original: Callable = httpcore.AsyncHTTPConnection.__init__

    @functools.wraps(original)
    def wrapped(*args, **kwargs):  # type: ignore
        # Unconditionally override whatever the caller requested.
        kwargs["keepalive_expiry"] = 0
        return original(*args, **kwargs)

    httpcore.AsyncHTTPConnection.__init__ = wrapped
    try:
        yield
    finally:
        # Always undo the patch, even if the managed block raised.
        httpcore.AsyncHTTPConnection.__init__ = original
65 changes: 34 additions & 31 deletions sdks/python/src/opik/evaluation/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ..api_objects.prompt import prompt_template
from ..api_objects.dataset import dataset
from ..api_objects import opik_client
from . import scorer, scores_logger, report, evaluation_result, utils
from . import scorer, scores_logger, report, evaluation_result, utils, asyncio_support


def evaluate(
Expand Down Expand Up @@ -80,18 +80,19 @@ def evaluate(

start_time = time.time()

test_results = scorer.score_tasks(
client=client,
experiment_=experiment,
dataset_=dataset,
task=task,
scoring_metrics=scoring_metrics,
nb_samples=nb_samples,
workers=task_threads,
verbose=verbose,
project_name=project_name,
scoring_key_mapping=scoring_key_mapping,
)
with asyncio_support.async_http_connections_expire_immediately():
test_results = scorer.score_tasks(
client=client,
experiment_=experiment,
dataset_=dataset,
task=task,
scoring_metrics=scoring_metrics,
nb_samples=nb_samples,
workers=task_threads,
verbose=verbose,
project_name=project_name,
scoring_key_mapping=scoring_key_mapping,
)

total_time = time.time() - start_time

Expand Down Expand Up @@ -159,12 +160,13 @@ def evaluate_experiment(
scoring_key_mapping=scoring_key_mapping,
)

test_results = scorer.score_test_cases(
test_cases=test_cases,
scoring_metrics=scoring_metrics,
workers=scoring_threads,
verbose=verbose,
)
with asyncio_support.async_http_connections_expire_immediately():
test_results = scorer.score_test_cases(
test_cases=test_cases,
scoring_metrics=scoring_metrics,
workers=scoring_threads,
verbose=verbose,
)

first_trace_id = test_results[0].test_case.trace_id
project_name = utils.get_trace_project_name(client=client, trace_id=first_trace_id)
Expand Down Expand Up @@ -280,18 +282,19 @@ def evaluate_prompt(

start_time = time.time()

test_results = scorer.score_tasks(
client=client,
experiment_=experiment,
dataset_=dataset,
task=_build_prompt_evaluation_task(model=model, messages=messages),
scoring_metrics=scoring_metrics,
nb_samples=nb_samples,
workers=task_threads,
verbose=verbose,
project_name=project_name,
scoring_key_mapping=None,
)
with asyncio_support.async_http_connections_expire_immediately():
test_results = scorer.score_tasks(
client=client,
experiment_=experiment,
dataset_=dataset,
task=_build_prompt_evaluation_task(model=model, messages=messages),
scoring_metrics=scoring_metrics,
nb_samples=nb_samples,
workers=task_threads,
verbose=verbose,
project_name=project_name,
scoring_key_mapping=None,
)

total_time = time.time() - start_time

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ async def agenerate_provider_response(self, **kwargs: Any) -> ModelResponse:
if opik_monitor.enabled_in_config():
all_kwargs = opik_monitor.try_add_opik_monitoring_to_params(all_kwargs)

response = await self._engine.completion(
response = await self._engine.acompletion(
model=self.model_name, messages=messages, **all_kwargs
)

Expand Down

0 comments on commit f032513

Please sign in to comment.