get pipeline status in a db engine compatible way
alekszievr committed Feb 10, 2025
1 parent 3218e5e commit 4a1ef00
Showing 2 changed files with 12 additions and 19 deletions.
5 changes: 2 additions & 3 deletions cognee/api/v1/cognify/cognify_v2.py
@@ -81,8 +81,8 @@ async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
         task_status = await get_pipeline_status([dataset_id])
 
         if (
-            dataset_id in task_status
-            and task_status[dataset_id] == PipelineRunStatus.DATASET_PROCESSING_STARTED
+            str(dataset_id) in task_status
+            and task_status[str(dataset_id)] == PipelineRunStatus.DATASET_PROCESSING_STARTED
         ):
             logger.info("Dataset %s is already being processed.", dataset_name)
             return
@@ -104,7 +104,6 @@ async def run_cognify_pipeline(dataset: Dataset, user: User, tasks: list[Task]):
         await index_graph_edges()
 
         send_telemetry("cognee.cognify EXECUTION COMPLETED", user.id)
-
         return pipeline_run_status
 
     except Exception as error:
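The str() wrapper matters because run_info is stored as JSON, and JSON object keys are always text: get_pipeline_status now returns a map keyed by stringified dataset ids, so probing it with a raw UUID would never match. A minimal sketch of the lookup, with hypothetical values:

from uuid import uuid4

# Hypothetical status map shaped like get_pipeline_status's return value:
# keys come from run_info["dataset_id"], which JSON serializes as text.
dataset_id = uuid4()
task_status = {str(dataset_id): "DATASET_PROCESSING_STARTED"}

assert dataset_id not in task_status   # a raw UUID key never matches a str key
assert str(dataset_id) in task_status  # the stringified key does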
26 changes: 10 additions & 16 deletions cognee/modules/pipelines/operations/get_pipeline_status.py
@@ -1,27 +1,31 @@
 from uuid import UUID
-from sqlalchemy import func, select, cast, Text
+from sqlalchemy import func, select
 from sqlalchemy.orm import aliased
 from cognee.infrastructure.databases.relational import get_relational_engine
 from ..models import PipelineRun
 
 
 async def get_pipeline_status(pipeline_ids: list[UUID]):
     db_engine = get_relational_engine()
+    dialect = db_engine.engine.dialect.name
 
     async with db_engine.get_async_session() as session:
+        if dialect == "sqlite":
+            dataset_id_column = func.json_extract(PipelineRun.run_info, "$.dataset_id")
+        else:
+            dataset_id_column = PipelineRun.run_info.op("->>")("dataset_id")
+
         query = (
             select(
                 PipelineRun,
                 func.row_number()
                 .over(
-                    partition_by=PipelineRun.run_info.op("->>")("dataset_id"),
+                    partition_by=dataset_id_column,
                     order_by=PipelineRun.created_at.desc(),
                 )
                 .label("rn"),
             )
-            .filter(
-                PipelineRun.run_info.op("->>")("dataset_id").in_([str(id) for id in pipeline_ids])
-            )
+            .filter(dataset_id_column.in_([str(id) for id in pipeline_ids]))
             .subquery()
         )
 
@@ -30,16 +34,6 @@ async def get_pipeline_status(pipeline_ids: list[UUID]):
 
         runs = (await session.execute(latest_runs)).scalars().all()
 
-        pipeline_statuses = {str(run.id): run.status for run in runs}
+        pipeline_statuses = {run.run_info["dataset_id"]: run.status for run in runs}
 
         return pipeline_statuses
-
-        # f"""SELECT data_id, status
-        #     FROM (
-        #         SELECT data_id, status, ROW_NUMBER() OVER (PARTITION BY data_id ORDER BY created_at DESC) as rn
-        #         FROM cognee.cognee.task_runs
-        #         WHERE data_id IN ({formatted_data_ids})
-        #     ) t
-        #     WHERE rn = 1;"""
-
-        # return { dataset["data_id"]: dataset["status"] for dataset in datasets_statuses }
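The dialect branch is the heart of the compatibility fix: SQLite reads JSON fields with json_extract(), while Postgres extracts them as text with the ->> operator, and funneling both through a single dataset_id_column expression lets the row_number() window and the IN filter stay identical across engines. Below is a standalone sketch of the same dialect branch and latest-run-per-dataset pattern; the pipeline_runs table, the "d1"/"d2" ids, and the synchronous Session are hypothetical (the actual module goes through get_async_session):

from sqlalchemy import JSON, Column, DateTime, Integer, String, create_engine, func, select
from sqlalchemy.orm import Session, aliased, declarative_base

Base = declarative_base()


class PipelineRun(Base):
    __tablename__ = "pipeline_runs"  # hypothetical table name
    id = Column(Integer, primary_key=True)
    status = Column(String)
    run_info = Column(JSON)
    created_at = Column(DateTime)


engine = create_engine("sqlite://")  # swap in a Postgres URL to exercise the ->> branch
Base.metadata.create_all(engine)

# Pick the JSON-extraction expression the current dialect understands.
if engine.dialect.name == "sqlite":
    dataset_id_column = func.json_extract(PipelineRun.run_info, "$.dataset_id")
else:
    dataset_id_column = PipelineRun.run_info.op("->>")("dataset_id")

# Rank each dataset's runs newest-first, then keep only rank 1.
subquery = (
    select(
        PipelineRun,
        func.row_number()
        .over(partition_by=dataset_id_column, order_by=PipelineRun.created_at.desc())
        .label("rn"),
    )
    .filter(dataset_id_column.in_(["d1", "d2"]))  # hypothetical dataset ids
    .subquery()
)
latest_run = aliased(PipelineRun, subquery)
latest_runs = select(latest_run).filter(subquery.c.rn == 1)

with Session(engine) as session:
    runs = session.execute(latest_runs).scalars().all()
    statuses = {run.run_info["dataset_id"]: run.status for run in runs}

Swapping the engine URL for a Postgres DSN exercises the ->> branch with no other code changes, which is exactly the portability the commit message describes.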
