Skip to content

Commit

Permalink
feat: support and validate template format strings in dsort and `is…
Browse files Browse the repository at this point in the history
…hard` ekm

Signed-off-by: Tony Chen <[email protected]>
  • Loading branch information
Nahemah1022 committed Aug 22, 2024
1 parent 3500e92 commit 0722946
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 65 deletions.
108 changes: 51 additions & 57 deletions ais/test/dsort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,46 @@ outer:
}
}

func (df *dsortFramework) checkOutputShardsWithEKM(ekm *shard.ExternalKeyMap) {
tlog.Logln("verifying all records are placed in the correct shards...")
shardRecords := df.getRecordNames(df.outputBck)
shardNamePools := make(map[string]map[string]struct{}, 8) // map from template to shard name pool

// for each record name within the generated shards, find its template and add its container shard name to the corresponding shard name pool
for _, shard := range shardRecords {
for _, recordName := range shard.recordNames {
shardFmtTmpl, err := ekm.Lookup(recordName)
tassert.CheckFatal(df.m.t, err)
if _, ok := shardNamePools[shardFmtTmpl]; !ok {
shardNamePools[shardFmtTmpl] = make(map[string]struct{}, 4)
}
shardNamePools[shardFmtTmpl][shard.name] = struct{}{}
}
}

// validate all shard name pools match the consecutively generated shard names from their corresponding templates
for tmpl, pool := range shardNamePools {
pt, _ := cos.NewParsedTemplate(tmpl)
pt.InitIter()
for {
if len(pool) == 0 {
break
}
shardName, hasNext := pt.Next()
if !hasNext {
df.m.t.Fatalf("Shard name template (%v) does not match the corresponding shard name pool, remaining names: %v", tmpl, pool)
}

_, exists := pool[shardName]
if !exists {
df.m.t.Fatalf("Shard name (%s) generated from template (%v) should exist in the corresponding shard name pool", shardName, tmpl)
}
delete(pool, shardName)
}
}
tlog.Logln("finished, all records are placed in the correct shards as specified in EKM")
}

func canonicalName(recordName string) string {
return strings.TrimSuffix(recordName, cos.Ext(recordName))
}
Expand Down Expand Up @@ -1958,7 +1998,7 @@ func TestDsortOrderFile(t *testing.T) {
}

