Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feature: support v2 volume live migration
Browse files Browse the repository at this point in the history
longhorn-6361

Signed-off-by: Phan Le <[email protected]>
PhanLe1010 committed Dec 14, 2024
1 parent cab44f4 commit 9204221
Showing 3 changed files with 173 additions and 244 deletions.
63 changes: 49 additions & 14 deletions controller/volume_controller.go
Original file line number Diff line number Diff line change
@@ -4022,6 +4022,18 @@ func (c *VolumeController) createAndStartMatchingReplicas(v *longhorn.Volume,
rs, pathToOldRs, pathToNewRs map[string]*longhorn.Replica,
fixupFunc func(r *longhorn.Replica, obj string), obj string) error {
log := getLoggerForVolume(c.logger, v)
if types.IsDataEngineV2(v.Spec.DataEngine) {
for path, r := range pathToOldRs {
if pathToNewRs[path] != nil {
continue
}
fixupFunc(r, obj)
rs[r.Name] = r
pathToNewRs[path] = r
}
return nil
}

for path, r := range pathToOldRs {
if pathToNewRs[path] != nil {
continue
@@ -4041,7 +4053,7 @@ func (c *VolumeController) createAndStartMatchingReplicas(v *longhorn.Volume,
return nil
}

func (c *VolumeController) deleteInvalidMigrationReplicas(rs, pathToOldRs, pathToNewRs map[string]*longhorn.Replica) error {
func (c *VolumeController) deleteInvalidMigrationReplicas(rs, pathToOldRs, pathToNewRs map[string]*longhorn.Replica, v *longhorn.Volume) error {
for path, r := range pathToNewRs {
matchTheOldReplica := pathToOldRs[path] != nil && r.Spec.DesireState == pathToOldRs[path].Spec.DesireState
newReplicaIsAvailable := r.DeletionTimestamp == nil && r.Spec.DesireState == longhorn.InstanceStateRunning &&
@@ -4050,8 +4062,12 @@ func (c *VolumeController) deleteInvalidMigrationReplicas(rs, pathToOldRs, pathT
continue
}
delete(pathToNewRs, path)
if err := c.deleteReplica(r, rs); err != nil {
return errors.Wrapf(err, "failed to delete the new replica %v when there is no matching old replica in path %v", r.Name, path)
if types.IsDataEngineV1(v.Spec.DataEngine) {
if err := c.deleteReplica(r, rs); err != nil {
return errors.Wrapf(err, "failed to delete the new replica %v when there is no matching old replica in path %v", r.Name, path)
}
} else {
r.Spec.MigrationEngineName = ""
}
}
return nil
@@ -4135,9 +4151,16 @@ func (c *VolumeController) processMigration(v *longhorn.Volume, es map[string]*l
currentEngine.Spec.Active = true

// cleanupCorruptedOrStaleReplicas() will take care of old replicas
c.switchActiveReplicas(rs, func(r *longhorn.Replica, engineName string) bool {
return r.Spec.EngineName == engineName && r.Spec.HealthyAt != ""
}, currentEngine.Name)
if types.IsDataEngineV1(v.Spec.DataEngine) {
c.switchActiveReplicas(rs, func(r *longhorn.Replica, engineName string) bool {
return r.Spec.EngineName == engineName && r.Spec.HealthyAt != ""
}, currentEngine.Name)
} else {
for _, r := range rs {
r.Spec.MigrationEngineName = ""
r.Spec.EngineName = currentEngine.Name
}
}

// migration rollback or confirmation finished
v.Status.CurrentMigrationNodeID = ""
@@ -4174,13 +4197,19 @@ func (c *VolumeController) processMigration(v *longhorn.Volume, es map[string]*l
return
}

for _, r := range rs {
if r.Spec.EngineName == currentEngine.Name {
continue
if types.IsDataEngineV1(v.Spec.DataEngine) {
for _, r := range rs {
if r.Spec.EngineName == currentEngine.Name {
continue
}
if err2 := c.deleteReplica(r, rs); err2 != nil {
err = errors.Wrapf(err, "failed to delete the migration replica %v during the migration revert: %v", r.Name, err2)
return
}
}
if err2 := c.deleteReplica(r, rs); err2 != nil {
err = errors.Wrapf(err, "failed to delete the migration replica %v during the migration revert: %v", r.Name, err2)
return
} else {
for _, r := range rs {
r.Spec.MigrationEngineName = ""
}
}
}()
@@ -4297,6 +4326,8 @@ func (c *VolumeController) prepareReplicasAndEngineForMigration(v *longhorn.Volu
}
} else if r.Spec.EngineName == migrationEngine.Name {
migrationReplicas[dataPath] = r
} else if r.Spec.MigrationEngineName == migrationEngine.Name {
migrationReplicas[dataPath] = r
} else {
log.Warnf("During migration found unknown replica with engine %v, will directly remove it", r.Spec.EngineName)
if err := c.deleteReplica(r, rs); err != nil {
@@ -4305,12 +4336,16 @@ func (c *VolumeController) prepareReplicasAndEngineForMigration(v *longhorn.Volu
}
}

if err := c.deleteInvalidMigrationReplicas(rs, currentAvailableReplicas, migrationReplicas); err != nil {
if err := c.deleteInvalidMigrationReplicas(rs, currentAvailableReplicas, migrationReplicas, v); err != nil {
return false, false, err
}

if err := c.createAndStartMatchingReplicas(v, rs, currentAvailableReplicas, migrationReplicas, func(r *longhorn.Replica, engineName string) {
r.Spec.EngineName = engineName
if types.IsDataEngineV1(v.Spec.DataEngine) {
r.Spec.EngineName = engineName
} else {
r.Spec.MigrationEngineName = engineName
}
}, migrationEngine.Name); err != nil {
return false, false, err
}
350 changes: 120 additions & 230 deletions k8s/crds.yaml

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions k8s/pkg/apis/longhorn/v1beta2/replica.go
Original file line number Diff line number Diff line change
@@ -25,6 +25,10 @@ type ReplicaSpec struct {
// +optional
EngineName string `json:"engineName"`
// +optional
// MigrationEngineName indicates the migration engine that is currently connected to this replica. It is only
// used for live migration of v2 data engine volumes.
MigrationEngineName string `json:"migrationEngineName"`
// +optional
// HealthyAt is set the first time a replica becomes read/write in an engine after creation or rebuild. HealthyAt
// indicates the time the last successful rebuild occurred. When HealthyAt is set, a replica is likely to have
// useful (though possibly stale) data. HealthyAt is cleared before a rebuild. HealthyAt may be later than the

0 comments on commit 9204221

Please sign in to comment.