From a45301a2d102289c853bd51076930327af64d3d5 Mon Sep 17 00:00:00 2001
From: Elijah Williams
Date: Wed, 19 Feb 2025 16:44:49 -0700
Subject: [PATCH 01/20] wip on moving chat mlflow

---
 llm-service/app/services/chat.py            | 138 ++++--------------
 llm-service/app/services/mlflow/__init__.py | 151 ++++++++++++++++++++
 2 files changed, 179 insertions(+), 110 deletions(-)
 create mode 100644 llm-service/app/services/mlflow/__init__.py

diff --git a/llm-service/app/services/chat.py b/llm-service/app/services/chat.py
index 796ae54..0d28667 100644
--- a/llm-service/app/services/chat.py
+++ b/llm-service/app/services/chat.py
@@ -35,17 +35,14 @@ # BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
 # DATA.
 #
 ##############################################################################
-import asyncio
-import re
 import time
 import uuid
 from typing import List, Iterable
 
-import mlflow
+# import mlflow
 from fastapi import HTTPException
 from llama_index.core.base.llms.types import MessageRole
 from llama_index.core.chat_engine.types import AgentChatResponse
-from mlflow.entities import Experiment
 
 from . import evaluators, llm_completion
 from .chat_store import (
@@ -56,8 +53,9 @@
     RagStudioChatMessage,
     RagMessage,
 )
-from .metadata_apis import session_metadata_api, data_sources_metadata_api
+from .metadata_apis import session_metadata_api
 from .metadata_apis.session_metadata_api import Session
+from .mlflow import record_rag_mlflow_run, record_direct_llm_mlflow_run
 from .query import querier
 from .query.query_configuration import QueryConfiguration
 from ..ai.vector_stores.qdrant import QdrantVectorStore
@@ -78,22 +76,16 @@ def v2_chat(
         use_summary_filter=session.query_configuration.enable_summary_filter,
     )
     response_id = str(uuid.uuid4())
-    experiment: Experiment = mlflow.set_experiment(
-        experiment_name=f"session_{session.name}_{session.id}"
+
+    new_chat_message: RagStudioChatMessage = _run_chat(
+        session, response_id, query, query_configuration, user_name
     )
-    # mlflow.set_experiment_tag("session_id", session.id)
-    with mlflow.start_run(
-        experiment_id=experiment.experiment_id, run_name=f"{response_id}"
-    ):
-        new_chat_message: RagStudioChatMessage = _run_chat(
-            session, response_id, query, query_configuration, user_name
-        )
 
     ChatHistoryManager().append_to_history(session_id, [new_chat_message])
     return new_chat_message
 
 
-@mlflow.trace(name="v2_chat")
+# @mlflow.trace(name="v2_chat")
 def _run_chat(
     session: Session,
     response_id: str,
@@ -101,9 +93,6 @@ def _run_chat(
     query_configuration: QueryConfiguration,
     user_name: str,
 ) -> RagStudioChatMessage:
-    asyncio.run(log_ml_flow_params(session, query_configuration, user_name))
-    mlflow.set_tag("response_id", response_id)
-
     if len(session.data_source_ids) != 1:
         raise HTTPException(
             status_code=400, detail="Only one datasource is supported for chat."
@@ -150,66 +139,11 @@ def _run_chat(
         timestamp=time.time(),
         condensed_question=condensed_question,
     )
-    log_ml_flow_metrics(session, new_chat_message)
-    return new_chat_message
-
-
-def log_ml_flow_metrics(session: Session, message: RagStudioChatMessage) -> None:
-    source_nodes: list[RagPredictSourceNode] = message.source_nodes
-    query = message.rag_message.user
-    response = message.rag_message.assistant
-    for evaluation in message.evaluations:
-        mlflow.log_metric(evaluation.name, evaluation.value)
-
-    mlflow.log_metrics(
-        {
-            "source_nodes_count": len(source_nodes),
-            "max_score": (source_nodes[0].score if source_nodes else 0.0),
-            "input_word_count": len(re.findall(r"\w+", query)),
-            "output_word_count": len(re.findall(r"\w+", response)),
-        }
-    )
-
-    flattened_nodes = [node.model_dump() for node in source_nodes]
-    mlflow.log_table(
-        {
-            "response_id": message.id,
-            "node_id": map(lambda x: x.get("node_id"), flattened_nodes),
-            "doc_id": map(lambda x: x.get("doc_id"), flattened_nodes),
-            "source_file_name": map(
-                lambda x: x.get("source_file_name"), flattened_nodes
-            ),
-            "score": map(lambda x: x.get("score"), flattened_nodes),
-            "query": query,
-            "response": response,
-            "condensed_question": message.condensed_question,
-        },
-        artifact_file="response_details.json",
-    )
-
-
-async def log_ml_flow_params(
-    session: Session, query_configuration: QueryConfiguration, user_name: str
-) -> None:
-    data_source_metadata = data_sources_metadata_api.get_metadata(session.data_source_ids[0])
-    mlflow.log_params(
-        {
-            "top_k": query_configuration.top_k,
-            "inference_model": query_configuration.model_name,
-            "rerank_model_name": query_configuration.rerank_model_name,
-            "exclude_knowledge_base": query_configuration.exclude_knowledge_base,
-            "use_question_condensing": query_configuration.use_question_condensing,
-            "use_hyde": query_configuration.use_hyde,
-            "use_summary_filter": query_configuration.use_summary_filter,
-            "session_id": session.id,
-            "data_source_ids": session.data_source_ids,
-            "user_name": user_name,
-            "embedding_model": data_source_metadata.embedding_model,
-            "chunk_size": data_source_metadata.chunk_size,
-            "summarization_model": data_source_metadata.summarization_model,
-            "chunk_overlap_percent": data_source_metadata.chunk_overlap_percent,
-        }
+    record_rag_mlflow_run(
+        new_chat_message, query_configuration, response_id, session, user_name
     )
+    return new_chat_message
 
 
 def retrieve_chat_history(session_id: int) -> List[RagContext]:
@@ -328,39 +262,23 @@ def direct_llm_chat(
     session_id: int, query: str, user_name: str
 ) -> RagStudioChatMessage:
     session = session_metadata_api.get_session(session_id)
-    experiment = mlflow.set_experiment(
-        experiment_name=f"session_{session.name}_{session.id}"
-    )
     response_id = str(uuid.uuid4())
 
-    with mlflow.start_run(
-        experiment_id=experiment.experiment_id, run_name=f"{response_id}"
-    ):
-        mlflow.set_tag("response_id", response_id)
-        mlflow.set_tag("direct_llm", True)
-        mlflow.log_params(
-            {
-                "inference_model": session.inference_model,
-                "exclude_knowledge_base": True,
-                "session_id": session.id,
-                "data_source_ids": session.data_source_ids,
-                "user_name": user_name,
-            }
-        )
+    record_direct_llm_mlflow_run(response_id, session, user_name)
 
-        chat_response = llm_completion.completion(
-            session_id, query, session.inference_model
-        )
-        new_chat_message = RagStudioChatMessage(
-            id=response_id,
-            source_nodes=[],
-            inference_model=session.inference_model,
-            evaluations=[],
-            rag_message=RagMessage(
-                user=query,
-                assistant=str(chat_response.message.content),
-            ),
-            timestamp=time.time(),
-            condensed_question=None,
-        )
-        ChatHistoryManager().append_to_history(session_id, [new_chat_message])
-        return new_chat_message
+    chat_response = llm_completion.completion(
+        session_id, query, session.inference_model
+    )
+    new_chat_message = RagStudioChatMessage(
+        id=response_id,
+        source_nodes=[],
+        inference_model=session.inference_model,
+        evaluations=[],
+        rag_message=RagMessage(
+            user=query,
+            assistant=str(chat_response.message.content),
+        ),
+        timestamp=time.time(),
+        condensed_question=None,
+    )
+    ChatHistoryManager().append_to_history(session_id, [new_chat_message])
+    return new_chat_message
diff --git a/llm-service/app/services/mlflow/__init__.py b/llm-service/app/services/mlflow/__init__.py
new file mode 100644
index 0000000..91fdfa0
--- /dev/null
+++ b/llm-service/app/services/mlflow/__init__.py
@@ -0,0 +1,151 @@
+#
+# CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
+# (C) Cloudera, Inc. 2024
+# All rights reserved.
+#
+# Applicable Open Source License: Apache 2.0
+#
+# NOTE: Cloudera open source products are modular software products
+# made up of hundreds of individual components, each of which was
+# individually copyrighted. Each Cloudera open source product is a
+# collective work under U.S. Copyright Law. Your license to use the
+# collective work is as provided in your written agreement with
+# Cloudera. Used apart from the collective work, this file is
+# licensed for your use pursuant to the open source license
+# identified above.
+#
+# This code is provided to you pursuant a written agreement with
+# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
+# this code. If you do not have a written agreement with Cloudera nor
+# with an authorized and properly licensed third party, you do not
+# have any rights to access nor to use this code.
+#
+# Absent a written agreement with Cloudera, Inc. ("Cloudera") to the
+# contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
+# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
+# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
+# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
+# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
+# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
+# ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
+# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
+# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
+# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
+# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
+# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
+# DATA.
+#
+
+import re
+from typing import Any
+
+import mlflow
+from mlflow import MlflowClient
+from mlflow.entities import Experiment
+
+from app.services.chat_store import RagStudioChatMessage, RagPredictSourceNode
+from app.services.metadata_apis import data_sources_metadata_api
+from app.services.metadata_apis.session_metadata_api import Session
+from app.services.query.query_configuration import QueryConfiguration
+
+
+def chat_log_ml_flow_table(message: RagStudioChatMessage) -> None:
+    source_nodes: list[RagPredictSourceNode] = message.source_nodes
+
+    query = message.rag_message.user
+    response = message.rag_message.assistant
+
+    flattened_nodes = [node.model_dump() for node in source_nodes]
+    mlflow.log_table(
+        {
+            "response_id": message.id,
+            "node_id": map(lambda x: x.get("node_id"), flattened_nodes),
+            "doc_id": map(lambda x: x.get("doc_id"), flattened_nodes),
+            "source_file_name": map(
+                lambda x: x.get("source_file_name"), flattened_nodes
+            ),
+            "score": map(lambda x: x.get("score"), flattened_nodes),
+            "query": query,
+            "response": response,
+            "input_word_count": len(re.findall(r"\w+", query)),
+            "output_word_count": len(re.findall(r"\w+", response)),
+            "condensed_question": message.condensed_question,
+        },
+        artifact_file="response_details.json",
+    )
+
+
+def chat_log_ml_flow_params(
+    session: Session, query_configuration: QueryConfiguration, user_name: str
+) -> dict[str, Any]:
+    data_source_metadata = data_sources_metadata_api.get_metadata(
+        session.data_source_ids[0]
+    )
+    return {
+        "top_k": query_configuration.top_k,
+        "inference_model": query_configuration.model_name,
+        "rerank_model_name": query_configuration.rerank_model_name,
+        "exclude_knowledge_base": query_configuration.exclude_knowledge_base,
+        "use_question_condensing": query_configuration.use_question_condensing,
+        "use_hyde": query_configuration.use_hyde,
+        "use_summary_filter": query_configuration.use_summary_filter,
+        "session_id": session.id,
+        "data_source_ids": session.data_source_ids,
+        "user_name": user_name,
+        "embedding_model": data_source_metadata.embedding_model,
+        "chunk_size": data_source_metadata.chunk_size,
+        "summarization_model": data_source_metadata.summarization_model,
+        "chunk_overlap_percent": data_source_metadata.chunk_overlap_percent,
+    }
+
+
+def record_rag_mlflow_run(
+    new_chat_message, query_configuration, response_id, session, user_name
+):
+    experiment: Experiment = mlflow.set_experiment(
+        experiment_name=f"session_{session.name}_{session.id}"
+    )
+
+    # mlflow.set_experiment_tag("session_id", session.id)
+    run = mlflow.start_run(
+        experiment_id=experiment.experiment_id, run_name=f"{response_id}"
+    )
+    client = MlflowClient()
+    params = chat_log_ml_flow_params(session, query_configuration, user_name)
+    source_nodes: list[RagPredictSourceNode] = new_chat_message.source_nodes
+    metrics = {
+        "source_nodes_count": len(source_nodes),
+        "max_score": (source_nodes[0].score if source_nodes else 0.0),
+        **{
+            evaluation.name: evaluation.value
+            for evaluation in new_chat_message.evaluations
+        },
+    }
+    client.log_batch(
+        run.info.run_id,
+        tags={"response_id": response_id},
+        params=params,
+        metrics=metrics,
+    )
+    chat_log_ml_flow_table(new_chat_message)
+
+
+def record_direct_llm_mlflow_run(response_id, session, user_name):
+    experiment = mlflow.set_experiment(
+        experiment_name=f"session_{session.name}_{session.id}"
+    )
+    run = mlflow.start_run(
+        experiment_id=experiment.experiment_id, run_name=f"{response_id}"
+    )
+    client = MlflowClient()
+    client.log_batch(
+        run.info.run_id,
+        tags={"response_id": response_id, "direct_llm": True},
+        params={
+            "inference_model": session.inference_model,
+            "exclude_knowledge_base": True,
+            "session_id": session.id,
+            "data_source_ids": session.data_source_ids,
+            "user_name": user_name,
+        },
+    )
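The dict-based log_batch calls introduced in this patch do not match MlflowClient.log_batch's signature, which expects sequences of mlflow.entities objects; patches 04 and 05 below rework these call sites accordingly. A minimal sketch of the entity-based form (values hypothetical; run_id and response_id are assumed to come from an active run):

    from mlflow import MlflowClient
    from mlflow.entities import Metric, Param, RunTag

    client = MlflowClient()
    client.log_batch(
        run_id,  # e.g. run.info.run_id from mlflow.start_run()
        metrics=[Metric("max_score", 0.42, timestamp=0, step=0)],  # hypothetical value
        params=[Param("top_k", "5")],  # param values are logged as strings
        tags=[RunTag("response_id", response_id)],
    )
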
From 0e920485ee7f98f4c8d14a45b6806bd049c72b56 Mon Sep 17 00:00:00 2001
From: Elijah Williams
Date: Wed, 19 Feb 2025 16:56:13 -0700
Subject: [PATCH 02/20] centralize datasources metrics to mlflow

---
 .../app/routers/index/data_source/__init__.py | 31 ++-----------
 llm-service/app/services/mlflow/__init__.py   | 43 +++++++++++++++++--
 2 files changed, 44 insertions(+), 30 deletions(-)

diff --git a/llm-service/app/routers/index/data_source/__init__.py b/llm-service/app/routers/index/data_source/__init__.py
index 7194faf..3f8fbdc 100644
--- a/llm-service/app/routers/index/data_source/__init__.py
+++ b/llm-service/app/routers/index/data_source/__init__.py
@@ -37,7 +37,6 @@
 from fastapi_utils.cbv import cbv
 from llama_index.core.llms import LLM
 from llama_index.core.node_parser import SentenceSplitter
-from mlflow.entities import Experiment
 from pydantic import BaseModel
 
 from .... import exceptions
@@ -49,6 +48,7 @@
 from ....services import document_storage, models
 from ....services.metadata_apis import data_sources_metadata_api
 from ....services.metadata_apis.data_sources_metadata_api import RagDataSource
+from ....services.mlflow import data_source_record_run
 
 logger = logging.getLogger(__name__)
 
@@ -143,15 +143,9 @@ def download_and_index(
     ) -> None:
         datasource = data_sources_metadata_api.get_metadata(data_source_id)
         mlflow.llama_index.autolog()
-        experiment: Experiment = mlflow.set_experiment(
-            experiment_name=f"datasource_{datasource.name}_{data_source_id}"
-        )
-        with mlflow.start_run(
-            experiment_id=experiment.experiment_id, run_name=f"doc_{doc_id}"
-        ):
-            self._download_and_index(datasource, doc_id, request)
+        self._download_and_index(datasource, doc_id, request)
 
-    @mlflow.trace(name="download_and_index")
+    # @mlflow.trace(name="download_and_index")
     def _download_and_index(
         self, datasource: RagDataSource, doc_id: str, request: RagIndexDocumentRequest
     ) -> None:
@@ -181,24 +175,7 @@ def _download_and_index(
             llm=llm,
             chunks_vector_store=self.chunks_vector_store,
         )
-
-        mlflow.log_metrics(
-            {
-                "file_size_bytes": file_path.stat().st_size,
-            }
-        )
-
-        mlflow.log_params(
-            {
-                "data_source_id": datasource.id,
-                "embedding_model": datasource.embedding_model,
-                "summarization_model": datasource.summarization_model,
-                "chunk_size": request.configuration.chunk_size,
-                "chunk_overlap": request.configuration.chunk_overlap,
-                "file_name": request.original_filename,
-                "file_size_bytes": file_path.stat().st_size,
-            }
-        )
+        data_source_record_run(datasource, doc_id, file_path, request)
 
         # Delete to avoid duplicates
         self.chunks_vector_store.delete_document(doc_id)
diff --git a/llm-service/app/services/mlflow/__init__.py b/llm-service/app/services/mlflow/__init__.py
index 91fdfa0..51da856 100644
--- a/llm-service/app/services/mlflow/__init__.py
+++ b/llm-service/app/services/mlflow/__init__.py
@@ -37,14 +37,17 @@
 #
 
 import re
+from pathlib import Path
 from typing import Any
 
 import mlflow
 from mlflow import MlflowClient
 from mlflow.entities import Experiment
 
+from app.routers.index.data_source import RagIndexDocumentRequest
 from app.services.chat_store import RagStudioChatMessage, RagPredictSourceNode
 from app.services.metadata_apis import data_sources_metadata_api
+from app.services.metadata_apis.data_sources_metadata_api import RagDataSource
 from app.services.metadata_apis.session_metadata_api import Session
 from app.services.query.query_configuration import QueryConfiguration
 
@@ -100,8 +103,12 @@ def chat_log_ml_flow_params(
 
 
 def record_rag_mlflow_run(
-    new_chat_message, query_configuration, response_id, session, user_name
-):
+    new_chat_message: RagStudioChatMessage,
+    query_configuration: QueryConfiguration,
+    response_id: str,
+    session: Session,
+    user_name: str,
+) -> None:
     experiment: Experiment = mlflow.set_experiment(
         experiment_name=f"session_{session.name}_{session.id}"
     )
@@ -130,7 +137,9 @@ def record_rag_mlflow_run(
     chat_log_ml_flow_table(new_chat_message)
 
 
-def record_direct_llm_mlflow_run(response_id, session, user_name):
+def record_direct_llm_mlflow_run(
+    response_id: str, session: Session, user_name: str
+) -> None:
     experiment = mlflow.set_experiment(
         experiment_name=f"session_{session.name}_{session.id}"
     )
@@ -149,3 +158,31 @@ def record_direct_llm_mlflow_run(
             "user_name": user_name,
         },
     )
+
+
+def data_source_record_run(
+    datasource: RagDataSource,
+    doc_id: str,
+    file_path: Path,
+    request: RagIndexDocumentRequest,
+) -> None:
+    experiment: Experiment = mlflow.set_experiment(
+        experiment_name=f"datasource_{datasource.name}_{datasource.id}"
+    )
+    run = mlflow.start_run(
+        experiment_id=experiment.experiment_id, run_name=f"doc_{doc_id}"
+    )
+    metrics = {
+        "file_size_bytes": file_path.stat().st_size,
+    }
+    params = {
+        "data_source_id": datasource.id,
+        "embedding_model": datasource.embedding_model,
+        "summarization_model": datasource.summarization_model,
+        "chunk_size": request.configuration.chunk_size,
+        "chunk_overlap": request.configuration.chunk_overlap,
+        "file_name": request.original_filename,
+        "file_size_bytes": file_path.stat().st_size,
+    }
+    client = MlflowClient()
+    client.log_batch(run_id=run.info.run_id, params=params, metrics=metrics)

From 3bf99b8e44710e2e12679de897eb97b825e60617 Mon Sep 17 00:00:00 2001
From: Michael Liu
Date: Wed, 19 Feb 2025 16:52:06 -0800
Subject: [PATCH 03/20] Resolve circular import

---
 .../app/routers/index/data_source/__init__.py | 14 +------------
 llm-service/app/services/mlflow/__init__.py   | 14 +++++++++++++-
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/llm-service/app/routers/index/data_source/__init__.py b/llm-service/app/routers/index/data_source/__init__.py
index 3f8fbdc..368917f 100644
--- a/llm-service/app/routers/index/data_source/__init__.py
+++ b/llm-service/app/routers/index/data_source/__init__.py
@@ -48,7 +48,7 @@
 from ....services import document_storage, models
 from ....services.metadata_apis import data_sources_metadata_api
 from ....services.metadata_apis.data_sources_metadata_api import RagDataSource
-from ....services.mlflow import data_source_record_run
+from ....services.mlflow import data_source_record_run, RagIndexDocumentRequest
 
 logger = logging.getLogger(__name__)
 
@@ -63,18 +63,6 @@ class SummarizeDocumentRequest(BaseModel):
     original_filename: str
 
 
-class RagIndexDocumentConfiguration(BaseModel):
-    chunk_size: int = 512  # this is llama-index's default
-    chunk_overlap: int = 10  # percentage of tokens in a chunk (chunk_size)
-
-
-class RagIndexDocumentRequest(BaseModel):
-    s3_bucket_name: str
-    s3_document_key: str
-    original_filename: str
-    configuration: RagIndexDocumentConfiguration = RagIndexDocumentConfiguration()
-
-
 class ChunkContentsResponse(BaseModel):
     text: str
     metadata: Dict[str, Any]
diff --git a/llm-service/app/services/mlflow/__init__.py b/llm-service/app/services/mlflow/__init__.py
index 51da856..8935922 100644
--- a/llm-service/app/services/mlflow/__init__.py
+++ b/llm-service/app/services/mlflow/__init__.py
@@ -43,8 +43,8 @@
 import mlflow
 from mlflow import MlflowClient
 from mlflow.entities import Experiment
+from pydantic import BaseModel
 
-from app.routers.index.data_source import RagIndexDocumentRequest
 from app.services.chat_store import RagStudioChatMessage, RagPredictSourceNode
 from app.services.metadata_apis import data_sources_metadata_api
 from app.services.metadata_apis.data_sources_metadata_api import RagDataSource
@@ -160,6 +160,18 @@ def record_direct_llm_mlflow_run(
     )
 
 
+class RagIndexDocumentConfiguration(BaseModel):
+    chunk_size: int = 512  # this is llama-index's default
+    chunk_overlap: int = 10  # percentage of tokens in a chunk (chunk_size)
+
+
+class RagIndexDocumentRequest(BaseModel):
+    s3_bucket_name: str
+    s3_document_key: str
+    original_filename: str
+    configuration: RagIndexDocumentConfiguration = RagIndexDocumentConfiguration()
+
+
 def data_source_record_run(
     datasource: RagDataSource,
     doc_id: str,

From 4dd6643eff1d795817aed1ce2e7390d6e4c02e01 Mon Sep 17 00:00:00 2001
From: Elijah Williams
Date: Thu, 20 Feb 2025 10:03:42 -0700
Subject: [PATCH 04/20] fix tests

---
 llm-service/app/services/mlflow/__init__.py | 50 ++++++++++++---------
 1 file changed, 28 insertions(+), 22 deletions(-)

diff --git a/llm-service/app/services/mlflow/__init__.py b/llm-service/app/services/mlflow/__init__.py
index 8935922..47f2e50 100644
--- a/llm-service/app/services/mlflow/__init__.py
+++ b/llm-service/app/services/mlflow/__init__.py
@@ -38,11 +38,11 @@
 
 import re
 from pathlib import Path
-from typing import Any
+from typing import Any, Sequence
 
 import mlflow
 from mlflow import MlflowClient
-from mlflow.entities import Experiment
+from mlflow.entities import Experiment, Param, Metric
 from pydantic import BaseModel
 
 from app.services.chat_store import RagStudioChatMessage, RagPredictSourceNode
@@ -135,6 +135,7 @@ def record_rag_mlflow_run(
         metrics=metrics,
     )
     chat_log_ml_flow_table(new_chat_message)
+    mlflow.end_run()
 
 
 def record_direct_llm_mlflow_run(
@@ -146,18 +147,20 @@ def record_direct_llm_mlflow_run(
     run = mlflow.start_run(
         experiment_id=experiment.experiment_id, run_name=f"{response_id}"
     )
+    params: Sequence[Param] = [
+        Param("inference_model", session.inference_model),
+        Param("exclude_knowledge_base", True),
+        Param("session_id", session.id),
+        Param("data_source_ids", session.data_source_ids),
+        Param("user_name", user_name),
+    ]
    client = MlflowClient()
     client.log_batch(
         run.info.run_id,
         tags={"response_id": response_id, "direct_llm": True},
-        params={
-            "inference_model": session.inference_model,
-            "exclude_knowledge_base": True,
-            "session_id": session.id,
-            "data_source_ids": session.data_source_ids,
-            "user_name": user_name,
-        },
+        params=params,
     )
+    mlflow.end_run()
 
 
 @@ -184,17 +187,20 @@ def data_source_record_run(
     run = mlflow.start_run(
         experiment_id=experiment.experiment_id, run_name=f"doc_{doc_id}"
     )
-    metrics = {
-        "file_size_bytes": file_path.stat().st_size,
-    }
-    params = {
-        "data_source_id": datasource.id,
-        "embedding_model": datasource.embedding_model,
-        "summarization_model": datasource.summarization_model,
-        "chunk_size": request.configuration.chunk_size,
-        "chunk_overlap": request.configuration.chunk_overlap,
-        "file_name": request.original_filename,
-        "file_size_bytes": file_path.stat().st_size,
-    }
+    # Todo: Figure out if we care about the timestamp or step but they're required
+    metrics = [
+        Metric("file_size_bytes", file_path.stat().st_size, timestamp=0, step=0),
+    ]
+    params: Sequence[Param] = [
+        Param("data_source_id", value=str(datasource.id)),
+        Param("embedding_model", value=datasource.embedding_model),
+        Param("summarization_model", value=datasource.summarization_model),
+        Param("chunk_size", value=str(request.configuration.chunk_size)),
+        Param("chunk_overlap", value=str(request.configuration.chunk_overlap)),
+        Param("file_name", value=request.original_filename),
+        Param("file_size_bytes", value=str(file_path.stat().st_size)),
+    ]
+
     client = MlflowClient()
-    client.log_batch(run_id=run.info.run_id, params=params, metrics=metrics)
+    client.log_batch(run_id=run.info.run_id, metrics=metrics, params=params)
+    mlflow.end_run()

From 26386918ec296c058ffba2996343b9614b2c3d58 Mon Sep 17 00:00:00 2001
From: Elijah Williams
Date: Thu, 20 Feb 2025 10:40:36 -0700
Subject: [PATCH 05/20] more cleanup of mlflow

---
 llm-service/app/services/mlflow/__init__.py | 118 ++++++++++----------
 1 file changed, 61 insertions(+), 57 deletions(-)

diff --git a/llm-service/app/services/mlflow/__init__.py b/llm-service/app/services/mlflow/__init__.py
index 47f2e50..d354814 100644
--- a/llm-service/app/services/mlflow/__init__.py
+++ b/llm-service/app/services/mlflow/__init__.py
@@ -42,7 +42,7 @@
 
 import mlflow
 from mlflow import MlflowClient
-from mlflow.entities import Experiment, Param, Metric
+from mlflow.entities import Experiment, Param, Metric, RunTag
 from pydantic import BaseModel
 
 from app.services.chat_store import RagStudioChatMessage, RagPredictSourceNode
@@ -51,6 +51,8 @@
 from app.services.metadata_apis.session_metadata_api import Session
 from app.services.query.query_configuration import QueryConfiguration
 
+# mypy: disable-error-code="no-untyped-call"
+
 
 def chat_log_ml_flow_table(message: RagStudioChatMessage) -> None:
     source_nodes: list[RagPredictSourceNode] = message.source_nodes
@@ -114,28 +116,35 @@ def record_rag_mlflow_run(
     )
 
     # mlflow.set_experiment_tag("session_id", session.id)
-    run = mlflow.start_run(
+    with mlflow.start_run(
         experiment_id=experiment.experiment_id, run_name=f"{response_id}"
-    )
-    client = MlflowClient()
-    params = chat_log_ml_flow_params(session, query_configuration, user_name)
-    source_nodes: list[RagPredictSourceNode] = new_chat_message.source_nodes
-    metrics = {
-        "source_nodes_count": len(source_nodes),
-        "max_score": (source_nodes[0].score if source_nodes else 0.0),
-        **{
-            evaluation.name: evaluation.value
-            for evaluation in new_chat_message.evaluations
-        },
-    }
-    client.log_batch(
-        run.info.run_id,
-        tags={"response_id": response_id},
-        params=params,
-        metrics=metrics,
-    )
-    chat_log_ml_flow_table(new_chat_message)
-    mlflow.end_run()
+    ) as run:
+        client = MlflowClient()
+        params = chat_log_ml_flow_params(session, query_configuration, user_name)
+        source_nodes: list[RagPredictSourceNode] = new_chat_message.source_nodes
+        metrics: dict[str, Any] = {
+            "source_nodes_count": len(source_nodes),
+            "max_score": (source_nodes[0].score if source_nodes else 0.0),
+            **{
+                evaluation.name: evaluation.value
+                for evaluation in new_chat_message.evaluations
+            },
+        }
+        client.log_batch(
+            run.info.run_id,
+            tags=[RunTag("response_id", response_id)],
+            params=[Param(key=key, value=value) for key, value in params.items()],
+            metrics=[
+                Metric(
+                    key=key,
+                    value=value,
+                    timestamp=int(new_chat_message.timestamp),
+                    step=0,
+                )
+                for key, value in metrics.items()
+            ],
+        )
+        chat_log_ml_flow_table(new_chat_message)
 
 
 def record_direct_llm_mlflow_run(
@@ -144,23 +153,22 @@ def record_direct_llm_mlflow_run(
     experiment = mlflow.set_experiment(
         experiment_name=f"session_{session.name}_{session.id}"
     )
-    run = mlflow.start_run(
+    with mlflow.start_run(
         experiment_id=experiment.experiment_id, run_name=f"{response_id}"
-    )
-    params: Sequence[Param] = [
-        Param("inference_model", session.inference_model),
-        Param("exclude_knowledge_base", True),
-        Param("session_id", session.id),
-        Param("data_source_ids", session.data_source_ids),
-        Param("user_name", user_name),
-    ]
-    client = MlflowClient()
-    client.log_batch(
-        run.info.run_id,
-        tags={"response_id": response_id, "direct_llm": True},
-        params=params,
-    )
-    mlflow.end_run()
+    ) as run:
+        params: Sequence[Param] = [
+            Param("inference_model", session.inference_model),
+            Param("exclude_knowledge_base", True),
+            Param("session_id", session.id),
+            Param("data_source_ids", session.data_source_ids),
+            Param("user_name", user_name),
+        ]
+        client = MlflowClient()
+        client.log_batch(
+            run.info.run_id,
+            tags=[RunTag("response_id", response_id), RunTag("direct_llm", True)],
+            params=params,
+        )
 
 
 class RagIndexDocumentConfiguration(BaseModel):
@@ -184,23 +192,19 @@ def data_source_record_run(
     experiment: Experiment = mlflow.set_experiment(
         experiment_name=f"datasource_{datasource.name}_{datasource.id}"
    )
-    run = mlflow.start_run(
+    with mlflow.start_run(
         experiment_id=experiment.experiment_id, run_name=f"doc_{doc_id}"
-    )
-    # Todo: Figure out if we care about the timestamp or step but they're required
-    metrics = [
-        Metric("file_size_bytes", file_path.stat().st_size, timestamp=0, step=0),
-    ]
-    params: Sequence[Param] = [
-        Param("data_source_id", value=str(datasource.id)),
-        Param("embedding_model", value=datasource.embedding_model),
-        Param("summarization_model", value=datasource.summarization_model),
-        Param("chunk_size", value=str(request.configuration.chunk_size)),
-        Param("chunk_overlap", value=str(request.configuration.chunk_overlap)),
-        Param("file_name", value=request.original_filename),
-        Param("file_size_bytes", value=str(file_path.stat().st_size)),
-    ]
-
-    client = MlflowClient()
-    client.log_batch(run_id=run.info.run_id, metrics=metrics, params=params)
-    mlflow.end_run()
+    ) as run:
+
+        params: Sequence[Param] = [
+            Param("data_source_id", value=str(datasource.id)),
+            Param("embedding_model", value=datasource.embedding_model),
+            Param("summarization_model", value=datasource.summarization_model),
+            Param("chunk_size", value=str(request.configuration.chunk_size)),
+            Param("chunk_overlap", value=str(request.configuration.chunk_overlap)),
+            Param("file_name", value=request.original_filename),
+            Param("file_size_bytes", value=str(file_path.stat().st_size)),
+        ]
+
+        client = MlflowClient()
+        client.log_batch(run_id=run.info.run_id, params=params)
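Using mlflow.start_run as a context manager, as this patch does, closes the run even when logging raises, which the explicit mlflow.end_run() calls from the previous patch only guaranteed on the happy path. Roughly, the with-block is equivalent to this sketch (names taken from the code above; the logged param is hypothetical):

    run = mlflow.start_run(
        experiment_id=experiment.experiment_id, run_name=f"doc_{doc_id}"
    )
    try:
        mlflow.log_param("file_name", "example.pdf")  # hypothetical logging call
    finally:
        mlflow.end_run()  # runs on success and on exceptions alike
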
From 21ec4285b4fcf26021271fe0ac3923b6a39bf391 Mon Sep 17 00:00:00 2001
From: Elijah Williams
Date: Thu, 20 Feb 2025 12:36:21 -0700
Subject: [PATCH 06/20] wip on creating a reconciler

---
 .../app/routers/index/sessions/__init__.py  |  32 +---
 llm-service/app/services/mlflow/__init__.py |  78 ++++++--
 llm-service/reconciler/mlflow_reconciler.py | 184 ++++++++++++++++++
 3 files changed, 247 insertions(+), 47 deletions(-)
 create mode 100644 llm-service/reconciler/mlflow_reconciler.py

diff --git a/llm-service/app/routers/index/sessions/__init__.py b/llm-service/app/routers/index/sessions/__init__.py
index aa61a58..931b53e 100644
--- a/llm-service/app/routers/index/sessions/__init__.py
+++ b/llm-service/app/routers/index/sessions/__init__.py
@@ -42,14 +42,13 @@
 import mlflow
 from fastapi import APIRouter, Cookie
-from mlflow.entities import Experiment, Run
 from pydantic import BaseModel
 
 from .... import exceptions
 from ....rag_types import RagPredictConfiguration
 from ....services.chat import generate_suggested_questions, v2_chat, direct_llm_chat
 from ....services.chat_store import ChatHistoryManager, RagStudioChatMessage
-from ....services.metadata_apis import session_metadata_api
+from ....services.mlflow import rating_mlflow_log_metric, feedback_mlflow_log_table
 
 logger = logging.getLogger(__name__)
 router = APIRouter(prefix="/sessions/{session_id}", tags=["Sessions"])
@@ -93,18 +92,7 @@ def rating(
     response_id: str,
     request: ChatResponseRating,
 ) -> ChatResponseRating:
-    session = session_metadata_api.get_session(session_id)
-    experiment: Experiment = mlflow.set_experiment(
-        experiment_name=f"session_{session.name}_{session.id}"
-    )
-    runs: list[Run] = mlflow.search_runs(
-        [experiment.experiment_id],
-        filter_string=f"tags.response_id='{response_id}'",
-        output_format="list",
-    )
-    for run in runs:
-        value: int = 1 if request.rating else -1
-        mlflow.log_metric("rating", value, run_id=run.info.run_id)
+    rating_mlflow_log_metric(request, response_id, session_id)
     return ChatResponseRating(rating=request.rating)
 
 
@@ -121,21 +109,7 @@ def feedback(
     response_id: str,
     request: ChatResponseFeedback,
 ) -> ChatResponseFeedback:
-    session = session_metadata_api.get_session(session_id)
-    experiment: Experiment = mlflow.set_experiment(
-        experiment_name=f"session_{session.name}_{session.id}"
-    )
-    runs: list[Run] = mlflow.search_runs(
-        [experiment.experiment_id],
-        filter_string=f"tags.response_id='{response_id}'",
-        output_format="list",
-    )
-    for run in runs:
-        mlflow.log_table(
-            data={"feedback": request.feedback},
-            artifact_file="feedback.json",
-            run_id=run.info.run_id,
-        )
+    feedback_mlflow_log_table(request, response_id, session_id)
     return ChatResponseFeedback(feedback=request.feedback)
 
 
diff --git a/llm-service/app/services/mlflow/__init__.py b/llm-service/app/services/mlflow/__init__.py
index d354814..fa01372 100644
--- a/llm-service/app/services/mlflow/__init__.py
+++ b/llm-service/app/services/mlflow/__init__.py
@@ -35,18 +35,18 @@ # BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
 # DATA.
 #
-
+import json
 import re
 from pathlib import Path
 from typing import Any, Sequence
 
 import mlflow
 from mlflow import MlflowClient
-from mlflow.entities import Experiment, Param, Metric, RunTag
+from mlflow.entities import Experiment, Param, Metric, RunTag, Run
 from pydantic import BaseModel
 
 from app.services.chat_store import RagStudioChatMessage, RagPredictSourceNode
-from app.services.metadata_apis import data_sources_metadata_api
+from app.services.metadata_apis import data_sources_metadata_api, session_metadata_api
 from app.services.metadata_apis.data_sources_metadata_api import RagDataSource
 from app.services.metadata_apis.session_metadata_api import Session
 from app.services.query.query_configuration import QueryConfiguration
@@ -183,28 +183,70 @@ class RagIndexDocumentRequest(BaseModel):
     configuration: RagIndexDocumentConfiguration = RagIndexDocumentConfiguration()
 
 
+def write_mlflow_run_json(experiment_name: str, run_name: str, data: dict[str, Any]):
+    contents = {
+        "experiment_name": experiment_name,
+        "run_name": run_name,
+        "data": data,
+        "status": "pending",
+    }
+    with open(f"{experiment_name}-{run_name}.json", "w") as f:
+        json.dump(contents, f)
+
+
 def data_source_record_run(
     datasource: RagDataSource,
     doc_id: str,
     file_path: Path,
     request: RagIndexDocumentRequest,
 ) -> None:
+
+    write_mlflow_run_json(
+        f"datasource_{datasource.name}_{datasource.id}",
+        f"doc_{doc_id}",
+        {
+            "params": {
+                "data_source_id": str(datasource.id),
+                "embedding_model": datasource.embedding_model,
+                "summarization_model": datasource.summarization_model,
+                "chunk_size": str(request.configuration.chunk_size),
+                "chunk_overlap": str(request.configuration.chunk_overlap),
+                "file_name": request.original_filename,
+                "file_size_bytes": str(file_path.stat().st_size),
+            }
+        },
+    )
+
+
+def rating_mlflow_log_metric(request, response_id, session_id):
+    session = session_metadata_api.get_session(session_id)
     experiment: Experiment = mlflow.set_experiment(
-        experiment_name=f"datasource_{datasource.name}_{datasource.id}"
+        experiment_name=f"session_{session.name}_{session.id}"
+    )
+    runs: list[Run] = mlflow.search_runs(
+        [experiment.experiment_id],
+        filter_string=f"tags.response_id='{response_id}'",
+        output_format="list",
     )
-    with mlflow.start_run(
-        experiment_id=experiment.experiment_id, run_name=f"doc_{doc_id}"
-    ) as run:
-
-        params: Sequence[Param] = [
-            Param("data_source_id", value=str(datasource.id)),
-            Param("embedding_model", value=datasource.embedding_model),
-            Param("summarization_model", value=datasource.summarization_model),
-            Param("chunk_size", value=str(request.configuration.chunk_size)),
-            Param("chunk_overlap", value=str(request.configuration.chunk_overlap)),
-            Param("file_name", value=request.original_filename),
-            Param("file_size_bytes", value=str(file_path.stat().st_size)),
-        ]
+    for run in runs:
+        value: int = 1 if request.rating else -1
+        mlflow.log_metric("rating", value, run_id=run.info.run_id)
 
-        client = MlflowClient()
-        client.log_batch(run_id=run.info.run_id, params=params)
+
+def feedback_mlflow_log_table(request, response_id, session_id):
+    session = session_metadata_api.get_session(session_id)
+    experiment: Experiment = mlflow.set_experiment(
+        experiment_name=f"session_{session.name}_{session.id}"
+    )
+    runs: list[Run] = mlflow.search_runs(
+        [experiment.experiment_id],
+        filter_string=f"tags.response_id='{response_id}'",
+        output_format="list",
+    )
+    for run in runs:
+        mlflow.log_table(
+            data={"feedback": request.feedback},
+            artifact_file="feedback.json",
+            run_id=run.info.run_id,
+        )
diff --git a/llm-service/reconciler/mlflow_reconciler.py b/llm-service/reconciler/mlflow_reconciler.py
new file mode 100644
index 0000000..855df8f
--- /dev/null
+++ b/llm-service/reconciler/mlflow_reconciler.py
@@ -0,0 +1,184 @@
+"""
+Reconciler script to process io request pairs.
+"""
+
+#
+# CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
+# (C) Cloudera, Inc. 2024
+# All rights reserved.
+#
+# Applicable Open Source License: Apache 2.0
+#
+# NOTE: Cloudera open source products are modular software products
+# made up of hundreds of individual components, each of which was
+# individually copyrighted. Each Cloudera open source product is a
+# collective work under U.S. Copyright Law. Your license to use the
+# collective work is as provided in your written agreement with
+# Cloudera. Used apart from the collective work, this file is
+# licensed for your use pursuant to the open source license
+# identified above.
+#
+# This code is provided to you pursuant a written agreement with
+# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
+# this code. If you do not have a written agreement with Cloudera nor
+# with an authorized and properly licensed third party, you do not
+# have any rights to access nor to use this code.
+#
+# Absent a written agreement with Cloudera, Inc. ("Cloudera") to the
+# contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
+# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
+# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
+# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
+# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
+# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
+# ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
+# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
+# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
+# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
+# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
+# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
+# DATA.
+#
+
+import os
+import json
+import logging
+import sys
+import time
+import threading
+from pathlib import Path
+import argparse
+import asyncio
+from typing import Any, Literal
+
+import mlflow
+from mlflow.entities import Experiment
+from pydantic import BaseModel
+from uvicorn.logging import DefaultFormatter
+
+# Configure logging
+logger = logging.getLogger(__name__)
+formatter = DefaultFormatter("%(levelprefix)s %(message)s")
+
+handler = logging.StreamHandler(sys.stdout)
+handler.setFormatter(formatter)
+
+logger.addHandler(handler)
+
+
+class MlflowTable(BaseModel):
+    data: dict[str, Any]
+    artifact_file: str
+    run_id: str
+
+
+class MlflowRunData(BaseModel):
+    experiment_name: str
+    run_name: str
+    tags: dict[str, Any]
+    metrics: dict[str, int]
+    params: dict[str, Any]
+    table: MlflowTable
+
+
+def evaluate_json_data(data: MlflowRunData) -> Literal["success", "failed", None]:
+
+    if data["status"] == "success":
+        return None
+
+    if data["status"] == "pending":
+        experiment: Experiment = mlflow.set_experiment(
+            experiment_name=data["experiment_name"]
+        )
+        with mlflow.start_run(
+            experiment_id=experiment.experiment_id, run_name=data["run_name"]
+        ):
+            mlflow.log_params(data["params"])
+            mlflow.log_metrics(data["metrics"])
+            mlflow.set_tags(data["tags"])
+            mlflow.log_table(data["table"].data, data["table"].artifact_file)
+        return "success"
+
+    return "failed"
+
+
+async def process_io_pair(file_path, processing_function):
+    """Callback function to process a io saved in a file."""
+    with open(file_path, "r") as f:
+        data = json.load(f)
+    # Process io pair
+    status = await processing_function(data)
+    # save the response
+    if status is not None:
+        data["status"] = status
+        with open(file_path, "w") as f:
+            json.dump(data, f, indent=2)
+    if status == "pending":
+        logger.info(
+            "MLFlow experiment and run IDs set for i/o pair: %s. Queued for evaluation.",
+            file_path,
+        )
+        return
+    if status == "failed":
+        logger.error("Failed to process i/o pair: %s. Will retry.", file_path)
+        return
+
+
+def background_worker(directory, processing_function):
+    """Background thread function to process files."""
+    if not isinstance(directory, Path):
+        directory = Path(directory)
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    while True:
+        files_to_process = [
+            file_path
+            for file_path in directory.iterdir()
+            if file_path.is_file() and file_path.suffix == ".json"
+        ]
+        for file_path in files_to_process:
+            try:
+                loop.run_until_complete(
+                    process_io_pair(
+                        file_path=file_path, processing_function=processing_function
+                    )
+                )
+            except Exception as e:
+                logger.error("Error processing file %s: %s", file_path, e)
+        time.sleep(15)
+
+
+# Start background worker thread
+def start_background_worker(directory, processing_function):
+    """Start the background worker thread."""
+    if not isinstance(directory, Path):
+        directory = Path(directory)
+    worker_thread = threading.Thread(
+        target=background_worker, args=(directory, processing_function), daemon=True
+    )
+    worker_thread.start()
+
+
+if __name__ == "__main__":
+    # Directory to save JSON files
+    # Argument parsing to get the data directory
+    parser = argparse.ArgumentParser(
+        description="Reconciler script to process io request pairs."
+    )
+    parser.add_argument(
+        "--data-dir",
+        type=str,
+        default=os.path.join("data", "responses"),
+        help="Directory to save JSON files",
+    )
+    args = parser.parse_args()
+
+    data_dir = Path(args.data_dir)
+    data_dir.mkdir(exist_ok=True)
+    start_background_worker(data_dir, evaluate_json_data)
+    try:
+        while True:
+            logger.info("Reconciler looking for i/o pairs in %s...", data_dir)
+            time.sleep(1)
+    except KeyboardInterrupt:
+        logger.info("Shutting down...")
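From this patch on, the chat and indexing paths hand work to the reconciler through pending-run JSON files. A hypothetical file as written by write_mlflow_run_json above (field values invented for illustration):

    {
      "experiment_name": "datasource_docs_7",
      "run_name": "doc_abc123",
      "data": {"params": {"data_source_id": "7", "file_name": "example.pdf"}},
      "status": "pending"
    }

The background worker scans the data directory every 15 seconds and replays each pending file into an MLflow run. Note that at this point the writer still nests the payload under a "data" key while evaluate_json_data reads params/metrics/tags at the top level; the next patch flattens the payload (**data) and adds a created_at timestamp.
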
# import json +import os import re +from datetime import datetime from pathlib import Path -from typing import Any, Sequence +from typing import Any import mlflow -from mlflow import MlflowClient -from mlflow.entities import Experiment, Param, Metric, RunTag, Run +from mlflow.entities import Experiment, Run from pydantic import BaseModel from app.services.chat_store import RagStudioChatMessage, RagPredictSourceNode @@ -54,30 +55,30 @@ # mypy: disable-error-code="no-untyped-call" -def chat_log_ml_flow_table(message: RagStudioChatMessage) -> None: +def chat_log_ml_flow_table(message: RagStudioChatMessage) -> dict[str, Any]: source_nodes: list[RagPredictSourceNode] = message.source_nodes query = message.rag_message.user response = message.rag_message.assistant flattened_nodes = [node.model_dump() for node in source_nodes] - mlflow.log_table( - { + return { + "data": { "response_id": message.id, - "node_id": map(lambda x: x.get("node_id"), flattened_nodes), - "doc_id": map(lambda x: x.get("doc_id"), flattened_nodes), - "source_file_name": map( - lambda x: x.get("source_file_name"), flattened_nodes + "node_id": list(map(lambda x: x.get("node_id"), flattened_nodes)), + "doc_id": list(map(lambda x: x.get("doc_id"), flattened_nodes)), + "source_file_name": list( + map(lambda x: x.get("source_file_name"), flattened_nodes) ), - "score": map(lambda x: x.get("score"), flattened_nodes), + "score": list(map(lambda x: x.get("score"), flattened_nodes)), "query": query, "response": response, "input_word_count": len(re.findall(r"\w+", query)), "output_word_count": len(re.findall(r"\w+", response)), "condensed_question": message.condensed_question, }, - artifact_file="response_details.json", - ) + "artifact_file": "response_details.json", + } def chat_log_ml_flow_params( @@ -111,64 +112,51 @@ def record_rag_mlflow_run( session: Session, user_name: str, ) -> None: - experiment: Experiment = mlflow.set_experiment( - experiment_name=f"session_{session.name}_{session.id}" - ) + params = chat_log_ml_flow_params(session, query_configuration, user_name) + source_nodes: list[RagPredictSourceNode] = new_chat_message.source_nodes + metrics: dict[str, Any] = { + "source_nodes_count": len(source_nodes), + "max_score": (source_nodes[0].score if source_nodes else 0.0), + **{ + evaluation.name: evaluation.value + for evaluation in new_chat_message.evaluations + }, + } - # mlflow.set_experiment_tag("session_id", session.id) - with mlflow.start_run( - experiment_id=experiment.experiment_id, run_name=f"{response_id}" - ) as run: - client = MlflowClient() - params = chat_log_ml_flow_params(session, query_configuration, user_name) - source_nodes: list[RagPredictSourceNode] = new_chat_message.source_nodes - metrics: dict[str, Any] = { - "source_nodes_count": len(source_nodes), - "max_score": (source_nodes[0].score if source_nodes else 0.0), - **{ - evaluation.name: evaluation.value - for evaluation in new_chat_message.evaluations + write_mlflow_run_json( + experiment_name=f"session_{session.name}_{session.id}", + run_name=f"{response_id}", + data={ + "params": params, + "tags": { + "response_id": response_id, }, - } - client.log_batch( - run.info.run_id, - tags=[RunTag("response_id", response_id)], - params=[Param(key=key, value=value) for key, value in params.items()], - metrics=[ - Metric( - key=key, - value=value, - timestamp=int(new_chat_message.timestamp), - step=0, - ) - for key, value in metrics.items() - ], - ) - chat_log_ml_flow_table(new_chat_message) + "metrics": metrics, + "table": 
chat_log_ml_flow_table(new_chat_message), + }, + ) def record_direct_llm_mlflow_run( response_id: str, session: Session, user_name: str ) -> None: - experiment = mlflow.set_experiment( - experiment_name=f"session_{session.name}_{session.id}" + write_mlflow_run_json( + f"session_{session.name}_{session.id}", + f"{response_id}", + { + "params": { + "inference_model": session.inference_model, + "exclude_knowledge_base": True, + "session_id": session.id, + "data_source_ids": session.data_source_ids, + "user_name": user_name, + }, + "tags": { + "response_id": response_id, + "direct_llm": True, + }, + }, ) - with mlflow.start_run( - experiment_id=experiment.experiment_id, run_name=f"{response_id}" - ) as run: - params: Sequence[Param] = [ - Param("inference_model", session.inference_model), - Param("exclude_knowledge_base", True), - Param("session_id", session.id), - Param("data_source_ids", session.data_source_ids), - Param("user_name", user_name), - ] - client = MlflowClient() - client.log_batch( - run.info.run_id, - tags=[RunTag("response_id", response_id), RunTag("direct_llm", True)], - params=params, - ) class RagIndexDocumentConfiguration(BaseModel): @@ -187,10 +175,14 @@ def write_mlflow_run_json(experiment_name: str, run_name: str, data: dict[str, A contents = { "experiment_name": experiment_name, "run_name": run_name, - "data": data, "status": "pending", + "created_at": datetime.now().timestamp(), + **data, } - with open(f"{experiment_name}-{run_name}.json", "w") as f: + with open( + f"{os.environ['MLFLOW_RECONCILER_DATA_PATH']}/{experiment_name}-{run_name}.json", + "w", + ) as f: json.dump(contents, f) diff --git a/llm-service/reconciler/mlflow_reconciler.py b/llm-service/reconciler/mlflow_reconciler.py index 855df8f..5e5f7ab 100644 --- a/llm-service/reconciler/mlflow_reconciler.py +++ b/llm-service/reconciler/mlflow_reconciler.py @@ -49,40 +49,56 @@ from pathlib import Path import argparse import asyncio -from typing import Any, Literal +from typing import Any, Literal, Optional import mlflow from mlflow.entities import Experiment from pydantic import BaseModel -from uvicorn.logging import DefaultFormatter -# Configure logging logger = logging.getLogger(__name__) -formatter = DefaultFormatter("%(levelprefix)s %(message)s") -handler = logging.StreamHandler(sys.stdout) -handler.setFormatter(formatter) -logger.addHandler(handler) +def configure_logger(): + """Configure logger formatting and verbosity.""" + # match Java backend's formatting + formatter = logging.Formatter( + fmt=" ".join( + [ + "%(asctime)s", + "%(levelname)5s", + "%(name)30s", + "%(message)s", + ] + ) + ) + # https://docs.python.org/3/library/logging.html#logging.Formatter.formatTime + formatter.converter = time.gmtime + formatter.default_time_format = "%H:%M:%S" + formatter.default_msec_format = "%s.%03d" + + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(logging.WARNING) + handler.setFormatter(formatter) + + logger.setLevel(logging.WARNING) + logger.addHandler(handler) class MlflowTable(BaseModel): data: dict[str, Any] artifact_file: str - run_id: str class MlflowRunData(BaseModel): experiment_name: str run_name: str - tags: dict[str, Any] - metrics: dict[str, int] - params: dict[str, Any] - table: MlflowTable - + tags: Optional[dict[str, Any]] + metrics: Optional[dict[str, int]] + params: Optional[dict[str, Any]] + table: Optional[MlflowTable] -def evaluate_json_data(data: MlflowRunData) -> Literal["success", "failed", None]: +async def evaluate_json_data(data: MlflowRunData) -> Literal["success", 
"failed", None]: if data["status"] == "success": return None @@ -90,13 +106,18 @@ def evaluate_json_data(data: MlflowRunData) -> Literal["success", "failed", None experiment: Experiment = mlflow.set_experiment( experiment_name=data["experiment_name"] ) + print(data) with mlflow.start_run( experiment_id=experiment.experiment_id, run_name=data["run_name"] ): - mlflow.log_params(data["params"]) - mlflow.log_metrics(data["metrics"]) - mlflow.set_tags(data["tags"]) - mlflow.log_table(data["table"].data, data["table"].artifact_file) + if "tags" in data: + mlflow.set_tags(data["tags"]) + if "params" in data: + mlflow.log_params(data["params"]) + if "metrics" in data: + mlflow.log_metrics(data["metrics"]) + if "table" in data: + mlflow.log_table(data["table"].data, data["table"].artifact_file) return "success" return "failed" @@ -109,16 +130,23 @@ async def process_io_pair(file_path, processing_function): # Process io pair status = await processing_function(data) # save the response + if status == "success": + logger.info("Successfully processed i/o pair: %s", file_path) + os.remove(file_path) + return + if status is not None: data["status"] = status with open(file_path, "w") as f: json.dump(data, f, indent=2) + if status == "pending": logger.info( "MLFlow experiment and run IDs set for i/o pair: %s. Queued for evaluation.", file_path, ) return + if status == "failed": logger.error("Failed to process i/o pair: %s. Will retry.", file_path) return @@ -143,8 +171,8 @@ def background_worker(directory, processing_function): file_path=file_path, processing_function=processing_function ) ) - except Exception as e: - logger.error("Error processing file %s: %s", file_path, e) + except Exception: + logger.error("Error processing file %s", file_path, exc_info=True) time.sleep(15) @@ -160,6 +188,7 @@ def start_background_worker(directory, processing_function): if __name__ == "__main__": + configure_logger() # Directory to save JSON files # Argument parsing to get the data directory parser = argparse.ArgumentParser( @@ -168,7 +197,7 @@ def start_background_worker(directory, processing_function): parser.add_argument( "--data-dir", type=str, - default=os.path.join("data", "responses"), + default=os.path.join(os.path.dirname(__file__), "data"), help="Directory to save JSON files", ) args = parser.parse_args() @@ -178,7 +207,7 @@ def start_background_worker(directory, processing_function): start_background_worker(data_dir, evaluate_json_data) try: while True: - logger.info("Reconciler looking for i/o pairs in %s...", data_dir) + logger.debug("Reconciler looking for i/o pairs in %s...", data_dir) time.sleep(1) except KeyboardInterrupt: - logger.info("Shutting down...") + logger.debug("Shutting down...") diff --git a/local-dev.sh b/local-dev.sh index ff9b663..26e3730 100755 --- a/local-dev.sh +++ b/local-dev.sh @@ -42,6 +42,7 @@ set -a && source .env && set +a python3.12 scripts/validator/validate_env.py export RAG_DATABASES_DIR=$(pwd)/databases +export MLFLOW_RECONCILER_DATA_PATH=$(pwd)/llm-service/reconciler/data cleanup() { # kill all processes whose parent is this process @@ -81,6 +82,9 @@ while ! 
curl --output /dev/null --silent --fail http://localhost:8081/amp-update sleep 4 done +# start mlflow reconciler +uv run reconciler/mlflow_reconciler.py & + # start up the jarva cd ../backend ./gradlew --console=plain bootRun & diff --git a/scripts/startup_app.sh b/scripts/startup_app.sh index d1d275e..a6957cb 100755 --- a/scripts/startup_app.sh +++ b/scripts/startup_app.sh @@ -55,6 +55,7 @@ export RAG_DATABASES_DIR=$(pwd)/databases export LLM_SERVICE_URL="http://localhost:8081" export API_URL="http://localhost:8080" export MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR=false +export MLFLOW_RECONCILER_DATA_PATH=$(pwd)/llm-service/reconciler/data # start Qdrant vector DB qdrant/qdrant & 2>&1 @@ -72,6 +73,9 @@ while ! curl --output /dev/null --silent --fail http://localhost:8081/amp-update sleep 4 done +# start mlflow reconciler +uv run reconciler/mlflow_reconciler.py & + # start Node production server cd .. From 0551535ce82ac3e8625b16942107ffcc5ba7b0c3 Mon Sep 17 00:00:00 2001 From: Elijah Williams Date: Fri, 21 Feb 2025 10:32:33 -0700 Subject: [PATCH 08/20] small changes --- llm-service/reconciler/mlflow_reconciler.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/llm-service/reconciler/mlflow_reconciler.py b/llm-service/reconciler/mlflow_reconciler.py index 5e5f7ab..7e37d7c 100644 --- a/llm-service/reconciler/mlflow_reconciler.py +++ b/llm-service/reconciler/mlflow_reconciler.py @@ -106,18 +106,17 @@ async def evaluate_json_data(data: MlflowRunData) -> Literal["success", "failed" experiment: Experiment = mlflow.set_experiment( experiment_name=data["experiment_name"] ) - print(data) with mlflow.start_run( experiment_id=experiment.experiment_id, run_name=data["run_name"] ): - if "tags" in data: + if "tags" in data and data["tags"] is not None: mlflow.set_tags(data["tags"]) - if "params" in data: + if "params" in data and data["params"] is not None: mlflow.log_params(data["params"]) - if "metrics" in data: + if "metrics" in data and data["metrics"] is not None: mlflow.log_metrics(data["metrics"]) - if "table" in data: - mlflow.log_table(data["table"].data, data["table"].artifact_file) + if "table" in data and data["table"] is not None: + mlflow.log_table(**data["table"]) return "success" return "failed" From 41572b59f1159d640540a71ed1aeb91363c5317a Mon Sep 17 00:00:00 2001 From: Elijah Williams Date: Fri, 21 Feb 2025 11:37:05 -0700 Subject: [PATCH 09/20] remove unnecessary check --- llm-service/reconciler/mlflow_reconciler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/llm-service/reconciler/mlflow_reconciler.py b/llm-service/reconciler/mlflow_reconciler.py index 7e37d7c..9deca86 100644 --- a/llm-service/reconciler/mlflow_reconciler.py +++ b/llm-service/reconciler/mlflow_reconciler.py @@ -109,13 +109,13 @@ async def evaluate_json_data(data: MlflowRunData) -> Literal["success", "failed" with mlflow.start_run( experiment_id=experiment.experiment_id, run_name=data["run_name"] ): - if "tags" in data and data["tags"] is not None: + if "tags" in data: mlflow.set_tags(data["tags"]) - if "params" in data and data["params"] is not None: + if "params" in data: mlflow.log_params(data["params"]) - if "metrics" in data and data["metrics"] is not None: + if "metrics" in data: mlflow.log_metrics(data["metrics"]) - if "table" in data and data["table"] is not None: + if "table" in data: mlflow.log_table(**data["table"]) return "success" From 20a7fd8e3644f85b6cd084f1d2f65ecb02153fbd Mon Sep 17 00:00:00 2001 From: Elijah Williams Date: Fri, 21 Feb 2025 
From 20a7fd8e3644f85b6cd084f1d2f65ecb02153fbd Mon Sep 17 00:00:00 2001
From: Elijah Williams
Date: Fri, 21 Feb 2025 11:44:26 -0700
Subject: [PATCH 10/20] fix mypy

---
 llm-service/app/services/mlflow/__init__.py | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/llm-service/app/services/mlflow/__init__.py b/llm-service/app/services/mlflow/__init__.py
index 5ad6447..6a90d06 100644
--- a/llm-service/app/services/mlflow/__init__.py
+++ b/llm-service/app/services/mlflow/__init__.py
@@ -46,6 +46,7 @@
 from mlflow.entities import Experiment, Run
 from pydantic import BaseModel
 
+from app.routers.index.sessions import ChatResponseRating, ChatResponseFeedback
 from app.services.chat_store import RagStudioChatMessage, RagPredictSourceNode
 from app.services.metadata_apis import data_sources_metadata_api, session_metadata_api
 from app.services.metadata_apis.data_sources_metadata_api import RagDataSource
@@ -171,7 +172,9 @@ class RagIndexDocumentRequest(BaseModel):
     configuration: RagIndexDocumentConfiguration = RagIndexDocumentConfiguration()
 
 
-def write_mlflow_run_json(experiment_name: str, run_name: str, data: dict[str, Any]):
+def write_mlflow_run_json(
+    experiment_name: str, run_name: str, data: dict[str, Any]
+) -> None:
     contents = {
         "experiment_name": experiment_name,
         "run_name": run_name,
@@ -210,7 +213,9 @@ def data_source_record_run(
     )
 
 
-def rating_mlflow_log_metric(request, response_id, session_id):
+def rating_mlflow_log_metric(
+    request: ChatResponseRating, response_id: str, session_id: int
+) -> None:
     session = session_metadata_api.get_session(session_id)
     experiment: Experiment = mlflow.set_experiment(
         experiment_name=f"session_{session.name}_{session.id}"
@@ -226,7 +231,9 @@ def rating_mlflow_log_metric(request, response_id, session_id):
         mlflow.log_metric("rating", value, run_id=run.info.run_id)
 
 
-def feedback_mlflow_log_table(request, response_id, session_id):
+def feedback_mlflow_log_table(
+    request: ChatResponseFeedback, response_id: str, session_id: int
+) -> None:
     session = session_metadata_api.get_session(session_id)
     experiment: Experiment = mlflow.set_experiment(
         experiment_name=f"session_{session.name}_{session.id}"

From 7ac20432de2bfb6e3fe2b67395a9b2306069a010 Mon Sep 17 00:00:00 2001
From: Elijah Williams
Date: Fri, 21 Feb 2025 11:51:27 -0700
Subject: [PATCH 11/20] add back tracing

---
 llm-service/app/routers/index/data_source/__init__.py | 5 +++--
 llm-service/app/services/chat.py | 4 ++--
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/llm-service/app/routers/index/data_source/__init__.py b/llm-service/app/routers/index/data_source/__init__.py
index 7d6d4ec..c604a35 100644
--- a/llm-service/app/routers/index/data_source/__init__.py
+++ b/llm-service/app/routers/index/data_source/__init__.py
@@ -32,6 +32,7 @@
 from http import HTTPStatus
 from typing import Any, Dict, Optional
 
+import mlflow
 from fastapi import APIRouter, Depends, HTTPException
 from fastapi_utils.cbv import cbv
 from llama_index.core.llms import LLM
@@ -129,10 +130,10 @@ def download_and_index(
         request: RagIndexDocumentRequest,
     ) -> None:
         datasource = data_sources_metadata_api.get_metadata(data_source_id)
-        # mlflow.llama_index.autolog()
+        mlflow.llama_index.autolog()
         self._download_and_index(datasource, doc_id, request)
 
-    # @mlflow.trace(name="download_and_index")
+    @mlflow.trace(name="download_and_index")
     def _download_and_index(
         self, datasource: RagDataSource, doc_id: str, request: RagIndexDocumentRequest
     ) -> None:
diff --git a/llm-service/app/services/chat.py b/llm-service/app/services/chat.py
index 0d28667..0565c1e 100644
--- a/llm-service/app/services/chat.py
+++ b/llm-service/app/services/chat.py
@@ -39,7 +39,7 @@
 import uuid
 from typing import List, Iterable
 
-# import mlflow
+import mlflow
 from fastapi import HTTPException
 from llama_index.core.base.llms.types import MessageRole
 from llama_index.core.chat_engine.types import AgentChatResponse
@@ -85,7 +85,7 @@ def v2_chat(
     return new_chat_message
 
 
-# @mlflow.trace(name="v2_chat")
+@mlflow.trace(name="v2_chat")
 def _run_chat(
     session: Session,
     response_id: str,
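
With run logging deferred to the reconciler, patch 11 re-enables MLflow's
tracing APIs directly in the request path: mlflow.llama_index.autolog() emits
spans for llama-index calls, and @mlflow.trace wraps a function in a named
span. A small standalone sketch of the same pattern (the function and span
name here are illustrative, not from the patches):

    import mlflow

    # Record llama-index internals (LLM calls, retrievals) as MLflow traces;
    # requires llama-index to be installed.
    mlflow.llama_index.autolog()

    # Wrap an arbitrary function in a named span; anything traced inside it,
    # including autologged calls, shows up as a child span.
    @mlflow.trace(name="answer_question")
    def answer_question(question: str) -> str:
        return f"echo: {question}"

    answer_question("hello")
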
From 9a6ed392761b8c52bc4d578dfbc771f382addeb7 Mon Sep 17 00:00:00 2001
From: Michael Liu
Date: Fri, 21 Feb 2025 10:50:30 -0800
Subject: [PATCH 12/20] Load JSON into MlflowRunData

---
 llm-service/reconciler/mlflow_reconciler.py | 48 ++++++++++++---------
 1 file changed, 27 insertions(+), 21 deletions(-)

diff --git a/llm-service/reconciler/mlflow_reconciler.py b/llm-service/reconciler/mlflow_reconciler.py
index 9deca86..8b3b8b7 100644
--- a/llm-service/reconciler/mlflow_reconciler.py
+++ b/llm-service/reconciler/mlflow_reconciler.py
@@ -84,6 +84,9 @@ def configure_logger():
     logger.addHandler(handler)
 
 
+MlflowRunStatus = Literal["pending", "success", "failed"]
+
+
 class MlflowTable(BaseModel):
     data: dict[str, Any]
     artifact_file: str
@@ -92,31 +95,33 @@ class MlflowRunData(BaseModel):
     experiment_name: str
     run_name: str
-    tags: Optional[dict[str, Any]]
-    metrics: Optional[dict[str, int]]
-    params: Optional[dict[str, Any]]
-    table: Optional[MlflowTable]
+    tags: Optional[dict[str, Any]] = None
+    metrics: Optional[dict[str, int]] = None
+    params: Optional[dict[str, Any]] = None
+    table: Optional[MlflowTable] = None
+    status: MlflowRunStatus
 
 
-async def evaluate_json_data(data: MlflowRunData) -> Literal["success", "failed", None]:
-    if data["status"] == "success":
+async def evaluate_json_data(data: MlflowRunData) -> Optional[MlflowRunStatus]:
+    if data.status == "success":
         return None
 
-    if data["status"] == "pending":
+    if data.status == "pending":
         experiment: Experiment = mlflow.set_experiment(
-            experiment_name=data["experiment_name"]
+            experiment_name=data.experiment_name,
         )
         with mlflow.start_run(
-            experiment_id=experiment.experiment_id, run_name=data["run_name"]
+            experiment_id=experiment.experiment_id,
+            run_name=data.run_name,
         ):
-            if "tags" in data:
+            if data.tags:
                 mlflow.set_tags(data.tags)
-            if "params" in data:
+            if data.params:
                 mlflow.log_params(data.params)
-            if "metrics" in data:
+            if data.metrics:
                 mlflow.log_metrics(data.metrics)
-            if "table" in data:
+            if data.table:
                 mlflow.log_table(**data.table)
 
             return "success"
     return "failed"
@@ -125,19 +130,20 @@ async def evaluate_json_data(data: MlflowRunData) -> Literal["success", "failed"
 async def process_io_pair(file_path, processing_function):
     """Callback function to process a io saved in a file."""
     with open(file_path, "r") as f:
-        data = json.load(f)
+        data = MlflowRunData(**json.load(f))
+
     # Process io pair
     status = await processing_function(data)
-    # save the response
     if status == "success":
         logger.info("Successfully processed i/o pair: %s", file_path)
         os.remove(file_path)
         return
 
+    # save the response
     if status is not None:
-        data["status"] = status
+        data.status = status
         with open(file_path, "w") as f:
-            json.dump(data, f, indent=2)
+            json.dump(data.model_dump(), f, indent=2)
 
     if status == "pending":
         logger.info(
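
Patch 12's switch from raw dicts to a pydantic model buys schema validation at
the file boundary: a malformed or unknown "status" fails fast instead of
propagating, and omitted optional sections default to None. A sketch of the
round trip the reconciler now performs, mirroring the model in the patch
(abridged to a few fields):

    import json
    from typing import Any, Literal, Optional

    from pydantic import BaseModel

    MlflowRunStatus = Literal["pending", "success", "failed"]

    class MlflowRunData(BaseModel):
        experiment_name: str
        run_name: str
        tags: Optional[dict[str, Any]] = None
        status: MlflowRunStatus

    raw = '{"experiment_name": "exp", "run_name": "run-1", "status": "pending"}'
    data = MlflowRunData(**json.loads(raw))  # omitted optionals become None
    assert data.tags is None

    data.status = "failed"                      # attribute access, not data["status"]
    serialized = json.dumps(data.model_dump())  # plain dict again for json.dump
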
From c18ede5e586901dd75fbd65518793af621fc1ca6 Mon Sep 17 00:00:00 2001
From: Michael Liu
Date: Fri, 21 Feb 2025 10:56:02 -0800
Subject: [PATCH 13/20] Resolve circular imports

---
 llm-service/app/routers/index/sessions/__init__.py | 4 ++--
 llm-service/app/services/mlflow/__init__.py | 13 ++++---------
 2 files changed, 6 insertions(+), 11 deletions(-)

diff --git a/llm-service/app/routers/index/sessions/__init__.py b/llm-service/app/routers/index/sessions/__init__.py
index 64dd086..82ba2be 100644
--- a/llm-service/app/routers/index/sessions/__init__.py
+++ b/llm-service/app/routers/index/sessions/__init__.py
@@ -91,7 +91,7 @@ def rating(
     response_id: str,
     request: ChatResponseRating,
 ) -> ChatResponseRating:
-    rating_mlflow_log_metric(request, response_id, session_id)
+    rating_mlflow_log_metric(request.rating, response_id, session_id)
     return ChatResponseRating(rating=request.rating)
 
 
@@ -108,7 +108,7 @@ def feedback(
     response_id: str,
     request: ChatResponseFeedback,
 ) -> ChatResponseFeedback:
-    feedback_mlflow_log_table(request, response_id, session_id)
+    feedback_mlflow_log_table(request.feedback, response_id, session_id)
     return ChatResponseFeedback(feedback=request.feedback)
 
 
diff --git a/llm-service/app/services/mlflow/__init__.py b/llm-service/app/services/mlflow/__init__.py
index 6a90d06..604a42f 100644
--- a/llm-service/app/services/mlflow/__init__.py
+++ b/llm-service/app/services/mlflow/__init__.py
@@ -46,7 +46,6 @@
 from mlflow.entities import Experiment, Run
 from pydantic import BaseModel
 
-from app.routers.index.sessions import ChatResponseRating, ChatResponseFeedback
 from app.services.chat_store import RagStudioChatMessage, RagPredictSourceNode
 from app.services.metadata_apis import data_sources_metadata_api, session_metadata_api
 from app.services.metadata_apis.data_sources_metadata_api import RagDataSource
@@ -213,9 +212,7 @@ def data_source_record_run(
     )
 
 
-def rating_mlflow_log_metric(
-    request: ChatResponseRating, response_id: str, session_id: int
-) -> None:
+def rating_mlflow_log_metric(rating: bool, response_id: str, session_id: int) -> None:
     session = session_metadata_api.get_session(session_id)
     experiment: Experiment = mlflow.set_experiment(
         experiment_name=f"session_{session.name}_{session.id}"
@@ -227,13 +224,11 @@ def rating_mlflow_log_metric(
     )
 
     for run in runs:
-        value: int = 1 if request.rating else -1
+        value: int = 1 if rating else -1
         mlflow.log_metric("rating", value, run_id=run.info.run_id)
 
 
-def feedback_mlflow_log_table(
-    request: ChatResponseFeedback, response_id: str, session_id: int
-) -> None:
+def feedback_mlflow_log_table(feedback: str, response_id: str, session_id: int) -> None:
     session = session_metadata_api.get_session(session_id)
     experiment: Experiment = mlflow.set_experiment(
         experiment_name=f"session_{session.name}_{session.id}"
@@ -245,7 +240,7 @@ def feedback_mlflow_log_table(
     )
     for run in runs:
         mlflow.log_table(
-            data={"feedback": request.feedback},
+            data={"feedback": feedback},
             artifact_file="feedback.json",
             run_id=run.info.run_id,
         )
From 7abe527e43b863151c3a635d6f0b3add955a76ef Mon Sep 17 00:00:00 2001
From: Michael Liu
Date: Fri, 21 Feb 2025 11:00:24 -0800
Subject: [PATCH 14/20] Don't pass Request classes into services

---
 .../app/routers/index/data_source/__init__.py | 30 ++++++++++++++--
 llm-service/app/services/mlflow/__init__.py | 36 -------------------
 2 files changed, 28 insertions(+), 38 deletions(-)

diff --git a/llm-service/app/routers/index/data_source/__init__.py b/llm-service/app/routers/index/data_source/__init__.py
index c604a35..740246b 100644
--- a/llm-service/app/routers/index/data_source/__init__.py
+++ b/llm-service/app/routers/index/data_source/__init__.py
@@ -48,7 +48,7 @@
 from ....services import document_storage, models
 from ....services.metadata_apis import data_sources_metadata_api
 from ....services.metadata_apis.data_sources_metadata_api import RagDataSource
-from ....services.mlflow import data_source_record_run, RagIndexDocumentRequest
+from ....services.mlflow import write_mlflow_run_json
 
 logger = logging.getLogger(__name__)
 
@@ -63,6 +63,18 @@ class SummarizeDocumentRequest(BaseModel):
     original_filename: str
 
 
+class RagIndexDocumentConfiguration(BaseModel):
+    chunk_size: int = 512  # this is llama-index's default
+    chunk_overlap: int = 10  # percentage of tokens in a chunk (chunk_size)
+
+
+class RagIndexDocumentRequest(BaseModel):
+    s3_bucket_name: str
+    s3_document_key: str
+    original_filename: str
+    configuration: RagIndexDocumentConfiguration = RagIndexDocumentConfiguration()
+
+
 class ChunkContentsResponse(BaseModel):
     text: str
     metadata: Dict[str, Any]
@@ -163,7 +175,21 @@ def _download_and_index(
             llm=llm,
             chunks_vector_store=self.chunks_vector_store,
         )
-        data_source_record_run(datasource, doc_id, file_path, request)
+        write_mlflow_run_json(
+            f"datasource_{datasource.name}_{datasource.id}",
+            f"doc_{doc_id}",
+            {
+                "params": {
+                    "data_source_id": str(datasource.id),
+                    "embedding_model": datasource.embedding_model,
+                    "summarization_model": datasource.summarization_model,
+                    "chunk_size": str(request.configuration.chunk_size),
+                    "chunk_overlap": str(request.configuration.chunk_overlap),
+                    "file_name": request.original_filename,
+                    "file_size_bytes": str(file_path.stat().st_size),
+                }
+            },
+        )
 
         # Delete to avoid duplicates
         self.chunks_vector_store.delete_document(doc_id)
diff --git a/llm-service/app/services/mlflow/__init__.py b/llm-service/app/services/mlflow/__init__.py
index 604a42f..3b6dcbe 100644
--- a/llm-service/app/services/mlflow/__init__.py
+++ b/llm-service/app/services/mlflow/__init__.py
@@ -159,18 +159,6 @@ def record_direct_llm_mlflow_run(
     )
 
 
-class RagIndexDocumentConfiguration(BaseModel):
-    chunk_size: int = 512  # this is llama-index's default
-    chunk_overlap: int = 10  # percentage of tokens in a chunk (chunk_size)
-
-
-class RagIndexDocumentRequest(BaseModel):
-    s3_bucket_name: str
-    s3_document_key: str
-    original_filename: str
-    configuration: RagIndexDocumentConfiguration = RagIndexDocumentConfiguration()
-
-
 def write_mlflow_run_json(
     experiment_name: str, run_name: str, data: dict[str, Any]
 ) -> None:
@@ -188,30 +176,6 @@ def write_mlflow_run_json(
     json.dump(contents, f)
 
 
-def data_source_record_run(
-    datasource: RagDataSource,
-    doc_id: str,
-    file_path: Path,
-    request: RagIndexDocumentRequest,
-) -> None:
-
-    write_mlflow_run_json(
-        f"datasource_{datasource.name}_{datasource.id}",
-        f"doc_{doc_id}",
-        {
-            "params": {
-                "data_source_id": str(datasource.id),
-                "embedding_model": datasource.embedding_model,
-                "summarization_model": datasource.summarization_model,
-                "chunk_size": str(request.configuration.chunk_size),
-                "chunk_overlap": str(request.configuration.chunk_overlap),
-                "file_name": request.original_filename,
-                "file_size_bytes": str(file_path.stat().st_size),
-            }
-        },
-    )
-
-
 def rating_mlflow_log_metric(rating: bool, response_id: str, session_id: int) -> None:
     session = session_metadata_api.get_session(session_id)
     experiment: Experiment = mlflow.set_experiment(
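
After patch 14 the router serializes the indexing run itself, so the service
layer no longer needs the request classes. For reference, the run file the
reconciler would later find on disk for one indexed document looks roughly
like the dict below (all values are illustrative, and the initial "pending"
status is an assumption inferred from MlflowRunData, since write_mlflow_run_json's
full body is not shown in this hunk):

    run_file_contents = {
        "experiment_name": "datasource_docs_1",
        "run_name": "doc_abc123",
        "status": "pending",  # assumed initial state; the worker only logs pending runs
        "params": {
            "data_source_id": "1",
            "embedding_model": "text-embedding-model",
            "summarization_model": "summarizer-model",
            "chunk_size": "512",
            "chunk_overlap": "10",
            "file_name": "report.pdf",
            "file_size_bytes": "10240",
        },
    }
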
From 6972cedd76c70473dcf6a82922926814614dc1bd Mon Sep 17 00:00:00 2001
From: Michael Liu
Date: Fri, 21 Feb 2025 11:04:04 -0800
Subject: [PATCH 15/20] Create data directory before running reconciler

---
 local-dev.sh | 2 ++
 scripts/startup_app.sh | 1 +
 2 files changed, 3 insertions(+)

diff --git a/local-dev.sh b/local-dev.sh
index 26e3730..2eb546d 100755
--- a/local-dev.sh
+++ b/local-dev.sh
@@ -70,6 +70,8 @@ if [ -z "$USE_SYSTEM_UV" ]; then
   python -m pip install uv
 fi
 uv sync
+
+mkdir -p $MLFLOW_RECONCILER_DATA_PATH
 uv run pytest -sxvvra app
 
 uv run mlflow ui &
diff --git a/scripts/startup_app.sh b/scripts/startup_app.sh
index a6957cb..b25ed10 100755
--- a/scripts/startup_app.sh
+++ b/scripts/startup_app.sh
@@ -65,6 +65,7 @@ scripts/startup_java.sh & 2>&1
 
 # start Python backend
 cd llm-service
+mkdir -p $MLFLOW_RECONCILER_DATA_PATH
 uv run fastapi run --reload --host 127.0.0.1 --port 8081 & 2>&1
 
 # wait for the python backend to be ready

From 02ae01bb0278f6f1bb7487681a9261b1232e4ca0 Mon Sep 17 00:00:00 2001
From: Elijah Williams
Date: Fri, 21 Feb 2025 12:29:49 -0700
Subject: [PATCH 16/20] remove tracing and fix max score type

---
 llm-service/app/routers/index/data_source/__init__.py | 3 ---
 llm-service/app/routers/index/sessions/__init__.py | 1 -
 llm-service/app/services/chat.py | 2 --
 llm-service/reconciler/mlflow_reconciler.py | 6 ++++--
 4 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/llm-service/app/routers/index/data_source/__init__.py b/llm-service/app/routers/index/data_source/__init__.py
index 740246b..41df200 100644
--- a/llm-service/app/routers/index/data_source/__init__.py
+++ b/llm-service/app/routers/index/data_source/__init__.py
@@ -32,7 +32,6 @@
 from http import HTTPStatus
 from typing import Any, Dict, Optional
 
-import mlflow
 from fastapi import APIRouter, Depends, HTTPException
 from fastapi_utils.cbv import cbv
 from llama_index.core.llms import LLM
@@ -142,10 +141,8 @@ def download_and_index(
         request: RagIndexDocumentRequest,
     ) -> None:
         datasource = data_sources_metadata_api.get_metadata(data_source_id)
-        mlflow.llama_index.autolog()
         self._download_and_index(datasource, doc_id, request)
 
-    @mlflow.trace(name="download_and_index")
     def _download_and_index(
         self, datasource: RagDataSource, doc_id: str, request: RagIndexDocumentRequest
     ) -> None:
diff --git a/llm-service/app/routers/index/sessions/__init__.py b/llm-service/app/routers/index/sessions/__init__.py
index 82ba2be..4d13c7c 100644
--- a/llm-service/app/routers/index/sessions/__init__.py
+++ b/llm-service/app/routers/index/sessions/__init__.py
@@ -141,7 +141,6 @@ def chat(
     _basusertoken: Annotated[str | None, Cookie()] = None,
 ) -> RagStudioChatMessage:
     user_name = parse_jwt_cookie(_basusertoken)
-    # mlflow.llama_index.autolog()
 
     configuration = request.configuration or RagPredictConfiguration()
     if configuration.exclude_knowledge_base:
diff --git a/llm-service/app/services/chat.py b/llm-service/app/services/chat.py
index 0565c1e..314901a 100644
--- a/llm-service/app/services/chat.py
+++ b/llm-service/app/services/chat.py
@@ -39,7 +39,6 @@
 import uuid
 from typing import List, Iterable
 
-import mlflow
 from fastapi import HTTPException
 from llama_index.core.base.llms.types import MessageRole
 from llama_index.core.chat_engine.types import AgentChatResponse
@@ -85,7 +84,6 @@ def v2_chat(
     return new_chat_message
 
 
-@mlflow.trace(name="v2_chat")
 def _run_chat(
     session: Session,
     response_id: str,
diff --git a/llm-service/reconciler/mlflow_reconciler.py b/llm-service/reconciler/mlflow_reconciler.py
index 8b3b8b7..7192e91 100644
--- a/llm-service/reconciler/mlflow_reconciler.py
+++ b/llm-service/reconciler/mlflow_reconciler.py
@@ -96,7 +96,7 @@ class MlflowRunData(BaseModel):
     experiment_name: str
     run_name: str
     tags: Optional[dict[str, Any]] = None
-    metrics: Optional[dict[str, int]] = None
+    metrics: Optional[dict[str, float]] = None
     params: Optional[dict[str, Any]] = None
     table: Optional[MlflowTable] = None
     status: MlflowRunStatus
@@ -121,7 +121,9 @@ async def evaluate_json_data(data: MlflowRunData) -> Optional[MlflowRunStatus]:
             if data.metrics:
                 mlflow.log_metrics(data.metrics)
             if data.table:
-                mlflow.log_table(**data.table)
+                mlflow.log_table(
+                    data=data.table.data, artifact_file=data.table.artifact_file
+                )
 
             return "success"
     return "failed"
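
The "max score" half of patch 16 is worth a note: retrieval scores are
fractional, and with metrics typed as dict[str, int], pydantic v2 rejects a
value like 0.87 rather than silently coercing it, so any pending run file
carrying a max_score metric would fail to load. A quick illustration of the
failure mode (class names are illustrative; this assumes pydantic v2
semantics):

    from pydantic import BaseModel, ValidationError

    class IntMetrics(BaseModel):
        metrics: dict[str, int]

    class FloatMetrics(BaseModel):
        metrics: dict[str, float]

    FloatMetrics(metrics={"max_score": 0.87})     # fine
    try:
        IntMetrics(metrics={"max_score": 0.87})   # 0.87 is not a lossless int
    except ValidationError as e:
        print(e)
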
From b3a60ba703a942f33c849d1f9363db0d95e005a3 Mon Sep 17 00:00:00 2001
From: Elijah Williams
Date: Fri, 21 Feb 2025 13:10:53 -0700
Subject: [PATCH 17/20] remove unused

---
 llm-service/app/services/mlflow/__init__.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/llm-service/app/services/mlflow/__init__.py b/llm-service/app/services/mlflow/__init__.py
index 3b6dcbe..901840a 100644
--- a/llm-service/app/services/mlflow/__init__.py
+++ b/llm-service/app/services/mlflow/__init__.py
@@ -39,16 +39,13 @@
 import os
 import re
 from datetime import datetime
-from pathlib import Path
 from typing import Any
 
 import mlflow
 from mlflow.entities import Experiment, Run
-from pydantic import BaseModel
 
 from app.services.chat_store import RagStudioChatMessage, RagPredictSourceNode
 from app.services.metadata_apis import data_sources_metadata_api, session_metadata_api
-from app.services.metadata_apis.data_sources_metadata_api import RagDataSource
 from app.services.metadata_apis.session_metadata_api import Session
 from app.services.query.query_configuration import QueryConfiguration

From b420df56b31b0164fa9c46e469ae47aaa467c39d Mon Sep 17 00:00:00 2001
From: Elijah Williams
Date: Fri, 21 Feb 2025 14:20:14 -0700
Subject: [PATCH 18/20] fix pytests

---
 llm-service/app/main.py | 2 ++
 llm-service/app/tests/conftest.py | 9 +++++++++
 2 files changed, 11 insertions(+)

diff --git a/llm-service/app/main.py b/llm-service/app/main.py
index c831bed..a6b656d 100644
--- a/llm-service/app/main.py
+++ b/llm-service/app/main.py
@@ -38,6 +38,8 @@
 
 import functools
 import logging
+import os
+import pwd
 import sys
 import time
 from collections.abc import Awaitable, Callable
diff --git a/llm-service/app/tests/conftest.py b/llm-service/app/tests/conftest.py
index 1f65fe9..1c1bd7d 100644
--- a/llm-service/app/tests/conftest.py
+++ b/llm-service/app/tests/conftest.py
@@ -83,6 +83,15 @@ def use_local_storage(monkeypatch: pytest.MonkeyPatch) -> None:
     monkeypatch.setenv("S3_RAG_DOCUMENT_BUCKET", "")
 
 
+@pytest.fixture(autouse=True)
+def use_mlflow_reconciler_data_path(
+    monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
+) -> None:
+    mlflow_dir = tmp_path / "mlflow-run-data"
+    mlflow_dir.mkdir()
+    monkeypatch.setenv("MLFLOW_RECONCILER_DATA_PATH", str(mlflow_dir))
+
+
 @pytest.fixture
 def document_id(index_document_request_body: dict[str, Any]) -> str:
     return cast(str, index_document_request_body["s3_document_key"].split("/")[-1])

From 2898dc46b97efb20000b35eedb01012b095d0554 Mon Sep 17 00:00:00 2001
From: Elijah Williams
Date: Fri, 21 Feb 2025 14:21:39 -0700
Subject: [PATCH 19/20] remove unused

---
 llm-service/app/main.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/llm-service/app/main.py b/llm-service/app/main.py
index a6b656d..c831bed 100644
--- a/llm-service/app/main.py
+++ b/llm-service/app/main.py
@@ -38,8 +38,6 @@
 
 import functools
 import logging
-import os
-import pwd
 import sys
 import time
 from collections.abc import Awaitable, Callable
From 54f87a9005193cb71c227d89bdf4949b0bfa4714 Mon Sep 17 00:00:00 2001
From: Michael Liu
Date: Fri, 21 Feb 2025 13:26:31 -0800
Subject: [PATCH 20/20] Create the reconciler data dir right before starting app

---
 local-dev.sh | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/local-dev.sh b/local-dev.sh
index 2eb546d..6f963b8 100755
--- a/local-dev.sh
+++ b/local-dev.sh
@@ -70,12 +70,11 @@ if [ -z "$USE_SYSTEM_UV" ]; then
   python -m pip install uv
 fi
 uv sync
-
-mkdir -p $MLFLOW_RECONCILER_DATA_PATH
 uv run pytest -sxvvra app
 
 uv run mlflow ui &
 
+mkdir -p $MLFLOW_RECONCILER_DATA_PATH
 uv run fastapi dev --port=8081 &
 
 # wait for the python backend to be ready
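
Taken together, the series leaves a simple contract: request handlers write run
JSON into MLFLOW_RECONCILER_DATA_PATH (created by the startup scripts just
before launching the app), and the reconciler drains that directory in the
background. A rough smoke test of one reconcile pass, assuming the reconciler
module can be imported as below (import path and file name are illustrative):

    import asyncio
    import json
    import os
    import tempfile

    os.environ.setdefault("MLFLOW_RECONCILER_DATA_PATH", tempfile.mkdtemp())

    from mlflow_reconciler import evaluate_json_data, process_io_pair  # assumed import

    path = os.path.join(os.environ["MLFLOW_RECONCILER_DATA_PATH"], "smoke.json")
    with open(path, "w") as f:
        json.dump(
            {"experiment_name": "smoke", "run_name": "run-1", "status": "pending"},
            f,
        )

    # One pass: logs the run to the local MLflow tracking store and, on
    # success, deletes the file so it is not retried.
    asyncio.run(process_io_pair(path, evaluate_json_data))
    assert not os.path.exists(path)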