diff --git a/ais/prxdl.go b/ais/prxdl.go
index 5d121d7d26..b753bb5dba 100644
--- a/ais/prxdl.go
+++ b/ais/prxdl.go
@@ -124,15 +124,14 @@ func (p *proxy) httpdlpost(w http.ResponseWriter, r *http.Request) {
 }
 
 func (p *proxy) dladm(method, path string, msg *dload.AdminBody) ([]byte, int, error) {
+	config := cmn.GCO.Get()
 	if msg.ID != "" && method == http.MethodGet && msg.OnlyActive {
 		nl := p.notifs.entry(msg.ID)
 		if nl != nil {
-			return p.dlstatus(nl)
+			return p.dlstatus(nl, config)
 		}
 	}
-
 	var (
-		config = cmn.GCO.Get()
 		body   = cos.MustMarshal(msg)
 		args   = allocBcArgs()
 		xid    = cos.GenUUID()
@@ -221,9 +220,9 @@ func (p *proxy) dladm(method, path string, msg *dload.AdminBody) ([]byte, int, e
 	}
 }
 
-func (p *proxy) dlstatus(nl nl.Listener) ([]byte, int, error) {
+func (p *proxy) dlstatus(nl nl.Listener, config *cmn.Config) ([]byte, int, error) {
 	// bcast
-	p.notifs.bcastGetStats(nl, cmn.GCO.Get().Periodic.NotifTime.D())
+	p.notifs.bcastGetStats(nl, config.Periodic.NotifTime.D())
 	stats := nl.NodeStats()
 
 	var resp *dload.StatusResp
@@ -248,24 +247,28 @@
 }
 
 func (p *proxy) dlstart(r *http.Request, xid, jobID string, body []byte) (errCode int, err error) {
-	query := make(url.Values, 2)
+	var (
+		config = cmn.GCO.Get()
+		query  = make(url.Values, 2)
+		args   = allocBcArgs()
+	)
 	query.Set(apc.QparamUUID, xid)
 	query.Set(apc.QparamJobID, jobID)
-	args := allocBcArgs()
 	args.req = cmn.HreqArgs{Method: http.MethodPost, Path: r.URL.Path, Body: body, Query: query}
-	config := cmn.GCO.Get()
 	args.timeout = config.Timeout.MaxHostBusy.D()
+
 	results := p.bcastGroup(args)
-	defer freeBcastRes(results)
 	freeBcArgs(args)
+
+	errCode = http.StatusOK
 	for _, res := range results {
-		if res.err == nil {
-			continue
+		if res.err != nil {
+			errCode, err = res.status, res.err
+			break
 		}
-		errCode, err = res.status, res.err
-		return
 	}
-	return http.StatusOK, nil
+	freeBcastRes(results)
+	return
 }
 
 func (p *proxy) validateDownload(w http.ResponseWriter, r *http.Request, body []byte) (dlb dload.Body, dlBase dload.Base, ok bool) {
diff --git a/ais/test/dsort_test.go b/ais/test/dsort_test.go
index 695a668729..cdf6fd178f 100644
--- a/ais/test/dsort_test.go
+++ b/ais/test/dsort_test.go
@@ -38,7 +38,7 @@ import (
 )
 
 const (
-	dsortDescAllPrefix = dsort.DSortName + "-test-integration"
+	dsortDescAllPrefix = apc.ActDsort + "-test-integration"
 
 	scopeConfig = "config"
 	scopeSpec   = "spec"
@@ -503,10 +503,10 @@ func (df *dsortFramework) checkReactionResult(reaction string, expectedProblemsC
 	case cmn.IgnoreReaction:
 		for target, metrics := range allMetrics {
 			if len(metrics.Warnings) != 0 {
-				df.m.t.Errorf("%s: target %q has %s warnings: %s", df.job(), target, dsort.DSortName, metrics.Warnings)
+				df.m.t.Errorf("%s: target %q has %s warnings: %s", df.job(), target, apc.ActDsort, metrics.Warnings)
 			}
 			if len(metrics.Errors) != 0 {
-				df.m.t.Errorf("%s: target %q has %s errors: %s", df.job(), target, dsort.DSortName, metrics.Errors)
+				df.m.t.Errorf("%s: target %q has %s errors: %s", df.job(), target, apc.ActDsort, metrics.Errors)
 			}
 		}
 	case cmn.WarnReaction:
@@ -515,7 +515,7 @@ func (df *dsortFramework) checkReactionResult(reaction string, expectedProblemsC
 			totalWarnings += len(metrics.Warnings)
 
 			if len(metrics.Errors) != 0 {
-				df.m.t.Errorf("%s: target %q has %s errors: %s", df.job(), target, dsort.DSortName, metrics.Errors)
+				df.m.t.Errorf("%s: target %q has %s errors: %s", df.job(), target, apc.ActDsort, metrics.Errors)
 			}
 		}
 
@@ -526,7 +526,7 @@ func (df *dsortFramework) checkReactionResult(reaction string, expectedProblemsC
 		totalErrors := 0
 		for target, metrics := range allMetrics {
 			if !metrics.Aborted.Load() {
-				df.m.t.Errorf("%s: %s was not aborted by target: %s", df.job(), dsort.DSortName, target)
+				df.m.t.Errorf("%s: %s was not aborted by target: %s", df.job(), apc.ActDsort, target)
 			}
 			totalErrors += len(metrics.Errors)
 		}
@@ -580,9 +580,9 @@ func (df *dsortFramework) checkMetrics(expectAbort bool) map[string]*dsort.Metri
 	}
 	for target, metrics := range allMetrics {
 		if expectAbort && !metrics.Aborted.Load() {
-			df.m.t.Errorf("%s: %s was not aborted by target: %s", df.job(), dsort.DSortName, target)
+			df.m.t.Errorf("%s: %s was not aborted by target: %s", df.job(), apc.ActDsort, target)
 		} else if !expectAbort && metrics.Aborted.Load() {
-			df.m.t.Errorf("%s: %s was aborted by target: %s", df.job(), dsort.DSortName, target)
+			df.m.t.Errorf("%s: %s was aborted by target: %s", df.job(), apc.ActDsort, target)
 		}
 	}
 	return allMetrics
@@ -1307,7 +1307,7 @@ func TestDsortContent(t *testing.T) {
 			aborted, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
 			tassert.CheckFatal(t, err)
 			if entry.missingKeys && !aborted {
-				t.Errorf("%s was not aborted", dsort.DSortName)
+				t.Errorf("%s was not aborted", apc.ActDsort)
 			}
 
 			tlog.Logf("%s: checking metrics\n", df.job())
@@ -1320,7 +1320,7 @@ func TestDsortContent(t *testing.T) {
 
 			for target, metrics := range allMetrics {
 				if entry.missingKeys && !metrics.Aborted.Load() {
-					t.Errorf("%s was not aborted by target: %s", target, dsort.DSortName)
+					t.Errorf("%s was not aborted by target: %s", target, apc.ActDsort)
 				}
 			}
 
@@ -1456,7 +1456,7 @@ func TestDsortKillTargetDuringPhases(t *testing.T) {
 			aborted, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
 			tassert.CheckError(t, err)
 			if !aborted {
-				t.Errorf("%s was not aborted", dsort.DSortName)
+				t.Errorf("%s was not aborted", apc.ActDsort)
 			}
 
 			tlog.Logf("%s: checking metrics\n", df.job())
@@ -1469,7 +1469,7 @@ func TestDsortKillTargetDuringPhases(t *testing.T) {
 
 			for target, metrics := range allMetrics {
 				if !metrics.Aborted.Load() {
-					t.Errorf("%s was not aborted by target: %s", dsort.DSortName, target)
+					t.Errorf("%s was not aborted by target: %s", apc.ActDsort, target)
 				}
 			}
 
@@ -1623,7 +1623,7 @@ func TestDsortAddTarget(t *testing.T) {
 			aborted, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
 			tassert.CheckFatal(t, err)
 			if !aborted {
-				t.Errorf("%s was not aborted", dsort.DSortName)
+				t.Errorf("%s was not aborted", apc.ActDsort)
 			}
 
 			tlog.Logf("%s: checking metrics\n", df.job())
@@ -1631,7 +1631,7 @@ func TestDsortAddTarget(t *testing.T) {
 			tassert.CheckFatal(t, err)
 			if len(allMetrics) != m.originalTargetCount-1 {
 				t.Errorf("number of metrics %d is different than number of targets when %s started %d",
-					len(allMetrics), dsort.DSortName, m.originalTargetCount-1)
+					len(allMetrics), apc.ActDsort, m.originalTargetCount-1)
 			}
 		},
 	)
diff --git a/api/apc/actmsg.go b/api/apc/actmsg.go
index 52d51c4e93..e4f24d809a 100644
--- a/api/apc/actmsg.go
+++ b/api/apc/actmsg.go
@@ -14,38 +14,50 @@ import (
 // ActMsg.Action
 // includes Xaction.Kind == ActMsg.Action (when the action is asynchronous)
 const (
-	ActCreateBck      = "create-bck"  // NOTE: compare w/ ActAddRemoteBck below
-	ActDestroyBck     = "destroy-bck" // destroy bucket data and metadata
-	ActSummaryBck     = "summary-bck"
-	ActCopyBck        = "copy-bck"
-	ActDownload       = "download"
-	ActECEncode       = "ec-encode" // erasure code a bucket
-	ActECGet          = "ec-get"    // erasure decode objects
-	ActECPut          = "ec-put"    // erasure encode objects
-	ActECRespond      = "ec-resp"   // respond to other targets' EC requests
-	ActETLInline      = "etl-inline"
-	ActETLBck         = "etl-bck"
-	ActElection       = "election"
+	ActCreateBck   = "create-bck"  // NOTE: compare w/ ActAddRemoteBck below
+	ActDestroyBck  = "destroy-bck" // destroy bucket data and metadata
+	ActSetBprops   = "set-bprops"
+	ActResetBprops = "reset-bprops"
+
+	ActSummaryBck = "summary-bck"
+
+	ActECEncode  = "ec-encode" // erasure code a bucket
+	ActECGet     = "ec-get"    // erasure decode objects
+	ActECPut     = "ec-put"    // erasure encode objects
+	ActECRespond = "ec-resp"   // respond to other targets' EC requests
+
+	ActCopyBck = "copy-bck"
+	ActETLBck  = "etl-bck"
+
+	ActETLInline = "etl-inline"
+
+	ActDsort    = "dsort"
+	ActDownload = "download"
+
+	ActMakeNCopies = "make-n-copies"
+	ActPutCopies   = "put-copies"
+
+	ActRebalance = "rebalance"
+	ActMoveBck   = "move-bck"
+
+	ActResilver = "resilver"
+
+	ActElection = "election"
+
+	ActLRU          = "lru"
+	ActStoreCleanup = "cleanup-store"
+
 	ActEvictRemoteBck  = "evict-remote-bck" // evict remote bucket's data
 	ActInvalListCache  = "inval-listobj-cache"
-	ActLRU             = "lru"
 	ActList            = "list"
 	ActLoadLomCache    = "load-lom-cache"
-	ActMakeNCopies     = "make-n-copies"
-	ActMoveBck         = "move-bck"
 	ActNewPrimary      = "new-primary"
 	ActPromote         = "promote"
-	ActPutCopies       = "put-copies"
-	ActRebalance       = "rebalance"
 	ActRenameObject    = "rename-obj"
-	ActResetStats      = "reset-stats"
-	ActResetBprops     = "reset-bprops"
-	ActResetConfig     = "reset-config"
-	ActResilver        = "resilver"
-	ActResyncBprops    = "resync-bprops"
-	ActSetBprops       = "set-bprops"
-	ActSetConfig       = "set-config"
-	ActStoreCleanup    = "cleanup-store"
+
+	ActResetStats  = "reset-stats"
+	ActResetConfig = "reset-config"
+	ActSetConfig   = "set-config"
 	ActShutdownCluster = "shutdown" // see also: ActShutdownNode
diff --git a/cmd/aisfs/go.mod b/cmd/aisfs/go.mod
index e85baa697a..d014711716 100644
--- a/cmd/aisfs/go.mod
+++ b/cmd/aisfs/go.mod
@@ -4,7 +4,7 @@ go 1.20
 
 // direct
 require (
-	github.com/NVIDIA/aistore v1.3.19-0.20230810144327-04e74a9a726c
+	github.com/NVIDIA/aistore v1.3.19-0.20230817173330-7b467e9193da
 	github.com/OneOfOne/xxhash v1.2.8
 	github.com/jacobsa/daemonize v0.0.0-20160101105449-e460293e890f
 	github.com/jacobsa/fuse v0.0.0-20230124164109-5e0f2e6b432b
diff --git a/cmd/aisfs/go.sum b/cmd/aisfs/go.sum
index e41d7a2f61..387a0f8d44 100644
--- a/cmd/aisfs/go.sum
+++ b/cmd/aisfs/go.sum
@@ -3,8 +3,8 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
 code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
-github.com/NVIDIA/aistore v1.3.19-0.20230810144327-04e74a9a726c h1:3BKIXPlLMMqAlW3/xs5CRlJyv/YFslZ+rchC1vEopTQ=
-github.com/NVIDIA/aistore v1.3.19-0.20230810144327-04e74a9a726c/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4=
+github.com/NVIDIA/aistore v1.3.19-0.20230817173330-7b467e9193da h1:e7r6/FN/yV2oU6jXt/cvK2x/yPRrIp/42yf5xwkofV8=
+github.com/NVIDIA/aistore v1.3.19-0.20230817173330-7b467e9193da/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
 github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
diff --git a/cmd/cli/cli/completions.go b/cmd/cli/cli/completions.go
index 73dd0b94ab..1c51548b40 100644
--- a/cmd/cli/cli/completions.go
+++ b/cmd/cli/cli/completions.go
@@ -507,7 +507,7 @@ func runningJobCompletions(c *cli.Context) {
 	case 0: // 1. NAME
 		if flagIsSet(c, allJobsFlag) {
 			names := xact.ListDisplayNames(false /*only-startable*/)
-			names = append(names, dsort.DSortName)
+			names = append(names, apc.ActDsort)
 			sort.Strings(names)
 			fmt.Println(strings.Join(names, " "))
 			return
diff --git a/cmd/cli/cli/const.go b/cmd/cli/cli/const.go
index 5c36a2823e..d70fcb6b10 100644
--- a/cmd/cli/cli/const.go
+++ b/cmd/cli/cli/const.go
@@ -14,7 +14,6 @@ import (
 	"github.com/NVIDIA/aistore/api/apc"
 	"github.com/NVIDIA/aistore/cmn/cos"
 	"github.com/NVIDIA/aistore/ext/dload"
-	"github.com/NVIDIA/aistore/ext/dsort"
 	"github.com/urfave/cli"
 )
 
@@ -90,14 +89,13 @@ const (
 	commandPrefetch = "prefetch" // apc.ActPrefetchObjects
 
 	cmdDownload  = apc.ActDownload
+	cmdDsort     = apc.ActDsort
 	cmdRebalance = apc.ActRebalance
 	cmdLRU       = apc.ActLRU
 
 	cmdStgCleanup  = "cleanup" // display name for apc.ActStoreCleanup
 	cmdStgValidate = "validate"
 
 	cmdSummary = "summary" // ditto apc.ActSummaryBck
 
-	cmdDsort = dsort.DSortName
-
 	cmdCluster = commandCluster
 	cmdNode    = "node"
 	cmdPrimary = "set-primary"
@@ -456,10 +454,15 @@ var (
 	noHeaderFlag = cli.BoolFlag{Name: "no-headers,H", Usage: "display tables without headers"}
 	noFooterFlag = cli.BoolFlag{Name: "no-footers", Usage: "display tables without footers"}
 
-	progressFlag   = cli.BoolFlag{Name: "progress", Usage: "show progress bar(s) and progress of execution in real time"}
-	dryRunFlag     = cli.BoolFlag{Name: "dry-run", Usage: "preview the results without really running the action"}
-	verboseFlag    = cli.BoolFlag{Name: "verbose,v", Usage: "verbose"}
-	nonverboseFlag = cli.BoolFlag{Name: "non-verbose,nv", Usage: "non-verbose"}
+	progressFlag = cli.BoolFlag{Name: "progress", Usage: "show progress bar(s) and progress of execution in real time"}
+	dryRunFlag   = cli.BoolFlag{Name: "dry-run", Usage: "preview the results without really running the action"}
+
+	verboseFlag    = cli.BoolFlag{Name: "verbose,v", Usage: "verbose output"}
+	nonverboseFlag = cli.BoolFlag{Name: "non-verbose,nv", Usage: "non-verbose output"}
+	verboseJobFlag = cli.BoolFlag{
+		Name:  verboseFlag.Name,
+		Usage: "show extended statistics",
+	}
 
 	averageSizeFlag = cli.BoolFlag{Name: "average-size", Usage: "show average GET, PUT, etc. request size"}
diff --git a/cmd/cli/cli/dsort.go b/cmd/cli/cli/dsort.go
index 0eee6bdb65..0d80308524 100644
--- a/cmd/cli/cli/dsort.go
+++ b/cmd/cli/cli/dsort.go
@@ -82,7 +82,7 @@ type (
 )
 
 var dsortStartCmd = cli.Command{
 	Name: cmdDsort,
-	Usage: "start " + dsort.DSortName + " job\n" +
+	Usage: "start " + apc.ActDsort + " job\n" +
 		indent1 + "Required parameters:\n" +
 		indent1 + "\t- input_bck: source bucket (used as both source and destination if the latter is not specified)\n" +
 		indent1 + "\t- input_format: see docs and examples below\n" +
@@ -293,7 +293,7 @@ func setupBucket(c *cli.Context, bck cmn.Bck) error {
 
 func (d dsortResult) String() string {
 	if d.aborted {
-		return dsort.DSortName + " job was aborted"
+		return apc.ActDsort + " job was aborted"
 	}
 
 	var sb strings.Builder
diff --git a/cmd/cli/cli/job_hdlr.go b/cmd/cli/cli/job_hdlr.go
index 2f0a2f8dee..88a321199b 100644
--- a/cmd/cli/cli/job_hdlr.go
+++ b/cmd/cli/cli/job_hdlr.go
@@ -1192,7 +1192,7 @@ func jobArgs(c *cli.Context, shift int, ignoreDaemonID bool) (name, xid, daemonI
 	daemonID = c.Args().Get(shift + 2)
 
 	// validate and reassign
-	if name != "" && name != dsort.DSortName {
+	if name != "" && name != apc.ActDsort {
 		if xactKind, _ := xact.GetKindName(name); xactKind == "" {
 			daemonID = xid
 			xid = name
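Reviewer note on the ais/prxdl.go hunk above: `dlstart` now defaults to `http.StatusOK`, records only the first broadcast failure, and frees the results on the single exit path instead of deferring the free ahead of an early return. Below is a minimal, self-contained sketch of that scan-for-first-error pattern; `callRes` and `firstErr` are simplified stand-ins invented for illustration, not the actual ais types:

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// callRes is a simplified stand-in for the proxy's per-node broadcast result.
type callRes struct {
	status int
	err    error
}

// firstErr mirrors the reworked dlstart loop: default to 200 OK,
// record the first failure, and stop scanning early.
func firstErr(results []callRes) (errCode int, err error) {
	errCode = http.StatusOK
	for _, res := range results {
		if res.err != nil {
			errCode, err = res.status, res.err
			break
		}
	}
	// in the real code, freeBcastRes(results) runs here,
	// once, on the single exit path
	return
}

func main() {
	rs := []callRes{
		{status: http.StatusOK},
		{status: http.StatusServiceUnavailable, err: errors.New("target busy")},
	}
	fmt.Println(firstErr(rs)) // 503 target busy
}
```

Freeing on the one exit path, rather than via a `defer` placed before an early `return`, is what lets the real code recycle the result slice safely.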
diff --git a/cmd/cli/cli/show_hdlr.go b/cmd/cli/cli/show_hdlr.go
index 4d895f34eb..ad613486d0 100644
--- a/cmd/cli/cli/show_hdlr.go
+++ b/cmd/cli/cli/show_hdlr.go
@@ -22,7 +22,6 @@ import (
 	"github.com/NVIDIA/aistore/cmn"
 	"github.com/NVIDIA/aistore/cmn/cos"
 	"github.com/NVIDIA/aistore/cmn/debug"
-	"github.com/NVIDIA/aistore/ext/dsort"
 	"github.com/NVIDIA/aistore/fs"
 	"github.com/NVIDIA/aistore/xact"
 	"github.com/urfave/cli"
@@ -49,7 +48,7 @@ var (
 			allJobsFlag,
 			regexJobsFlag,
 			noHeaderFlag,
-			verboseFlag,
+			verboseJobFlag,
 			unitsFlag,
 			// download and dsort only
 			progressFlag,
@@ -247,7 +246,6 @@ func showJobsDo(c *cli.Context, name, xid, daemonID string, bck cmn.Bck) (int, e
 		err  error
 	)
 	names := xact.ListDisplayNames(false /*only-startable*/)
-	names = append(names, dsort.DSortName) // NOTE: dsort isn't an x (the only exception)
 	sort.Strings(names)
 	for _, name = range names {
 		l, errV := _showJobs(c, name, "" /*xid*/, daemonID, bck, true)
@@ -261,19 +259,30 @@ func showJobsDo(c *cli.Context, name, xid, daemonID string, bck cmn.Bck) (int, e
 }
 
 func jobCptn(c *cli.Context, name string, onlyActive bool, xid string, byTarget bool) {
+	var (
+		s, tip string
+	)
+	if !flagIsSet(c, verboseJobFlag) {
+		// xactions that have extended stats
+		var extended bool
+		if _, dtor, err := xact.GetDescriptor(name); err == nil {
+			extended = dtor.ExtendedStats
+		}
+		if extended {
+			tip = fmt.Sprintf(" (hint: use %s to include extended stats)", qflprn(verboseJobFlag))
+		}
+	}
 	if xid != "" {
-		actionCptn(c, jobName(name, xid), "")
+		actionCptn(c, jobName(name, xid), tip)
 		return
 	}
-
-	var s string
 	if byTarget {
 		s = " by target"
 	}
 	if onlyActive {
-		actionCptn(c, name, " jobs"+s+":")
+		actionCptn(c, name, " jobs"+s+tip)
 	} else {
-		actionCptn(c, name, " jobs"+s+" (including finished):")
+		actionCptn(c, name, " jobs"+s+" including finished"+tip)
 	}
 }
 
@@ -281,10 +290,14 @@ func _showJobs(c *cli.Context, name, xid, daemonID string, bck cmn.Bck, caption
 	switch name {
 	case cmdDownload:
 		return showDownloads(c, xid, caption)
-	case cmdDsort:
-		return showDsorts(c, xid, caption)
 	case commandETL:
 		return showETLs(c, xid, caption)
+	case cmdDsort:
+		l, err := showDsorts(c, xid, caption)
+		if l == 0 || err != nil || !flagIsSet(c, verboseJobFlag) {
+			return l, err
+		}
+		fallthrough
 	default:
 		var (
 			// finished or not, always try to show when xid provided
@@ -481,14 +494,16 @@ func xlistByKindID(c *cli.Context, xargs xact.ArgsMsg, caption bool, xs xact.Mul
 	switch xargs.Kind {
 	case apc.ActECGet:
 		if hideHeader {
-			return l, teb.Print(dts, teb.XactECGetNoHdrTmpl, opts)
+			err = teb.Print(dts, teb.XactECGetNoHdrTmpl, opts)
+		} else {
+			err = teb.Print(dts, teb.XactECGetTmpl, opts)
 		}
-		return l, teb.Print(dts, teb.XactECGetTmpl, opts)
 	case apc.ActECPut:
 		if hideHeader {
-			return l, teb.Print(dts, teb.XactECPutNoHdrTmpl, opts)
+			err = teb.Print(dts, teb.XactECPutNoHdrTmpl, opts)
+		} else {
+			err = teb.Print(dts, teb.XactECPutTmpl, opts)
 		}
-		return l, teb.Print(dts, teb.XactECPutTmpl, opts)
 	default:
 		switch {
 		case fromToBck && hideHeader:
@@ -507,7 +522,7 @@ func xlistByKindID(c *cli.Context, xargs xact.ArgsMsg, caption bool, xs xact.Mul
 			}
 		}
 	}
-	if err != nil || !flagIsSet(c, verboseFlag) {
+	if err != nil || !flagIsSet(c, verboseJobFlag) {
 		return l, err
 	}
diff --git a/cmd/cli/cli/xact.go b/cmd/cli/cli/xact.go
index a6ea851340..9c395115bb 100644
--- a/cmd/cli/cli/xact.go
+++ b/cmd/cli/cli/xact.go
@@ -167,8 +167,14 @@ func flattenXactStats(snap *cluster.Snap, units string) nvpairList {
 				value = cos.ToSizeIEC(i, 2)
 			}
 		}
-		if value == "" {
-			value = fmt.Sprintf("%v", v)
+		if value == "" { // not ".size"
+			if mapVal, ok := v.(map[string]any); ok {
+				vv, err := jsonMarshalIndent(mapVal)
+				debug.AssertNoErr(err)
+				value = string(vv)
+			} else {
+				value = fmt.Sprintf("%v", v)
+			}
 		}
 		props = append(props, nvpair{Name: k, Value: value})
 	}
diff --git a/cmd/cli/go.mod b/cmd/cli/go.mod
index c0b044341f..74777b1e76 100644
--- a/cmd/cli/go.mod
+++ b/cmd/cli/go.mod
@@ -4,7 +4,7 @@ go 1.20
 
 // direct
 require (
-	github.com/NVIDIA/aistore v1.3.19-0.20230810144327-04e74a9a726c
+	github.com/NVIDIA/aistore v1.3.19-0.20230817173330-7b467e9193da
 	github.com/fatih/color v1.14.1
 	github.com/json-iterator/go v1.1.12
 	github.com/onsi/ginkgo v1.16.5
diff --git a/cmd/cli/go.sum b/cmd/cli/go.sum
index 6ee6e410d9..fc2724bfc3 100644
--- a/cmd/cli/go.sum
+++ b/cmd/cli/go.sum
@@ -3,8 +3,8 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
 code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
-github.com/NVIDIA/aistore v1.3.19-0.20230810144327-04e74a9a726c h1:3BKIXPlLMMqAlW3/xs5CRlJyv/YFslZ+rchC1vEopTQ=
-github.com/NVIDIA/aistore v1.3.19-0.20230810144327-04e74a9a726c/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4=
+github.com/NVIDIA/aistore v1.3.19-0.20230817173330-7b467e9193da h1:e7r6/FN/yV2oU6jXt/cvK2x/yPRrIp/42yf5xwkofV8=
+github.com/NVIDIA/aistore v1.3.19-0.20230817173330-7b467e9193da/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
 github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
diff --git a/ext/dsort/dsort.go b/ext/dsort/dsort.go
index 1eae8c9e2f..786fc39df4 100644
--- a/ext/dsort/dsort.go
+++ b/ext/dsort/dsort.go
@@ -40,8 +40,6 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-const DSortName = "dsort"
-
 const PrefixJobID = "srt-"
 
 type (
diff --git a/ext/dsort/dsort_general.go b/ext/dsort/dsort_general.go
index c14b4782a8..6b1051a31b 100644
--- a/ext/dsort/dsort_general.go
+++ b/ext/dsort/dsort_general.go
@@ -11,6 +11,7 @@ import (
 	"os"
 	"sync"
 
+	"github.com/NVIDIA/aistore/api/apc"
 	"github.com/NVIDIA/aistore/cluster"
 	"github.com/NVIDIA/aistore/cluster/meta"
 	"github.com/NVIDIA/aistore/cmn"
@@ -470,7 +471,7 @@ func (ds *dsorterGeneral) recvReq(hdr transport.ObjHdr, objReader io.Reader, err
 	transport.FreeRecv(objReader)
 	req := remoteRequest{}
 	if err := jsoniter.Unmarshal(hdr.Opaque, &req); err != nil {
-		err := fmt.Errorf(cmn.FmtErrUnmarshal, DSortName, "recv request", cos.BHead(hdr.Opaque), err)
+		err := fmt.Errorf(cmn.FmtErrUnmarshal, apc.ActDsort, "recv request", cos.BHead(hdr.Opaque), err)
 		ds.m.abort(err)
 		return err
 	}
diff --git a/ext/dsort/error.go b/ext/dsort/error.go
index 005fe1ee1c..7ca4a14345 100644
--- a/ext/dsort/error.go
+++ b/ext/dsort/error.go
@@ -7,11 +7,12 @@ package dsort
 import (
 	"fmt"
 
+	"github.com/NVIDIA/aistore/api/apc"
 	"github.com/NVIDIA/aistore/cmn"
 )
 
 func newDSortAbortedError(managerUUID string) *cmn.ErrAborted {
-	return cmn.NewErrAborted(fmt.Sprintf("%s[%s]", DSortName, managerUUID), "", nil)
+	return cmn.NewErrAborted(fmt.Sprintf("%s[%s]", apc.ActDsort, managerUUID), "", nil)
 }
 
 // Returns if the error is not abort error - in other cases we need to report
diff --git a/ext/dsort/handler.go b/ext/dsort/handler.go
index 56e734f168..96ff78e44a 100644
--- a/ext/dsort/handler.go
+++ b/ext/dsort/handler.go
@@ -23,6 +23,7 @@ import (
 	"github.com/NVIDIA/aistore/fs"
 	"github.com/NVIDIA/aistore/stats"
 	"github.com/NVIDIA/aistore/sys"
+	"github.com/NVIDIA/aistore/xact/xreg"
 	jsoniter "github.com/json-iterator/go"
 	"github.com/tinylib/msgp/msgp"
 )
@@ -213,7 +214,7 @@ func pmetricsHandler(w http.ResponseWriter, r *http.Request, query url.Values) {
 	}
 
 	if notFound == len(responses) && notFound > 0 {
-		msg := fmt.Sprintf("%s job %q not found", DSortName, managerUUID)
+		msg := fmt.Sprintf("%s job %q not found", apc.ActDsort, managerUUID)
 		cmn.WriteErrMsg(w, r, msg, http.StatusNotFound)
 		return
 	}
@@ -255,7 +256,7 @@ func PabortHandler(w http.ResponseWriter, r *http.Request) {
 		}
 	}
 	if allNotFound {
-		err := cos.NewErrNotFound("%s job %q", DSortName, managerUUID)
+		err := cos.NewErrNotFound("%s job %q", apc.ActDsort, managerUUID)
 		cmn.WriteErr(w, r, err, http.StatusNotFound)
 		return
 	}
@@ -297,7 +298,7 @@ func PremoveHandler(w http.ResponseWriter, r *http.Request) {
 		}
 		if !metrics.Archived.Load() {
 			cmn.WriteErrMsg(w, r, fmt.Sprintf("%s process %s still in progress and cannot be removed",
-				DSortName, managerUUID))
+				apc.ActDsort, managerUUID))
 			return
 		}
 		seenOne = true
@@ -458,7 +459,7 @@ func tinitHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	if err = js.Unmarshal(b, &pars); err != nil {
-		err := fmt.Errorf(cmn.FmtErrUnmarshal, DSortName, "parsedReqSpec", cos.BHead(b), err)
+		err := fmt.Errorf(cmn.FmtErrUnmarshal, apc.ActDsort, "parsedReqSpec", cos.BHead(b), err)
 		cmn.WriteErr(w, r, err)
 		return
 	}
@@ -471,6 +472,16 @@ func tinitHandler(w http.ResponseWriter, r *http.Request) {
 	}
 	if err = m.init(pars); err != nil {
 		cmn.WriteErr(w, r, err)
+	} else {
+		// setup xaction
+		debug.Assert(!pars.OutputBck.IsEmpty())
+		custom := &xreg.DsortArgs{BckFrom: meta.CloneBck(&pars.InputBck), BckTo: meta.CloneBck(&pars.OutputBck)}
+		rns := xreg.RenewDsort(managerUUID, custom)
+		debug.AssertNoErr(rns.Err)
+		xctn := rns.Entry.Get()
+		debug.Assert(xctn.ID() == managerUUID, xctn.ID()+" vs "+managerUUID)
+
+		m.xctn = xctn.(*xaction)
 	}
 	m.unlock()
 }
@@ -552,11 +563,11 @@ func (managers *ManagerGroup) shardsHandler(w http.ResponseWriter, r *http.Reque
 	}
 
 	if !m.inProgress() {
-		cmn.WriteErrMsg(w, r, fmt.Sprintf("no %s process in progress", DSortName))
+		cmn.WriteErrMsg(w, r, fmt.Sprintf("no %s process in progress", apc.ActDsort))
 		return
 	}
 	if m.aborted() {
-		cmn.WriteErrMsg(w, r, fmt.Sprintf("%s process was aborted", DSortName))
+		cmn.WriteErrMsg(w, r, fmt.Sprintf("%s process was aborted", apc.ActDsort))
 		return
 	}
 
@@ -567,13 +578,13 @@ func (managers *ManagerGroup) shardsHandler(w http.ResponseWriter, r *http.Reque
 	defer slab.Free(buf)
 
 	if err := tmpMetadata.DecodeMsg(msgp.NewReaderBuf(r.Body, buf)); err != nil {
-		err = fmt.Errorf(cmn.FmtErrUnmarshal, DSortName, "creation phase metadata", "-", err)
+		err = fmt.Errorf(cmn.FmtErrUnmarshal, apc.ActDsort, "creation phase metadata", "-", err)
 		cmn.WriteErr(w, r, err, http.StatusInternalServerError)
 		return
 	}
 
 	if !m.inProgress() || m.aborted() {
-		cmn.WriteErrMsg(w, r, fmt.Sprintf("no %s process", DSortName))
+		cmn.WriteErrMsg(w, r, fmt.Sprintf("no %s process", apc.ActDsort))
 		return
 	}
 
@@ -600,11 +611,11 @@ func (managers *ManagerGroup) recordsHandler(w http.ResponseWriter, r *http.Requ
 		return
 	}
 	if !m.inProgress() {
-		cmn.WriteErrMsg(w, r, fmt.Sprintf("no %s process in progress", DSortName))
+		cmn.WriteErrMsg(w, r, fmt.Sprintf("no %s process in progress", apc.ActDsort))
 		return
 	}
 	if m.aborted() {
-		cmn.WriteErrMsg(w, r, fmt.Sprintf("%s process was aborted", DSortName))
+		cmn.WriteErrMsg(w, r, fmt.Sprintf("%s process was aborted", apc.ActDsort))
 		return
 	}
 
@@ -638,7 +649,7 @@ func (managers *ManagerGroup) recordsHandler(w http.ResponseWriter, r *http.Requ
 	defer slab.Free(buf)
 
 	if err := records.DecodeMsg(msgp.NewReaderBuf(r.Body, buf)); err != nil {
-		err = fmt.Errorf(cmn.FmtErrUnmarshal, DSortName, "records", "-", err)
+		err = fmt.Errorf(cmn.FmtErrUnmarshal, apc.ActDsort, "records", "-", err)
 		cmn.WriteErr(w, r, err, http.StatusInternalServerError)
 		return
 	}
@@ -675,12 +686,12 @@ func tabortHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	if m.Metrics.Archived.Load() {
-		s := fmt.Sprintf("invalid request: %s job %q has already finished", DSortName, managerUUID)
+		s := fmt.Sprintf("invalid request: %s job %q has already finished", apc.ActDsort, managerUUID)
 		cmn.WriteErrMsg(w, r, s, http.StatusGone)
 		return
 	}
 
-	m.abort(fmt.Errorf("%s has been aborted via API (remotely)", DSortName))
+	m.abort(fmt.Errorf("%s has been aborted via API (remotely)", apc.ActDsort))
 }
 
 func tremoveHandler(w http.ResponseWriter, r *http.Request) {
diff --git a/ext/dsort/manager.go b/ext/dsort/manager.go
index 072d1f3a1f..9e3eb3e7d7 100644
--- a/ext/dsort/manager.go
+++ b/ext/dsort/manager.go
@@ -12,6 +12,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/NVIDIA/aistore/api/apc"
 	"github.com/NVIDIA/aistore/cluster"
 	"github.com/NVIDIA/aistore/cluster/meta"
 	"github.com/NVIDIA/aistore/cmn"
@@ -29,14 +30,15 @@ import (
 	"github.com/NVIDIA/aistore/sys"
 	"github.com/NVIDIA/aistore/transport"
 	"github.com/NVIDIA/aistore/transport/bundle"
+	"github.com/NVIDIA/aistore/xact/xreg"
 	"github.com/pkg/errors"
 )
 
 const (
 	// Stream names
-	recvReqStreamNameFmt  = DSortName + "-%s-recv_req"
-	recvRespStreamNameFmt = DSortName + "-%s-recv_resp"
-	shardStreamNameFmt    = DSortName + "-%s-shard"
+	recvReqStreamNameFmt  = apc.ActDsort + "-%s-recv_req"
+	recvRespStreamNameFmt = apc.ActDsort + "-%s-recv_resp"
+	shardStreamNameFmt    = apc.ActDsort + "-%s-shard"
 )
 
 // State of the cleans - see `cleanup` and `finalCleanup`
@@ -116,6 +118,7 @@ type (
 		dsorterStarted sync.WaitGroup
 		callTimeout    time.Duration // max time to wait for another node to respond
 		config         *cmn.Config
+		xctn           *xaction
 	}
 )
 
@@ -126,6 +129,7 @@ var (
 	_ meta.Slistener = (*Manager)(nil)
 	_ cos.Packer     = (*buildingShardInfo)(nil)
 	_ cos.Unpacker   = (*buildingShardInfo)(nil)
+	_ cluster.Xact   = (*xaction)(nil)
 )
 
 func Pinit(si cluster.Node) {
@@ -136,6 +140,8 @@ func Pinit(si cluster.Node) {
 func Tinit(t cluster.Target, stats stats.Tracker, db kvdb.Driver) {
 	Managers = NewManagerGroup(db, false)
+	xreg.RegBckXact(&xfactory{})
+
 	debug.Assert(g.mm == nil) // only once
 	g.mm = t.PageMM()
 	g.t = t
@@ -194,6 +200,8 @@ func (m *Manager) init(pars *parsedReqSpec) error {
 	// NOTE: Total size of the records metadata can sometimes be large
 	// and so this is why we need such a long timeout.
 	m.config = cmn.GCO.Get()
+
+	// TODO -- FIXME: must be a single instance (similar to streams)
 	m.client = cmn.NewClient(cmn.TransportArgs{
 		DialTimeout: 5 * time.Minute,
 		Timeout:     30 * time.Minute,
diff --git a/ext/dsort/manager_group.go b/ext/dsort/manager_group.go
index 74df7ab723..9f00a879e8 100644
--- a/ext/dsort/manager_group.go
+++ b/ext/dsort/manager_group.go
@@ -11,6 +11,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/NVIDIA/aistore/api/apc"
 	"github.com/NVIDIA/aistore/cmn/cos"
 	"github.com/NVIDIA/aistore/cmn/kvdb"
 	"github.com/NVIDIA/aistore/cmn/nlog"
@@ -40,7 +41,7 @@ func NewManagerGroup(db kvdb.Driver, skipHk bool) *ManagerGroup {
 		db: db,
 	}
 	if !skipHk {
-		hk.Reg(DSortName+hk.NameSuffix, mg.housekeep, hk.DayInterval)
+		hk.Reg(apc.ActDsort+hk.NameSuffix, mg.housekeep, hk.DayInterval)
 	}
 	return mg
 }
@@ -140,7 +141,7 @@ func (mg *ManagerGroup) Remove(managerUUID string) error {
 	defer mg.mtx.Unlock()
 
 	if manager, ok := mg.managers[managerUUID]; ok && !manager.Metrics.Archived.Load() {
-		return errors.Errorf("%s process %s still in progress and cannot be removed", DSortName, managerUUID)
+		return errors.Errorf("%s process %s still in progress and cannot be removed", apc.ActDsort, managerUUID)
 	} else if ok {
 		delete(mg.managers, managerUUID)
 	}
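Reviewer note on the cmd/cli/cli/xact.go hunk above: map-typed snapshot values (notably dsort's extended stats arriving via `Snap.Ext`) are now rendered as indented JSON instead of Go's default `%v` map formatting. A rough stand-alone illustration follows; it uses `encoding/json` directly, whereas the CLI calls its own `jsonMarshalIndent` helper, and the sample stat keys are made up:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// render mirrors the flattenXactStats change: map-typed values are
// shown as indented JSON, everything else falls back to %v as before.
func render(v any) string {
	if mapVal, ok := v.(map[string]any); ok {
		if b, err := json.MarshalIndent(mapVal, "", "    "); err == nil {
			return string(b)
		}
	}
	return fmt.Sprintf("%v", v)
}

func main() {
	fmt.Println(render(int64(1024)))
	fmt.Println(render(map[string]any{"extracted.n": 128, "created.n": 64}))
}
```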
+ */ +package dsort + +import ( + "sync" + + "github.com/NVIDIA/aistore/api/apc" + "github.com/NVIDIA/aistore/cluster" + "github.com/NVIDIA/aistore/cluster/meta" + "github.com/NVIDIA/aistore/cmn/debug" + "github.com/NVIDIA/aistore/xact" + "github.com/NVIDIA/aistore/xact/xreg" +) + +////////////// +// xfactory // +////////////// + +type ( + xaction struct { + xact.Base + args *xreg.DsortArgs + } + xfactory struct { + xreg.RenewBase + xctn *xaction + } +) + +func (*xfactory) New(args xreg.Args, _ *meta.Bck) xreg.Renewable { + return &xfactory{RenewBase: xreg.RenewBase{Args: args}} +} + +func (p *xfactory) Start() error { + custom := p.Args.Custom + args, ok := custom.(*xreg.DsortArgs) + debug.Assert(ok) + p.xctn = &xaction{args: args} + p.xctn.InitBase(p.UUID(), apc.ActDsort, args.BckTo /*compare w/ tcb and tco*/) + return nil +} + +func (*xfactory) Kind() string { return apc.ActDsort } +func (p *xfactory) Get() cluster.Xact { return p.xctn } + +// TODO -- FIXME: compare w/ tcb/tco +func (*xfactory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) { + return xreg.WprKeepAndStartNew, nil +} + +///////////// +// xaction // +///////////// + +func (*xaction) Run(*sync.WaitGroup) { debug.Assert(false) } + +func (r *xaction) Snap() (snap *cluster.Snap) { + snap = &cluster.Snap{} + r.ToSnap(snap) + + m, exists := Managers.Get(r.ID(), true /*allowPersisted*/) + if exists { + // TODO -- FIXME: new CLI table; consider JobInfo instead + m.Metrics.update() + snap.Ext = m.Metrics + + j := m.Metrics.ToJobInfo(r.ID()) + snap.StartTime = j.StartedTime + snap.EndTime = j.FinishTime + snap.SrcBck = r.args.BckFrom.Clone() + snap.DstBck = r.args.BckTo.Clone() + snap.AbortedX = j.Aborted + + // TODO -- FIXME: extended (creation, extraction) stats => snap.Stats.Objs et al. 
diff --git a/xact/api.go b/xact/api.go
index b473c0cf8a..51637a139f 100644
--- a/xact/api.go
+++ b/xact/api.go
@@ -89,10 +89,13 @@ type (
 		Resilver   bool // moves data between mountpaths
 		MassiveBck bool // massive data copying (transforming, encoding) operation on a bucket
 
-		// xaction has an intermediate `idle` state whereby it becomes `idle` in-between
-		// requests - typically, for up to a few dozen seconds prior to finishing
-		// (see also: xact/demand.go)
+		// xaction has an intermediate `idle` state whereby it "idles" between requests
+		// (see related: xact/demand.go)
 		Idles bool
+
+		// xaction returns extended xaction-specific stats
+		// (see related: `Snap.Ext` in cluster/xaction.go)
+		ExtendedStats bool
 	}
 )
 
@@ -130,18 +133,52 @@ var Table = map[string]Descriptor{
 
 	// on-demand EC and n-way replication
 	// (non-startable, triggered by PUT => erasure-coded or mirrored bucket)
-	apc.ActECGet:     {Scope: ScopeB, Startable: false, Idles: true},
-	apc.ActECPut:     {Scope: ScopeB, Startable: false, Mountpath: true, RefreshCap: true, Idles: true},
+	apc.ActECGet:     {Scope: ScopeB, Startable: false, Idles: true, ExtendedStats: true},
+	apc.ActECPut:     {Scope: ScopeB, Startable: false, Mountpath: true, RefreshCap: true, Idles: true, ExtendedStats: true},
 	apc.ActECRespond: {Scope: ScopeB, Startable: false, Idles: true},
 	apc.ActPutCopies: {Scope: ScopeB, Startable: false, Mountpath: true, RefreshCap: true, Idles: true},
 
-	// on-demand multi-object
-	apc.ActArchive:     {Scope: ScopeB, Startable: false, RefreshCap: true, Idles: true},
-	apc.ActCopyObjects: {DisplayName: "copy-objects", Scope: ScopeB, Startable: false, RefreshCap: true, Idles: true},
-	apc.ActETLObjects:  {DisplayName: "etl-objects", Scope: ScopeB, Startable: false, RefreshCap: true, Idles: true},
+	//
+	// on-demand multi-object (TODO: consider MassiveBck: true)
+	//
+	apc.ActArchive: {Scope: ScopeB, Access: apc.AccessRW, Startable: false, RefreshCap: true, Idles: true},
+	apc.ActCopyObjects: {
+		DisplayName: "copy-objects",
+		Scope:       ScopeB,
+		Access:      apc.AccessRW, // TODO -- FIXME: apc.AceCreateBucket but only if
+		Startable:   false,
+		RefreshCap:  true,
+		Idles:       true,
+	},
+	apc.ActETLObjects: {
+		DisplayName: "etl-objects",
+		Scope:       ScopeB,
+		Access:      apc.AccessRW, // ditto
+		Startable:   false,
+		RefreshCap:  true,
+		Idles:       true,
+	},
+
+	// in its own class
+	apc.ActDsort: {
+		DisplayName:   "dsort",
+		Scope:         ScopeB,
+		Access:        apc.AccessRW,
+		Startable:     false,
+		RefreshCap:    true,
+		Mountpath:     true,
+		MassiveBck:    true,
+		ExtendedStats: true,
+	},
 
 	// multi-object
-	apc.ActPromote: {DisplayName: "promote-files", Scope: ScopeB, Access: apc.AcePromote, Startable: false, RefreshCap: true},
+	apc.ActPromote: {
+		DisplayName: "promote-files",
+		Scope:       ScopeB,
+		Access:      apc.AcePromote,
+		Startable:   false,
+		RefreshCap:  true,
+	},
 	apc.ActEvictObjects: {
 		DisplayName: "evict-objects",
 		Scope:       ScopeB,
@@ -202,8 +239,8 @@ var Table = map[string]Descriptor{
 	apc.ActCopyBck: {
 		DisplayName: "copy-bucket",
 		Scope:       ScopeB,
-		Access:      apc.AccessRW,
-		Startable:   false, // ditto
+		Access:      apc.AccessRW, // TODO -- FIXME: apc.AceCreateBucket but only if destination doesn't exist
+		Startable:   false,        // ditto
 		Metasync:    true,
 		Owned:       false,
 		RefreshCap:  true,
@@ -213,8 +250,8 @@ var Table = map[string]Descriptor{
 	apc.ActETLBck: {
 		DisplayName: "etl-bucket",
 		Scope:       ScopeB,
-		Access:      apc.AccessRW,
-		Startable:   false, // ditto
+		Access:      apc.AccessRW, // ditto
+		Startable:   false,        // ditto
 		Metasync:    true,
 		Owned:       false,
 		RefreshCap:  true,
diff --git a/xact/xreg/bucket.go b/xact/xreg/bucket.go
index aee6de5122..665d1f6f1d 100644
--- a/xact/xreg/bucket.go
+++ b/xact/xreg/bucket.go
@@ -21,17 +21,18 @@ type (
 		Msg   *apc.TCBMsg
 		Phase string
 	}
-
 	TCObjsArgs struct {
 		BckFrom *meta.Bck
 		BckTo   *meta.Bck
 		DP      cluster.DP
 	}
-
+	DsortArgs struct {
+		BckFrom *meta.Bck
+		BckTo   *meta.Bck
+	}
 	ECEncodeArgs struct {
 		Phase string
 	}
-
 	BckRenameArgs struct {
 		T       cluster.TargetExt
 		BckFrom *meta.Bck
@@ -39,7 +40,6 @@ type (
 		RebID   string
 		Phase   string
 	}
-
 	MNCArgs struct {
 		Tag    string
 		Copies int
@@ -132,6 +132,15 @@ func RenewTCObjs(t cluster.Target, uuid, kind string, custom *TCObjsArgs) RenewR
 	)
 }
 
+func RenewDsort(id string, custom *DsortArgs) RenewRes {
+	return RenewBucketXact(
+		apc.ActDsort,
+		custom.BckFrom,
+		Args{Custom: custom, UUID: id},
+		custom.BckFrom, custom.BckTo,
+	)
+}
+
 func RenewBckRename(t cluster.TargetExt, bckFrom, bckTo *meta.Bck, uuid string, rmdVersion int64, phase string) RenewRes {
 	custom := &BckRenameArgs{
 		T: t,
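Closing reviewer note: the `ExtendedStats` descriptor bit added in xact/api.go above is what the CLI's `jobCptn` consults (via `xact.GetDescriptor`) before printing its verbose-flag hint. A small sketch of that lookup, with a pared-down descriptor table standing in for `xact.Table`; the table entries and the `--verbose` wording here are illustrative only:

```go
package main

import "fmt"

// descriptor is a pared-down analog of xact.Descriptor, keeping only
// the fields this sketch needs.
type descriptor struct {
	DisplayName   string
	ExtendedStats bool
}

// table mimics a tiny slice of xact.Table, keyed by xaction kind.
var table = map[string]descriptor{
	"dsort":  {DisplayName: "dsort", ExtendedStats: true},
	"ec-get": {ExtendedStats: true},
	"lru":    {},
}

// getDescriptor stands in for xact.GetDescriptor: resolve a kind and
// let the caller check whether the job publishes extended stats.
func getDescriptor(kind string) (descriptor, error) {
	dtor, ok := table[kind]
	if !ok {
		return descriptor{}, fmt.Errorf("unknown xaction kind %q", kind)
	}
	return dtor, nil
}

func main() {
	for _, kind := range []string{"dsort", "lru"} {
		if dtor, err := getDescriptor(kind); err == nil && dtor.ExtendedStats {
			fmt.Printf("%s: hint - rerun with --verbose for extended stats\n", kind)
		} else {
			fmt.Printf("%s: no extended stats\n", kind)
		}
	}
}
```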