From a6ad4d0eb2d0b0734dd758fae651c3b1cae5abcd Mon Sep 17 00:00:00 2001
From: Julien Duchesne
Date: Tue, 8 Oct 2024 11:55:42 -0400
Subject: [PATCH] Fix all flaky tests

- `TestPartitionInstanceLifecycler`: Increase the test polling timeout: 1s is tight and the test fails with `-race`. Fixes https://github.com/grafana/dskit/issues/572
- `TestBasicLifecycler_HeartbeatWhileRunning`: Same as above
- `TestRejoin`: Convert the test to use `require.Eventually` instead of a custom polling func and increase the polling timeout a bit
- `TestTLSServerWithLocalhostCertWithClientCertificateEnforcementUsingClientCA1`: Wait until the server is ready before running tests
---
 crypto/tls/test/tls_integration_test.go    | 17 ++++++++++++
 kv/memberlist/memberlist_client_test.go    | 31 ++++------------------
 ring/basic_lifecycler_test.go              |  8 +++---
 ring/partition_instance_lifecycler_test.go |  2 +-
 4 files changed, 26 insertions(+), 32 deletions(-)

diff --git a/crypto/tls/test/tls_integration_test.go b/crypto/tls/test/tls_integration_test.go
index 08c998ef9..941cd39e3 100644
--- a/crypto/tls/test/tls_integration_test.go
+++ b/crypto/tls/test/tls_integration_test.go
@@ -115,6 +115,23 @@ func newIntegrationClientServer(
 		require.NoError(t, err)
 	}()
 
+	// Wait until the server is up and running
+	assert.Eventually(t, func() bool {
+		conn, err := net.DialTimeout("tcp", httpAddr.String(), 1*time.Second)
+		if err != nil {
+			t.Logf("error dialing http: %v", err)
+			return false
+		}
+		defer conn.Close()
+		grpcConn, err := net.DialTimeout("tcp", grpcAddr.String(), 1*time.Second)
+		if err != nil {
+			t.Logf("error dialing grpc: %v", err)
+			return false
+		}
+		defer grpcConn.Close()
+		return true
+	}, 2500*time.Millisecond, 1*time.Second, "server is not up")
+
 	httpURL := fmt.Sprintf("https://localhost:%d/hello", httpAddr.Port)
 	grpcHost := net.JoinHostPort("localhost", strconv.Itoa(grpcAddr.Port))
 
diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go
index b2c189967..0d14c5f76 100644
--- a/kv/memberlist/memberlist_client_test.go
+++ b/kv/memberlist/memberlist_client_test.go
@@ -10,7 +10,6 @@ import (
 	"math"
 	"math/rand"
 	"net"
-	"reflect"
 	"sort"
 	"strconv"
 	"strings"
@@ -1243,24 +1242,24 @@ func TestRejoin(t *testing.T) {
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv2))
 	defer services.StopAndAwaitTerminated(context.Background(), mkv2) //nolint:errcheck
 
-	membersFunc := func() interface{} {
-		return mkv2.memberlist.NumMembers()
+	expectMembers := func(expected int) func() bool {
+		return func() bool { return mkv2.memberlist.NumMembers() == expected }
 	}
 
-	poll(t, 7*time.Second, 2, membersFunc) // Probe interval is 5s, with 2s timeout, so probe for 7s.
+	require.Eventually(t, expectMembers(2), 10*time.Second, 100*time.Millisecond, "expected 2 members in the cluster")
 
 	// Shutdown first KV
 	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), mkv1))
 
 	// Second KV should see single member now.
-	poll(t, 7*time.Second, 1, membersFunc)
+	require.Eventually(t, expectMembers(1), 10*time.Second, 100*time.Millisecond, "expected 1 member in the cluster")
 
 	// Let's start first KV again. It is not configured to join the cluster, but KV2 is rejoining.
 	mkv1 = NewKV(cfg1, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv1))
 	defer services.StopAndAwaitTerminated(context.Background(), mkv1) //nolint:errcheck
 
-	poll(t, 7*time.Second, 2, membersFunc)
+	require.Eventually(t, expectMembers(2), 10*time.Second, 100*time.Millisecond, "expected 2 members in the cluster")
 }
 
 func TestMessageBuffer(t *testing.T) {
@@ -1601,26 +1600,6 @@ func getOrCreateData(in interface{}) *data {
 	return r
 }
 
-// poll repeatedly evaluates condition until we either timeout, or it succeeds.
-func poll(t testing.TB, d time.Duration, want interface{}, have func() interface{}) {
-	t.Helper()
-
-	deadline := time.Now().Add(d)
-	for {
-		if time.Now().After(deadline) {
-			break
-		}
-		if reflect.DeepEqual(want, have()) {
-			return
-		}
-		time.Sleep(d / 100)
-	}
-	h := have()
-	if !reflect.DeepEqual(want, h) {
-		t.Fatalf("expected %v, got %v", want, h)
-	}
-}
-
 type testLogger struct {
 }
 
diff --git a/ring/basic_lifecycler_test.go b/ring/basic_lifecycler_test.go
index 914221a6c..df12bce6f 100644
--- a/ring/basic_lifecycler_test.go
+++ b/ring/basic_lifecycler_test.go
@@ -331,12 +331,10 @@ func TestBasicLifecycler_HeartbeatWhileRunning(t *testing.T) {
 	desc, _ := getInstanceFromStore(t, store, testInstanceID)
 	initialTimestamp := desc.GetTimestamp()
 
-	test.Poll(t, time.Second, true, func() interface{} {
+	assert.Eventually(t, func() bool {
 		desc, _ := getInstanceFromStore(t, store, testInstanceID)
-		currTimestamp := desc.GetTimestamp()
-
-		return currTimestamp > initialTimestamp
-	})
+		return desc.GetTimestamp() > initialTimestamp
+	}, 2*time.Second, 10*time.Millisecond, "expected timestamp to be updated")
 
 	assert.Greater(t, testutil.ToFloat64(lifecycler.metrics.heartbeats), float64(0))
 }
diff --git a/ring/partition_instance_lifecycler_test.go b/ring/partition_instance_lifecycler_test.go
index f7e027779..1ba37e1fa 100644
--- a/ring/partition_instance_lifecycler_test.go
+++ b/ring/partition_instance_lifecycler_test.go
@@ -64,7 +64,7 @@ func TestPartitionInstanceLifecycler(t *testing.T) {
 		assert.Eventually(t, func() bool {
 			actual := getPartitionRingFromStore(t, store, ringKey)
 			return actual.Partitions[1].State == PartitionActive
-		}, time.Second, eventuallyTick)
+		}, 3*time.Second, eventuallyTick)
 	})
 
 	t.Run("should wait for the configured minimum waiting time before switching a pending partition to active", func(t *testing.T) {
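
For reference, the pattern every hunk above converges on is testify's `require.Eventually`/`assert.Eventually`: re-evaluate a boolean condition on a short tick until it holds or the overall timeout expires, then fail with a descriptive message. Below is a minimal standalone sketch of that usage; the `waitForMembers` helper, the `clusterSize` parameter, and the chosen durations are illustrative only, not part of dskit.

package example

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// waitForMembers polls clusterSize every 100ms for up to 10s and fails the
// test with a descriptive message if the expected count is never reached.
// (Hypothetical helper, shown only to illustrate the require.Eventually pattern.)
func waitForMembers(t *testing.T, clusterSize func() int, expected int) {
	t.Helper()
	require.Eventually(t, func() bool {
		return clusterSize() == expected
	}, 10*time.Second, 100*time.Millisecond, "expected %d members in the cluster", expected)
}

Compared with the removed reflect-based `poll` helper, this keeps a short tick while the overall timeout stays generous, which is what lets the tests tolerate the slowdown introduced by `-race`.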