Skip to content

Commit

Permalink
aisloader: add an option to randomize gateways
Browse files Browse the repository at this point in the history
* `--randomproxy` - select a random ais proxy to execute I/O request
* separately, tools: clean up the random file reader's temp-file removal logic (currently unused, fixed nonetheless)

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Aug 28, 2023
1 parent 301e4ef commit a2902ad
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 20 deletions.
16 changes: 15 additions & 1 deletion bench/tools/aisloader/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/NVIDIA/aistore/api/env"
"github.com/NVIDIA/aistore/bench/tools/aisloader/namegetter"
"github.com/NVIDIA/aistore/bench/tools/aisloader/stats"
"github.com/NVIDIA/aistore/cluster/meta"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/atomic"
"github.com/NVIDIA/aistore/cmn/cos"
Expand Down Expand Up @@ -124,7 +125,8 @@ type (

duration DurationExt // stop after the run for at least that much

bp api.BaseParams
bp api.BaseParams
smap *meta.Smap

bck cmn.Bck
bProps cmn.BucketProps
Expand Down Expand Up @@ -153,6 +155,7 @@ type (
statsdProbe bool
getLoaderID bool
randomObjName bool
randomProxy bool
uniqueGETs bool
verifyHash bool // verify xxhash during get
getConfig bool // true: load control plane (read proxy config)
Expand Down Expand Up @@ -281,6 +284,7 @@ func parseCmdLine() (params, error) {
BoolExtVar(f, &p.cleanUp, "cleanup", "true: remove bucket upon benchmark termination (mandatory: must be specified)")
f.BoolVar(&p.verifyHash, "verifyhash", false,
"true: checksum-validate GET: recompute object checksums and validate it against the one received with the GET metadata")

f.StringVar(&p.minSizeStr, "minsize", "", "Minimum object size (with or without multiplicative suffix K, MB, GiB, etc.)")
f.StringVar(&p.maxSizeStr, "maxsize", "", "Maximum object size (with or without multiplicative suffix K, MB, GiB, etc.)")
f.StringVar(&p.readerType, "readertype", readers.TypeSG,
Expand Down Expand Up @@ -310,6 +314,8 @@ func parseCmdLine() (params, error) {
"Size (in bits) of the generated aisloader identifier. Cannot be used together with loadernum")
f.BoolVar(&p.randomObjName, "randomname", true,
"true: generate object names of 32 random characters. This option is ignored when loadernum is defined")
f.BoolVar(&p.randomProxy, "randomproxy", false,
"true: select random gateway (\"proxy\") to execute I/O request")
f.StringVar(&p.subDir, "subdir", "", "Virtual destination directory for all aisloader-generated objects")
f.Uint64Var(&p.putShards, "putshards", 0, "Spread generated objects over this many subdirectories (max 100k)")
f.BoolVar(&p.uniqueGETs, "uniquegets", true,
Expand Down Expand Up @@ -571,6 +577,7 @@ func parseCmdLine() (params, error) {
return params{}, fmt.Errorf("unknown scheme %q", scheme)
}

// TODO: validate against cluster map (see api.GetClusterMap below)
p.proxyURL = scheme + "://" + address

transportArgs.UseHTTPS = scheme == "https"
Expand Down Expand Up @@ -726,6 +733,13 @@ func Start(version, buildtime string) (err error) {
}
}

// usage is currently limited to randomizing proxies (to access cluster)
if runParams.randomProxy {
runParams.smap, err = api.GetClusterMap(runParams.bp)
if err != nil {
return fmt.Errorf("failed to get cluster map: %v", err)
}
}
loggedUserToken = authn.LoadToken(runParams.tokenFile)
runParams.bp.Token = loggedUserToken
runParams.bp.UA = ua
Expand Down
43 changes: 33 additions & 10 deletions bench/tools/aisloader/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,27 @@
package aisloader

import (
"fmt"
"os"
"path"
"sync"
"time"

"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/atomic"
"github.com/NVIDIA/aistore/memsys"
"github.com/NVIDIA/aistore/tools/readers"
)

func doPut(wo *workOrder) {
var sgl *memsys.SGL
var (
sgl *memsys.SGL
url = wo.proxyURL
)
if runParams.readerType == readers.TypeSG {
sgl = gmm.NewSGL(wo.size)
wo.sgl = sgl
}

r, err := readers.New(readers.Params{
Type: runParams.readerType,
SGL: sgl,
Expand All @@ -33,27 +37,46 @@ func doPut(wo *workOrder) {
Size: wo.size,
}, wo.cksumType)

if runParams.readerType == readers.TypeFile {
defer os.Remove(path.Join(runParams.tmpDir, wo.objName))
}

if err != nil {
wo.err = err
return
}
if runParams.randomProxy {
psi, err := runParams.smap.GetRandProxy(false /*excl. primary*/)
if err != nil {
fmt.Printf("PUT(wo): %v\n", err)
os.Exit(1)
}
url = psi.URL(cmn.NetPublic)
}
if !traceHTTPSig.Load() {
wo.err = put(wo.proxyURL, wo.bck, wo.objName, r.Cksum(), r)
wo.err = put(url, wo.bck, wo.objName, r.Cksum(), r)
} else {
wo.latencies, wo.err = putWithTrace(wo.proxyURL, wo.bck, wo.objName, r.Cksum(), r)
wo.latencies, wo.err = putWithTrace(url, wo.bck, wo.objName, r.Cksum(), r)
}
if runParams.readerType == readers.TypeFile {
r.Close()
os.Remove(path.Join(runParams.tmpDir, wo.objName))
}
}

func doGet(wo *workOrder) {
var (
url = wo.proxyURL
)
if runParams.randomProxy {
psi, err := runParams.smap.GetRandProxy(false /*excl. primary*/)
if err != nil {
fmt.Printf("GET(wo): %v\n", err)
os.Exit(1)
}
url = psi.URL(cmn.NetPublic)
}
if !traceHTTPSig.Load() {
wo.size, wo.err = getDiscard(wo.proxyURL, wo.bck,
wo.size, wo.err = getDiscard(url, wo.bck,
wo.objName, runParams.verifyHash, runParams.readOff, runParams.readLen)
} else {
wo.size, wo.latencies, wo.err = getTraceDiscard(wo.proxyURL, wo.bck,
wo.size, wo.latencies, wo.err = getTraceDiscard(url, wo.bck,
wo.objName, runParams.verifyHash, runParams.readOff, runParams.readLen)
}
}
Expand Down
9 changes: 6 additions & 3 deletions cluster/meta/smap.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,15 @@ func (m *Smap) GetActiveNode(sid string) (si *Snode) {

// (random active)
func (m *Smap) GetRandTarget() (tsi *Snode, err error) {
var cnt int
for _, tsi = range m.Tmap {
if !tsi.InMaintOrDecomm() {
return
}
cnt++
}
return nil, cmn.NewErrNoNodes(apc.Target, len(m.Tmap))
err = fmt.Errorf("GetRandTarget failure: %s, in maintenance >= %d", m.StringEx(), cnt)
return
}

func (m *Smap) GetRandProxy(excludePrimary bool) (si *Snode, err error) {
Expand All @@ -446,8 +449,8 @@ func (m *Smap) GetRandProxy(excludePrimary bool) (si *Snode, err error) {
return psi, nil
}
}
return nil, fmt.Errorf("failed to find a random proxy (num=%d, in-maintenance=%d, exclude-primary=%t)",
len(m.Pmap), cnt, excludePrimary)
err = fmt.Errorf("GetRandProxy failure: %s, in maintenance >= %d, excl-primary %t", m.StringEx(), cnt, excludePrimary)
return
}

// whether IP is in use by a different node
Expand Down
19 changes: 13 additions & 6 deletions tools/readers/readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,28 +213,35 @@ func NewRandFile(filepath, name string, size int64, cksumType string) (Reader, e
cksumHash *cos.CksumHash
fn = path.Join(filepath, name)
f, err = os.OpenFile(fn, os.O_RDWR|os.O_CREATE, cos.PermRWR)
exists bool
)
if err != nil {
return nil, err
}
if size == -1 {
// Assuming that the file already exists and contains data.
// checksum existing file
exists = true
if cksumType != cos.ChecksumNone {
debug.Assert(cksumType != "")
_, cksumHash, err = cos.CopyAndChecksum(io.Discard, f, nil, cksumType)
}
} else {
// Populate the file with random data.
// Write random file
cksumHash, err = copyRandWithHash(f, size, cksumType, cos.NowRand())
}
if err != nil {
f.Close()
return nil, err
if err == nil {
_, err = f.Seek(0, io.SeekStart)
}
if _, err := f.Seek(0, io.SeekStart); err != nil {

if err != nil {
// cleanup and ret
f.Close()
if !exists {
os.Remove(fn)
}
return nil, err
}

if cksumType != cos.ChecksumNone {
cksum = cksumHash.Clone()
}
Expand Down

0 comments on commit a2902ad

Please sign in to comment.