diff --git a/ais/prxdsort.go b/ais/prxdsort.go index 871f4bd618..22dbb74218 100644 --- a/ais/prxdsort.go +++ b/ais/prxdsort.go @@ -16,7 +16,7 @@ import ( // POST /v1/sort func (p *proxy) proxyStartSortHandler(w http.ResponseWriter, r *http.Request) { rs := &dsort.RequestSpec{} - if cmn.ReadJSON(w, r, &rs) != nil { + if cmn.ReadJSON(w, r, rs) != nil { return } parsedRS, err := rs.Parse() diff --git a/ais/test/dsort_test.go b/ais/test/dsort_test.go index d425993725..22679ef5f2 100644 --- a/ais/test/dsort_test.go +++ b/ais/test/dsort_test.go @@ -43,6 +43,11 @@ const ( scopeSpec = "spec" ) +const ( + startingDS = "starting distributed sort" + finishedDS = "finished distributed sort" +) + var ( dsortDescCurPrefix = fmt.Sprintf("%s-%d-", dsortDescAllPrefix, os.Getpid()) @@ -259,11 +264,10 @@ func (df *dsortFramework) gen() dsort.RequestSpec { func (df *dsortFramework) start() { var ( - err error - rs = df.gen() + err error + spec = df.gen() ) - - df.managerUUID, err = api.StartDSort(df.baseParams, rs) + df.managerUUID, err = api.StartDSort(df.baseParams, &spec) tassert.CheckFatal(df.m.t, err) } @@ -337,7 +341,7 @@ func (df *dsortFramework) checkOutputShards(zeros int) { baseParams = tools.BaseAPIParams(df.m.proxyURL) records = make(map[string]int, 100) ) - tlog.Logln("checking if files are sorted...") + tlog.Logln("checking that files are sorted...") for i := 0; i < df.outputShardCnt; i++ { var ( buffer bytes.Buffer @@ -573,12 +577,12 @@ func dispatchDSortJob(m *ioContext, dsorterType string, i int) { df.init() df.createInputShards() - tlog.Logln("starting distributed sort...") + tlog.Logln(startingDS) df.start() _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(m.t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(false /* expectAbort */) df.checkOutputShards(5) @@ -666,12 +670,12 @@ func testDsort(t *testing.T, ext, lr string) { df.missingShards = cmn.AbortReaction // (when shards are explicitly enumerated...) } - tlog.Logln("starting distributed sort ...") + tlog.Logln(startingDS) df.start() _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(false /* expectAbort */) df.checkOutputShards(5) @@ -709,9 +713,9 @@ func TestDistributedSortNonExistingBuckets(t *testing.T) { // Create local output bucket tools.CreateBucketWithCleanup(t, m.proxyURL, df.outputBck, nil) - tlog.Logln("starting distributed sort...") - rs := df.gen() - if _, err := api.StartDSort(df.baseParams, rs); err == nil { + tlog.Logln(startingDS) + spec := df.gen() + if _, err := api.StartDSort(df.baseParams, &spec); err == nil { t.Errorf("expected %s job to fail when input bucket does not exist", dsort.DSortName) } @@ -720,7 +724,7 @@ func TestDistributedSortNonExistingBuckets(t *testing.T) { tools.CreateBucketWithCleanup(t, m.proxyURL, m.bck, nil) tlog.Logln("starting second distributed sort...") - if _, err := api.StartDSort(df.baseParams, rs); err == nil { + if _, err := api.StartDSort(df.baseParams, &spec); err == nil { t.Errorf("expected %s job to fail when output bucket does not exist", dsort.DSortName) } }, @@ -752,12 +756,12 @@ func TestDistributedSortEmptyBucket(t *testing.T) { df.init() - tlog.Logln("starting distributed sort...") + tlog.Logln(startingDS) df.start() _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(reaction == cmn.AbortReaction /*expectAbort*/) df.checkReactionResult(reaction, df.shardCnt) @@ -804,7 +808,7 @@ func TestDistributedSortOutputBucket(t *testing.T) { _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(false /* expectAbort */) df.checkOutputShards(5) @@ -898,7 +902,7 @@ func TestDistributedSortShuffle(t *testing.T) { _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(false /* expectAbort */) df.checkOutputShards(5) @@ -935,7 +939,7 @@ func TestDistributedSortDisk(t *testing.T) { _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) allMetrics := df.checkMetrics(false /* expectAbort */) for target, metrics := range allMetrics { @@ -982,7 +986,7 @@ func TestDistributedSortCompressionDisk(t *testing.T) { _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(false /* expectAbort */) df.checkOutputShards(5) @@ -1031,7 +1035,7 @@ func TestDistributedSortMemDisk(t *testing.T) { _, err = tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) allMetrics := df.checkMetrics(false /* expectAbort */) var ( @@ -1094,7 +1098,7 @@ func TestDistributedSortMemDiskTarCompression(t *testing.T) { _, err = tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) allMetrics := df.checkMetrics(false /*expectAbort*/) var ( @@ -1154,7 +1158,7 @@ func TestDistributedSortZipLz4(t *testing.T) { _, err = tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(false /* expectAbort */) df.checkOutputShards(5) @@ -1199,7 +1203,7 @@ func TestDistributedSortTarCompression(t *testing.T) { _, err = tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(false /* expectAbort */) df.checkOutputShards(5) @@ -1274,7 +1278,8 @@ func TestDistributedSortContent(t *testing.T) { allMetrics, err := api.MetricsDSort(df.baseParams, df.managerUUID) tassert.CheckFatal(t, err) if len(allMetrics) != m.originalTargetCount { - t.Errorf("number of metrics %d is not same as number of targets %d", len(allMetrics), m.originalTargetCount) + t.Errorf("number of metrics %d is not same as the number of targets %d", + len(allMetrics), m.originalTargetCount) } for target, metrics := range allMetrics { @@ -1526,7 +1531,8 @@ func TestDistributedSortManipulateMountpathDuringPhases(t *testing.T) { tassert.CheckFatal(t, err) } else { tlog.Logf("removing mountpath %q from %s...\n", mpath, target.ID()) - err := api.DetachMountpath(df.baseParams, target, mpath, false /*dont-resil*/) + err := api.DetachMountpath(df.baseParams, target, + mpath, false /*dont-resil*/) tassert.CheckFatal(t, err) } } @@ -1629,7 +1635,7 @@ func TestDistributedSortMetricsAfterFinish(t *testing.T) { _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(false /* expectAbort */) df.checkOutputShards(0) @@ -1672,7 +1678,7 @@ func TestDistributedSortSelfAbort(t *testing.T) { _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) // Wait a while for all targets to abort time.Sleep(2 * time.Second) @@ -1723,7 +1729,7 @@ func TestDistributedSortOnOOM(t *testing.T) { _, err = tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(false /* expectAbort */) df.checkOutputShards(5) @@ -1789,7 +1795,7 @@ func TestDistributedSortMissingShards(t *testing.T) { _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkReactionResult(reaction, df.shardCntToSkip) }, @@ -1854,7 +1860,7 @@ func TestDistributedSortDuplications(t *testing.T) { _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkReactionResult(reaction, df.recordDuplicationsCnt) }, @@ -1935,13 +1941,13 @@ func TestDistributedSortOrderFile(t *testing.T) { tassert.CheckFatal(t, err) tlog.Logln("starting distributed sort...") - rs := df.gen() - managerUUID, err := api.StartDSort(baseParams, rs) + spec := df.gen() + managerUUID, err := api.StartDSort(baseParams, &spec) tassert.CheckFatal(t, err) _, err = tools.WaitForDSortToFinish(m.proxyURL, managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) allMetrics, err := api.MetricsDSort(baseParams, managerUUID) tassert.CheckFatal(t, err) @@ -1959,7 +1965,8 @@ func TestDistributedSortOrderFile(t *testing.T) { match = match || fmt.Sprintf(ekm[recordName], i) == shard.name } if !match { - t.Errorf("record %q was not part of any shard with format %q but was in shard %q", recordName, ekm[recordName], shard.name) + t.Errorf("record %q was not part of any shard with format %q but was in shard %q", + recordName, ekm[recordName], shard.name) } } } @@ -2042,18 +2049,19 @@ func TestDistributedSortOrderJSONFile(t *testing.T) { tassert.CheckFatal(t, err) tlog.Logln("starting distributed sort...") - rs := df.gen() - managerUUID, err := api.StartDSort(baseParams, rs) + spec := df.gen() + managerUUID, err := api.StartDSort(baseParams, &spec) tassert.CheckFatal(t, err) _, err = tools.WaitForDSortToFinish(m.proxyURL, managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) allMetrics, err := api.MetricsDSort(baseParams, managerUUID) tassert.CheckFatal(t, err) if len(allMetrics) != m.originalTargetCount { - t.Errorf("number of metrics %d is not same as number of targets %d", len(allMetrics), m.originalTargetCount) + t.Errorf("number of metrics %d is not same as number of targets %d", + len(allMetrics), m.originalTargetCount) } tlog.Logln("checking if all records are in specified shards...") @@ -2066,7 +2074,8 @@ func TestDistributedSortOrderJSONFile(t *testing.T) { match = match || fmt.Sprintf(ekm[recordName], i) == shard.name } if !match { - t.Errorf("record %q was not part of any shard with format %q but was in shard %q", recordName, ekm[recordName], shard.name) + t.Errorf("record %q was not part of any shard with format %q but was in shard %q", + recordName, ekm[recordName], shard.name) } } } @@ -2106,7 +2115,7 @@ func TestDistributedSortDryRun(t *testing.T) { _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(false /* expectAbort */) }, @@ -2145,7 +2154,7 @@ func TestDistributedSortDryRunDisk(t *testing.T) { _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(false /* expectAbort */) }, @@ -2187,7 +2196,7 @@ func TestDistributedSortLongerExt(t *testing.T) { _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(false /*expectAbort*/) df.checkOutputShards(5) @@ -2229,7 +2238,7 @@ func TestDistributedSortAutomaticallyCalculateOutputShards(t *testing.T) { _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(false /*expectAbort*/) df.checkOutputShards(0) @@ -2270,12 +2279,12 @@ func TestDistributedSortWithTarFormats(t *testing.T) { df.init() df.createInputShards() - tlog.Logln("starting distributed sort...") + tlog.Logln(startingDS) df.start() _, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("finished distributed sort") + tlog.Logln(finishedDS) df.checkMetrics(false /* expectAbort */) df.checkOutputShards(5) diff --git a/api/dsort.go b/api/dsort.go index 18688aa0ad..da6586907a 100644 --- a/api/dsort.go +++ b/api/dsort.go @@ -13,7 +13,7 @@ import ( "github.com/NVIDIA/aistore/ext/dsort" ) -func StartDSort(bp BaseParams, rs dsort.RequestSpec) (id string, err error) { +func StartDSort(bp BaseParams, rs *dsort.RequestSpec) (id string, err error) { bp.Method = http.MethodPost reqParams := AllocRp() { diff --git a/bench/aisloader/bootstrap.go b/bench/aisloader/bootstrap.go index 1998a0717a..f44ada6e3f 100644 --- a/bench/aisloader/bootstrap.go +++ b/bench/aisloader/bootstrap.go @@ -589,22 +589,22 @@ func parseCmdLine() (params, error) { func printArguments(set *flag.FlagSet) { w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0) - _, _ = fmt.Fprintf(w, "==== COMMAND LINE ARGUMENTS ====\n") - _, _ = fmt.Fprintf(w, "=========== DEFAULTS ===========\n") + fmt.Fprintf(w, "==== COMMAND LINE ARGUMENTS ====\n") + fmt.Fprintf(w, "=========== DEFAULTS ===========\n") set.VisitAll(func(f *flag.Flag) { if f.Value.String() == f.DefValue { _, _ = fmt.Fprintf(w, "%s:\t%s\n", f.Name, f.Value.String()) } }) - _, _ = fmt.Fprintf(w, "============ CUSTOM ============\n") + fmt.Fprintf(w, "============ CUSTOM ============\n") set.VisitAll(func(f *flag.Flag) { if f.Value.String() != f.DefValue { _, _ = fmt.Fprintf(w, "%s:\t%s\n", f.Name, f.Value.String()) } }) fmt.Fprintf(w, "HTTP trace:\t%v\n", runParams.traceHTTP) - _, _ = fmt.Fprintf(w, "=================================\n\n") - _ = w.Flush() + fmt.Fprintf(w, "=================================\n\n") + w.Flush() } // newStats returns a new stats object with given time as the starting point diff --git a/cmd/aisfs/go.mod b/cmd/aisfs/go.mod index 613af39167..2d6f43d342 100644 --- a/cmd/aisfs/go.mod +++ b/cmd/aisfs/go.mod @@ -4,7 +4,7 @@ go 1.20 // direct require ( - github.com/NVIDIA/aistore v1.3.19-0.20230729013522-e1958544e15c + github.com/NVIDIA/aistore v1.3.19-0.20230731152016-da82c5b61baf github.com/OneOfOne/xxhash v1.2.8 github.com/jacobsa/daemonize v0.0.0-20160101105449-e460293e890f github.com/jacobsa/fuse v0.0.0-20230124164109-5e0f2e6b432b diff --git a/cmd/aisfs/go.sum b/cmd/aisfs/go.sum index a9959b598c..f0c85394a8 100644 --- a/cmd/aisfs/go.sum +++ b/cmd/aisfs/go.sum @@ -3,8 +3,8 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/NVIDIA/aistore v1.3.19-0.20230729013522-e1958544e15c h1:4Dli+IHuxs88xxdsz7FW3/S9miVVyDGpOTMFHteYC04= -github.com/NVIDIA/aistore v1.3.19-0.20230729013522-e1958544e15c/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4= +github.com/NVIDIA/aistore v1.3.19-0.20230731152016-da82c5b61baf h1:+1ya6Fv62YltZDBAdNYVPQtiMcaPGkNNSjGGOM5XNzU= +github.com/NVIDIA/aistore v1.3.19-0.20230731152016-da82c5b61baf/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= diff --git a/cmd/cli/cli/config_hdlr.go b/cmd/cli/cli/config_hdlr.go index 915a956d47..cc4bd6d4ef 100644 --- a/cmd/cli/cli/config_hdlr.go +++ b/cmd/cli/cli/config_hdlr.go @@ -429,11 +429,11 @@ func showCfgCLI(c *cli.Context) (err error) { return } - flat := flattenConfig(cfg, c.Args().Get(0)) + flat := flattenJSON(cfg, c.Args().Get(0)) sort.Slice(flat, func(i, j int) bool { return flat[i].Name < flat[j].Name }) - return teb.Print(flat, teb.ConfigTmpl) + return teb.Print(flat, teb.FlatTmpl) } func setCfgCLI(c *cli.Context) (err error) { @@ -449,14 +449,14 @@ func setCfgCLI(c *cli.Context) (err error) { return err } - flatOld := flattenConfig(cfg, "") + flatOld := flattenJSON(cfg, "") for k, v := range nvs { if err := cmn.UpdateFieldValue(cfg, k, v); err != nil { return err } } - flatNew := flattenConfig(cfg, "") + flatNew := flattenJSON(cfg, "") diff := diffConfigs(flatNew, flatOld) for _, val := range diff { if val.Old == "-" { diff --git a/cmd/cli/cli/dsort.go b/cmd/cli/cli/dsort.go index 2c524cb9a7..2fc51ee661 100644 --- a/cmd/cli/cli/dsort.go +++ b/cmd/cli/cli/dsort.go @@ -6,10 +6,13 @@ package cli import ( + "bytes" jsonStd "encoding/json" + "errors" "fmt" "io" "math" + "os" "sort" "strings" "time" @@ -20,9 +23,37 @@ import ( "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/cos" "github.com/NVIDIA/aistore/ext/dsort" + jsoniter "github.com/json-iterator/go" "github.com/urfave/cli" "github.com/vbauerster/mpb/v4" "github.com/vbauerster/mpb/v4/decor" + "gopkg.in/yaml.v2" +) + +type ( + dsortResult struct { + dur time.Duration + created int64 + errors []string + warnings []string + aborted bool + } + dsortPhaseState struct { + total int64 + progress int64 + bar *mpb.Bar + } + dsortPB struct { + id string + apiBP api.BaseParams + refreshTime time.Duration + p *mpb.Progress + dur time.Duration + phases map[string]*dsortPhaseState + warnings []string + errors []string + aborted bool + } ) var phasesOrdered = []string{ @@ -31,6 +62,125 @@ var phasesOrdered = []string{ dsort.CreationPhase, } +func startDsortHandler(c *cli.Context) (err error) { + var ( + id string + specPath string + specBytes []byte + shift int + srcbck, dstbck cmn.Bck + spec dsort.RequestSpec + ) + // parse command line + specPath = parseStrFlag(c, dsortSpecFlag) + if c.NArg() == 0 && specPath == "" { + return fmt.Errorf("missing %q argument (see %s for details and usage examples)", + c.Command.ArgsUsage, qflprn(cli.HelpFlag)) + } + if specPath == "" { + // spec is inline + specBytes = []byte(c.Args().Get(0)) + shift = 1 + } + if c.NArg() > shift { + srcbck, err = parseBckURI(c, c.Args().Get(shift), true) + if err != nil { + return fmt.Errorf("failed to parse source bucket: %v\n(see %s for details)", + err, qflprn(cli.HelpFlag)) + } + } + if c.NArg() > shift+1 { + dstbck, err = parseBckURI(c, c.Args().Get(shift+1), true) + if err != nil { + return fmt.Errorf("failed to parse destination bucket: %v\n(see %s for details)", + err, qflprn(cli.HelpFlag)) + } + } + + // load spec from file or standard input + if specPath != "" { + var r io.Reader + if specPath == fileStdIO { + r = os.Stdin + } else { + f, err := os.Open(specPath) + if err != nil { + return err + } + defer f.Close() + r = f + } + + var b bytes.Buffer + // Read at most 1MB so we don't blow up when reading don't know what + if _, errV := io.CopyN(&b, r, cos.MiB); errV == nil { + return errors.New("file too big") + } else if errV != io.EOF { + return errV + } + specBytes = b.Bytes() + } + if errj := jsoniter.Unmarshal(specBytes, &spec); errj != nil { + if erry := yaml.Unmarshal(specBytes, &spec); erry != nil { + return fmt.Errorf( + "failed to determine the type of the job specification, errs: (%v, %v)", + errj, erry, + ) + } + } + + // NOTE: args SRC_BUCKET and DST_BUCKET, if defined, override specBytes/specPath (`dsort.RequestSpec`) + if !srcbck.IsEmpty() { + spec.Bck = srcbck + } + if !dstbck.IsEmpty() { + spec.OutputBck = dstbck + } + + // print resulting spec TODO -- FIXME + if flagIsSet(c, verboseFlag) { + flat := _flattenSpec(&spec) + err = teb.Print(flat, teb.FlatTmpl) + time.Sleep(time.Second / 2) + } + + // execute + if id, err = api.StartDSort(apiBP, &spec); err == nil { + fmt.Fprintln(c.App.Writer, id) + } + return +} + +// with minor editing +func _flattenSpec(spec *dsort.RequestSpec) (flat nvpairList) { + var src, dst cmn.Bck + cmn.IterFields(spec, func(tag string, field cmn.IterField) (error, bool) { + v := _toStr(field.Value()) + switch { + case tag == "bck.name": + src.Name = v + case tag == "bck.provider": + src.Provider = v + case tag == "output_bck.name": + dst.Name = v + case tag == "output_bck.provider": + dst.Provider = v + default: + flat = append(flat, nvpair{tag, v}) + } + return nil, false + }) + if dst.IsEmpty() { + dst = src + } + flat = append(flat, nvpair{"bck", src.Cname("")}, nvpair{"output_bck", dst.Cname("")}) + sort.Slice(flat, func(i, j int) bool { + di, dj := flat[i], flat[j] + return di.Name < dj.Name + }) + return +} + // Creates bucket if not exists. If exists uses it or deletes and creates new // one if cleanup flag was set. func setupBucket(c *cli.Context, bck cmn.Bck) error { @@ -55,14 +205,6 @@ func setupBucket(c *cli.Context, bck cmn.Bck) error { return nil } -type dsortResult struct { - dur time.Duration - created int64 - errors []string - warnings []string - aborted bool -} - func (d dsortResult) String() string { if d.aborted { return dsort.DSortName + " job was aborted" @@ -83,27 +225,6 @@ func (d dsortResult) String() string { return sb.String() } -type dsortPhaseState struct { - total int64 - progress int64 - bar *mpb.Bar -} - -type dsortPB struct { - id string - apiBP api.BaseParams - refreshTime time.Duration - - p *mpb.Progress - - dur time.Duration - phases map[string]*dsortPhaseState - aborted bool - - warnings []string - errors []string -} - func newPhases() map[string]*dsortPhaseState { phases := make(map[string]*dsortPhaseState, 3) phases[dsort.ExtractionPhase] = &dsortPhaseState{} diff --git a/cmd/cli/cli/job_hdlr.go b/cmd/cli/cli/job_hdlr.go index 6bea8b9f64..fb5bfa99e4 100644 --- a/cmd/cli/cli/job_hdlr.go +++ b/cmd/cli/cli/job_hdlr.go @@ -6,10 +6,8 @@ package cli import ( - "bytes" "errors" "fmt" - "io" "os" "strings" "time" @@ -26,7 +24,6 @@ import ( "github.com/NVIDIA/aistore/xact" jsoniter "github.com/json-iterator/go" "github.com/urfave/cli" - "gopkg.in/yaml.v2" ) const ( @@ -92,6 +89,7 @@ var ( }, cmdDsort: { dsortSpecFlag, + verboseFlag, }, commandPrefetch: append( listrangeFlags, @@ -634,88 +632,6 @@ func waitDownload(c *cli.Context, id string) (err error) { return nil } -func startDsortHandler(c *cli.Context) (err error) { - var ( - id string - specPath string - specBytes []byte - shift int - srcbck, dstbck cmn.Bck - rs dsort.RequestSpec - ) - // parse command line - specPath = parseStrFlag(c, dsortSpecFlag) - if c.NArg() == 0 && specPath == "" { - return fmt.Errorf("missing %q argument (see %s for details and usage examples)", - c.Command.ArgsUsage, qflprn(cli.HelpFlag)) - } - if specPath == "" { - // spec is inline - specBytes = []byte(c.Args().Get(0)) - shift = 1 - } - if c.NArg() > shift { - srcbck, err = parseBckURI(c, c.Args().Get(shift), true) - if err != nil { - return fmt.Errorf("failed to parse source bucket: %v\n(see %s for details)", - err, qflprn(cli.HelpFlag)) - } - } - if c.NArg() > shift+1 { - dstbck, err = parseBckURI(c, c.Args().Get(shift+1), true) - if err != nil { - return fmt.Errorf("failed to parse destination bucket: %v\n(see %s for details)", - err, qflprn(cli.HelpFlag)) - } - } - - // load spec from file or standard input - if specPath != "" { - var r io.Reader - if specPath == fileStdIO { - r = os.Stdin - } else { - f, err := os.Open(specPath) - if err != nil { - return err - } - defer f.Close() - r = f - } - - var b bytes.Buffer - // Read at most 1MB so we don't blow up when reading don't know what - if _, errV := io.CopyN(&b, r, cos.MiB); errV == nil { - return errors.New("file too big") - } else if errV != io.EOF { - return errV - } - specBytes = b.Bytes() - } - if errj := jsoniter.Unmarshal(specBytes, &rs); errj != nil { - if erry := yaml.Unmarshal(specBytes, &rs); erry != nil { - return fmt.Errorf( - "failed to determine the type of the job specification, errs: (%v, %v)", - errj, erry, - ) - } - } - - // NOTE: command-line SRC_BUCKET and DST_BUCKET override the spec - if !srcbck.IsEmpty() { - rs.Bck = srcbck - } - if !dstbck.IsEmpty() { - rs.OutputBck = dstbck - } - - // execute - if id, err = api.StartDSort(apiBP, rs); err == nil { - fmt.Fprintln(c.App.Writer, id) - } - return -} - func startLRUHandler(c *cli.Context) (err error) { if !flagIsSet(c, lruBucketsFlag) { return startXactionHandler(c) diff --git a/cmd/cli/cli/show_hdlr.go b/cmd/cli/cli/show_hdlr.go index 7656642ec8..4d895f34eb 100644 --- a/cmd/cli/cli/show_hdlr.go +++ b/cmd/cli/cli/show_hdlr.go @@ -679,8 +679,8 @@ func showClusterConfig(c *cli.Context, section string) error { if usejs { return teb.Print(cluConfig, "", teb.Jopts(usejs)) } - flat := flattenConfig(cluConfig, section) - err = teb.Print(flat, teb.ConfigTmpl) + flat := flattenJSON(cluConfig, section) + err = teb.Print(flat, teb.FlatTmpl) if err == nil && section == "" { msg := fmt.Sprintf("(Hint: use '[SECTION] %s' to show config section(s), see %s for details)", flprn(jsonFlag), qflprn(cli.HelpFlag)) @@ -772,18 +772,18 @@ func showNodeConfig(c *cli.Context) error { // fill-in `data` switch scope { case cfgScopeLocal: - data.LocalConfigPairs = flattenConfig(config.LocalConfig, section) + data.LocalConfigPairs = flattenJSON(config.LocalConfig, section) default: // cfgScopeInherited | cfgScopeAll cluConf, err := api.GetClusterConfig(apiBP) if err != nil { return V(err) } // diff cluster <=> this node - flatNode := flattenConfig(config.ClusterConfig, section) - flatCluster := flattenConfig(cluConf, section) + flatNode := flattenJSON(config.ClusterConfig, section) + flatCluster := flattenJSON(cluConf, section) data.ClusterConfigDiff = diffConfigs(flatNode, flatCluster) if scope == cfgScopeAll { - data.LocalConfigPairs = flattenConfig(config.LocalConfig, section) + data.LocalConfigPairs = flattenJSON(config.LocalConfig, section) } } // show "flat" diff-s diff --git a/cmd/cli/cli/utils.go b/cmd/cli/cli/utils.go index 6b21a1817c..9cd1d9004e 100644 --- a/cmd/cli/cli/utils.go +++ b/cmd/cli/cli/utils.go @@ -619,9 +619,9 @@ func parseURLtoBck(strURL string) (bck cmn.Bck) { } // see also authNConfPairs -func flattenConfig(cfg any, section string) (flat nvpairList) { +func flattenJSON(jstruct any, section string) (flat nvpairList) { flat = make(nvpairList, 0, 40) - cmn.IterFields(cfg, func(tag string, field cmn.IterField) (error, bool) { + cmn.IterFields(jstruct, func(tag string, field cmn.IterField) (error, bool) { if section == "" || strings.HasPrefix(tag, section) { v := _toStr(field.Value()) flat = append(flat, nvpair{tag, v}) @@ -759,7 +759,7 @@ func defaultBckProps(bck cmn.Bck) (*cmn.BucketProps, error) { return props, nil } -// see also flattenConfig +// see also flattenJSON func authNConfPairs(conf *authn.Config, prefix string) (nvpairList, error) { flat := make(nvpairList, 0, 8) err := cmn.IterFields(conf, func(tag string, field cmn.IterField) (error, bool) { diff --git a/cmd/cli/go.mod b/cmd/cli/go.mod index 6d12b4b781..d4b88abb12 100644 --- a/cmd/cli/go.mod +++ b/cmd/cli/go.mod @@ -4,7 +4,7 @@ go 1.20 // direct require ( - github.com/NVIDIA/aistore v1.3.19-0.20230729013522-e1958544e15c + github.com/NVIDIA/aistore v1.3.19-0.20230731152016-da82c5b61baf github.com/fatih/color v1.14.1 github.com/json-iterator/go v1.1.12 github.com/onsi/ginkgo v1.16.5 diff --git a/cmd/cli/go.sum b/cmd/cli/go.sum index 922ea0ad5b..c7132e865c 100644 --- a/cmd/cli/go.sum +++ b/cmd/cli/go.sum @@ -3,8 +3,8 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/NVIDIA/aistore v1.3.19-0.20230729013522-e1958544e15c h1:4Dli+IHuxs88xxdsz7FW3/S9miVVyDGpOTMFHteYC04= -github.com/NVIDIA/aistore v1.3.19-0.20230729013522-e1958544e15c/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4= +github.com/NVIDIA/aistore v1.3.19-0.20230731152016-da82c5b61baf h1:+1ya6Fv62YltZDBAdNYVPQtiMcaPGkNNSjGGOM5XNzU= +github.com/NVIDIA/aistore v1.3.19-0.20230731152016-da82c5b61baf/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= diff --git a/cmd/cli/teb/templates.go b/cmd/cli/teb/templates.go index 5ed40628b7..7bb03e290e 100644 --- a/cmd/cli/teb/templates.go +++ b/cmd/cli/teb/templates.go @@ -79,11 +79,12 @@ const ( indent1 + "Version:\t{{ ( Versions .Status) }}\n" + indent1 + "Build:\t{{ ( BuildTimes .Status) }}\n" - // Config - ConfigTmpl = "PROPERTY\t VALUE\n{{range $item := .}}" + + // any JSON struct (e.g. config) + FlatTmpl = "PROPERTY\t VALUE\n{{range $item := .}}" + "{{ $item.Name }}\t {{ $item.Value }}\n" + "{{end}}\n" + // Config DaemonConfigTmpl = "{{ if .ClusterConfigDiff }}PROPERTY\t VALUE\t DEFAULT\n{{range $item := .ClusterConfigDiff }}" + "{{ $item.Name }}\t {{ $item.Current }}\t {{ $item.Old }}\n" + "{{end}}\n{{end}}" + diff --git a/ext/dsort/api.go b/ext/dsort/api.go new file mode 100644 index 0000000000..e5d8401034 --- /dev/null +++ b/ext/dsort/api.go @@ -0,0 +1,77 @@ +// Package dsort provides distributed massively parallel resharding for very large datasets. +/* + * Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved. + */ +package dsort + +import ( + "github.com/NVIDIA/aistore/api/apc" + "github.com/NVIDIA/aistore/cmn" +) + +const ( + algDefault = "" // default (alphanumeric, decreasing) + Alphanumeric = "alphanumeric" // string comparison (decreasing or increasing) + None = "none" // none (used for resharding) + MD5 = "md5" // compare md5(name) + Shuffle = "shuffle" // random shuffle (use with the same seed to reproduce) + Content = "content" // extract (int, string, float) from a given file, and compare +) + +var algorithms = []string{algDefault, Alphanumeric, MD5, Shuffle, Content, None} + +type Algorithm struct { + // one of the `algorithms` above + Kind string `json:"kind"` + + // used with two sorting alg-s: Alphanumeric and Content + Decreasing bool `json:"decreasing"` + + // when sort is a random shuffle + Seed string `json:"seed"` + + // exclusively with Content sorting + // e.g. usage: ".cls" to provide sorting key for each record (sample) - see next + Ext string `json:"extension"` + + // ditto: Content only + // `extract.contentKeyTypes` enum values: {"int", "string", "float" } + ContentKeyType string `json:"content_key_type"` +} + +// RequestSpec defines the user specification for requests to the endpoint /v1/sort. +type RequestSpec struct { + // Required + Bck cmn.Bck `json:"bck" yaml:"bck"` + Extension string `json:"extension" yaml:"extension"` + InputFormat apc.ListRange `json:"input_format" yaml:"input_format"` + OutputFormat string `json:"output_format" yaml:"output_format"` + OutputShardSize string `json:"output_shard_size" yaml:"output_shard_size"` + + // Optional + Description string `json:"description" yaml:"description"` + // Default: same as `bck` field + OutputBck cmn.Bck `json:"output_bck" yaml:"output_bck"` + // Default: alphanumeric, increasing + Algorithm Algorithm `json:"algorithm" yaml:"algorithm"` + // Default: "" + OrderFileURL string `json:"order_file" yaml:"order_file"` + // Default: "\t" + OrderFileSep string `json:"order_file_sep" yaml:"order_file_sep"` + // Default: "80%" + MaxMemUsage string `json:"max_mem_usage" yaml:"max_mem_usage"` + // Default: calcMaxLimit() + ExtractConcMaxLimit int `json:"extract_concurrency_max_limit" yaml:"extract_concurrency_max_limit"` + // Default: calcMaxLimit() + CreateConcMaxLimit int `json:"create_concurrency_max_limit" yaml:"create_concurrency_max_limit"` + // Default: bundle.Multiplier + StreamMultiplier int `json:"stream_multiplier" yaml:"stream_multiplier"` + // Default: false + ExtendedMetrics bool `json:"extended_metrics" yaml:"extended_metrics"` + + // debug + DSorterType string `json:"dsorter_type"` + DryRun bool `json:"dry_run"` // Default: false + + cmn.DSortConf +} diff --git a/ext/dsort/request_spec.go b/ext/dsort/request_spec.go index 0635919a24..55b2045dc1 100644 --- a/ext/dsort/request_spec.go +++ b/ext/dsort/request_spec.go @@ -29,43 +29,6 @@ type parsedOutputTemplate struct { Template cos.ParsedTemplate } -// RequestSpec defines the user specification for requests to the endpoint /v1/sort. -type RequestSpec struct { - // Required - Bck cmn.Bck `json:"bck" yaml:"bck"` - Extension string `json:"extension" yaml:"extension"` - InputFormat apc.ListRange `json:"input_format" yaml:"input_format"` - OutputFormat string `json:"output_format" yaml:"output_format"` - OutputShardSize string `json:"output_shard_size" yaml:"output_shard_size"` - - // Optional - Description string `json:"description" yaml:"description"` - // Default: same as `bck` field - OutputBck cmn.Bck `json:"output_bck" yaml:"output_bck"` - // Default: alphanumeric, increasing - Algorithm Algorithm `json:"algorithm" yaml:"algorithm"` - // Default: "" - OrderFileURL string `json:"order_file" yaml:"order_file"` - // Default: "\t" - OrderFileSep string `json:"order_file_sep" yaml:"order_file_sep"` - // Default: "80%" - MaxMemUsage string `json:"max_mem_usage" yaml:"max_mem_usage"` - // Default: calcMaxLimit() - ExtractConcMaxLimit int `json:"extract_concurrency_max_limit" yaml:"extract_concurrency_max_limit"` - // Default: calcMaxLimit() - CreateConcMaxLimit int `json:"create_concurrency_max_limit" yaml:"create_concurrency_max_limit"` - // Default: bundle.Multiplier - StreamMultiplier int `json:"stream_multiplier" yaml:"stream_multiplier"` - // Default: false - ExtendedMetrics bool `json:"extended_metrics" yaml:"extended_metrics"` - - // debug - DSorterType string `json:"dsorter_type"` - DryRun bool `json:"dry_run"` // Default: false - - cmn.DSortConf -} - type ParsedRequestSpec struct { Bck cmn.Bck `json:"bck"` Description string `json:"description"` diff --git a/ext/dsort/sort.go b/ext/dsort/sort.go index ee9a9cb613..e6ac6bae35 100644 --- a/ext/dsort/sort.go +++ b/ext/dsort/sort.go @@ -14,36 +14,6 @@ import ( "github.com/NVIDIA/aistore/ext/dsort/extract" ) -const ( - algDefault = "" // default (alphanumeric, decreasing) - Alphanumeric = "alphanumeric" // string comparison (decreasing or increasing) - None = "none" // none (used for resharding) - MD5 = "md5" // compare md5(name) - Shuffle = "shuffle" // random shuffle (use with the same seed to reproduce) - Content = "content" // extract (int, string, float) from a given file, and compare -) - -var algorithms = []string{algDefault, Alphanumeric, MD5, Shuffle, Content, None} - -type Algorithm struct { - // one of the `algorithms` above - Kind string `json:"kind"` - - // used with two sorting alg-s: Alphanumeric and Content - Decreasing bool `json:"decreasing"` - - // when sort is a random shuffle - Seed string `json:"seed"` - - // exclusively with Content sorting - // e.g. usage: ".cls" to provide sorting key for each record (sample) - see next - Ext string `json:"extension"` - - // ditto: Content only - // `extract.contentKeyTypes` enum values: {"int", "string", "float" } - ContentKeyType string `json:"content_key_type"` -} - type ( alphaByKey struct { err error