tests and tools: cleanup around stop-maintenance, wait-rebalance
* consolidate timeout constants
* remove old polling patch
* simplify

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Jul 28, 2023
1 parent 4dbe397 commit 270cdf5
Showing 16 changed files with 167 additions and 190 deletions.
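
The first bullet refers to moving the shared test timeouts out of ais/test/common.go (see the constants hunk in that file below) into exported names in the tools package. A minimal sketch of the consolidated block, assuming the deleted values carry over unchanged — the file name is illustrative, and MultiProxyTestTimeout is inferred from the deleted multiProxyTestTimeout:

// tools/const.go (illustrative location) — consolidated test timeouts;
// values copied from the constants deleted in ais/test/common.go below
package tools

import "time"

const (
    RebalanceStartTimeout = 10 * time.Second // was ais/test: rebalanceStartTimeout
    RebalanceTimeout      = 5 * time.Minute  // was ais/test: rebalanceTimeout
    CopyBucketTimeout     = 3 * time.Minute  // was ais/test: copyBucketTimeout
    MultiProxyTestTimeout = 3 * time.Minute  // was ais/test: multiProxyTestTimeout
)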
16 changes: 8 additions & 8 deletions ais/test/bucket_test.go
@@ -1567,7 +1567,7 @@ func TestListObjectsWithRebalance(t *testing.T) {
 
     wg.Wait()
     m.waitAndCheckCluState()
-    tools.WaitForRebalanceByID(t, m.originalTargetCount, baseParams, rebID)
+    tools.WaitForRebalanceByID(t, baseParams, rebID)
 }
 
 func TestBucketSingleProp(t *testing.T) {
@@ -1972,7 +1972,7 @@ func TestRenameBucketEmpty(t *testing.T) {
     uuid, err := api.RenameBucket(baseParams, srcBck, dstBck)
     tassert.CheckFatal(t, err)
 
-    args := xact.ArgsMsg{ID: uuid, Kind: apc.ActMoveBck, Timeout: rebalanceTimeout}
+    args := xact.ArgsMsg{ID: uuid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout}
     _, err = api.WaitForXactionIC(baseParams, args)
     tassert.CheckFatal(t, err)
 
@@ -2037,7 +2037,7 @@ func TestRenameBucketNonEmpty(t *testing.T) {
 
     tassert.CheckFatal(t, err)
 
-    args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: rebalanceTimeout}
+    args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout}
     _, err = api.WaitForXactionIC(baseParams, args)
     tassert.CheckFatal(t, err)
 
@@ -2153,7 +2153,7 @@ func TestRenameBucketTwice(t *testing.T) {
     }
 
     // Wait for rename to complete
-    args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: rebalanceTimeout}
+    args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout}
     _, err = api.WaitForXactionIC(baseParams, args)
     tassert.CheckFatal(t, err)
 
@@ -2465,11 +2465,11 @@ func TestCopyBucket(t *testing.T) {
     // TODO -- FIXME: remove/simplify-out this `if` here and elsewhere
     if test.evictRemoteSrc {
         // wait for TCO idle (different x-kind)
-        args := xact.ArgsMsg{ID: uuid, Timeout: copyBucketTimeout}
+        args := xact.ArgsMsg{ID: uuid, Timeout: tools.CopyBucketTimeout}
         err := api.WaitForXactionIdle(baseParams, args)
         tassert.CheckFatal(t, err)
     } else {
-        args := xact.ArgsMsg{ID: uuid, Kind: apc.ActCopyBck, Timeout: copyBucketTimeout}
+        args := xact.ArgsMsg{ID: uuid, Kind: apc.ActCopyBck, Timeout: tools.CopyBucketTimeout}
         _, err := api.WaitForXactionIC(baseParams, args)
         tassert.CheckFatal(t, err)
     }
@@ -2782,7 +2782,7 @@ func TestRenameAndCopyBucket(t *testing.T) {
     // Wait for rename to finish
     tlog.Logf("Waiting for x-%s[%s] to finish\n", apc.ActMoveBck, xid)
     time.Sleep(2 * time.Second)
-    args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: rebalanceTimeout}
+    args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout}
     _, err = api.WaitForXactionIC(baseParams, args)
     tassert.CheckFatal(t, err)
 
@@ -2870,7 +2870,7 @@ func TestCopyAndRenameBucket(t *testing.T) {
     }
 
     // Wait for copy to complete
-    args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: rebalanceTimeout}
+    args := xact.ArgsMsg{ID: xid, Kind: apc.ActMoveBck, Timeout: tools.RebalanceTimeout}
     _, err = api.WaitForXactionIC(baseParams, args)
     tassert.CheckFatal(t, err)
 
57 changes: 16 additions & 41 deletions ais/test/common.go
@@ -37,12 +37,8 @@ import (
 const rebalanceObjectDistributionTestCoef = 0.3
 
 const (
-    prefixDir             = "filter"
-    largeFileSize         = 4 * cos.MiB
-    copyBucketTimeout     = 3 * time.Minute
-    rebalanceTimeout      = 5 * time.Minute
-    rebalanceStartTimeout = 10 * time.Second
-    multiProxyTestTimeout = 3 * time.Minute
+    prefixDir     = "filter"
+    largeFileSize = 4 * cos.MiB
 
     workerCnt = 10
 )
@@ -572,7 +568,7 @@ func (m *ioContext) stopGets() {
 func (m *ioContext) ensureNumCopies(baseParams api.BaseParams, expectedCopies int, greaterOk bool) {
     m.t.Helper()
     time.Sleep(time.Second)
-    xargs := xact.ArgsMsg{Kind: apc.ActMakeNCopies, Bck: m.bck, Timeout: rebalanceTimeout}
+    xargs := xact.ArgsMsg{Kind: apc.ActMakeNCopies, Bck: m.bck, Timeout: tools.RebalanceTimeout}
     _, err := api.WaitForXactionIC(baseParams, xargs)
     tassert.CheckFatal(m.t, err)
 
@@ -673,7 +669,7 @@ func ensurePrevRebalanceIsFinished(baseParams api.BaseParams, err error) bool {
     }
     tlog.Logln("Warning: wait for unfinished rebalance(?)")
     time.Sleep(5 * time.Second)
-    args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout}
+    args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout}
     _, _ = api.WaitForXactionIC(baseParams, args)
     time.Sleep(5 * time.Second)
     return true
@@ -696,41 +692,20 @@ func (m *ioContext) startMaintenanceNoRebalance() *meta.Snode {
     return target
 }
 
-func (m *ioContext) stopMaintenance(target *meta.Snode) (rebID string) {
-    const (
-        timeout    = time.Second * 10
-        interval   = time.Millisecond * 10
-        iterations = int(timeout / interval)
-    )
-    var err error
-    tlog.Logf("Take %s out of maintenance...\n", target.StringEx())
-    args := &apc.ActValRmNode{DaemonID: target.ID()}
-    rebID, err = api.StopMaintenance(tools.BaseAPIParams(m.proxyURL), args)
+func (m *ioContext) stopMaintenance(target *meta.Snode) string {
+    tlog.Logf("Take %s out of maintenance mode...\n", target.StringEx())
+    bp := tools.BaseAPIParams(m.proxyURL)
+    rebID, err := api.StopMaintenance(bp, &apc.ActValRmNode{DaemonID: target.ID()})
     tassert.CheckFatal(m.t, err)
-    baseParams := tools.BaseAPIParams(target.URL(cmn.NetPublic))
-    smap := tools.GetClusterMap(m.t, m.proxyURL)
-    for i := 0; i < iterations; i++ {
-        time.Sleep(interval)
-        if _, ok := smap.Tmap[target.ID()]; !ok {
-            smap = tools.GetClusterMap(m.t, m.proxyURL)
-        } else {
-            query := cmn.QueryBcks(m.bck)
-            baseParams.URL = m.proxyURL
-            proxyBcks, err := api.ListBuckets(baseParams, query, apc.FltExists)
-            tassert.CheckFatal(m.t, err)
-
-            baseParams.URL = target.URL(cmn.NetPublic)
-            targetBcks, err := api.ListBuckets(baseParams, query, apc.FltExists)
-            tassert.CheckFatal(m.t, err)
-            if proxyBcks.Equal(targetBcks) {
-                tlog.Logf("%s got updated with the current BMD\n", target.StringEx())
-                return
-            }
-        }
-    }
-    m.t.Fatalf("failed to bring %s out of maintenance: not in the %s and/or did not get updated BMD",
-        target.StringEx(), smap.StringEx())
-    return
+    if rebID == "" {
+        return ""
+    }
+    tassert.Fatalf(m.t, xact.IsValidRebID(rebID), "invalid reb ID %q", rebID)
+
+    xargs := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: tools.RebalanceStartTimeout}
+    api.WaitForXactionNode(bp, xargs, xactSnapRunning)
+
+    return rebID
 }
 
 func (m *ioContext) setNonDefaultBucketProps() {
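With the old polling patch removed, stopMaintenance now only validates the rebalance ID and waits for the rebalance to start (tools.RebalanceStartTimeout); waiting for completion is the caller's job. A sketch of the resulting pattern, mirroring the dsort_test.go change below (m, t, and baseParams are the usual test fixtures):

// post-commit flow: stopMaintenance waits for the rebalance to *start*;
// WaitForRebalanceByID (new two-value signature) waits for it to finish
target := m.startMaintenanceNoRebalance()
// ... exercise the cluster while the target is in maintenance ...
rebID := m.stopMaintenance(target)
tools.WaitForRebalanceByID(t, baseParams, rebID)
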
4 changes: 2 additions & 2 deletions ais/test/downloader_test.go
@@ -521,7 +521,7 @@ func TestDownloadRemote(t *testing.T) {
     tlog.Logf("(1) evicting a _list_ of objects from remote bucket %s...\n", test.srcBck)
     xid, err := api.EvictList(baseParams, test.srcBck, expectedObjs)
     tassert.CheckFatal(t, err)
-    args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: rebalanceTimeout}
+    args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout}
     _, err = api.WaitForXactionIC(baseParams, args)
     tassert.CheckFatal(t, err)
 
@@ -553,7 +553,7 @@ func TestDownloadRemote(t *testing.T) {
     tlog.Logf("(2) evicting a _list_ of objects from remote bucket %s...\n", test.srcBck)
     xid, err = api.EvictList(baseParams, test.srcBck, expectedObjs)
     tassert.CheckFatal(t, err)
-    args = xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: rebalanceTimeout}
+    args = xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout}
     _, err = api.WaitForXactionIC(baseParams, args)
     if test.srcBck.Equal(&test.dstBck) {
         tassert.CheckFatal(t, err)
5 changes: 3 additions & 2 deletions ais/test/dsort_test.go
@@ -1425,7 +1425,8 @@ func TestDistributedSortKillTargetDuringPhases(t *testing.T) {
             allMetrics, err := api.MetricsDSort(df.baseParams, df.managerUUID)
             tassert.CheckError(t, err)
             if len(allMetrics) == m.originalTargetCount {
-                t.Errorf("number of metrics %d is same as number of original targets %d", len(allMetrics), m.originalTargetCount)
+                t.Errorf("number of metrics %d is same as number of original targets %d",
+                    len(allMetrics), m.originalTargetCount)
             }
 
             for target, metrics := range allMetrics {
@@ -1435,7 +1436,7 @@ func TestDistributedSortKillTargetDuringPhases(t *testing.T) {
             }
 
             rebID := m.stopMaintenance(target)
-            tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, df.baseParams, rebID)
+            tools.WaitForRebalanceByID(t, df.baseParams, rebID)
         },
     )
 }
28 changes: 14 additions & 14 deletions ais/test/ec_test.go
@@ -1759,7 +1759,7 @@ func TestECEmergencyTargetForSlices(t *testing.T) {
         val := &apc.ActValRmNode{DaemonID: removedTarget.ID()}
         rebID, err := api.StopMaintenance(baseParams, val)
         tassert.CheckError(t, err)
-        tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID, rebalanceTimeout)
+        tools.WaitForRebalanceByID(t, baseParams, rebID)
     }()
 
     // 3. Read objects
@@ -1848,7 +1848,7 @@ func TestECEmergencyTargetForReplica(t *testing.T) {
         if rebID == "" {
             return
         }
-        tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID, rebalanceTimeout)
+        tools.WaitForRebalanceByID(t, baseParams, rebID)
     }()
 
     hasTarget := func(targets meta.Nodes, target *meta.Snode) bool {
@@ -2127,9 +2127,9 @@ func ecOnlyRebalance(t *testing.T, o *ecOptions, proxyURL string, bck cmn.Bck) {
     tassert.CheckFatal(t, err)
     defer func() {
         rebID, _ := tools.RestoreTarget(t, proxyURL, removedTarget)
-        tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID, rebalanceTimeout)
+        tools.WaitForRebalanceByID(t, baseParams, rebID)
     }()
-    tools.WaitForRebalanceByID(t, -1, baseParams, rebID, rebalanceTimeout)
+    tools.WaitForRebalanceByID(t, baseParams, rebID)
 
     newObjList, err := api.ListObjects(baseParams, bck, msg, api.ListArgs{})
     tassert.CheckFatal(t, err)
@@ -2206,8 +2206,8 @@ func TestECBucketEncode(t *testing.T) {
     _, err = api.SetBucketProps(baseParams, m.bck, bckPropsToUpate)
     tassert.CheckFatal(t, err)
 
-    tlog.Logf("EC encode must start automatically for bucket %s\n", m.bck)
-    xargs := xact.ArgsMsg{Kind: apc.ActECEncode, Bck: m.bck, Timeout: rebalanceTimeout}
+    tlog.Logf("Wait for EC %s\n", m.bck)
+    xargs := xact.ArgsMsg{Kind: apc.ActECEncode, Bck: m.bck, Timeout: tools.RebalanceTimeout}
     _, err = api.WaitForXactionIC(baseParams, xargs)
     tassert.CheckFatal(t, err)
 
@@ -2281,7 +2281,7 @@ func ecAndRegularRebalance(t *testing.T, o *ecOptions, proxyURL string, bckReg,
             args := &apc.ActValRmNode{DaemonID: tgtLost.ID()}
             rebID, err := api.StopMaintenance(baseParams, args)
             tassert.CheckError(t, err)
-            tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID)
+            tools.WaitForRebalanceByID(t, baseParams, rebID)
         }
     }()
 
@@ -2319,12 +2319,12 @@ func ecAndRegularRebalance(t *testing.T, o *ecOptions, proxyURL string, bckReg,
     tlog.Logf("Created %d objects in %s, %d objects in %s. Starting rebalance\n",
         len(resECOld.Entries), bckEC, len(resRegOld.Entries), bckReg)
 
-    tlog.Logf("Take %s out of maintenance\n", tgtLost.StringEx())
+    tlog.Logf("Take %s out of maintenance mode ...\n", tgtLost.StringEx())
     args = &apc.ActValRmNode{DaemonID: tgtLost.ID()}
     rebID, err := api.StopMaintenance(baseParams, args)
     tassert.CheckFatal(t, err)
     registered = true
-    tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID, rebalanceTimeout)
+    tools.WaitForRebalanceByID(t, baseParams, rebID)
 
     tlog.Logln("list objects after rebalance")
     resECNew, err := api.ListObjects(baseParams, bckEC, msg, api.ListArgs{})
@@ -2522,7 +2522,7 @@ func ecAndRegularUnregisterWhileRebalancing(t *testing.T, o *ecOptions, bckEC cm
             args := &apc.ActValRmNode{DaemonID: tgtLost.ID()}
             rebID, err := api.StopMaintenance(baseParams, args)
             tassert.CheckError(t, err)
-            tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID, rebalanceTimeout)
+            tools.WaitForRebalanceByID(t, baseParams, rebID)
         }
     }()
 
@@ -2547,7 +2547,7 @@ func ecAndRegularUnregisterWhileRebalancing(t *testing.T, o *ecOptions, bckEC cm
     tassert.CheckFatal(t, err)
     tlog.Logf("Created %d objects in %s - starting global rebalance...\n", len(resECOld.Entries), bckEC)
 
-    tlog.Logf("Take %s out of maintenance\n", tgtLost.StringEx())
+    tlog.Logf("Take %s out of maintenance mode ...\n", tgtLost.StringEx())
     args = &apc.ActValRmNode{DaemonID: tgtLost.ID()}
     _, err = api.StopMaintenance(baseParams, args)
     tassert.CheckFatal(t, err)
@@ -2580,7 +2580,7 @@ func ecAndRegularUnregisterWhileRebalancing(t *testing.T, o *ecOptions, bckEC cm
 
     err = api.AbortXaction(baseParams, xargs)
     tassert.CheckError(t, err)
-    tools.WaitForRebalAndResil(t, baseParams, rebalanceTimeout)
+    tools.WaitForRebalAndResil(t, baseParams)
     tassert.CheckError(t, err)
 
     tlog.Logf("Put %s in maintenance\n", tgtGone.StringEx())
@@ -2590,13 +2590,13 @@ func ecAndRegularUnregisterWhileRebalancing(t *testing.T, o *ecOptions, bckEC cm
     defer func() {
         args = &apc.ActValRmNode{DaemonID: tgtGone.ID()}
         rebID, _ := api.StopMaintenance(baseParams, args)
-        tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID)
+        tools.WaitForRebalanceByID(t, baseParams, rebID)
     }()
 
     stopCh.Close()
 
     tassert.CheckFatal(t, err)
-    tools.WaitForRebalanceByID(t, -1 /*orig target cnt*/, baseParams, rebID, rebalanceTimeout)
+    tools.WaitForRebalanceByID(t, baseParams, rebID)
     tlog.Logln("Reading objects")
     for _, obj := range resECOld.Entries {
         _, err := api.GetObject(baseParams, bckEC, obj.Name, nil)
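
Two tools helpers change signature in this file: WaitForRebalanceByID and WaitForRebalAndResil, which likewise loses its explicit timeout argument, presumably falling back to the consolidated tools.RebalanceTimeout. The tools-side diff is not loaded here; the following is only a sketch of the assumed new form:

// sketch (assumed — actual tools implementation not shown in this commit view):
// wait for both rebalance and resilver with the consolidated default timeout
func WaitForRebalAndResil(t *testing.T, bp api.BaseParams) {
    xargs := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: RebalanceTimeout}
    _, _ = api.WaitForXactionIC(bp, xargs)
    xargs = xact.ArgsMsg{Kind: apc.ActResilver, Timeout: RebalanceTimeout}
    _, _ = api.WaitForXactionIC(bp, xargs)
}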
2 changes: 1 addition & 1 deletion ais/test/etl_stress_test.go
@@ -133,7 +133,7 @@ func TestETLTargetDown(t *testing.T) {
     tools.RestoreNode(tcmd, false, "target")
     m.waitAndCheckCluState()
 
-    args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout}
+    args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout}
     _, _ = api.WaitForXactionIC(baseParams, args)
 
     tetl.CheckNoRunningETLContainers(t, baseParams)
8 changes: 4 additions & 4 deletions ais/test/fshc_test.go
@@ -520,7 +520,7 @@ func TestFSCheckerTargetDisableAllMountpaths(t *testing.T) {
     tassert.CheckFatal(t, err)
     tlog.Logf("Wait for rebalance (triggered by %s leaving the cluster after having lost all mountpaths)\n",
         target.StringEx())
-    args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout}
+    args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout}
     _, _ = api.WaitForXactionIC(baseParams, args)
 
     tlog.Logf("Restoring target %s mountpaths\n", target.ID())
@@ -533,7 +533,7 @@ func TestFSCheckerTargetDisableAllMountpaths(t *testing.T) {
     tassert.CheckFatal(t, err)
 
     tlog.Logf("Wait for rebalance (when target %s that has previously lost all mountpaths joins back)\n", target.StringEx())
-    args = xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout}
+    args = xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout}
     _, _ = api.WaitForXactionIC(baseParams, args)
 
     tools.WaitForResilvering(t, baseParams, nil)
@@ -600,7 +600,7 @@ func TestFSAddMountpathRestartNode(t *testing.T) {
         t.Fatalf("Removed target didn't rejoin")
     }
     tlog.Logf("Wait for rebalance\n")
-    args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout}
+    args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout}
     _, _ = api.WaitForXactionIC(baseParams, args)
 
     // Check if the node has newly added mountpath
@@ -677,7 +677,7 @@ func TestFSDisableAllExceptOneMountpathRestartNode(t *testing.T) {
     tassert.CheckFatal(t, err)
     tassert.Fatalf(t, smap.GetTarget(target.ID()) != nil, "removed target didn't rejoin")
 
-    args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: rebalanceTimeout}
+    args := xact.ArgsMsg{Kind: apc.ActRebalance, Timeout: tools.RebalanceTimeout}
     _, _ = api.WaitForXactionIC(baseParams, args)
 
     // Check if the the mountpaths are disabled after restart.
5 changes: 1 addition & 4 deletions ais/test/health_test.go
@@ -25,7 +25,6 @@ func unregisteredNodeHealth(t *testing.T, proxyURL string, si *meta.Snode) {
     _, err = api.StartMaintenance(baseParams, args)
     tassert.CheckFatal(t, err)
     targetCount := smapOrig.CountActiveTs()
-    origTargetCnt := targetCount
     proxyCount := smapOrig.CountActivePs()
     if si.IsProxy() {
         proxyCount--
@@ -41,9 +40,7 @@ func unregisteredNodeHealth(t *testing.T, proxyURL string, si *meta.Snode) {
         _, err = tools.WaitForClusterState(proxyURL, "join node", smapOrig.Version, smapOrig.CountActivePs(),
             smapOrig.CountActiveTs())
         tassert.CheckFatal(t, err)
-        if rebID != "" {
-            tools.WaitForRebalanceByID(t, origTargetCnt, baseParams, rebID)
-        }
+        tools.WaitForRebalanceByID(t, baseParams, rebID)
     }()
 
     err = api.Health(tools.BaseAPIParams(si.PubNet.URL))
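
The dropped `if rebID != ""` guard implies that the updated tools.WaitForRebalanceByID tolerates an empty rebalance ID, in addition to losing the original-target-count parameter. The tools-side change is not among the loaded diffs; a hedged sketch of what the helper plausibly does now:

// sketch only (assumed implementation — the actual tools diff is not shown above)
func WaitForRebalanceByID(t *testing.T, bp api.BaseParams, rebID string) {
    if rebID == "" {
        return // stop-maintenance triggered no rebalance; nothing to wait for
    }
    tassert.Fatalf(t, xact.IsValidRebID(rebID), "invalid reb ID %q", rebID)
    xargs := xact.ArgsMsg{ID: rebID, Kind: apc.ActRebalance, Timeout: RebalanceTimeout}
    _, err := api.WaitForXactionIC(bp, xargs)
    tassert.CheckFatal(t, err)
}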
(diffs for the remaining 8 changed files were not loaded)
