diff --git a/ring/lifecycler.go b/ring/lifecycler.go
index 4f51b46a5..7c54eabdd 100644
--- a/ring/lifecycler.go
+++ b/ring/lifecycler.go
@@ -158,11 +158,12 @@ type Lifecycler struct {
 	readySince time.Time
 
 	// Keeps stats updated at every heartbeat period
-	countersLock          sync.RWMutex
-	healthyInstancesCount int
-	instancesCount        int
-	instancesInZoneCount  int
-	zonesCount            int
+	countersLock                sync.RWMutex
+	healthyInstancesCount       int
+	instancesCount              int
+	healthyInstancesInZoneCount int
+	instancesInZoneCount        int
+	zonesCount                  int
 
 	tokenGenerator TokenGenerator
 	// The maximum time allowed to wait on the CanJoin() condition.
@@ -441,6 +442,15 @@ func (i *Lifecycler) InstancesCount() int {
 	return i.instancesCount
 }
 
+// HealthyInstancesInZoneCount returns the number of instances registered in this lifecycler's zone that were
+// healthy for Write operations (per the ring heartbeat timeout), updated during the last heartbeat period.
+func (i *Lifecycler) HealthyInstancesInZoneCount() int {
+	i.countersLock.RLock()
+	defer i.countersLock.RUnlock()
+
+	return i.healthyInstancesInZoneCount
+}
+
 // InstancesInZoneCount returns the number of instances in the ring that are registered in
 // this lifecycler's zone, updated during the last heartbeat period.
 func (i *Lifecycler) InstancesInZoneCount() int {
@@ -913,6 +923,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
 	healthyInstancesCount := 0
 	instancesCount := 0
 	zones := map[string]int{}
+	healthyInstancesInZone := map[string]int{}
 
 	if ringDesc != nil {
 		now := time.Now()
@@ -924,6 +935,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
 			// Count the number of healthy instances for Write operation.
 			if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) {
 				healthyInstancesCount++
+				healthyInstancesInZone[ingester.Zone]++
 			}
 		}
 	}
@@ -932,6 +944,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
 	i.countersLock.Lock()
 	i.healthyInstancesCount = healthyInstancesCount
 	i.instancesCount = instancesCount
+	i.healthyInstancesInZoneCount = healthyInstancesInZone[i.cfg.Zone]
 	i.instancesInZoneCount = zones[i.cfg.Zone]
 	i.zonesCount = len(zones)
 	i.countersLock.Unlock()
diff --git a/ring/lifecycler_test.go b/ring/lifecycler_test.go
index 2bdca5b79..5502ce548 100644
--- a/ring/lifecycler_test.go
+++ b/ring/lifecycler_test.go
@@ -160,6 +160,86 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) {
 	})
 }
 
