From 6fc5b8ad15cd4f36838c5e399f3628909537d595 Mon Sep 17 00:00:00 2001 From: Alexej Penner Date: Wed, 11 Dec 2024 13:42:36 +0100 Subject: [PATCH] Reformatted and cleaned up deprecated code --- llm-complete-guide/gh_action_rag.py | 14 +- llm-complete-guide/pipelines/__init__.py | 4 +- .../pipelines/finetune_embeddings.py | 1 - llm-complete-guide/pipelines/llm_basic_rag.py | 1 - .../pipelines/llm_index_and_evaluate.py | 3 +- .../pipelines/local_deployment.py | 1 - .../pipelines/prod_deployment.py | 5 +- llm-complete-guide/run.py | 20 +-- llm-complete-guide/service.py | 104 ++++++++------ llm-complete-guide/steps/bento_builder.py | 21 +-- llm-complete-guide/steps/bento_deployment.py | 2 +- llm-complete-guide/steps/bento_dockerizer.py | 19 +-- llm-complete-guide/steps/eval_pii.py | 10 +- llm-complete-guide/steps/eval_retrieval.py | 10 +- .../steps/finetune_embeddings.py | 12 +- llm-complete-guide/steps/k8s_deployment.py | 85 +++++------ llm-complete-guide/steps/populate_index.py | 134 ++++++++++-------- llm-complete-guide/steps/rag_deployment.py | 7 +- llm-complete-guide/steps/url_scraper.py | 3 +- .../steps/url_scraping_utils.py | 14 +- llm-complete-guide/steps/visualize_chat.py | 6 +- llm-complete-guide/steps/vllm_deployment.py | 25 ++-- llm-complete-guide/utils/llm_utils.py | 94 +++++++----- llm-complete-guide/utils/openai_utils.py | 1 - 24 files changed, 321 insertions(+), 275 deletions(-) diff --git a/llm-complete-guide/gh_action_rag.py b/llm-complete-guide/gh_action_rag.py index ee8ac86d..e21e9980 100644 --- a/llm-complete-guide/gh_action_rag.py +++ b/llm-complete-guide/gh_action_rag.py @@ -21,12 +21,10 @@ import click import yaml -from zenml.enums import PluginSubType - from pipelines.llm_index_and_evaluate import llm_index_and_evaluate -from zenml.client import Client from zenml import Model -from zenml.exceptions import ZenKeyError +from zenml.client import Client +from zenml.enums import PluginSubType @click.command( @@ -89,7 +87,7 @@ def main( zenml_model_name: Optional[str] = "zenml-docs-qa-rag", zenml_model_version: Optional[str] = None, ): - """ + """ Executes the pipeline to train a basic RAG model. Args: @@ -108,14 +106,14 @@ def main( config = yaml.safe_load(file) # Read the model version from a file in the root of the repo - # called "ZENML_VERSION.txt". + # called "ZENML_VERSION.txt". if zenml_model_version == "staging": postfix = "-rc0" elif zenml_model_version == "production": postfix = "" else: postfix = "-dev" - + if Path("ZENML_VERSION.txt").exists(): with open("ZENML_VERSION.txt", "r") as file: zenml_model_version = file.read().strip() @@ -177,7 +175,7 @@ def main( service_account_id=service_account_id, auth_window=0, flavor="builtin", - action_type=PluginSubType.PIPELINE_RUN + action_type=PluginSubType.PIPELINE_RUN, ).id client.create_trigger( name="Production Trigger LLM-Complete", diff --git a/llm-complete-guide/pipelines/__init__.py b/llm-complete-guide/pipelines/__init__.py index 9055b486..ad60e74f 100644 --- a/llm-complete-guide/pipelines/__init__.py +++ b/llm-complete-guide/pipelines/__init__.py @@ -19,7 +19,7 @@ from pipelines.generate_chunk_questions import generate_chunk_questions from pipelines.llm_basic_rag import llm_basic_rag from pipelines.llm_eval import llm_eval -from pipelines.rag_deployment import rag_deployment from pipelines.llm_index_and_evaluate import llm_index_and_evaluate from pipelines.local_deployment import local_deployment -from pipelines.prod_deployment import production_deployment \ No newline at end of file +from pipelines.prod_deployment import production_deployment +from pipelines.rag_deployment import rag_deployment diff --git a/llm-complete-guide/pipelines/finetune_embeddings.py b/llm-complete-guide/pipelines/finetune_embeddings.py index e53ae3f1..19b8b08c 100644 --- a/llm-complete-guide/pipelines/finetune_embeddings.py +++ b/llm-complete-guide/pipelines/finetune_embeddings.py @@ -12,7 +12,6 @@ # or implied. See the License for the specific language governing # permissions and limitations under the License. -from constants import EMBEDDINGS_MODEL_NAME_ZENML from steps.finetune_embeddings import ( evaluate_base_model, evaluate_finetuned_model, diff --git a/llm-complete-guide/pipelines/llm_basic_rag.py b/llm-complete-guide/pipelines/llm_basic_rag.py index 82a97b21..895c4df3 100644 --- a/llm-complete-guide/pipelines/llm_basic_rag.py +++ b/llm-complete-guide/pipelines/llm_basic_rag.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from litellm import config_path from steps.populate_index import ( generate_embeddings, diff --git a/llm-complete-guide/pipelines/llm_index_and_evaluate.py b/llm-complete-guide/pipelines/llm_index_and_evaluate.py index 16423867..b82c84a3 100644 --- a/llm-complete-guide/pipelines/llm_index_and_evaluate.py +++ b/llm-complete-guide/pipelines/llm_index_and_evaluate.py @@ -15,9 +15,10 @@ # limitations under the License. # -from pipelines import llm_basic_rag, llm_eval from zenml import pipeline +from pipelines import llm_basic_rag, llm_eval + @pipeline def llm_index_and_evaluate() -> None: diff --git a/llm-complete-guide/pipelines/local_deployment.py b/llm-complete-guide/pipelines/local_deployment.py index db632cfe..b68e72e5 100644 --- a/llm-complete-guide/pipelines/local_deployment.py +++ b/llm-complete-guide/pipelines/local_deployment.py @@ -1,6 +1,5 @@ from steps.bento_builder import bento_builder from steps.bento_deployment import bento_deployment -from steps.visualize_chat import create_chat_interface from zenml import pipeline diff --git a/llm-complete-guide/pipelines/prod_deployment.py b/llm-complete-guide/pipelines/prod_deployment.py index 06eb3519..3abee7a2 100644 --- a/llm-complete-guide/pipelines/prod_deployment.py +++ b/llm-complete-guide/pipelines/prod_deployment.py @@ -22,12 +22,11 @@ @pipeline(enable_cache=False) -def production_deployment( -): +def production_deployment(): """Model deployment pipeline. This is a pipeline deploys trained model for future inference. """ bento_model_image = bento_dockerizer() deployment_info = k8s_deployment(bento_model_image) - create_chat_interface(deployment_info) \ No newline at end of file + create_chat_interface(deployment_info) diff --git a/llm-complete-guide/run.py b/llm-complete-guide/run.py index 06ad8fd3..c6368a65 100644 --- a/llm-complete-guide/run.py +++ b/llm-complete-guide/run.py @@ -47,14 +47,14 @@ generate_synthetic_data, llm_basic_rag, llm_eval, - rag_deployment, llm_index_and_evaluate, local_deployment, production_deployment, + rag_deployment, ) from structures import Document -from zenml.materializers.materializer_registry import materializer_registry from zenml import Model +from zenml.materializers.materializer_registry import materializer_registry logger = get_logger(__name__) @@ -150,7 +150,7 @@ "env", default="local", help="The environment to use for the completion.", -) +) def main( pipeline: str, query_text: Optional[str] = None, @@ -186,9 +186,9 @@ def main( } }, } - + # Read the model version from a file in the root of the repo - # called "ZENML_VERSION.txt". + # called "ZENML_VERSION.txt". if zenml_model_version == "staging": postfix = "-rc0" elif zenml_model_version == "production": @@ -200,8 +200,8 @@ def main( with open("ZENML_VERSION.txt", "r") as file: zenml_version = file.read().strip() zenml_version += postfix - #zenml_model_version = file.read().strip() - #zenml_model_version += postfix + # zenml_model_version = file.read().strip() + # zenml_model_version += postfix else: raise RuntimeError( "No model version file found. Please create a file called ZENML_VERSION.txt in the root of the repo with the model version." @@ -294,7 +294,9 @@ def main( elif pipeline == "embeddings": finetune_embeddings.with_options( - model=zenml_model, config_path=config_path, **embeddings_finetune_args + model=zenml_model, + config_path=config_path, + **embeddings_finetune_args, )() elif pipeline == "chunks": @@ -309,4 +311,4 @@ def main( materializer_registry.register_materializer_type( Document, DocumentMaterializer ) - main() \ No newline at end of file + main() diff --git a/llm-complete-guide/service.py b/llm-complete-guide/service.py index adec9f44..73a77683 100644 --- a/llm-complete-guide/service.py +++ b/llm-complete-guide/service.py @@ -1,14 +1,11 @@ -import asyncio -from typing import Any, AsyncGenerator, Dict +from typing import AsyncGenerator import bentoml import litellm import numpy as np from constants import ( - EMBEDDINGS_MODEL_ID_FINE_TUNED, MODEL_NAME_MAP, OPENAI_MODEL, - SECRET_NAME, SECRET_NAME_ELASTICSEARCH, ) from elasticsearch import Elasticsearch @@ -29,30 +26,43 @@ http={ "cors": { "enabled": True, - "access_control_allow_origins": ["https://cloud.zenml.io"], # Add your allowed origins - "access_control_allow_methods": ["GET", "OPTIONS", "POST", "HEAD", "PUT"], + "access_control_allow_origins": [ + "https://cloud.zenml.io" + ], # Add your allowed origins + "access_control_allow_methods": [ + "GET", + "OPTIONS", + "POST", + "HEAD", + "PUT", + ], "access_control_allow_credentials": True, "access_control_allow_headers": ["*"], # "access_control_allow_origin_regex": "https://.*\.my_org\.com", # Optional regex "access_control_max_age": 1200, "access_control_expose_headers": ["Content-Length"], } - } + }, ) class RAGService: """RAG service for generating responses using LLM and RAG.""" + def __init__(self): """Initialize the RAG service.""" # Initialize embeddings model self.embeddings_model = SentenceTransformer(EMBEDDINGS_MODEL) - + # Initialize reranker self.reranker = Reranker("flashrank") - + # Initialize Elasticsearch client client = Client() - es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_host"] - es_api_key = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_api_key"] + es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values[ + "elasticsearch_host" + ] + es_api_key = client.get_secret( + SECRET_NAME_ELASTICSEARCH + ).secret_values["elasticsearch_api_key"] self.es_client = Elasticsearch(es_host, api_key=es_api_key) def get_embeddings(self, text: str) -> np.ndarray: @@ -62,32 +72,42 @@ def get_embeddings(self, text: str) -> np.ndarray: embeddings = embeddings[0] return embeddings - def get_similar_docs(self, query_embedding: np.ndarray, n: int = 20) -> list: + def get_similar_docs( + self, query_embedding: np.ndarray, n: int = 20 + ) -> list: """Get similar documents for the given query embedding.""" if query_embedding.ndim == 2: query_embedding = query_embedding[0] - - response = self.es_client.search(index="zenml_docs", knn={ - "field": "embedding", - "query_vector": query_embedding.tolist(), - "num_candidates": 50, - "k": n - }) - + + response = self.es_client.search( + index="zenml_docs", + knn={ + "field": "embedding", + "query_vector": query_embedding.tolist(), + "num_candidates": 50, + "k": n, + }, + ) + docs = [] for hit in response["hits"]["hits"]: - docs.append({ - "content": hit["_source"]["content"], - "url": hit["_source"]["url"], - "parent_section": hit["_source"]["parent_section"] - }) + docs.append( + { + "content": hit["_source"]["content"], + "url": hit["_source"]["url"], + "parent_section": hit["_source"]["parent_section"], + } + ) return docs def rerank_documents(self, query: str, documents: list) -> list: """Rerank documents using the reranker.""" - docs_texts = [f"{doc['content']} PARENT SECTION: {doc['parent_section']}" for doc in documents] + docs_texts = [ + f"{doc['content']} PARENT SECTION: {doc['parent_section']}" + for doc in documents + ] results = self.reranker.rank(query=query, docs=docs_texts) - + reranked_docs = [] for result in results.results: index_val = result.doc_id @@ -95,7 +115,9 @@ def rerank_documents(self, query: str, documents: list) -> list: reranked_docs.append((result.text, doc["url"])) return reranked_docs[:5] - async def get_completion(self, messages: list, model: str, temperature: float, max_tokens: int) -> AsyncGenerator[str, None]: + async def get_completion( + self, messages: list, model: str, temperature: float, max_tokens: int + ) -> AsyncGenerator[str, None]: """Handle the completion request and streaming response.""" try: response = await litellm.acompletion( @@ -104,9 +126,9 @@ async def get_completion(self, messages: list, model: str, temperature: float, m temperature=temperature, max_tokens=max_tokens, api_key=get_openai_api_key(), - stream=True + stream=True, ) - + async for chunk in response: if chunk.choices and chunk.choices[0].delta.content: yield chunk.choices[0].delta.content @@ -124,16 +146,16 @@ async def generate( try: # Get embeddings for query query_embedding = self.get_embeddings(query) - + # Retrieve similar documents similar_docs = self.get_similar_docs(query_embedding, n=20) - + # Rerank documents reranked_docs = self.rerank_documents(query, similar_docs) - + # Prepare context from reranked documents context = "\n\n".join([doc[0] for doc in reranked_docs]) - + # Prepare system message system_message = """ You are a friendly chatbot. \ @@ -149,15 +171,17 @@ async def generate( {"role": "system", "content": system_message}, {"role": "user", "content": query}, { - "role": "assistant", - "content": f"Please use the following relevant ZenML documentation to answer the query: \n{context}" - } + "role": "assistant", + "content": f"Please use the following relevant ZenML documentation to answer the query: \n{context}", + }, ] # Get completion from LLM using the new async method model = MODEL_NAME_MAP.get(OPENAI_MODEL, OPENAI_MODEL) - async for chunk in self.get_completion(messages, model, temperature, max_tokens): + async for chunk in self.get_completion( + messages, model, temperature, max_tokens + ): yield chunk - + except Exception as e: - yield f"Error occurred: {str(e)}" \ No newline at end of file + yield f"Error occurred: {str(e)}" diff --git a/llm-complete-guide/steps/bento_builder.py b/llm-complete-guide/steps/bento_builder.py index b89d571b..c94c4b33 100644 --- a/llm-complete-guide/steps/bento_builder.py +++ b/llm-complete-guide/steps/bento_builder.py @@ -11,11 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing # permissions and limitations under the License. -import importlib import os from typing import Optional -import bentoml from bentoml import bentos from bentoml._internal.bento import bento from constants import ( @@ -25,22 +23,24 @@ from zenml import ArtifactConfig, Model, get_step_context, step from zenml import __version__ as zenml_version from zenml.client import Client +from zenml.enums import ArtifactType from zenml.integrations.bentoml.constants import DEFAULT_BENTO_FILENAME from zenml.integrations.bentoml.materializers.bentoml_bento_materializer import ( BentoMaterializer, ) -from zenml.integrations.bentoml.steps import bento_builder_step from zenml.logger import get_logger -from zenml.orchestrators.utils import get_config_environment_vars from zenml.utils import source_utils logger = get_logger(__name__) + @step(output_materializers=BentoMaterializer, enable_cache=False) def bento_builder() -> ( Annotated[ Optional[bento.Bento], - ArtifactConfig(name="bentoml_rag_deployment", is_model_artifact=True), + ArtifactConfig( + name="bentoml_rag_deployment", artifact_type=ArtifactType.MODEL + ), ] ): """Predictions step. @@ -65,7 +65,9 @@ def bento_builder() -> ( if Client().active_stack.orchestrator.flavor == "local": model = get_step_context().model version_to_deploy = Model(name=model.name, version="production") - logger.info(f"Building BentoML bundle for model: {version_to_deploy.name}") + logger.info( + f"Building BentoML bundle for model: {version_to_deploy.name}" + ) # Build the BentoML bundle bento = bentos.build( service="service.py:RAGService", @@ -74,11 +76,14 @@ def bento_builder() -> ( "model_name": version_to_deploy.name, "model_version": version_to_deploy.version, "model_uri": f"zenml/{EMBEDDINGS_MODEL_ID_FINE_TUNED}", - "bento_uri": os.path.join(get_step_context().get_output_artifact_uri(), DEFAULT_BENTO_FILENAME), + "bento_uri": os.path.join( + get_step_context().get_output_artifact_uri(), + DEFAULT_BENTO_FILENAME, + ), }, build_ctx=source_utils.get_source_root(), python={ - "requirements_txt":"requirements.txt", + "requirements_txt": "requirements.txt", }, ) else: diff --git a/llm-complete-guide/steps/bento_deployment.py b/llm-complete-guide/steps/bento_deployment.py index 8d26dfa4..de7f7a01 100644 --- a/llm-complete-guide/steps/bento_deployment.py +++ b/llm-complete-guide/steps/bento_deployment.py @@ -56,4 +56,4 @@ def bento_deployment( logger.info( f"The deployed service info: {model_deployer.get_model_server_info(service)}" ) - return service \ No newline at end of file + return service diff --git a/llm-complete-guide/steps/bento_dockerizer.py b/llm-complete-guide/steps/bento_dockerizer.py index 4e52dcba..81009ce8 100644 --- a/llm-complete-guide/steps/bento_dockerizer.py +++ b/llm-complete-guide/steps/bento_dockerizer.py @@ -11,23 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing # permissions and limitations under the License. -import os -from typing import Optional import bentoml -from bentoml import bentos -from bentoml._internal.bento import bento from typing_extensions import Annotated from zenml import ArtifactConfig, Model, get_step_context, step -from zenml import __version__ as zenml_version from zenml.client import Client -from zenml.integrations.bentoml.constants import DEFAULT_BENTO_FILENAME -from zenml.integrations.bentoml.steps import bento_builder_step from zenml.logger import get_logger -from zenml.utils import source_utils logger = get_logger(__name__) + @step(enable_cache=False) def bento_dockerizer() -> ( Annotated[ @@ -36,14 +29,16 @@ def bento_dockerizer() -> ( ] ): """dockerize_bento step. - + This step is responsible for dockerizing the BentoML model. """ ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### zenml_client = Client() model = get_step_context().model version_to_deploy = Model(name=model.name) - bentoml_deployment = zenml_client.get_artifact_version(name_id_or_prefix="bentoml_rag_deployment") + bentoml_deployment = zenml_client.get_artifact_version( + name_id_or_prefix="bentoml_rag_deployment" + ) bento_tag = f'{bentoml_deployment.run_metadata["bento_tag_name"]}:{bentoml_deployment.run_metadata["bento_info_version"]}' container_registry = zenml_client.active_stack.container_registry assert container_registry, "Container registry is not configured." @@ -59,7 +54,7 @@ def bento_dockerizer() -> ( except Exception as e: logger.error(f"Error containerizing the bento: {e}") raise e - + container_registry.push_image(image_name) ### YOUR CODE ENDS HERE ### - return image_name \ No newline at end of file + return image_name diff --git a/llm-complete-guide/steps/eval_pii.py b/llm-complete-guide/steps/eval_pii.py index 460e0e35..5148ff31 100644 --- a/llm-complete-guide/steps/eval_pii.py +++ b/llm-complete-guide/steps/eval_pii.py @@ -306,8 +306,9 @@ def eval_pii( "ips_found": train_results["statistics"]["total_findings"]["ips"], } log_metadata( - metadata=train_metadata, artifact_name="train_pii_results", - infer_artifact=True + metadata=train_metadata, + artifact_name="train_pii_results", + infer_artifact=True, ) test_metadata = { @@ -322,8 +323,9 @@ def eval_pii( "ips_found": test_results["statistics"]["total_findings"]["ips"], } log_metadata( - metadata=test_metadata, artifact_name="test_pii_results", - infer_artifact=True + metadata=test_metadata, + artifact_name="test_pii_results", + infer_artifact=True, ) pii_chart = plot_pii_results(train_results, test_results) diff --git a/llm-complete-guide/steps/eval_retrieval.py b/llm-complete-guide/steps/eval_retrieval.py index 2b555b85..0261bef2 100644 --- a/llm-complete-guide/steps/eval_retrieval.py +++ b/llm-complete-guide/steps/eval_retrieval.py @@ -90,11 +90,11 @@ def query_similar_docs( num_docs = 20 if use_reranking else returned_sample_size # get (content, url) tuples for the top n similar documents top_similar_docs = get_topn_similar_docs( - embedded_question, - conn=conn, - es_client=es_client, - n=num_docs, - include_metadata=True + embedded_question, + conn=conn, + es_client=es_client, + n=num_docs, + include_metadata=True, ) if use_reranking: diff --git a/llm-complete-guide/steps/finetune_embeddings.py b/llm-complete-guide/steps/finetune_embeddings.py index 8ef535b4..def28080 100644 --- a/llm-complete-guide/steps/finetune_embeddings.py +++ b/llm-complete-guide/steps/finetune_embeddings.py @@ -49,6 +49,7 @@ from sentence_transformers.util import cos_sim from zenml import ArtifactConfig, log_metadata, step from zenml.client import Client +from zenml.enums import ArtifactType from zenml.utils.cuda_utils import cleanup_gpu_memory @@ -202,7 +203,8 @@ def evaluate_finetuned_model( } log_metadata( - metadata={"finetuned_model_eval": finetuned_model_eval}, infer_model=True + metadata={"finetuned_model_eval": finetuned_model_eval}, + infer_model=True, ) return results @@ -218,7 +220,7 @@ def finetune( ) -> Annotated[ SentenceTransformer, ArtifactConfig( - is_model_artifact=True, + artifact_type=ArtifactType.MODEL, name="finetuned-model", ), ]: @@ -298,8 +300,8 @@ def finetune( token=zenml_client.get_secret(SECRET_NAME).secret_values["hf_token"], ) - log_metadata( - infer_model=True, + log_metadata( + infer_model=True, metadata={ "training_params": { "num_train_epochs": epochs, @@ -324,7 +326,7 @@ def finetune( else "N/A", }, "huggingface_model_id": f"zenml/{EMBEDDINGS_MODEL_ID_FINE_TUNED}", - } + }, ) # handle materialization error with this workaround: diff --git a/llm-complete-guide/steps/k8s_deployment.py b/llm-complete-guide/steps/k8s_deployment.py index 6d6dfad0..b62a1ea8 100644 --- a/llm-complete-guide/steps/k8s_deployment.py +++ b/llm-complete-guide/steps/k8s_deployment.py @@ -13,7 +13,7 @@ # permissions and limitations under the License. import re from pathlib import Path -from typing import Dict, Optional, cast +from typing import Dict, cast import yaml from kubernetes import client, config @@ -21,7 +21,6 @@ from zenml import get_step_context, step from zenml.client import Client from zenml.integrations.bentoml.services.bentoml_local_deployment import ( - BentoMLLocalDeploymentConfig, BentoMLLocalDeploymentService, ) from zenml.logger import get_logger @@ -29,9 +28,10 @@ logger = get_logger(__name__) + def apply_kubernetes_configuration(k8s_configs: list) -> None: """Apply Kubernetes configurations using the K8s Python client. - + Args: k8s_configs: List of Kubernetes configuration dictionaries """ @@ -40,16 +40,16 @@ def apply_kubernetes_configuration(k8s_configs: list) -> None: config.load_kube_config() except: config.load_incluster_config() # For in-cluster deployment - + # Initialize API clients k8s_apps_v1 = client.AppsV1Api() k8s_core_v1 = client.CoreV1Api() - + for k8s_config in k8s_configs: kind = k8s_config["kind"] name = k8s_config["metadata"]["name"] namespace = k8s_config["metadata"].get("namespace", "default") - + try: if kind == "Deployment": # Check if deployment exists @@ -57,61 +57,53 @@ def apply_kubernetes_configuration(k8s_configs: list) -> None: k8s_apps_v1.read_namespaced_deployment(name, namespace) # Update existing deployment k8s_apps_v1.patch_namespaced_deployment( - name=name, - namespace=namespace, - body=k8s_config + name=name, namespace=namespace, body=k8s_config ) logger.info(f"Updated existing deployment: {name}") except ApiException as e: if e.status == 404: # Create new deployment k8s_apps_v1.create_namespaced_deployment( - namespace=namespace, - body=k8s_config + namespace=namespace, body=k8s_config ) logger.info(f"Created new deployment: {name}") else: raise e - + elif kind == "Service": # Check if service exists try: k8s_core_v1.read_namespaced_service(name, namespace) # Update existing service k8s_core_v1.patch_namespaced_service( - name=name, - namespace=namespace, - body=k8s_config + name=name, namespace=namespace, body=k8s_config ) logger.info(f"Updated existing service: {name}") except ApiException as e: if e.status == 404: # Create new service k8s_core_v1.create_namespaced_service( - namespace=namespace, - body=k8s_config + namespace=namespace, body=k8s_config ) logger.info(f"Created new service: {name}") else: raise e - + except ApiException as e: logger.error(f"Error applying {kind} {name}: {e}") raise e + @step(enable_cache=False) -def k8s_deployment( - docker_image_tag: str, - namespace: str = "default" -) -> Dict: +def k8s_deployment(docker_image_tag: str, namespace: str = "default") -> Dict: # Get the raw model name raw_model_name = get_step_context().model.name # Sanitize the model name model_name = sanitize_name(raw_model_name) - + # Get environment variables environment_vars = get_config_environment_vars() - + # Get current deployment zenml_client = Client() model_deployer = zenml_client.active_stack.model_deployer @@ -124,16 +116,16 @@ def k8s_deployment( template_path = Path(__file__).parent / "k8s_template.yaml" with open(template_path, "r") as f: k8s_configs = list(yaml.safe_load_all(f)) - + # Update configurations with sanitized names for config in k8s_configs: # Add namespace config["metadata"]["namespace"] = namespace - + # Update metadata labels and name config["metadata"]["labels"]["app"] = model_name config["metadata"]["name"] = model_name - + if config["kind"] == "Service": # Update service selector config["spec"]["selector"]["app"] = model_name @@ -143,45 +135,45 @@ def k8s_deployment( "service.beta.kubernetes.io/aws-load-balancer-ssl-cert": "arn:aws:acm:eu-central-1:339712793861:certificate/0426ace8-5fa3-40dd-bd81-b0fb1064bd85", "service.beta.kubernetes.io/aws-load-balancer-backend-protocol": "http", "service.beta.kubernetes.io/aws-load-balancer-ssl-ports": "443", - "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600" + "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600", } - + # Update ports config["spec"]["ports"] = [ - { - "name": "https", - "port": 443, - "targetPort": 3000 - } + {"name": "https", "port": 443, "targetPort": 3000} ] - + elif config["kind"] == "Deployment": # Update deployment selector and template config["spec"]["selector"]["matchLabels"]["app"] = model_name - config["spec"]["template"]["metadata"]["labels"]["app"] = model_name - + config["spec"]["template"]["metadata"]["labels"]["app"] = ( + model_name + ) + # Update the container image and name containers = config["spec"]["template"]["spec"]["containers"] for container in containers: container["name"] = model_name container["image"] = docker_image_tag - + # Add environment variables to the container env_vars = [] for key, value in environment_vars.items(): env_vars.append({"name": key, "value": value}) container["env"] = env_vars - + # Apply the configurations try: apply_kubernetes_configuration(k8s_configs) deployment_status = "success" - logger.info(f"Successfully deployed model {model_name} with image: {docker_image_tag}") + logger.info( + f"Successfully deployed model {model_name} with image: {docker_image_tag}" + ) except Exception as e: deployment_status = "failed" logger.error(f"Failed to deploy model {model_name}: {str(e)}") raise e - + # Return deployment information deployment_info = { "model_name": model_name, @@ -192,9 +184,9 @@ def k8s_deployment( "configurations": k8s_configs, "url": "chat-rag.staging.cloudinfra.zenml.io", } - + if services: - bentoml_deployment= cast(BentoMLLocalDeploymentService, services[0]) + bentoml_deployment = cast(BentoMLLocalDeploymentService, services[0]) zenml_client.update_service( id=bentoml_deployment.uuid, prediction_url="https://chat-rag.staging.cloudinfra.zenml.io", @@ -202,11 +194,10 @@ def k8s_deployment( labels={ "docker_image": docker_image_tag, "namespace": namespace, - } + }, ) - - return deployment_info + return deployment_info def sanitize_name(name: str) -> str: @@ -216,4 +207,4 @@ def sanitize_name(name: str) -> str: sanitized = sanitized[:63].strip("-") # Ensure the name doesn't start or end with '-' sanitized = sanitized.strip("-") - return sanitized \ No newline at end of file + return sanitized diff --git a/llm-complete-guide/steps/populate_index.py b/llm-complete-guide/steps/populate_index.py index c477f505..556784e3 100644 --- a/llm-complete-guide/steps/populate_index.py +++ b/llm-complete-guide/steps/populate_index.py @@ -23,26 +23,25 @@ import json import logging import math -from typing import Annotated, Any, Dict, List, Tuple from enum import Enum +from typing import Annotated, Any, Dict, List, Tuple from constants import ( CHUNK_OVERLAP, CHUNK_SIZE, EMBEDDING_DIMENSIONALITY, EMBEDDINGS_MODEL, + SECRET_NAME, SECRET_NAME_ELASTICSEARCH, - ZENML_CHATBOT_MODEL, ) from pgvector.psycopg2 import register_vector from PIL import Image, ImageDraw, ImageFont from sentence_transformers import SentenceTransformer from structures import Document from utils.llm_utils import get_db_conn, get_es_client, split_documents -from zenml import ArtifactConfig, log_metadata, step, log_metadata -from zenml.metadata.metadata_types import Uri +from zenml import ArtifactConfig, log_metadata, step from zenml.client import Client -from constants import SECRET_NAME +from zenml.metadata.metadata_types import Uri logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -453,11 +452,11 @@ def draw_bar_chart( """Draws a bar chart on the given image.""" # Ensure labels is a list, even if empty labels = labels or [] - + # Skip drawing if no data if not data: return - + max_value = max(data) bar_width = width // len(data) bar_spacing = 10 @@ -487,10 +486,21 @@ def draw_bar_chart( for i, label in enumerate(labels): if label is not None: # Add null check for individual labels font = ImageFont.load_default(size=10) - bbox = draw.textbbox((0, 0), str(label), font=font) # Convert to string + bbox = draw.textbbox( + (0, 0), str(label), font=font + ) # Convert to string label_width = bbox[2] - bbox[0] - label_x = x + i * (bar_width + bar_spacing) + (bar_width - label_width) // 2 - draw.text((label_x, y + height - 15), str(label), font=font, fill="black") + label_x = ( + x + + i * (bar_width + bar_spacing) + + (bar_width - label_width) // 2 + ) + draw.text( + (label_x, y + height - 15), + str(label), + font=font, + fill="black", + ) @step @@ -517,7 +527,7 @@ def preprocess_documents( try: log_metadata( artifact_name="split_chunks", - infer_artifact=True, + infer_artifact=True, metadata={ "chunk_size": CHUNK_SIZE, "chunk_overlap": CHUNK_OVERLAP, @@ -539,7 +549,7 @@ def preprocess_documents( log_metadata( artifact_name="split_chunks", - infer_artifact=True, + infer_artifact=True, metadata=stats, ) @@ -572,7 +582,7 @@ def generate_embeddings( log_metadata( artifact_name="documents_with_embeddings", - infer_artifact=True, + infer_artifact=True, metadata={ "embedding_type": EMBEDDINGS_MODEL, "embedding_dimensionality": EMBEDDING_DIMENSIONALITY, @@ -603,6 +613,7 @@ class IndexType(Enum): ELASTICSEARCH = "elasticsearch" POSTGRES = "postgres" + @step(enable_cache=False) def index_generator( documents: str, @@ -627,11 +638,12 @@ def index_generator( _index_generator_elastic(documents) else: _index_generator_postgres(documents) - + except Exception as e: logger.error(f"Error in index_generator: {e}") raise + def _index_generator_elastic(documents: str) -> None: """Generates an Elasticsearch index for the given documents.""" try: @@ -650,11 +662,11 @@ def _index_generator_elastic(documents: str) -> None: "type": "dense_vector", "dims": EMBEDDING_DIMENSIONALITY, "index": True, - "similarity": "cosine" + "similarity": "cosine", }, "filename": {"type": "text"}, "parent_section": {"type": "text"}, - "url": {"type": "text"} + "url": {"type": "text"}, } } } @@ -664,50 +676,49 @@ def _index_generator_elastic(documents: str) -> None: # Parse the JSON string into a list of Document objects document_list = [Document(**doc) for doc in json.loads(documents)] operations = [] - + for doc in document_list: content_hash = hashlib.md5( f"{doc.page_content}{doc.filename}{doc.parent_section}{doc.url}".encode() ).hexdigest() - - exists_query = { - "query": { - "term": { - "doc_id": content_hash - } - } - } - + + exists_query = {"query": {"term": {"doc_id": content_hash}}} + if not es.count(index=index_name, body=exists_query)["count"]: - operations.append({ - "index": { - "_index": index_name, - "_id": content_hash + operations.append( + {"index": {"_index": index_name, "_id": content_hash}} + ) + + operations.append( + { + "doc_id": content_hash, + "content": doc.page_content, + "token_count": doc.token_count, + "embedding": doc.embedding, + "filename": doc.filename, + "parent_section": doc.parent_section, + "url": doc.url, } - }) - - operations.append({ - "doc_id": content_hash, - "content": doc.page_content, - "token_count": doc.token_count, - "embedding": doc.embedding, - "filename": doc.filename, - "parent_section": doc.parent_section, - "url": doc.url - }) - + ) + if operations: response = es.bulk(operations=operations, timeout="10m") - - success_count = sum(1 for item in response['items'] if 'index' in item and item['index']['status'] == 201) - failed_count = len(response['items']) - success_count - + + success_count = sum( + 1 + for item in response["items"] + if "index" in item and item["index"]["status"] == 201 + ) + failed_count = len(response["items"]) - success_count + logger.info(f"Successfully indexed {success_count} documents") if failed_count > 0: logger.warning(f"Failed to index {failed_count} documents") - for item in response['items']: - if 'index' in item and item['index']['status'] != 201: - logger.warning(f"Failed to index document: {item['index']['error']}") + for item in response["items"]: + if "index" in item and item["index"]["status"] != 201: + logger.warning( + f"Failed to index document: {item['index']['error']}" + ) else: logger.info("No new documents to index") @@ -717,11 +728,12 @@ def _index_generator_elastic(documents: str) -> None: logger.error(f"Error in Elasticsearch indexing: {e}") raise + def _index_generator_postgres(documents: str) -> None: """Generates a PostgreSQL index for the given documents.""" try: conn = get_db_conn() - + with conn.cursor() as cur: # Install pgvector if not already installed cur.execute("CREATE EXTENSION IF NOT EXISTS vector") @@ -743,7 +755,7 @@ def _index_generator_postgres(documents: str) -> None: conn.commit() register_vector(conn) - + # Parse the JSON string into a list of Document objects document_list = [Document(**doc) for doc in json.loads(documents)] @@ -775,7 +787,6 @@ def _index_generator_postgres(documents: str) -> None: ) conn.commit() - cur.execute("SELECT COUNT(*) as cnt FROM embeddings;") num_records = cur.fetchone()[0] logger.info(f"Number of vector records in table: {num_records}") @@ -800,6 +811,7 @@ def _index_generator_postgres(documents: str) -> None: if conn: conn.close() + def _log_metadata(index_type: IndexType) -> None: """Log metadata about the indexing process.""" prompt = """ @@ -812,9 +824,11 @@ def _log_metadata(index_type: IndexType) -> None: """ client = Client() - + if index_type == IndexType.ELASTICSEARCH: - es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_host"] + es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values[ + "elasticsearch_host" + ] connection_details = { "host": es_host, "api_key": "*********", @@ -824,10 +838,16 @@ def _log_metadata(index_type: IndexType) -> None: store_name = "pgvector" connection_details = { - "user": client.get_secret(SECRET_NAME).secret_values["supabase_user"], + "user": client.get_secret(SECRET_NAME).secret_values[ + "supabase_user" + ], "password": "**********", - "host": client.get_secret(SECRET_NAME).secret_values["supabase_host"], - "port": client.get_secret(SECRET_NAME).secret_values["supabase_port"], + "host": client.get_secret(SECRET_NAME).secret_values[ + "supabase_host" + ], + "port": client.get_secret(SECRET_NAME).secret_values[ + "supabase_port" + ], "dbname": "postgres", } diff --git a/llm-complete-guide/steps/rag_deployment.py b/llm-complete-guide/steps/rag_deployment.py index dae442bd..38c03097 100644 --- a/llm-complete-guide/steps/rag_deployment.py +++ b/llm-complete-guide/steps/rag_deployment.py @@ -2,7 +2,6 @@ import webbrowser from huggingface_hub import HfApi - from utils.hf_utils import get_hf_token from utils.llm_utils import process_input_with_retrieval from zenml import step @@ -50,9 +49,7 @@ def predict(message, history): ) -def upload_files_to_repo( - api, repo_id: str, files_mapping: dict, token: str -): +def upload_files_to_repo(api, repo_id: str, files_mapping: dict, token: str): """Upload multiple files to a Hugging Face repository Args: @@ -98,7 +95,7 @@ def gradio_rag_deployment() -> None: key="ZENML_STORE_API_KEY", value=ZENML_API_TOKEN, ) - + api.add_space_secret( repo_id=hf_repo_id, key="ZENML_STORE_URL", diff --git a/llm-complete-guide/steps/url_scraper.py b/llm-complete-guide/steps/url_scraper.py index 68421d0c..58fc425e 100644 --- a/llm-complete-guide/steps/url_scraper.py +++ b/llm-complete-guide/steps/url_scraper.py @@ -26,7 +26,7 @@ def url_scraper( docs_url: str = "https://docs.zenml.io", repo_url: str = "https://github.com/zenml-io/zenml", website_url: str = "https://zenml.io", - use_dev_set: bool = False + use_dev_set: bool = False, ) -> Annotated[str, ArtifactConfig(name="urls")]: """Generates a list of relevant URLs to scrape. @@ -42,7 +42,6 @@ def url_scraper( # examples_readme_urls = get_nested_readme_urls(repo_url) use_dev_set = False if use_dev_set: - docs_urls = [ "https://docs.zenml.io/getting-started/system-architectures", "https://docs.zenml.io/getting-started/core-concepts", diff --git a/llm-complete-guide/steps/url_scraping_utils.py b/llm-complete-guide/steps/url_scraping_utils.py index d6367cbf..ec97ac94 100644 --- a/llm-complete-guide/steps/url_scraping_utils.py +++ b/llm-complete-guide/steps/url_scraping_utils.py @@ -13,14 +13,15 @@ # permissions and limitations under the License. import re -import requests -from bs4 import BeautifulSoup -from typing import List from logging import getLogger +from typing import List +import requests +from bs4 import BeautifulSoup logger = getLogger(__name__) + def get_all_pages(base_url: str = "https://docs.zenml.io") -> List[str]: """ Retrieve all pages from the ZenML documentation sitemap. @@ -32,18 +33,19 @@ def get_all_pages(base_url: str = "https://docs.zenml.io") -> List[str]: List[str]: A list of all documentation page URLs. """ logger.info("Fetching sitemap from docs.zenml.io...") - + # Fetch the sitemap sitemap_url = f"{base_url}/sitemap.xml" response = requests.get(sitemap_url) soup = BeautifulSoup(response.text, "xml") - + # Extract all URLs from the sitemap urls = [loc.text for loc in soup.find_all("loc")] - + logger.info(f"Found {len(urls)} pages in the sitemap.") return urls + def extract_parent_section(url: str) -> str: """ Extracts the parent section from a URL. diff --git a/llm-complete-guide/steps/visualize_chat.py b/llm-complete-guide/steps/visualize_chat.py index 02ca90b3..480516a3 100644 --- a/llm-complete-guide/steps/visualize_chat.py +++ b/llm-complete-guide/steps/visualize_chat.py @@ -9,8 +9,8 @@ @step(enable_cache=False) def create_chat_interface( - deployment_info: Dict[str, Any], - ) -> Annotated[HTMLString, "chat_bot"]: + deployment_info: Dict[str, Any], +) -> Annotated[HTMLString, "chat_bot"]: step_context = get_step_context() html = """
@@ -307,4 +307,4 @@ def create_chat_interface( "deployment_url": Uri(f"{model_version_url}/?tab=deployments"), }, ) - return HTMLString(html) \ No newline at end of file + return HTMLString(html) diff --git a/llm-complete-guide/steps/vllm_deployment.py b/llm-complete-guide/steps/vllm_deployment.py index 1379d168..3ef60cab 100644 --- a/llm-complete-guide/steps/vllm_deployment.py +++ b/llm-complete-guide/steps/vllm_deployment.py @@ -15,6 +15,9 @@ from typing import Optional, cast +from constants import ( + EMBEDDINGS_MODEL_ID_FINE_TUNED, +) from zenml import get_step_context, step from zenml.integrations.vllm.model_deployers.vllm_model_deployer import ( VLLMModelDeployer, @@ -25,15 +28,6 @@ ) from zenml.logger import get_logger -from constants import ( - DATASET_NAME_ARGILLA, - DATASET_NAME_DISTILABEL, - EMBEDDINGS_MODEL_ID_BASELINE, - EMBEDDINGS_MODEL_ID_FINE_TUNED, - EMBEDDINGS_MODEL_MATRYOSHKA_DIMS, - SECRET_NAME, -) - logger = get_logger(__name__) @@ -71,7 +65,7 @@ def vllm_model_deployer_step( # create a config for the new model service predictor_cfg = VLLMServiceConfig( - pipeline_name= pipeline_name, + pipeline_name=pipeline_name, step_name=step_name, model_name=step_context.model.name, model_version=step_context.model.version, @@ -82,20 +76,19 @@ def vllm_model_deployer_step( ) # create a new model deployment and replace an old one if it exists - svc = model_deployer.deploy_model( + svc = ( + model_deployer.deploy_model( replace=True, config=predictor_cfg, timeout=timeout, service_type=VLLMDeploymentService.SERVICE_TYPE, ), - new_service = cast( - VLLMDeploymentService, - svc - ) + ) + new_service = cast(VLLMDeploymentService, svc) logger.info( f"VLLM deployment service started and reachable at:\n" f" {new_service.prediction_url}\n" ) - return new_service \ No newline at end of file + return new_service diff --git a/llm-complete-guide/utils/llm_utils.py b/llm-complete-guide/utils/llm_utils.py index 31782615..34f99a51 100644 --- a/llm-complete-guide/utils/llm_utils.py +++ b/llm-complete-guide/utils/llm_utils.py @@ -230,8 +230,12 @@ def get_es_client() -> Elasticsearch: Elasticsearch: An Elasticsearch client. """ client = Client() - es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_host"] - es_api_key = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_api_key"] + es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values[ + "elasticsearch_host" + ] + es_api_key = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values[ + "elasticsearch_api_key" + ] es = Elasticsearch( es_host, @@ -265,12 +269,12 @@ def get_db_conn() -> connection: def get_topn_similar_docs_pgvector( - query_embedding: List[float], - conn: psycopg2.extensions.connection, - n: int = 5, - include_metadata: bool = False, - only_urls: bool = False - ) -> List[Tuple]: + query_embedding: List[float], + conn: psycopg2.extensions.connection, + n: int = 5, + include_metadata: bool = False, + only_urls: bool = False, +) -> List[Tuple]: """Fetches the top n most similar documents to the given query embedding from the PostgreSQL database. Args: @@ -302,13 +306,14 @@ def get_topn_similar_docs_pgvector( return cur.fetchall() + def get_topn_similar_docs_elasticsearch( - query_embedding: List[float], - es_client: Elasticsearch, - n: int = 5, - include_metadata: bool = False, - only_urls: bool = False - ) -> List[Tuple]: + query_embedding: List[float], + es_client: Elasticsearch, + n: int = 5, + include_metadata: bool = False, + only_urls: bool = False, +) -> List[Tuple]: """Fetches the top n most similar documents to the given query embedding from the Elasticsearch index. Args: @@ -319,7 +324,7 @@ def get_topn_similar_docs_elasticsearch( only_urls (bool, optional): Whether to only return URLs in the results. Defaults to False. """ index_name = "zenml_docs" - + if only_urls: source = ["url"] elif include_metadata: @@ -334,36 +339,42 @@ def get_topn_similar_docs_elasticsearch( "query": {"match_all": {}}, "script": { "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0", - "params": {"query_vector": query_embedding} - } + "params": {"query_vector": query_embedding}, + }, } }, - "size": n + "size": n, } # response = es_client.search(index=index_name, body=query) - response = es_client.search(index=index_name, knn={ - "field": "embedding", - "query_vector": query_embedding, - "num_candidates": 50, - "k": n - }) + response = es_client.search( + index=index_name, + knn={ + "field": "embedding", + "query_vector": query_embedding, + "num_candidates": 50, + "k": n, + }, + ) results = [] - for hit in response['hits']['hits']: + for hit in response["hits"]["hits"]: if only_urls: - results.append((hit['_source']['url'],)) + results.append((hit["_source"]["url"],)) elif include_metadata: - results.append(( - hit['_source']['content'], - hit['_source']['url'], - hit['_source']['parent_section'] - )) + results.append( + ( + hit["_source"]["content"], + hit["_source"]["url"], + hit["_source"]["parent_section"], + ) + ) else: - results.append((hit['_source']['content'],)) + results.append((hit["_source"]["content"],)) return results + def get_topn_similar_docs( query_embedding: List[float], conn: psycopg2.extensions.connection = None, @@ -387,12 +398,17 @@ def get_topn_similar_docs( """ if conn is None and es_client is None: raise ValueError("Either conn or es_client must be provided") - + if conn is not None: - return get_topn_similar_docs_pgvector(query_embedding, conn, n, include_metadata, only_urls) - + return get_topn_similar_docs_pgvector( + query_embedding, conn, n, include_metadata, only_urls + ) + if es_client is not None: - return get_topn_similar_docs_elasticsearch(query_embedding, es_client, n, include_metadata, only_urls) + return get_topn_similar_docs_elasticsearch( + query_embedding, es_client, n, include_metadata, only_urls + ) + def get_completion_from_messages( messages, model=OPENAI_MODEL, temperature=0.4, max_tokens=1000 @@ -431,6 +447,7 @@ def get_embeddings(text): model = SentenceTransformer(EMBEDDINGS_MODEL) return model.encode(text) + def find_vectorstore_name() -> str: """Finds the name of the vector store used for the given embeddings model. @@ -438,8 +455,11 @@ def find_vectorstore_name() -> str: str: The name of the vector store. """ from zenml.client import Client + client = Client() - model = client.get_model_version(ZENML_CHATBOT_MODEL, model_version_name_or_number_or_id="v0.68.1-dev") + model = client.get_model_version( + ZENML_CHATBOT_MODEL, model_version_name_or_number_or_id="v0.68.1-dev" + ) return model.run_metadata["vector_store"]["name"] diff --git a/llm-complete-guide/utils/openai_utils.py b/llm-complete-guide/utils/openai_utils.py index 9f5e8ac8..15b84cc5 100644 --- a/llm-complete-guide/utils/openai_utils.py +++ b/llm-complete-guide/utils/openai_utils.py @@ -5,5 +5,4 @@ def get_openai_api_key() -> str: api_key = Client().get_secret(SECRET_NAME).secret_values["openai_api_key"] - return api_key