Skip to content

Commit

Permalink
TestMultipleClients*: Poll for the memberlist status for 10 seconds
Browse files Browse the repository at this point in the history
Closes #389

These tests are highly dependant on timing, which makes them flaky. Poll for a bit longer at the end to make sure that the memberlist was updated
  • Loading branch information
julienduchesne committed Oct 10, 2024
1 parent 879ff5a commit 2493afc
Showing 1 changed file with 61 additions and 41 deletions.
102 changes: 61 additions & 41 deletions kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ func TestMultipleClientsWithMixedLabelsAndExpectFailure(t *testing.T) {

err := testMultipleClientsWithConfigGenerator(t, len(membersLabel), configGen)
require.Error(t, err)
require.Contains(t, err.Error(), fmt.Sprintf("expected to see at least %d updates", len(membersLabel)))
require.Contains(t, err.Error(), "expected to see at least 2 members, got 1")
}

func TestMultipleClientsWithMixedLabelsAndClusterLabelVerificationDisabled(t *testing.T) {
Expand Down Expand Up @@ -662,6 +662,8 @@ func TestMultipleClientsWithSameLabelWithClusterLabelVerification(t *testing.T)
}

func testMultipleClientsWithConfigGenerator(t *testing.T, members int, configGen func(memberId int) KVConfig) error {
t.Helper()

c := dataCodec{}
const key = "ring"
var clients []*Client
Expand Down Expand Up @@ -723,11 +725,10 @@ func testMultipleClientsWithConfigGenerator(t *testing.T, members int, configGen
startTime := time.Now()
firstKv := clients[0]
ctx, cancel := context.WithTimeout(context.Background(), casInterval*3) // Watch for 3x cas intervals.
updates := 0
joinedMembers := 0
firstKv.WatchKey(ctx, key, func(in interface{}) bool {
updates++

r := in.(*data)
joinedMembers = len(r.Members)

minTimestamp, maxTimestamp, avgTimestamp := getTimestamps(r.Members)

Expand All @@ -740,12 +741,9 @@ func testMultipleClientsWithConfigGenerator(t *testing.T, members int, configGen
})
cancel() // make linter happy

t.Logf("Ring updates observed: %d", updates)

if updates < members {
// in general, at least one update from each node. (although that's not necessarily true...
// but typically we get more updates than that anyway)
return fmt.Errorf("expected to see at least %d updates, got %d", members, updates)
if joinedMembers <= 1 {
// expect at least 2 members. Otherwise, this means that the ring has failed to sync.
return fmt.Errorf("expected to see at least 2 members, got %d", joinedMembers)
}

if err := getClientErr(); err != nil {
Expand All @@ -755,47 +753,69 @@ func testMultipleClientsWithConfigGenerator(t *testing.T, members int, configGen
// Let's check all the clients to see if they have relatively up-to-date information
// All of them should at least have all the clients
// And same tokens.
allTokens := []uint32(nil)

for i := 0; i < members; i++ {
kv := clients[i]
check := func() error {
allTokens := []uint32(nil)

r := getData(t, kv, key)
t.Logf("KV %d: number of known members: %d\n", i, len(r.Members))
if len(r.Members) != members {
return fmt.Errorf("Member %d has only %d members in the ring", i, len(r.Members))
}
for i := 0; i < members; i++ {
kv := clients[i]

minTimestamp, maxTimestamp, avgTimestamp := getTimestamps(r.Members)
for n, ing := range r.Members {
if ing.State != ACTIVE {
return fmt.Errorf("Member %d: invalid state of member %s in the ring: %v ", i, n, ing.State)
r := getData(t, kv, key)
t.Logf("KV %d: number of known members: %d\n", i, len(r.Members))
if len(r.Members) != members {
return fmt.Errorf("Member %d has only %d members in the ring", i, len(r.Members))
}
}
now := time.Now()
t.Logf("Member %d: oldest: %v, avg: %v, youngest: %v", i,
now.Sub(time.Unix(minTimestamp, 0)).String(),
now.Sub(time.Unix(avgTimestamp, 0)).String(),
now.Sub(time.Unix(maxTimestamp, 0)).String())

tokens := r.getAllTokens()
if allTokens == nil {
allTokens = tokens
t.Logf("Found tokens: %d", len(allTokens))
} else {
if len(allTokens) != len(tokens) {
return fmt.Errorf("Member %d: Expected %d tokens, got %d", i, len(allTokens), len(tokens))

minTimestamp, maxTimestamp, avgTimestamp := getTimestamps(r.Members)
for n, ing := range r.Members {
if ing.State != ACTIVE {
stateStr := "UNKNOWN"
switch ing.State {
case JOINING:
stateStr = "JOINING"
case LEFT:
stateStr = "LEFT"
}
return fmt.Errorf("Member %d: invalid state of member %s in the ring: %s (%v) ", i, n, stateStr, ing.State)
}
}
now := time.Now()
t.Logf("Member %d: oldest: %v, avg: %v, youngest: %v", i,
now.Sub(time.Unix(minTimestamp, 0)).String(),
now.Sub(time.Unix(avgTimestamp, 0)).String(),
now.Sub(time.Unix(maxTimestamp, 0)).String())

tokens := r.getAllTokens()
if allTokens == nil {
allTokens = tokens
t.Logf("Found tokens: %d", len(allTokens))
} else {
if len(allTokens) != len(tokens) {
return fmt.Errorf("Member %d: Expected %d tokens, got %d", i, len(allTokens), len(tokens))
}

for ix, tok := range allTokens {
if tok != tokens[ix] {
return fmt.Errorf("Member %d: Tokens at position %d differ: %v, %v", i, ix, tok, tokens[ix])
for ix, tok := range allTokens {
if tok != tokens[ix] {
return fmt.Errorf("Member %d: Tokens at position %d differ: %v, %v", i, ix, tok, tokens[ix])
}
}
}
}

return getClientErr()
}

return getClientErr()
// Try this for ~10 seconds. memberlist is eventually consistent, so we may need to wait a bit, especially with `-race`.
for timeout := time.After(10 * time.Second); ; {
select {
case <-timeout:
return check() // return last error
default:
if err := check(); err == nil {
return nil // it passed
}
time.Sleep(100 * time.Millisecond)
}
}
}

func TestJoinMembersWithRetryBackoff(t *testing.T) {
Expand Down

0 comments on commit 2493afc

Please sign in to comment.