From 92dee7478cd2bc1a161d2215571688f73fe37e1d Mon Sep 17 00:00:00 2001
From: rosstimothy <39066650+rosstimothy@users.noreply.github.com>
Date: Fri, 6 Dec 2024 15:42:02 -0500
Subject: [PATCH] Attempt to reduce flakiness of integration tests (#49850)
 (#49894)

Closes #47156.

All of the tests suffering from issues dialing hosts and failing with a
`failed to dial target host` error were incorrectly waiting for nodes to
become visible in Auth before establishing connections. The main culprit
for most of the failures was `waitForNodesToRegister`, though a few tests
had a very similar hand-rolled variant, which incorrectly returned when the
nodes appeared in Auth. However, since the Proxy is the one performing the
dialing, they should have waited for the nodes to appear in the Proxy.

To resolve this, `waitForNodesToRegister` and all hand-rolled equivalents
have been removed in favor of `(TeleInstance) WaitForNodeCount`, which
correctly uses the `CachingAccessPoint` of the RemoteSite instead of
`GetClient`. Additionally, `helpers.WaitForNodeCount` was updated to
validate that the node watcher used for routing in the Proxy also contains
the expected number of nodes.
---
 integration/helpers/helpers.go         |   5 +-
 integration/helpers/instance.go        |  50 +++++++++
 integration/helpers/trustedclusters.go |  33 ------
 integration/integration_test.go        | 137 +++++++------------------
 integration/proxy/proxy_helpers.go     |   2 +-
 integration/proxy/proxy_test.go        |   6 +-
 6 files changed, 92 insertions(+), 141 deletions(-)

diff --git a/integration/helpers/helpers.go b/integration/helpers/helpers.go
index 63525f86b319f..2b40ea3b46762 100644
--- a/integration/helpers/helpers.go
+++ b/integration/helpers/helpers.go
@@ -129,10 +129,7 @@ func ExternalSSHCommand(o CommandOptions) (*exec.Cmd, error) {
 	}
 
 	// Create an exec.Command and tell it where to find the SSH agent.
-	cmd, err := exec.Command(sshpath, execArgs...), nil
-	if err != nil {
-		return nil, trace.Wrap(err)
-	}
+	cmd := exec.Command(sshpath, execArgs...)
 	cmd.Env = []string{fmt.Sprintf("SSH_AUTH_SOCK=%v", o.SocketPath)}
 
 	return cmd, nil
diff --git a/integration/helpers/instance.go b/integration/helpers/instance.go
index 4a28d91b0f560..fb4fdc3947f09 100644
--- a/integration/helpers/instance.go
+++ b/integration/helpers/instance.go
@@ -46,7 +46,9 @@ import (
 
 	"github.com/gravitational/teleport/api/breaker"
 	clientproto "github.com/gravitational/teleport/api/client/proto"
+	apidefaults "github.com/gravitational/teleport/api/defaults"
 	"github.com/gravitational/teleport/api/types"
+	"github.com/gravitational/teleport/api/utils/retryutils"
 	"github.com/gravitational/teleport/lib/auth/authclient"
 	"github.com/gravitational/teleport/lib/auth/keygen"
 	"github.com/gravitational/teleport/lib/auth/state"
@@ -1737,3 +1739,51 @@ func (i *TeleInstance) StopAll() error {
 	i.Log.Infof("Stopped all teleport services for site %q", i.Secrets.SiteName)
 	return trace.NewAggregate(errors...)
 }
+
+// WaitForNodeCount waits for a certain number of nodes in the provided cluster
+// to be visible to the Proxy. This should be called prior to any client dialing
+// of nodes to be sure that the node is registered and routable.
+func (i *TeleInstance) WaitForNodeCount(ctx context.Context, cluster string, count int) error {
+	const (
+		deadline     = time.Second * 30
+		iterWaitTime = time.Second
+	)
+
+	err := retryutils.RetryStaticFor(deadline, iterWaitTime, func() error {
+		site, err := i.Tunnel.GetSite(cluster)
+		if err != nil {
+			return trace.Wrap(err)
+		}
+
+		// Validate that the site cache contains the expected count.
+		accessPoint, err := site.CachingAccessPoint()
+		if err != nil {
+			return trace.Wrap(err)
+		}
+
+		nodes, err := accessPoint.GetNodes(ctx, apidefaults.Namespace)
+		if err != nil {
+			return trace.Wrap(err)
+		}
+		if len(nodes) != count {
+			return trace.BadParameter("cache contained %v nodes, but wanted to find %v nodes", len(nodes), count)
+		}
+
+		// Validate that the site watcher contains the expected count.
+		watcher, err := site.NodeWatcher()
+		if err != nil {
+			return trace.Wrap(err)
+		}
+
+		if watcher.NodeCount() != count {
+			return trace.BadParameter("node watcher contained %v nodes, but wanted to find %v nodes", watcher.NodeCount(), count)
+		}
+
+		return nil
+	})
+	if err != nil {
+		return trace.Wrap(err)
+	}
+
+	return nil
+}
diff --git a/integration/helpers/trustedclusters.go b/integration/helpers/trustedclusters.go
index a883fb8635a9e..1b3f43b61507c 100644
--- a/integration/helpers/trustedclusters.go
+++ b/integration/helpers/trustedclusters.go
@@ -30,9 +30,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/gravitational/teleport"
-	"github.com/gravitational/teleport/api/defaults"
 	"github.com/gravitational/teleport/api/types"
-	"github.com/gravitational/teleport/api/utils/retryutils"
 	"github.com/gravitational/teleport/lib/auth"
 	"github.com/gravitational/teleport/lib/reversetunnelclient"
 )
@@ -112,37 +110,6 @@ func WaitForClusters(tun reversetunnelclient.Server, expected int) func() bool {
 	}
 }
 
-// WaitForNodeCount waits for a certain number of nodes to show up in the remote site.
-func WaitForNodeCount(ctx context.Context, t *TeleInstance, clusterName string, count int) error {
-	const (
-		deadline     = time.Second * 30
-		iterWaitTime = time.Second
-	)
-
-	err := retryutils.RetryStaticFor(deadline, iterWaitTime, func() error {
-		remoteSite, err := t.Tunnel.GetSite(clusterName)
-		if err != nil {
-			return trace.Wrap(err)
-		}
-		accessPoint, err := remoteSite.CachingAccessPoint()
-		if err != nil {
-			return trace.Wrap(err)
-		}
-		nodes, err := accessPoint.GetNodes(ctx, defaults.Namespace)
-		if err != nil {
-			return trace.Wrap(err)
-		}
-		if len(nodes) == count {
-			return nil
-		}
-		return trace.BadParameter("found %v nodes, but wanted to find %v nodes", len(nodes), count)
-	})
-	if err != nil {
-		return trace.Wrap(err)
-	}
-	return nil
-}
-
 // WaitForActiveTunnelConnections waits for remote cluster to report a minimum number of active connections
 func WaitForActiveTunnelConnections(t *testing.T, tunnel reversetunnelclient.Server, clusterName string, expectedCount int) {
 	require.EventuallyWithT(t, func(t *assert.CollectT) {
diff --git a/integration/integration_test.go b/integration/integration_test.go
index a0bbcc3f74bc7..6557cdcf9b1b8 100644
--- a/integration/integration_test.go
+++ b/integration/integration_test.go
@@ -440,27 +440,9 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) {
 
 	ctx := context.Background()
 
-	// wait 10 seconds for both nodes to show up, otherwise
+	// wait for both nodes to show up, otherwise
 	// we'll have trouble connecting to the node below.
-	waitForNodes := func(site authclient.ClientI, count int) error {
-		tickCh := time.Tick(500 * time.Millisecond)
-		stopCh := time.After(10 * time.Second)
-		for {
-			select {
-			case <-tickCh:
-				nodesInSite, err := site.GetNodes(ctx, defaults.Namespace)
-				if err != nil && !trace.IsNotFound(err) {
-					return trace.Wrap(err)
-				}
-				if got, want := len(nodesInSite), count; got == want {
-					return nil
-				}
-			case <-stopCh:
-				return trace.BadParameter("waited 10s, did find %v nodes", count)
-			}
-		}
-	}
-	err = waitForNodes(site, 2)
+	err = teleport.WaitForNodeCount(ctx, helpers.Site, 2)
 	require.NoError(t, err)
 
 	// should have no sessions:
@@ -855,8 +837,6 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) {
 	teleportSvr := suite.newTeleport(t, nil, true)
 	defer teleportSvr.StopAll()
 
-	site := teleportSvr.GetSiteAPI(helpers.Site)
-
 	// addNode adds a node to the teleport instance, returning its uuid.
 	// All nodes added this way have the same hostname.
 	addNode := func() (string, error) {
@@ -883,36 +863,11 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) {
 	uuid1, err := addNode()
 	require.NoError(t, err)
 
-	uuid2, err := addNode()
+	_, err = addNode()
 	require.NoError(t, err)
 
-	// wait up to 10 seconds for supplied node names to show up.
-	waitForNodes := func(site authclient.ClientI, nodes ...string) error {
-		tickCh := time.Tick(500 * time.Millisecond)
-		stopCh := time.After(10 * time.Second)
-	Outer:
-		for _, nodeName := range nodes {
-			for {
-				select {
-				case <-tickCh:
-					nodesInSite, err := site.GetNodes(ctx, defaults.Namespace)
-					if err != nil && !trace.IsNotFound(err) {
-						return trace.Wrap(err)
-					}
-					for _, node := range nodesInSite {
-						if node.GetName() == nodeName {
-							continue Outer
-						}
-					}
-				case <-stopCh:
-					return trace.BadParameter("waited 10s, did find node %s", nodeName)
-				}
-			}
-		}
-		return nil
-	}
-
-	err = waitForNodes(site, uuid1, uuid2)
+	// wait for supplied node names to show up.
+	err = teleportSvr.WaitForNodeCount(ctx, helpers.Site, 3)
 	require.NoError(t, err)
 
 	// attempting to run a command by hostname should generate NodeIsAmbiguous error.
@@ -2266,7 +2221,8 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT
 		tc.concurrentConns = 1
 	}
 
-	waitForNodesToRegister(t, teleport, helpers.Site)
+	err = teleport.WaitForNodeCount(ctx, helpers.Site, 1)
+	require.NoError(t, err)
 
 	asyncErrors := make(chan error, 1)
 
@@ -2285,7 +2241,11 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT
 			tc.clientConfigOpts(&cc)
 		}
 		cl, err := teleport.NewClient(cc)
-		require.NoError(t, err)
+		if err != nil {
+			asyncErrors <- err
+			return
+		}
+
 		cl.Stdout = person
 		cl.Stdin = person
 
@@ -3253,6 +3213,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus
 
 	cmd := []string{"echo", "hello world"}
 
+	// Wait for nodes to be visible before attempting connections
+	err = main.WaitForNodeCount(ctx, clusterAux, 2)
+	require.NoError(t, err)
+
 	// Try and connect to a node in the Aux cluster from the Main cluster using
 	// direct dialing.
 	creds, err := helpers.GenerateUserCreds(helpers.UserCredsRequest{
@@ -3338,6 +3302,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus
 	require.Eventually(t, helpers.WaitForClusters(main.Tunnel, 1), 10*time.Second, 1*time.Second,
 		"Two clusters do not see each other: tunnels are not working.")
 
+	// Wait for nodes to be visible before attempting connections
+	err = main.WaitForNodeCount(ctx, clusterAux, 2)
+	require.NoError(t, err)
+
 	// connection and client should recover and work again
 	output = &bytes.Buffer{}
 	tc.Stdout = output
@@ -3744,7 +3712,7 @@ func testTrustedTunnelNode(t *testing.T, suite *integrationTestSuite) {
 		"Two clusters do not see each other: tunnels are not working.")
 
 	// Wait for both nodes to show up before attempting to dial to them.
-	err = helpers.WaitForNodeCount(ctx, main, clusterAux, 2)
+	err = main.WaitForNodeCount(ctx, clusterAux, 2)
 	require.NoError(t, err)
 
 	cmd := []string{"echo", "hello world"}
@@ -4140,7 +4108,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) {
 	helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1)
 	helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1)
 
-	waitForNodesToRegister(t, main, "cluster-remote")
+	err = main.WaitForNodeCount(ctx, "cluster-remote", 1)
+	require.NoError(t, err)
 
 	// execute the connection via first proxy
 	cfg := helpers.ClientConfig{
@@ -4191,7 +4160,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) {
 	helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1)
 	helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1)
 
-	waitForNodesToRegister(t, main, "cluster-remote")
+	err = main.WaitForNodeCount(ctx, "cluster-remote", 1)
+	require.NoError(t, err)
 
 	// Requests going via main proxy should succeed.
 	output, err = runCommand(t, main, []string{"echo", "hello world"}, cfg, 1)
@@ -4971,11 +4941,8 @@ func testProxyHostKeyCheck(t *testing.T, suite *integrationTestSuite) {
 	require.NoError(t, err)
 
 	// Wait for the node to be visible before continuing.
-	require.EventuallyWithT(t, func(t *assert.CollectT) {
-		found, err := clt.GetNodes(context.Background(), defaults.Namespace)
-		assert.NoError(t, err)
-		assert.Len(t, found, 2)
-	}, 10*time.Second, 100*time.Millisecond)
+	err = instance.WaitForNodeCount(context.Background(), helpers.Site, 2)
+	require.NoError(t, err)
 
 	_, err = runCommand(t, instance, []string{"echo hello"}, clientConfig, 1)
 
@@ -6137,27 +6104,9 @@ func testList(t *testing.T, suite *integrationTestSuite) {
 	clt := teleport.GetSiteAPI(helpers.Site)
 	require.NotNil(t, clt)
 
-	// Wait 10 seconds for both nodes to show up to make sure they both have
+	// Wait for both nodes to show up to make sure they both have
 	// registered themselves.
-	waitForNodes := func(clt authclient.ClientI, count int) error {
-		tickCh := time.Tick(500 * time.Millisecond)
-		stopCh := time.After(10 * time.Second)
-		for {
-			select {
-			case <-tickCh:
-				nodesInCluster, err := clt.GetNodes(ctx, defaults.Namespace)
-				if err != nil && !trace.IsNotFound(err) {
-					return trace.Wrap(err)
-				}
-				if got, want := len(nodesInCluster), count; got == want {
-					return nil
-				}
-			case <-stopCh:
-				return trace.BadParameter("waited 10s, did find %v nodes", count)
-			}
-		}
-	}
-	err = waitForNodes(clt, 2)
+	err = teleport.WaitForNodeCount(ctx, helpers.Site, 2)
 	require.NoError(t, err)
 
 	tests := []struct {
@@ -6326,22 +6275,6 @@ func testCmdLabels(t *testing.T, suite *integrationTestSuite) {
 	}
 }
 
-func waitForNodesToRegister(t *testing.T, teleport *helpers.TeleInstance, site string) {
-	t.Helper()
-	require.EventuallyWithT(t, func(t *assert.CollectT) {
-		// once the tunnel is established we need to wait until we have a
-		// connection to the remote auth
-		site := teleport.GetSiteAPI(site)
-		if !assert.NotNil(t, site) {
-			return
-		}
-		// we need to wait until we know about the node because direct dial to
-		// unregistered servers is no longer supported
-		_, err := site.GetNode(context.Background(), defaults.Namespace, teleport.Config.HostUUID)
-		assert.NoError(t, err)
-	}, time.Second*30, 250*time.Millisecond)
-}
-
 // TestDataTransfer makes sure that a "session.data" event is emitted at the
 // end of a session that matches the amount of data that was transferred.
 func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
@@ -6355,6 +6288,9 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
 	main := suite.newTeleport(t, nil, true)
 	defer main.StopAll()
 
+	err := main.WaitForNodeCount(context.Background(), helpers.Site, 1)
+	require.NoError(t, err)
+
 	// Create a client to the above Teleport cluster.
 	clientConfig := helpers.ClientConfig{
 		Login: suite.Me.Username,
@@ -6363,8 +6299,6 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
 		Port: helpers.Port(t, main.SSH),
 	}
 
-	waitForNodesToRegister(t, main, helpers.Site)
-
 	// Write 1 MB to stdout.
 	command := []string{"dd", "if=/dev/zero", "bs=1024", "count=1024"}
 	output, err := runCommand(t, main, command, clientConfig, 1)
@@ -7323,6 +7257,7 @@ func (s *integrationTestSuite) defaultServiceConfig() *servicecfg.Config {
 	cfg.Log = s.Log
 	cfg.CircuitBreakerConfig = breaker.NoopBreakerConfig()
 	cfg.InstanceMetadataClient = imds.NewDisabledIMDSClient()
+	cfg.DebugService.Enabled = false
 
 	return cfg
 }
@@ -8141,7 +8076,8 @@ func testModeratedSFTP(t *testing.T, suite *integrationTestSuite) {
 	_, err = authServer.CreateUser(ctx, moderatorUser)
 	require.NoError(t, err)
 
-	waitForNodesToRegister(t, instance, helpers.Site)
+	err = instance.WaitForNodeCount(context.Background(), helpers.Site, 1)
+	require.NoError(t, err)
 
 	// Start a shell so a moderated session is created
 	peerClient, err := instance.NewClient(helpers.ClientConfig{
@@ -8399,7 +8335,8 @@ func testSFTP(t *testing.T, suite *integrationTestSuite) {
 		teleport.StopAll()
 	})
 
-	waitForNodesToRegister(t, teleport, helpers.Site)
+	err := teleport.WaitForNodeCount(context.Background(), helpers.Site, 1)
+	require.NoError(t, err)
 
 	teleportClient, err := teleport.NewClient(helpers.ClientConfig{
 		Login: suite.Me.Username,
diff --git a/integration/proxy/proxy_helpers.go b/integration/proxy/proxy_helpers.go
index 8c028bc62c7c6..a46a4d8ececb0 100644
--- a/integration/proxy/proxy_helpers.go
+++ b/integration/proxy/proxy_helpers.go
@@ -219,7 +219,7 @@ func (p *Suite) addNodeToLeafCluster(t *testing.T, tunnelNodeHostname string) {
 		"Two clusters do not see each other: tunnels are not working.")
 
 	// Wait for both nodes to show up before attempting to dial to them.
-	err = helpers.WaitForNodeCount(context.Background(), p.root, p.leaf.Secrets.SiteName, 2)
+	err = p.root.WaitForNodeCount(context.Background(), p.leaf.Secrets.SiteName, 2)
 	require.NoError(t, err)
 }
 
diff --git a/integration/proxy/proxy_test.go b/integration/proxy/proxy_test.go
index 77a32c64d0e7e..f39245fbbe9ea 100644
--- a/integration/proxy/proxy_test.go
+++ b/integration/proxy/proxy_test.go
@@ -1593,7 +1593,7 @@ func TestALPNProxyHTTPProxyNoProxyDial(t *testing.T) {
 	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30))
 	defer cancel()
 
-	err = helpers.WaitForNodeCount(ctx, rc, "root.example.com", 1)
+	err = rc.WaitForNodeCount(ctx, "root.example.com", 1)
 	require.NoError(t, err)
 
 	require.Zero(t, ph.Count())
@@ -1603,7 +1603,7 @@ func TestALPNProxyHTTPProxyNoProxyDial(t *testing.T) {
 	require.NoError(t, os.Unsetenv("no_proxy"))
 	_, err = rc.StartNode(makeNodeConfig("second-root-node", rcProxyAddr))
 	require.NoError(t, err)
-	err = helpers.WaitForNodeCount(ctx, rc, "root.example.com", 2)
+	err = rc.WaitForNodeCount(ctx, "root.example.com", 2)
 	require.NoError(t, err)
 	require.NotZero(t, ph.Count())
 
@@ -1702,7 +1702,7 @@ func TestALPNProxyHTTPProxyBasicAuthDial(t *testing.T) {
 		startErrC <- err
 	}()
 	require.NoError(t, <-startErrC)
-	require.NoError(t, helpers.WaitForNodeCount(context.Background(), rc, rc.Secrets.SiteName, 1))
+	require.NoError(t, rc.WaitForNodeCount(context.Background(), rc.Secrets.SiteName, 1))
 	require.Greater(t, ph.Count(), 0)
 }
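
Reviewer note (illustrative only, not part of the patch): the sketch below shows the pattern tests are expected to follow after this change — wait until the Proxy can see the node via the new `WaitForNodeCount` helper before dialing it, instead of polling Auth with `GetNodes`. The test name is hypothetical, and the `helpers.ClientConfig` fields and `Host` value are assumed to follow how the existing tests in this file construct their clients.

// Hypothetical test sketch; not present in the patch above.
func testDialAfterNodeIsRoutable(t *testing.T, suite *integrationTestSuite) {
	teleport := suite.newTeleport(t, nil, true)
	defer teleport.StopAll()

	// Block until the Proxy's cache and node watcher both report the node,
	// so the dial below cannot race node registration.
	err := teleport.WaitForNodeCount(context.Background(), helpers.Site, 1)
	require.NoError(t, err)

	// Client config following the pattern used elsewhere in these tests
	// (field values are illustrative).
	clientConfig := helpers.ClientConfig{
		Login:   suite.Me.Username,
		Cluster: helpers.Site,
		Host:    Host,
		Port:    helpers.Port(t, teleport.SSH),
	}
	_, err = runCommand(t, teleport, []string{"echo", "hello world"}, clientConfig, 1)
	require.NoError(t, err)
}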