From 4a1ef0069797cc46ebca6a3f9b8c417d33f46bdd Mon Sep 17 00:00:00 2001 From: Rita Aleksziev Date: Mon, 10 Feb 2025 10:23:41 +0100 Subject: [PATCH] get pipeline status in a db engine compatible way --- cognee/api/v1/cognify/cognify_v2.py | 5 ++-- .../operations/get_pipeline_status.py | 26 +++++++------------ 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/cognee/api/v1/cognify/cognify_v2.py b/cognee/api/v1/cognify/cognify_v2.py index 6ee92fad..a71f4b17 100644 --- a/cognee/api/v1/cognify/cognify_v2.py +++ b/cognee/api/v1/cognify/cognify_v2.py @@ -81,8 +81,8 @@ async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]): task_status = await get_pipeline_status([dataset_id]) if ( - dataset_id in task_status - and task_status[dataset_id] == PipelineRunStatus.DATASET_PROCESSING_STARTED + str(dataset_id) in task_status + and task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED ): logger.info("Dataset %s is already being processed.", dataset_name) return @@ -104,7 +104,6 @@ async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]): await index_graph_edges() send_telemetry("cognee.cognify EXECUTION COMPLETED", user.id) - return pipeline_run_status except Exception as error: diff --git a/cognee/modules/pipelines/operations/get_pipeline_status.py b/cognee/modules/pipelines/operations/get_pipeline_status.py index 30c43a5c..40a99c14 100644 --- a/cognee/modules/pipelines/operations/get_pipeline_status.py +++ b/cognee/modules/pipelines/operations/get_pipeline_status.py @@ -1,5 +1,5 @@ from uuid import UUID -from sqlalchemy import func, select, cast, Text +from sqlalchemy import func, select from sqlalchemy.orm import aliased from cognee.infrastructure.databases.relational import get_relational_engine from ..models import PipelineRun @@ -7,21 +7,25 @@ async def get_pipeline_status(pipeline_ids: list[UUID]): db_engine = get_relational_engine() + dialect = db_engine.engine.dialect.name async with db_engine.get_async_session() as session: + if dialect == "sqlite": + dataset_id_column = func.json_extract(PipelineRun.run_info, "$.dataset_id") + else: + dataset_id_column = PipelineRun.run_info.op("->>")("dataset_id") + query = ( select( PipelineRun, func.row_number() .over( - partition_by=PipelineRun.run_info.op("->>")("dataset_id"), + partition_by=dataset_id_column, order_by=PipelineRun.created_at.desc(), ) .label("rn"), ) - .filter( - PipelineRun.run_info.op("->>")("dataset_id").in_([str(id) for id in pipeline_ids]) - ) + .filter(dataset_id_column.in_([str(id) for id in pipeline_ids])) .subquery() ) @@ -30,16 +34,6 @@ async def get_pipeline_status(pipeline_ids: list[UUID]): runs = (await session.execute(latest_runs)).scalars().all() - pipeline_statuses = {str(run.id): run.status for run in runs} + pipeline_statuses = {run.run_info["dataset_id"]: run.status for run in runs} return pipeline_statuses - - # f"""SELECT data_id, status - # FROM ( - # SELECT data_id, status, ROW_NUMBER() OVER (PARTITION BY data_id ORDER BY created_at DESC) as rn - # FROM cognee.cognee.task_runs - # WHERE data_id IN ({formatted_data_ids}) - # ) t - # WHERE rn = 1;""" - - # return { dataset["data_id"]: dataset["status"] for dataset in datasets_statuses }