diff --git a/ais/test/dsort_test.go b/ais/test/dsort_test.go index 160f1209e8..7bfbc76051 100644 --- a/ais/test/dsort_test.go +++ b/ais/test/dsort_test.go @@ -491,6 +491,46 @@ outer: } } +func (df *dsortFramework) checkOutputShardsWithEKM(ekm *shard.ExternalKeyMap) { + tlog.Logln("verifying all records are placed in the correct shards...") + shardRecords := df.getRecordNames(df.outputBck) + shardNamePools := make(map[string]map[string]struct{}, 8) // map from template to shard name pool + + // for each record name within the generated shards, find its template and add its container shard name to the corresponding shard name pool + for _, shard := range shardRecords { + for _, recordName := range shard.recordNames { + shardFmtTmpl, err := ekm.Lookup(recordName) + tassert.CheckFatal(df.m.t, err) + if _, ok := shardNamePools[shardFmtTmpl]; !ok { + shardNamePools[shardFmtTmpl] = make(map[string]struct{}, 4) + } + shardNamePools[shardFmtTmpl][shard.name] = struct{}{} + } + } + + // validate all shard name pools match the consecutively generated shard names from their corresponding templates + for tmpl, pool := range shardNamePools { + pt, _ := cos.NewParsedTemplate(tmpl) + pt.InitIter() + for { + if len(pool) == 0 { + break + } + shardName, hasNext := pt.Next() + if !hasNext { + df.m.t.Fatalf("Shard name template (%v) does not match the corresponding shard name pool, remaining names: %v", tmpl, pool) + } + + _, exists := pool[shardName] + if !exists { + df.m.t.Fatalf("Shard name (%s) generated from template (%v) should exist in the corresponding shard name pool", shardName, tmpl) + } + delete(pool, shardName) + } + } + tlog.Logln("finished, all records are placed in the correct shards as specified in EKM") +} + func canonicalName(recordName string) string { return strings.TrimSuffix(recordName, cos.Ext(recordName)) } @@ -1958,7 +1998,7 @@ func TestDsortOrderFile(t *testing.T) { } orderFileName = "orderFileName" - ekm = make(map[string]string, 10) + ekm = shard.NewExternalKeyMap(8) shardFmts = []string{ "shard-%d-suf", "input-%d-pref", @@ -1996,7 +2036,7 @@ func TestDsortOrderFile(t *testing.T) { for _, shard := range shardRecords { for idx, recordName := range shard.recordNames { buffer.WriteString(fmt.Sprintf("%s\t%s\n", recordName, shardFmts[idx%len(shardFmts)])) - ekm[recordName] = shardFmts[idx%len(shardFmts)] + ekm.Add(recordName, shardFmts[idx%len(shardFmts)]) } } args := api.PutArgs{ @@ -2022,29 +2062,14 @@ func TestDsortOrderFile(t *testing.T) { if len(allMetrics) != m.originalTargetCount { t.Errorf("number of metrics %d is not same as number of targets %d", len(allMetrics), m.originalTargetCount) } - - tlog.Logln("checking if all records are in specified shards...") - shardRecords = df.getRecordNames(df.outputBck) - for _, shard := range shardRecords { - for _, recordName := range shard.recordNames { - match := false - // Some shard with specified format contains the record - for i := range 30 { - match = match || fmt.Sprintf(ekm[recordName], i) == shard.name - } - if !match { - t.Errorf("record %q was not part of any shard with format %q but was in shard %q", - recordName, ekm[recordName], shard.name) - } - } - } + df.checkOutputShardsWithEKM(&ekm) }, ) } func TestDsortRegexOrderFile(t *testing.T) { runDsortTest( - t, dsortTestSpec{p: true, types: []string{dsort.GeneralType}}, + t, dsortTestSpec{p: true, types: dsorterTypes}, func(dsorterType string, t *testing.T) { var ( m = &ioContext{ @@ -2180,24 +2205,7 @@ func TestDsortRegexOrderFile(t *testing.T) { t.Errorf("number of metrics %d is not same as number of targets %d", len(allMetrics), m.originalTargetCount) } - - tlog.Logln("checking if all records are in specified shards...") - shardRecords := df.getRecordNames(df.outputBck) - for _, shard := range shardRecords { - for _, recordName := range shard.recordNames { - shardFmt, err := ekm.Lookup(recordName) - tassert.CheckFatal(t, err) - match := false - // Some shard with specified format contains the record - for i := range 30 { - match = match || fmt.Sprintf(shardFmt, i) == shard.name - } - if !match { - t.Errorf("record %q was not part of any shard with format %q but was in shard %q", - recordName, shardFmt, shard.name) - } - } - } + df.checkOutputShardsWithEKM(&ekm) }) }, ) @@ -2224,11 +2232,11 @@ func TestDsortOrderJSONFile(t *testing.T) { } orderFileName = "order_file_name.json" - ekm = make(map[string]string, 10) + ekm = shard.NewExternalKeyMap(8) shardFmts = []string{ - "shard-%d-suf", - "input-%d-pref", - "smth-%d", + "prefix-{0..100}-suffix.tar", + "prefix-@00001-gap-@100-suffix.tar", + "prefix-%06d-suffix.tar", } proxyURL = tools.RandomProxyURL() baseParams = tools.BaseAPIParams(proxyURL) @@ -2263,7 +2271,7 @@ func TestDsortOrderJSONFile(t *testing.T) { for idx, recordName := range shard.recordNames { shardFmt := shardFmts[idx%len(shardFmts)] content[shardFmt] = append(content[shardFmt], recordName) - ekm[recordName] = shardFmts[idx%len(shardFmts)] + ekm.Add(recordName, shardFmts[idx%len(shardFmts)]) } } jsonBytes, err := jsoniter.Marshal(content) @@ -2293,21 +2301,7 @@ func TestDsortOrderJSONFile(t *testing.T) { len(allMetrics), m.originalTargetCount) } - tlog.Logln("checking if all records are in specified shards...") - shardRecords = df.getRecordNames(df.outputBck) - for _, shard := range shardRecords { - for _, recordName := range shard.recordNames { - match := false - // Some shard with specified format contains the record - for i := range 30 { - match = match || fmt.Sprintf(ekm[recordName], i) == shard.name - } - if !match { - t.Errorf("record %q was not part of any shard with format %q but was in shard %q", - recordName, ekm[recordName], shard.name) - } - } - } + df.checkOutputShardsWithEKM(&ekm) }, ) } diff --git a/docs/cli/dsort.md b/docs/cli/dsort.md index 562928d0a7..12c8b0e6a1 100644 --- a/docs/cli/dsort.md +++ b/docs/cli/dsort.md @@ -219,7 +219,7 @@ JGHEoo89gg One of the key features of the dSort is that user can specify the exact mapping from the record key to the output shard. To use this feature `output_format` should be empty and `order_file`, as well as `order_file_sep`, must be set. -The output shards will be created with provided format which must contain mandatory `%d` which is required to enumerate the shards. +The output shards will be created with provided [template format](/docs/batch.md#operations-on-multiple-selected-objects). Assuming that `order_file` (URL: `http://website.web/static/order_file.txt`) has content: @@ -321,6 +321,63 @@ shard-dogs-0.tar: ... ``` +EKM also supports [template syntax](/docs/batch.md#operations-on-multiple-selected-objects) to express output shard names. +For example, if `order_file` has content: + +```json +{ + "shard-{0..100..3}-cats": [ + "cat_0.txt", + "cat_1.txt", + "cat_3.txt", + "cat_4.txt", + "cat_5.txt", + "cat_6.txt", + ... + ], + "shard-@00001-gap-@100-dogs": [ + "dog_0.txt", + "dog_1.txt", + ... + ], + "shard-%06d-cars": [ + "car_0.txt", + "car_1.txt", + ... + ], + ... +} +``` + +After running `dsort`, the output would be look like this: + +``` +shard-0-cats.tar: +- cat_0.txt +- cat_1.txt +shard-3-cats.tar: +- cat_2.txt +- cat_3.txt +shard-6-cats.tar: +- cat_4.txt +- cat_5.txt +... +shard-00001-gap-001-dogs.tar: +- dog_0.txt +- dog_1.txt +shard-00001-gap-002-dogs.tar: +- dog_2.txt +- dog_3.txt +... +shard-1-cars.tar: +- car_0.txt +- car_1.txt +shard-2-cars.tar: +- car_2.txt +- car_3.txt +... +``` + ## Show dSort jobs and job status `ais show job dsort [JOB_ID]` diff --git a/ext/dsort/dsort.go b/ext/dsort/dsort.go index 4adc625bcd..d13c5767c7 100644 --- a/ext/dsort/dsort.go +++ b/ext/dsort/dsort.go @@ -685,8 +685,9 @@ func (m *Manager) parseOrderFile() (shard.ExternalKeyMap, error) { func (m *Manager) generateShardsWithOrderingFile(maxSize int64) ([]*shard.Shard, error) { var ( - shards = make([]*shard.Shard, 0) - shardsBuilder = make(map[string][]*shard.Shard) + shards = make([]*shard.Shard, 0) + shardTemplates = make(map[string]*cos.ParsedTemplate, 8) + shardsBuilder = make(map[string][]*shard.Shard, 8) ) if maxSize <= 0 { return nil, fmt.Errorf(fmtErrInvalidMaxSize, maxSize) @@ -697,6 +698,18 @@ func (m *Manager) generateShardsWithOrderingFile(maxSize int64) ([]*shard.Shard, return nil, err } + for _, shardNameFmt := range ekm.All() { + tmpl, err := cos.NewParsedTemplate(shardNameFmt) + if err != nil { + return nil, err + } + if len(tmpl.Ranges) == 0 { + return nil, fmt.Errorf("invalid output template %q: no ranges (prefix-only output is not supported)", shardNameFmt) + } + shardTemplates[shardNameFmt] = &tmpl + shardTemplates[shardNameFmt].InitIter() + } + for _, r := range m.recm.Records.All() { key := fmt.Sprintf("%v", r.Key) shardNameFmt, err := ekm.Lookup(key) @@ -707,12 +720,21 @@ func (m *Manager) generateShardsWithOrderingFile(maxSize int64) ([]*shard.Shard, } } - shards := shardsBuilder[shardNameFmt] recordSize := r.TotalSize() + m.shardRW.MetadataSize()*int64(len(r.Objects)) - shardCount := len(shards) - if shardCount == 0 || shards[shardCount-1].Size > maxSize { + + // retrieve all shards created using the current template format + shards := shardsBuilder[shardNameFmt] + // if no shards exist for this template, or the last shard exceeds the max size, create a new shard + if len(shards) == 0 || shards[len(shards)-1].Size > maxSize { + shardName, hasNext := shardTemplates[shardNameFmt].Next() + if !hasNext { + return nil, fmt.Errorf( + "number of shards to be created using %s template exceeds expected number of shards (%d)", + shardTemplates[shardNameFmt].Prefix, shardTemplates[shardNameFmt].Count(), + ) + } shard := &shard.Shard{ - Name: fmt.Sprintf(shardNameFmt, shardCount), + Name: shardName, Size: recordSize, Records: shard.NewRecords(1), } @@ -720,7 +742,7 @@ func (m *Manager) generateShardsWithOrderingFile(maxSize int64) ([]*shard.Shard, shardsBuilder[shardNameFmt] = append(shardsBuilder[shardNameFmt], shard) } else { // Append records - lastShard := shards[shardCount-1] + lastShard := shards[len(shards)-1] lastShard.Size += recordSize lastShard.Records.Insert(r) } diff --git a/ext/dsort/shard/key.go b/ext/dsort/shard/key.go index 7cc3bc67e6..a31be506e5 100644 --- a/ext/dsort/shard/key.go +++ b/ext/dsort/shard/key.go @@ -164,6 +164,7 @@ func (ekm ExternalKeyMap) Add(key, value string) error { if err != nil { return err } + ekm[key] = struct { regex *regexp.Regexp value string @@ -190,3 +191,16 @@ func (ekm ExternalKeyMap) Lookup(input string) (string, error) { } return matches[0], nil } + +func (ekm ExternalKeyMap) All() (result []string) { + set := make(map[string]struct{}, 8) + for _, v := range ekm { + set[v.value] = struct{}{} + } + + result = make([]string, 0, 8) + for key := range set { + result = append(result, key) + } + return +}