Skip to content

Commit

Permalink
Eng 2735 Add order by and limit parameters for V2 workflow results (#…
Browse files Browse the repository at this point in the history
…1240)

Co-authored-by: Eunice Chan <[email protected]>
  • Loading branch information
eunice-chan and eunice-chan authored Apr 26, 2023
1 parent e739720 commit f7e889e
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 15 deletions.
71 changes: 71 additions & 0 deletions integration_tests/backend/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@


class TestBackend:
# V2
GET_WORKFLOWS_TEMPLATE = "/api/v2/workflows"
GET_DAG_RESULTS_TEMPLATE = "/api/v2/workflow/%s/results"

LIST_WORKFLOW_SAVED_OBJECTS_TEMPLATE = "/api/workflow/%s/objects"
GET_TEST_INTEGRATION_TEMPLATE = "/api/integration/%s/test"
Expand Down Expand Up @@ -318,3 +320,72 @@ def test_endpoint_workflows_get(self):
for key in keys:
assert key in v2_workflow
assert v2_workflow["user_id"] == user_id

def test_endpoint_dag_results_get(self):
flow_id, n_runs = self.flows["flow_with_metrics_and_checks"]
resp = self.get_response(self.GET_DAG_RESULTS_TEMPLATE % flow_id).json()

assert len(resp) == n_runs

fields = ["id", "dag_id", "exec_state"]

def check_structure(resp, all_succeeded=False):
for result in resp:
for field in fields:
assert field in result
if all_succeeded:
assert result["exec_state"]["status"] == "succeeded"
assert result["exec_state"]["failure_type"] == None
assert result["exec_state"]["error"] == None

check_structure(resp, all_succeeded=True)

# Using the order parameter
flow_id, n_runs = self.flows["flow_with_failure"]
resp = self.get_response(
self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?order_by=status",
).json()

check_structure(resp)
statuses = [result["exec_state"]["status"] for result in resp]
sorted_statuses = sorted(statuses, reverse=True) # Descending order
assert statuses == sorted_statuses

# Default is descending
flow_id, n_runs = self.flows["flow_with_failure"]
resp = self.get_response(
self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?order_by=status&order_descending=true",
).json()

check_structure(resp)
descending_statuses = [result["exec_state"]["status"] for result in resp]
assert statuses == descending_statuses

# Ascending works
flow_id, n_runs = self.flows["flow_with_failure"]
resp = self.get_response(
self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?order_by=status&order_descending=false",
).json()

check_structure(resp)
ascending_statuses = [result["exec_state"]["status"] for result in resp]
assert descending_statuses[::-1] == ascending_statuses

# Using the limit parameter
resp = self.get_response(
self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?limit=1",
).json()

check_structure(resp)
assert len(resp) == 1

# Using both the order and limit parameters
resp = self.get_response(
self.GET_DAG_RESULTS_TEMPLATE % flow_id + "?order_by=status&limit=1",
).json()

check_structure(resp)
workflow_status = [result["exec_state"]["status"] for result in resp]
assert len(workflow_status) == 1
workflow_status = workflow_status[0]
assert workflow_status == sorted_statuses[0]
4 changes: 3 additions & 1 deletion src/golang/cmd/server/handler/get_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ func (h *GetWorkflowHandler) Perform(ctx context.Context, interfaceArgs interfac
dags[dbDAG.ID] = constructedDAG
}

dagResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, h.Database)
// Default values to not have an order and not have a limit: Empty string for order_by, -1 for limit
// Set true for order_by order (desc/asc) because doesn't matter.
dagResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, "", -1, true, h.Database)
if err != nil {
return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error occurred when retrieving workflow.")
}
Expand Down
4 changes: 3 additions & 1 deletion src/golang/cmd/server/handler/get_workflow_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ func (h *GetWorkflowHistoryHandler) Perform(ctx context.Context, interfaceArgs i
return nil, http.StatusBadRequest, errors.Wrap(err, fmt.Sprintf("Workflow %v does not exist.", args.workflowId))
}

results, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowId, h.Database)
// Default values to not have an order and not have a limit: Empty string for order_by, -1 for limit
// Set true for order_by order (desc/asc) because doesn't matter.
results, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowId, "", -1, true, h.Database)
if err != nil && err != database.ErrNoRows() { // Don't return an error if there are just no rows.
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error while retrieving workflow runs.")
}
Expand Down
39 changes: 35 additions & 4 deletions src/golang/cmd/server/handler/v2/dag_results_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,30 @@ import (
// Method: GET
// Params:
// `workflowId`: ID for `workflow` object
// `dagResultID`: ID for `workflow_dag_result` object
// Request:
// Headers:
// `api-key`: user's API Key
// Parameters:
// `order_by`:
// Optional single field that the query should be ordered. Requires the table prefix.
// `order_descending`:
// Optional boolean specifying whether order_by should be ascending or descending.
// `limit`:
// Optional limit on the number of storage migrations returned. Defaults to all of them.
// Response:
// Body:
// serialized `[]response.DAGResult`

type dagResultsGetArgs struct {
*aq_context.AqContext
workflowID uuid.UUID

// A nil value means that the order is not set.
orderBy string
// Default is descending (true).
orderDescending bool
// A negative value for limit (eg. -1) means that the limit is not set.
limit int
}

type DAGResultsGetHandler struct {
Expand All @@ -60,9 +73,27 @@ func (h *DAGResultsGetHandler) Prepare(r *http.Request) (interface{}, int, error
return nil, http.StatusBadRequest, err
}

limit, err := (parser.LimitQueryParser{}).Parse(r)
if err != nil {
return nil, http.StatusBadRequest, err
}

orderBy, err := (parser.OrderByQueryParser{}).Parse(r, models.AllDAGResultCols())
if err != nil {
return nil, http.StatusBadRequest, err
}

descending, err := (parser.OrderDescendingQueryParser{}).Parse(r)
if err != nil {
return nil, http.StatusBadRequest, err
}

return &dagResultsGetArgs{
AqContext: aqContext,
workflowID: workflowID,
AqContext: aqContext,
workflowID: workflowID,
orderBy: orderBy,
orderDescending: descending,
limit: limit,
}, http.StatusOK, nil
}

Expand All @@ -83,7 +114,7 @@ func (h *DAGResultsGetHandler) Perform(ctx context.Context, interfaceArgs interf
return nil, http.StatusBadRequest, errors.Wrap(err, "The organization does not own this workflow.")
}

dbDAGResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, h.Database)
dbDAGResults, err := h.DAGResultRepo.GetByWorkflow(ctx, args.workflowID, args.orderBy, args.limit, args.orderDescending, h.Database)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error reading dag results.")
}
Expand Down
25 changes: 25 additions & 0 deletions src/golang/cmd/server/request/parser/query_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package parser

import (
"net/http"
"strconv"

"github.com/dropbox/godropbox/errors"
)

type LimitQueryParser struct{}

func (LimitQueryParser) Parse(r *http.Request) (int, error) {
query := r.URL.Query()

var err error
limit := -1
if limitVal := query.Get("limit"); len(limitVal) > 0 {
limit, err = strconv.Atoi(limitVal)
if err != nil {
return -1, errors.Wrap(err, "Invalid limit parameter.")
}
}

return limit, nil
}
32 changes: 32 additions & 0 deletions src/golang/cmd/server/request/parser/query_order_by.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package parser

import (
"net/http"

"github.com/dropbox/godropbox/errors"
)

type OrderByQueryParser struct{}

func (OrderByQueryParser) Parse(r *http.Request, tableColumns []string) (string, error) {
query := r.URL.Query()

var err error
var orderBy string
if orderByVal := query.Get("order_by"); len(orderByVal) > 0 {
// Check is a field in table
isColumn := false
for _, column := range tableColumns {
if column == orderByVal {
isColumn = true
break
}
}
if !isColumn {
return "", errors.Wrap(err, "Invalid order_by value.")
}
orderBy = orderByVal
}

return orderBy, nil
}
31 changes: 31 additions & 0 deletions src/golang/cmd/server/request/parser/query_order_descending.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package parser

import (
"net/http"
"strings"

"github.com/dropbox/godropbox/errors"
)

type OrderDescendingQueryParser struct{}

func (OrderDescendingQueryParser) Parse(r *http.Request) (bool, error) {
query := r.URL.Query()

var err error
orderDescending := true
if orderDescendingVal := query.Get("order_descending"); len(orderDescendingVal) > 0 {
orderDescendingVal = strings.ToLower(orderDescendingVal)
if orderDescendingVal == "true" {
return true, nil
}

if orderDescendingVal == "false" {
return false, nil
}

return true, errors.Wrap(err, "Invalid order_descending value.")
}

return orderDescending, nil
}
4 changes: 3 additions & 1 deletion src/golang/lib/airflow/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ func syncWorkflowDag(
return err
}

dagResults, err := dagResultRepo.GetByWorkflow(ctx, dag.WorkflowID, DB)
// Default values to not have an order and not have a limit: Empty string for order_by, -1 for limit
// Set true for order_by order (desc/asc) because doesn't matter.
dagResults, err := dagResultRepo.GetByWorkflow(ctx, dag.WorkflowID, "", -1, true, DB)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion src/golang/lib/engine/aq_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,9 @@ func (eng *aqEngine) DeleteWorkflow(
dagIDs = append(dagIDs, dag.ID)
}

