From 270cdf5f5ae8e48fea01422f15cf7b12942d74c0 Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Fri, 28 Jul 2023 10:17:30 -0400 Subject: [PATCH] tests and tools: cleanup around stop-maintenance, wait-rebalance * consolidate timeout constants * remove old polling patch * simplify Signed-off-by: Alex Aizman --- ais/test/bucket_test.go | 16 ++++---- ais/test/common.go | 57 ++++++++--------------------- ais/test/downloader_test.go | 4 +- ais/test/dsort_test.go | 5 ++- ais/test/ec_test.go | 28 +++++++------- ais/test/etl_stress_test.go | 2 +- ais/test/fshc_test.go | 8 ++-- ais/test/health_test.go | 5 +-- ais/test/integration_test.go | 71 +++++++++++++++++------------------- ais/test/maintain_test.go | 51 ++++++++++++-------------- ais/test/multiproxy_test.go | 4 +- ais/test/object_test.go | 16 ++++---- ais/test/objprops_test.go | 4 +- ais/test/promote_test.go | 2 +- ais/test/regression_test.go | 34 ++++++++--------- tools/client.go | 50 +++++++++++++++---------- 16 files changed, 167 insertions(+), 190 deletions(-) diff --git a/ais/test/bucket_test.go b/ais/test/bucket_test.go index 34d2adfc5e..292f8b6684 100644 --- a/ais/test/bucket_test.go +++ b/ais/test/bucket_test.go @@ -1567,7 +1567,7 @@ func TestListObjectsWithRebalance(t *testing.T) { wg.Wait() m.waitAndCheckCluState() - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID) + tools.WaitForRebalanceByID(t, baseParams, rebID) } func TestBucketSingleProp(t *testing.T) { @@ -1972,7 +1972,7 @@ func TestRenameBucketEmpty(t *testing.T) { uuid, err := api.RenameBucket(baseParams, srcBck, dstBck) tassert.CheckFatal(t, err) - args := xact.ArgsMsg{ID: uuid, Kind: apc.ActMoveBck, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: uuid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -2037,7 +2037,7 @@ func TestRenameBucketNonEmpty(t *testing.T) { tassert.CheckFatal(t, err) - args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -2153,7 +2153,7 @@ func TestRenameBucketTwice(t *testing.T) { } // Wait for rename to complete - args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -2465,11 +2465,11 @@ func TestCopyBucket(t *testing.T) { // TODO -- FIXME: remove/simplify-out this `if` here and elsewhere if test.evictRemoteSrc { // wait for TCO idle (different x-kind) - args := xact.ArgsMsg{ID: uuid, Timeout: copyBucketTimeout} + args := xact.ArgsMsg{ID: uuid, Timeout: tools.CopyBucketTimeout} err := api.WaitForXactionIdle(baseParams, args) tassert.CheckFatal(t, err) } else { - args := xact.ArgsMsg{ID: uuid, Kind: apc.ActCopyBck, Timeout: copyBucketTimeout} + args := xact.ArgsMsg{ID: uuid, Kind: apc.ActCopyBck, Timeout: tools.CopyBucketTimeout} _, err := api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) } @@ -2782,7 +2782,7 @@ func TestRenameAndCopyBucket(t *testing.T) { // Wait for rename to finish tlog.Logf("Waiting for x-%s[%s] to finish\n", apc.ActMoveBck, xid) time.Sleep(2 * time.Second) - args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout} _, err 
= api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -2870,7 +2870,7 @@ func TestCopyAndRenameBucket(t *testing.T) { } // Wait for copy to complete - args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) diff --git a/ais/test/common.go b/ais/test/common.go index 8cbfcaf8a6..6863c71d0f 100644 --- a/ais/test/common.go +++ b/ais/test/common.go @@ -37,12 +37,8 @@ import ( const rebalanceObjectDistributionTestCoef = 0.3 const ( - prefixDir = "filter" - largeFileSize = 4 * cos.MiB - copyBucketTimeout = 3 * time.Minute - rebalanceTimeout = 5 * time.Minute - rebalanceStartTimeout = 10 * time.Second - multiProxyTestTimeout = 3 * time.Minute + prefixDir = "filter" + largeFileSize = 4 * cos.MiB workerCnt = 10 ) @@ -572,7 +568,7 @@ func (m *ioContext) stopGets() { func (m *ioContext) ensureNumCopies(baseParams api.BaseParams, expectedCopies int, greaterOk bool) { m.t.Helper() time.Sleep(time.Second) - xargs := xact.ArgsMsg{Kind: apc.ActMakeNCopies, Bck: m.bck, Timeout: rebalanceTimeout} + xargs := xact.ArgsMsg{Kind: apc.ActMakeNCopies, Bck: m.bck, Timeout: tools.RebalanceTimeout} _, err := api.WaitForXactionIC(baseParams, xargs) tassert.CheckFatal(m.t, err) @@ -673,7 +669,7 @@ func ensurePrevRebalanceIsFinished(baseParams api.BaseParams, err error) bool { } tlog.Logln("Warning: wait for unfinished rebalance(?)") time.Sleep(5 * time.Second) - args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, _ = api.WaitForXactionIC(baseParams, args) time.Sleep(5 * time.Second) return true @@ -696,41 +692,20 @@ func (m *ioContext) startMaintenanceNoRebalance() *meta.Snode { return target } -func (m *ioContext) stopMaintenance(target *meta.Snode) (rebID string) { - const ( - timeout = time.Second * 10 - interval = time.Millisecond * 10 - iterations = int(timeout / interval) - ) - var err error - tlog.Logf("Take %s out of maintenance...\n", target.StringEx()) - args := &apc.ActValRmNode{DaemonID: target.ID()} - rebID, err = api.StopMaintenance(tools.BaseAPIParams(m.proxyURL), args) +func (m *ioContext) stopMaintenance(target *meta.Snode) string { + tlog.Logf("Take %s out of maintenance mode...\n", target.StringEx()) + bp := tools.BaseAPIParams(m.proxyURL) + rebID, err := api.StopMaintenance(bp, &apc.ActValRmNode{DaemonID: target.ID()}) tassert.CheckFatal(m.t, err) - baseParams := tools.BaseAPIParams(target.URL(cmn.NetPublic)) - smap := tools.GetClusterMap(m.t, m.proxyURL) - for i := 0; i < iterations; i++ { - time.Sleep(interval) - if _, ok := smap.Tmap[target.ID()]; !ok { - smap = tools.GetClusterMap(m.t, m.proxyURL) - } else { - query := cmn.QueryBcks(m.bck) - baseParams.URL = m.proxyURL - proxyBcks, err := api.ListBuckets(baseParams, query, apc.FltExists) - tassert.CheckFatal(m.t, err) - - baseParams.URL = target.URL(cmn.NetPublic) - targetBcks, err := api.ListBuckets(baseParams, query, apc.FltExists) - tassert.CheckFatal(m.t, err) - if proxyBcks.Equal(targetBcks) { - tlog.Logf("%s got updated with the current BMD\n", target.StringEx()) - return - } - } + if rebID == "" { + return "" } - m.t.Fatalf("failed to bring %s out of maintenance: not in the %s and/or did not get updated BMD", - target.StringEx(), smap.StringEx()) - return + tassert.Fatalf(m.t, xact.IsValidRebID(rebID), "invalid reb ID %q", rebID) + + xargs 
:= xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: tools.RebalanceStartTimeout} + api.WaitForXactionNode(bp, xargs, xactSnapRunning) + + return rebID } func (m *ioContext) setNonDefaultBucketProps() { diff --git a/ais/test/downloader_test.go b/ais/test/downloader_test.go index aa03cf3eb6..1e5d6bff64 100644 --- a/ais/test/downloader_test.go +++ b/ais/test/downloader_test.go @@ -521,7 +521,7 @@ func TestDownloadRemote(t *testing.T) { tlog.Logf("(1) evicting a _list_ of objects from remote bucket %s...\n", test.srcBck) xid, err := api.EvictList(baseParams, test.srcBck, expectedObjs) tassert.CheckFatal(t, err) - args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -553,7 +553,7 @@ func TestDownloadRemote(t *testing.T) { tlog.Logf("(2) evicting a _list_ of objects from remote bucket %s...\n", test.srcBck) xid, err = api.EvictList(baseParams, test.srcBck, expectedObjs) tassert.CheckFatal(t, err) - args = xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) if test.srcBck.Equal(&test.dstBck) { tassert.CheckFatal(t, err) diff --git a/ais/test/dsort_test.go b/ais/test/dsort_test.go index fd20070fe6..288547add6 100644 --- a/ais/test/dsort_test.go +++ b/ais/test/dsort_test.go @@ -1425,7 +1425,8 @@ func TestDistributedSortKillTargetDuringPhases(t *testing.T) { allMetrics, err := api.MetricsDSort(df.baseParams, df.managerUUID) tassert.CheckError(t, err) if len(allMetrics) == m.originalTargetCount { - t.Errorf("number of metrics %d is same as number of original targets %d", len(allMetrics), m.originalTargetCount) + t.Errorf("number of metrics %d is same as number of original targets %d", + len(allMetrics), m.originalTargetCount) } for target, metrics := range allMetrics { @@ -1435,7 +1436,7 @@ func TestDistributedSortKillTargetDuringPhases(t *testing.T) { } rebID := m.stopMaintenance(target) - tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, df.baseParams, rebID) + tools.WaitForRebalanceByID(t, df.baseParams, rebID) }, ) } diff --git a/ais/test/ec_test.go b/ais/test/ec_test.go index f1419da838..b5d2888554 100644 --- a/ais/test/ec_test.go +++ b/ais/test/ec_test.go @@ -1759,7 +1759,7 @@ func TestECEmergencyTargetForSlices(t *testing.T) { val := &apc.ActValRmNode{DaemonID: removedTarget.ID()} rebID, err := api.StopMaintenance(baseParams, val) tassert.CheckError(t, err) - tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) }() // 3. 
Read objects @@ -1848,7 +1848,7 @@ func TestECEmergencyTargetForReplica(t *testing.T) { if rebID == "" { return } - tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) }() hasTarget := func(targets meta.Nodes, target *meta.Snode) bool { @@ -2127,9 +2127,9 @@ func ecOnlyRebalance(t *testing.T, o *ecOptions, proxyURL string, bck cmn.Bck) { tassert.CheckFatal(t, err) defer func() { rebID, _ := tools.RestoreTarget(t, proxyURL, removedTarget) - tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) }() - tools.WaitForRebalanceByID(t, -1, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) newObjList, err := api.ListObjects(baseParams, bck, msg, api.ListArgs{}) tassert.CheckFatal(t, err) @@ -2206,8 +2206,8 @@ func TestECBucketEncode(t *testing.T) { _, err = api.SetBucketProps(baseParams, m.bck, bckPropsToUpate) tassert.CheckFatal(t, err) - tlog.Logf("EC encode must start automatically for bucket %s\n", m.bck) - xargs := xact.ArgsMsg{Kind: apc.ActECEncode, Bck: m.bck, Timeout: rebalanceTimeout} + tlog.Logf("Wait for EC %s\n", m.bck) + xargs := xact.ArgsMsg{Kind: apc.ActECEncode, Bck: m.bck, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, xargs) tassert.CheckFatal(t, err) @@ -2281,7 +2281,7 @@ func ecAndRegularRebalance(t *testing.T, o *ecOptions, proxyURL string, bckReg, args := &apc.ActValRmNode{DaemonID: tgtLost.ID()} rebID, err := api.StopMaintenance(baseParams, args) tassert.CheckError(t, err) - tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID) + tools.WaitForRebalanceByID(t, baseParams, rebID) } }() @@ -2319,12 +2319,12 @@ func ecAndRegularRebalance(t *testing.T, o *ecOptions, proxyURL string, bckReg, tlog.Logf("Created %d objects in %s, %d objects in %s. 
Starting rebalance\n", len(resECOld.Entries), bckEC, len(resRegOld.Entries), bckReg) - tlog.Logf("Take %s out of maintenance\n", tgtLost.StringEx()) + tlog.Logf("Take %s out of maintenance mode ...\n", tgtLost.StringEx()) args = &apc.ActValRmNode{DaemonID: tgtLost.ID()} rebID, err := api.StopMaintenance(baseParams, args) tassert.CheckFatal(t, err) registered = true - tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) tlog.Logln("list objects after rebalance") resECNew, err := api.ListObjects(baseParams, bckEC, msg, api.ListArgs{}) @@ -2522,7 +2522,7 @@ func ecAndRegularUnregisterWhileRebalancing(t *testing.T, o *ecOptions, bckEC cm args := &apc.ActValRmNode{DaemonID: tgtLost.ID()} rebID, err := api.StopMaintenance(baseParams, args) tassert.CheckError(t, err) - tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) } }() @@ -2547,7 +2547,7 @@ func ecAndRegularUnregisterWhileRebalancing(t *testing.T, o *ecOptions, bckEC cm tassert.CheckFatal(t, err) tlog.Logf("Created %d objects in %s - starting global rebalance...\n", len(resECOld.Entries), bckEC) - tlog.Logf("Take %s out of maintenance\n", tgtLost.StringEx()) + tlog.Logf("Take %s out of maintenance mode ...\n", tgtLost.StringEx()) args = &apc.ActValRmNode{DaemonID: tgtLost.ID()} _, err = api.StopMaintenance(baseParams, args) tassert.CheckFatal(t, err) @@ -2580,7 +2580,7 @@ func ecAndRegularUnregisterWhileRebalancing(t *testing.T, o *ecOptions, bckEC cm err = api.AbortXaction(baseParams, xargs) tassert.CheckError(t, err) - tools.WaitForRebalAndResil(t, baseParams, rebalanceTimeout) + tools.WaitForRebalAndResil(t, baseParams) tassert.CheckError(t, err) tlog.Logf("Put %s in maintenance\n", tgtGone.StringEx()) @@ -2590,13 +2590,13 @@ func ecAndRegularUnregisterWhileRebalancing(t *testing.T, o *ecOptions, bckEC cm defer func() { args = &apc.ActValRmNode{DaemonID: tgtGone.ID()} rebID, _ := api.StopMaintenance(baseParams, args) - tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID) + tools.WaitForRebalanceByID(t, baseParams, rebID) }() stopCh.Close() tassert.CheckFatal(t, err) - tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) tlog.Logln("Reading objects") for _, obj := range resECOld.Entries { _, err := api.GetObject(baseParams, bckEC, obj.Name, nil) diff --git a/ais/test/etl_stress_test.go b/ais/test/etl_stress_test.go index 8dad6ecbc3..30ef709b96 100644 --- a/ais/test/etl_stress_test.go +++ b/ais/test/etl_stress_test.go @@ -133,7 +133,7 @@ func TestETLTargetDown(t *testing.T) { tools.RestoreNode(tcmd, false, "target") m.waitAndCheckCluState() - args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, _ = api.WaitForXactionIC(baseParams, args) tetl.CheckNoRunningETLContainers(t, baseParams) diff --git a/ais/test/fshc_test.go b/ais/test/fshc_test.go index 3b9f8ad888..f7c1aabec6 100644 --- a/ais/test/fshc_test.go +++ b/ais/test/fshc_test.go @@ -520,7 +520,7 @@ func TestFSCheckerTargetDisableAllMountpaths(t *testing.T) { tassert.CheckFatal(t, err) tlog.Logf("Wait for rebalance (triggered by %s leaving the cluster after having lost all mountpaths)\n", target.StringEx()) - args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args := 
xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, _ = api.WaitForXactionIC(baseParams, args) tlog.Logf("Restoring target %s mountpaths\n", target.ID()) @@ -533,7 +533,7 @@ func TestFSCheckerTargetDisableAllMountpaths(t *testing.T) { tassert.CheckFatal(t, err) tlog.Logf("Wait for rebalance (when target %s that has previously lost all mountpaths joins back)\n", target.StringEx()) - args = xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, _ = api.WaitForXactionIC(baseParams, args) tools.WaitForResilvering(t, baseParams, nil) @@ -600,7 +600,7 @@ func TestFSAddMountpathRestartNode(t *testing.T) { t.Fatalf("Removed target didn't rejoin") } tlog.Logf("Wait for rebalance\n") - args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, _ = api.WaitForXactionIC(baseParams, args) // Check if the node has newly added mountpath @@ -677,7 +677,7 @@ func TestFSDisableAllExceptOneMountpathRestartNode(t *testing.T) { tassert.CheckFatal(t, err) tassert.Fatalf(t, smap.GetTarget(target.ID()) != nil, "removed target didn't rejoin") - args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, _ = api.WaitForXactionIC(baseParams, args) // Check if the the mountpaths are disabled after restart. diff --git a/ais/test/health_test.go b/ais/test/health_test.go index cae19b1384..e0e96de75c 100644 --- a/ais/test/health_test.go +++ b/ais/test/health_test.go @@ -25,7 +25,6 @@ func unregisteredNodeHealth(t *testing.T, proxyURL string, si *meta.Snode) { _, err = api.StartMaintenance(baseParams, args) tassert.CheckFatal(t, err) targetCount := smapOrig.CountActiveTs() - origTargetCnt := targetCount proxyCount := smapOrig.CountActivePs() if si.IsProxy() { proxyCount-- @@ -41,9 +40,7 @@ func unregisteredNodeHealth(t *testing.T, proxyURL string, si *meta.Snode) { _, err = tools.WaitForClusterState(proxyURL, "join node", smapOrig.Version, smapOrig.CountActivePs(), smapOrig.CountActiveTs()) tassert.CheckFatal(t, err) - if rebID != "" { - tools.WaitForRebalanceByID(t, origTargetCnt, baseParams, rebID) - } + tools.WaitForRebalanceByID(t, baseParams, rebID) }() err = api.Health(tools.BaseAPIParams(si.PubNet.URL)) diff --git a/ais/test/integration_test.go b/ais/test/integration_test.go index 55ec517f82..0adcb9c857 100644 --- a/ais/test/integration_test.go +++ b/ais/test/integration_test.go @@ -75,9 +75,7 @@ func TestGetAndReRegisterInParallel(t *testing.T) { m.ensureNoGetErrors() m.waitAndCheckCluState() - if rebID != "" { - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID) - } + tools.WaitForRebalanceByID(t, baseParams, rebID) } // All of the above PLUS proxy failover/failback sequence in parallel: @@ -138,7 +136,7 @@ func TestProxyFailbackAndReRegisterInParallel(t *testing.T) { }() wg.Wait() - xargs := xact.ArgsMsg{Kind: apc.ActRebalance, OnlyRunning: true, Timeout: rebalanceTimeout} + xargs := xact.ArgsMsg{Kind: apc.ActRebalance, OnlyRunning: true, Timeout: tools.RebalanceTimeout} _, _ = api.WaitForXactionIC(baseParams, xargs) // Step 5. 
@@ -224,7 +222,7 @@ func TestUnregisterPreviouslyUnregisteredTarget(t *testing.T) { // Register target (bring cluster to normal state) rebID := m.stopMaintenance(target) m.waitAndCheckCluState() - tools.WaitForRebalanceByID(m.t, m.originalTargetCount, tools.BaseAPIParams(m.proxyURL), rebID) + tools.WaitForRebalanceByID(m.t, tools.BaseAPIParams(m.proxyURL), rebID) } func TestRegisterAndUnregisterTargetAndPutInParallel(t *testing.T) { @@ -273,7 +271,7 @@ func TestRegisterAndUnregisterTargetAndPutInParallel(t *testing.T) { go func() { defer wg.Done() args := &apc.ActValRmNode{DaemonID: targets[0].ID()} - tlog.Logf("Take %s out of maintenance\n", targets[0].StringEx()) + tlog.Logf("Take %s out of maintenance mode ...\n", targets[0].StringEx()) _, err = api.StopMaintenance(baseParams, args) tassert.CheckFatal(t, err) }() @@ -293,7 +291,7 @@ func TestRegisterAndUnregisterTargetAndPutInParallel(t *testing.T) { rebID := m.stopMaintenance(targets[1]) // wait for rebalance to complete - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) m.waitAndCheckCluState() } @@ -321,7 +319,7 @@ func TestAckRebalance(t *testing.T) { // Wait for everything to finish. baseParams := tools.BaseAPIParams(m.proxyURL) - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) m.gets() @@ -403,7 +401,7 @@ func testStressRebalance(t *testing.T, bck cmn.Bck) { // wait for the rebalance to finish baseParams := tools.BaseAPIParams(m.proxyURL) - tools.WaitForRebalAndResil(t, baseParams, rebalanceTimeout) + tools.WaitForRebalAndResil(t, baseParams) // wait for the reads to run out wg.Wait() @@ -448,7 +446,7 @@ func TestRebalanceAfterUnregisterAndReregister(t *testing.T) { wg.Add(2) go func() { defer wg.Done() - tlog.Logf("Take %s out of maintenance\n", target0.StringEx()) + tlog.Logf("Take %s out of maintenance mode ...\n", target0.StringEx()) args := &apc.ActValRmNode{DaemonID: target0.ID()} _, err = api.StopMaintenance(baseParams, args) tassert.CheckFatal(t, err) @@ -479,9 +477,8 @@ func TestRebalanceAfterUnregisterAndReregister(t *testing.T) { ) tassert.CheckFatal(m.t, err) - tlog.Logf("Wait for rebalance (%q?)...\n", rebID) time.Sleep(sleep) - tools.WaitForRebalAndResil(t, baseParams, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) m.gets() @@ -520,7 +517,7 @@ func TestPutDuringRebalance(t *testing.T) { // Wait for everything to finish. wg.Wait() baseParams := tools.BaseAPIParams(m.proxyURL) - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) // Main check - try to read all objects. 
m.gets() @@ -611,7 +608,7 @@ func TestGetDuringLocalAndGlobalRebalance(t *testing.T) { // make sure that the cluster has all targets enabled _, err = tools.WaitForClusterState( m.proxyURL, - "join target back", + "target joined back", smap.Version, m.originalProxyCount, m.originalTargetCount, @@ -620,7 +617,7 @@ func TestGetDuringLocalAndGlobalRebalance(t *testing.T) { // wait for rebalance to complete baseParams = tools.BaseAPIParams(m.proxyURL) - tools.WaitForRebalAndResil(t, baseParams, rebalanceTimeout) // TODO -- FIXME: revise + tools.WaitForRebalAndResil(t, baseParams) m.ensureNoGetErrors() m.waitAndCheckCluState() @@ -685,7 +682,7 @@ func TestGetDuringResilver(t *testing.T) { time.Sleep(2 * time.Second) tlog.Logf("Wait for rebalance (when target %s that has previously lost all mountpaths joins back)\n", target.StringEx()) - args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, _ = api.WaitForXactionIC(baseParams, args) tools.WaitForResilvering(t, baseParams, nil) @@ -723,7 +720,7 @@ func TestGetDuringRebalance(t *testing.T) { // Wait for everything to finish. baseParams := tools.BaseAPIParams(m.proxyURL) - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) wg.Wait() // Get objects once again to check if they are still accessible after rebalance. @@ -788,7 +785,7 @@ func TestRegisterTargetsAndCreateBucketsInParallel(t *testing.T) { } wg.Wait() m.waitAndCheckCluState() - tools.WaitForRebalAndResil(t, baseParams, rebalanceTimeout) + tools.WaitForRebalAndResil(t, baseParams) } func TestMountpathDetachAll(t *testing.T) { @@ -823,7 +820,7 @@ func TestMountpathDetachAll(t *testing.T) { time.Sleep(time.Second) tlog.Logf("Wait for rebalance (triggered by %s leaving the cluster after having lost all mountpaths)\n", tname) - args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, _ = api.WaitForXactionIC(baseParams, args) // Check if mountpaths were actually removed @@ -845,7 +842,7 @@ func TestMountpathDetachAll(t *testing.T) { time.Sleep(2 * time.Second) tlog.Logf("Wait for rebalance (when target %s that has previously lost all mountpaths joins back)\n", target.StringEx()) - args = xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, _ = api.WaitForXactionIC(baseParams, args) tools.WaitForResilvering(t, baseParams, target) @@ -1048,7 +1045,7 @@ func TestMountpathDisableAll(t *testing.T) { if len(disabled) != 0 { tlog.Logf("Wait for rebalance (when target %s that has previously lost all mountpaths joins back)\n", tname) - args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, _ = api.WaitForXactionIC(baseParams, args) tools.WaitForResilvering(t, baseParams, nil) @@ -1062,7 +1059,7 @@ func TestMountpathDisableAll(t *testing.T) { time.Sleep(2 * time.Second) tlog.Logf("Wait for rebalance (triggered by %s leaving the cluster after having lost all mountpaths)\n", tname) - args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, _ = api.WaitForXactionIC(baseParams, args) // Check if mountpaths were actually disabled @@ -1090,7 
+1087,7 @@ func TestMountpathDisableAll(t *testing.T) { time.Sleep(2 * time.Second) tlog.Logf("Wait for rebalance (when target %s that has previously lost all mountpaths joins back)\n", target.StringEx()) - args = xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, _ = api.WaitForXactionIC(baseParams, args) tools.WaitForResilvering(t, baseParams, target) @@ -1197,14 +1194,14 @@ func TestAtimeRebalance(t *testing.T) { // make sure that the cluster has all targets enabled _, err = tools.WaitForClusterState( m.proxyURL, - "join target back", + "target joined back", m.smap.Version, m.originalProxyCount, m.originalTargetCount, ) tassert.CheckFatal(t, err) - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) msg = &apc.LsoMsg{TimeFormat: time.StampNano} msg.AddProps(apc.GetPropsAtime, apc.GetPropsStatus) @@ -1344,7 +1341,7 @@ func TestAtimePrefetch(t *testing.T) { } xid, err := api.EvictList(baseParams, bck, objs) tassert.CheckFatal(t, err) - args := xact.ArgsMsg{ID: xid, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: xid, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -1352,7 +1349,7 @@ func TestAtimePrefetch(t *testing.T) { xid, err = api.PrefetchList(baseParams, bck, objs) tassert.CheckFatal(t, err) - args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -1428,7 +1425,7 @@ func TestGetAndPutAfterReregisterWithMissedBucketUpdate(t *testing.T) { m.ensureNoGetErrors() m.waitAndCheckCluState() baseParams := tools.BaseAPIParams(m.proxyURL) - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID) + tools.WaitForRebalanceByID(t, baseParams, rebID) } // 1. 
Unregister target @@ -1472,9 +1469,9 @@ func TestGetAfterReregisterWithMissedBucketUpdate(t *testing.T) { // Reregister target 0 rebID := m.stopMaintenance(targets[0]) - // Wait for rebalance and do gets + // Wait for rebalance and execute GETs baseParams := tools.BaseAPIParams(m.proxyURL) - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID) + tools.WaitForRebalanceByID(t, baseParams, rebID) m.gets() @@ -1511,10 +1508,10 @@ func TestRenewRebalance(t *testing.T) { // Step 4: Re-register target (triggers rebalance) m.stopMaintenance(target) - xargs := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceStartTimeout} + xargs := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceStartTimeout} err := api.WaitForXactionNode(baseParams, xargs, xactSnapRunning) tassert.CheckError(t, err) - tlog.Logf("automatic rebalance started\n") + tlog.Logf("rebalance started\n") wg := &sync.WaitGroup{} wg.Add(2) @@ -1538,7 +1535,7 @@ func TestRenewRebalance(t *testing.T) { }() wg.Wait() - args := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckError(t, err) @@ -1784,7 +1781,7 @@ func TestICRebalance(t *testing.T) { rebID, err = api.StartXaction(baseParams, xact.ArgsMsg{Kind: apc.ActRebalance}) tassert.CheckFatal(t, err) - xargs := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceStartTimeout} + xargs := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceStartTimeout} api.WaitForXactionNode(baseParams, xargs, xactSnapRunning) tlog.Logf("Killing %s\n", icNode.StringEx()) @@ -1808,7 +1805,7 @@ func TestICRebalance(t *testing.T) { checkSmaps(t, m.proxyURL) tlog.Logf("Wait for rebalance: %s\n", rebID) - args := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, _ = api.WaitForXactionIC(baseParams, args) m.waitAndCheckCluState() @@ -1853,7 +1850,7 @@ func TestICDecommission(t *testing.T) { args := &apc.ActValRmNode{DaemonID: tsi.ID()} rebID, err := api.StopMaintenance(baseParams, args) tassert.CheckFatal(t, err) - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID) + tools.WaitForRebalanceByID(t, baseParams, rebID) tassert.CheckFatal(t, err) }() @@ -1898,7 +1895,7 @@ func TestSingleResilver(t *testing.T) { tassert.CheckFatal(t, err) // Wait for specific resilvering x[id] - args = xact.ArgsMsg{ID: id, Kind: apc.ActResilver, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{ID: id, Kind: apc.ActResilver, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) diff --git a/ais/test/maintain_test.go b/ais/test/maintain_test.go index 2632fa1659..c2d298ddcd 100644 --- a/ais/test/maintain_test.go +++ b/ais/test/maintain_test.go @@ -100,7 +100,7 @@ func TestMaintenanceListObjects(t *testing.T) { tassert.CheckFatal(t, err) _, err = tools.WaitForClusterState(proxyURL, "target is back", m.smap.Version, m.smap.CountActivePs(), m.smap.CountTargets()) - args := xact.ArgsMsg{ID: rebID, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: rebID, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) }() @@ -110,7 +110,7 @@ func TestMaintenanceListObjects(t *testing.T) { tassert.CheckFatal(t, err) // Wait for reb to complete - args := xact.ArgsMsg{ID: rebID, Timeout: 
rebalanceTimeout} + args := xact.ArgsMsg{ID: rebID, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -147,7 +147,7 @@ func TestMaintenanceMD(t *testing.T) { tlog.Logf("targets: %d, proxies: %d\n", smap.CountActiveTs(), smap.CountActivePs()) t.Cleanup(func() { - args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} api.WaitForXactionIC(baseParams, args) }) @@ -189,11 +189,10 @@ func TestMaintenanceDecommissionRebalance(t *testing.T) { objPath = "ic-decomm/" fileSize = cos.KiB - dcmTarget, _ = smap.GetRandTarget() - origTargetCount = smap.CountTargets() - origActiveTargetCount = smap.CountActiveTs() - origActiveProxyCount = smap.CountActivePs() - bck = cmn.Bck{Name: t.Name(), Provider: apc.AIS} + dcmTarget, _ = smap.GetRandTarget() + origTargetCount = smap.CountTargets() + origActiveProxyCount = smap.CountActivePs() + bck = cmn.Bck{Name: t.Name(), Provider: apc.AIS} ) tlog.Logf("targets: %d, proxies: %d\n", smap.CountActiveTs(), smap.CountActivePs()) @@ -220,7 +219,7 @@ func TestMaintenanceDecommissionRebalance(t *testing.T) { smap.Version, origActiveProxyCount, origTargetCount-1, dcmTarget.ID()) tassert.CheckFatal(t, err) - tools.WaitForRebalanceByID(t, origActiveTargetCount, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) msgList := &apc.LsoMsg{Prefix: objPath} lst, err := api.ListObjects(baseParams, bck, msgList, api.ListArgs{}) tassert.CheckError(t, err) @@ -254,9 +253,9 @@ func TestMaintenanceDecommissionRebalance(t *testing.T) { val := &apc.ActValRmNode{DaemonID: dcm.ID()} rebID, err = api.StopMaintenance(baseParams, val) tassert.CheckError(t, err) - tools.WaitForRebalanceByID(t, origActiveTargetCount, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) } else { - args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckError(t, err) } @@ -304,7 +303,7 @@ func TestMaintenanceRebalance(t *testing.T) { m.puts() tsi, _ := m.smap.GetRandTarget() - tlog.Logf("Removing target %s\n", tsi) + tlog.Logf("Removing %s\n", tsi.StringEx()) restored := false actVal.DaemonID = tsi.ID() rebID, err := api.StartMaintenance(baseParams, actVal) @@ -315,16 +314,15 @@ func TestMaintenanceRebalance(t *testing.T) { tassert.CheckError(t, err) _, err = tools.WaitForClusterState( proxyURL, - "target joined the cluster", + "target joined", m.smap.Version, origProxyCnt, origTargetCount, ) tassert.CheckFatal(t, err) - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID, time.Minute) + tools.WaitForRebalanceByID(t, baseParams, rebID) } tools.ClearMaintenance(baseParams, tsi) }() - tlog.Logf("Wait for rebalance %s\n", rebID) - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID, time.Minute) + tools.WaitForRebalanceByID(t, baseParams, rebID) smap, err := tools.WaitForClusterState( proxyURL, @@ -342,13 +340,13 @@ func TestMaintenanceRebalance(t *testing.T) { restored = true smap, err = tools.WaitForClusterState( proxyURL, - "target joined the cluster", + "target joined", m.smap.Version, origProxyCnt, origTargetCount, ) tassert.CheckFatal(t, err) m.smap = smap - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID, time.Minute) + tools.WaitForRebalanceByID(t, baseParams, rebID) } 
func TestMaintenanceGetWhileRebalance(t *testing.T) { @@ -378,7 +376,7 @@ func TestMaintenanceGetWhileRebalance(t *testing.T) { stopped := false tsi, _ := m.smap.GetRandTarget() - tlog.Logf("Removing target %s\n", tsi) + tlog.Logf("Removing %s\n", tsi.StringEx()) restored := false actVal.DaemonID = tsi.ID() rebID, err := api.StartMaintenance(baseParams, actVal) @@ -392,16 +390,15 @@ func TestMaintenanceGetWhileRebalance(t *testing.T) { tassert.CheckFatal(t, err) _, err = tools.WaitForClusterState( proxyURL, - "target joined the cluster", + "target joined", m.smap.Version, origProxyCnt, origTargetCount, ) tassert.CheckFatal(t, err) - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID, time.Minute) + tools.WaitForRebalanceByID(t, baseParams, rebID) } tools.ClearMaintenance(baseParams, tsi) }() - tlog.Logf("Wait for rebalance %s\n", rebID) - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID, time.Minute) + tools.WaitForRebalanceByID(t, baseParams, rebID) smap, err := tools.WaitForClusterState( proxyURL, @@ -420,12 +417,12 @@ func TestMaintenanceGetWhileRebalance(t *testing.T) { restored = true smap, err = tools.WaitForClusterState( proxyURL, - "target joined the cluster", + "target joined", m.smap.Version, origProxyCnt, origTargetCount, ) tassert.CheckFatal(t, err) m.smap = smap - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID, time.Minute) + tools.WaitForRebalanceByID(t, baseParams, rebID) } func TestNodeShutdown(t *testing.T) { @@ -475,7 +472,7 @@ func testNodeShutdown(t *testing.T, nodeType string) { tassert.CheckFatal(t, err) if nodeType == apc.Target && origTargetCount > 1 { time.Sleep(time.Second) - xargs := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + xargs := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} for i := 0; i < 3; i++ { status, err := api.WaitForXactionIC(baseParams, xargs) if err == nil { @@ -581,7 +578,7 @@ func TestShutdownListObjects(t *testing.T) { if origTargetCount > 1 { time.Sleep(time.Second) - xargs := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: rebalanceTimeout} + xargs := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout} for i := 0; i < 3; i++ { status, err := api.WaitForXactionIC(baseParams, xargs) if err == nil { diff --git a/ais/test/multiproxy_test.go b/ais/test/multiproxy_test.go index 5da6bd8fd3..af0f211772 100644 --- a/ais/test/multiproxy_test.go +++ b/ais/test/multiproxy_test.go @@ -941,7 +941,7 @@ func proxyStress(t *testing.T) { wg.Add(1) go primaryKiller(t, proxyURL, stopChs[workerCnt], proxyURLChs, errChs[workerCnt], &wg) - timer := time.After(multiProxyTestTimeout) + timer := time.After(tools.MultiProxyTestTimeout) loop: for { for _, ch := range errChs { @@ -1629,7 +1629,7 @@ func startCPBckAndWait(t testing.TB, srcBck cmn.Bck, count int) *sync.WaitGroup tools.DestroyBucket(t, proxyURL, dstBck) wg.Done() }() - xargs := xact.ArgsMsg{ID: xid, Timeout: rebalanceTimeout} + xargs := xact.ArgsMsg{ID: xid, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, xargs) tassert.CheckError(t, err) }(i) diff --git a/ais/test/object_test.go b/ais/test/object_test.go index c98c3c9516..072dc0b79d 100644 --- a/ais/test/object_test.go +++ b/ais/test/object_test.go @@ -351,28 +351,28 @@ func Test_SameLocalAndRemoteBckNameValidate(t *testing.T) { tlog.Logf("PrefetchList num=%d\n", len(files)) prefetchListID, err := api.PrefetchList(baseParams, bckRemote, files) 
tassert.CheckFatal(t, err) - args := xact.ArgsMsg{ID: prefetchListID, Kind: apc.ActPrefetchObjects, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: prefetchListID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) tlog.Logf("PrefetchRange %s\n", objRange) prefetchRangeID, err := api.PrefetchRange(baseParams, bckRemote, objRange) tassert.CheckFatal(t, err) - args = xact.ArgsMsg{ID: prefetchRangeID, Kind: apc.ActPrefetchObjects, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{ID: prefetchRangeID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) tlog.Logf("EvictList %v\n", files) evictListID, err := api.EvictList(baseParams, bckRemote, files) tassert.CheckFatal(t, err) - args = xact.ArgsMsg{ID: evictListID, Kind: apc.ActEvictObjects, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{ID: evictListID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.Errorf(t, err != nil, "list iterator must produce not-found when not finding listed objects") tlog.Logf("EvictRange\n") evictRangeID, err := api.EvictRange(baseParams, bckRemote, objRange) tassert.CheckFatal(t, err) - args = xact.ArgsMsg{ID: evictRangeID, Kind: apc.ActEvictObjects, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{ID: evictRangeID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -402,13 +402,13 @@ func Test_SameLocalAndRemoteBckNameValidate(t *testing.T) { // Prefetch/Evict should work prefetchListID, err = api.PrefetchList(baseParams, bckRemote, files) tassert.CheckFatal(t, err) - args = xact.ArgsMsg{ID: prefetchListID, Kind: apc.ActPrefetchObjects, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{ID: prefetchListID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) evictListID, err = api.EvictList(baseParams, bckRemote, files) tassert.CheckFatal(t, err) - args = xact.ArgsMsg{ID: evictListID, Kind: apc.ActEvictObjects, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{ID: evictListID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -416,7 +416,7 @@ func Test_SameLocalAndRemoteBckNameValidate(t *testing.T) { tlog.Logf("Deleting %s and %s from cloud bucket ...\n", fileName1, fileName2) deleteID, err := api.DeleteList(baseParams, bckRemote, files) tassert.CheckFatal(t, err) - args = xact.ArgsMsg{ID: deleteID, Kind: apc.ActDeleteObjects, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{ID: deleteID, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -424,7 +424,7 @@ func Test_SameLocalAndRemoteBckNameValidate(t *testing.T) { tlog.Logf("Deleting %s and %s from ais bucket ...\n", fileName1, fileName2) deleteID, err = api.DeleteList(baseParams, bckLocal, files) tassert.CheckFatal(t, err) - args = xact.ArgsMsg{ID: deleteID, Kind: apc.ActDeleteObjects, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{ID: deleteID, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) diff --git a/ais/test/objprops_test.go b/ais/test/objprops_test.go index b55b18654c..64a7b5e1af 100644 
--- a/ais/test/objprops_test.go +++ b/ais/test/objprops_test.go @@ -133,7 +133,7 @@ func propsEvict(t *testing.T, proxyURL string, bck cmn.Bck, objMap map[string]st if err != nil { t.Errorf("Failed to evict objects: %v\n", err) } - args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -252,7 +252,7 @@ func propsRebalance(t *testing.T, proxyURL string, bck cmn.Bck, objects map[stri smap.CountActiveTs()+1, ) tassert.CheckFatal(t, err) - tools.WaitForRebalanceByID(t, origActiveTargetCnt, baseParams, rebID, rebalanceTimeout) + tools.WaitForRebalanceByID(t, baseParams, rebID) tlog.Logf("Listing objects...\n") reslist := testListObjects(t, proxyURL, bck, msg) diff --git a/ais/test/promote_test.go b/ais/test/promote_test.go index 5ac0d4f091..08767a4004 100644 --- a/ais/test/promote_test.go +++ b/ais/test/promote_test.go @@ -276,7 +276,7 @@ func (test *prmTests) do(t *testing.T, bck *meta.Bck) { // wait for an xaction (if there's one) and then query all targets for stats func (test *prmTests) wait(t *testing.T, xid, tempdir string, target *meta.Snode, m *ioContext) (locObjs, outObjs, inObjs int64) { time.Sleep(4 * time.Second) - xargs := xact.ArgsMsg{Kind: apc.ActPromote, Timeout: rebalanceTimeout} + xargs := xact.ArgsMsg{Kind: apc.ActPromote, Timeout: tools.RebalanceTimeout} xname := fmt.Sprintf("%q", apc.ActPromote) if xid != "" { xargs.ID = xid diff --git a/ais/test/regression_test.go b/ais/test/regression_test.go index 9d7bf36e2e..f8bfa942f7 100644 --- a/ais/test/regression_test.go +++ b/ais/test/regression_test.go @@ -333,7 +333,7 @@ func doBucketRegressionTest(t *testing.T, proxyURL string, rtd regressionTestDat if rtd.rename { // Rename bucket fails when rebalance or resilver is running. // Ensure rebalance or resilver isn't running before performing a rename. 
- tools.WaitForRebalAndResil(t, baseParams, rebalanceTimeout) + tools.WaitForRebalAndResil(t, baseParams) _, err := api.RenameBucket(baseParams, rtd.bck, rtd.renamedBck) tassert.CheckFatal(t, err) @@ -356,7 +356,7 @@ func doBucketRegressionTest(t *testing.T, proxyURL string, rtd regressionTestDat } func postRenameWaitAndCheck(t *testing.T, baseParams api.BaseParams, rtd regressionTestData, numPuts int, objNames []string) { - xargs := xact.ArgsMsg{Kind: apc.ActMoveBck, Bck: rtd.renamedBck, Timeout: rebalanceTimeout} + xargs := xact.ArgsMsg{Kind: apc.ActMoveBck, Bck: rtd.renamedBck, Timeout: tools.RebalanceTimeout} _, err := api.WaitForXactionIC(baseParams, xargs) if err != nil { if herr, ok := err.(*cmn.ErrHTTP); ok && herr.Status == http.StatusNotFound { @@ -501,9 +501,7 @@ func TestReregisterMultipleTargets(t *testing.T) { for _, tgt := range removed { rebID = m.stopMaintenance(tgt) } - if len(removed) != 0 && rebID != "" { - tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID) - } + tools.WaitForRebalanceByID(t, baseParams, rebID) }() targets := m.smap.Tmap.ActiveNodes() @@ -543,7 +541,7 @@ func TestReregisterMultipleTargets(t *testing.T) { m.stopGets() baseParams := tools.BaseAPIParams(m.proxyURL) - tools.WaitForRebalAndResil(t, baseParams, rebalanceTimeout) + tools.WaitForRebalAndResil(t, baseParams) clusterStats = tools.GetClusterStats(t, m.proxyURL) for targetID, targetStats := range clusterStats.Target { @@ -696,7 +694,7 @@ func TestLRU(t *testing.T) { xid, err := api.StartXaction(baseParams, xact.ArgsMsg{Kind: apc.ActLRU}) tassert.CheckFatal(t, err) - args := xact.ArgsMsg{ID: xid, Kind: apc.ActLRU, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: xid, Kind: apc.ActLRU, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -743,7 +741,7 @@ func TestPrefetchList(t *testing.T) { t.Error(err) } - args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -753,12 +751,12 @@ func TestPrefetchList(t *testing.T) { t.Error(err) } - args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) // 4. Ensure that all the prefetches occurred. 
- xargs := xact.ArgsMsg{ID: xid, Timeout: rebalanceTimeout} + xargs := xact.ArgsMsg{ID: xid, Timeout: tools.RebalanceTimeout} snaps, err := api.QueryXactionSnaps(baseParams, xargs) tassert.CheckFatal(t, err) locObjs, _, _ := snaps.ObjCounts(xid) @@ -803,7 +801,7 @@ func TestDeleteList(t *testing.T) { xid, err := api.DeleteList(baseParams, b, files) tassert.CheckError(t, err) - args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -859,18 +857,18 @@ func TestPrefetchRange(t *testing.T) { rng := fmt.Sprintf("%s%s", m.prefix, prefetchRange) xid, err := api.EvictRange(baseParams, bck, rng) tassert.CheckError(t, err) - args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) xid, err = api.PrefetchRange(baseParams, bck, rng) tassert.CheckError(t, err) - args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) // 4. Ensure all done - xargs := xact.ArgsMsg{ID: xid, Timeout: rebalanceTimeout} + xargs := xact.ArgsMsg{ID: xid, Timeout: tools.RebalanceTimeout} snaps, err := api.QueryXactionSnaps(baseParams, xargs) tassert.CheckFatal(t, err) locObjs, _, _ := snaps.ObjCounts(xid) @@ -916,7 +914,7 @@ func TestDeleteRange(t *testing.T) { tlog.Logf("Delete in range %s\n", smallrange) xid, err := api.DeleteRange(baseParams, b, smallrange) tassert.CheckError(t, err) - args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -945,7 +943,7 @@ func TestDeleteRange(t *testing.T) { // 4. 
Delete the big range of objects xid, err = api.DeleteRange(baseParams, b, bigrange) tassert.CheckError(t, err) - args = xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -1022,7 +1020,7 @@ func TestStressDeleteRange(t *testing.T) { tlog.Logf("Deleting objects in range: %s\n", partialRange) xid, err := api.DeleteRange(baseParams, bck, partialRange) tassert.CheckError(t, err) - args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: rebalanceTimeout} + args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) @@ -1054,7 +1052,7 @@ func TestStressDeleteRange(t *testing.T) { tlog.Logf("Deleting objects in range: %s\n", fullRange) xid, err = api.DeleteRange(baseParams, bck, fullRange) tassert.CheckError(t, err) - args = xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: rebalanceTimeout} + args = xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, args) tassert.CheckFatal(t, err) diff --git a/tools/client.go b/tools/client.go index 15a693ecdd..0f3ce80470 100644 --- a/tools/client.go +++ b/tools/client.go @@ -35,15 +35,25 @@ const ( // 1. local instance (no docker) - works // 2. local docker instance - works // 3. AWS-deployed cluster - not tested (but runs mainly with Ansible) - MockDaemonID = "MOCK" - dsortFinishTimeout = 6 * time.Minute - - waitClusterStartup = 20 * time.Second + MockDaemonID = "MOCK" ) -const evictPrefetchTimeout = 2 * time.Minute +// times and timeouts +const ( + WaitClusterStartup = 20 * time.Second + RebalanceStartTimeout = 10 * time.Second + MaxCplaneTimeout = 10 * time.Second + + CopyBucketTimeout = 3 * time.Minute + MultiProxyTestTimeout = 3 * time.Minute -const xactPollSleep = time.Second + DsortFinishTimeout = 6 * time.Minute + RebalanceTimeout = 2 * time.Minute + EvictPrefetchTimeout = 2 * time.Minute + BucketCleanupTimeout = time.Minute + + xactPollSleep = time.Second +) type Ctx struct { Client *http.Client @@ -228,7 +238,7 @@ func CleanupRemoteBucket(t *testing.T, proxyURL string, bck cmn.Bck, prefix stri bp := BaseAPIParams(proxyURL) xid, err := api.DeleteList(bp, bck, toDelete) tassert.CheckFatal(t, err) - args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: time.Minute} + args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: BucketCleanupTimeout} _, err = api.WaitForXactionIC(bp, args) tassert.CheckFatal(t, err) } @@ -273,7 +283,7 @@ func RmTargetSkipRebWait(t *testing.T, proxyURL string, smap *meta.Smap) (*meta. // Internal API to remove a node from Smap: use it to unregister MOCK targets/proxies. // Use `JoinCluster` to attach node back. 
func RemoveNodeUnsafe(proxyURL, sid string) error { - return _removeNodeFromSmap(gctx, proxyURL, sid, time.Minute) + return _removeNodeFromSmap(gctx, proxyURL, sid, MaxCplaneTimeout) } func WaitForObjectToBeDowloaded(bp api.BaseParams, bck cmn.Bck, objName string, timeout time.Duration) error { @@ -416,7 +426,7 @@ func WaitForDSortToFinish(proxyURL, managerUUID string) (allAborted bool, err er tlog.Logln("Waiting for distributed sort to finish...") bp := BaseAPIParams(proxyURL) - deadline := time.Now().Add(dsortFinishTimeout) + deadline := time.Now().Add(DsortFinishTimeout) for time.Now().Before(deadline) { allMetrics, err := api.MetricsDSort(bp, managerUUID) if err != nil { @@ -488,7 +498,7 @@ func EvictObjects(t *testing.T, proxyURL string, bck cmn.Bck, objList []string) t.Errorf("Evict bucket %s failed, err = %v", bck, err) } - args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: evictPrefetchTimeout} + args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: EvictPrefetchTimeout} if _, err := api.WaitForXactionIC(bp, args); err != nil { t.Errorf("Wait for xaction to finish failed, err = %v", err) } @@ -502,9 +512,8 @@ func EvictObjects(t *testing.T, proxyURL string, bck cmn.Bck, objList []string) // the function ends with fatal. func WaitForRebalAndResil(t testing.TB, bp api.BaseParams, timeouts ...time.Duration) { var ( - timeout = 2 * time.Minute - wg = &sync.WaitGroup{} - errCh = make(chan error, 2) + wg = &sync.WaitGroup{} + errCh = make(chan error, 2) ) smap, err := api.GetClusterMap(bp) tassert.CheckFatal(t, err) @@ -518,6 +527,7 @@ func WaitForRebalAndResil(t testing.TB, bp api.BaseParams, timeouts ...time.Dura } _waitReToStart(bp) + timeout := RebalanceTimeout if len(timeouts) > 0 { timeout = timeouts[0] } @@ -577,14 +587,16 @@ func _waitResil(t testing.TB, bp api.BaseParams, timeout time.Duration) { tassert.CheckError(t, err) } -func WaitForRebalanceByID(t *testing.T, origTargetCnt int, bp api.BaseParams, rebID string, timeouts ...time.Duration) { - if origTargetCnt >= 0 && origTargetCnt < 2 { +func WaitForRebalanceByID(t *testing.T, bp api.BaseParams, rebID string, timeouts ...time.Duration) { + if rebID == "" { return } - timeout := 2 * time.Minute + tassert.Fatalf(t, xact.IsValidRebID(rebID), "invalid reb ID %q", rebID) + timeout := RebalanceTimeout if len(timeouts) > 0 { timeout = timeouts[0] } + tlog.Logf("Wait for rebalance %s\n", rebID) xargs := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, OnlyRunning: true, Timeout: timeout} _, err := api.WaitForXactionIC(bp, xargs) tassert.CheckFatal(t, err) @@ -593,7 +605,7 @@ func WaitForRebalanceByID(t *testing.T, origTargetCnt int, bp api.BaseParams, re func _waitReToStart(bp api.BaseParams) { var ( kinds = []string{apc.ActRebalance, apc.ActResilver} - timeout = cos.MaxDuration(10*xactPollSleep, 10*time.Second) + timeout = cos.MaxDuration(10*xactPollSleep, MaxCplaneTimeout) retries = int(timeout / xactPollSleep) args = xact.ArgsMsg{Timeout: xactPollSleep, OnlyRunning: true} ) @@ -690,11 +702,11 @@ func waitForStartup(bp api.BaseParams, ts ...testing.TB) (*meta.Smap, error) { if err != nil { if api.HTTPStatus(err) == http.StatusServiceUnavailable { tlog.Logln("Waiting for the cluster to start up...") - time.Sleep(waitClusterStartup) + time.Sleep(WaitClusterStartup) continue } - tlog.Logf("Unable to get usable cluster map, err: %v\n", err) + tlog.Logf("Unable to get usable cluster map: %v\n", err) if len(ts) > 0 { tassert.CheckFatal(ts[0], err) }
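
For reference, a minimal sketch of the call pattern after this cleanup: stop-maintenance returns a rebalance ID (possibly empty), and the caller simply hands it to tools.WaitForRebalanceByID, which no longer takes an original-target-count argument and defaults to tools.RebalanceTimeout. The package clause and import paths below are assumptions about the repo layout; the helpers, constants, and signatures are the ones that appear in the diff above.

package integration // assumption: use the ais/test package's actual package name

import (
	"testing"

	"github.com/NVIDIA/aistore/api"
	"github.com/NVIDIA/aistore/api/apc"
	"github.com/NVIDIA/aistore/tools"
	"github.com/NVIDIA/aistore/tools/tassert"
)

// Sketch: put a random target into maintenance mode, take it back out, and wait
// for the resulting rebalance by ID using the simplified helper signature.
func TestMaintenanceRoundTripSketch(t *testing.T) {
	proxyURL := tools.RandomProxyURL(t)
	bp := tools.BaseAPIParams(proxyURL)

	smap := tools.GetClusterMap(t, proxyURL)
	tsi, err := smap.GetRandTarget()
	tassert.CheckFatal(t, err)

	// Put the target in maintenance mode (may trigger its own rebalance).
	_, err = api.StartMaintenance(bp, &apc.ActValRmNode{DaemonID: tsi.ID()})
	tassert.CheckFatal(t, err)
	// Real tests also wait for the updated Smap here (tools.WaitForClusterState);
	// elided for brevity.

	// Take the target out of maintenance mode; rebID may be empty if no
	// rebalance was triggered.
	rebID, err := api.StopMaintenance(bp, &apc.ActValRmNode{DaemonID: tsi.ID()})
	tassert.CheckFatal(t, err)

	// New signature: no "original target count" argument and no per-call timeout;
	// the helper no-ops on an empty rebID and defaults to tools.RebalanceTimeout.
	tools.WaitForRebalanceByID(t, bp, rebID)

	// When only the xaction kind is known (no rebalance ID), tests in this patch
	// wait via:
	//   args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout}
	//   _, _ = api.WaitForXactionIC(bp, args)
}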