Skip to content

Commit

Permalink
Add healthyInstancesInZoneCount to Lifecycler; update tests (#526)
Browse files Browse the repository at this point in the history
* Add healthyInstancesInZoneCount to Lifecycler; update tests

Signed-off-by: JordanRushing <[email protected]>

* Improve healthyInstancesInZone naming; simplify test

Signed-off-by: JordanRushing <[email protected]>

---------

Signed-off-by: JordanRushing <[email protected]>
  • Loading branch information
JordanRushing authored Jun 26, 2024
1 parent b3bde8c commit 35810fd
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 11 deletions.
23 changes: 18 additions & 5 deletions ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,12 @@ type Lifecycler struct {
readySince time.Time

// Keeps stats updated at every heartbeat period
countersLock sync.RWMutex
healthyInstancesCount int
instancesCount int
instancesInZoneCount int
zonesCount int
countersLock sync.RWMutex
healthyInstancesCount int
instancesCount int
healthyInstancesInZoneCount int
instancesInZoneCount int
zonesCount int

tokenGenerator TokenGenerator
// The maximum time allowed to wait on the CanJoin() condition.
Expand Down Expand Up @@ -441,6 +442,15 @@ func (i *Lifecycler) InstancesCount() int {
return i.instancesCount
}

// HealthyInstancesInZoneCount returns the number of healthy instances in the ring that are registered in
// this lifecycler's zone, updated during the last heartbeat period.
func (i *Lifecycler) HealthyInstancesInZoneCount() int {
i.countersLock.RLock()
defer i.countersLock.RUnlock()

return i.healthyInstancesInZoneCount
}

// InstancesInZoneCount returns the number of instances in the ring that are registered in
// this lifecycler's zone, updated during the last heartbeat period.
func (i *Lifecycler) InstancesInZoneCount() int {
Expand Down Expand Up @@ -913,6 +923,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
healthyInstancesCount := 0
instancesCount := 0
zones := map[string]int{}
healthyInstancesInZone := map[string]int{}

if ringDesc != nil {
now := time.Now()
Expand All @@ -924,6 +935,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
// Count the number of healthy instances for Write operation.
if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) {
healthyInstancesCount++
healthyInstancesInZone[ingester.Zone]++
}
}
}
Expand All @@ -932,6 +944,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
i.countersLock.Lock()
i.healthyInstancesCount = healthyInstancesCount
i.instancesCount = instancesCount
i.healthyInstancesInZoneCount = healthyInstancesInZone[i.cfg.Zone]
i.instancesInZoneCount = zones[i.cfg.Zone]
i.zonesCount = len(zones)
i.countersLock.Unlock()
Expand Down
112 changes: 106 additions & 6 deletions ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,86 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) {
})
}

func TestLifecycler_HealthyInstancesInZoneCount(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

ctx := context.Background()

// Add the first ingester to the ring
lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")
lifecyclerConfig1.HeartbeatPeriod = 100 * time.Millisecond
lifecyclerConfig1.JoinAfter = 100 * time.Millisecond
lifecyclerConfig1.Zone = "zone-a"

lifecycler1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
assert.Equal(t, 0, lifecycler1.HealthyInstancesInZoneCount())

require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler1))
defer services.StopAndAwaitTerminated(ctx, lifecycler1) // nolint:errcheck

// Assert the first ingester joined the ring
test.Poll(t, time.Second, true, func() interface{} {
return lifecycler1.HealthyInstancesInZoneCount() == 1
})

// Add the second ingester to the ring in the same zone
lifecyclerConfig2 := testLifecyclerConfig(ringConfig, "ing2")
lifecyclerConfig2.HeartbeatPeriod = 100 * time.Millisecond
lifecyclerConfig2.JoinAfter = 100 * time.Millisecond
lifecyclerConfig2.Zone = "zone-a"

lifecycler2, err := NewLifecycler(lifecyclerConfig2, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
assert.Equal(t, 0, lifecycler2.HealthyInstancesInZoneCount())

require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler2))
defer services.StopAndAwaitTerminated(ctx, lifecycler2) // nolint:errcheck

// Assert the second ingester joined the ring
test.Poll(t, time.Second, true, func() interface{} {
return lifecycler2.HealthyInstancesInZoneCount() == 2
})

// Assert the first ingester count is updated
test.Poll(t, time.Second, true, func() interface{} {
return lifecycler1.HealthyInstancesInZoneCount() == 2
})

// Add the third ingester to the ring in a different zone
lifecyclerConfig3 := testLifecyclerConfig(ringConfig, "ing3")
lifecyclerConfig3.HeartbeatPeriod = 100 * time.Millisecond
lifecyclerConfig3.JoinAfter = 100 * time.Millisecond
lifecyclerConfig3.Zone = "zone-b"