orderFileName = "orderFileName"
ekm = make(map[string]string, 10)
ekm = shard.NewExternalKeyMap(8)
shardFmts = []string{
"shard-%d-suf",
"input-%d-pref",
Expand Down Expand Up @@ -1996,7 +2036,7 @@ func TestDsortOrderFile(t *testing.T) {
for _, shard := range shardRecords {
for idx, recordName := range shard.recordNames {
buffer.WriteString(fmt.Sprintf("%s\t%s\n", recordName, shardFmts[idx%len(shardFmts)]))
ekm[recordName] = shardFmts[idx%len(shardFmts)]
ekm.Add(recordName, shardFmts[idx%len(shardFmts)])
}
}
args := api.PutArgs{
Expand All @@ -2022,29 +2062,14 @@ func TestDsortOrderFile(t *testing.T) {
if len(allMetrics) != m.originalTargetCount {
t.Errorf("number of metrics %d is not same as number of targets %d", len(allMetrics), m.originalTargetCount)
}

tlog.Logln("checking if all records are in specified shards...")
shardRecords = df.getRecordNames(df.outputBck)
for _, shard := range shardRecords {
for _, recordName := range shard.recordNames {
match := false
// Some shard with specified format contains the record
for i := range 30 {
match = match || fmt.Sprintf(ekm[recordName], i) == shard.name
}
if !match {
t.Errorf("record %q was not part of any shard with format %q but was in shard %q",
recordName, ekm[recordName], shard.name)
}
}
}
df.checkOutputShardsWithEKM(&ekm)
},
)
}

func TestDsortRegexOrderFile(t *testing.T) {
runDsortTest(
t, dsortTestSpec{p: true, types: []string{dsort.GeneralType}},
t, dsortTestSpec{p: true, types: dsorterTypes},
func(dsorterType string, t *testing.T) {
var (
m = &ioContext{
Expand Down Expand Up @@ -2180,24 +2205,7 @@ func TestDsortRegexOrderFile(t *testing.T) {
t.Errorf("number of metrics %d is not same as number of targets %d",
len(allMetrics), m.originalTargetCount)
}

tlog.Logln("checking if all records are in specified shards...")
shardRecords := df.getRecordNames(df.outputBck)
for _, shard := range shardRecords {
for _, recordName := range shard.recordNames {
shardFmt, err := ekm.Lookup(recordName)
tassert.CheckFatal(t, err)
match := false
// Some shard with specified format contains the record
for i := range 30 {
match = match || fmt.Sprintf(shardFmt, i) == shard.name
}
if !match {
t.Errorf("record %q was not part of any shard with format %q but was in shard %q",
recordName, shardFmt, shard.name)
}
}
}
df.checkOutputShardsWithEKM(&ekm)
})
},
)
Expand All @@ -2224,11 +2232,11 @@ func TestDsortOrderJSONFile(t *testing.T) {
}

orderFileName = "order_file_name.json"
ekm = make(map[string]string, 10)
ekm = shard.NewExternalKeyMap(8)
shardFmts = []string{
"shard-%d-suf",
"input-%d-pref",
"smth-%d",
"prefix-{0..100}-suffix.tar",
"prefix-@[email protected]",
"prefix-%06d-suffix.tar",
}
proxyURL = tools.RandomProxyURL()
baseParams = tools.BaseAPIParams(proxyURL)
Expand Down Expand Up @@ -2263,7 +2271,7 @@ func TestDsortOrderJSONFile(t *testing.T) {
for idx, recordName := range shard.recordNames {
shardFmt := shardFmts[idx%len(shardFmts)]
content[shardFmt] = append(content[shardFmt], recordName)
ekm[recordName] = shardFmts[idx%len(shardFmts)]
ekm.Add(recordName, shardFmts[idx%len(shardFmts)])
}
}
jsonBytes, err := jsoniter.Marshal(content)
Expand Down Expand Up @@ -2293,21 +2301,7 @@ func TestDsortOrderJSONFile(t *testing.T) {
len(allMetrics), m.originalTargetCount)
}

tlog.Logln("checking if all records are in specified shards...")
shardRecords = df.getRecordNames(df.outputBck)
for _, shard := range shardRecords {
for _, recordName := range shard.recordNames {
match := false
// Some shard with specified format contains the record
for i := range 30 {
match = match || fmt.Sprintf(ekm[recordName], i) == shard.name
}
if !match {
t.Errorf("record %q was not part of any shard with format %q but was in shard %q",
recordName, ekm[recordName], shard.name)
}
}
}
df.checkOutputShardsWithEKM(&ekm)
},
)
}
Expand Down
59 changes: 58 additions & 1 deletion docs/cli/dsort.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ JGHEoo89gg

