Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: further optimize archive workflow listing. Fixes #13601 #13819

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ E2E_PARALLEL ?= 20
E2E_SUITE_TIMEOUT ?= 15m
GOTEST ?= go test -v -p 20
ALL_BUILD_TAGS ?= api,cli,cron,executor,examples,corefunctional,functional,plugins
BENCHMARK_COUNT ?= 6
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is so benchstat gives confidence intervals. Without this, it'll show the message ¹ need >= 6 samples for confidence interval at level 0.95


# should we build the static files?
ifneq (,$(filter $(MAKECMDGOALS),codegen lint test docs start))
Expand Down Expand Up @@ -612,7 +613,7 @@ Test%:
E2E_WAIT_TIMEOUT=$(E2E_WAIT_TIMEOUT) go test -failfast -v -timeout $(E2E_SUITE_TIMEOUT) -count 1 --tags $(ALL_BUILD_TAGS) -parallel $(E2E_PARALLEL) ./test/e2e -run='.*/$*'

Benchmark%:
go test --tags $(ALL_BUILD_TAGS) ./test/e2e -run='$@' -benchmem -bench '$@' .
go test --tags $(ALL_BUILD_TAGS) ./test/e2e -run='$@' -benchmem -count=$(BENCHMARK_COUNT) -bench .

# clean

Expand Down
6 changes: 6 additions & 0 deletions persist/sqldb/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,12 @@ func (m migrate) Exec(ctx context.Context) (err error) {
noop{},
ansiSQLChange(`alter table argo_archived_workflows alter column workflow set data type jsonb using workflow::jsonb`),
),
// change argo_archived_workflows_i4 index to include clustername so MySQL uses it for listing archived workflows. #13601
ternary(dbType == MySQL,
ansiSQLChange(`drop index argo_archived_workflows_i4 on argo_archived_workflows`),
ansiSQLChange(`drop index argo_archived_workflows_i4`),
),
ansiSQLChange(`create index argo_archived_workflows_i4 on argo_archived_workflows (clustername, startedat)`),
Copy link

@agilgur5 agilgur5 Oct 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how quick does this run? quick enough to include in a patch version? i.e. to cherry-pick into 3.5

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very quickly. Tested after populating my DB with 100,000 rows, all associated with the same clustername:

MySQL:

$ time go run ./hack/db migrate -d 'mysql:password@tcp/argo'
INFO[0000] Migrating database schema                     clusterName=default dbType=mysql
INFO[0000] applying database change                      change="drop index argo_archived_workflows_i4 on argo_archived_workflows" changeSchemaVersion=61
INFO[0000] applying database change                      change="create index argo_archived_workflows_i4 on argo_archived_workflows (clustername, startedat)" changeSchemaVersion=62
2024/10/26 02:24:13     Session ID:     00001
        Query:          create index argo_archived_workflows_i4 on argo_archived_workflows (clustername, startedat)
        Stack:          
                fmt.(*pp).handleMethods@/usr/local/go/src/fmt/print.go:673
                fmt.(*pp).printArg@/usr/local/go/src/fmt/print.go:756
                fmt.(*pp).doPrint@/usr/local/go/src/fmt/print.go:1208
                fmt.Append@/usr/local/go/src/fmt/print.go:289
                log.(*Logger).Print.func1@/usr/local/go/src/log/log.go:261
                log.(*Logger).output@/usr/local/go/src/log/log.go:238
                log.(*Logger).Print@/usr/local/go/src/log/log.go:260
                github.com/argoproj/argo-workflows/v3/persist/sqldb.ansiSQLChange.apply@/home/vscode/go/src/github.com/argoproj/argo-workflows/persist/sqldb/ansi_sql_change.go:11
                github.com/argoproj/argo-workflows/v3/persist/sqldb.migrate.applyChange.func1@/home/vscode/go/src/github.com/argoproj/argo-workflows/persist/sqldb/migrate.go:301
                github.com/argoproj/argo-workflows/v3/persist/sqldb.migrate.applyChange@/home/vscode/go/src/github.com/argoproj/argo-workflows/persist/sqldb/migrate.go:290
                github.com/argoproj/argo-workflows/v3/persist/sqldb.migrate.Exec@/home/vscode/go/src/github.com/argoproj/argo-workflows/persist/sqldb/migrate.go:279
                main.NewMigrateCommand.func1@/home/vscode/go/src/github.com/argoproj/argo-workflows/hack/db/main.go:50
                github.com/spf13/cobra.(*Command).execute@/home/vscode/go/pkg/mod/github.com/spf13/[email protected]/command.go:985
                github.com/spf13/cobra.(*Command).ExecuteC@/home/vscode/go/pkg/mod/github.com/spf13/[email protected]/command.go:1117
                github.com/spf13/cobra.(*Command).Execute@/home/vscode/go/pkg/mod/github.com/spf13/[email protected]/command.go:1041
                main.main@/home/vscode/go/src/github.com/argoproj/argo-workflows/hack/db/main.go:39
                runtime.main@/usr/local/go/src/runtime/proc.go:272
                runtime.goexit@/usr/local/go/src/runtime/asm_amd64.s:1700
        Rows affected:  0
        Last insert ID: 0
        Error:          upper: slow query
        Time taken:     0.95033s
        Context:        context.Background


