Skip to content

Commit

Permalink
[API change]: dsort: is now 'xaction' as well
Browse files Browse the repository at this point in the history
* is a (src bucket, dst bucket) type xaction, to be specific
* add the corresponding static descriptor and flags (API change)
* remove dsort-name and related constants
* CLI: 'ais show job'
* CLI: show extended stats reported by erasure-coding and dsort jobs
*
* part eighteen, prev. commit: 3d9545b
* separately, tweak downloader (minor ref)

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Aug 17, 2023
1 parent e54b2fe commit 3159d68
Show file tree
Hide file tree
Showing 22 changed files with 311 additions and 126 deletions.
31 changes: 17 additions & 14 deletions ais/prxdl.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,14 @@ func (p *proxy) httpdlpost(w http.ResponseWriter, r *http.Request) {
}

func (p *proxy) dladm(method, path string, msg *dload.AdminBody) ([]byte, int, error) {
config := cmn.GCO.Get()
if msg.ID != "" && method == http.MethodGet && msg.OnlyActive {
nl := p.notifs.entry(msg.ID)
if nl != nil {
return p.dlstatus(nl)
return p.dlstatus(nl, config)
}
}

var (
config = cmn.GCO.Get()
body = cos.MustMarshal(msg)
args = allocBcArgs()
xid = cos.GenUUID()
Expand Down Expand Up @@ -221,9 +220,9 @@ func (p *proxy) dladm(method, path string, msg *dload.AdminBody) ([]byte, int, e
}
}

func (p *proxy) dlstatus(nl nl.Listener) ([]byte, int, error) {
func (p *proxy) dlstatus(nl nl.Listener, config *cmn.Config) ([]byte, int, error) {
// bcast
p.notifs.bcastGetStats(nl, cmn.GCO.Get().Periodic.NotifTime.D())
p.notifs.bcastGetStats(nl, config.Periodic.NotifTime.D())
stats := nl.NodeStats()

var resp *dload.StatusResp
Expand All @@ -248,24 +247,28 @@ func (p *proxy) dlstatus(nl nl.Listener) ([]byte, int, error) {
}

func (p *proxy) dlstart(r *http.Request, xid, jobID string, body []byte) (errCode int, err error) {
query := make(url.Values, 2)
var (
config = cmn.GCO.Get()
query = make(url.Values, 2)
args = allocBcArgs()
)
query.Set(apc.QparamUUID, xid)
query.Set(apc.QparamJobID, jobID)
args := allocBcArgs()
args.req = cmn.HreqArgs{Method: http.MethodPost, Path: r.URL.Path, Body: body, Query: query}
config := cmn.GCO.Get()
args.timeout = config.Timeout.MaxHostBusy.D()

results := p.bcastGroup(args)
defer freeBcastRes(results)
freeBcArgs(args)

errCode = http.StatusOK
for _, res := range results {
if res.err == nil {
continue
if res.err != nil {
errCode, err = res.status, res.err
break
}
errCode, err = res.status, res.err
return
}
return http.StatusOK, nil
freeBcastRes(results)
return
}

func (p *proxy) validateDownload(w http.ResponseWriter, r *http.Request, body []byte) (dlb dload.Body, dlBase dload.Base, ok bool) {
Expand Down
26 changes: 13 additions & 13 deletions ais/test/dsort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
)

const (
dsortDescAllPrefix = dsort.DSortName + "-test-integration"
dsortDescAllPrefix = apc.ActDsort + "-test-integration"

scopeConfig = "config"
scopeSpec = "spec"
Expand Down Expand Up @@ -503,10 +503,10 @@ func (df *dsortFramework) checkReactionResult(reaction string, expectedProblemsC
case cmn.IgnoreReaction:
for target, metrics := range allMetrics {
if len(metrics.Warnings) != 0 {
df.m.t.Errorf("%s: target %q has %s warnings: %s", df.job(), target, dsort.DSortName, metrics.Warnings)
df.m.t.Errorf("%s: target %q has %s warnings: %s", df.job(), target, apc.ActDsort, metrics.Warnings)
}
if len(metrics.Errors) != 0 {
df.m.t.Errorf("%s: target %q has %s errors: %s", df.job(), target, dsort.DSortName, metrics.Errors)
df.m.t.Errorf("%s: target %q has %s errors: %s", df.job(), target, apc.ActDsort, metrics.Errors)
}
}
case cmn.WarnReaction:
Expand All @@ -515,7 +515,7 @@ func (df *dsortFramework) checkReactionResult(reaction string, expectedProblemsC
totalWarnings += len(metrics.Warnings)

if len(metrics.Errors) != 0 {
df.m.t.Errorf("%s: target %q has %s errors: %s", df.job(), target, dsort.DSortName, metrics.Errors)
df.m.t.Errorf("%s: target %q has %s errors: %s", df.job(), target, apc.ActDsort, metrics.Errors)
}
}

Expand All @@ -526,7 +526,7 @@ func (df *dsortFramework) checkReactionResult(reaction string, expectedProblemsC
totalErrors := 0
for target, metrics := range allMetrics {
if !metrics.Aborted.Load() {
df.m.t.Errorf("%s: %s was not aborted by target: %s", df.job(), dsort.DSortName, target)
df.m.t.Errorf("%s: %s was not aborted by target: %s", df.job(), apc.ActDsort, target)
}
totalErrors += len(metrics.Errors)
}
Expand Down Expand Up @@ -580,9 +580,9 @@ func (df *dsortFramework) checkMetrics(expectAbort bool) map[string]*dsort.Metri
}
for target, metrics := range allMetrics {
if expectAbort && !metrics.Aborted.Load() {
df.m.t.Errorf("%s: %s was not aborted by target: %s", df.job(), dsort.DSortName, target)
df.m.t.Errorf("%s: %s was not aborted by target: %s", df.job(), apc.ActDsort, target)
} else if !expectAbort && metrics.Aborted.Load() {
df.m.t.Errorf("%s: %s was aborted by target: %s", df.job(), dsort.DSortName, target)
df.m.t.Errorf("%s: %s was aborted by target: %s", df.job(), apc.ActDsort, target)
}
}
return allMetrics
Expand Down Expand Up @@ -1307,7 +1307,7 @@ func TestDsortContent(t *testing.T) {
aborted, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
if entry.missingKeys && !aborted {
t.Errorf("%s was not aborted", dsort.DSortName)
t.Errorf("%s was not aborted", apc.ActDsort)
}

tlog.Logf("%s: checking metrics\n", df.job())
Expand All @@ -1320,7 +1320,7 @@ func TestDsortContent(t *testing.T) {

for target, metrics := range allMetrics {
if entry.missingKeys && !metrics.Aborted.Load() {
t.Errorf("%s was not aborted by target: %s", target, dsort.DSortName)
t.Errorf("%s was not aborted by target: %s", target, apc.ActDsort)
}
}

Expand Down Expand Up @@ -1456,7 +1456,7 @@ func TestDsortKillTargetDuringPhases(t *testing.T) {
aborted, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckError(t, err)
if !aborted {
t.Errorf("%s was not aborted", dsort.DSortName)
t.Errorf("%s was not aborted", apc.ActDsort)
}

tlog.Logf("%s: checking metrics\n", df.job())
Expand All @@ -1469,7 +1469,7 @@ func TestDsortKillTargetDuringPhases(t *testing.T) {

for target, metrics := range allMetrics {
if !metrics.Aborted.Load() {
t.Errorf("%s was not aborted by target: %s", dsort.DSortName, target)
t.Errorf("%s was not aborted by target: %s", apc.ActDsort, target)
}
}

Expand Down Expand Up @@ -1623,15 +1623,15 @@ func TestDsortAddTarget(t *testing.T) {
aborted, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
if !aborted {
t.Errorf("%s was not aborted", dsort.DSortName)
t.Errorf("%s was not aborted", apc.ActDsort)
}

tlog.Logf("%s: checking metrics\n", df.job())
allMetrics, err := api.MetricsDSort(df.baseParams, df.managerUUID)
tassert.CheckFatal(t, err)
if len(allMetrics) != m.originalTargetCount-1 {
t.Errorf("number of metrics %d is different than number of targets when %s started %d",
len(allMetrics), dsort.DSortName, m.originalTargetCount-1)
len(allMetrics), apc.ActDsort, m.originalTargetCount-1)
}
},
)
Expand Down
62 changes: 37 additions & 25 deletions api/apc/actmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,50 @@ import (
// ActMsg.Action
// includes Xaction.Kind == ActMsg.Action (when the action is asynchronous)
const (
ActCreateBck = "create-bck" // NOTE: compare w/ ActAddRemoteBck below
ActDestroyBck = "destroy-bck" // destroy bucket data and metadata
ActSummaryBck = "summary-bck"
ActCopyBck = "copy-bck"
ActDownload = "download"
ActECEncode = "ec-encode" // erasure code a bucket
ActECGet = "ec-get" // erasure decode objects
ActECPut = "ec-put" // erasure encode objects
ActECRespond = "ec-resp" // respond to other targets' EC requests
ActETLInline = "etl-inline"
ActETLBck = "etl-bck"
ActElection = "election"
ActCreateBck = "create-bck" // NOTE: compare w/ ActAddRemoteBck below
ActDestroyBck = "destroy-bck" // destroy bucket data and metadata
ActSetBprops = "set-bprops"
ActResetBprops = "reset-bprops"

ActSummaryBck = "summary-bck"

ActECEncode = "ec-encode" // erasure code a bucket
ActECGet = "ec-get" // erasure decode objects
ActECPut = "ec-put" // erasure encode objects
ActECRespond = "ec-resp" // respond to other targets' EC requests

ActCopyBck = "copy-bck"
ActETLBck = "etl-bck"

ActETLInline = "etl-inline"

ActDsort = "dsort"
ActDownload = "download"

ActMakeNCopies = "make-n-copies"
ActPutCopies = "put-copies"

ActRebalance = "rebalance"
ActMoveBck = "move-bck"

ActResilver = "resilver"

ActElection = "election"

ActLRU = "lru"
ActStoreCleanup = "cleanup-store"

ActEvictRemoteBck = "evict-remote-bck" // evict remote bucket's data
ActInvalListCache = "inval-listobj-cache"
ActLRU = "lru"
ActList = "list"
ActLoadLomCache = "load-lom-cache"
ActMakeNCopies = "make-n-copies"
ActMoveBck = "move-bck"
ActNewPrimary = "new-primary"
ActPromote = "promote"
ActPutCopies = "put-copies"
ActRebalance = "rebalance"
ActRenameObject = "rename-obj"
ActResetStats = "reset-stats"
ActResetBprops = "reset-bprops"
ActResetConfig = "reset-config"
ActResilver = "resilver"
ActResyncBprops = "resync-bprops"
ActSetBprops = "set-bprops"
ActSetConfig = "set-config"
ActStoreCleanup = "cleanup-store"

ActResetStats = "reset-stats"
ActResetConfig = "reset-config"
ActSetConfig = "set-config"

ActShutdownCluster = "shutdown" // see also: ActShutdownNode

Expand Down
2 changes: 1 addition & 1 deletion cmd/aisfs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

// direct
require (
github.com/NVIDIA/aistore v1.3.19-0.20230810144327-04e74a9a726c
github.com/NVIDIA/aistore v1.3.19-0.20230817173330-7b467e9193da
github.com/OneOfOne/xxhash v1.2.8
github.com/jacobsa/daemonize v0.0.0-20160101105449-e460293e890f
github.com/jacobsa/fuse v0.0.0-20230124164109-5e0f2e6b432b
Expand Down
4 changes: 2 additions & 2 deletions cmd/aisfs/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/NVIDIA/aistore v1.3.19-0.20230810144327-04e74a9a726c h1:3BKIXPlLMMqAlW3/xs5CRlJyv/YFslZ+rchC1vEopTQ=
github.com/NVIDIA/aistore v1.3.19-0.20230810144327-04e74a9a726c/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4=
github.com/NVIDIA/aistore v1.3.19-0.20230817173330-7b467e9193da h1:e7r6/FN/yV2oU6jXt/cvK2x/yPRrIp/42yf5xwkofV8=
github.com/NVIDIA/aistore v1.3.19-0.20230817173330-7b467e9193da/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/cli/completions.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func runningJobCompletions(c *cli.Context) {
case 0: // 1. NAME
if flagIsSet(c, allJobsFlag) {
names := xact.ListDisplayNames(false /*only-startable*/)
names = append(names, dsort.DSortName)
names = append(names, apc.ActDsort)
sort.Strings(names)
fmt.Println(strings.Join(names, " "))
return
Expand Down
17 changes: 10 additions & 7 deletions cmd/cli/cli/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/ext/dload"
"github.com/NVIDIA/aistore/ext/dsort"
"github.com/urfave/cli"
)

Expand Down Expand Up @@ -90,14 +89,13 @@ const (
commandPrefetch = "prefetch" // apc.ActPrefetchObjects

cmdDownload = apc.ActDownload
cmdDsort = apc.ActDsort
cmdRebalance = apc.ActRebalance
cmdLRU = apc.ActLRU
cmdStgCleanup = "cleanup" // display name for apc.ActStoreCleanup
cmdStgValidate = "validate"
cmdSummary = "summary" // ditto apc.ActSummaryBck

cmdDsort = dsort.DSortName

cmdCluster = commandCluster
cmdNode = "node"
cmdPrimary = "set-primary"
Expand Down Expand Up @@ -456,10 +454,15 @@ var (
noHeaderFlag = cli.BoolFlag{Name: "no-headers,H", Usage: "display tables without headers"}
noFooterFlag = cli.BoolFlag{Name: "no-footers", Usage: "display tables without footers"}

progressFlag = cli.BoolFlag{Name: "progress", Usage: "show progress bar(s) and progress of execution in real time"}
dryRunFlag = cli.BoolFlag{Name: "dry-run", Usage: "preview the results without really running the action"}
verboseFlag = cli.BoolFlag{Name: "verbose,v", Usage: "verbose"}
nonverboseFlag = cli.BoolFlag{Name: "non-verbose,nv", Usage: "non-verbose"}
progressFlag = cli.BoolFlag{Name: "progress", Usage: "show progress bar(s) and progress of execution in real time"}
dryRunFlag = cli.BoolFlag{Name: "dry-run", Usage: "preview the results without really running the action"}

verboseFlag = cli.BoolFlag{Name: "verbose,v", Usage: "verbose output"}
nonverboseFlag = cli.BoolFlag{Name: "non-verbose,nv", Usage: "non-verbose output"}
verboseJobFlag = cli.BoolFlag{
Name: verboseFlag.Name,
Usage: "show extended statistics",
}

averageSizeFlag = cli.BoolFlag{Name: "average-size", Usage: "show average GET, PUT, etc. request size"}

Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/cli/dsort.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type (

var dsortStartCmd = cli.Command{
Name: cmdDsort,
Usage: "start " + dsort.DSortName + " job\n" +
Usage: "start " + apc.ActDsort + " job\n" +
indent1 + "Required parameters:\n" +
indent1 + "\t- input_bck: source bucket (used as both source and destination if the latter is not specified)\n" +
indent1 + "\t- input_format: see docs and examples below\n" +
Expand Down Expand Up @@ -293,7 +293,7 @@ func setupBucket(c *cli.Context, bck cmn.Bck) error {

func (d dsortResult) String() string {
if d.aborted {
return dsort.DSortName + " job was aborted"
return apc.ActDsort + " job was aborted"
}

var sb strings.Builder
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/cli/job_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ func jobArgs(c *cli.Context, shift int, ignoreDaemonID bool) (name, xid, daemonI
daemonID = c.Args().Get(shift + 2)

// validate and reassign
if name != "" && name != dsort.DSortName {
if name != "" && name != apc.ActDsort {
if xactKind, _ := xact.GetKindName(name); xactKind == "" {
daemonID = xid
xid = name
Expand Down
Loading

0 comments on commit 3159d68

Please sign in to comment.