lifecycler3, err := NewLifecycler(lifecyclerConfig3, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
assert.Equal(t, 0, lifecycler3.HealthyInstancesInZoneCount())

require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler3))
defer services.StopAndAwaitTerminated(ctx, lifecycler3) // nolint:errcheck

// Assert the third ingester joined the ring
test.Poll(t, time.Second, true, func() interface{} {
return lifecycler3.HealthyInstancesInZoneCount() == 1
})

// Assert the first ingester count is correct
test.Poll(t, time.Second, true, func() interface{} {
return lifecycler1.HealthyInstancesInZoneCount() == 2
})

// Assert the second ingester count is correct
test.Poll(t, time.Second, true, func() interface{} {
return lifecycler2.HealthyInstancesInZoneCount() == 2
})
}

func TestLifecycler_InstancesInZoneCount(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
Expand All @@ -169,12 +249,13 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
ringConfig.KVStore.Mock = ringStore

instances := []struct {
zone string
healthy bool
expectedInstancesInZoneCount int
expectedInstancesCount int
expectedHealthyInstancesCount int
expectedZonesCount int
zone string
healthy bool
expectedInstancesInZoneCount int
expectedInstancesCount int
expectedHealthyInstancesCount int
expectedZonesCount int
expectedHealthyInstancesInZoneCount int
}{
{
zone: "zone-a",
Expand All @@ -187,6 +268,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
expectedHealthyInstancesCount: 1,
// after adding a healthy instance in zone-a, expectedZonesCount is 1
expectedZonesCount: 1,
// after adding a healthy instance in zone-a, expectedHealthyInstancesInZoneCount is 1
expectedHealthyInstancesInZoneCount: 1,
},
{
zone: "zone-a",
Expand All @@ -199,6 +282,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
expectedHealthyInstancesCount: 1,
// zone-a was already added, so expectedZonesCount remains 1
expectedZonesCount: 1,
// after adding an unhealthy instance in zone-a, expectedHealthyInstancesInZoneCount remains 1
expectedHealthyInstancesInZoneCount: 1,
},
{
zone: "zone-a",
Expand All @@ -211,6 +296,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
expectedHealthyInstancesCount: 2,
// zone-a was already added, so expectedZonesCount remains 1
expectedZonesCount: 1,
// after adding a healthy instance in zone-a, expectedHealthyInstancesInZoneCount becomes 2
expectedHealthyInstancesInZoneCount: 2,
},
{
zone: "zone-b",
Expand All @@ -223,6 +310,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
expectedHealthyInstancesCount: 3,
// after adding a healthy instance in zone-b, expectedZonesCount becomes 2
expectedZonesCount: 2,
// after adding a healthy instance in zone-b, expectedHealthyInstancesInZoneCount becomes 1
expectedHealthyInstancesInZoneCount: 1,
},
{
zone: "zone-c",
Expand All @@ -235,6 +324,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
expectedHealthyInstancesCount: 3,
// after adding an unhealthy instance in zone-c, expectedZonesCount becomes 3
expectedZonesCount: 3,
// after adding an unhealthy instance in zone-c, expectedHealthyInstancesInZoneCount is 0
expectedHealthyInstancesInZoneCount: 0,
},
{
zone: "zone-c",
Expand All @@ -247,6 +338,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
expectedHealthyInstancesCount: 4,
// zone-c was already added, so expectedZonesCount remains 3
expectedZonesCount: 3,
// after adding a healthy instance in zone-c, expectedHealthyInstancesInZoneCount is 1
expectedHealthyInstancesInZoneCount: 1,
},
{
zone: "zone-b",
Expand All @@ -259,6 +352,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
expectedHealthyInstancesCount: 5,
// zone-b was already added, so expectedZonesCount remains 3
expectedZonesCount: 3,
// after adding a healthy instance in zone-b, expectedHealthyInstancesInZoneCount becomes 2
expectedHealthyInstancesInZoneCount: 2,
},
}

Expand Down Expand Up @@ -292,10 +387,15 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
return lifecycler.HealthyInstancesCount()
})

test.Poll(t, time.Duration(joinWaitMs)*time.Millisecond, instance.expectedHealthyInstancesInZoneCount, func() interface{} {
return lifecycler.HealthyInstancesInZoneCount()
})

require.Equal(t, instance.expectedInstancesInZoneCount, lifecycler.InstancesInZoneCount())
require.Equal(t, instance.expectedInstancesCount, lifecycler.InstancesCount())
require.Equal(t, instance.expectedHealthyInstancesCount, lifecycler.HealthyInstancesCount())
require.Equal(t, instance.expectedZonesCount, lifecycler.ZonesCount())
require.Equal(t, instance.expectedHealthyInstancesInZoneCount, lifecycler.HealthyInstancesInZoneCount())
}
}

Expand Down

0 comments on commit 35810fd

Please sign in to comment.