-
Notifications
You must be signed in to change notification settings - Fork 419
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore(llmobs): refactor out ragas base evaluator (#11846)
Creates a `RagasBaseEvaluator` class that ragas evaluators can share. This is a split-out version of [this PR](#11716) that has incorporated changes based on the first round of comments from @Yun-Kim and @Kyle-Verhoog This class contains shared logic for: - checking ragas dependencies are present, and throwing otherwise - `submit_and_evaluate` function, which generally takes a span event, calls evaluate to generate a score, and then submits an evaluation - extracting out question, contexts, and answer from an LLM span ### Misc changes - rename tests to specify they are faithfulness-specific tests - throw when we parse an unsupported evaluator instead of logging a warning ### ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --------- Co-authored-by: lievan <[email protected]>
- Loading branch information
Showing
11 changed files
with
297 additions
and
208 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
import traceback | ||
from typing import List | ||
from typing import Optional | ||
from typing import Tuple | ||
from typing import Union | ||
|
||
from ddtrace.internal.logger import get_logger | ||
from ddtrace.internal.telemetry import telemetry_writer | ||
from ddtrace.internal.telemetry.constants import TELEMETRY_APM_PRODUCT | ||
from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL | ||
from ddtrace.internal.utils.version import parse_version | ||
from ddtrace.llmobs._constants import INTERNAL_CONTEXT_VARIABLE_KEYS | ||
from ddtrace.llmobs._constants import INTERNAL_QUERY_VARIABLE_KEYS | ||
from ddtrace.llmobs._constants import RAGAS_ML_APP_PREFIX | ||
|
||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
class RagasDependencies: | ||
""" | ||
A helper class to store instances of ragas classes and functions | ||
that may or may not exist in a user's environment. | ||
""" | ||
|
||
def __init__(self): | ||
import ragas | ||
|
||
self.ragas_version = parse_version(ragas.__version__) | ||
if self.ragas_version >= (0, 2, 0) or self.ragas_version < (0, 1, 10): | ||
raise NotImplementedError( | ||
"Ragas version: {} is not supported".format(self.ragas_version), | ||
) | ||
|
||
from ragas.llms import llm_factory | ||
|
||
self.llm_factory = llm_factory | ||
|
||
from ragas.llms.output_parser import RagasoutputParser | ||
|
||
self.RagasoutputParser = RagasoutputParser | ||
|
||
from ragas.metrics import context_precision | ||
|
||
self.context_precision = context_precision | ||
|
||
from ragas.metrics.base import ensembler | ||
|
||
self.ensembler = ensembler | ||
|
||
from ragas.metrics import faithfulness | ||
|
||
self.faithfulness = faithfulness | ||
|
||
from ragas.metrics.base import get_segmenter | ||
|
||
self.get_segmenter = get_segmenter | ||
|
||
from ddtrace.llmobs._evaluators.ragas.models import StatementFaithfulnessAnswers | ||
|
||
self.StatementFaithfulnessAnswers = StatementFaithfulnessAnswers | ||
|
||
from ddtrace.llmobs._evaluators.ragas.models import StatementsAnswers | ||
|
||
self.StatementsAnswers = StatementsAnswers | ||
|
||
|
||
def _get_ml_app_for_ragas_trace(span_event: dict) -> str: | ||
""" | ||
The `ml_app` spans generated from traces of ragas will be named as `dd-ragas-<ml_app>` | ||
or `dd-ragas` if `ml_app` is not present in the span event. | ||
""" | ||
tags: List[str] = span_event.get("tags", []) | ||
ml_app = None | ||
for tag in tags: | ||
if isinstance(tag, str) and tag.startswith("ml_app:"): | ||
ml_app = tag.split(":")[1] | ||
break | ||
if not ml_app: | ||
return RAGAS_ML_APP_PREFIX | ||
return "{}-{}".format(RAGAS_ML_APP_PREFIX, ml_app) | ||
|
||
|
||
class BaseRagasEvaluator: | ||
"""A class used by EvaluatorRunner to conduct ragas evaluations | ||
on LLM Observability span events. The job of an Evaluator is to take a span and | ||
submit evaluation metrics based on the span's attributes. | ||
Extenders of this class should only need to implement the `evaluate` method. | ||
""" | ||
|
||
LABEL = "ragas" | ||
METRIC_TYPE = "score" | ||
|
||
def __init__(self, llmobs_service): | ||
""" | ||
Initialize an evaluator that uses the ragas library to generate a score on finished LLM spans. | ||
:param llmobs_service: An instance of the LLM Observability service used for tracing the evaluation and | ||
submitting evaluation metrics. | ||
Raises: NotImplementedError if the ragas library is not found or if ragas version is not supported. | ||
""" | ||
self.llmobs_service = llmobs_service | ||
self.ragas_version = "unknown" | ||
telemetry_state = "ok" | ||
try: | ||
self.ragas_dependencies = RagasDependencies() | ||
self.ragas_version = self.ragas_dependencies.ragas_version | ||
except ImportError as e: | ||
telemetry_state = "fail_import_error" | ||
raise NotImplementedError("Failed to load dependencies for `{}` evaluator".format(self.LABEL)) from e | ||
except AttributeError as e: | ||
telemetry_state = "fail_attribute_error" | ||
raise NotImplementedError("Failed to load dependencies for `{}` evaluator".format(self.LABEL)) from e | ||
except NotImplementedError as e: | ||
telemetry_state = "fail_not_supported" | ||
raise NotImplementedError("Failed to load dependencies for `{}` evaluator".format(self.LABEL)) from e | ||
except Exception as e: | ||
telemetry_state = "fail_unknown" | ||
raise NotImplementedError("Failed to load dependencies for `{}` evaluator".format(self.LABEL)) from e | ||
finally: | ||
telemetry_writer.add_count_metric( | ||
namespace=TELEMETRY_APM_PRODUCT.LLMOBS, | ||
name="evaluators.init", | ||
value=1, | ||
tags=( | ||
("evaluator_label", self.LABEL), | ||
("state", telemetry_state), | ||
("evaluator_version", self.ragas_version), | ||
), | ||
) | ||
if telemetry_state != "ok": | ||
telemetry_writer.add_log( | ||
level=TELEMETRY_LOG_LEVEL.ERROR, | ||
message="Failed to import Ragas dependencies", | ||
stack_trace=traceback.format_exc(), | ||
tags={"evaluator_version": self.ragas_version}, | ||
) | ||
|
||
def run_and_submit_evaluation(self, span_event: dict): | ||
if not span_event: | ||
return | ||
score_result_or_failure, metric_metadata = self.evaluate(span_event) | ||
telemetry_writer.add_count_metric( | ||
TELEMETRY_APM_PRODUCT.LLMOBS, | ||
"evaluators.run", | ||
1, | ||
tags=( | ||
("evaluator_label", self.LABEL), | ||
("state", score_result_or_failure if isinstance(score_result_or_failure, str) else "success"), | ||
("evaluator_version", self.ragas_version), | ||
), | ||
) | ||
if isinstance(score_result_or_failure, float): | ||
self.llmobs_service.submit_evaluation( | ||
span_context={"trace_id": span_event.get("trace_id"), "span_id": span_event.get("span_id")}, | ||
label=self.LABEL, | ||
metric_type=self.METRIC_TYPE, | ||
value=score_result_or_failure, | ||
metadata=metric_metadata, | ||
) | ||
|
||
def evaluate(self, span_event: dict) -> Tuple[Union[float, str], Optional[dict]]: | ||
raise NotImplementedError("evaluate method must be implemented by individual evaluators") | ||
|
||
def _extract_evaluation_inputs_from_span(self, span_event: dict) -> Optional[dict]: | ||
""" | ||
Extracts the question, answer, and context used as inputs for a ragas evaluation on a span event. | ||
""" | ||
with self.llmobs_service.workflow("dd-ragas.extract_evaluation_inputs_from_span") as extract_inputs_workflow: | ||
self.llmobs_service.annotate(span=extract_inputs_workflow, input_data=span_event) | ||
question, answer, contexts = None, None, None | ||
|
||
meta_io = span_event.get("meta") | ||
if meta_io is None: | ||
return None | ||
|
||
meta_input = meta_io.get("input") | ||
meta_output = meta_io.get("output") | ||
|
||
if not (meta_input and meta_output): | ||
return None | ||
|
||
prompt = meta_input.get("prompt") | ||
if prompt is None: | ||
logger.debug("Failed to extract `prompt` from span for ragas evaluation") | ||
return None | ||
prompt_variables = prompt.get("variables") | ||
|
||
input_messages = meta_input.get("messages") | ||
|
||
messages = meta_output.get("messages") | ||
if messages is not None and len(messages) > 0: | ||
answer = messages[-1].get("content") | ||
|
||
if prompt_variables: | ||
context_keys = prompt.get(INTERNAL_CONTEXT_VARIABLE_KEYS, ["context"]) | ||
question_keys = prompt.get(INTERNAL_QUERY_VARIABLE_KEYS, ["question"]) | ||
contexts = [prompt_variables.get(key) for key in context_keys if prompt_variables.get(key)] | ||
question = " ".join([prompt_variables.get(key) for key in question_keys if prompt_variables.get(key)]) | ||
|
||
if not question and input_messages is not None and len(input_messages) > 0: | ||
question = input_messages[-1].get("content") | ||
|
||
self.llmobs_service.annotate( | ||
span=extract_inputs_workflow, output_data={"question": question, "contexts": contexts, "answer": answer} | ||
) | ||
if any(field is None for field in (question, contexts, answer)): | ||
logger.debug("Failed to extract inputs required for ragas evaluation") | ||
return None | ||
|
||
return {"question": question, "contexts": contexts, "answer": answer} |
Oops, something went wrong.