diff --git a/internal/controller/atlasmigration_controller.go b/internal/controller/atlasmigration_controller.go index 13184a4..fb07bbb 100644 --- a/internal/controller/atlasmigration_controller.go +++ b/internal/controller/atlasmigration_controller.go @@ -358,7 +358,7 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio log.Info("applying pending migrations", "count", len(status.Pending)) // There are pending migrations // Execute Atlas CLI migrate command - report, err := c.MigrateApply(ctx, &atlasexec.MigrateApplyParams{ + reports, err := c.MigrateApplySlice(ctx, &atlasexec.MigrateApplyParams{ Env: data.EnvName, Context: &atlasexec.DeployRunContext{ TriggerType: atlasexec.TriggerTypeKubernetes, @@ -372,12 +372,14 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio } return err } - res.SetReady(dbv1alpha1.AtlasMigrationStatus{ - ObservedHash: data.ObservedHash, - LastApplied: report.End.Unix(), - LastAppliedVersion: report.Target, - }) - r.recordApplied(res, report.Target) + for _, report := range reports { + res.SetReady(dbv1alpha1.AtlasMigrationStatus{ + ObservedHash: data.ObservedHash, + LastApplied: report.End.Unix(), + LastAppliedVersion: report.Target, + }) + r.recordApplied(res, report.Target) + } } if data.Dir != nil { // Compress the migration directory then store it in the secret diff --git a/internal/controller/atlasschema_controller.go b/internal/controller/atlasschema_controller.go index e199a35..e5062db 100644 --- a/internal/controller/atlasschema_controller.go +++ b/internal/controller/atlasschema_controller.go @@ -202,7 +202,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) default: log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org) } - var report *atlasexec.SchemaApply + var reports []*atlasexec.SchemaApply switch desiredURL := data.Desired.String(); { // The resource is connected to Atlas Cloud. case whoami != nil: @@ -221,7 +221,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) repo := data.repoURL() if repo == nil { // No repository is set, apply the changes directly. - report, err = cli.SchemaApply(ctx, params) + reports, err = cli.SchemaApplySlice(ctx, params) break } createPlan := func() (ctrl.Result, error) { @@ -352,7 +352,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Try to apply the schema changes with lint policies, // if the changes are rejected by the review policy, create a plan // for the pending changes. - report, err = cli.SchemaApply(ctx, params) + reports, err = cli.SchemaApplySlice(ctx, params) // TODO: Better error handling for rejected changes. if err != nil && strings.HasPrefix(err.Error(), "Rejected by review policy") { log.Info("schema changes are rejected by the review policy, creating a new schema plan") @@ -391,7 +391,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) }); err != nil { return result(err) } - report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{ + reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{ Env: data.EnvName, To: desiredURL, TxMode: string(data.TxMode), @@ -409,7 +409,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.recordErrEvent(res, err) return result(err) } - report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{ + reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{ Env: data.EnvName, To: desiredURL, TxMode: string(data.TxMode), @@ -417,7 +417,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) }) // No linting policy is set. default: - report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{ + reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{ Env: data.EnvName, To: desiredURL, TxMode: string(data.TxMode), @@ -433,21 +433,23 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.recordErrEvent(res, err) return result(err) } - log.Info("schema changes are applied", "applied", len(report.Changes.Applied)) - // Truncate the applied and pending changes to 1024 bytes. - report.Changes.Applied = truncateSQL(report.Changes.Applied, sqlLimitSize) - report.Changes.Pending = truncateSQL(report.Changes.Pending, sqlLimitSize) s := dbv1alpha1.AtlasSchemaStatus{ LastApplied: time.Now().Unix(), ObservedHash: hash, } - // Set the plan URL if it exists. - if p := report.Plan; p != nil { - s.PlanLink = p.File.Link - s.PlanURL = p.File.URL + for _, report := range reports { + log.Info("schema changes are applied", "applied", len(report.Changes.Applied)) + // Truncate the applied and pending changes to 1024 bytes. + report.Changes.Applied = truncateSQL(report.Changes.Applied, sqlLimitSize) + report.Changes.Pending = truncateSQL(report.Changes.Pending, sqlLimitSize) + // Set the plan URL if it exists. + if p := report.Plan; p != nil { + s.PlanLink = p.File.Link + s.PlanURL = p.File.URL + } + res.SetReady(s, reports) + r.recorder.Event(res, corev1.EventTypeNormal, "Applied", "Applied schema") } - res.SetReady(s, report) - r.recorder.Event(res, corev1.EventTypeNormal, "Applied", "Applied schema") return ctrl.Result{}, nil } diff --git a/internal/controller/common.go b/internal/controller/common.go index dc8f144..e9adcdd 100644 --- a/internal/controller/common.go +++ b/internal/controller/common.go @@ -45,6 +45,8 @@ type ( AtlasExec interface { // MigrateApply runs the `migrate apply` command and returns the successful runs. MigrateApply(context.Context, *atlasexec.MigrateApplyParams) (*atlasexec.MigrateApply, error) + // MigrateApplySlice runs the `migrate apply` command and returns the successful runs. + MigrateApplySlice(context.Context, *atlasexec.MigrateApplyParams) ([]*atlasexec.MigrateApply, error) // MigrateDown runs the `migrate down` command. MigrateDown(context.Context, *atlasexec.MigrateDownParams) (*atlasexec.MigrateDown, error) // MigrateLint runs the `migrate lint` command. @@ -54,6 +56,8 @@ type ( // SchemaApply runs the `schema apply` command. SchemaApply(context.Context, *atlasexec.SchemaApplyParams) (*atlasexec.SchemaApply, error) + // SchemaApplySlice runs the `schema apply` command and returns the successful runs. + SchemaApplySlice(context.Context, *atlasexec.SchemaApplyParams) ([]*atlasexec.SchemaApply, error) // SchemaInspect runs the `schema inspect` command. SchemaInspect(ctx context.Context, params *atlasexec.SchemaInspectParams) (string, error) // SchemaPush runs the `schema push` command. diff --git a/internal/controller/testhelper.go b/internal/controller/testhelper.go index d0befb6..2abcdd0 100644 --- a/internal/controller/testhelper.go +++ b/internal/controller/testhelper.go @@ -87,6 +87,11 @@ func (m *mockAtlasExec) SchemaApply(ctx context.Context, params *atlasexec.Schem return m.schemaApply.res, m.schemaApply.err } +// SchemaAppleSlice implements AtlasExec. +func (m *mockAtlasExec) SchemaApplySlice(ctx context.Context, params *atlasexec.SchemaApplyParams) ([]*atlasexec.SchemaApply, error) { + return []*atlasexec.SchemaApply{m.schemaApply.res}, m.schemaApply.err +} + // SchemaInspect implements AtlasExec. func (m *mockAtlasExec) SchemaInspect(ctx context.Context, params *atlasexec.SchemaInspectParams) (string, error) { return *m.schemaInspect.res, m.schemaInspect.err @@ -97,6 +102,11 @@ func (m *mockAtlasExec) MigrateApply(context.Context, *atlasexec.MigrateApplyPar return m.apply.res, m.apply.err } +// MigrateApplySlice implements AtlasExec. +func (m *mockAtlasExec) MigrateApplySlice(context.Context, *atlasexec.MigrateApplyParams) ([]*atlasexec.MigrateApply, error) { + return []*atlasexec.MigrateApply{m.apply.res}, m.apply.err +} + // MigrateDown implements AtlasExec. func (m *mockAtlasExec) MigrateDown(context.Context, *atlasexec.MigrateDownParams) (*atlasexec.MigrateDown, error) { return m.down.res, m.down.err