dagResultsToDelete, err := eng.DAGResultRepo.GetByWorkflow(ctx, workflowObj.ID, txn)
// Default values to not have an order and not have a limit: Empty string for order_by, -1 for limit
// Set true for order_by order (desc/asc) because doesn't matter.
dagResultsToDelete, err := eng.DAGResultRepo.GetByWorkflow(ctx, workflowObj.ID, "", -1, true, txn)
if err != nil {
return errors.Wrap(err, "Unexpected error occurred while retrieving workflow dag results.")
}
Expand Down
6 changes: 3 additions & 3 deletions src/golang/lib/models/dag_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@ type DAGResult struct {

// DAGResultCols returns a comma-separated string of all DAGResult columns.
func DAGResultCols() string {
return strings.Join(allDAGResultCols(), ",")
return strings.Join(AllDAGResultCols(), ",")
}

// DAGResultColsWithPrefix returns a comma-separated string of all
// DAGResult columns prefixed by the table name.
func DAGResultColsWithPrefix() string {
cols := allDAGResultCols()
cols := AllDAGResultCols()
for i, col := range cols {
cols[i] = fmt.Sprintf("%s.%s", DAGResultTable, col)
}

return strings.Join(cols, ",")
}

func allDAGResultCols() []string {
func AllDAGResultCols() []string {
return []string{
DAGResultID,
DAGResultDagID,
Expand Down
2 changes: 1 addition & 1 deletion src/golang/lib/repos/dag_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type dagResultReader interface {
GetBatch(ctx context.Context, IDs []uuid.UUID, DB database.Database) ([]models.DAGResult, error)

// GetByWorkflow returns the DAGResults of all DAGs associated with the Workflow with workflowID.
GetByWorkflow(ctx context.Context, workflowID uuid.UUID, DB database.Database) ([]models.DAGResult, error)
GetByWorkflow(ctx context.Context, workflowID uuid.UUID, orderBy string, limit int, orderDescending bool, DB database.Database) ([]models.DAGResult, error)

// GetKOffsetByWorkflow returns the DAGResults of all DAGs associated with the Workflow with workflowID
// except for the last k DAGResults ordered by DAGResult.CreatedAt.
Expand Down
23 changes: 21 additions & 2 deletions src/golang/lib/repos/sqlite/dag_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sqlite
import (
"context"
"fmt"
"strconv"
"time"

"github.com/aqueducthq/aqueduct/lib/database"
Expand Down Expand Up @@ -52,13 +53,31 @@ func (*dagResultReader) GetBatch(ctx context.Context, IDs []uuid.UUID, DB databa
return getDAGResults(ctx, DB, query, args...)
}

func (*dagResultReader) GetByWorkflow(ctx context.Context, workflowID uuid.UUID, DB database.Database) ([]models.DAGResult, error) {
func (*dagResultReader) GetByWorkflow(ctx context.Context, workflowID uuid.UUID, orderBy string, limit int, orderDescending bool, DB database.Database) ([]models.DAGResult, error) {
var orderByQuery string
if len(orderBy) > 0 {
orderByQuery = fmt.Sprintf(" ORDER BY %s.%s", models.DAGResultTable, orderBy)
if orderDescending {
orderByQuery = orderByQuery + " DESC"
} else {
orderByQuery = orderByQuery + " ASC"
}
}

var limitQuery string
if limit == 0 {
return []models.DAGResult{}, nil
}
if limit > 0 {
limitQuery = fmt.Sprintf(" LIMIT %s", strconv.Itoa(limit))
}

query := fmt.Sprintf(
`SELECT %s
FROM workflow_dag_result, workflow_dag
WHERE
workflow_dag_result.workflow_dag_id = workflow_dag.id
AND workflow_dag.workflow_id = $1;`,
AND workflow_dag.workflow_id = $1`+orderByQuery+limitQuery+`;`,
models.DAGResultColsWithPrefix(),
)
args := []interface{}{workflowID}
Expand Down
4 changes: 3 additions & 1 deletion src/golang/lib/repos/tests/dag_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ func (ts *TestSuite) TestDAGResult_GetByWorkflow() {

expectedDAGResults := ts.seedDAGResultWithDAG(2, []uuid.UUID{dag.ID, dag.ID})

actualDAGResults, err := ts.dagResult.GetByWorkflow(ts.ctx, dag.WorkflowID, ts.DB)
// Default values to not have an order and not have a limit: Empty string for order_by, -1 for limit
// Set true for order_by order (desc/asc) because doesn't matter.
actualDAGResults, err := ts.dagResult.GetByWorkflow(ts.ctx, dag.WorkflowID, "", -1, true, ts.DB)
require.Nil(ts.T(), err)
requireDeepEqualDAGResults(ts.T(), expectedDAGResults, actualDAGResults)
}
Expand Down

0 comments on commit f7e889e

Please sign in to comment.