From fc844b3c92845cbdd3ffbe5904b04138bbed06dc Mon Sep 17 00:00:00 2001 From: Mason Malone <651224+MasonM@users.noreply.github.com> Date: Fri, 25 Oct 2024 13:00:44 -0700 Subject: [PATCH 1/4] test: allow configuring benchmark iterations Signed-off-by: Mason Malone <651224+MasonM@users.noreply.github.com> --- Makefile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index b324d746cc53..2f2c93c77d3b 100644 --- a/Makefile +++ b/Makefile @@ -39,6 +39,7 @@ E2E_PARALLEL ?= 20 E2E_SUITE_TIMEOUT ?= 15m GOTEST ?= go test -v -p 20 ALL_BUILD_TAGS ?= api,cli,cron,executor,examples,corefunctional,functional,plugins +BENCHMARK_COUNT ?= 6 # should we build the static files? ifneq (,$(filter $(MAKECMDGOALS),codegen lint test docs start)) @@ -623,7 +624,7 @@ Test%: E2E_WAIT_TIMEOUT=$(E2E_WAIT_TIMEOUT) go test -failfast -v -timeout $(E2E_SUITE_TIMEOUT) -count 1 --tags $(ALL_BUILD_TAGS) -parallel $(E2E_PARALLEL) ./test/e2e -run='.*/$*' Benchmark%: - go test --tags $(ALL_BUILD_TAGS) ./test/e2e -run='$@' -benchmem -bench . + go test --tags $(ALL_BUILD_TAGS) ./test/e2e -run='$@' -benchmem -count=$(BENCHMARK_COUNT) -bench . 
# clean From 3ea4b943777908c548d6bd7146a42c372f3a5c0c Mon Sep 17 00:00:00 2001 From: Mason Malone <651224+MasonM@users.noreply.github.com> Date: Fri, 25 Oct 2024 13:33:02 -0700 Subject: [PATCH 2/4] fix: further optimize archived list query Signed-off-by: Mason Malone <651224+MasonM@users.noreply.github.com> --- persist/sqldb/workflow_archive.go | 80 ++++++++++++++++--------------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/persist/sqldb/workflow_archive.go b/persist/sqldb/workflow_archive.go index 7371fb37bedd..4283e6663be3 100644 --- a/persist/sqldb/workflow_archive.go +++ b/persist/sqldb/workflow_archive.go @@ -157,44 +157,58 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error { func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) { var archivedWfs []archivedWorkflowMetadata + var cteSelector, selectQuery db.Selector + var baseSelector = r.session.SQL().Select("name", "namespace", "uid", "phase", "startedat", "finishedat") - selectQuery, err := selectArchivedWorkflowQuery(r.dbType) - if err != nil { - return nil, err + switch r.dbType { + case MySQL: + cteSelector = baseSelector.Columns( + db.Raw("coalesce(workflow->'$.metadata', '{}') as metadata"), + db.Raw("coalesce(workflow->'$.status', '{}') as status"), + db.Raw("workflow->>'$.spec.suspend' as suspend"), + ) + selectQuery = baseSelector.Columns( + db.Raw("coalesce(metadata->'$.labels', '{}') as labels"), + db.Raw("coalesce(metadata->'$.annotations', '{}') as annotations"), + db.Raw("coalesce(status->>'$.progress', '') as progress"), + db.Raw("coalesce(metadata->>'$.creationTimestamp', '') as creationtimestamp"), + "suspend", + db.Raw("coalesce(status->>'$.message', '') as message"), + db.Raw("coalesce(status->>'$.estimatedDuration', '0') as estimatedduration"), + db.Raw("coalesce(status->'$.resourcesDuration', '{}') as resourcesduration"), + ) + case Postgres: + cteSelector = baseSelector.Columns( + 
db.Raw("coalesce(workflow->'metadata', '{}') as metadata"), + db.Raw("coalesce(workflow->'status', '{}') as status"), + db.Raw("workflow->'spec'->>'suspend' as suspend"), + ) + selectQuery = baseSelector.Columns( + db.Raw("coalesce(metadata->>'labels', '{}') as labels"), + db.Raw("coalesce(metadata->>'annotations', '{}') as annotations"), + db.Raw("coalesce(status->>'progress', '') as progress"), + db.Raw("coalesce(metadata->>'creationTimestamp', '') as creationtimestamp"), + "suspend", + db.Raw("coalesce(status->>'message', '') as message"), + db.Raw("coalesce(status->>'estimatedDuration', '0') as estimatedduration"), + db.Raw("coalesce(status->>'resourcesDuration', '{}') as resourcesduration"), + ) + default: + return nil, fmt.Errorf("unsupported db type %s", r.dbType) } - subSelector := r.session.SQL(). - Select(db.Raw("uid")). + cteSelector = cteSelector. From(archiveTableName). Where(r.clusterManagedNamespaceAndInstanceID()) - subSelector, err = BuildArchivedWorkflowSelector(subSelector, archiveTableName, archiveLabelsTableName, r.dbType, options, false) + cteSelector, err := BuildArchivedWorkflowSelector(cteSelector, archiveTableName, archiveLabelsTableName, r.dbType, options, false) if err != nil { return nil, err } - if r.dbType == MySQL { - // workaround for mysql 42000 error (Unsupported subquery syntax): - // - // Error 1235 (42000): This version of MySQL doesn't yet support 'LIMIT \u0026 IN/ALL/ANY/SOME subquery' - // - // more context: - // * https://dev.mysql.com/doc/refman/8.0/en/subquery-errors.html - // * https://dev.to/gkoniaris/limit-mysql-subquery-results-inside-a-where-in-clause-using-laravel-s-eloquent-orm-26en - subSelector = r.session.SQL().Select(db.Raw("*")).From(subSelector).As("x") - } - - // why a subquery? the json unmarshal triggers for every row in the filter - // query. by filtering on uid first, we delay json parsing until a single - // row, speeding up the query(e.g. up to 257 times faster for some - // deployments). 
- // - // more context: https://github.com/argoproj/argo-workflows/pull/13566 - selector := r.session.SQL().Select(selectQuery).From(archiveTableName).Where( - r.clusterManagedNamespaceAndInstanceID().And(db.Cond{"uid IN": subSelector}), - ) - - err = selector.All(&archivedWfs) + err = r.session.SQL(). + Iterator("WITH workflows AS ? ?", cteSelector, selectQuery.From("workflows")). + All(&archivedWfs) if err != nil { return nil, err } @@ -446,13 +460,3 @@ func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error { log.WithFields(log.Fields{"rowsAffected": rowsAffected}).Info("Deleted archived workflows") return nil } - -func selectArchivedWorkflowQuery(t dbType) (*db.RawExpr, error) { - switch t { - case MySQL: - return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce(JSON_EXTRACT(workflow,'$.metadata.labels'), '{}') as labels,coalesce(JSON_EXTRACT(workflow,'$.metadata.annotations'), '{}') as annotations, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.progress')), '') as progress, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.metadata.creationTimestamp')), '') as creationtimestamp, JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.spec.suspend')) as suspend, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.message')), '') as message, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.estimatedDuration')), '0') as estimatedduration, coalesce(JSON_EXTRACT(workflow,'$.status.resourcesDuration'), '{}') as resourcesduration"), nil - case Postgres: - return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce(workflow->'metadata'->>'labels', '{}') as labels, coalesce(workflow->'metadata'->>'annotations', '{}') as annotations, coalesce(workflow->'status'->>'progress', '') as progress, coalesce(workflow->'metadata'->>'creationTimestamp', '') as creationtimestamp, workflow->'spec'->>'suspend' as suspend, coalesce(workflow->'status'->>'message', '') as message, coalesce(workflow->'status'->>'estimatedDuration', '0') as 
estimatedduration, coalesce(workflow->'status'->>'resourcesDuration', '{}') as resourcesduration"), nil - } - return nil, fmt.Errorf("unsupported db type %s", t) -} From 6a90a9b2d9b790bf8ee53293e895375029c25fe6 Mon Sep 17 00:00:00 2001 From: Mason Malone <651224+MasonM@users.noreply.github.com> Date: Fri, 25 Oct 2024 17:38:51 -0700 Subject: [PATCH 3/4] fix: optimize MySQL variant Signed-off-by: Mason Malone <651224+MasonM@users.noreply.github.com> --- persist/sqldb/workflow_archive.go | 83 +++++++++++++++++-------------- 1 file changed, 45 insertions(+), 38 deletions(-) diff --git a/persist/sqldb/workflow_archive.go b/persist/sqldb/workflow_archive.go index 4283e6663be3..f19ab5f950cd 100644 --- a/persist/sqldb/workflow_archive.go +++ b/persist/sqldb/workflow_archive.go @@ -157,33 +157,49 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error { func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) { var archivedWfs []archivedWorkflowMetadata - var cteSelector, selectQuery db.Selector var baseSelector = r.session.SQL().Select("name", "namespace", "uid", "phase", "startedat", "finishedat") switch r.dbType { case MySQL: - cteSelector = baseSelector.Columns( - db.Raw("coalesce(workflow->'$.metadata', '{}') as metadata"), - db.Raw("coalesce(workflow->'$.status', '{}') as status"), - db.Raw("workflow->>'$.spec.suspend' as suspend"), - ) - selectQuery = baseSelector.Columns( - db.Raw("coalesce(metadata->'$.labels', '{}') as labels"), - db.Raw("coalesce(metadata->'$.annotations', '{}') as annotations"), - db.Raw("coalesce(status->>'$.progress', '') as progress"), - db.Raw("coalesce(metadata->>'$.creationTimestamp', '') as creationtimestamp"), - "suspend", - db.Raw("coalesce(status->>'$.message', '') as message"), - db.Raw("coalesce(status->>'$.estimatedDuration', '0') as estimatedduration"), - db.Raw("coalesce(status->'$.resourcesDuration', '{}') as resourcesduration"), - ) + selectQuery := baseSelector. 
+ Columns( + db.Raw("coalesce(workflow->'$.metadata.labels', '{}') as labels"), + db.Raw("coalesce(workflow->'$.metadata.annotations', '{}') as annotations"), + db.Raw("coalesce(workflow->>'$.status.progress', '') as progress"), + db.Raw("coalesce(workflow->>'$.metadata.creationTimestamp', '') as creationtimestamp"), + db.Raw("workflow->>'$.spec.suspend' as suspend"), + db.Raw("coalesce(workflow->>'$.status.message', '') as message"), + db.Raw("coalesce(workflow->>'$.status.estimatedDuration', '0') as estimatedduration"), + db.Raw("coalesce(workflow->'$.status.resourcesDuration', '{}') as resourcesduration"), + ). + From(archiveTableName). + Where(r.clusterManagedNamespaceAndInstanceID()) + + selectQuery, err := BuildArchivedWorkflowSelector(selectQuery, archiveTableName, archiveLabelsTableName, r.dbType, options, false) + if err != nil { + return nil, err + } + + err = selectQuery.All(&archivedWfs) + if err != nil { + return nil, err + } case Postgres: - cteSelector = baseSelector.Columns( - db.Raw("coalesce(workflow->'metadata', '{}') as metadata"), - db.Raw("coalesce(workflow->'status', '{}') as status"), - db.Raw("workflow->'spec'->>'suspend' as suspend"), - ) - selectQuery = baseSelector.Columns( + cteSelector := baseSelector. + Columns( + db.Raw("coalesce(workflow->'metadata', '{}') as metadata"), + db.Raw("coalesce(workflow->'status', '{}') as status"), + db.Raw("workflow->'spec'->>'suspend' as suspend"), + ). + From(archiveTableName). 
+ Where(r.clusterManagedNamespaceAndInstanceID()) + + cteSelector, err := BuildArchivedWorkflowSelector(cteSelector, archiveTableName, archiveLabelsTableName, r.dbType, options, false) + if err != nil { + return nil, err + } + + selectQuery := baseSelector.Columns( db.Raw("coalesce(metadata->>'labels', '{}') as labels"), db.Raw("coalesce(metadata->>'annotations', '{}') as annotations"), db.Raw("coalesce(status->>'progress', '') as progress"), @@ -193,26 +209,17 @@ func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workfl db.Raw("coalesce(status->>'estimatedDuration', '0') as estimatedduration"), db.Raw("coalesce(status->>'resourcesDuration', '{}') as resourcesduration"), ) + + err = r.session.SQL(). + Iterator("WITH workflows AS ? ?", cteSelector, selectQuery.From("workflows")). + All(&archivedWfs) + if err != nil { + return nil, err + } default: return nil, fmt.Errorf("unsupported db type %s", r.dbType) } - cteSelector = cteSelector. - From(archiveTableName). - Where(r.clusterManagedNamespaceAndInstanceID()) - - cteSelector, err := BuildArchivedWorkflowSelector(cteSelector, archiveTableName, archiveLabelsTableName, r.dbType, options, false) - if err != nil { - return nil, err - } - - err = r.session.SQL(). - Iterator("WITH workflows AS ? ?", cteSelector, selectQuery.From("workflows")). 
- All(&archivedWfs) - if err != nil { - return nil, err - } - wfs := make(wfv1.Workflows, len(archivedWfs)) for i, md := range archivedWfs { labels := make(map[string]string) From 01bbc41240ed564f7b7e7c9f684d873841e45053 Mon Sep 17 00:00:00 2001 From: Mason Malone <651224+MasonM@users.noreply.github.com> Date: Fri, 25 Oct 2024 17:44:19 -0700 Subject: [PATCH 4/4] fix: add migration to update argo_archived_workflows_i4 index Signed-off-by: Mason Malone <651224+MasonM@users.noreply.github.com> --- persist/sqldb/migrate.go | 6 ++++++ persist/sqldb/workflow_archive.go | 2 ++ 2 files changed, 8 insertions(+) diff --git a/persist/sqldb/migrate.go b/persist/sqldb/migrate.go index 633dfd2ff31a..336c33e98304 100644 --- a/persist/sqldb/migrate.go +++ b/persist/sqldb/migrate.go @@ -269,6 +269,12 @@ func (m migrate) Exec(ctx context.Context) (err error) { noop{}, ansiSQLChange(`alter table argo_archived_workflows alter column workflow set data type jsonb using workflow::jsonb`), ), + // change argo_archived_workflows_i4 index to include clustername so MySQL uses it for listing archived workflows. 
#13601 + ternary(dbType == MySQL, + ansiSQLChange(`drop index argo_archived_workflows_i4 on argo_archived_workflows`), + ansiSQLChange(`drop index argo_archived_workflows_i4`), + ), + ansiSQLChange(`create index argo_archived_workflows_i4 on argo_archived_workflows (clustername, startedat)`), } { err := m.applyChange(changeSchemaVersion, change) if err != nil { diff --git a/persist/sqldb/workflow_archive.go b/persist/sqldb/workflow_archive.go index f19ab5f950cd..f6d40f4a6e1e 100644 --- a/persist/sqldb/workflow_archive.go +++ b/persist/sqldb/workflow_archive.go @@ -185,6 +185,8 @@ func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workfl return nil, err } case Postgres: + // Use a common table expression to reduce detoast overhead for the "workflow" column: + // https://github.com/argoproj/argo-workflows/issues/13601#issuecomment-2420499551 cteSelector := baseSelector. Columns( db.Raw("coalesce(workflow->'metadata', '{}') as metadata"),