+func TestLifecycler_HealthyInstancesInZoneCount(t *testing.T) {
+	ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
+	t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+
+	var ringConfig Config
+	flagext.DefaultValues(&ringConfig)
+	ringConfig.KVStore.Mock = ringStore
+
+	ctx := context.Background()
+
+	// Add the first ingester to the ring
+	lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")
+	lifecyclerConfig1.HeartbeatPeriod = 100 * time.Millisecond
+	lifecyclerConfig1.JoinAfter = 100 * time.Millisecond
+	lifecyclerConfig1.Zone = "zone-a"
+
+	lifecycler1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
+	require.NoError(t, err)
+	assert.Equal(t, 0, lifecycler1.HealthyInstancesInZoneCount())
+
+	require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler1))
+	defer services.StopAndAwaitTerminated(ctx, lifecycler1) // nolint:errcheck
+
+	// Assert the first ingester joined the ring
+	test.Poll(t, time.Second, true, func() interface{} {
+		return lifecycler1.HealthyInstancesInZoneCount() == 1
+	})
+
+	// Add the second ingester to the ring in the same zone
+	lifecyclerConfig2 := testLifecyclerConfig(ringConfig, "ing2")
+	lifecyclerConfig2.HeartbeatPeriod = 100 * time.Millisecond
+	lifecyclerConfig2.JoinAfter = 100 * time.Millisecond
+	lifecyclerConfig2.Zone = "zone-a"
+
+	lifecycler2, err := NewLifecycler(lifecyclerConfig2, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
+	require.NoError(t, err)
+	assert.Equal(t, 0, lifecycler2.HealthyInstancesInZoneCount())
+
+	require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler2))
+	defer services.StopAndAwaitTerminated(ctx, lifecycler2) // nolint:errcheck
+
+	// Assert the second ingester joined the ring
+	test.Poll(t, time.Second, true, func() interface{} {
+		return lifecycler2.HealthyInstancesInZoneCount() == 2
+	})
+
+	// Assert the first ingester count is updated
+	test.Poll(t, time.Second, true, func() interface{} {
+		return lifecycler1.HealthyInstancesInZoneCount() == 2
+	})
+
+	// Add the third ingester to the ring in a different zone
+	lifecyclerConfig3 := testLifecyclerConfig(ringConfig, "ing3")
+	lifecyclerConfig3.HeartbeatPeriod = 100 * time.Millisecond
+	lifecyclerConfig3.JoinAfter = 100 * time.Millisecond
+	lifecyclerConfig3.Zone = "zone-b"
+
+	lifecycler3, err := NewLifecycler(lifecyclerConfig3, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
+	require.NoError(t, err)
+	assert.Equal(t, 0, lifecycler3.HealthyInstancesInZoneCount())
+
+	require.NoError(t, services.StartAndAwaitRunning(ctx, lifecycler3))
+	defer services.StopAndAwaitTerminated(ctx, lifecycler3) // nolint:errcheck
+
+	// Assert the third ingester joined the ring
+	test.Poll(t, time.Second, true, func() interface{} {
+		return lifecycler3.HealthyInstancesInZoneCount() == 1
+	})
+
+	// Assert the first ingester (zone-a) count is unaffected by the new ingester in zone-b
+	test.Poll(t, time.Second, true, func() interface{} {
+		return lifecycler1.HealthyInstancesInZoneCount() == 2
+	})
+
+	// Assert the second ingester (zone-a) count is unaffected by the new ingester in zone-b
+	test.Poll(t, time.Second, true, func() interface{} {
+		return lifecycler2.HealthyInstancesInZoneCount() == 2
+	})
+}
+
 func TestLifecycler_InstancesInZoneCount(t *testing.T) {
 	ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
 	t.Cleanup(func() { assert.NoError(t, closer.Close()) })
@@ -169,12 +249,13 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
 	ringConfig.KVStore.Mock = ringStore
 
 	instances := []struct {
-		zone                          string
-		healthy                       bool
-		expectedInstancesInZoneCount  int
-		expectedInstancesCount        int
-		expectedHealthyInstancesCount int
-		expectedZonesCount            int
+		zone                                string
+		healthy                             bool
+		expectedInstancesInZoneCount        int
+		expectedInstancesCount              int
+		expectedHealthyInstancesCount       int
+		expectedZonesCount                  int
+		expectedHealthyInstancesInZoneCount int
 	}{
 		{
 			zone:    "zone-a",
@@ -187,6 +268,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
 			expectedHealthyInstancesCount: 1,
 			// after adding a healthy instance in zone-a, expectedZonesCount is 1
 			expectedZonesCount: 1,
+			// after adding a healthy instance in zone-a, expectedHealthyInstancesInZoneCount is 1
+			expectedHealthyInstancesInZoneCount: 1,
 		},
 		{
 			zone:    "zone-a",
@@ -199,6 +282,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
 			expectedHealthyInstancesCount: 1,
 			// zone-a was already added, so expectedZonesCount remains 1
 			expectedZonesCount: 1,
+			// after adding an unhealthy instance in zone-a, expectedHealthyInstancesInZoneCount remains 1
+			expectedHealthyInstancesInZoneCount: 1,
 		},
 		{
 			zone:    "zone-a",
@@ -211,6 +296,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
 			expectedHealthyInstancesCount: 2,
 			// zone-a was already added, so expectedZonesCount remains 1
 			expectedZonesCount: 1,
+			// after adding a healthy instance in zone-a, expectedHealthyInstancesInZoneCount becomes 2
+			expectedHealthyInstancesInZoneCount: 2,
 		},
 		{
 			zone:    "zone-b",
@@ -223,6 +310,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
 			expectedHealthyInstancesCount: 3,
 			// after adding a healthy instance in zone-b, expectedZonesCount becomes 2
 			expectedZonesCount: 2,
+			// after adding a healthy instance in zone-b, expectedHealthyInstancesInZoneCount becomes 1
+			expectedHealthyInstancesInZoneCount: 1,
 		},
 		{
 			zone:    "zone-c",
@@ -235,6 +324,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
 			expectedHealthyInstancesCount: 3,
 			// after adding an unhealthy instance in zone-c, expectedZonesCount becomes 3
 			expectedZonesCount: 3,
+			// after adding an unhealthy instance in zone-c, expectedHealthyInstancesInZoneCount is 0
+			expectedHealthyInstancesInZoneCount: 0,
 		},
 		{
 			zone:    "zone-c",
@@ -247,6 +338,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
 			expectedHealthyInstancesCount: 4,
 			// zone-c was already added, so expectedZonesCount remains 3
 			expectedZonesCount: 3,
+			// after adding a healthy instance in zone-c, expectedHealthyInstancesInZoneCount is 1
+			expectedHealthyInstancesInZoneCount: 1,
 		},
 		{
 			zone:    "zone-b",
@@ -259,6 +352,8 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
 			expectedHealthyInstancesCount: 5,
 			// zone-b was already added, so expectedZonesCount remains 3
 			expectedZonesCount: 3,
+			// after adding a healthy instance in zone-b, expectedHealthyInstancesInZoneCount becomes 2
+			expectedHealthyInstancesInZoneCount: 2,
 		},
 	}
 
@@ -292,10 +387,15 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
 			return lifecycler.HealthyInstancesCount()
 		})
 
+		test.Poll(t, time.Duration(joinWaitMs)*time.Millisecond, instance.expectedHealthyInstancesInZoneCount, func() interface{} {
+			return lifecycler.HealthyInstancesInZoneCount()
+		})
+
 		require.Equal(t, instance.expectedInstancesInZoneCount, lifecycler.InstancesInZoneCount())
 		require.Equal(t, instance.expectedInstancesCount, lifecycler.InstancesCount())
 		require.Equal(t, instance.expectedHealthyInstancesCount, lifecycler.HealthyInstancesCount())
 		require.Equal(t, instance.expectedZonesCount, lifecycler.ZonesCount())
+		require.Equal(t, instance.expectedHealthyInstancesInZoneCount, lifecycler.HealthyInstancesInZoneCount())
 	}
 }
 