Skip to content

Commit

Permalink
internal/controller: handle the case of Atlas returning multiple results
Browse files Browse the repository at this point in the history
  • Loading branch information
datdao committed Dec 17, 2024
1 parent 7110c6d commit 212bf17
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 23 deletions.
16 changes: 9 additions & 7 deletions internal/controller/atlasmigration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio
log.Info("applying pending migrations", "count", len(status.Pending))
// There are pending migrations
// Execute Atlas CLI migrate command
report, err := c.MigrateApply(ctx, &atlasexec.MigrateApplyParams{
reports, err := c.MigrateApplySlice(ctx, &atlasexec.MigrateApplyParams{
Env: data.EnvName,
Context: &atlasexec.DeployRunContext{
TriggerType: atlasexec.TriggerTypeKubernetes,
Expand All @@ -372,12 +372,14 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio
}
return err
}
res.SetReady(dbv1alpha1.AtlasMigrationStatus{
ObservedHash: data.ObservedHash,
LastApplied: report.End.Unix(),
LastAppliedVersion: report.Target,
})
r.recordApplied(res, report.Target)
for _, report := range reports {
res.SetReady(dbv1alpha1.AtlasMigrationStatus{
ObservedHash: data.ObservedHash,
LastApplied: report.End.Unix(),
LastAppliedVersion: report.Target,
})
r.recordApplied(res, report.Target)
}
}
if data.Dir != nil {
// Compress the migration directory then store it in the secret
Expand Down
34 changes: 18 additions & 16 deletions internal/controller/atlasschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
default:
log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org)
}
var report *atlasexec.SchemaApply
var reports []*atlasexec.SchemaApply
switch desiredURL := data.Desired.String(); {
// The resource is connected to Atlas Cloud.
case whoami != nil:
Expand All @@ -221,7 +221,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
repo := data.repoURL()
if repo == nil {
// No repository is set, apply the changes directly.
report, err = cli.SchemaApply(ctx, params)
reports, err = cli.SchemaApplySlice(ctx, params)
break
}
createPlan := func() (ctrl.Result, error) {
Expand Down Expand Up @@ -352,7 +352,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Try to apply the schema changes with lint policies,
// if the changes are rejected by the review policy, create a plan
// for the pending changes.
report, err = cli.SchemaApply(ctx, params)
reports, err = cli.SchemaApplySlice(ctx, params)
// TODO: Better error handling for rejected changes.
if err != nil && strings.HasPrefix(err.Error(), "Rejected by review policy") {
log.Info("schema changes are rejected by the review policy, creating a new schema plan")
Expand Down Expand Up @@ -391,7 +391,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}); err != nil {
return result(err)
}
report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
To: desiredURL,
TxMode: string(data.TxMode),
Expand All @@ -409,15 +409,15 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.recordErrEvent(res, err)
return result(err)
}
report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
To: desiredURL,
TxMode: string(data.TxMode),
AutoApprove: true,
})
// No linting policy is set.
default:
report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
To: desiredURL,
TxMode: string(data.TxMode),
Expand All @@ -433,21 +433,23 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.recordErrEvent(res, err)
return result(err)
}
log.Info("schema changes are applied", "applied", len(report.Changes.Applied))
// Truncate the applied and pending changes to 1024 bytes.
report.Changes.Applied = truncateSQL(report.Changes.Applied, sqlLimitSize)
report.Changes.Pending = truncateSQL(report.Changes.Pending, sqlLimitSize)
s := dbv1alpha1.AtlasSchemaStatus{
LastApplied: time.Now().Unix(),
ObservedHash: hash,
}
// Set the plan URL if it exists.
if p := report.Plan; p != nil {
s.PlanLink = p.File.Link
s.PlanURL = p.File.URL
for _, report := range reports {
log.Info("schema changes are applied", "applied", len(report.Changes.Applied))
// Truncate the applied and pending changes to 1024 bytes.
report.Changes.Applied = truncateSQL(report.Changes.Applied, sqlLimitSize)
report.Changes.Pending = truncateSQL(report.Changes.Pending, sqlLimitSize)
// Set the plan URL if it exists.
if p := report.Plan; p != nil {
s.PlanLink = p.File.Link
s.PlanURL = p.File.URL
}
res.SetReady(s, reports)
r.recorder.Event(res, corev1.EventTypeNormal, "Applied", "Applied schema")
}
res.SetReady(s, report)
r.recorder.Event(res, corev1.EventTypeNormal, "Applied", "Applied schema")
return ctrl.Result{}, nil
}

Expand Down
4 changes: 4 additions & 0 deletions internal/controller/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type (
AtlasExec interface {
// MigrateApply runs the `migrate apply` command and returns the successful runs.
MigrateApply(context.Context, *atlasexec.MigrateApplyParams) (*atlasexec.MigrateApply, error)
// MigrateApplySlice runs the `migrate apply` command and returns the successful runs.
MigrateApplySlice(context.Context, *atlasexec.MigrateApplyParams) ([]*atlasexec.MigrateApply, error)
// MigrateDown runs the `migrate down` command.
MigrateDown(context.Context, *atlasexec.MigrateDownParams) (*atlasexec.MigrateDown, error)
// MigrateLint runs the `migrate lint` command.
Expand All @@ -54,6 +56,8 @@ type (

// SchemaApply runs the `schema apply` command.
SchemaApply(context.Context, *atlasexec.SchemaApplyParams) (*atlasexec.SchemaApply, error)
// SchemaApplySlice runs the `schema apply` command and returns the successful runs.
SchemaApplySlice(context.Context, *atlasexec.SchemaApplyParams) ([]*atlasexec.SchemaApply, error)
// SchemaInspect runs the `schema inspect` command.
SchemaInspect(ctx context.Context, params *atlasexec.SchemaInspectParams) (string, error)
// SchemaPush runs the `schema push` command.
Expand Down
10 changes: 10 additions & 0 deletions internal/controller/testhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ func (m *mockAtlasExec) SchemaApply(ctx context.Context, params *atlasexec.Schem
return m.schemaApply.res, m.schemaApply.err
}

// SchemaAppleSlice implements AtlasExec.
func (m *mockAtlasExec) SchemaApplySlice(ctx context.Context, params *atlasexec.SchemaApplyParams) ([]*atlasexec.SchemaApply, error) {
return []*atlasexec.SchemaApply{m.schemaApply.res}, m.schemaApply.err
}

// SchemaInspect implements AtlasExec.
func (m *mockAtlasExec) SchemaInspect(ctx context.Context, params *atlasexec.SchemaInspectParams) (string, error) {
return *m.schemaInspect.res, m.schemaInspect.err
Expand All @@ -97,6 +102,11 @@ func (m *mockAtlasExec) MigrateApply(context.Context, *atlasexec.MigrateApplyPar
return m.apply.res, m.apply.err
}

// MigrateApplySlice implements AtlasExec.
func (m *mockAtlasExec) MigrateApplySlice(context.Context, *atlasexec.MigrateApplyParams) ([]*atlasexec.MigrateApply, error) {
return []*atlasexec.MigrateApply{m.apply.res}, m.apply.err
}

// MigrateDown implements AtlasExec.
func (m *mockAtlasExec) MigrateDown(context.Context, *atlasexec.MigrateDownParams) (*atlasexec.MigrateDown, error) {
return m.down.res, m.down.err
Expand Down

0 comments on commit 212bf17

Please sign in to comment.