One of the key features of the dSort is that user can specify the exact mapping from the record key to the output shard.
To use this feature `output_format` should be empty and `order_file`, as well as `order_file_sep`, must be set.
The output shards will be created with provided format which must contain mandatory `%d` which is required to enumerate the shards.
The output shards will be created with provided [template format](/docs/batch.md#operations-on-multiple-selected-objects).

Assuming that `order_file` (URL: `http://website.web/static/order_file.txt`) has content:

Expand Down Expand Up @@ -321,6 +321,63 @@ shard-dogs-0.tar:
...
```

EKM also supports [template syntax](/docs/batch.md#operations-on-multiple-selected-objects) to express output shard names.
For example, if `order_file` has content:

```json
{
"shard-{0..100..3}-cats": [
"cat_0.txt",
"cat_1.txt",
"cat_3.txt",
"cat_4.txt",
"cat_5.txt",
"cat_6.txt",
...
],
"shard-@00001-gap-@100-dogs": [
"dog_0.txt",
"dog_1.txt",
...
],
"shard-%06d-cars": [
"car_0.txt",
"car_1.txt",
...
],
...
}
```

After running `dsort`, the output would be look like this:

```
shard-0-cats.tar:
- cat_0.txt
- cat_1.txt
shard-3-cats.tar:
- cat_2.txt
- cat_3.txt
shard-6-cats.tar:
- cat_4.txt
- cat_5.txt
...
shard-00001-gap-001-dogs.tar:
- dog_0.txt
- dog_1.txt
shard-00001-gap-002-dogs.tar:
- dog_2.txt
- dog_3.txt
...
shard-1-cars.tar:
- car_0.txt
- car_1.txt
shard-2-cars.tar:
- car_2.txt
- car_3.txt
...
```

## Show dSort jobs and job status

`ais show job dsort [JOB_ID]`
Expand Down
36 changes: 29 additions & 7 deletions ext/dsort/dsort.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,8 +685,9 @@ func (m *Manager) parseOrderFile() (shard.ExternalKeyMap, error) {

func (m *Manager) generateShardsWithOrderingFile(maxSize int64) ([]*shard.Shard, error) {
var (
shards = make([]*shard.Shard, 0)
shardsBuilder = make(map[string][]*shard.Shard)
shards = make([]*shard.Shard, 0)
shardTemplates = make(map[string]*cos.ParsedTemplate, 8)
shardsBuilder = make(map[string][]*shard.Shard, 8)
)
if maxSize <= 0 {
return nil, fmt.Errorf(fmtErrInvalidMaxSize, maxSize)
Expand All @@ -697,6 +698,18 @@ func (m *Manager) generateShardsWithOrderingFile(maxSize int64) ([]*shard.Shard,
return nil, err
}

for _, shardNameFmt := range ekm.All() {
tmpl, err := cos.NewParsedTemplate(shardNameFmt)
if err != nil {
return nil, err
}
if len(tmpl.Ranges) == 0 {
return nil, fmt.Errorf("invalid output template %q: no ranges (prefix-only output is not supported)", shardNameFmt)
}
shardTemplates[shardNameFmt] = &tmpl
shardTemplates[shardNameFmt].InitIter()
}

for _, r := range m.recm.Records.All() {
key := fmt.Sprintf("%v", r.Key)
shardNameFmt, err := ekm.Lookup(key)
Expand All @@ -707,20 +720,29 @@ func (m *Manager) generateShardsWithOrderingFile(maxSize int64) ([]*shard.Shard,
}
}

shards := shardsBuilder[shardNameFmt]
recordSize := r.TotalSize() + m.shardRW.MetadataSize()*int64(len(r.Objects))
shardCount := len(shards)
if shardCount == 0 || shards[shardCount-1].Size > maxSize {

// retrieve all shards created using the current template format
shards := shardsBuilder[shardNameFmt]
// if no shards exist for this template, or the last shard exceeds the max size, create a new shard
if len(shards) == 0 || shards[len(shards)-1].Size > maxSize {
shardName, hasNext := shardTemplates[shardNameFmt].Next()
if !hasNext {
return nil, fmt.Errorf(
"number of shards to be created using %s template exceeds expected number of shards (%d)",
shardTemplates[shardNameFmt].Prefix, shardTemplates[shardNameFmt].Count(),
)
}
shard := &shard.Shard{
Name: fmt.Sprintf(shardNameFmt, shardCount),
Name: shardName,
Size: recordSize,
Records: shard.NewRecords(1),
}
shard.Records.Insert(r)
shardsBuilder[shardNameFmt] = append(shardsBuilder[shardNameFmt], shard)
} else {
// Append records
lastShard := shards[shardCount-1]
lastShard := shards[len(shards)-1]
lastShard.Size += recordSize
lastShard.Records.Insert(r)
}
Expand Down
14 changes: 14 additions & 0 deletions ext/dsort/shard/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (ekm ExternalKeyMap) Add(key, value string) error {
if err != nil {
return err
}

ekm[key] = struct {
regex *regexp.Regexp
value string
Expand All @@ -190,3 +191,16 @@ func (ekm ExternalKeyMap) Lookup(input string) (string, error) {
}
return matches[0], nil
}

func (ekm ExternalKeyMap) All() (result []string) {
set := make(map[string]struct{}, 8)
for _, v := range ekm {
set[v.value] = struct{}{}
}

result = make([]string, 0, 8)
for key := range set {
result = append(result, key)
}
return
}

0 comments on commit 0722946

Please sign in to comment.