diff --git a/frontend/src/features/myWorkflows/api/runs/useStartRuns.ts b/frontend/src/features/myWorkflows/api/runs/useStartRuns.ts new file mode 100644 index 00000000..ae401ee0 --- /dev/null +++ b/frontend/src/features/myWorkflows/api/runs/useStartRuns.ts @@ -0,0 +1,45 @@ +import { type MutationConfig } from "@services/clients/react-query.client"; +import { useMutation } from "@tanstack/react-query"; +import { type AxiosError } from "axios"; +import { type IBatchWorkflowActionResponse } from "features/myWorkflows/types/workflow"; +import { toast } from "react-toastify"; +import { dominoApiClient } from "services/clients/domino.client"; + +interface StartRunsParams { + workflowIds: number[]; +} + +interface UseStartRuns { + workspaceId?: string; +} + +export const useStartRuns = ( + { workspaceId }: UseStartRuns, + config: MutationConfig = {}, +) => { + return useMutation({ + mutationFn: async ({ workflowIds }) => { + if (!workflowIds) throw new Error("No workflow selected"); + return await postWorkflowRunIds({ workflowIds, workspaceId }); + }, + onError: (e: AxiosError<{ detail: string }>) => { + const message = + (e.response?.data?.detail ?? 
e?.message) || "Something went wrong"; + + toast.error(message, { + toastId: message, + }); + }, + ...config, + }); +}; + +const postWorkflowRunIds = async ({ + workflowIds, + workspaceId, +}: StartRunsParams & UseStartRuns): Promise => { + return await dominoApiClient.post( + `/batch/workspaces/${workspaceId}/workflows/runs`, + workflowIds, + ); +}; diff --git a/frontend/src/features/myWorkflows/api/runs/useStopRuns.ts b/frontend/src/features/myWorkflows/api/runs/useStopRuns.ts new file mode 100644 index 00000000..9cb5ddd7 --- /dev/null +++ b/frontend/src/features/myWorkflows/api/runs/useStopRuns.ts @@ -0,0 +1,45 @@ +import { type MutationConfig } from "@services/clients/react-query.client"; +import { useMutation } from "@tanstack/react-query"; +import { type AxiosError } from "axios"; +import { type IBatchWorkflowActionResponse } from "features/myWorkflows/types/workflow"; +import { toast } from "react-toastify"; +import { dominoApiClient } from "services/clients/domino.client"; + +interface StopRunsParams { + workflowIds: number[]; +} + +interface UseStopRuns { + workspaceId?: string; +} + +export const useStopRuns = ( + { workspaceId }: UseStopRuns, + config: MutationConfig = {}, +) => { + return useMutation({ + mutationFn: async ({ workflowIds }) => { + if (!workflowIds) throw new Error("No workflow selected"); + return await postWorkflowStopIds({ workflowIds, workspaceId }); + }, + onError: (e: AxiosError<{ detail: string }>) => { + const message = + (e.response?.data?.detail ?? 
e?.message) || "Something went wrong"; + + toast.error(message, { + toastId: message, + }); + }, + ...config, + }); +}; + +const postWorkflowStopIds = async ({ + workflowIds, + workspaceId, +}: StopRunsParams & UseStopRuns): Promise => { + return await dominoApiClient.patch( + `/batch/workspaces/${workspaceId}/workflows/runs`, + workflowIds, + ); +}; diff --git a/frontend/src/features/myWorkflows/api/workflow/useDeleteWorkflows.ts b/frontend/src/features/myWorkflows/api/workflow/useDeleteWorkflows.ts new file mode 100644 index 00000000..0123706b --- /dev/null +++ b/frontend/src/features/myWorkflows/api/workflow/useDeleteWorkflows.ts @@ -0,0 +1,59 @@ +import { type MutationConfig } from "@services/clients/react-query.client"; +import { useMutation, useQueryClient } from "@tanstack/react-query"; +import { type AxiosError } from "axios"; +import { type IBatchWorkflowActionResponse } from "features/myWorkflows/types/workflow"; +import { toast } from "react-toastify"; +import { dominoApiClient } from "services/clients/domino.client"; + +interface DeleteWorkflowsParams { + workflowIds: number[]; +} + +interface UseDeleteWorkflows { + workspaceId?: string; +} + +export const useDeleteWorkflows = ( + { workspaceId }: UseDeleteWorkflows, + config: MutationConfig< + DeleteWorkflowsParams, + IBatchWorkflowActionResponse + > = {}, +) => { + const queryClient = useQueryClient(); + + return useMutation({ + mutationFn: async ({ workflowIds }) => { + if (!workspaceId) throw new Error("No workspace selected"); + return await deleteWorkflowByIds({ workflowIds, workspaceId }); + }, + onSuccess: async (_, { workflowIds }) => { + await queryClient.invalidateQueries({ + queryKey: ["WORKFLOWS", workspaceId], + }); + await queryClient.invalidateQueries({ + queryKey: ["WORKFLOW", workspaceId, workflowIds], + }); + }, + onError: (e: AxiosError<{ detail: string }>) => { + const message = + (e.response?.data?.detail ?? 
e?.message) || "Something went wrong"; + + toast.error(message, { + toastId: message, + }); + }, + ...config, + }); +}; + +const deleteWorkflowByIds = async ( + params: DeleteWorkflowsParams & UseDeleteWorkflows, +): Promise => { + return await dominoApiClient.delete( + `/batch/workspaces/${params.workspaceId}/workflows`, + { + data: params.workflowIds, + }, + ); +}; diff --git a/frontend/src/features/myWorkflows/components/WorkflowsList/Actions.tsx b/frontend/src/features/myWorkflows/components/WorkflowsList/Actions.tsx index 3b5d2747..31a7ea61 100644 --- a/frontend/src/features/myWorkflows/components/WorkflowsList/Actions.tsx +++ b/frontend/src/features/myWorkflows/components/WorkflowsList/Actions.tsx @@ -1,94 +1,70 @@ -import DeleteOutlineIcon from "@mui/icons-material/DeleteOutline"; -import PauseCircleOutlineIcon from "@mui/icons-material/PauseCircleOutline"; -import PlayCircleOutlineIcon from "@mui/icons-material/PlayCircleOutline"; -import { IconButton, Tooltip, useTheme } from "@mui/material"; +import { + DeleteOutlined, + PlayCircleOutlined, + StopCircleOutlined, +} from "@mui/icons-material"; +import { Button, Divider } from "@mui/material"; import { type CommonProps } from "@mui/material/OverridableComponent"; -import { Modal, type ModalRef } from "components/Modal"; +import { GridToolbarContainer } from "@mui/x-data-grid"; import { type IWorkflow } from "features/myWorkflows/types"; -import React, { useRef, useState } from "react"; +import React, { useState } from "react"; import { ConfirmDeleteModal } from "./ConfirmDeleteModal"; interface Props extends CommonProps { - id: IWorkflow["id"]; - deleteFn: () => void; + ids: Array; runFn: () => void; - pauseFn: () => void; + stopFn: () => void; + deleteFn: () => void; disabled: boolean; } export const Actions: React.FC = ({ + ids, runFn, + stopFn, deleteFn, - className, - disabled = false, + disabled, }) => { - const theme = useTheme(); - const [deleteModalOpen, setDeleteModalOpen] = useState(false); - 
const newFeatureModal = useRef(null); return ( - <> - {disabled ? ( - - - - - - - - ) : ( - - - - )} - { - newFeatureModal.current?.open(); - }} + + + + - Are you sure you want to delete this workflow? This action{" "} + Are you sure you want to delete selected {ids.length} workflows? + This action{" "} cannot be undone. } @@ -101,6 +77,6 @@ export const Actions: React.FC = ({ }} confirmText="Delete" /> - + ); }; diff --git a/frontend/src/features/myWorkflows/components/WorkflowsList/FailureDetailsModal.tsx b/frontend/src/features/myWorkflows/components/WorkflowsList/FailureDetailsModal.tsx new file mode 100644 index 00000000..75d7bcf5 --- /dev/null +++ b/frontend/src/features/myWorkflows/components/WorkflowsList/FailureDetailsModal.tsx @@ -0,0 +1,53 @@ +import { type IBatchWorkflowActionDetail } from "@features/myWorkflows/types"; +import { Dialog, DialogContent, DialogTitle } from "@mui/material"; +import { DataGrid, type GridColDef } from "@mui/x-data-grid"; +import { useCallback } from "react"; + +interface Props { + isOpen: boolean; + title: string; + data: IBatchWorkflowActionDetail[]; + cancelCb: () => void; +} + +const columns: Array> = [ + { field: "id", headerName: "ID", width: 90 }, + { + field: "message", + headerName: "Message", + width: 400, + }, +]; + +export const FailureDetailsModal: React.FC = ({ + isOpen, + title, + data, + cancelCb, +}) => { + const cancel = useCallback(() => { + cancelCb(); + }, [cancelCb]); + return ( + + {title} + + + + + ); +}; diff --git a/frontend/src/features/myWorkflows/components/WorkflowsList/RunState.tsx b/frontend/src/features/myWorkflows/components/WorkflowsList/RunState.tsx new file mode 100644 index 00000000..c44d8af4 --- /dev/null +++ b/frontend/src/features/myWorkflows/components/WorkflowsList/RunState.tsx @@ -0,0 +1,58 @@ +import { + Autorenew, + Check, + Close, + HourglassTop, + Remove, +} from "@mui/icons-material"; +import { type IWorkflowRuns } from "features/myWorkflows/types"; +import React from "react"; 
+ +interface Props { + state: IWorkflowRuns["state"]; +} + +export const RunState: React.FC = ({ state }) => { + const stateTitle = state[0].toUpperCase() + state.slice(1); + + if (state === "success") { + return ( + <> + + {stateTitle} + + ); + } + if (state === "failed") { + return ( + <> + + {stateTitle} + + ); + } + if (state === "queued") { + return ( + <> + + {stateTitle} + + ); + } + if (state === "running") { + return ( + <> + + {stateTitle} + + ); + } + if (state === "none") { + return ( + <> + + {stateTitle} + + ); + } +}; diff --git a/frontend/src/features/myWorkflows/components/WorkflowsList/index.tsx b/frontend/src/features/myWorkflows/components/WorkflowsList/index.tsx index d0cb3be9..5f68f431 100644 --- a/frontend/src/features/myWorkflows/components/WorkflowsList/index.tsx +++ b/frontend/src/features/myWorkflows/components/WorkflowsList/index.tsx @@ -1,25 +1,30 @@ import { useWorkspaces } from "@context/workspaces"; -import { InfoOutlined } from "@mui/icons-material"; -import { Paper, Tooltip } from "@mui/material"; +import { useStartRuns } from "@features/myWorkflows/api/runs/useStartRuns"; +import { useStopRuns } from "@features/myWorkflows/api/runs/useStopRuns"; +import { useDeleteWorkflows } from "@features/myWorkflows/api/workflow/useDeleteWorkflows"; +import { InfoOutlined, PlayCircleOutlined } from "@mui/icons-material"; +import { IconButton, Paper, Tooltip, useTheme } from "@mui/material"; import { DataGrid, type GridRowParams, type GridColDef, type GridEventListener, + type GridRowSelectionModel, } from "@mui/x-data-grid"; import { useQueryClient } from "@tanstack/react-query"; import { NoDataOverlay } from "components/NoDataOverlay"; +import { useWorkflows, useStartRun } from "features/myWorkflows/api"; import { - useDeleteWorkflow, - useWorkflows, - useStartRun, -} from "features/myWorkflows/api"; -import { type IWorkflow } from "features/myWorkflows/types"; + type IBatchWorkflowActionDetail, + type IWorkflow, +} from 
"features/myWorkflows/types"; import React, { useCallback, useMemo } from "react"; import { useNavigate } from "react-router-dom"; import { toast } from "react-toastify"; import { Actions } from "./Actions"; +import { FailureDetailsModal } from "./FailureDetailsModal"; +import { RunState } from "./RunState"; import { Status } from "./Status"; import { WorkflowsListSkeleton } from "./WorkflowsListSkeleton"; @@ -29,12 +34,22 @@ import { WorkflowsListSkeleton } from "./WorkflowsListSkeleton"; */ export const WorkflowList: React.FC = () => { + const theme = useTheme(); const navigate = useNavigate(); const [paginationModel, setPaginationModel] = React.useState({ pageSize: 10, page: 0, }); + const [failureDetails, setFailureDetails] = React.useState< + IBatchWorkflowActionDetail[] + >([]); + + const [failureModalOpen, setFailureModalOpen] = React.useState(false); + const [selectedWorkflowIds, setSelectedWorkflowIds] = React.useState< + Array + >([]); + const { workspace } = useWorkspaces(); const queryClient = useQueryClient(); @@ -49,17 +64,90 @@ export const WorkflowList: React.FC = () => { pageSize: paginationModel.pageSize, }, { - refetchInterval: 5000, + refetchInterval: 1500, }, ); - const { mutateAsync: handleDeleteWorkflow } = useDeleteWorkflow( + const { mutateAsync: handleDeleteWorkflows } = useDeleteWorkflows( { workspaceId: workspace?.id, }, { - onSuccess: () => { - toast.success("Workflow deleted"); + onSuccess: async (response, { workflowIds }) => { + await queryClient.invalidateQueries({ + queryKey: ["WORKFLOWS", workspace?.id, workflowIds], + }); + if (response.result === "success") { + toast.success("Workflows deleted"); + } else { + setFailureDetails(response.details); + toast.warning( +
{ + setFailureModalOpen(true); + }} + style={{ textDecoration: "underline" }} + > + Some workflows couldn't be deleted. +
, + ); + } + }, + }, + ); + + const { mutateAsync: handleRunWorkflows } = useStartRuns( + { + workspaceId: workspace?.id, + }, + { + onSuccess: async (response, { workflowIds }) => { + await queryClient.invalidateQueries({ + queryKey: ["RUNS", workspace?.id, workflowIds], + }); + if (response.result === "success") { + toast.success("Workflows started to run"); + } else { + setFailureDetails(response.details); + toast.warning( +
{ + setFailureModalOpen(true); + }} + style={{ textDecoration: "underline" }} + > + Some workflows couldn't be started. +
, + ); + } + }, + }, + ); + + const { mutateAsync: handleStopWorkflows } = useStopRuns( + { + workspaceId: workspace?.id, + }, + { + onSuccess: async (response, { workflowIds }) => { + await queryClient.invalidateQueries({ + queryKey: ["RUNS", workspace?.id, workflowIds], + }); + if (response.result === "success") { + toast.success("Workflows stopped"); + } else { + setFailureDetails(response.details); + toast.warning( +
{ + setFailureModalOpen(true); + }} + style={{ textDecoration: "underline" }} + > + Some workflows couldn't be stopped. +
, + ); + } }, }, ); @@ -78,23 +166,37 @@ export const WorkflowList: React.FC = () => { }, ); - const deleteWorkflow = useCallback(async (id: IWorkflow["id"]) => { + const runWorkflow = useCallback(async (id: IWorkflow["id"]) => { try { - await handleDeleteWorkflow({ workflowId: String(id) }); - await handleRefreshWorkflows(); + await handleRunWorkflow({ workflowId: String(id) }); } catch (e) { console.error(e); } }, []); - const runWorkflow = useCallback(async (id: IWorkflow["id"]) => { + + const runWorkflows = useCallback(async (ids: Array) => { try { - await handleRunWorkflow({ workflowId: String(id) }); + await handleRunWorkflows({ workflowIds: ids }); } catch (e) { console.error(e); } }, []); - const pauseWorkflow = useCallback((_id: IWorkflow["id"]) => {}, []); + const deleteWorkflows = useCallback(async (ids: Array) => { + try { + await handleDeleteWorkflows({ workflowIds: ids }); + } catch (e) { + console.error(e); + } + }, []); + + const stopWorkflows = useCallback(async (ids: Array) => { + try { + await handleStopWorkflows({ workflowIds: ids }); + } catch (e) { + console.error(e); + } + }, []); const { rows, totalRows } = useMemo( () => ({ @@ -109,22 +211,35 @@ export const WorkflowList: React.FC = () => { { field: "id", headerName: "ID", - width: 80, - headerAlign: "center", - align: "center", + width: 60, + headerAlign: "left", + align: "left", sortable: false, + minWidth: 50, + }, + { + field: "state", + headerName: "Last Run State", + headerAlign: "left", + align: "left", + type: "string", + minWidth: 150, + // flex: 1, + renderCell: (params) => { + return ; + }, }, { field: "status", headerName: "Status", renderCell: (params) => , flex: 0.5, - align: "center", - headerAlign: "center", + align: "left", + headerAlign: "left", sortable: false, minWidth: 100, }, - { field: "name", headerName: "Workflow Name", flex: 2, minWidth: 220 }, + { field: "name", headerName: "Workflow Name", flex: 2, minWidth: 180 }, { field: "start_date", renderHeader: () => ( @@ 
-136,11 +251,11 @@ export const WorkflowList: React.FC = () => { ), flex: 1, - align: "center", - minWidth: 220, + align: "left", + minWidth: 180, valueFormatter: ({ value }) => new Date(value).toLocaleString(), - headerAlign: "center", + headerAlign: "left", }, { field: "end_date", @@ -152,11 +267,11 @@ export const WorkflowList: React.FC = () => { ), - headerAlign: "center", - align: "center", + headerAlign: "left", + align: "left", type: "string", flex: 1, - minWidth: 220, + minWidth: 180, valueFormatter: ({ value }) => value ? new Date(value).toLocaleString() : "None", }, @@ -164,27 +279,26 @@ export const WorkflowList: React.FC = () => { field: "created_at", headerName: "Created At", flex: 1, - align: "center", - minWidth: 220, - + align: "left", + minWidth: 180, valueFormatter: ({ value }) => new Date(value).toLocaleString(), - headerAlign: "center", + headerAlign: "left", }, { field: "last_changed_at", headerName: "Last Modified", flex: 1, - align: "center", + align: "left", valueFormatter: ({ value }) => new Date(value).toLocaleString(), - headerAlign: "center", - minWidth: 220, + headerAlign: "left", + minWidth: 180, }, { field: "schedule", headerName: "Schedule", flex: 1, - align: "center", - headerAlign: "center", + align: "left", + headerAlign: "left", sortable: false, minWidth: 100, }, @@ -192,39 +306,59 @@ export const WorkflowList: React.FC = () => { field: "next_dagrun", headerName: "Next Run", flex: 1, - align: "center", - headerAlign: "center", - minWidth: 220, + align: "left", + headerAlign: "left", + minWidth: 180, sortable: false, valueFormatter: ({ value }) => value ? 
new Date(value).toLocaleString() : "none", }, { - field: "actions", - headerName: "Actions", - flex: 1, + field: "run", + headerName: "Run", renderCell: ({ row }) => { return ( - { - void deleteWorkflow(row.id); - }} - runFn={() => { - void runWorkflow(row.id); - }} - pauseFn={() => { - pauseWorkflow(row.id); - }} - disabled={new Date(row.start_date) > new Date()} - /> + <> + {new Date(row.start_date) > new Date() ? ( + + + { + void runWorkflow(row.id); + }} + > + + + + + ) : ( + { + void runWorkflow(row.id); + }} + > + + + )} + ); }, - headerAlign: "center", - align: "center", + headerAlign: "left", + align: "left", sortable: false, - minWidth: 150, + minWidth: 50, }, ], [], @@ -249,6 +383,10 @@ export const WorkflowList: React.FC = () => { [navigate], ); + const handleSelectionModelChange = (newSelection: GridRowSelectionModel) => { + setSelectedWorkflowIds(newSelection as number[]); + }; + if (isLoading) { return ; } @@ -257,8 +395,9 @@ export const WorkflowList: React.FC = () => { <> params.row.status !== "failed" && params.row.status !== "creating" @@ -274,12 +413,32 @@ export const WorkflowList: React.FC = () => { }} rowCount={totalRows} onPaginationModelChange={setPaginationModel} + onRowSelectionModelChange={handleSelectionModelChange} disableDensitySelector disableRowSelectionOnClick hideFooterSelectedRowCount disableColumnMenu disableColumnSelector - slots={{ noRowsOverlay: NoDataOverlay }} + slots={{ + noRowsOverlay: NoDataOverlay, + toolbar: () => { + return ( + { + void runWorkflows(selectedWorkflowIds); + }} + stopFn={() => { + void stopWorkflows(selectedWorkflowIds); + }} + deleteFn={() => { + void deleteWorkflows(selectedWorkflowIds); + }} + disabled={selectedWorkflowIds.length === 0} + /> + ); + }, + }} sx={{ // disable cell selection style "&.MuiDataGrid-root .MuiDataGrid-cell:focus": { @@ -292,6 +451,14 @@ export const WorkflowList: React.FC = () => { }} /> + { + setFailureModalOpen(false); + }} + /> ); }; diff --git 
a/frontend/src/features/myWorkflows/types/runs.ts b/frontend/src/features/myWorkflows/types/runs.ts index 9651c1e1..929b8548 100644 --- a/frontend/src/features/myWorkflows/types/runs.ts +++ b/frontend/src/features/myWorkflows/types/runs.ts @@ -1,4 +1,4 @@ -enum runState { +export enum runState { success = "success", failed = "failed", running = "running", diff --git a/frontend/src/features/myWorkflows/types/workflow.ts b/frontend/src/features/myWorkflows/types/workflow.ts index d4a97805..90515746 100644 --- a/frontend/src/features/myWorkflows/types/workflow.ts +++ b/frontend/src/features/myWorkflows/types/workflow.ts @@ -1,3 +1,4 @@ +import { type runState } from "features/myWorkflows/types/runs"; import { type DefaultNode } from "features/workflowEditor/components/Panel/WorkflowPanel"; export type IWorkflowElement = DefaultNode; @@ -22,6 +23,7 @@ export interface IWorkflow { is_paused: boolean; is_active: boolean; status: workflowStatus; + last_run_status: runState; is_subdag: boolean; last_pickled: string; schedule: string; @@ -95,3 +97,13 @@ export type IDeleteWorkflowIdResponseInterface = Record; * TODO type properly */ export type IPostWorkflowRunIdResponseInterface = Record; + +export interface IBatchWorkflowActionDetail { + id: number; + message: string; +} + +export interface IBatchWorkflowActionResponse { + result: string; + details: IBatchWorkflowActionDetail[]; +} diff --git a/rest/clients/airflow_client.py b/rest/clients/airflow_client.py index 384a45ec..e1a51b46 100644 --- a/rest/clients/airflow_client.py +++ b/rest/clients/airflow_client.py @@ -70,6 +70,20 @@ def run_dag(self, dag_id): ) return response + + def stop_dag(self, dag_id, dag_run_id): + resource = f"api/v1/dags/{dag_id}/dagRuns/{dag_run_id}" + payload = { + "state": "failed" + } + response = self.request( + method="patch", + resource=resource, + json=payload + ) + return response + + def delete_dag(self, dag_id): resource = f"api/v1/dags/{dag_id}" response = self.request( diff --git 
a/rest/main.py b/rest/main.py index 94f14fec..a9935be2 100644 --- a/rest/main.py +++ b/rest/main.py @@ -5,6 +5,7 @@ from routers.auth_router import router as auth_router from routers.user_router import router as user_router from routers.workflow_router import router as workflow_router +from routers.workflow_router import batch_router as batch_workflow_router from routers.piece_router import router as piece_router from routers.workspace_router import router as workspace_router from routers.piece_repository_router import router as piece_repository_router @@ -49,6 +50,7 @@ def configure_app(): app.include_router(piece_repository_router, tags=['Piece Repository']) app.include_router(piece_router, tags=["Piece"]) app.include_router(workflow_router, tags=["Workflow"]) + app.include_router(batch_workflow_router, tags=["Batch Workflow"]) app.include_router(workspace_router, tags=["Workspace"]) app.include_router(secret_router, tags=["Secret"]) app.include_router(health_check_router, tags=["Health Check"]) diff --git a/rest/repository/workflow_repository.py b/rest/repository/workflow_repository.py index 4d7ccabf..cf5b1301 100644 --- a/rest/repository/workflow_repository.py +++ b/rest/repository/workflow_repository.py @@ -20,6 +20,13 @@ def find_by_id(self, id: int): session.expunge_all() return result + def find_by_ids(self, ids: list[int]): + with session_scope() as session: + result = session.query(Workflow).filter(Workflow.id.in_(ids)).all() + session.flush() + session.expunge_all() + return result + def find_by_workspace_id( self, workspace_id: int, @@ -75,13 +82,20 @@ def get_workflows_summary(self): session.expunge_all() return result - def delete(self, id): + def delete_by_id(self, id: int): with session_scope() as session: result = session.query(Workflow).filter(Workflow.id==id).delete() session.flush() session.expunge_all() return result + def delete_by_ids(self, ids: list[int]): + with session_scope() as session: + result = 
session.query(Workflow).filter(Workflow.id.in_(ids)).delete(synchronize_session=False) + session.flush() + session.expunge_all() + return result + def delete_by_workspace_id(self, workspace_id: int): with session_scope() as session: result = session.query(Workflow).filter(Workflow.workspace_id==workspace_id).delete(synchronize_session=False) diff --git a/rest/routers/workflow_router.py b/rest/routers/workflow_router.py index 6fa9b443..9272dd18 100644 --- a/rest/routers/workflow_router.py +++ b/rest/routers/workflow_router.py @@ -1,9 +1,11 @@ +import json from fastapi import APIRouter, HTTPException, status, Depends, Response from schemas.context.auth_context import AuthorizationContextData from typing import List from services.workflow_service import WorkflowService -from schemas.requests.workflow import CreateWorkflowRequest, ListWorkflowsFilters +from schemas.requests.workflow import CreateWorkflowRequest, ListWorkflowsFilters, RunWorkflowsRequest from schemas.responses.workflow import ( + BatchWorkflowResponse, GetWorkflowsResponse, GetWorkflowResponse, CreateWorkflowResponse, @@ -32,11 +34,35 @@ router = APIRouter(prefix="/workspaces/{workspace_id}/workflows") +batch_router = APIRouter(prefix="/batch/workspaces/{workspace_id}/workflows") workflow_service = WorkflowService() read_authorizer = Authorizer(permission_level=Permission.read.value) write_authorizer = Authorizer(permission_level=Permission.write.value) +@batch_router.post( + path="/runs", + status_code=207, + response_model=BatchWorkflowResponse, + responses={ + status.HTTP_207_MULTI_STATUS: {"model": BatchWorkflowResponse}, + status.HTTP_500_INTERNAL_SERVER_ERROR: {"model": SomethingWrongError}, + status.HTTP_403_FORBIDDEN: {"model": ForbiddenError}, + status.HTTP_404_NOT_FOUND: {"model": ResourceNotFoundError} + }, + dependencies=[Depends(write_authorizer.authorize)] +) +def run_workflows( + workspace_id, + workflow_ids: List[int], +): + """Run workflows""" + try: + return 
workflow_service.run_workflows( + workflow_ids=workflow_ids + ) + except (BaseException, ResourceNotFoundException, ConflictException) as e: + raise HTTPException(status_code=e.status_code, detail=e.message) @router.post( @@ -142,6 +168,28 @@ async def delete_workflow( except (BaseException, ForbiddenException, ResourceNotFoundException) as e: raise HTTPException(status_code=e.status_code, detail=e.message) +@batch_router.delete( + path="", + status_code=207, + response_model=BatchWorkflowResponse, + responses={ + status.HTTP_207_MULTI_STATUS: {"model": BatchWorkflowResponse}, + status.HTTP_500_INTERNAL_SERVER_ERROR: {"model": SomethingWrongError}, + status.HTTP_403_FORBIDDEN: {"model": ForbiddenError}, + status.HTTP_404_NOT_FOUND: {"model": ResourceNotFoundError}, + }, + dependencies=[Depends(write_authorizer.authorize)], +) +async def delete_workflows( + workflow_ids: List[int], + workspace_id: int, +): + try: + return await workflow_service.delete_workflows( + workflow_ids=workflow_ids, workspace_id=workspace_id + ) + except (BaseException, ForbiddenException, ResourceNotFoundException) as e: + raise HTTPException(status_code=e.status_code, detail=e.message) @router.post( @@ -166,6 +214,53 @@ def run_workflow( except (BaseException, ForbiddenException, ResourceNotFoundException, ConflictException) as e: raise HTTPException(status_code=e.status_code, detail=e.message) + +@router.patch( + "/{workflow_id}/runs", + status_code=204, + responses={ + status.HTTP_204_NO_CONTENT: {}, + status.HTTP_500_INTERNAL_SERVER_ERROR: {"model": SomethingWrongError}, + status.HTTP_403_FORBIDDEN: {"model": ForbiddenError}, + status.HTTP_404_NOT_FOUND: {"model": ResourceNotFoundError} + }, + dependencies=[Depends(write_authorizer.authorize)] +) +def stop_workflow_run( + workspace_id: int, + workflow_id: int, +): + try: + return workflow_service.stop_workflow_run( + workflow_id=workflow_id + ) + except (BaseException, ResourceNotFoundException) as e: + raise 
HTTPException(status_code=e.status_code, detail=e.message) + + +@batch_router.patch( + "/runs", + status_code=207, + responses={ + status.HTTP_207_MULTI_STATUS: {"model": BatchWorkflowResponse}, + status.HTTP_500_INTERNAL_SERVER_ERROR: {"model": SomethingWrongError}, + status.HTTP_403_FORBIDDEN: {"model": ForbiddenError}, + status.HTTP_404_NOT_FOUND: {"model": ResourceNotFoundError} + }, + dependencies=[Depends(write_authorizer.authorize)] +) +def stop_workflow_runs( + workspace_id: int, + workflow_ids: List[int], +): + try: + return workflow_service.stop_workflow_runs( + workflow_ids=workflow_ids + ) + except BaseException as e: + raise HTTPException(status_code=e.status_code, detail=e.message) + + @router.get( "/{workflow_id}/runs", status_code=200, diff --git a/rest/schemas/requests/workflow.py b/rest/schemas/requests/workflow.py index 72171eef..7fddb40c 100644 --- a/rest/schemas/requests/workflow.py +++ b/rest/schemas/requests/workflow.py @@ -178,3 +178,7 @@ class ListWorkflowsFilters(BaseModel): end_date__gt: Optional[str] = None schedule: Optional[ScheduleIntervalType] = None + +class RunWorkflowsRequest(BaseModel): + workflow_ids: List[int] + \ No newline at end of file diff --git a/rest/schemas/responses/workflow.py b/rest/schemas/responses/workflow.py index 55d4a824..f0f0a647 100644 --- a/rest/schemas/responses/workflow.py +++ b/rest/schemas/responses/workflow.py @@ -16,6 +16,7 @@ class WorkflowRunState(str, Enum): queued = "queued" success = "success" failed = "failed" + none = "none" class WorkflowRunTaskState(str, Enum): success = "success" @@ -78,6 +79,7 @@ class GetWorkflowsResponseData(BaseModel): is_paused: bool is_active: bool status: WorkflowStatus + last_run_status: WorkflowRunState = None schedule: Optional[ScheduleIntervalTypeResponse] = None next_dagrun: Optional[datetime] = None @@ -234,4 +236,18 @@ class CreateWorkflowResponse(BaseModel): class DeleteWorkflowResponse(BaseModel): - workflow_id: int \ No newline at end of file + workflow_id: 
class BatchWorkflowFailureDetail(BaseModel):
    """One failed workflow inside a 207 Multi-Status batch response."""
    id: int
    message: str


class BatchWorkflowSuccessDetail(BaseModel):
    """Single success message used when the entire batch succeeded."""
    message: str


class BatchWorkflowResponse(BaseModel):
    """
    Envelope for batch workflow operations (run / stop / delete).

    `result` is a human-readable summary; `details` is either a list of
    per-workflow failures or one success detail when nothing failed.
    """
    result: str
    details: Union[List[BatchWorkflowFailureDetail], BatchWorkflowSuccessDetail]
self.airflow_client.get_dag_by_id(dag_id=workflow.uuid_name) - if airflow_dag_info.status_code == 404: - airflow_dag_info = { + if dag_info.status_code == 404: + dag_info = { 'is_paused': 'creating', 'is_active': 'creating', 'is_subdag': 'creating', @@ -274,10 +288,10 @@ def get_workflow(self, workspace_id: int, workflow_id: str, auth_context: Author 'next_dagrun_data_interval_end': 'creating', } else: - airflow_dag_info = airflow_dag_info.json() + dag_info = dag_info.json() # Airflow 2.4.0 deprecated schedule_interval in dag but the API (2.7.2) still using it - schedule = airflow_dag_info.pop("schedule_interval") + schedule = dag_info.pop("schedule_interval") if isinstance(schedule, dict): schedule = schedule.get("value") @@ -292,7 +306,7 @@ def get_workflow(self, workspace_id: int, workflow_id: str, auth_context: Author created_by=workflow.created_by, workspace_id=workflow.workspace_id, schedule=schedule, - **airflow_dag_info + **dag_info ) return response @@ -515,6 +529,94 @@ def get_all_pieces_from_tasks_dict(self, tasks_dict): all_pieces.append(v["piece"]) return list(dict.fromkeys(all_pieces)) + + def run_workflows(self, workflow_ids: List[int]): + try: + failure_details = [] + workflows = self.workflow_repository.find_by_ids(ids=workflow_ids) + if not workflows: + raise ResourceNotFoundException("No workflows found.") + found_ids = [workflow.id for workflow in workflows] + not_found_ids = list(set(workflow_ids) - set(found_ids)) + if not_found_ids: + not_found_details = [ + BatchWorkflowFailureDetail(id=id, message="Workflow not found.") + for id in not_found_ids + ] + failure_details += not_found_details + for workflow in workflows: + # Check if start date is in the past + if workflow.start_date and workflow.start_date > datetime.now(tz=timezone.utc): + failure_details.append(BatchWorkflowFailureDetail( + id=workflow.id, + message="Workflow start date is in the future. 
Can not run it now.")) + continue + + if workflow.end_date and workflow.end_date < datetime.now(tz=timezone.utc): + failure_details.append(BatchWorkflowFailureDetail( + id=workflow.id, + message="You cannot run workflows that have ended.")) + continue + dag_id = workflow.uuid_name + dag_runs = self.airflow_client.get_all_workflow_runs( + dag_id=dag_id, + page=0, + page_size=1, + descending=True + ) + dag_runs_data = dag_runs.json() + if dag_runs_data["dag_runs"]: + run_state = dag_runs_data["dag_runs"][0]["state"] + if run_state == "running": + failure_details.append(BatchWorkflowFailureDetail(id=workflow.id, message="Workflow has already started to run.")) + continue + if run_state == "queued": + failure_details.append(BatchWorkflowFailureDetail(id=workflow.id, message="Workflow is waiting to run.")) + continue + payload = { + "is_paused": False + } + update_dag_response = self.airflow_client.update_dag( + dag_id=dag_id, + payload=payload + ) + if update_dag_response.status_code == 404: + failure_details.append( + BatchWorkflowFailureDetail( + id=workflow.id, message="Workflow still in creation process.")) + continue + if update_dag_response.status_code != 200: + # we can make this single log message + self.logger.error(f"Error while trying to unpause workflow {id}") + self.logger.error(update_dag_response.json()) + failure_details.append(BatchWorkflowFailureDetail( + id=workflow.id, message=update_dag_response.json() + )) + continue + run_dag_response = self.airflow_client.run_dag(dag_id=dag_id) + + if run_dag_response.status_code != 200: + self.logger.error(f"Error while trying to run workflow {id}") + self.logger.error(run_dag_response.json()) + failure_details.append(BatchWorkflowFailureDetail( + id=workflow.id, message=run_dag_response.json() + )) + continue + if failure_details: + return BatchWorkflowResponse ( + result="Some workflows couldn't started.", + details=failure_details) + return BatchWorkflowResponse( + result="success", + 
details=BatchWorkflowSuccessDetail( + message="Workflows successfully started to run." + ) + ) + except Exception as e: + self.logger.error(e) + raise e + + def run_workflow(self, workflow_id: int): workflow = self.workflow_repository.find_by_id(id=workflow_id) if not workflow: @@ -526,7 +628,21 @@ def run_workflow(self, workflow_id: int): if workflow.end_date and workflow.end_date < datetime.now(tz=timezone.utc): raise ForbiddenException('You cannot run workflows that have ended.') - + + dag_runs = self.airflow_client.get_all_workflow_runs( + dag_id=workflow.uuid_name, + page=0, + page_size=1, + descending=True + ) + dag_runs_data = dag_runs.json() + if dag_runs_data["dag_runs"]: + run_state = dag_runs_data["dag_runs"][0]["state"] + if run_state == "running": + raise ForbiddenException("Workflow has already started to run.") + if run_state == "queued": + raise ForbiddenException("Workflow is waiting to run.") + airflow_workflow_id = workflow.uuid_name # Force unpause workflow @@ -551,6 +667,87 @@ def run_workflow(self, workflow_id: int): self.logger.error(run_dag_response.json()) raise BaseException("Error while trying to run workflow") + + def stop_workflow_run(self, workflow_id: int): + workflow = self.workflow_repository.find_by_id(id=workflow_id) + if not workflow: + raise ResourceNotFoundException("Workflow not found.") + dag_id = workflow.uuid_name + dag_run = self.airflow_client.get_all_workflow_runs( + dag_id=dag_id, + page=0, + page_size=1, + descending=True + ) + dag_run_data = dag_run.json() + if dag_run_data["dag_runs"][0]["state"] not in ["running", "queued"]: + raise ForbiddenException("Worflow must be running or in queue to stop.") + response = self.airflow_client.stop_dag( + dag_id=dag_id, + dag_run_id=dag_run_data["dag_runs"][0]["dag_run_id"] + ) + if response.status_code == 404: + raise ResourceNotFoundException("No workflow run found for given workflow id.") + if response.status_code != 200: + self.logger.error(f"Error while trying to stop 
workflow runs. Workflow ID: {workflow_id}") + self.logger.error(response.json()) + raise BaseException("Error while trying to stop workflow runs.") + + + def stop_workflow_runs(self, workflow_ids: List[int]): + try: + failure_details = [] + workflows = self.workflow_repository.find_by_ids(ids=workflow_ids) + if not workflows: + raise ResourceNotFoundException("No workflows found.") + found_ids = [workflow.id for workflow in workflows] + not_found_ids = list(set(workflow_ids) - set(found_ids)) + if not_found_ids: + not_found_details = [ + BatchWorkflowFailureDetail(id=id, message="Workflow not found.") + for id in not_found_ids + ] + failure_details += not_found_details + for workflow in workflows: + dag_id = workflow.uuid_name + dag_runs = self.airflow_client.get_all_workflow_runs( + dag_id=dag_id, + page=0, + page_size=1, + descending=True + ) + dag_runs_data = dag_runs.json() + if dag_runs_data["dag_runs"]: + if dag_runs_data["dag_runs"][0]["state"] not in ["running", "queued"]: + failure_details.append(BatchWorkflowFailureDetail(id=workflow.id, message="Worflow must be running or in queue to stop.")) + continue + response = self.airflow_client.stop_dag( + dag_id=dag_id, + dag_run_id=dag_runs_data["dag_runs"][0]["dag_run_id"] + ) + if response.status_code == 404: + failure_details.append(BatchWorkflowFailureDetail(id=workflow.id, message="No workflow run found for given workflow id.")) + if response.status_code != 200: + self.logger.error(f"Error while trying to stop workflow runs. Workflow ID: {workflow.id}") + self.logger.error(response.json()) + failure_details.append(BatchWorkflowFailureDetail( + id=workflow.id, message=response.json() + )) + if failure_details: + return BatchWorkflowResponse( + result="Some workflow runs couldn't be stopped.", + details=failure_details) + return BatchWorkflowResponse( + result="success", + details=BatchWorkflowSuccessDetail( + message="Workflow runs successfully stopped." 
+ ) + ) + except Exception as e: + self.logger.error(e) + raise e + + async def delete_workspace_workflows(self, workspace_id: int): # TODO: improve this? Maybe running in a worker and not in the main thread? Pagination may take a while if there are a lot of workflows. workflows = self.workflow_repository.find_by_workspace_id(workspace_id=workspace_id, paginate=False, count=False) @@ -579,12 +776,54 @@ async def delete_workflow(self, workflow_id: str, workspace_id: int): try: await self.delete_workflow_files(workflow_uuid=workflow.uuid_name) self.airflow_client.delete_dag(dag_id=workflow.uuid_name) - self.workflow_repository.delete(id=workflow_id) - except Exception as e: # TODO improve exception handling + self.workflow_repository.delete_by_id(id=workflow_id) + except Exception as e: # TODO improve exception handling self.logger.exception(e) - self.workflow_repository.delete(id=workflow_id) + self.workflow_repository.delete_by_id(id=workflow_id) raise e + + async def delete_workflows(self, workflow_ids: List[int], workspace_id: int): + try: + failure_details = [] + workflows = self.workflow_repository.find_by_ids(ids=workflow_ids) + if not workflows: + raise ResourceNotFoundException("No workflows found.") + found_ids = [workflow.id for workflow in workflows] + not_found_ids = list(set(workflow_ids) - set(found_ids)) + if not_found_ids: + not_found_details = [ + BatchWorkflowFailureDetail(id=id, message="Workflow not found.") + for id in not_found_ids + ] + failure_details += not_found_details + self.workflow_repository.delete_by_ids(ids=workflow_ids) + for workflow in workflows: + if workflow.workspace_id != workspace_id: + failure_details.append( + BatchWorkflowFailureDetail( + id=id, message="Workflow does not belong to workspace." 
+ ) + ) + continue + await self.delete_workflow_files(workflow_uuid=workflow.uuid_name) + self.airflow_client.delete_dag(dag_id=workflow.uuid_name) + if failure_details: + return BatchWorkflowResponse( + result="Some workflows couldn't be deleted.", + details=failure_details + ) + return BatchWorkflowResponse( + result="success", + details=BatchWorkflowSuccessDetail( + message="Workflows successfully deleted." + ), + ) + except Exception as e: + self.logger.exception(e) + raise e + + def workflow_details(self, workflow_id: str): try: all_tasks_response = self.airflow_client.get_all_workflow_tasks(workflow_id=workflow_id).json()