Skip to content

Commit

Permalink
including processes using excluded server list (#1857)
Browse files Browse the repository at this point in the history
* Include only addresses and localities that are present in the exclude server list from the machine-readable status.

---------

Co-authored-by: harsh maheshwari <[email protected]>
  • Loading branch information
09harsh and harsh maheshwari authored Nov 23, 2023
1 parent efb4ae9 commit c73f676
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 23 deletions.
43 changes: 34 additions & 9 deletions controllers/remove_process_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (u removeProcessGroups) reconcile(ctx context.Context, r *FoundationDBClust
// This will return a map of the newly removed ProcessGroups and the ProcessGroups with the ResourcesTerminating condition
removedProcessGroups := r.removeProcessGroups(ctx, logger, cluster, zoneRemovals, zonedRemovals[removals.TerminatingZone])

err = includeProcessGroup(ctx, r, cluster, removedProcessGroups)
err = includeProcessGroup(ctx, logger, r, cluster, removedProcessGroups, status)
if err != nil {
return &requeue{curError: err}
}
Expand Down Expand Up @@ -251,14 +251,18 @@ func confirmRemoval(ctx context.Context, logger logr.Logger, r *FoundationDBClus
return true, canBeIncluded, nil
}

func includeProcessGroup(ctx context.Context, r *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster, removedProcessGroups map[fdbv1beta2.ProcessGroupID]bool) error {
func includeProcessGroup(ctx context.Context, logger logr.Logger, r *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster, removedProcessGroups map[fdbv1beta2.ProcessGroupID]bool, status *fdbv1beta2.FoundationDBStatus) error {
adminClient, err := r.getDatabaseClientProvider().GetAdminClient(cluster, r)
if err != nil {
return err
}
defer adminClient.Close()

fdbProcessesToInclude := getProcessesToInclude(cluster, removedProcessGroups)
fdbProcessesToInclude, err := getProcessesToInclude(logger, cluster, removedProcessGroups, status)
if err != nil {
return err
}

if len(fdbProcessesToInclude) > 0 {
r.Recorder.Event(cluster, corev1.EventTypeNormal, "IncludingProcesses", fmt.Sprintf("Including removed processes: %v", fdbProcessesToInclude))

Expand All @@ -276,21 +280,42 @@ func includeProcessGroup(ctx context.Context, r *FoundationDBClusterReconciler,
return nil
}

func getProcessesToInclude(cluster *fdbv1beta2.FoundationDBCluster, removedProcessGroups map[fdbv1beta2.ProcessGroupID]bool) []fdbv1beta2.ProcessAddress {
func getProcessesToInclude(logger logr.Logger, cluster *fdbv1beta2.FoundationDBCluster, removedProcessGroups map[fdbv1beta2.ProcessGroupID]bool, status *fdbv1beta2.FoundationDBStatus) ([]fdbv1beta2.ProcessAddress, error) {
fdbProcessesToInclude := make([]fdbv1beta2.ProcessAddress, 0)

if len(removedProcessGroups) == 0 {
return fdbProcessesToInclude
return fdbProcessesToInclude, nil
}

excludedServers, err := fdbstatus.GetExclusions(status)
if err != nil {
return fdbProcessesToInclude, fmt.Errorf("unable to get excluded servers from status, %w", err)
}
excludedServersMap := make(map[string]fdbv1beta2.None, len(excludedServers))
for _, excludedServer := range excludedServers {
excludedServersMap[excludedServer.String()] = fdbv1beta2.None{}
}

idx := 0
for _, processGroup := range cluster.Status.ProcessGroups {
if processGroup.IsMarkedForRemoval() && removedProcessGroups[processGroup.ProcessGroupID] {
if cluster.UseLocalitiesForExclusion() {
fdbProcessesToInclude = append(fdbProcessesToInclude, fdbv1beta2.ProcessAddress{StringAddress: processGroup.GetExclusionString()})
foundInExcludedServerList := false
exclusionString := processGroup.GetExclusionString()
if _, ok := excludedServersMap[exclusionString]; ok {
fdbProcessesToInclude = append(fdbProcessesToInclude, fdbv1beta2.ProcessAddress{StringAddress: exclusionString})
foundInExcludedServerList = true
}
for _, pAddr := range processGroup.Addresses {
fdbProcessesToInclude = append(fdbProcessesToInclude, fdbv1beta2.ProcessAddress{IPAddress: net.ParseIP(pAddr)})
if _, ok := excludedServersMap[pAddr]; ok {
fdbProcessesToInclude = append(fdbProcessesToInclude, fdbv1beta2.ProcessAddress{IPAddress: net.ParseIP(pAddr)})
foundInExcludedServerList = true
}
}
if !foundInExcludedServerList {
// This means that the process is marked for exclusion and is also removed in the previous step but is missing
// its entry in the excluded servers in the status. This should not throw an error as this will block the
// inclusion for other processes, but we should have a record of this event happening in the logs.
logger.Info("processGroup is included but is missing from excluded server list", "processGroup", processGroup)
}
continue
}
Expand All @@ -301,7 +326,7 @@ func getProcessesToInclude(cluster *fdbv1beta2.FoundationDBCluster, removedProce
// Remove the trailing duplicates.
cluster.Status.ProcessGroups = cluster.Status.ProcessGroups[:idx]

return fdbProcessesToInclude
return fdbProcessesToInclude, nil
}

func (r *FoundationDBClusterReconciler) getProcessGroupsToRemove(logger logr.Logger, cluster *fdbv1beta2.FoundationDBCluster, remainingMap map[string]bool, cordSet map[string]fdbv1beta2.None) (bool, bool, []*fdbv1beta2.ProcessGroupStatus) {
Expand Down
79 changes: 72 additions & 7 deletions controllers/remove_process_groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package controllers
import (
"context"
"fmt"
"github.com/go-logr/logr"

"github.com/FoundationDB/fdb-kubernetes-operator/pkg/fdbadminclient/mock"

Expand Down Expand Up @@ -462,6 +463,9 @@ var _ = Describe("remove_process_groups", func() {

Context("validating getProcessesToInclude", func() {
var removedProcessGroups map[fdbv1beta2.ProcessGroupID]bool
var status *fdbv1beta2.FoundationDBStatus
var err error
var adminClient *mock.AdminClient

BeforeEach(func() {
cluster = &fdbv1beta2.FoundationDBCluster{
Expand Down Expand Up @@ -492,6 +496,13 @@ var _ = Describe("remove_process_groups", func() {
},
}
removedProcessGroups = make(map[fdbv1beta2.ProcessGroupID]bool)
adminClient, err = mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
})

JustBeforeEach(func() {
status, err = adminClient.GetStatus()
Expect(err).NotTo(HaveOccurred())
})

Context("cluster doesn't support inclusions using locality", func() {
Expand All @@ -501,7 +512,10 @@ var _ = Describe("remove_process_groups", func() {

When("including no process", func() {
It("should not include any process", func() {
Expect(len(getProcessesToInclude(cluster, removedProcessGroups))).To(Equal(0))
processesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(processesToInclude)).To(Equal(0))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(16))
})
})

Expand All @@ -510,13 +524,15 @@ var _ = Describe("remove_process_groups", func() {
processGroup := cluster.Status.ProcessGroups[0]
Expect(processGroup.ProcessGroupID).To(Equal(fdbv1beta2.ProcessGroupID("storage-1")))
processGroup.MarkForRemoval()
cluster.Status.ProcessGroups[0] = processGroup

for _, address := range processGroup.Addresses {
adminClient.ExcludedAddresses[address] = fdbv1beta2.None{}
}
removedProcessGroups[processGroup.ProcessGroupID] = true
})

It("should include one process", func() {
fdbProcessesToInclude := getProcessesToInclude(cluster, removedProcessGroups)
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(1))
Expect(fdbv1beta2.ProcessAddressesString(fdbProcessesToInclude, " ")).To(Equal("1.1.1.1"))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(15))
Expand All @@ -531,7 +547,8 @@ var _ = Describe("remove_process_groups", func() {

When("including no process", func() {
It("should not include any process", func() {
fdbProcessesToInclude := getProcessesToInclude(cluster, removedProcessGroups)
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(0))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(16))
})
Expand All @@ -544,18 +561,66 @@ var _ = Describe("remove_process_groups", func() {
removedProcessGroup = cluster.Status.ProcessGroups[0]
Expect(removedProcessGroup.ProcessGroupID).To(Equal(fdbv1beta2.ProcessGroupID("storage-1")))
removedProcessGroup.MarkForRemoval()
cluster.Status.ProcessGroups[0] = removedProcessGroup
adminClient.ExcludedAddresses[removedProcessGroup.GetExclusionString()] = fdbv1beta2.None{}
removedProcessGroups[removedProcessGroup.ProcessGroupID] = true
})

It("should include one process", func() {
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(1))
Expect(fdbv1beta2.ProcessAddressesString(fdbProcessesToInclude, " ")).To(Equal(removedProcessGroup.GetExclusionString()))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(15))
})
})

When("including a process which is excluded both by IP and locality", func() {
var removedProcessGroup *fdbv1beta2.ProcessGroupStatus

BeforeEach(func() {
removedProcessGroup = cluster.Status.ProcessGroups[0]
Expect(removedProcessGroup.ProcessGroupID).To(Equal(fdbv1beta2.ProcessGroupID("storage-1")))
removedProcessGroup.MarkForRemoval()

adminClient.ExcludedAddresses[removedProcessGroup.GetExclusionString()] = fdbv1beta2.None{}
adminClient.ExcludedAddresses[removedProcessGroup.Addresses[0]] = fdbv1beta2.None{}
removedProcessGroups[removedProcessGroup.ProcessGroupID] = true
})

It("should include one process", func() {
fdbProcessesToInclude := getProcessesToInclude(cluster, removedProcessGroups)
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(2))
Expect(fdbv1beta2.ProcessAddressesString(fdbProcessesToInclude, " ")).To(Equal(fmt.Sprintf("%s %s", removedProcessGroup.GetExclusionString(), removedProcessGroup.Addresses[0])))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(15))
})
})

When("one excluded process is missing from excluded servers", func() {
var removedProcessGroup *fdbv1beta2.ProcessGroupStatus
var removedProcessGroup2 *fdbv1beta2.ProcessGroupStatus

BeforeEach(func() {
removedProcessGroup = cluster.Status.ProcessGroups[0]
Expect(removedProcessGroup.ProcessGroupID).To(Equal(fdbv1beta2.ProcessGroupID("storage-1")))
removedProcessGroup.MarkForRemoval()
removedProcessGroups[removedProcessGroup.ProcessGroupID] = true

removedProcessGroup2 = cluster.Status.ProcessGroups[1]
Expect(removedProcessGroup2.ProcessGroupID).To(Equal(fdbv1beta2.ProcessGroupID("storage-2")))
removedProcessGroup2.MarkForRemoval()
adminClient.ExcludedAddresses[removedProcessGroup2.GetExclusionString()] = fdbv1beta2.None{}
removedProcessGroups[removedProcessGroup2.ProcessGroupID] = true
})

It("should include one process", func() {
fdbProcessesToInclude, err := getProcessesToInclude(logr.Logger{}, cluster, removedProcessGroups, status)
Expect(err).NotTo(HaveOccurred())
Expect(len(fdbProcessesToInclude)).To(Equal(1))
Expect(fdbv1beta2.ProcessAddressesString(fdbProcessesToInclude, " ")).To(Equal(removedProcessGroup2.GetExclusionString()))
Expect(len(cluster.Status.ProcessGroups)).To(Equal(14))
})
})
})
})
})
3 changes: 3 additions & 0 deletions e2e/test_operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,9 @@ var _ = Describe("Operator", Label("e2e", "pr"), func() {
spec.AutomationOptions.UseLocalitiesForExclusion = pointer.Bool(useLocalitiesForExclusion)
fdbCluster.UpdateClusterSpecWithSpec(spec)
Expect(fdbCluster.GetCluster().UseLocalitiesForExclusion()).To(Equal(useLocalitiesForExclusion))

// Making sure we included back all the process groups after exclusion is complete.
Expect(fdbCluster.GetStatus().Cluster.DatabaseConfiguration.ExcludedServers).To(BeEmpty())
})

When("IP addresses are used for exclusion", func() {
Expand Down
29 changes: 22 additions & 7 deletions pkg/fdbadminclient/mock/admin_client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,17 @@ func (client *AdminClient) GetStatus() (*fdbv1beta2.FoundationDBStatus, error) {
status.Cluster.DatabaseConfiguration.VersionFlags.LogSpill = 2
}

if len(client.ExcludedAddresses) > 0 {
status.Cluster.DatabaseConfiguration.ExcludedServers = make([]fdbv1beta2.ExcludedServers, 0, len(client.ExcludedAddresses))
}
for excludedAddresses := range client.ExcludedAddresses {
if net.ParseIP(excludedAddresses) != nil {
status.Cluster.DatabaseConfiguration.ExcludedServers = append(status.Cluster.DatabaseConfiguration.ExcludedServers, fdbv1beta2.ExcludedServers{Address: excludedAddresses})
} else {
status.Cluster.DatabaseConfiguration.ExcludedServers = append(status.Cluster.DatabaseConfiguration.ExcludedServers, fdbv1beta2.ExcludedServers{Locality: excludedAddresses})
}
}

status.Cluster.FullReplication = true
status.Cluster.Data.State.Healthy = true
status.Cluster.Data.State.Name = "healthy"
Expand Down Expand Up @@ -595,15 +606,19 @@ func (client *AdminClient) GetExclusions() ([]fdbv1beta2.ProcessAddress, error)
return nil, client.mockError
}

pAddrs := make([]fdbv1beta2.ProcessAddress, len(client.ExcludedAddresses))
pAddrs := make([]fdbv1beta2.ProcessAddress, 0, len(client.ExcludedAddresses))
for addr := range client.ExcludedAddresses {
pAddrs = append(pAddrs, fdbv1beta2.ProcessAddress{
IPAddress: net.ParseIP(addr),
Port: 0,
Flags: nil,
})
ip := net.ParseIP(addr)
if ip == nil {
pAddrs = append(pAddrs, fdbv1beta2.ProcessAddress{StringAddress: addr})
} else {
pAddrs = append(pAddrs, fdbv1beta2.ProcessAddress{
IPAddress: net.ParseIP(addr),
Port: 0,
Flags: nil,
})
}
}

return pAddrs, nil
}

Expand Down

0 comments on commit c73f676

Please sign in to comment.