[v15] Attempt to reduce flakiness of integration tests #49894

Merged · 1 commit · Dec 6, 2024
Changes from all commits
5 changes: 1 addition & 4 deletions integration/helpers/helpers.go
@@ -129,10 +129,7 @@ func ExternalSSHCommand(o CommandOptions) (*exec.Cmd, error) {
}

// Create an exec.Command and tell it where to find the SSH agent.
cmd, err := exec.Command(sshpath, execArgs...), nil
if err != nil {
return nil, trace.Wrap(err)
}
cmd := exec.Command(sshpath, execArgs...)
cmd.Env = []string{fmt.Sprintf("SSH_AUTH_SOCK=%v", o.SocketPath)}

return cmd, nil
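The error check deleted above was dead code: exec.Command returns only a *exec.Cmd, so the ", nil" bolted onto the assignment was the sole source of err and the trace.Wrap branch could never execute. Failures such as a missing binary surface only when the command actually runs. A standalone sketch of where os/exec reports errors (binary, arguments, and socket path are illustrative, not from this PR):

package main

import (
	"fmt"
	"os/exec"
)

func main() {
	// exec.Command only constructs the *exec.Cmd; it cannot fail.
	cmd := exec.Command("ssh", "-V")
	// Hypothetical agent socket, mirroring the helper above.
	cmd.Env = []string{"SSH_AUTH_SOCK=/tmp/agent.sock"}

	// Errors (binary not found, non-zero exit, ...) appear here,
	// from Run/Start/Output/CombinedOutput, not from exec.Command.
	out, err := cmd.CombinedOutput()
	if err != nil {
		fmt.Println("ssh failed:", err)
		return
	}
	fmt.Printf("ssh reported: %s", out)
}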
50 changes: 50 additions & 0 deletions integration/helpers/instance.go
@@ -46,7 +46,9 @@ import (

"github.com/gravitational/teleport/api/breaker"
clientproto "github.com/gravitational/teleport/api/client/proto"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/auth/keygen"
"github.com/gravitational/teleport/lib/auth/state"
@@ -1737,3 +1739,51 @@ func (i *TeleInstance) StopAll() error {
i.Log.Infof("Stopped all teleport services for site %q", i.Secrets.SiteName)
return trace.NewAggregate(errors...)
}

// WaitForNodeCount waits for a certain number of nodes in the provided cluster
// to be visible to the Proxy. This should be called prior to any client dialing
// of nodes to be sure that the node is registered and routable.
func (i *TeleInstance) WaitForNodeCount(ctx context.Context, cluster string, count int) error {
const (
deadline = time.Second * 30
iterWaitTime = time.Second
)

err := retryutils.RetryStaticFor(deadline, iterWaitTime, func() error {
site, err := i.Tunnel.GetSite(cluster)
if err != nil {
return trace.Wrap(err)
}

// Validate that the site cache contains the expected count.
accessPoint, err := site.CachingAccessPoint()
if err != nil {
return trace.Wrap(err)
}

nodes, err := accessPoint.GetNodes(ctx, apidefaults.Namespace)
if err != nil {
return trace.Wrap(err)
}
if len(nodes) != count {
return trace.BadParameter("cache contained %v nodes, but wanted to find %v nodes", len(nodes), count)
}

// Validate that the site watcher contains the expected count.
watcher, err := site.NodeWatcher()
if err != nil {
return trace.Wrap(err)
}

if watcher.NodeCount() != count {
return trace.BadParameter("node watcher contained %v nodes, but wanted to find %v nodes", watcher.NodeCount(), count)
}

return nil
})
if err != nil {
return trace.Wrap(err)
}

return nil
}
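The new method polls two distinct read paths: the site cache, which backs node listings, and the node watcher, which is presumably what connection routing consults. Requiring both to report the expected count before any dialing is what the doc comment means by "registered and routable". A hypothetical call site (the function and counts are illustrative):

// waitThenDial shows the intended ordering: block on WaitForNodeCount
// first, then create clients and dial nodes.
func waitThenDial(ctx context.Context, t *testing.T, teleport *helpers.TeleInstance) {
	err := teleport.WaitForNodeCount(ctx, helpers.Site, 2)
	require.NoError(t, err)

	// ...safe to dial nodes by name or UUID from here...
}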
33 changes: 0 additions & 33 deletions integration/helpers/trustedclusters.go
@@ -30,9 +30,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/reversetunnelclient"
)
@@ -112,37 +110,6 @@ func WaitForClusters(tun reversetunnelclient.Server, expected int) func() bool {
}
}

// WaitForNodeCount waits for a certain number of nodes to show up in the remote site.
func WaitForNodeCount(ctx context.Context, t *TeleInstance, clusterName string, count int) error {
const (
deadline = time.Second * 30
iterWaitTime = time.Second
)

err := retryutils.RetryStaticFor(deadline, iterWaitTime, func() error {
remoteSite, err := t.Tunnel.GetSite(clusterName)
if err != nil {
return trace.Wrap(err)
}
accessPoint, err := remoteSite.CachingAccessPoint()
if err != nil {
return trace.Wrap(err)
}
nodes, err := accessPoint.GetNodes(ctx, defaults.Namespace)
if err != nil {
return trace.Wrap(err)
}
if len(nodes) == count {
return nil
}
return trace.BadParameter("found %v nodes, but wanted to find %v nodes", len(nodes), count)
})
if err != nil {
return trace.Wrap(err)
}
return nil
}

// WaitForActiveTunnelConnections waits for remote cluster to report a minimum number of active connections
func WaitForActiveTunnelConnections(t *testing.T, tunnel reversetunnelclient.Server, clusterName string, expectedCount int) {
require.EventuallyWithT(t, func(t *assert.CollectT) {
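WaitForActiveTunnelConnections, truncated above, keeps the testify polling style: EventuallyWithT re-runs an assertion closure until every assertion passes or the deadline expires, and assertions recorded on the *assert.CollectT fail only that attempt rather than the whole test. A self-contained sketch of the pattern (the polled function and the durations are illustrative):

package example

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// activeTunnelCount stands in for the real tunnel query.
func activeTunnelCount() int { return 1 }

func TestTunnelEstablished(t *testing.T) {
	require.EventuallyWithT(t, func(c *assert.CollectT) {
		// A failed assertion on c marks this attempt as failed and
		// schedules a retry after the tick interval.
		assert.GreaterOrEqual(c, activeTunnelCount(), 1)
	}, 30*time.Second, 250*time.Millisecond)
}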
137 changes: 37 additions & 100 deletions integration/integration_test.go
@@ -440,27 +440,9 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) {

ctx := context.Background()

// wait 10 seconds for both nodes to show up, otherwise
// wait for both nodes to show up, otherwise
// we'll have trouble connecting to the node below.
waitForNodes := func(site authclient.ClientI, count int) error {
tickCh := time.Tick(500 * time.Millisecond)
stopCh := time.After(10 * time.Second)
for {
select {
case <-tickCh:
nodesInSite, err := site.GetNodes(ctx, defaults.Namespace)
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}
if got, want := len(nodesInSite), count; got == want {
return nil
}
case <-stopCh:
return trace.BadParameter("waited 10s, did find %v nodes", count)
}
}
}
err = waitForNodes(site, 2)
err = teleport.WaitForNodeCount(ctx, helpers.Site, 2)
require.NoError(t, err)

// should have no sessions:
Expand Down Expand Up @@ -855,8 +837,6 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) {
teleportSvr := suite.newTeleport(t, nil, true)
defer teleportSvr.StopAll()

site := teleportSvr.GetSiteAPI(helpers.Site)

// addNode adds a node to the teleport instance, returning its uuid.
// All nodes added this way have the same hostname.
addNode := func() (string, error) {
@@ -883,36 +863,11 @@
uuid1, err := addNode()
require.NoError(t, err)

uuid2, err := addNode()
_, err = addNode()
require.NoError(t, err)

// wait up to 10 seconds for supplied node names to show up.
waitForNodes := func(site authclient.ClientI, nodes ...string) error {
tickCh := time.Tick(500 * time.Millisecond)
stopCh := time.After(10 * time.Second)
Outer:
for _, nodeName := range nodes {
for {
select {
case <-tickCh:
nodesInSite, err := site.GetNodes(ctx, defaults.Namespace)
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}
for _, node := range nodesInSite {
if node.GetName() == nodeName {
continue Outer
}
}
case <-stopCh:
return trace.BadParameter("waited 10s, did find node %s", nodeName)
}
}
}
return nil
}

err = waitForNodes(site, uuid1, uuid2)
// wait for supplied node names to show up.
err = teleportSvr.WaitForNodeCount(ctx, helpers.Site, 3)
require.NoError(t, err)

// attempting to run a command by hostname should generate NodeIsAmbiguous error.
@@ -2266,7 +2221,8 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT
tc.concurrentConns = 1
}

waitForNodesToRegister(t, teleport, helpers.Site)
err = teleport.WaitForNodeCount(ctx, helpers.Site, 1)
require.NoError(t, err)

asyncErrors := make(chan error, 1)

@@ -2285,7 +2241,11 @@
tc.clientConfigOpts(&cc)
}
cl, err := teleport.NewClient(cc)
require.NoError(t, err)
if err != nil {
asyncErrors <- err
return
}

cl.Stdout = person
cl.Stdin = person

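The hunk above also moves error handling for NewClient out of the spawned connection goroutine: require.NoError ultimately calls t.FailNow, which is only safe on the goroutine running the test, so a failure inside a worker could abort the wrong goroutine and leave the test hanging. Reporting through the buffered asyncErrors channel lets the test goroutine decide. A minimal sketch of the pattern (the worker body is illustrative):

package example

import "testing"

func TestWorkerFailure(t *testing.T) {
	dial := func() error { return nil } // stands in for real connection setup

	asyncErrors := make(chan error, 1)
	done := make(chan struct{})

	go func() {
		defer close(done)
		if err := dial(); err != nil {
			// Never require.NoError here: FailNow would stop only
			// this goroutine, not the test.
			asyncErrors <- err
			return
		}
	}()

	select {
	case err := <-asyncErrors:
		t.Fatalf("worker failed: %v", err)
	case <-done:
	}
}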
@@ -3253,6 +3213,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus

cmd := []string{"echo", "hello world"}

// Wait for nodes to be visible before attempting connections
err = main.WaitForNodeCount(ctx, clusterAux, 2)
require.NoError(t, err)

// Try and connect to a node in the Aux cluster from the Main cluster using
// direct dialing.
creds, err := helpers.GenerateUserCreds(helpers.UserCredsRequest{
@@ -3338,6 +3302,10 @@
require.Eventually(t, helpers.WaitForClusters(main.Tunnel, 1), 10*time.Second, 1*time.Second,
"Two clusters do not see each other: tunnels are not working.")

// Wait for nodes to be visible before attempting connections
err = main.WaitForNodeCount(ctx, clusterAux, 2)
require.NoError(t, err)

// connection and client should recover and work again
output = &bytes.Buffer{}
tc.Stdout = output
@@ -3744,7 +3712,7 @@ func testTrustedTunnelNode(t *testing.T, suite *integrationTestSuite) {
"Two clusters do not see each other: tunnels are not working.")

// Wait for both nodes to show up before attempting to dial to them.
err = helpers.WaitForNodeCount(ctx, main, clusterAux, 2)
err = main.WaitForNodeCount(ctx, clusterAux, 2)
require.NoError(t, err)

cmd := []string{"echo", "hello world"}
@@ -4140,7 +4108,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) {
helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1)
helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1)

waitForNodesToRegister(t, main, "cluster-remote")
err = main.WaitForNodeCount(ctx, "cluster-remote", 1)
require.NoError(t, err)

// execute the connection via first proxy
cfg := helpers.ClientConfig{
@@ -4191,7 +4160,8 @@
helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1)
helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1)

waitForNodesToRegister(t, main, "cluster-remote")
err = main.WaitForNodeCount(ctx, "cluster-remote", 1)
require.NoError(t, err)

// Requests going via main proxy should succeed.
output, err = runCommand(t, main, []string{"echo", "hello world"}, cfg, 1)
@@ -4971,11 +4941,8 @@ func testProxyHostKeyCheck(t *testing.T, suite *integrationTestSuite) {
require.NoError(t, err)

// Wait for the node to be visible before continuing.
require.EventuallyWithT(t, func(t *assert.CollectT) {
found, err := clt.GetNodes(context.Background(), defaults.Namespace)
assert.NoError(t, err)
assert.Len(t, found, 2)
}, 10*time.Second, 100*time.Millisecond)
err = instance.WaitForNodeCount(context.Background(), helpers.Site, 2)
require.NoError(t, err)

_, err = runCommand(t, instance, []string{"echo hello"}, clientConfig, 1)

@@ -6137,27 +6104,9 @@ func testList(t *testing.T, suite *integrationTestSuite) {
clt := teleport.GetSiteAPI(helpers.Site)
require.NotNil(t, clt)

// Wait 10 seconds for both nodes to show up to make sure they both have
// Wait for both nodes to show up to make sure they both have
// registered themselves.
waitForNodes := func(clt authclient.ClientI, count int) error {
tickCh := time.Tick(500 * time.Millisecond)
stopCh := time.After(10 * time.Second)
for {
select {
case <-tickCh:
nodesInCluster, err := clt.GetNodes(ctx, defaults.Namespace)
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}
if got, want := len(nodesInCluster), count; got == want {
return nil
}
case <-stopCh:
return trace.BadParameter("waited 10s, did find %v nodes", count)
}
}
}
err = waitForNodes(clt, 2)
err = teleport.WaitForNodeCount(ctx, helpers.Site, 2)
require.NoError(t, err)

tests := []struct {
@@ -6326,22 +6275,6 @@ func testCmdLabels(t *testing.T, suite *integrationTestSuite) {
}
}

func waitForNodesToRegister(t *testing.T, teleport *helpers.TeleInstance, site string) {
t.Helper()
require.EventuallyWithT(t, func(t *assert.CollectT) {
// once the tunnel is established we need to wait until we have a
// connection to the remote auth
site := teleport.GetSiteAPI(site)
if !assert.NotNil(t, site) {
return
}
// we need to wait until we know about the node because direct dial to
// unregistered servers is no longer supported
_, err := site.GetNode(context.Background(), defaults.Namespace, teleport.Config.HostUUID)
assert.NoError(t, err)
}, time.Second*30, 250*time.Millisecond)
}

// TestDataTransfer makes sure that a "session.data" event is emitted at the
// end of a session that matches the amount of data that was transferred.
func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
@@ -6355,6 +6288,9 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
main := suite.newTeleport(t, nil, true)
defer main.StopAll()

err := main.WaitForNodeCount(context.Background(), helpers.Site, 1)
require.NoError(t, err)

// Create a client to the above Teleport cluster.
clientConfig := helpers.ClientConfig{
Login: suite.Me.Username,
Expand All @@ -6363,8 +6299,6 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
Port: helpers.Port(t, main.SSH),
}

waitForNodesToRegister(t, main, helpers.Site)

// Write 1 MB to stdout.
command := []string{"dd", "if=/dev/zero", "bs=1024", "count=1024"}
output, err := runCommand(t, main, command, clientConfig, 1)
@@ -7323,6 +7257,7 @@ func (s *integrationTestSuite) defaultServiceConfig() *servicecfg.Config {
cfg.Log = s.Log
cfg.CircuitBreakerConfig = breaker.NoopBreakerConfig()
cfg.InstanceMetadataClient = imds.NewDisabledIMDSClient()
cfg.DebugService.Enabled = false
return cfg
}

@@ -8141,7 +8076,8 @@ func testModeratedSFTP(t *testing.T, suite *integrationTestSuite) {
_, err = authServer.CreateUser(ctx, moderatorUser)
require.NoError(t, err)

waitForNodesToRegister(t, instance, helpers.Site)
err = instance.WaitForNodeCount(context.Background(), helpers.Site, 1)
require.NoError(t, err)

// Start a shell so a moderated session is created
peerClient, err := instance.NewClient(helpers.ClientConfig{
@@ -8399,7 +8335,8 @@ func testSFTP(t *testing.T, suite *integrationTestSuite) {
teleport.StopAll()
})

waitForNodesToRegister(t, teleport, helpers.Site)
err := teleport.WaitForNodeCount(context.Background(), helpers.Site, 1)
require.NoError(t, err)

teleportClient, err := teleport.NewClient(helpers.ClientConfig{
Login: suite.Me.Username,
2 changes: 1 addition & 1 deletion integration/proxy/proxy_helpers.go
@@ -219,7 +219,7 @@ func (p *Suite) addNodeToLeafCluster(t *testing.T, tunnelNodeHostname string) {
"Two clusters do not see each other: tunnels are not working.")

// Wait for both nodes to show up before attempting to dial to them.
err = helpers.WaitForNodeCount(context.Background(), p.root, p.leaf.Secrets.SiteName, 2)
err = p.root.WaitForNodeCount(context.Background(), p.leaf.Secrets.SiteName, 2)
require.NoError(t, err)
}
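Taken together, the PR deletes the package-level helpers.WaitForNodeCount and the ad-hoc waitForNodes/waitForNodesToRegister closures in favor of the single TeleInstance method, so call sites migrate mechanically, as in the hunk above:

// Before: package-level helper from integration/helpers/trustedclusters.go
err = helpers.WaitForNodeCount(context.Background(), p.root, p.leaf.Secrets.SiteName, 2)

// After: method on *helpers.TeleInstance
err = p.root.WaitForNodeCount(context.Background(), p.leaf.Secrets.SiteName, 2)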
