chore(llmobs): refactor out ragas base evaluator #11846

Merged · 9 commits · Jan 10, 2025
213 changes: 213 additions & 0 deletions ddtrace/llmobs/_evaluators/ragas/base.py
@@ -0,0 +1,213 @@
import traceback
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

from ddtrace.internal.logger import get_logger
from ddtrace.internal.telemetry import telemetry_writer
from ddtrace.internal.telemetry.constants import TELEMETRY_APM_PRODUCT
from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL
from ddtrace.internal.utils.version import parse_version
from ddtrace.llmobs._constants import INTERNAL_CONTEXT_VARIABLE_KEYS
from ddtrace.llmobs._constants import INTERNAL_QUERY_VARIABLE_KEYS
from ddtrace.llmobs._constants import RAGAS_ML_APP_PREFIX


logger = get_logger(__name__)


class RagasDependencies:
    """
    A helper class to store instances of ragas classes and functions
    that may or may not exist in a user's environment.
    """

    def __init__(self):
        import ragas

        self.ragas_version = parse_version(ragas.__version__)
        if self.ragas_version >= (0, 2, 0) or self.ragas_version < (0, 1, 10):
            raise NotImplementedError(
                "Ragas version: {} is not supported".format(self.ragas_version),
            )

        from ragas.llms import llm_factory

        self.llm_factory = llm_factory

        from ragas.llms.output_parser import RagasoutputParser

        self.RagasoutputParser = RagasoutputParser

        from ragas.metrics import context_precision

        self.context_precision = context_precision

        from ragas.metrics.base import ensembler

        self.ensembler = ensembler

        from ragas.metrics import faithfulness

        self.faithfulness = faithfulness

        from ragas.metrics.base import get_segmenter

        self.get_segmenter = get_segmenter

        from ddtrace.llmobs._evaluators.ragas.models import StatementFaithfulnessAnswers

        self.StatementFaithfulnessAnswers = StatementFaithfulnessAnswers

        from ddtrace.llmobs._evaluators.ragas.models import StatementsAnswers

        self.StatementsAnswers = StatementsAnswers


def _get_ml_app_for_ragas_trace(span_event: dict) -> str:
    """
    The `ml_app` for spans generated from ragas traces will be `dd-ragas-<ml_app>`,
    or `dd-ragas` if `ml_app` is not present in the span event.
    """
    tags: List[str] = span_event.get("tags", [])
    ml_app = None
    for tag in tags:
        if isinstance(tag, str) and tag.startswith("ml_app:"):
            ml_app = tag.split(":")[1]
            break
    if not ml_app:
        return RAGAS_ML_APP_PREFIX
    return "{}-{}".format(RAGAS_ML_APP_PREFIX, ml_app)


