
Commit

Make sure the operator can proceed with exclusions even if multiple pods are failing (#1867)
johscheuer authored Oct 27, 2023
1 parent 5ed7627 commit f4780b5
Showing 5 changed files with 176 additions and 20 deletions.
16 changes: 15 additions & 1 deletion controllers/exclude_processes.go
@@ -231,5 +231,19 @@ func canExcludeNewProcesses(logger logr.Logger, cluster *fdbv1beta2.FoundationDB
// this requirement in the future and take the fault tolerance into account. We add the desired fault tolerance to
// have some buffer to prevent cases where the operator might need to exclude more processes but there are more missing
// processes.
return cluster.DesiredFaultTolerance() + len(validProcesses) - desiredProcessCount - ongoingExclusions, missingProcesses
allowedExclusions := cluster.DesiredFaultTolerance() + len(validProcesses) - desiredProcessCount - ongoingExclusions

// If automatic replacements are enabled and the number of allowed exclusions is less than or equal to 0, we check
// how many processes are missing. If more processes are missing than the automatic replacement is allowed to
// replace, we allow exclusions up to the maximum number of automatic replacements, minus the already ongoing
// exclusions. This makes sure that the operator can automatically replace processes, even in the case where
// multiple processes are failing.
if cluster.GetEnableAutomaticReplacements() && allowedExclusions <= 0 {
automaticReplacements := cluster.GetMaxConcurrentAutomaticReplacements()
if len(missingProcesses) > automaticReplacements {
return automaticReplacements - ongoingExclusions, missingProcesses
}
}

return allowedExclusions, missingProcesses
}
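
For context, a minimal standalone sketch of the allowance calculation above. The function name, the simplified signature, and the sample numbers are illustrative assumptions, not the operator's actual API.

package main

import "fmt"

// allowedExclusions mirrors the calculation in canExcludeNewProcesses in a
// simplified form: desired fault tolerance plus healthy processes, minus the
// desired process count and the exclusions already in flight. When that number
// drops to zero or below while automatic replacements are enabled and more
// processes are missing than the replacement limit, the operator still grants
// up to the replacement limit (minus ongoing exclusions) so it can recover
// from multiple failing Pods.
func allowedExclusions(faultTolerance, validProcesses, desiredProcesses, ongoingExclusions, missingProcesses, maxAutomaticReplacements int, automaticReplacementsEnabled bool) int {
	allowed := faultTolerance + validProcesses - desiredProcesses - ongoingExclusions
	if automaticReplacementsEnabled && allowed <= 0 && missingProcesses > maxAutomaticReplacements {
		return maxAutomaticReplacements - ongoingExclusions
	}

	return allowed
}

func main() {
	// Example: 5 storage processes desired, 2 missing (3 valid), fault tolerance 1,
	// no ongoing exclusions, at most 1 automatic replacement at a time.
	// The plain formula yields 1 + 3 - 5 - 0 = -1 and would block exclusions;
	// the new branch grants 1, so the replacement can proceed.
	fmt.Println(allowedExclusions(1, 3, 5, 0, 2, 1, true))
}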
48 changes: 46 additions & 2 deletions controllers/exclude_processes_test.go
@@ -213,15 +213,59 @@ var _ = Describe("exclude_processes", func() {
})
})

When("two process groups are missing", func() {
When("more process groups are missing than the operator is allowed to automatically replace", func() {
BeforeEach(func() {
createMissingProcesses(cluster, 2, fdbv1beta2.ProcessClassStorage)
createMissingProcesses(cluster, cluster.GetMaxConcurrentAutomaticReplacements()+1, fdbv1beta2.ProcessClassStorage)
})

It("should not allow the exclusion", func() {
Expect(allowedExclusions).To(BeNumerically("==", 0))
Expect(missingProcesses).To(Equal([]fdbv1beta2.ProcessGroupID{"storage-1", "storage-2"}))
})

FWhen("the missing timestamp is older than 5 minutes", func() {
BeforeEach(func() {
for idx, processGroup := range cluster.Status.ProcessGroups {
timestamp := processGroup.GetConditionTime(fdbv1beta2.MissingProcesses)
if timestamp == nil {
continue
}

cluster.Status.ProcessGroups[idx].ProcessGroupConditions[0].Timestamp = time.Now().Add(-10 * time.Minute).Unix()
}
})

When("automatic replacements enabled", func() {
When("no exclusions are ongoing", func() {
It("should allow the exclusion to replace as many processes as automatic replacements are allowed", func() {
Expect(allowedExclusions).To(BeNumerically("==", cluster.GetMaxConcurrentAutomaticReplacements()))
Expect(missingProcesses).To(HaveLen(cluster.GetMaxConcurrentAutomaticReplacements() + 1))
})
})

When("one exclusion is ongoing", func() {
BeforeEach(func() {
ongoingExclusions = 1
})

It("should allow the exclusion to replace as many processes as automatic replacements are allowed", func() {
Expect(allowedExclusions).To(BeNumerically("==", cluster.GetMaxConcurrentAutomaticReplacements()-ongoingExclusions))
Expect(missingProcesses).To(HaveLen(cluster.GetMaxConcurrentAutomaticReplacements() + 1))
})
})
})

When("automatic replacements disabled", func() {
BeforeEach(func() {
cluster.Spec.AutomationOptions.Replacements.Enabled = pointer.Bool(false)
})

It("should not allow the exclusions", func() {
Expect(allowedExclusions).To(BeNumerically("==", -1*len(missingProcesses)))
Expect(missingProcesses).To(HaveLen(cluster.GetMaxConcurrentAutomaticReplacements() + 1))
})
})
})
})

When("two process groups of a different type are missing", func() {
4 changes: 4 additions & 0 deletions e2e/fixtures/chaos_common.go
@@ -92,6 +92,10 @@ func (factory *Factory) DeleteChaosMeshExperimentSafe(experiment *ChaosMeshExper
}

func (factory *Factory) deleteChaosMeshExperiment(experiment *ChaosMeshExperiment) error {
if experiment == nil {
return nil
}

log.Println("Start deleting", experiment.name)
err := factory.getChaosExperiment(experiment.name, experiment.namespace, experiment.chaosObject)
if err != nil {
12 changes: 11 additions & 1 deletion e2e/fixtures/fdb_cluster.go
@@ -855,13 +855,17 @@ func (fdbCluster *FdbCluster) SetSkipReconciliation(skip bool) error {

// WaitForPodRemoval will wait until the specified Pod is deleted.
func (fdbCluster *FdbCluster) WaitForPodRemoval(pod *corev1.Pod) error {
if pod == nil {
return nil
}

log.Printf("waiting until the pod %s/%s is deleted", pod.Namespace, pod.Name)
counter := 0
forceReconcile := 10

// Poll every 2 seconds for a maximum of 40 minutes.
fetchedPod := &corev1.Pod{}
return wait.PollImmediate(2*time.Second, 40*time.Minute, func() (bool, error) {
err := wait.PollImmediate(2*time.Second, 40*time.Minute, func() (bool, error) {
err := fdbCluster.getClient().
Get(ctx.Background(), client.ObjectKeyFromObject(pod), fetchedPod)
if err != nil {
@@ -892,6 +896,12 @@ func (fdbCluster *FdbCluster) WaitForPodRemoval(pod *corev1.Pod) error {
counter++
return false, nil
})

if err == nil {
return nil
}

return fmt.Errorf("pod %s/%s was not removed in the expected time", pod.Namespace, pod.Name)
}

// GetClusterSpec returns the current cluster spec.
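
A rough sketch of the pattern the WaitForPodRemoval change above follows: guard against a nil target, poll until it is gone, and replace the raw poll error with a message naming the resource. The waitForRemoval helper and the checkGone callback are assumptions of this sketch, not part of the fixture API.

package main

import (
	"fmt"
	"log"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// waitForRemoval polls until checkGone reports that the named resource has
// been deleted. A nil name is treated as "nothing to wait for", matching the
// nil-Pod guard added above, and a timeout is reported with a descriptive
// error instead of the raw wait.ErrWaitTimeout.
func waitForRemoval(name *string, checkGone func() (bool, error)) error {
	if name == nil {
		return nil
	}

	log.Printf("waiting until %s is deleted", *name)
	// Poll every 2 seconds for a maximum of 40 minutes, as the fixture does.
	err := wait.PollImmediate(2*time.Second, 40*time.Minute, checkGone)
	if err == nil {
		return nil
	}

	return fmt.Errorf("%s was not removed in the expected time", *name)
}

func main() {
	name := "operator-test-storage-1"
	// Toy condition: pretend the resource disappears on the third check.
	checks := 0
	err := waitForRemoval(&name, func() (bool, error) {
		checks++
		return checks >= 3, nil
	})
	fmt.Println(err)
}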
116 changes: 100 additions & 16 deletions e2e/test_operator/operator_test.go
@@ -692,24 +692,108 @@ var _ = Describe("Operator", Label("e2e", "pr"), func() {
Expect(fdbCluster.SetAutoReplacements(true, 30*time.Second)).ShouldNot(HaveOccurred())
})

It("should replace the partitioned Pod", func() {
pod := fixtures.ChooseRandomPod(fdbCluster.GetStatelessPods())
log.Printf("partition Pod: %s", pod.Name)
exp = factory.InjectPartitionBetween(
fixtures.PodSelector(pod),
chaosmesh.PodSelectorSpec{
GenericSelectorSpec: chaosmesh.GenericSelectorSpec{
Namespaces: []string{pod.Namespace},
LabelSelectors: fdbCluster.GetCachedCluster().GetMatchLabels(),
When("a Pod gets partitioned", func() {
var partitionedPod *corev1.Pod

BeforeEach(func() {
partitionedPod = fixtures.ChooseRandomPod(fdbCluster.GetStatelessPods())
log.Printf("partition Pod: %s", partitionedPod.Name)
exp = factory.InjectPartitionBetween(
fixtures.PodSelector(partitionedPod),
chaosmesh.PodSelectorSpec{
GenericSelectorSpec: chaosmesh.GenericSelectorSpec{
Namespaces: []string{partitionedPod.Namespace},
LabelSelectors: fdbCluster.GetCachedCluster().GetMatchLabels(),
},
},
},
)
)

log.Printf("waiting for pod removal: %s", pod.Name)
Expect(fdbCluster.WaitForPodRemoval(pod)).ShouldNot(HaveOccurred())
exists, err := factory.DoesPodExist(*pod)
Expect(err).ShouldNot(HaveOccurred())
Expect(exists).To(BeFalse())
// Make sure the operator picks up the changes to the environment. This is not really needed but will
// speed up the tests.
fdbCluster.ForceReconcile()
})

It("should replace the partitioned Pod", func() {
log.Printf("waiting for pod removal: %s", partitionedPod.Name)
Expect(fdbCluster.WaitForPodRemoval(partitionedPod)).ShouldNot(HaveOccurred())
exists, err := factory.DoesPodExist(*partitionedPod)
Expect(err).ShouldNot(HaveOccurred())
Expect(exists).To(BeFalse())
})
})

When("more Pods are partitioned than the automatic replacement can replace concurrently", func() {
var partitionedPods []corev1.Pod
var concurrentReplacements int
var experiments []*fixtures.ChaosMeshExperiment
creationTimestamps := map[string]metav1.Time{}

BeforeEach(func() {
concurrentReplacements = fdbCluster.GetCluster().GetMaxConcurrentAutomaticReplacements()
// Allow only 1 Pod to be replaced concurrently.
spec := fdbCluster.GetCluster().Spec.DeepCopy()
spec.AutomationOptions.Replacements.MaxConcurrentReplacements = pointer.Int(1)
fdbCluster.UpdateClusterSpecWithSpec(spec)

// Disable automatic replacements to make sure the operator doesn't replace a Pod before the
// partition is injected.
Expect(fdbCluster.SetAutoReplacements(false, 30*time.Second)).ShouldNot(HaveOccurred())

// Pick 3 Pods, so the operator has to replace them one after another.
partitionedPods = fixtures.RandomPickPod(fdbCluster.GetStatelessPods().Items, 3)
for _, partitionedPod := range partitionedPods {
pod := partitionedPod
log.Printf("partition Pod: %s", pod.Name)
experiments = append(experiments, factory.InjectPartitionBetween(
fixtures.PodSelector(&pod),
chaosmesh.PodSelectorSpec{
GenericSelectorSpec: chaosmesh.GenericSelectorSpec{
Namespaces: []string{pod.Namespace},
LabelSelectors: fdbCluster.GetCachedCluster().GetMatchLabels(),
},
},
))

creationTimestamps[pod.Name] = pod.CreationTimestamp
}

// Make sure the status can settle
time.Sleep(1 * time.Minute)

// Now we can enable the replacements.
Expect(fdbCluster.SetAutoReplacements(true, 30*time.Second)).ShouldNot(HaveOccurred())
})

AfterEach(func() {
spec := fdbCluster.GetCluster().Spec.DeepCopy()
spec.AutomationOptions.Replacements.MaxConcurrentReplacements = pointer.Int(concurrentReplacements)
fdbCluster.UpdateClusterSpecWithSpec(spec)
for _, experiment := range experiments {
factory.DeleteChaosMeshExperimentSafe(experiment)
}
})

It("should replace the all partitioned Pod", func() {
Eventually(func(g Gomega) bool {
allUpdatedOrRemoved := true
for _, pod := range fdbCluster.GetStatelessPods().Items {
creationTime, exists := creationTimestamps[pod.Name]
// If the Pod doesn't exist we can assume it was updated.
if !exists {
continue
}

log.Println(pod.Name, "creation time map:", creationTime.String(), "creation time metadata:", pod.CreationTimestamp.String())
// The current creation timestamp of the Pod is after the initial creation
// timestamp, so the Pod was replaced.
if pod.CreationTimestamp.Compare(creationTime.Time) < 1 {
allUpdatedOrRemoved = false
}
}

return allUpdatedOrRemoved
}).Should(BeTrue())
})
})

AfterEach(func() {
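
The test above detects a replacement by comparing creation timestamps. A rough illustration of that idea follows; the replacedPods helper and the plain time.Time maps are assumptions of this sketch, not part of the test suite.

package main

import (
	"fmt"
	"time"
)

// replacedPods reports which of the originally recorded Pods have been
// replaced: a Pod counts as replaced when its name no longer exists or when
// its current creation timestamp is newer than the recorded one.
func replacedPods(recorded, current map[string]time.Time) []string {
	var replaced []string
	for name, oldCreation := range recorded {
		newCreation, exists := current[name]
		if !exists || newCreation.After(oldCreation) {
			replaced = append(replaced, name)
		}
	}

	return replaced
}

func main() {
	start := time.Date(2023, 10, 27, 10, 0, 0, 0, time.UTC)
	recorded := map[string]time.Time{"stateless-1": start, "stateless-2": start}
	current := map[string]time.Time{
		// stateless-1 was recreated, so its creation timestamp moved forward.
		"stateless-1": start.Add(30 * time.Minute),
		// stateless-2 is untouched.
		"stateless-2": start,
	}
	fmt.Println(replacedPods(recorded, current)) // [stateless-1]
}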

