From f7e889e9368e1408db4d04cd44d9088bf319248e Mon Sep 17 00:00:00 2001 From: eunice-chan <30596854+eunice-chan@users.noreply.github.com> Date: Tue, 25 Apr 2023 19:29:19 -0500 Subject: [PATCH] Eng 2735 Add order by and limit parameters for V2 workflow results (#1240) Co-authored-by: Eunice Chan --- integration_tests/backend/test_reads.py | 71 +++++++++++++++++++ src/golang/cmd/server/handler/get_workflow.go | 4 +- .../server/handler/get_workflow_history.go | 4 +- .../cmd/server/handler/v2/dag_results_get.go | 39 ++++++++-- .../cmd/server/request/parser/query_limit.go | 25 +++++++ .../server/request/parser/query_order_by.go | 32 +++++++++ .../request/parser/query_order_descending.go | 31 ++++++++ src/golang/lib/airflow/sync.go | 4 +- src/golang/lib/engine/aq_engine.go | 4 +- src/golang/lib/models/dag_result.go | 6 +- src/golang/lib/repos/dag_result.go | 2 +- src/golang/lib/repos/sqlite/dag_result.go | 23 +++++- src/golang/lib/repos/tests/dag_result.go | 4 +- 13 files changed, 234 insertions(+), 15 deletions(-) create mode 100644 src/golang/cmd/server/request/parser/query_limit.go create mode 100644 src/golang/cmd/server/request/parser/query_order_by.go create mode 100644 src/golang/cmd/server/request/parser/query_order_descending.go diff --git a/integration_tests/backend/test_reads.py b/integration_tests/backend/test_reads.py index b47f4a163..ba4a67c12 100644 --- a/integration_tests/backend/test_reads.py +++ b/integration_tests/backend/test_reads.py @@ -17,7 +17,9 @@ class TestBackend: + # V2 GET_WORKFLOWS_TEMPLATE = "/api/v2/workflows" + GET_DAG_RESULTS_TEMPLATE = "/api/v2/workflow/%s/results" LIST_WORKFLOW_SAVED_OBJECTS_TEMPLATE = "/api/workflow/%s/objects" GET_TEST_INTEGRATION_TEMPLATE = "/api/integration/%s/test" @@ -318,3 +320,72 @@ def test_endpoint_workflows_get(self): for key in keys: assert key in v2_workflow assert v2_workflow["user_id"] == user_id + + def test_endpoint_dag_results_get(self): + flow_id, n_runs = self.flows["flow_with_metrics_and_checks"] + 
resp = self.get_response(self.GET_DAG_RESULTS_TEMPLATE % flow_id).json() + + assert len(resp) == n_runs + + fields = ["id", "dag_id", "exec_state"] + + def check_structure(resp, all_succeeded=False): + for result in resp: + for field in fields: + assert field in result + if all_succeeded: + assert result["exec_state"]["status"] == "succeeded" + assert result["exec_state"]["failure_type"] == None + assert result["exec_state"]["error"] == None + + check_structure(resp, all_succeeded=True) + + # Using the order parameter + flow_id, n_runs = self.flows["flow_with_failure"] + resp = self.get_response( + self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?order_by=status", + ).json() + + check_structure(resp) + statuses = [result["exec_state"]["status"] for result in resp] + sorted_statuses = sorted(statuses, reverse=True) # Descending order + assert statuses == sorted_statuses + + # Default is descending + flow_id, n_runs = self.flows["flow_with_failure"] + resp = self.get_response( + self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?order_by=status&order_descending=true", + ).json() + + check_structure(resp) + descending_statuses = [result["exec_state"]["status"] for result in resp] + assert statuses == descending_statuses + + # Ascending works + flow_id, n_runs = self.flows["flow_with_failure"] + resp = self.get_response( + self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?order_by=status&order_descending=false", + ).json() + + check_structure(resp) + ascending_statuses = [result["exec_state"]["status"] for result in resp] + assert descending_statuses[::-1] == ascending_statuses + + # Using the limit parameter + resp = self.get_response( + self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?limit=1", + ).json() + + check_structure(resp) + assert len(resp) == 1 + + # Using both the order and limit parameters + resp = self.get_response( + self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?order_by=status&limit=1", + ).json() + + check_structure(resp) + workflow_status = [result["exec_state"]["status"] 
for result in resp] + assert len(workflow_status) == 1 + workflow_status = workflow_status[0] + assert workflow_status == sorted_statuses[0] diff --git a/src/golang/cmd/server/handler/get_workflow.go b/src/golang/cmd/server/handler/get_workflow.go index 4eb75f365..df9ebec68 100644 --- a/src/golang/cmd/server/handler/get_workflow.go +++ b/src/golang/cmd/server/handler/get_workflow.go @@ -184,7 +184,9 @@ func (h *GetWorkflowHandler) Perform(ctx context.Context, interfaceArgs interfac dags[dbDAG.ID] = constructedDAG } - dagResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, h.Database) + // Default values to not have an order and not have a limit: Empty string for order_by, -1 for limit + // Set true for order_by order (desc/asc) because doesn't matter. + dagResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, "", -1, true, h.Database) if err != nil { return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error occurred when retrieving workflow.") } diff --git a/src/golang/cmd/server/handler/get_workflow_history.go b/src/golang/cmd/server/handler/get_workflow_history.go index 51e2e24f5..091f4cdef 100644 --- a/src/golang/cmd/server/handler/get_workflow_history.go +++ b/src/golang/cmd/server/handler/get_workflow_history.go @@ -97,7 +97,9 @@ func (h *GetWorkflowHistoryHandler) Perform(ctx context.Context, interfaceArgs i return nil, http.StatusBadRequest, errors.Wrap(err, fmt.Sprintf("Workflow %v does not exist.", args.workflowId)) } - results, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowId, h.Database) + // Default values to not have an order and not have a limit: Empty string for order_by, -1 for limit + // Set true for order_by order (desc/asc) because doesn't matter. + results, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowId, "", -1, true, h.Database) if err != nil && err != database.ErrNoRows() { // Don't return an error if there are just no rows. 
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error while retrieving workflow runs.") } diff --git a/src/golang/cmd/server/handler/v2/dag_results_get.go b/src/golang/cmd/server/handler/v2/dag_results_get.go index a9d9412b1..cee60dfd8 100644 --- a/src/golang/cmd/server/handler/v2/dag_results_get.go +++ b/src/golang/cmd/server/handler/v2/dag_results_get.go @@ -23,10 +23,16 @@ import ( // Method: GET // Params: // `workflowId`: ID for `workflow` object -// `dagResultID`: ID for `workflow_dag_result` object // Request: // Headers: // `api-key`: user's API Key +// Parameters: +// `order_by`: +// Optional single field that the query should be ordered by. Must be a DAG result column name, without the table prefix. +// `order_descending`: +// Optional boolean specifying whether the order_by ordering is descending (true) or ascending (false). Defaults to true. +// `limit`: +// Optional limit on the number of DAG results returned. Defaults to all of them. // Response: // Body: // serialized `[]response.DAGResult` @@ -34,6 +40,13 @@ import ( type dagResultsGetArgs struct { *aq_context.AqContext workflowID uuid.UUID + + // An empty string means that the order is not set. + orderBy string + // Default is descending (true). + orderDescending bool + // A negative value for limit (eg. -1) means that the limit is not set. 
+ limit int } type DAGResultsGetHandler struct { @@ -60,9 +73,27 @@ func (h *DAGResultsGetHandler) Prepare(r *http.Request) (interface{}, int, error return nil, http.StatusBadRequest, err } + limit, err := (parser.LimitQueryParser{}).Parse(r) + if err != nil { + return nil, http.StatusBadRequest, err + } + + orderBy, err := (parser.OrderByQueryParser{}).Parse(r, models.AllDAGResultCols()) + if err != nil { + return nil, http.StatusBadRequest, err + } + + descending, err := (parser.OrderDescendingQueryParser{}).Parse(r) + if err != nil { + return nil, http.StatusBadRequest, err + } + return &dagResultsGetArgs{ - AqContext: aqContext, - workflowID: workflowID, + AqContext: aqContext, + workflowID: workflowID, + orderBy: orderBy, + orderDescending: descending, + limit: limit, }, http.StatusOK, nil } @@ -83,7 +114,7 @@ func (h *DAGResultsGetHandler) Perform(ctx context.Context, interfaceArgs interf return nil, http.StatusBadRequest, errors.Wrap(err, "The organization does not own this workflow.") } - dbDAGResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, h.Database) + dbDAGResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, args.orderBy, args.limit, args.orderDescending, h.Database) if err != nil { return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error reading dag results.") } diff --git a/src/golang/cmd/server/request/parser/query_limit.go b/src/golang/cmd/server/request/parser/query_limit.go new file mode 100644 index 000000000..f29a69a6f --- /dev/null +++ b/src/golang/cmd/server/request/parser/query_limit.go @@ -0,0 +1,25 @@ +package parser + +import ( + "net/http" + "strconv" + + "github.com/dropbox/godropbox/errors" +) + +type LimitQueryParser struct{} + +func (LimitQueryParser) Parse(r *http.Request) (int, error) { + query := r.URL.Query() + + var err error + limit := -1 + if limitVal := query.Get("limit"); len(limitVal) > 0 { + limit, err = strconv.Atoi(limitVal) + if err != nil { + return -1, 
errors.Wrap(err, "Invalid limit parameter.") + } + } + + return limit, nil +} diff --git a/src/golang/cmd/server/request/parser/query_order_by.go b/src/golang/cmd/server/request/parser/query_order_by.go new file mode 100644 index 000000000..ba10f63b3 --- /dev/null +++ b/src/golang/cmd/server/request/parser/query_order_by.go @@ -0,0 +1,32 @@ +package parser + +import ( + "net/http" + + "github.com/dropbox/godropbox/errors" +) + +type OrderByQueryParser struct{} + +func (OrderByQueryParser) Parse(r *http.Request, tableColumns []string) (string, error) { + query := r.URL.Query() + + var err error + var orderBy string + if orderByVal := query.Get("order_by"); len(orderByVal) > 0 { + // Check is a field in table + isColumn := false + for _, column := range tableColumns { + if column == orderByVal { + isColumn = true + break + } + } + if !isColumn { + return "", errors.Wrap(err, "Invalid order_by value.") + } + orderBy = orderByVal + } + + return orderBy, nil +} diff --git a/src/golang/cmd/server/request/parser/query_order_descending.go b/src/golang/cmd/server/request/parser/query_order_descending.go new file mode 100644 index 000000000..0cf19235d --- /dev/null +++ b/src/golang/cmd/server/request/parser/query_order_descending.go @@ -0,0 +1,31 @@ +package parser + +import ( + "net/http" + "strings" + + "github.com/dropbox/godropbox/errors" +) + +type OrderDescendingQueryParser struct{} + +func (OrderDescendingQueryParser) Parse(r *http.Request) (bool, error) { + query := r.URL.Query() + + var err error + orderDescending := true + if orderDescendingVal := query.Get("order_descending"); len(orderDescendingVal) > 0 { + orderDescendingVal = strings.ToLower(orderDescendingVal) + if orderDescendingVal == "true" { + return true, nil + } + + if orderDescendingVal == "false" { + return false, nil + } + + return true, errors.Wrap(err, "Invalid order_descending value.") + } + + return orderDescending, nil +} diff --git a/src/golang/lib/airflow/sync.go 
b/src/golang/lib/airflow/sync.go index 40c0727b6..c46ce4731 100644 --- a/src/golang/lib/airflow/sync.go +++ b/src/golang/lib/airflow/sync.go @@ -121,7 +121,9 @@ func syncWorkflowDag( return err } - dagResults, err := dagResultRepo.GetByWorkflow(ctx, dag.WorkflowID, DB) + // Default values to not have an order and not have a limit: Empty string for order_by, -1 for limit + // Set true for order_by order (desc/asc) because doesn't matter. + dagResults, err := dagResultRepo.GetByWorkflow(ctx, dag.WorkflowID, "", -1, true, DB) if err != nil { return err } diff --git a/src/golang/lib/engine/aq_engine.go b/src/golang/lib/engine/aq_engine.go index 7c841c183..7254ef3d7 100644 --- a/src/golang/lib/engine/aq_engine.go +++ b/src/golang/lib/engine/aq_engine.go @@ -510,7 +510,9 @@ func (eng *aqEngine) DeleteWorkflow( dagIDs = append(dagIDs, dag.ID) } - dagResultsToDelete, err := eng.DAGResultRepo.GetByWorkflow(ctx, workflowObj.ID, txn) + // Default values to not have an order and not have a limit: Empty string for order_by, -1 for limit + // Set true for order_by order (desc/asc) because doesn't matter. + dagResultsToDelete, err := eng.DAGResultRepo.GetByWorkflow(ctx, workflowObj.ID, "", -1, true, txn) if err != nil { return errors.Wrap(err, "Unexpected error occurred while retrieving workflow dag results.") } diff --git a/src/golang/lib/models/dag_result.go b/src/golang/lib/models/dag_result.go index af68a4fda..148b3f0d6 100644 --- a/src/golang/lib/models/dag_result.go +++ b/src/golang/lib/models/dag_result.go @@ -32,13 +32,13 @@ type DAGResult struct { // DAGResultCols returns a comma-separated string of all DAGResult columns. func DAGResultCols() string { - return strings.Join(allDAGResultCols(), ",") + return strings.Join(AllDAGResultCols(), ",") } // DAGResultColsWithPrefix returns a comma-separated string of all // DAGResult columns prefixed by the table name. 
func DAGResultColsWithPrefix() string { - cols := allDAGResultCols() + cols := AllDAGResultCols() for i, col := range cols { cols[i] = fmt.Sprintf("%s.%s", DAGResultTable, col) } @@ -46,7 +46,7 @@ func DAGResultColsWithPrefix() string { return strings.Join(cols, ",") } -func allDAGResultCols() []string { +func AllDAGResultCols() []string { return []string{ DAGResultID, DAGResultDagID, diff --git a/src/golang/lib/repos/dag_result.go b/src/golang/lib/repos/dag_result.go index ddde38200..d609f78bd 100644 --- a/src/golang/lib/repos/dag_result.go +++ b/src/golang/lib/repos/dag_result.go @@ -25,7 +25,7 @@ type dagResultReader interface { GetBatch(ctx context.Context, IDs []uuid.UUID, DB database.Database) ([]models.DAGResult, error) // GetByWorkflow returns the DAGResults of all DAGs associated with the Workflow with workflowID. - GetByWorkflow(ctx context.Context, workflowID uuid.UUID, DB database.Database) ([]models.DAGResult, error) + GetByWorkflow(ctx context.Context, workflowID uuid.UUID, orderBy string, limit int, orderDescending bool, DB database.Database) ([]models.DAGResult, error) // GetKOffsetByWorkflow returns the DAGResults of all DAGs associated with the Workflow with workflowID // except for the last k DAGResults ordered by DAGResult.CreatedAt. diff --git a/src/golang/lib/repos/sqlite/dag_result.go b/src/golang/lib/repos/sqlite/dag_result.go index ff29c05ba..966e68b6c 100644 --- a/src/golang/lib/repos/sqlite/dag_result.go +++ b/src/golang/lib/repos/sqlite/dag_result.go @@ -3,6 +3,7 @@ package sqlite import ( "context" "fmt" + "strconv" "time" "github.com/aqueducthq/aqueduct/lib/database" @@ -52,13 +53,31 @@ func (*dagResultReader) GetBatch(ctx context.Context, IDs []uuid.UUID, DB databa return getDAGResults(ctx, DB, query, args...) 
} -func (*dagResultReader) GetByWorkflow(ctx context.Context, workflowID uuid.UUID, DB database.Database) ([]models.DAGResult, error) { +func (*dagResultReader) GetByWorkflow(ctx context.Context, workflowID uuid.UUID, orderBy string, limit int, orderDescending bool, DB database.Database) ([]models.DAGResult, error) { + var orderByQuery string + if len(orderBy) > 0 { + orderByQuery = fmt.Sprintf(" ORDER BY %s.%s", models.DAGResultTable, orderBy) + if orderDescending { + orderByQuery = orderByQuery + " DESC" + } else { + orderByQuery = orderByQuery + " ASC" + } + } + + var limitQuery string + if limit == 0 { + return []models.DAGResult{}, nil + } + if limit > 0 { + limitQuery = fmt.Sprintf(" LIMIT %s", strconv.Itoa(limit)) + } + query := fmt.Sprintf( `SELECT %s FROM workflow_dag_result, workflow_dag WHERE workflow_dag_result.workflow_dag_id = workflow_dag.id - AND workflow_dag.workflow_id = $1;`, + AND workflow_dag.workflow_id = $1`+orderByQuery+limitQuery+`;`, models.DAGResultColsWithPrefix(), ) args := []interface{}{workflowID} diff --git a/src/golang/lib/repos/tests/dag_result.go b/src/golang/lib/repos/tests/dag_result.go index 235621544..213e3095c 100644 --- a/src/golang/lib/repos/tests/dag_result.go +++ b/src/golang/lib/repos/tests/dag_result.go @@ -38,7 +38,9 @@ func (ts *TestSuite) TestDAGResult_GetByWorkflow() { expectedDAGResults := ts.seedDAGResultWithDAG(2, []uuid.UUID{dag.ID, dag.ID}) - actualDAGResults, err := ts.dagResult.GetByWorkflow(ts.ctx, dag.WorkflowID, ts.DB) + // Default values to not have an order and not have a limit: Empty string for order_by, -1 for limit + // Set true for order_by order (desc/asc) because doesn't matter. + actualDAGResults, err := ts.dagResult.GetByWorkflow(ts.ctx, dag.WorkflowID, "", -1, true, ts.DB) require.Nil(ts.T(), err) requireDeepEqualDAGResults(ts.T(), expectedDAGResults, actualDAGResults) }