diff --git a/Makefile b/Makefile index c51b9e3d77e7..c185578381f9 100644 --- a/Makefile +++ b/Makefile @@ -39,6 +39,7 @@ E2E_PARALLEL ?= 20 E2E_SUITE_TIMEOUT ?= 15m GOTEST ?= go test -v -p 20 ALL_BUILD_TAGS ?= api,cli,cron,executor,examples,corefunctional,functional,plugins +BENCHMARK_COUNT ?= 6 # should we build the static files? ifneq (,$(filter $(MAKECMDGOALS),codegen lint test docs start)) @@ -612,7 +613,7 @@ Test%: E2E_WAIT_TIMEOUT=$(E2E_WAIT_TIMEOUT) go test -failfast -v -timeout $(E2E_SUITE_TIMEOUT) -count 1 --tags $(ALL_BUILD_TAGS) -parallel $(E2E_PARALLEL) ./test/e2e -run='.*/$*' Benchmark%: - go test --tags $(ALL_BUILD_TAGS) ./test/e2e -run='$@' -benchmem -bench '$@' . + go test --tags $(ALL_BUILD_TAGS) ./test/e2e -run='$@' -benchmem -count=$(BENCHMARK_COUNT) -bench . # clean diff --git a/persist/sqldb/migrate.go b/persist/sqldb/migrate.go index 633dfd2ff31a..336c33e98304 100644 --- a/persist/sqldb/migrate.go +++ b/persist/sqldb/migrate.go @@ -269,6 +269,12 @@ func (m migrate) Exec(ctx context.Context) (err error) { noop{}, ansiSQLChange(`alter table argo_archived_workflows alter column workflow set data type jsonb using workflow::jsonb`), ), + // change argo_archived_workflows_i4 index to include clustername so MySQL uses it for listing archived workflows. #13601 + ternary(dbType == MySQL, + ansiSQLChange(`drop index argo_archived_workflows_i4 on argo_archived_workflows`), + ansiSQLChange(`drop index argo_archived_workflows_i4`), + ), + ansiSQLChange(`create index argo_archived_workflows_i4 on argo_archived_workflows (clustername, startedat)`), } { err := m.applyChange(changeSchemaVersion, change) if err != nil { diff --git a/persist/sqldb/workflow_archive.go b/persist/sqldb/workflow_archive.go index 7371fb37bedd..f6d40f4a6e1e 100644 --- a/persist/sqldb/workflow_archive.go +++ b/persist/sqldb/workflow_archive.go @@ -157,46 +157,69 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error { func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) { var archivedWfs []archivedWorkflowMetadata + var baseSelector = r.session.SQL().Select("name", "namespace", "uid", "phase", "startedat", "finishedat") - selectQuery, err := selectArchivedWorkflowQuery(r.dbType) - if err != nil { - return nil, err - } + switch r.dbType { + case MySQL: + selectQuery := baseSelector. + Columns( + db.Raw("coalesce(workflow->'$.metadata.labels', '{}') as labels"), + db.Raw("coalesce(workflow->'$.metadata.annotations', '{}') as annotations"), + db.Raw("coalesce(workflow->>'$.status.progress', '') as progress"), + db.Raw("coalesce(workflow->>'$.metadata.creationTimestamp', '') as creationtimestamp"), + db.Raw("workflow->>'$.spec.suspend'"), + db.Raw("coalesce(workflow->>'$.status.message', '') as message"), + db.Raw("coalesce(workflow->>'$.status.estimatedDuration', '0') as estimatedduration"), + db.Raw("coalesce(workflow->'$.status.resourcesDuration', '{}') as resourcesduration"), + ). + From(archiveTableName). + Where(r.clusterManagedNamespaceAndInstanceID()) - subSelector := r.session.SQL(). - Select(db.Raw("uid")). - From(archiveTableName). - Where(r.clusterManagedNamespaceAndInstanceID()) + selectQuery, err := BuildArchivedWorkflowSelector(selectQuery, archiveTableName, archiveLabelsTableName, r.dbType, options, false) + if err != nil { + return nil, err + } - subSelector, err = BuildArchivedWorkflowSelector(subSelector, archiveTableName, archiveLabelsTableName, r.dbType, options, false) - if err != nil { - return nil, err - } + err = selectQuery.All(&archivedWfs) + if err != nil { + return nil, err + } + case Postgres: + // Use a common table expression to reduce detoast overhead for the "workflow" column: + // https://github.com/argoproj/argo-workflows/issues/13601#issuecomment-2420499551 + cteSelector := baseSelector. + Columns( + db.Raw("coalesce(workflow->'metadata', '{}') as metadata"), + db.Raw("coalesce(workflow->'status', '{}') as status"), + db.Raw("workflow->'spec'->>'suspend' as suspend"), + ). + From(archiveTableName). + Where(r.clusterManagedNamespaceAndInstanceID()) - if r.dbType == MySQL { - // workaround for mysql 42000 error (Unsupported subquery syntax): - // - // Error 1235 (42000): This version of MySQL doesn't yet support 'LIMIT \u0026 IN/ALL/ANY/SOME subquery' - // - // more context: - // * https://dev.mysql.com/doc/refman/8.0/en/subquery-errors.html - // * https://dev.to/gkoniaris/limit-mysql-subquery-results-inside-a-where-in-clause-using-laravel-s-eloquent-orm-26en - subSelector = r.session.SQL().Select(db.Raw("*")).From(subSelector).As("x") - } + cteSelector, err := BuildArchivedWorkflowSelector(cteSelector, archiveTableName, archiveLabelsTableName, r.dbType, options, false) + if err != nil { + return nil, err + } - // why a subquery? the json unmarshal triggers for every row in the filter - // query. by filtering on uid first, we delay json parsing until a single - // row, speeding up the query(e.g. up to 257 times faster for some - // deployments). - // - // more context: https://github.com/argoproj/argo-workflows/pull/13566 - selector := r.session.SQL().Select(selectQuery).From(archiveTableName).Where( - r.clusterManagedNamespaceAndInstanceID().And(db.Cond{"uid IN": subSelector}), - ) + selectQuery := baseSelector.Columns( + db.Raw("coalesce(metadata->>'labels', '{}') as labels"), + db.Raw("coalesce(metadata->>'annotations', '{}') as annotations"), + db.Raw("coalesce(status->>'progress', '') as progress"), + db.Raw("coalesce(metadata->>'creationTimestamp', '') as creationtimestamp"), + "suspend", + db.Raw("coalesce(status->>'message', '') as message"), + db.Raw("coalesce(status->>'estimatedDuration', '0') as estimatedduration"), + db.Raw("coalesce(status->>'resourcesDuration', '{}') as resourcesduration"), + ) - err = selector.All(&archivedWfs) - if err != nil { - return nil, err + err = r.session.SQL(). + Iterator("WITH workflows AS ? ?", cteSelector, selectQuery.From("workflows")). + All(&archivedWfs) + if err != nil { + return nil, err + } + default: + return nil, fmt.Errorf("unsupported db type %s", r.dbType) } wfs := make(wfv1.Workflows, len(archivedWfs)) @@ -446,13 +469,3 @@ func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error { log.WithFields(log.Fields{"rowsAffected": rowsAffected}).Info("Deleted archived workflows") return nil } - -func selectArchivedWorkflowQuery(t dbType) (*db.RawExpr, error) { - switch t { - case MySQL: - return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce(JSON_EXTRACT(workflow,'$.metadata.labels'), '{}') as labels,coalesce(JSON_EXTRACT(workflow,'$.metadata.annotations'), '{}') as annotations, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.progress')), '') as progress, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.metadata.creationTimestamp')), '') as creationtimestamp, JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.spec.suspend')) as suspend, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.message')), '') as message, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.estimatedDuration')), '0') as estimatedduration, coalesce(JSON_EXTRACT(workflow,'$.status.resourcesDuration'), '{}') as resourcesduration"), nil - case Postgres: - return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce(workflow->'metadata'->>'labels', '{}') as labels, coalesce(workflow->'metadata'->>'annotations', '{}') as annotations, coalesce(workflow->'status'->>'progress', '') as progress, coalesce(workflow->'metadata'->>'creationTimestamp', '') as creationtimestamp, workflow->'spec'->>'suspend' as suspend, coalesce(workflow->'status'->>'message', '') as message, coalesce(workflow->'status'->>'estimatedDuration', '0') as estimatedduration, coalesce(workflow->'status'->>'resourcesDuration', '{}') as resourcesduration"), nil - } - return nil, fmt.Errorf("unsupported db type %s", t) -}