From 682f598f54b90187345e7010b9842badc2bebf6c Mon Sep 17 00:00:00 2001 From: Jaychaware Date: Mon, 20 Jan 2025 06:19:28 +0000 Subject: [PATCH 01/11] added validation for mlmd_push api --- server/app/main.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/server/app/main.py b/server/app/main.py index 9fe72a03..0893d67a 100644 --- a/server/app/main.py +++ b/server/app/main.py @@ -91,10 +91,16 @@ async def read_root(request: Request): async def mlmd_push(info: Request): print("mlmd push started") print("......................") + # Check if the body is empty + if not await info.body(): + return {"error": "Request body is missing or empty"} req_info = await info.json() + # Check if "pipeline_name" and "json_payload" are present and not empty + if not req_info.get("pipeline_name"): + return {"error": "Pipeline name is missing"} + if not req_info.get("json_payload"): + return {"error": "JSON payload is missing"} pipeline_name = req_info["pipeline_name"] - if not pipeline_name: - return {"error": "Pipeline name is required"} if pipeline_name not in pipeline_locks: # create lock object for pipeline if it doesn't exists in lock pipeline_locks[pipeline_name] = asyncio.Lock() pipeline_lock = pipeline_locks[pipeline_name] From 132f133761b72f63b1090af3f34f335c301e02d8 Mon Sep 17 00:00:00 2001 From: Jaychaware Date: Wed, 22 Jan 2025 05:51:32 +0000 Subject: [PATCH 02/11] shifted unsued rest apis to bottom of file --- server/app/main.py | 82 +++++++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 38 deletions(-) diff --git a/server/app/main.py b/server/app/main.py index 0893d67a..fa31412b 100644 --- a/server/app/main.py +++ b/server/app/main.py @@ -173,27 +173,6 @@ async def executions( else: return -@app.get("/artifact-lineage/force-directed-graph/{pipeline_name}") -async def artifact_lineage(request: Request, pipeline_name: str): - ''' - This api returns dictionary of nodes and links for given pipeline. - response = { - nodes: [{id:"",name:""}], - links: [{source:1,target:4},{}], - } - - ''' - # checks if mlmd file exists on server - if os.path.exists(server_store_path): - query = cmfquery.CmfQuery(server_store_path) - if (pipeline_name in query.get_pipeline_names()): - response=await async_api(get_lineage_data, server_store_path,pipeline_name,"Artifacts",dict_of_art_ids,dict_of_exe_ids) - return response - else: - return f"Pipeline name {pipeline_name} doesn't exist." - - else: - return None @app.get("/list-of-executions/{pipeline_name}") async def list_of_executions(request: Request, pipeline_name: str): @@ -213,23 +192,6 @@ async def list_of_executions(request: Request, pipeline_name: str): else: return None -@app.get("/execution-lineage/force-directed-graph/{pipeline_name}/{uuid}") -async def execution_lineage(request: Request, pipeline_name: str, uuid: str): - ''' - returns dictionary of nodes and links for given execution_type. 
- response = { - nodes: [{id:"",name:"",execution_uuid:""}], - links: [{source:1,target:4},{}], - } - ''' - # checks if mlmd file exists on server - if os.path.exists(server_store_path): - query = cmfquery.CmfQuery(server_store_path) - if (pipeline_name in query.get_pipeline_names()): - response = await async_api(query_execution_lineage_d3force, server_store_path, pipeline_name, dict_of_exe_ids, uuid) - else: - response = None - return response @app.get("/execution-lineage/tangled-tree/{uuid}/{pipeline_name}") async def execution_lineage(request: Request,uuid, pipeline_name: str): @@ -418,3 +380,47 @@ async def update_global_exe_dict(pipeline_name): # type(dict_of_exe_ids[pipeline_name]) = dict_of_exe_ids[pipeline_name] = output_dict[pipeline_name] return + + +# This API is no longer in use within the project but is retained for reference or potential future use. +@app.get("/execution-lineage/force-directed-graph/{pipeline_name}/{uuid}") +async def execution_lineage(request: Request, pipeline_name: str, uuid: str): + ''' + returns dictionary of nodes and links for given execution_type. + response = { + nodes: [{id:"",name:"",execution_uuid:""}], + links: [{source:1,target:4},{}], + } + ''' + # checks if mlmd file exists on server + if os.path.exists(server_store_path): + query = cmfquery.CmfQuery(server_store_path) + if (pipeline_name in query.get_pipeline_names()): + response = await async_api(query_execution_lineage_d3force, server_store_path, pipeline_name, dict_of_exe_ids, uuid) + else: + response = None + return response + + +# This API is no longer in use within the project but is retained for reference or potential future use. +@app.get("/artifact-lineage/force-directed-graph/{pipeline_name}") +async def artifact_lineage(request: Request, pipeline_name: str): + ''' + This api returns dictionary of nodes and links for given pipeline. + response = { + nodes: [{id:"",name:""}], + links: [{source:1,target:4},{}], + } + + ''' + # checks if mlmd file exists on server + if os.path.exists(server_store_path): + query = cmfquery.CmfQuery(server_store_path) + if (pipeline_name in query.get_pipeline_names()): + response=await async_api(get_lineage_data, server_store_path,pipeline_name,"Artifacts",dict_of_art_ids,dict_of_exe_ids) + return response + else: + return f"Pipeline name {pipeline_name} doesn't exist." 
+ + else: + return None \ No newline at end of file From 618bec3d985a21cfb414694b99f030b4c5dd0acf Mon Sep 17 00:00:00 2001 From: Jaychaware Date: Wed, 22 Jan 2025 12:04:01 +0000 Subject: [PATCH 03/11] updated libraries in requirement.txt, setup.py and pyproject.toml --- pyproject.toml | 7 +++++-- requirements.txt | 10 ++++++---- server/requirements.txt | 9 ++------- setup.py | 3 ++- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b923644f..be875908 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,13 +8,16 @@ dependencies = [ "retrying", "pyarrow", "neo4j", - "scikit-learn", "tabulate", "click", "minio", "paramiko", + "scikit_learn" + "boto3", "scitokens", - "cryptography", + "cryptography", + "ray", + "google", ] authors = [ { name="Hewlett Packard Enterprise"}, diff --git a/requirements.txt b/requirements.txt index 356aefb6..6a12beea 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ -ml-metadata==1.11.0 -dvc[s3,ssh]==2.27.0 +ml-metadata==1.15.0 +dvc[s3,ssh]==3.51.1 pandas retrying pyarrow @@ -10,5 +10,7 @@ minio paramiko scikit_learn boto3 -textwrap -typing +scitokens +cryptography +ray +google \ No newline at end of file diff --git a/server/requirements.txt b/server/requirements.txt index 345556fd..0443d3e5 100644 --- a/server/requirements.txt +++ b/server/requirements.txt @@ -1,11 +1,6 @@ fastapi -fastapi-pagination uvicorn pydantic -minio -dvc[s3,ssh]==2.27.0 -Jinja2 -matplotlib pandas -pygraphviz==1.10 -python-multipart +dvc[s3,ssh]==3.51.1 +python-multipart \ No newline at end of file diff --git a/setup.py b/setup.py index cc747bb9..e26692de 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,8 @@ packages=find_packages(), install_requires=["ml-metadata==1.15.0", "dvc[ssh,s3]==3.51.1", "pandas", "retrying", "pyarrow", "neo4j", \ - "scikit-learn", "tabulate", "click", "minio", "paramiko", "scitokens", "cryptography"], # add any additional packages that + "tabulate", "click", "minio", "paramiko", "scikit_learn", "boto3", "scitokens", "cryptography", \ + "ray", "google"], # add any additional packages that # needs to be installed along with your package. 
Eg: 'caer' keywords=['python', 'first package'], From 5f9a5d3544459e98dc44b93a47b96123c2d0085a Mon Sep 17 00:00:00 2001 From: Jaychaware Date: Thu, 23 Jan 2025 13:14:08 +0000 Subject: [PATCH 04/11] added pydantic validation to mlmd_push rest api --- server/app/main.py | 18 +++++------------- server/app/schemas/dataframe.py | 17 +++++++++++++++-- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/server/app/main.py b/server/app/main.py index fa31412b..6431b6d3 100644 --- a/server/app/main.py +++ b/server/app/main.py @@ -32,6 +32,7 @@ from pathlib import Path import os import json +from server.app.schemas.dataframe import MLMDPushRequest server_store_path = "/cmf-server/data/mlmd" @@ -88,26 +89,17 @@ async def read_root(request: Request): # api to post mlmd file to cmf-server @app.post("/mlmd_push") -async def mlmd_push(info: Request): +async def mlmd_push(info: MLMDPushRequest): print("mlmd push started") print("......................") - # Check if the body is empty - if not await info.body(): - return {"error": "Request body is missing or empty"} - req_info = await info.json() - # Check if "pipeline_name" and "json_payload" are present and not empty - if not req_info.get("pipeline_name"): - return {"error": "Pipeline name is missing"} - if not req_info.get("json_payload"): - return {"error": "JSON payload is missing"} - pipeline_name = req_info["pipeline_name"] + pipeline_name = info.pipeline_name if pipeline_name not in pipeline_locks: # create lock object for pipeline if it doesn't exists in lock pipeline_locks[pipeline_name] = asyncio.Lock() pipeline_lock = pipeline_locks[pipeline_name] lock_counts[pipeline_name] += 1 # increment lock count by 1 if pipeline going to enter inside lock section async with pipeline_lock: try: - status = await async_api(create_unique_executions, server_store_path, req_info) + status = await async_api(create_unique_executions, server_store_path, info.model_dump()) if status == "version_update": # Raise an HTTPException with status code 422 raise HTTPException(status_code=422, detail="version_update") @@ -120,7 +112,7 @@ async def mlmd_push(info: Request): if lock_counts[pipeline_name] == 0: #if lock_counts of pipeline is zero means lock is release from it del pipeline_locks[pipeline_name] # Remove the lock if it's no longer needed del lock_counts[pipeline_name] - return {"status": status, "data": req_info} + return {"status": status, "data": info.model_dump()} # api to get mlmd file from cmf-server @app.get("/mlmd_pull/{pipeline_name}", response_class=HTMLResponse) diff --git a/server/app/schemas/dataframe.py b/server/app/schemas/dataframe.py index e58ea31a..9b7a78a0 100644 --- a/server/app/schemas/dataframe.py +++ b/server/app/schemas/dataframe.py @@ -1,9 +1,22 @@ -from pydantic import BaseModel, HttpUrl +from pydantic import BaseModel, HttpUrl, Field, model_validator import pandas as pd -from typing import Sequence +from typing import Sequence, Optional class ExecutionDataFrame(BaseModel): context_id: str context_type: str execution: str + +class MLMDPushRequest(BaseModel): + id: Optional[int] = Field(None, description="Optional execution id for the request") + pipeline_name: str = Field(..., min_length=1, description="Name of the pipeline (cannot be empty)") + json_payload: str = Field(..., description="JSON payload for the pipeline (cannot be empty)") + + @model_validator(mode="after") + def validate_fields(cls, values): + if not values.pipeline_name.strip(): + raise ValueError("Pipeline name must not be empty or whitespace") + if not 
values.json_payload: + raise ValueError("JSON payload must not be empty") + return values \ No newline at end of file From d4de015013f9b246539410a2c36046d3277ab17a Mon Sep 17 00:00:00 2001 From: Jaychaware Date: Thu, 30 Jan 2025 13:15:16 +0000 Subject: [PATCH 05/11] added validations to rest api using pydantic library --- pyproject.toml | 10 +- server/app/main.py | 207 ++++++++++--------- server/app/query_execution_lineage_d3tree.py | 4 +- server/app/schemas/dataframe.py | 24 ++- 4 files changed, 138 insertions(+), 107 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index be875908..8a1fab4a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,7 @@ dependencies = [ "click", "minio", "paramiko", - "scikit_learn" + "scikit_learn", "boto3", "scitokens", "cryptography", @@ -20,16 +20,16 @@ dependencies = [ "google", ] authors = [ - { name="Hewlett Packard Enterprise"}, + { name = "Hewlett Packard Enterprise"} ] description = "Track metadata for AI pipeline" readme = "README.md" requires-python = ">=3.9,<3.11" classifiers = [ "Programming Language :: Python :: 3", - "Operating System :: POSIX :: Linux", + "Operating System :: POSIX :: Linux" ] [project.urls] -"Homepage" = "https://github.com/HewlettPackard/cmf" -"Bug Tracker" = "https://github.com/HewlettPackard/cmf/issues" +Homepage = "https://github.com/HewlettPackard/cmf" +BugTracker = "https://github.com/HewlettPackard/cmf/issues" diff --git a/server/app/main.py b/server/app/main.py index 6431b6d3..4071d4e7 100644 --- a/server/app/main.py +++ b/server/app/main.py @@ -1,5 +1,5 @@ # cmf-server api's -from fastapi import FastAPI, Request, HTTPException, Query, UploadFile, File +from fastapi import FastAPI, Request, HTTPException, Query, UploadFile, File, Depends from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles @@ -32,7 +32,7 @@ from pathlib import Path import os import json -from server.app.schemas.dataframe import MLMDPushRequest +from server.app.schemas.dataframe import MLMDPushRequest, MLMDRequest, ExecutionsQueryParams, ArtifactsQueryParams server_store_path = "/cmf-server/data/mlmd" @@ -116,16 +116,13 @@ async def mlmd_push(info: MLMDPushRequest): # api to get mlmd file from cmf-server @app.get("/mlmd_pull/{pipeline_name}", response_class=HTMLResponse) -async def mlmd_pull(info: Request, pipeline_name: str): +async def mlmd_pull(pipeline_name: str, request: MLMDRequest): # checks if mlmd file exists on server - req_info = await info.json() - if os.path.exists(server_store_path): - #json_payload values can be json data, NULL or no_exec_id. - json_payload= await async_api(get_mlmd_from_server, server_store_path, pipeline_name, req_info['exec_id']) - else: - raise HTTPException(status_code=413, detail=f"mlmd file not available on cmf-server.") - if json_payload == None: - raise HTTPException(status_code=406, detail=f"Pipeline {pipeline_name} not found.") + await check_mlmd_file_exists() + # checks if pipeline exists + await check_pipeline_exists(pipeline_name) + #json_payload values can be json data, NULL or no_exec_id. 
+ json_payload = await async_api(get_mlmd_from_server, server_store_path, pipeline_name, request.exec_id) return json_payload # api to display executions available in mlmd @@ -133,37 +130,44 @@ async def mlmd_pull(info: Request, pipeline_name: str): async def executions( request: Request, pipeline_name: str, - page: int = Query(1, description="Page number", gt=0), - per_page: int = Query(5, description="Items per page", le=100), - sort_field: str = Query("Context_Type", description="Column to sort by"), - sort_order: str = Query("asc", description="Sort order (asc or desc)"), - filter_by: str = Query(None, description="Filter by column"), - filter_value: str = Query(None, description="Filter value"), + query_params: ExecutionsQueryParams = Depends() ): + page = query_params.page + per_page = query_params.per_page + sort_field = query_params.sort_field + sort_order = query_params.sort_order + filter_by = query_params.filter_by + filter_value = query_params.filter_value # checks if mlmd file exists on server - if os.path.exists(server_store_path) and pipeline_name in dict_of_exe_ids: - exe_ids_initial = dict_of_exe_ids[pipeline_name] - # Apply filtering if provided - if filter_by and filter_value: - exe_ids_initial = exe_ids_initial[exe_ids_initial[filter_by].str.contains(filter_value, case=False)] - # Apply sorting if provided - exe_ids_sorted = exe_ids_initial.sort_values(by=sort_field, ascending=(sort_order == "asc")) - exe_ids = exe_ids_sorted['id'].tolist() - total_items = len(exe_ids) - start_idx = (page - 1) * per_page - end_idx = start_idx + per_page - if total_items < end_idx: - end_idx = total_items - exe_ids_list = exe_ids[start_idx:end_idx] - executions_df = await async_api(get_executions, server_store_path, pipeline_name, exe_ids_list) - temp = executions_df.to_json(orient="records") - executions_parsed = json.loads(temp) - return { - "total_items": total_items, - "items": executions_parsed - } + await check_mlmd_file_exists() + if pipeline_name in dict_of_exe_ids: + try: + exe_ids_initial = dict_of_exe_ids[pipeline_name] + # Apply filtering if provided + if filter_by and filter_value: + exe_ids_initial = exe_ids_initial[exe_ids_initial[filter_by].str.contains(filter_value, case=False)] + # Apply sorting if provided + exe_ids_sorted = exe_ids_initial.sort_values(by=sort_field, ascending=(sort_order == "asc")) + exe_ids = exe_ids_sorted['id'].tolist() + total_items = len(exe_ids) + start_idx = (page - 1) * per_page + end_idx = start_idx + per_page + if total_items < end_idx: + end_idx = total_items + exe_ids_list = exe_ids[start_idx:end_idx] + executions_df = await async_api(get_executions, server_store_path, pipeline_name, exe_ids_list) + temp = executions_df.to_json(orient="records") + executions_parsed = json.loads(temp) + return { + "total_items": total_items, + "items": executions_parsed + } + except Exception as e: + print(f"An error occurred: {str(e)}") + return {"error": f"Failed to get executions available in mlmd: {e}"} else: - return + print(f"Pipeline {pipeline_name} not found.") + raise HTTPException(status_code=404, detail=f"Pipeline {pipeline_name} not found.") @app.get("/list-of-executions/{pipeline_name}") @@ -173,16 +177,11 @@ async def list_of_executions(request: Request, pipeline_name: str): ''' # checks if mlmd file exists on server - if os.path.exists(server_store_path): - query = cmfquery.CmfQuery(server_store_path) - if (pipeline_name in query.get_pipeline_names()): - response = await async_api(get_lineage_data, 
server_store_path,pipeline_name,"Execution",dict_of_art_ids,dict_of_exe_ids) - return response - else: - return f"Pipeline name {pipeline_name} doesn't exist." - - else: - return None + await check_mlmd_file_exists() + # checks if pipeline exists + await check_pipeline_exists(pipeline_name) + response = await async_api(get_lineage_data, server_store_path,pipeline_name,"Execution",dict_of_art_ids,dict_of_exe_ids) + return response @app.get("/execution-lineage/tangled-tree/{uuid}/{pipeline_name}") @@ -195,29 +194,33 @@ async def execution_lineage(request: Request,uuid, pipeline_name: str): } ''' # checks if mlmd file exists on server - if os.path.exists(server_store_path): - query = cmfquery.CmfQuery(server_store_path) - if (pipeline_name in query.get_pipeline_names()): - response = await async_api(query_execution_lineage_d3tree, server_store_path, pipeline_name, dict_of_exe_ids,uuid) + await check_mlmd_file_exists() + # checks if pipeline exists + await check_pipeline_exists(pipeline_name) + response = await async_api(query_execution_lineage_d3tree, server_store_path, pipeline_name, dict_of_exe_ids,uuid) return response + # api to display artifacts available in mlmd @app.get("/artifacts/{pipeline_name}/{type}") async def artifacts( - request: Request, pipeline_name: str, type: str, # type = artifact type - page: int = Query(1, description="Page number", gt=0), - per_page: int = Query(5, description="Items per page", le=100), - sort_field: str = Query("name", description="Column to sort by"), - sort_order: str = Query("asc", description="Sort order (asc or desc)"), - filter_by: str = Query(None, description="Filter by column"), - filter_value: str = Query(None, description="Filter value"), + query_params: ArtifactsQueryParams = Depends() ): + page = query_params.page + per_page = query_params.per_page + sort_field = query_params.sort_field + sort_order = query_params.sort_order + filter_by = query_params.filter_by + filter_value = query_params.filter_value art_ids_dict = {} art_type = type # checks if mlmd file exists on server - if os.path.exists(server_store_path): + await check_mlmd_file_exists() + # checks if pipeline exists + await check_pipeline_exists(pipeline_name) + try: art_ids_dict = dict_of_art_ids[pipeline_name] if not art_ids_dict: return { #return {items: None} so that GUI loads @@ -256,12 +259,9 @@ async def artifacts( "total_items": total_items, "items": data_paginated } - else: - print(f"{server_store_path} file doesn't exist.") - return { - "total_items": 0, - "items": None - } + except Exception as e: + print(f"An error occurred: {e}") + return {"error": f"Failed to get artifacts available in mlmd: {e}"} @app.get("/artifact-lineage/tangled-tree/{pipeline_name}") async def artifact_lineage(request: Request, pipeline_name: str) -> List[List[Dict[str, Any]]]: @@ -274,23 +274,20 @@ async def artifact_lineage(request: Request, pipeline_name: str) -> List[List[Di ] ''' # checks if mlmd file exists on server - response = None - if os.path.exists(server_store_path): - query = cmfquery.CmfQuery(server_store_path) - if (pipeline_name in query.get_pipeline_names()): - response = await async_api(query_artifact_lineage_d3tree, server_store_path, pipeline_name, dict_of_art_ids) + await check_mlmd_file_exists() + # checks if pipeline exists + await check_pipeline_exists(pipeline_name) + response = await async_api(query_artifact_lineage_d3tree, server_store_path, pipeline_name, dict_of_art_ids) return response + #This api's returns list of artifact types. 
@app.get("/artifact_types") -async def artifact_types(request: Request): +async def artifact_types(): # checks if mlmd file exists on server - if os.path.exists(server_store_path): - artifact_types = await async_api(get_artifact_types, server_store_path) - return artifact_types - else: - artifact_types = "" - return + await check_mlmd_file_exists() + artifact_types = await async_api(get_artifact_types, server_store_path) + return artifact_types @app.get("/pipelines") @@ -330,31 +327,30 @@ async def model_card(request:Request, modelId: int, response_model=List[Dict[str model_input_art_df = pd.DataFrame() model_output_art_df = pd.DataFrame() # checks if mlmd file exists on server - if os.path.exists(server_store_path): - model_data_df, model_exe_df, model_input_art_df, model_output_art_df = await get_model_data(server_store_path, modelId) - if not model_data_df.empty: - result_1 = model_data_df.to_json(orient="records") - json_payload_1 = json.loads(result_1) - if not model_exe_df.empty: - result_2 = model_exe_df.to_json(orient="records") - json_payload_2 = json.loads(result_2) - if not model_input_art_df.empty: - result_3 = model_input_art_df.to_json(orient="records") - json_payload_3 = json.loads(result_3) - if not model_output_art_df.empty: - result_4 = model_output_art_df.to_json(orient="records") - json_payload_4 = json.loads(result_4) + await check_mlmd_file_exists() + model_data_df, model_exe_df, model_input_art_df, model_output_art_df = await get_model_data(server_store_path, modelId) + if not model_data_df.empty: + result_1 = model_data_df.to_json(orient="records") + json_payload_1 = json.loads(result_1) + if not model_exe_df.empty: + result_2 = model_exe_df.to_json(orient="records") + json_payload_2 = json.loads(result_2) + if not model_input_art_df.empty: + result_3 = model_input_art_df.to_json(orient="records") + json_payload_3 = json.loads(result_3) + if not model_output_art_df.empty: + result_4 = model_output_art_df.to_json(orient="records") + json_payload_4 = json.loads(result_4) return [json_payload_1, json_payload_2, json_payload_3, json_payload_4] @app.get("/artifact-execution-lineage/tangled-tree/{pipeline_name}") async def artifact_execution_lineage(request: Request, pipeline_name: str): - # checks if mlmd file exists on server - response = None - if os.path.exists(server_store_path): - query = cmfquery.CmfQuery(server_store_path) - if (pipeline_name in query.get_pipeline_names()): - response = await query_visualization_artifact_execution(server_store_path, pipeline_name, dict_of_art_ids, dict_of_exe_ids) + # checks if mlmd file exists on server + await check_mlmd_file_exists() + # checks if pipeline exists + await check_pipeline_exists(pipeline_name) + response = await query_visualization_artifact_execution(server_store_path, pipeline_name, dict_of_art_ids, dict_of_exe_ids) return response @@ -373,6 +369,19 @@ async def update_global_exe_dict(pipeline_name): dict_of_exe_ids[pipeline_name] = output_dict[pipeline_name] return +# Function to checks if mlmd file exists on server +async def check_mlmd_file_exists(): + if not os.path.exists(server_store_path): + print(f"{server_store_path} file doesn't exist.") + raise HTTPException(status_code=404, detail=f"{server_store_path} file doesn't exist.") + +# Function to check if the pipeline exists +async def check_pipeline_exists(pipeline_name): + query = cmfquery.CmfQuery(server_store_path) + if pipeline_name not in query.get_pipeline_names(): + print(f"Pipeline {pipeline_name} not found.") + raise HTTPException(status_code=404, 
detail=f"Pipeline {pipeline_name} not found.") + # This API is no longer in use within the project but is retained for reference or potential future use. @app.get("/execution-lineage/force-directed-graph/{pipeline_name}/{uuid}") diff --git a/server/app/query_execution_lineage_d3tree.py b/server/app/query_execution_lineage_d3tree.py index 94168ef9..5ce7aec5 100644 --- a/server/app/query_execution_lineage_d3tree.py +++ b/server/app/query_execution_lineage_d3tree.py @@ -37,12 +37,12 @@ def query_execution_lineage_d3tree(mlmd_path: str, pipeline_name: str, dict_of_e parents_set = set() queue = UniqueQueue() df = pd.DataFrame() - - parents = query.get_one_hop_parent_executions_ids(execution_id, pipeline_id) #list of parent execution ids dict_parents = {} if parents == None: parents = [] + if not execution_id: + return {"error": f"No execution lineage is available for pipeline {pipeline_name} with uuid {uuid}"} dict_parents[execution_id[0]] = list(set(parents)) # [2] = [1,2,3,4] list of parent id parents_set.add(execution_id[0]) #created so that we can directly find execuions using execution ids for i in set(parents): diff --git a/server/app/schemas/dataframe.py b/server/app/schemas/dataframe.py index 9b7a78a0..01d20267 100644 --- a/server/app/schemas/dataframe.py +++ b/server/app/schemas/dataframe.py @@ -19,4 +19,26 @@ def validate_fields(cls, values): raise ValueError("Pipeline name must not be empty or whitespace") if not values.json_payload: raise ValueError("JSON payload must not be empty") - return values \ No newline at end of file + return values + +class MLMDRequest(BaseModel): + exec_id: int | None = Field( + ..., + description="Execution ID must be an integer. It is a required field but can be null." + ) + +class ExecutionsQueryParams(BaseModel): + page: int = Field(1, gt=0, description="Page number") # Page must be > 0 + per_page: int = Field(5, le=100, description="Items per page, max 100") # Limit per page to max 100 + sort_field: str = Field("Context_Type", description="Column to sort by") + sort_order: str = Field("asc", description="Sort order (asc or desc)") + filter_by: Optional[str] = Field(None, description="Filter by column") + filter_value: Optional[str] = Field(None, description="Filter value") + +class ArtifactsQueryParams(BaseModel): + page: int = Field(1, gt=0, description="Page number") # Page must be > 0 + per_page: int = Field(5, le=100, description="Items per page, max 100") # Limit per page to max 100 + sort_field: str = Field("name", description="Column to sort by") + sort_order: str = Field("asc", description="Sort order (asc or desc)") + filter_by: Optional[str] = Field(None, description="Filter by column") + filter_value: Optional[str] = Field(None, description="Filter value") \ No newline at end of file From 9c12ae597e952ca69c1b0cdde18c580132a0de3d Mon Sep 17 00:00:00 2001 From: Jaychaware Date: Fri, 31 Jan 2025 13:14:48 +0000 Subject: [PATCH 06/11] added missing validations and comments for explanation --- pyproject.toml | 2 +- server/app/get_data.py | 20 ++++++++++---- server/app/main.py | 39 ++++++++++++++++++-------- server/app/schemas/dataframe.py | 49 +++++++++++++++++++-------------- 4 files changed, 71 insertions(+), 39 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8a1fab4a..b6cb674f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ dependencies = [ "scitokens", "cryptography", "ray", - "google", + "google" ] authors = [ { name = "Hewlett Packard Enterprise"} diff --git a/server/app/get_data.py 
b/server/app/get_data.py index 20e7d649..7433b8be 100644 --- a/server/app/get_data.py +++ b/server/app/get_data.py @@ -218,16 +218,24 @@ def create_unique_executions(server_store_path, req_info) -> str: Args: server_store_path = mlmd file path on server Returns: - Status of parse_json_to_mlmd - "exists": if execution already exists on cmf-server - "success": execution pushed successfully on cmf-server + str: A status message indicating the result of the operation: + - "exists": Execution already exists on the CMF server. + - "success": Execution successfully pushed to the CMF server. + - "invalid_json_payload": If the JSON payload is invalid or incorrectly formatted. + - "pipeline_not_exist": If the provided pipeline name does not match the one in the payload. """ mlmd_data = json.loads(req_info["json_payload"]) - pipelines = mlmd_data["Pipeline"] + # Ensure the pipeline name in req_info matches the one in the JSON payload to maintain data integrity + pipelines = mlmd_data.get("Pipeline", []) # Extract "Pipeline" list, default to empty list if missing + if not pipelines: + return "invalid_json_payload" # No pipelines found in payload pipeline = pipelines[0] - pipeline_name = pipeline["name"] + pipeline_name = pipeline.get("name") # Extract pipeline name, use .get() to avoid KeyError if not pipeline_name: - return {"error": "Pipeline name is required"} + return "invalid_json_payload" # Missing pipeline name + req_pipeline_name = req_info["pipeline_name"] + if req_pipeline_name != pipeline_name: + return "pipeline_not_exist" # Mismatch between provided pipeline name and payload executions_server = [] list_executions_exists = [] if os.path.exists(server_store_path): diff --git a/server/app/main.py b/server/app/main.py index 4071d4e7..daf966b7 100644 --- a/server/app/main.py +++ b/server/app/main.py @@ -32,7 +32,7 @@ from pathlib import Path import os import json -from server.app.schemas.dataframe import MLMDPushRequest, MLMDRequest, ExecutionsQueryParams, ArtifactsQueryParams +from server.app.schemas.dataframe import MLMDPushRequest, MLMDPullRequest, ExecutionsRequest, ArtifactsRequest server_store_path = "/cmf-server/data/mlmd" @@ -87,19 +87,27 @@ async def lifespan(app: FastAPI): async def read_root(request: Request): return {"cmf-server"} + # api to post mlmd file to cmf-server @app.post("/mlmd_push") async def mlmd_push(info: MLMDPushRequest): print("mlmd push started") print("......................") - pipeline_name = info.pipeline_name + req_info = info.model_dump() # Serializing the input data into a dictionary using model_dump() + pipeline_name = req_info["pipeline_name"] if pipeline_name not in pipeline_locks: # create lock object for pipeline if it doesn't exists in lock pipeline_locks[pipeline_name] = asyncio.Lock() pipeline_lock = pipeline_locks[pipeline_name] lock_counts[pipeline_name] += 1 # increment lock count by 1 if pipeline going to enter inside lock section async with pipeline_lock: try: - status = await async_api(create_unique_executions, server_store_path, info.model_dump()) + status = await async_api(create_unique_executions, server_store_path, req_info) + if status == "invalid_json_payload": + # Invalid JSON payload, return 400 Bad Request + raise HTTPException(status_code=400, detail="Invalid JSON payload. 
The pipeline name is missing.") + if status == "pipeline_not_exist": + # Pipeline name does not exist in the server, return 404 Not Found + raise HTTPException(status_code=404, detail=f"Pipeline name '{pipeline_name}' does not exist.") if status == "version_update": # Raise an HTTPException with status code 422 raise HTTPException(status_code=422, detail="version_update") @@ -112,11 +120,12 @@ async def mlmd_push(info: MLMDPushRequest): if lock_counts[pipeline_name] == 0: #if lock_counts of pipeline is zero means lock is release from it del pipeline_locks[pipeline_name] # Remove the lock if it's no longer needed del lock_counts[pipeline_name] - return {"status": status, "data": info.model_dump()} + return {"status": status} + # api to get mlmd file from cmf-server @app.get("/mlmd_pull/{pipeline_name}", response_class=HTMLResponse) -async def mlmd_pull(pipeline_name: str, request: MLMDRequest): +async def mlmd_pull(pipeline_name: str, request: MLMDPullRequest): # checks if mlmd file exists on server await check_mlmd_file_exists() # checks if pipeline exists @@ -125,13 +134,15 @@ async def mlmd_pull(pipeline_name: str, request: MLMDRequest): json_payload = await async_api(get_mlmd_from_server, server_store_path, pipeline_name, request.exec_id) return json_payload + # api to display executions available in mlmd @app.get("/executions/{pipeline_name}") async def executions( request: Request, pipeline_name: str, - query_params: ExecutionsQueryParams = Depends() + query_params: ExecutionsRequest = Depends() ): + # Extract the query parameters from the query_params object page = query_params.page per_page = query_params.per_page sort_field = query_params.sort_field @@ -185,7 +196,7 @@ async def list_of_executions(request: Request, pipeline_name: str): @app.get("/execution-lineage/tangled-tree/{uuid}/{pipeline_name}") -async def execution_lineage(request: Request,uuid, pipeline_name: str): +async def execution_lineage(request: Request, uuid: str, pipeline_name: str): ''' returns dictionary of nodes and links for given execution_type. 
response = { @@ -204,10 +215,12 @@ async def execution_lineage(request: Request,uuid, pipeline_name: str): # api to display artifacts available in mlmd @app.get("/artifacts/{pipeline_name}/{type}") async def artifacts( + request: Request, pipeline_name: str, type: str, # type = artifact type - query_params: ArtifactsQueryParams = Depends() + query_params: ArtifactsRequest = Depends() ): + # Extract the query parameters from the query_params object page = query_params.page per_page = query_params.per_page sort_field = query_params.sort_field @@ -263,6 +276,7 @@ async def artifacts( print(f"An error occurred: {e}") return {"error": f"Failed to get artifacts available in mlmd: {e}"} + @app.get("/artifact-lineage/tangled-tree/{pipeline_name}") async def artifact_lineage(request: Request, pipeline_name: str) -> List[List[Dict[str, Any]]]: ''' @@ -369,12 +383,14 @@ async def update_global_exe_dict(pipeline_name): dict_of_exe_ids[pipeline_name] = output_dict[pipeline_name] return + # Function to checks if mlmd file exists on server async def check_mlmd_file_exists(): if not os.path.exists(server_store_path): print(f"{server_store_path} file doesn't exist.") raise HTTPException(status_code=404, detail=f"{server_store_path} file doesn't exist.") + # Function to check if the pipeline exists async def check_pipeline_exists(pipeline_name): query = cmfquery.CmfQuery(server_store_path) @@ -383,7 +399,8 @@ async def check_pipeline_exists(pipeline_name): raise HTTPException(status_code=404, detail=f"Pipeline {pipeline_name} not found.") -# This API is no longer in use within the project but is retained for reference or potential future use. +""" +This API is no longer in use within the project but is retained for reference or potential future use. @app.get("/execution-lineage/force-directed-graph/{pipeline_name}/{uuid}") async def execution_lineage(request: Request, pipeline_name: str, uuid: str): ''' @@ -403,7 +420,6 @@ async def execution_lineage(request: Request, pipeline_name: str, uuid: str): return response -# This API is no longer in use within the project but is retained for reference or potential future use. @app.get("/artifact-lineage/force-directed-graph/{pipeline_name}") async def artifact_lineage(request: Request, pipeline_name: str): ''' @@ -424,4 +440,5 @@ async def artifact_lineage(request: Request, pipeline_name: str): return f"Pipeline name {pipeline_name} doesn't exist." else: - return None \ No newline at end of file + return None +""" \ No newline at end of file diff --git a/server/app/schemas/dataframe.py b/server/app/schemas/dataframe.py index 01d20267..94876169 100644 --- a/server/app/schemas/dataframe.py +++ b/server/app/schemas/dataframe.py @@ -1,44 +1,51 @@ -from pydantic import BaseModel, HttpUrl, Field, model_validator -import pandas as pd -from typing import Sequence, Optional +from pydantic import BaseModel, Field, model_validator +from typing import Optional +import json -class ExecutionDataFrame(BaseModel): - context_id: str - context_type: str - execution: str - -class MLMDPushRequest(BaseModel): +# Pydantic model for the request body in the MLMD push API. +class MLMDPushRequest(BaseModel): + # ... 
indicates required field id: Optional[int] = Field(None, description="Optional execution id for the request") - pipeline_name: str = Field(..., min_length=1, description="Name of the pipeline (cannot be empty)") - json_payload: str = Field(..., description="JSON payload for the pipeline (cannot be empty)") + pipeline_name: str = Field(..., min_length=1, description="Name of the pipeline") + json_payload: str = Field(..., description="JSON payload for the pipeline") + # Custom validation for pipeline name and JSON payload @model_validator(mode="after") def validate_fields(cls, values): if not values.pipeline_name.strip(): raise ValueError("Pipeline name must not be empty or whitespace") if not values.json_payload: raise ValueError("JSON payload must not be empty") + # Attempt to parse the JSON payload to ensure it is valid JSON + try: + json.loads(values.json_payload) # Try to load the JSON string + except json.JSONDecodeError: + raise ValueError("JSON payload is not valid JSON") # Raise error if invalid JSON + return values + -class MLMDRequest(BaseModel): +# Pydantic model for the request body in the MLMD pull API. +class MLMDPullRequest(BaseModel): + # The execution ID is required, but it can be None if no specific execution is needed exec_id: int | None = Field( ..., description="Execution ID must be an integer. It is a required field but can be null." ) -class ExecutionsQueryParams(BaseModel): +# Base query parameters for pagination, sorting, and filtering. +class BaseRequest(BaseModel): page: int = Field(1, gt=0, description="Page number") # Page must be > 0 per_page: int = Field(5, le=100, description="Items per page, max 100") # Limit per page to max 100 - sort_field: str = Field("Context_Type", description="Column to sort by") sort_order: str = Field("asc", description="Sort order (asc or desc)") filter_by: Optional[str] = Field(None, description="Filter by column") filter_value: Optional[str] = Field(None, description="Filter value") -class ArtifactsQueryParams(BaseModel): - page: int = Field(1, gt=0, description="Page number") # Page must be > 0 - per_page: int = Field(5, le=100, description="Items per page, max 100") # Limit per page to max 100 - sort_field: str = Field("name", description="Column to sort by") - sort_order: str = Field("asc", description="Sort order (asc or desc)") - filter_by: Optional[str] = Field(None, description="Filter by column") - filter_value: Optional[str] = Field(None, description="Filter value") \ No newline at end of file +# Query parameters for execution. +class ExecutionsRequest(BaseRequest): + sort_field: str = Field("Context_Type", description="Column to sort by (default: Context_Type)") + +# Query parameters for artifact. 
+class ArtifactsRequest(BaseRequest): + sort_field: str = Field("name", description="Column to sort by (default: name)") \ No newline at end of file From df7415b4f7acd135a0128e6ba96bb3ff3ad0d775 Mon Sep 17 00:00:00 2001 From: Jaychaware Date: Tue, 4 Feb 2025 09:42:04 +0000 Subject: [PATCH 07/11] updated libraries --- pyproject.toml | 3 +-- requirements.txt | 3 +-- setup.py | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b6cb674f..544c57a6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,8 +16,7 @@ dependencies = [ "boto3", "scitokens", "cryptography", - "ray", - "google" + "ray==2.34.0" ] authors = [ { name = "Hewlett Packard Enterprise"} diff --git a/requirements.txt b/requirements.txt index 6a12beea..cf026fd2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,5 +12,4 @@ scikit_learn boto3 scitokens cryptography -ray -google \ No newline at end of file +ray==2.34.0 \ No newline at end of file diff --git a/setup.py b/setup.py index e26692de..1b9fb142 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ install_requires=["ml-metadata==1.15.0", "dvc[ssh,s3]==3.51.1", "pandas", "retrying", "pyarrow", "neo4j", \ "tabulate", "click", "minio", "paramiko", "scikit_learn", "boto3", "scitokens", "cryptography", \ - "ray", "google"], # add any additional packages that + "ray==2.34.0"], # add any additional packages that # needs to be installed along with your package. Eg: 'caer' keywords=['python', 'first package'], From 88551f5f9261ca4828b31864b40b373d0b47df28 Mon Sep 17 00:00:00 2001 From: Jaychaware Date: Tue, 18 Feb 2025 16:19:24 +0530 Subject: [PATCH 08/11] changed request json to query params for mlmd pull --- cmflib/server_interface/server_interface.py | 2 +- server/app/main.py | 7 ++++--- server/app/schemas/dataframe.py | 9 --------- 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/cmflib/server_interface/server_interface.py b/cmflib/server_interface/server_interface.py index 521d53cb..0b3b281a 100644 --- a/cmflib/server_interface/server_interface.py +++ b/cmflib/server_interface/server_interface.py @@ -29,7 +29,7 @@ def call_mlmd_push(json_payload, url, exec_id, pipeline_name): # This function gets mlmd data from mlmd_pull api from cmf-server def call_mlmd_pull(url, pipeline_name, exec_id): url_to_pass = f"{url}/mlmd_pull/{pipeline_name}" - response = requests.get(url_to_pass, json={"exec_id": exec_id}) # Get request + response = requests.get(url_to_pass, params={"exec_id": exec_id}) # Get request return response diff --git a/server/app/main.py b/server/app/main.py index a8784669..6f61a175 100644 --- a/server/app/main.py +++ b/server/app/main.py @@ -32,7 +32,8 @@ from pathlib import Path import os import json -from server.app.schemas.dataframe import MLMDPushRequest, MLMDPullRequest, ExecutionsRequest, ArtifactsRequest +import typing as t +from server.app.schemas.dataframe import MLMDPushRequest, ExecutionsRequest, ArtifactsRequest server_store_path = "/cmf-server/data/mlmd" @@ -125,13 +126,13 @@ async def mlmd_push(info: MLMDPushRequest): # api to get mlmd file from cmf-server @app.get("/mlmd_pull/{pipeline_name}", response_class=HTMLResponse) -async def mlmd_pull(pipeline_name: str, request: MLMDPullRequest): +async def mlmd_pull(pipeline_name: str, exec_id: t.Optional[int]= None): # checks if mlmd file exists on server await check_mlmd_file_exists() # checks if pipeline exists await check_pipeline_exists(pipeline_name) #json_payload values can be json data, NULL or no_exec_id. 
- json_payload = await async_api(get_mlmd_from_server, server_store_path, pipeline_name, request.exec_id) + json_payload = await async_api(get_mlmd_from_server, server_store_path, pipeline_name, exec_id) return json_payload diff --git a/server/app/schemas/dataframe.py b/server/app/schemas/dataframe.py index 94876169..5227e0a0 100644 --- a/server/app/schemas/dataframe.py +++ b/server/app/schemas/dataframe.py @@ -24,15 +24,6 @@ def validate_fields(cls, values): raise ValueError("JSON payload is not valid JSON") # Raise error if invalid JSON return values - - -# Pydantic model for the request body in the MLMD pull API. -class MLMDPullRequest(BaseModel): - # The execution ID is required, but it can be None if no specific execution is needed - exec_id: int | None = Field( - ..., - description="Execution ID must be an integer. It is a required field but can be null." - ) # Base query parameters for pagination, sorting, and filtering. class BaseRequest(BaseModel): From c0ed27e678f3f13c282d45d93d2c174c9b2ae95c Mon Sep 17 00:00:00 2001 From: Jaychaware Date: Tue, 18 Feb 2025 17:37:50 +0530 Subject: [PATCH 09/11] updated comments --- server/app/query_execution_lineage_d3tree.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/app/query_execution_lineage_d3tree.py b/server/app/query_execution_lineage_d3tree.py index 5ce7aec5..1361fe20 100644 --- a/server/app/query_execution_lineage_d3tree.py +++ b/server/app/query_execution_lineage_d3tree.py @@ -34,6 +34,9 @@ def query_execution_lineage_d3tree(mlmd_path: str, pipeline_name: str, dict_of_e #finding execution_id by comparing Execution_uuid (d09fdb26-0e9d-11ef-944f-4bf54f5aca7f) and uuid ('Prepare_u3tr') result = df[df['Execution_uuid'].str[:4] == uuid] #result = df[id: "1","Execution_type_name", "Execution_uuid"] execution_id=result["id"].tolist() + # Return error if no execution ID is found for the given uuid + if not execution_id: + return {"error": f"uuid '{uuid}' does not match any execution in pipeline '{pipeline_name}'"} parents_set = set() queue = UniqueQueue() df = pd.DataFrame() @@ -41,8 +44,6 @@ def query_execution_lineage_d3tree(mlmd_path: str, pipeline_name: str, dict_of_e dict_parents = {} if parents == None: parents = [] - if not execution_id: - return {"error": f"No execution lineage is available for pipeline {pipeline_name} with uuid {uuid}"} dict_parents[execution_id[0]] = list(set(parents)) # [2] = [1,2,3,4] list of parent id parents_set.add(execution_id[0]) #created so that we can directly find execuions using execution ids for i in set(parents): From d4f1e84695de4407e2ae566aab90f03eb673d86b Mon Sep 17 00:00:00 2001 From: Jaychaware Date: Thu, 20 Feb 2025 16:22:01 +0530 Subject: [PATCH 10/11] minor changes --- server/app/main.py | 10 +++++----- server/app/schemas/dataframe.py | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/server/app/main.py b/server/app/main.py index d164198a..06ce179f 100644 --- a/server/app/main.py +++ b/server/app/main.py @@ -33,7 +33,7 @@ import os import json import typing as t -from server.app.schemas.dataframe import MLMDPushRequest, ExecutionsRequest, ArtifactsRequest +from server.app.schemas.dataframe import MLMDPushRequest, ExecutionRequest, ArtifactRequest server_store_path = "/cmf-server/data/mlmd" @@ -143,7 +143,7 @@ async def mlmd_pull(pipeline_name: str, exec_uuid: t.Optional[str]= None): async def executions( request: Request, pipeline_name: str, - query_params: ExecutionsRequest = Depends() + query_params: ExecutionRequest = Depends() ): # Extract 
the query parameters from the query_params object page = query_params.page @@ -194,7 +194,7 @@ async def list_of_executions(request: Request, pipeline_name: str): await check_mlmd_file_exists() # checks if pipeline exists await check_pipeline_exists(pipeline_name) - response = await async_api(get_lineage_data, server_store_path,pipeline_name,"Execution",dict_of_art_ids,dict_of_exe_ids) + response = await async_api(get_lineage_data, server_store_path, pipeline_name, "Execution", dict_of_art_ids, dict_of_exe_ids) return response @@ -221,7 +221,7 @@ async def artifacts( request: Request, pipeline_name: str, type: str, # type = artifact type - query_params: ArtifactsRequest = Depends() + query_params: ArtifactRequest = Depends() ): # Extract the query parameters from the query_params object page = query_params.page @@ -485,7 +485,7 @@ async def artifact_lineage(request: Request, pipeline_name: str): if os.path.exists(server_store_path): query = cmfquery.CmfQuery(server_store_path) if (pipeline_name in query.get_pipeline_names()): - response=await async_api(get_lineage_data, server_store_path,pipeline_name,"Artifacts",dict_of_art_ids,dict_of_exe_ids) + response=await async_api(get_lineage_data, server_store_path, pipeline_name, "Artifacts", dict_of_art_ids, dict_of_exe_ids) return response else: return f"Pipeline name {pipeline_name} doesn't exist." diff --git a/server/app/schemas/dataframe.py b/server/app/schemas/dataframe.py index 1bc97a08..7494e4ad 100644 --- a/server/app/schemas/dataframe.py +++ b/server/app/schemas/dataframe.py @@ -34,9 +34,9 @@ class BaseRequest(BaseModel): filter_value: Optional[str] = Field(None, description="Filter value") # Query parameters for execution. -class ExecutionsRequest(BaseRequest): +class ExecutionRequest(BaseRequest): sort_field: str = Field("Context_Type", description="Column to sort by (default: Context_Type)") # Query parameters for artifact. -class ArtifactsRequest(BaseRequest): +class ArtifactRequest(BaseRequest): sort_field: str = Field("name", description="Column to sort by (default: name)") \ No newline at end of file From 62822d92299afcc2bb29973a6bec82ff6604acb0 Mon Sep 17 00:00:00 2001 From: AyeshaSanadi Date: Fri, 21 Feb 2025 12:01:37 +0530 Subject: [PATCH 11/11] Fixed pipeline name inside python env --- cmflib/commands/metadata/push.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmflib/commands/metadata/push.py b/cmflib/commands/metadata/push.py index 121b24b6..a36c5563 100644 --- a/cmflib/commands/metadata/push.py +++ b/cmflib/commands/metadata/push.py @@ -153,7 +153,7 @@ def run(self): # however, we will be keeping the record of the status code. # Getting all executions df to get the custom property 'Python_Env' - executions = query.get_all_executions_in_pipeline(self.args.pipeline_name) + executions = query.get_all_executions_in_pipeline(pipeline_name) if not executions.empty: if 'custom_properties_Python_Env' in executions.columns: list_of_env_files = executions['custom_properties_Python_Env'].drop_duplicates().tolist()
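
Usage note: the sketch below shows how a client might exercise the validated /mlmd_push endpoint introduced in this series. It is illustrative only; the server URL and the example pipeline payload are placeholders, and only the request shape (MLMDPushRequest: optional id, non-empty pipeline_name, JSON-string json_payload) and the new error codes are taken from the patches above.

    import json
    import requests

    SERVER_URL = "http://127.0.0.1:8080"   # placeholder cmf-server address

    # Well-formed request: json_payload is itself a JSON string whose
    # Pipeline[0]["name"] matches pipeline_name; otherwise the server now
    # answers 400 ("invalid_json_payload") or 404 ("pipeline_not_exist").
    body = {
        "id": None,
        "pipeline_name": "example-pipeline",
        "json_payload": json.dumps({"Pipeline": [{"name": "example-pipeline"}]}),
    }
    resp = requests.post(f"{SERVER_URL}/mlmd_push", json=body)
    print(resp.status_code, resp.json())

    # Malformed request: an empty pipeline_name fails MLMDPushRequest validation,
    # so FastAPI rejects it with 422 before the endpoint body runs.
    bad = {"pipeline_name": "", "json_payload": "{}"}
    print(requests.post(f"{SERVER_URL}/mlmd_push", json=bad).status_code)  # expect 422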
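
Similarly, a minimal sketch of the pagination and sorting parameters now validated through ExecutionRequest / ArtifactRequest: page and per_page are ordinary query parameters, and out-of-range values (page <= 0, per_page > 100) are rejected with 422 by FastAPI before the handler runs. Endpoint path and field names follow the patches above; the pipeline name is a placeholder.

    import requests

    SERVER_URL = "http://127.0.0.1:8080"   # placeholder cmf-server address
    pipeline = "example-pipeline"           # placeholder pipeline name

    # Valid: first page of executions, 5 per page, sorted by Context_Type.
    params = {"page": 1, "per_page": 5, "sort_field": "Context_Type", "sort_order": "asc"}
    print(requests.get(f"{SERVER_URL}/executions/{pipeline}", params=params).status_code)

    # Invalid: per_page above the le=100 bound on BaseRequest is rejected by the
    # dependency, so get_executions is never called.
    print(requests.get(f"{SERVER_URL}/executions/{pipeline}", params={"per_page": 500}).status_code)  # expect 422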