From 95d5001fd005da4b583b370d1845f449a2e6d9f4 Mon Sep 17 00:00:00 2001 From: Dat Dao Date: Tue, 17 Dec 2024 16:02:07 +0700 Subject: [PATCH] internal/controller: handle the case of Atlas returning multiple results --- .../controller/atlasmigration_controller.go | 16 +++++---- internal/controller/atlasschema_controller.go | 34 ++++++++++--------- .../controller/atlasschema_controller_test.go | 2 +- internal/controller/common.go | 9 +++-- internal/controller/lint.go | 18 ++++++---- internal/controller/testhelper.go | 11 ++++-- 6 files changed, 51 insertions(+), 39 deletions(-) diff --git a/internal/controller/atlasmigration_controller.go b/internal/controller/atlasmigration_controller.go index 13184a4..e221b6b 100644 --- a/internal/controller/atlasmigration_controller.go +++ b/internal/controller/atlasmigration_controller.go @@ -358,7 +358,7 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio log.Info("applying pending migrations", "count", len(status.Pending)) // There are pending migrations // Execute Atlas CLI migrate command - report, err := c.MigrateApply(ctx, &atlasexec.MigrateApplyParams{ + reports, err := c.MigrateApplySlice(ctx, &atlasexec.MigrateApplyParams{ Env: data.EnvName, Context: &atlasexec.DeployRunContext{ TriggerType: atlasexec.TriggerTypeKubernetes, @@ -372,12 +372,14 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio } return err } - res.SetReady(dbv1alpha1.AtlasMigrationStatus{ - ObservedHash: data.ObservedHash, - LastApplied: report.End.Unix(), - LastAppliedVersion: report.Target, - }) - r.recordApplied(res, report.Target) + if len(reports) > 0 { + res.SetReady(dbv1alpha1.AtlasMigrationStatus{ + ObservedHash: data.ObservedHash, + LastApplied: reports[0].End.Unix(), + LastAppliedVersion: reports[0].Target, + }) + r.recordApplied(res, reports[0].Target) + } } if data.Dir != nil { // Compress the migration directory then store it in the secret diff --git a/internal/controller/atlasschema_controller.go b/internal/controller/atlasschema_controller.go index e199a35..c6ea86f 100644 --- a/internal/controller/atlasschema_controller.go +++ b/internal/controller/atlasschema_controller.go @@ -202,7 +202,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) default: log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org) } - var report *atlasexec.SchemaApply + var reports []*atlasexec.SchemaApply switch desiredURL := data.Desired.String(); { // The resource is connected to Atlas Cloud. case whoami != nil: @@ -221,7 +221,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) repo := data.repoURL() if repo == nil { // No repository is set, apply the changes directly. - report, err = cli.SchemaApply(ctx, params) + reports, err = cli.SchemaApplySlice(ctx, params) break } createPlan := func() (ctrl.Result, error) { @@ -352,7 +352,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Try to apply the schema changes with lint policies, // if the changes are rejected by the review policy, create a plan // for the pending changes. - report, err = cli.SchemaApply(ctx, params) + reports, err = cli.SchemaApplySlice(ctx, params) // TODO: Better error handling for rejected changes. if err != nil && strings.HasPrefix(err.Error(), "Rejected by review policy") { log.Info("schema changes are rejected by the review policy, creating a new schema plan") @@ -391,7 +391,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) }); err != nil { return result(err) } - report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{ + reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{ Env: data.EnvName, To: desiredURL, TxMode: string(data.TxMode), @@ -409,7 +409,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.recordErrEvent(res, err) return result(err) } - report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{ + reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{ Env: data.EnvName, To: desiredURL, TxMode: string(data.TxMode), @@ -417,7 +417,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) }) // No linting policy is set. default: - report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{ + reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{ Env: data.EnvName, To: desiredURL, TxMode: string(data.TxMode), @@ -433,21 +433,23 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.recordErrEvent(res, err) return result(err) } - log.Info("schema changes are applied", "applied", len(report.Changes.Applied)) - // Truncate the applied and pending changes to 1024 bytes. - report.Changes.Applied = truncateSQL(report.Changes.Applied, sqlLimitSize) - report.Changes.Pending = truncateSQL(report.Changes.Pending, sqlLimitSize) s := dbv1alpha1.AtlasSchemaStatus{ LastApplied: time.Now().Unix(), ObservedHash: hash, } - // Set the plan URL if it exists. - if p := report.Plan; p != nil { - s.PlanLink = p.File.Link - s.PlanURL = p.File.URL + if len(reports) > 0 { + log.Info("schema changes are applied", "applied", len(reports[0].Changes.Applied)) + // Truncate the applied and pending changes to 1024 bytes. + reports[0].Changes.Applied = truncateSQL(reports[0].Changes.Applied, sqlLimitSize) + reports[0].Changes.Pending = truncateSQL(reports[0].Changes.Pending, sqlLimitSize) + // Set the plan URL if it exists. + if p := reports[0].Plan; p != nil { + s.PlanLink = p.File.Link + s.PlanURL = p.File.URL + } + res.SetReady(s, reports[0]) + r.recorder.Event(res, corev1.EventTypeNormal, "Applied", "Applied schema") } - res.SetReady(s, report) - r.recorder.Event(res, corev1.EventTypeNormal, "Applied", "Applied schema") return ctrl.Result{}, nil } diff --git a/internal/controller/atlasschema_controller_test.go b/internal/controller/atlasschema_controller_test.go index 8638b25..0532b0a 100644 --- a/internal/controller/atlasschema_controller_test.go +++ b/internal/controller/atlasschema_controller_test.go @@ -672,7 +672,7 @@ func (t *test) initDB(statement string) { require.NoError(t, err) cli, err := atlasexec.NewClient(wd.Path(), "atlas") require.NoError(t, err) - _, err = cli.SchemaApply(context.Background(), &atlasexec.SchemaApplyParams{ + _, err = cli.SchemaApplySlice(context.Background(), &atlasexec.SchemaApplyParams{ URL: t.dburl, DevURL: "sqlite://file2/?mode=memory", To: "file://./schema.sql", diff --git a/internal/controller/common.go b/internal/controller/common.go index dc8f144..bf208f8 100644 --- a/internal/controller/common.go +++ b/internal/controller/common.go @@ -43,17 +43,16 @@ type ( } // AtlasExec is the interface for the atlas exec client. AtlasExec interface { - // MigrateApply runs the `migrate apply` command and returns the successful runs. - MigrateApply(context.Context, *atlasexec.MigrateApplyParams) (*atlasexec.MigrateApply, error) + // MigrateApplySlice runs the `migrate apply` command and returns the successful runs. + MigrateApplySlice(context.Context, *atlasexec.MigrateApplyParams) ([]*atlasexec.MigrateApply, error) // MigrateDown runs the `migrate down` command. MigrateDown(context.Context, *atlasexec.MigrateDownParams) (*atlasexec.MigrateDown, error) // MigrateLint runs the `migrate lint` command. MigrateLint(context.Context, *atlasexec.MigrateLintParams) (*atlasexec.SummaryReport, error) // MigrateStatus runs the `migrate status` command. MigrateStatus(context.Context, *atlasexec.MigrateStatusParams) (*atlasexec.MigrateStatus, error) - - // SchemaApply runs the `schema apply` command. - SchemaApply(context.Context, *atlasexec.SchemaApplyParams) (*atlasexec.SchemaApply, error) + // SchemaApplySlice runs the `schema apply` command and returns the successful runs. + SchemaApplySlice(context.Context, *atlasexec.SchemaApplyParams) ([]*atlasexec.SchemaApply, error) // SchemaInspect runs the `schema inspect` command. SchemaInspect(ctx context.Context, params *atlasexec.SchemaInspectParams) (string, error) // SchemaPush runs the `schema push` command. diff --git a/internal/controller/lint.go b/internal/controller/lint.go index 4c27c26..3d45bcf 100644 --- a/internal/controller/lint.go +++ b/internal/controller/lint.go @@ -21,6 +21,7 @@ import ( "strings" "ariga.io/atlas-go-sdk/atlasexec" + "ariga.io/atlas/sql/migrate" "ariga.io/atlas/sql/sqlcheck" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -47,7 +48,7 @@ func (r *AtlasSchemaReconciler) lint(ctx context.Context, wd *atlasexec.WorkingD if err != nil { return err } - plan, err := cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{ + plans, err := cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{ Env: data.EnvName, Vars: vars, To: data.Desired.String(), @@ -63,12 +64,15 @@ func (r *AtlasSchemaReconciler) lint(ctx context.Context, wd *atlasexec.WorkingD "unable to remove temporary directory", "dir", dir) } }() - dir, err := memDir(map[string]string{ - "1.sql": current, - "2.sql": strings.Join(plan.Changes.Pending, ";\n"), - }) - if err != nil { - return err + var dir migrate.Dir + if len(plans) > 0 { + dir, err = memDir(map[string]string{ + "1.sql": current, + "2.sql": strings.Join(plans[0].Changes.Pending, ";\n"), + }) + if err != nil { + return err + } } err = wd.CopyFS(lintDirName, dir) if err != nil { diff --git a/internal/controller/testhelper.go b/internal/controller/testhelper.go index d0befb6..9c7c802 100644 --- a/internal/controller/testhelper.go +++ b/internal/controller/testhelper.go @@ -82,9 +82,9 @@ func (m *mockAtlasExec) WhoAmI(context.Context) (*atlasexec.WhoAmI, error) { return m.whoami.res, m.whoami.err } -// SchemaApply implements AtlasExec. -func (m *mockAtlasExec) SchemaApply(ctx context.Context, params *atlasexec.SchemaApplyParams) (*atlasexec.SchemaApply, error) { - return m.schemaApply.res, m.schemaApply.err +// SchemaAppleSlice implements AtlasExec. +func (m *mockAtlasExec) SchemaApplySlice(ctx context.Context, params *atlasexec.SchemaApplyParams) ([]*atlasexec.SchemaApply, error) { + return []*atlasexec.SchemaApply{m.schemaApply.res}, m.schemaApply.err } // SchemaInspect implements AtlasExec. @@ -97,6 +97,11 @@ func (m *mockAtlasExec) MigrateApply(context.Context, *atlasexec.MigrateApplyPar return m.apply.res, m.apply.err } +// MigrateApplySlice implements AtlasExec. +func (m *mockAtlasExec) MigrateApplySlice(context.Context, *atlasexec.MigrateApplyParams) ([]*atlasexec.MigrateApply, error) { + return []*atlasexec.MigrateApply{m.apply.res}, m.apply.err +} + // MigrateDown implements AtlasExec. func (m *mockAtlasExec) MigrateDown(context.Context, *atlasexec.MigrateDownParams) (*atlasexec.MigrateDown, error) { return m.down.res, m.down.err