class BaseRagasEvaluator:
    """A class used by EvaluatorRunner to conduct ragas evaluations
    on LLM Observability span events. The job of an Evaluator is to take a span and
    submit evaluation metrics based on the span's attributes.
    Extenders of this class should only need to implement the `evaluate` method.
    """

    LABEL = "ragas"
    METRIC_TYPE = "score"

    def __init__(self, llmobs_service):
        """
        Initialize an evaluator that uses the ragas library to generate a score on finished LLM spans.
        :param llmobs_service: An instance of the LLM Observability service used for tracing the evaluation and
            submitting evaluation metrics.
        Raises: NotImplementedError if the ragas library is not found or if the ragas version is not supported.
        """
        self.llmobs_service = llmobs_service
        self.ragas_version = "unknown"
        telemetry_state = "ok"
        try:
            self.ragas_dependencies = RagasDependencies()
            self.ragas_version = self.ragas_dependencies.ragas_version
        except ImportError as e:
            telemetry_state = "fail_import_error"
            raise NotImplementedError("Failed to load dependencies for `{}` evaluator".format(self.LABEL)) from e
        except AttributeError as e:
            telemetry_state = "fail_attribute_error"
            raise NotImplementedError("Failed to load dependencies for `{}` evaluator".format(self.LABEL)) from e
        except NotImplementedError as e:
            telemetry_state = "fail_not_supported"
            raise NotImplementedError("Failed to load dependencies for `{}` evaluator".format(self.LABEL)) from e
        except Exception as e:
            telemetry_state = "fail_unknown"
            raise NotImplementedError("Failed to load dependencies for `{}` evaluator".format(self.LABEL)) from e
        finally:
            telemetry_writer.add_count_metric(
                namespace=TELEMETRY_APM_PRODUCT.LLMOBS,
                name="evaluators.init",
                value=1,
                tags=(
                    ("evaluator_label", self.LABEL),
                    ("state", telemetry_state),
                    ("evaluator_version", self.ragas_version),
                ),
            )
            if telemetry_state != "ok":
                telemetry_writer.add_log(
                    level=TELEMETRY_LOG_LEVEL.ERROR,
                    message="Failed to import Ragas dependencies",
                    stack_trace=traceback.format_exc(),
                    tags={"evaluator_version": self.ragas_version},
                )

    def run_and_submit_evaluation(self, span_event: dict):
        if not span_event:
            return
        score_result_or_failure, metric_metadata = self.evaluate(span_event)
        telemetry_writer.add_count_metric(
            TELEMETRY_APM_PRODUCT.LLMOBS,
            "evaluators.run",
            1,
            tags=(
                ("evaluator_label", self.LABEL),
                ("state", score_result_or_failure if isinstance(score_result_or_failure, str) else "success"),
                ("evaluator_version", self.ragas_version),
            ),
        )
        if isinstance(score_result_or_failure, float):
            self.llmobs_service.submit_evaluation(
                span_context={"trace_id": span_event.get("trace_id"), "span_id": span_event.get("span_id")},
                label=self.LABEL,
                metric_type=self.METRIC_TYPE,
                value=score_result_or_failure,
                metadata=metric_metadata,
            )

    def evaluate(self, span_event: dict) -> Tuple[Union[float, str], Optional[dict]]:
        raise NotImplementedError("evaluate method must be implemented by individual evaluators")

    def _extract_evaluation_inputs_from_span(self, span_event: dict) -> Optional[dict]:
        """
        Extracts the question, answer, and context used as inputs for a ragas evaluation on a span event.
        """
        with self.llmobs_service.workflow("dd-ragas.extract_evaluation_inputs_from_span") as extract_inputs_workflow:
            self.llmobs_service.annotate(span=extract_inputs_workflow, input_data=span_event)
            question, answer, contexts = None, None, None

            meta_io = span_event.get("meta")
            if meta_io is None:
                return None

            meta_input = meta_io.get("input")
            meta_output = meta_io.get("output")

            if not (meta_input and meta_output):
                return None

            prompt = meta_input.get("prompt")
            if prompt is None:
                logger.debug("Failed to extract `prompt` from span for ragas evaluation")
                return None
            prompt_variables = prompt.get("variables")

            input_messages = meta_input.get("messages")

            messages = meta_output.get("messages")
            if messages is not None and len(messages) > 0:
                answer = messages[-1].get("content")

            if prompt_variables:
                context_keys = prompt.get(INTERNAL_CONTEXT_VARIABLE_KEYS, ["context"])
                question_keys = prompt.get(INTERNAL_QUERY_VARIABLE_KEYS, ["question"])
                contexts = [prompt_variables.get(key) for key in context_keys if prompt_variables.get(key)]
                question = " ".join([prompt_variables.get(key) for key in question_keys if prompt_variables.get(key)])

            if not question and input_messages is not None and len(input_messages) > 0:
                question = input_messages[-1].get("content")

            self.llmobs_service.annotate(
                span=extract_inputs_workflow,
                output_data={"question": question, "contexts": contexts, "answer": answer},
            )
            if any(field is None for field in (question, contexts, answer)):
                logger.debug("Failed to extract inputs required for ragas evaluation")
                return None

            return {"question": question, "contexts": contexts, "answer": answer}