real    0m1.758s
user    0m1.074s
sys     0m0.294s

PostgreSQL:

 $ time go run ./hack/db migrate
INFO[0000] Migrating database schema                     clusterName=default dbType=postgres
INFO[0000] applying database change                      change="drop index argo_archived_workflows_i4" changeSchemaVersion=61
INFO[0000] applying database change                      change="create index argo_archived_workflows_i4 on argo_archived_workflows (clustername, startedat)" changeSchemaVersion=62

real    0m0.868s
user    0m1.117s
sys     0m0.271s

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dope, great work again!

} {
err := m.applyChange(changeSchemaVersion, change)
if err != nil {
Expand Down
101 changes: 57 additions & 44 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,46 +157,69 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {

func (r *workflowArchive) ListWorkflows(options sutils.ListOptions) (wfv1.Workflows, error) {
var archivedWfs []archivedWorkflowMetadata
var baseSelector = r.session.SQL().Select("name", "namespace", "uid", "phase", "startedat", "finishedat")

selectQuery, err := selectArchivedWorkflowQuery(r.dbType)
if err != nil {
return nil, err
}
switch r.dbType {
case MySQL:
selectQuery := baseSelector.
Columns(
db.Raw("coalesce(workflow->'$.metadata.labels', '{}') as labels"),
db.Raw("coalesce(workflow->'$.metadata.annotations', '{}') as annotations"),
db.Raw("coalesce(workflow->>'$.status.progress', '') as progress"),
db.Raw("coalesce(workflow->>'$.metadata.creationTimestamp', '') as creationtimestamp"),
db.Raw("workflow->>'$.spec.suspend'"),
db.Raw("coalesce(workflow->>'$.status.message', '') as message"),
db.Raw("coalesce(workflow->>'$.status.estimatedDuration', '0') as estimatedduration"),
db.Raw("coalesce(workflow->'$.status.resourcesDuration', '{}') as resourcesduration"),
).
From(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID())

subSelector := r.session.SQL().
Select(db.Raw("uid")).
From(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID())
selectQuery, err := BuildArchivedWorkflowSelector(selectQuery, archiveTableName, archiveLabelsTableName, r.dbType, options, false)
if err != nil {
return nil, err
}

subSelector, err = BuildArchivedWorkflowSelector(subSelector, archiveTableName, archiveLabelsTableName, r.dbType, options, false)
if err != nil {
return nil, err
}
err = selectQuery.All(&archivedWfs)
if err != nil {
return nil, err
}
case Postgres:
// Use a common table expression to reduce detoast overhead for the "workflow" column:
// https://github.com/argoproj/argo-workflows/issues/13601#issuecomment-2420499551
cteSelector := baseSelector.
Columns(
db.Raw("coalesce(workflow->'metadata', '{}') as metadata"),
db.Raw("coalesce(workflow->'status', '{}') as status"),
db.Raw("workflow->'spec'->>'suspend' as suspend"),
).
From(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID())

if r.dbType == MySQL {
// workaround for mysql 42000 error (Unsupported subquery syntax):
//
// Error 1235 (42000): This version of MySQL doesn't yet support 'LIMIT & IN/ALL/ANY/SOME subquery'
//
// more context:
// * https://dev.mysql.com/doc/refman/8.0/en/subquery-errors.html
// * https://dev.to/gkoniaris/limit-mysql-subquery-results-inside-a-where-in-clause-using-laravel-s-eloquent-orm-26en
subSelector = r.session.SQL().Select(db.Raw("*")).From(subSelector).As("x")
}
cteSelector, err := BuildArchivedWorkflowSelector(cteSelector, archiveTableName, archiveLabelsTableName, r.dbType, options, false)
if err != nil {
return nil, err
}

// why a subquery? the json unmarshal triggers for every row in the filter
// query. by filtering on uid first, we delay json parsing until a single
// row, speeding up the query(e.g. up to 257 times faster for some
// deployments).
//
// more context: https://github.com/argoproj/argo-workflows/pull/13566
selector := r.session.SQL().Select(selectQuery).From(archiveTableName).Where(
r.clusterManagedNamespaceAndInstanceID().And(db.Cond{"uid IN": subSelector}),
)
selectQuery := baseSelector.Columns(
db.Raw("coalesce(metadata->>'labels', '{}') as labels"),
db.Raw("coalesce(metadata->>'annotations', '{}') as annotations"),
db.Raw("coalesce(status->>'progress', '') as progress"),
db.Raw("coalesce(metadata->>'creationTimestamp', '') as creationtimestamp"),
"suspend",
db.Raw("coalesce(status->>'message', '') as message"),
db.Raw("coalesce(status->>'estimatedDuration', '0') as estimatedduration"),
db.Raw("coalesce(status->>'resourcesDuration', '{}') as resourcesduration"),
)

err = selector.All(&archivedWfs)
if err != nil {
return nil, err
err = r.session.SQL().
Iterator("WITH workflows AS ? ?", cteSelector, selectQuery.From("workflows")).
All(&archivedWfs)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unsupported db type %s", r.dbType)
}

wfs := make(wfv1.Workflows, len(archivedWfs))
Expand Down Expand Up @@ -446,13 +469,3 @@ func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error {
log.WithFields(log.Fields{"rowsAffected": rowsAffected}).Info("Deleted archived workflows")
return nil
}

// selectArchivedWorkflowQuery builds the raw column list used to select
// archived workflow metadata, using the JSON-extraction syntax appropriate
// to the given database dialect (JSON_EXTRACT/JSON_UNQUOTE for MySQL,
// the ->/->> operators for PostgreSQL). It returns an error for any
// unsupported database type.
func selectArchivedWorkflowQuery(t dbType) (*db.RawExpr, error) {
	var columns string
	switch t {
	case MySQL:
		columns = "name, namespace, uid, phase, startedat, finishedat, coalesce(JSON_EXTRACT(workflow,'$.metadata.labels'), '{}') as labels,coalesce(JSON_EXTRACT(workflow,'$.metadata.annotations'), '{}') as annotations, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.progress')), '') as progress, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.metadata.creationTimestamp')), '') as creationtimestamp, JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.spec.suspend')) as suspend, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.message')), '') as message, coalesce(JSON_UNQUOTE(JSON_EXTRACT(workflow,'$.status.estimatedDuration')), '0') as estimatedduration, coalesce(JSON_EXTRACT(workflow,'$.status.resourcesDuration'), '{}') as resourcesduration"
	case Postgres:
		columns = "name, namespace, uid, phase, startedat, finishedat, coalesce(workflow->'metadata'->>'labels', '{}') as labels, coalesce(workflow->'metadata'->>'annotations', '{}') as annotations, coalesce(workflow->'status'->>'progress', '') as progress, coalesce(workflow->'metadata'->>'creationTimestamp', '') as creationtimestamp, workflow->'spec'->>'suspend' as suspend, coalesce(workflow->'status'->>'message', '') as message, coalesce(workflow->'status'->>'estimatedDuration', '0') as estimatedduration, coalesce(workflow->'status'->>'resourcesDuration', '{}') as resourcesduration"
	default:
		return nil, fmt.Errorf("unsupported db type %s", t)
	}
	return db.Raw(columns), nil
}
Loading