dsort: Go API change; add dsort/api.go; CLI: print job spec
* --verbose flag
* with refactoring

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Jul 31, 2023
1 parent 0c3a1da commit 55962da
Showing 17 changed files with 311 additions and 254 deletions.
2 changes: 1 addition & 1 deletion ais/prxdsort.go
@@ -16,7 +16,7 @@ import (
// POST /v1/sort
func (p *proxy) proxyStartSortHandler(w http.ResponseWriter, r *http.Request) {
rs := &dsort.RequestSpec{}
- if cmn.ReadJSON(w, r, &rs) != nil {
+ if cmn.ReadJSON(w, r, rs) != nil {
return
}
parsedRS, err := rs.Parse()
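A note on the one-line fix above: rs is created as &dsort.RequestSpec{}, so it is already a pointer to an allocated struct and can be handed to the JSON reader directly; taking its address again passes a pointer-to-pointer for no benefit. Below is a minimal, self-contained sketch of the same pattern using only the standard library; the RequestSpec fields and json.Unmarshal are illustrative stand-ins, not the actual dsort.RequestSpec or cmn.ReadJSON.

package main

import (
	"encoding/json"
	"fmt"
)

// RequestSpec stands in for dsort.RequestSpec; the fields are illustrative only.
type RequestSpec struct {
	InputBck  string `json:"input_bck"`
	OutputBck string `json:"output_bck"`
}

func main() {
	body := []byte(`{"input_bck":"src","output_bck":"dst"}`)
	rs := &RequestSpec{}
	// Pass the pointer itself, mirroring cmn.ReadJSON(w, r, rs) in the handler above.
	if err := json.Unmarshal(body, rs); err != nil {
		fmt.Println("bad request:", err)
		return
	}
	fmt.Printf("%+v\n", *rs)
}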
99 changes: 54 additions & 45 deletions ais/test/dsort_test.go
@@ -43,6 +43,11 @@ const (
scopeSpec = "spec"
)

+ const (
+ startingDS = "starting distributed sort"
+ finishedDS = "finished distributed sort"
+ )

var (
dsortDescCurPrefix = fmt.Sprintf("%s-%d-", dsortDescAllPrefix, os.Getpid())

@@ -259,11 +264,10 @@ func (df *dsortFramework) gen() dsort.RequestSpec {

func (df *dsortFramework) start() {
var (
- err error
- rs  = df.gen()
+ err  error
+ spec = df.gen()
)

- df.managerUUID, err = api.StartDSort(df.baseParams, rs)
+ df.managerUUID, err = api.StartDSort(df.baseParams, &spec)
tassert.CheckFatal(df.m.t, err)
}

@@ -337,7 +341,7 @@ func (df *dsortFramework) checkOutputShards(zeros int) {
baseParams = tools.BaseAPIParams(df.m.proxyURL)
records = make(map[string]int, 100)
)
tlog.Logln("checking if files are sorted...")
tlog.Logln("checking that files are sorted...")
for i := 0; i < df.outputShardCnt; i++ {
var (
buffer bytes.Buffer
@@ -573,12 +577,12 @@ func dispatchDSortJob(m *ioContext, dsorterType string, i int) {
df.init()
df.createInputShards()

tlog.Logln("starting distributed sort...")
tlog.Logln(startingDS)
df.start()

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(m.t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(false /* expectAbort */)
df.checkOutputShards(5)
@@ -666,12 +670,12 @@ func testDsort(t *testing.T, ext, lr string) {
df.missingShards = cmn.AbortReaction // (when shards are explicitly enumerated...)
}

tlog.Logln("starting distributed sort ...")
tlog.Logln(startingDS)
df.start()

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(false /* expectAbort */)
df.checkOutputShards(5)
@@ -709,9 +713,9 @@ func TestDistributedSortNonExistingBuckets(t *testing.T) {
// Create local output bucket
tools.CreateBucketWithCleanup(t, m.proxyURL, df.outputBck, nil)

tlog.Logln("starting distributed sort...")
rs := df.gen()
if _, err := api.StartDSort(df.baseParams, rs); err == nil {
tlog.Logln(startingDS)
spec := df.gen()
if _, err := api.StartDSort(df.baseParams, &spec); err == nil {
t.Errorf("expected %s job to fail when input bucket does not exist", dsort.DSortName)
}

@@ -720,7 +724,7 @@ func TestDistributedSortNonExistingBuckets(t *testing.T) {
tools.CreateBucketWithCleanup(t, m.proxyURL, m.bck, nil)

tlog.Logln("starting second distributed sort...")
- if _, err := api.StartDSort(df.baseParams, rs); err == nil {
+ if _, err := api.StartDSort(df.baseParams, &spec); err == nil {
t.Errorf("expected %s job to fail when output bucket does not exist", dsort.DSortName)
}
},
@@ -752,12 +756,12 @@ func TestDistributedSortEmptyBucket(t *testing.T) {

df.init()

tlog.Logln("starting distributed sort...")
tlog.Logln(startingDS)
df.start()

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(reaction == cmn.AbortReaction /*expectAbort*/)
df.checkReactionResult(reaction, df.shardCnt)
@@ -804,7 +808,7 @@ func TestDistributedSortOutputBucket(t *testing.T) {

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(false /* expectAbort */)
df.checkOutputShards(5)
@@ -898,7 +902,7 @@ func TestDistributedSortShuffle(t *testing.T) {

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(false /* expectAbort */)
df.checkOutputShards(5)
@@ -935,7 +939,7 @@ func TestDistributedSortDisk(t *testing.T) {

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

allMetrics := df.checkMetrics(false /* expectAbort */)
for target, metrics := range allMetrics {
@@ -982,7 +986,7 @@ func TestDistributedSortCompressionDisk(t *testing.T) {

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(false /* expectAbort */)
df.checkOutputShards(5)
@@ -1031,7 +1035,7 @@ func TestDistributedSortMemDisk(t *testing.T) {

_, err = tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

allMetrics := df.checkMetrics(false /* expectAbort */)
var (
@@ -1094,7 +1098,7 @@ func TestDistributedSortMemDiskTarCompression(t *testing.T) {

_, err = tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

allMetrics := df.checkMetrics(false /*expectAbort*/)
var (
@@ -1154,7 +1158,7 @@ func TestDistributedSortZipLz4(t *testing.T) {

_, err = tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(false /* expectAbort */)
df.checkOutputShards(5)
@@ -1199,7 +1203,7 @@ func TestDistributedSortTarCompression(t *testing.T) {

_, err = tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(false /* expectAbort */)
df.checkOutputShards(5)
@@ -1274,7 +1278,8 @@ func TestDistributedSortContent(t *testing.T) {
allMetrics, err := api.MetricsDSort(df.baseParams, df.managerUUID)
tassert.CheckFatal(t, err)
if len(allMetrics) != m.originalTargetCount {
t.Errorf("number of metrics %d is not same as number of targets %d", len(allMetrics), m.originalTargetCount)
t.Errorf("number of metrics %d is not same as the number of targets %d",
len(allMetrics), m.originalTargetCount)
}

for target, metrics := range allMetrics {
@@ -1526,7 +1531,8 @@ func TestDistributedSortManipulateMountpathDuringPhases(t *testing.T) {
tassert.CheckFatal(t, err)
} else {
tlog.Logf("removing mountpath %q from %s...\n", mpath, target.ID())
- err := api.DetachMountpath(df.baseParams, target, mpath, false /*dont-resil*/)
+ err := api.DetachMountpath(df.baseParams, target,
+ mpath, false /*dont-resil*/)
tassert.CheckFatal(t, err)
}
}
@@ -1629,7 +1635,7 @@ func TestDistributedSortMetricsAfterFinish(t *testing.T) {

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(false /* expectAbort */)
df.checkOutputShards(0)
@@ -1672,7 +1678,7 @@ func TestDistributedSortSelfAbort(t *testing.T) {

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

// Wait a while for all targets to abort
time.Sleep(2 * time.Second)
@@ -1723,7 +1729,7 @@ func TestDistributedSortOnOOM(t *testing.T) {

_, err = tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(false /* expectAbort */)
df.checkOutputShards(5)
@@ -1789,7 +1795,7 @@ func TestDistributedSortMissingShards(t *testing.T) {

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkReactionResult(reaction, df.shardCntToSkip)
},
@@ -1854,7 +1860,7 @@ func TestDistributedSortDuplications(t *testing.T) {

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkReactionResult(reaction, df.recordDuplicationsCnt)
},
@@ -1935,13 +1941,13 @@ func TestDistributedSortOrderFile(t *testing.T) {
tassert.CheckFatal(t, err)

tlog.Logln("starting distributed sort...")
- rs := df.gen()
- managerUUID, err := api.StartDSort(baseParams, rs)
+ spec := df.gen()
+ managerUUID, err := api.StartDSort(baseParams, &spec)
tassert.CheckFatal(t, err)

_, err = tools.WaitForDSortToFinish(m.proxyURL, managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

allMetrics, err := api.MetricsDSort(baseParams, managerUUID)
tassert.CheckFatal(t, err)
@@ -1959,7 +1965,8 @@ func TestDistributedSortOrderFile(t *testing.T) {
match = match || fmt.Sprintf(ekm[recordName], i) == shard.name
}
if !match {
t.Errorf("record %q was not part of any shard with format %q but was in shard %q", recordName, ekm[recordName], shard.name)
t.Errorf("record %q was not part of any shard with format %q but was in shard %q",
recordName, ekm[recordName], shard.name)
}
}
}
@@ -2042,18 +2049,19 @@ func TestDistributedSortOrderJSONFile(t *testing.T) {
tassert.CheckFatal(t, err)

tlog.Logln("starting distributed sort...")
- rs := df.gen()
- managerUUID, err := api.StartDSort(baseParams, rs)
+ spec := df.gen()
+ managerUUID, err := api.StartDSort(baseParams, &spec)
tassert.CheckFatal(t, err)

_, err = tools.WaitForDSortToFinish(m.proxyURL, managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

allMetrics, err := api.MetricsDSort(baseParams, managerUUID)
tassert.CheckFatal(t, err)
if len(allMetrics) != m.originalTargetCount {
t.Errorf("number of metrics %d is not same as number of targets %d", len(allMetrics), m.originalTargetCount)
t.Errorf("number of metrics %d is not same as number of targets %d",
len(allMetrics), m.originalTargetCount)
}

tlog.Logln("checking if all records are in specified shards...")
@@ -2066,7 +2074,8 @@ func TestDistributedSortOrderJSONFile(t *testing.T) {
match = match || fmt.Sprintf(ekm[recordName], i) == shard.name
}
if !match {
t.Errorf("record %q was not part of any shard with format %q but was in shard %q", recordName, ekm[recordName], shard.name)
t.Errorf("record %q was not part of any shard with format %q but was in shard %q",
recordName, ekm[recordName], shard.name)
}
}
}
@@ -2106,7 +2115,7 @@ func TestDistributedSortDryRun(t *testing.T) {

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(false /* expectAbort */)
},
@@ -2145,7 +2154,7 @@ func TestDistributedSortDryRunDisk(t *testing.T) {

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(false /* expectAbort */)
},
@@ -2187,7 +2196,7 @@ func TestDistributedSortLongerExt(t *testing.T) {

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(false /*expectAbort*/)
df.checkOutputShards(5)
@@ -2229,7 +2238,7 @@ func TestDistributedSortAutomaticallyCalculateOutputShards(t *testing.T) {

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(false /*expectAbort*/)
df.checkOutputShards(0)
@@ -2270,12 +2279,12 @@ func TestDistributedSortWithTarFormats(t *testing.T) {
df.init()
df.createInputShards()

tlog.Logln("starting distributed sort...")
tlog.Logln(startingDS)
df.start()

_, err := tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)
tlog.Logln("finished distributed sort")
tlog.Logln(finishedDS)

df.checkMetrics(false /* expectAbort */)
df.checkOutputShards(5)
2 changes: 1 addition & 1 deletion api/dsort.go
@@ -13,7 +13,7 @@ import (
"github.com/NVIDIA/aistore/ext/dsort"
)

- func StartDSort(bp BaseParams, rs dsort.RequestSpec) (id string, err error) {
+ func StartDSort(bp BaseParams, rs *dsort.RequestSpec) (id string, err error) {
bp.Method = http.MethodPost
reqParams := AllocRp()
{
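With the signature change above, callers build the spec as a value and pass its address, which is exactly how the updated tests call it (spec := df.gen(); api.StartDSort(df.baseParams, &spec)). A hedged caller sketch follows; the BaseParams construction and the empty RequestSpec are assumptions for illustration, since only the signature comes from this diff.

package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/NVIDIA/aistore/api"
	"github.com/NVIDIA/aistore/ext/dsort"
)

func main() {
	// The tests obtain BaseParams via tools.BaseAPIParams(proxyURL); building it
	// by hand here is an assumption for the sake of a standalone example.
	bp := api.BaseParams{Client: &http.Client{}, URL: "http://localhost:8080"}

	// RequestSpec fields are not shown in this diff; populate them per the dsort docs.
	spec := dsort.RequestSpec{}

	jobID, err := api.StartDSort(bp, &spec) // pass a pointer, per the new signature
	if err != nil {
		log.Fatalf("failed to start dsort: %v", err)
	}
	fmt.Println("dsort job ID:", jobID)
}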
10 changes: 5 additions & 5 deletions bench/aisloader/bootstrap.go
@@ -589,22 +589,22 @@ func parseCmdLine() (params, error) {
func printArguments(set *flag.FlagSet) {
w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)

_, _ = fmt.Fprintf(w, "==== COMMAND LINE ARGUMENTS ====\n")
_, _ = fmt.Fprintf(w, "=========== DEFAULTS ===========\n")
fmt.Fprintf(w, "==== COMMAND LINE ARGUMENTS ====\n")
fmt.Fprintf(w, "=========== DEFAULTS ===========\n")
set.VisitAll(func(f *flag.Flag) {
if f.Value.String() == f.DefValue {
_, _ = fmt.Fprintf(w, "%s:\t%s\n", f.Name, f.Value.String())
}
})
_, _ = fmt.Fprintf(w, "============ CUSTOM ============\n")
fmt.Fprintf(w, "============ CUSTOM ============\n")
set.VisitAll(func(f *flag.Flag) {
if f.Value.String() != f.DefValue {
_, _ = fmt.Fprintf(w, "%s:\t%s\n", f.Name, f.Value.String())
}
})
fmt.Fprintf(w, "HTTP trace:\t%v\n", runParams.traceHTTP)
_, _ = fmt.Fprintf(w, "=================================\n\n")
_ = w.Flush()
fmt.Fprintf(w, "=================================\n\n")
w.Flush()
}

// newStats returns a new stats object with given time as the starting point
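On dropping the blank-identifier assignments in printArguments: writes through a text/tabwriter are buffered and only reach the underlying writer on Flush, and this is a best-effort diagnostic dump to stdout, so the discarded return values carry no actionable information. That rationale is my reading, not stated in the commit. A minimal sketch of the pattern:

package main

import (
	"fmt"
	"os"
	"text/tabwriter"
)

func main() {
	w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
	fmt.Fprintf(w, "name:\t%s\n", "value") // buffered by the tabwriter
	fmt.Fprintf(w, "count:\t%d\n", 42)
	w.Flush() // columns are computed and written out here
}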
(Diffs for the remaining 13 changed files are not shown here.)

