From a2902adbb7503ca756b1d590928626697f39e02d Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Mon, 28 Aug 2023 10:24:27 -0400 Subject: [PATCH] aisloader: add an option to randomize gateways * `--randomproxy` - select a random ais proxy to execute I/O request * separately, tools: rand file reader cleanup logic (not used but still) Signed-off-by: Alex Aizman --- bench/tools/aisloader/bootstrap.go | 16 ++++++++++- bench/tools/aisloader/worker.go | 43 +++++++++++++++++++++++------- cluster/meta/smap.go | 9 ++++--- tools/readers/readers.go | 19 ++++++++----- 4 files changed, 67 insertions(+), 20 deletions(-) diff --git a/bench/tools/aisloader/bootstrap.go b/bench/tools/aisloader/bootstrap.go index d7d89bea11..b8f2e91dec 100644 --- a/bench/tools/aisloader/bootstrap.go +++ b/bench/tools/aisloader/bootstrap.go @@ -59,6 +59,7 @@ import ( "github.com/NVIDIA/aistore/api/env" "github.com/NVIDIA/aistore/bench/tools/aisloader/namegetter" "github.com/NVIDIA/aistore/bench/tools/aisloader/stats" + "github.com/NVIDIA/aistore/cluster/meta" "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/atomic" "github.com/NVIDIA/aistore/cmn/cos" @@ -124,7 +125,8 @@ type ( duration DurationExt // stop after the run for at least that much - bp api.BaseParams + bp api.BaseParams + smap *meta.Smap bck cmn.Bck bProps cmn.BucketProps @@ -153,6 +155,7 @@ type ( statsdProbe bool getLoaderID bool randomObjName bool + randomProxy bool uniqueGETs bool verifyHash bool // verify xxhash during get getConfig bool // true: load control plane (read proxy config) @@ -281,6 +284,7 @@ func parseCmdLine() (params, error) { BoolExtVar(f, &p.cleanUp, "cleanup", "true: remove bucket upon benchmark termination (mandatory: must be specified)") f.BoolVar(&p.verifyHash, "verifyhash", false, "true: checksum-validate GET: recompute object checksums and validate it against the one received with the GET metadata") + f.StringVar(&p.minSizeStr, "minsize", "", "Minimum object size (with or without multiplicative suffix K, MB, GiB, etc.)") f.StringVar(&p.maxSizeStr, "maxsize", "", "Maximum object size (with or without multiplicative suffix K, MB, GiB, etc.)") f.StringVar(&p.readerType, "readertype", readers.TypeSG, @@ -310,6 +314,8 @@ func parseCmdLine() (params, error) { "Size (in bits) of the generated aisloader identifier. Cannot be used together with loadernum") f.BoolVar(&p.randomObjName, "randomname", true, "true: generate object names of 32 random characters. This option is ignored when loadernum is defined") + f.BoolVar(&p.randomProxy, "randomproxy", false, + "true: select random gateway (\"proxy\") to execute I/O request") f.StringVar(&p.subDir, "subdir", "", "Virtual destination directory for all aisloader-generated objects") f.Uint64Var(&p.putShards, "putshards", 0, "Spread generated objects over this many subdirectories (max 100k)") f.BoolVar(&p.uniqueGETs, "uniquegets", true, @@ -571,6 +577,7 @@ func parseCmdLine() (params, error) { return params{}, fmt.Errorf("unknown scheme %q", scheme) } + // TODO: validate against cluster map (see api.GetClusterMap below) p.proxyURL = scheme + "://" + address transportArgs.UseHTTPS = scheme == "https" @@ -726,6 +733,13 @@ func Start(version, buildtime string) (err error) { } } + // usage is currently limited to randomizing proxies (to access cluster) + if runParams.randomProxy { + runParams.smap, err = api.GetClusterMap(runParams.bp) + if err != nil { + return fmt.Errorf("failed to get cluster map: %v", err) + } + } loggedUserToken = authn.LoadToken(runParams.tokenFile) runParams.bp.Token = loggedUserToken runParams.bp.UA = ua diff --git a/bench/tools/aisloader/worker.go b/bench/tools/aisloader/worker.go index aec398876d..21a25372e3 100644 --- a/bench/tools/aisloader/worker.go +++ b/bench/tools/aisloader/worker.go @@ -8,23 +8,27 @@ package aisloader import ( + "fmt" "os" "path" "sync" "time" + "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/atomic" "github.com/NVIDIA/aistore/memsys" "github.com/NVIDIA/aistore/tools/readers" ) func doPut(wo *workOrder) { - var sgl *memsys.SGL + var ( + sgl *memsys.SGL + url = wo.proxyURL + ) if runParams.readerType == readers.TypeSG { sgl = gmm.NewSGL(wo.size) wo.sgl = sgl } - r, err := readers.New(readers.Params{ Type: runParams.readerType, SGL: sgl, @@ -33,27 +37,46 @@ func doPut(wo *workOrder) { Size: wo.size, }, wo.cksumType) - if runParams.readerType == readers.TypeFile { - defer os.Remove(path.Join(runParams.tmpDir, wo.objName)) - } - if err != nil { wo.err = err return } + if runParams.randomProxy { + psi, err := runParams.smap.GetRandProxy(false /*excl. primary*/) + if err != nil { + fmt.Printf("PUT(wo): %v\n", err) + os.Exit(1) + } + url = psi.URL(cmn.NetPublic) + } if !traceHTTPSig.Load() { - wo.err = put(wo.proxyURL, wo.bck, wo.objName, r.Cksum(), r) + wo.err = put(url, wo.bck, wo.objName, r.Cksum(), r) } else { - wo.latencies, wo.err = putWithTrace(wo.proxyURL, wo.bck, wo.objName, r.Cksum(), r) + wo.latencies, wo.err = putWithTrace(url, wo.bck, wo.objName, r.Cksum(), r) + } + if runParams.readerType == readers.TypeFile { + r.Close() + os.Remove(path.Join(runParams.tmpDir, wo.objName)) } } func doGet(wo *workOrder) { + var ( + url = wo.proxyURL + ) + if runParams.randomProxy { + psi, err := runParams.smap.GetRandProxy(false /*excl. primary*/) + if err != nil { + fmt.Printf("GET(wo): %v\n", err) + os.Exit(1) + } + url = psi.URL(cmn.NetPublic) + } if !traceHTTPSig.Load() { - wo.size, wo.err = getDiscard(wo.proxyURL, wo.bck, + wo.size, wo.err = getDiscard(url, wo.bck, wo.objName, runParams.verifyHash, runParams.readOff, runParams.readLen) } else { - wo.size, wo.latencies, wo.err = getTraceDiscard(wo.proxyURL, wo.bck, + wo.size, wo.latencies, wo.err = getTraceDiscard(url, wo.bck, wo.objName, runParams.verifyHash, runParams.readOff, runParams.readLen) } } diff --git a/cluster/meta/smap.go b/cluster/meta/smap.go index 7ad9e4dd0d..16edd2822e 100644 --- a/cluster/meta/smap.go +++ b/cluster/meta/smap.go @@ -427,12 +427,15 @@ func (m *Smap) GetActiveNode(sid string) (si *Snode) { // (random active) func (m *Smap) GetRandTarget() (tsi *Snode, err error) { + var cnt int for _, tsi = range m.Tmap { if !tsi.InMaintOrDecomm() { return } + cnt++ } - return nil, cmn.NewErrNoNodes(apc.Target, len(m.Tmap)) + err = fmt.Errorf("GetRandTarget failure: %s, in maintenance >= %d", m.StringEx(), cnt) + return } func (m *Smap) GetRandProxy(excludePrimary bool) (si *Snode, err error) { @@ -446,8 +449,8 @@ func (m *Smap) GetRandProxy(excludePrimary bool) (si *Snode, err error) { return psi, nil } } - return nil, fmt.Errorf("failed to find a random proxy (num=%d, in-maintenance=%d, exclude-primary=%t)", - len(m.Pmap), cnt, excludePrimary) + err = fmt.Errorf("GetRandProxy failure: %s, in maintenance >= %d, excl-primary %t", m.StringEx(), cnt, excludePrimary) + return } // whether IP is in use by a different node diff --git a/tools/readers/readers.go b/tools/readers/readers.go index 2d38b88957..23870262d2 100644 --- a/tools/readers/readers.go +++ b/tools/readers/readers.go @@ -213,28 +213,35 @@ func NewRandFile(filepath, name string, size int64, cksumType string) (Reader, e cksumHash *cos.CksumHash fn = path.Join(filepath, name) f, err = os.OpenFile(fn, os.O_RDWR|os.O_CREATE, cos.PermRWR) + exists bool ) if err != nil { return nil, err } if size == -1 { - // Assuming that the file already exists and contains data. + // checksum existing file + exists = true if cksumType != cos.ChecksumNone { debug.Assert(cksumType != "") _, cksumHash, err = cos.CopyAndChecksum(io.Discard, f, nil, cksumType) } } else { - // Populate the file with random data. + // Write random file cksumHash, err = copyRandWithHash(f, size, cksumType, cos.NowRand()) } - if err != nil { - f.Close() - return nil, err + if err == nil { + _, err = f.Seek(0, io.SeekStart) } - if _, err := f.Seek(0, io.SeekStart); err != nil { + + if err != nil { + // cleanup and ret f.Close() + if !exists { + os.Remove(fn) + } return nil, err } + if cksumType != cos.ChecksumNone { cksum = cksumHash.Clone() }