dsort: refactoring and renaming
* s/FormatType/ContentKeyType/
* bytes.NewBuffer
* minor fixes

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Jul 28, 2023
1 parent 4ba19ab commit 4dbe397
Showing 13 changed files with 327 additions and 330 deletions.
473 changes: 236 additions & 237 deletions ais/test/dsort_test.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion ais/test/scripts/dsort-ex1.sh
@@ -8,7 +8,8 @@ ais rmb ais://src ais://dst -y 2>/dev/null 1>&2
ais create ais://src ais://dst
ais archive gen-shards 'ais://src/shard-{0..9}.tar'

## run dsort (from the JSON spec) and wait for the job to finish
## run dsort and wait for the job to finish
## (see 'dsort-ex1-spec.json' in this directory)
ais wait $(ais start dsort -f ${SCRIPT_PATH}/dsort-ex1-spec.json)

## list new shards to confirm 5 new shards, each containing 10 original files
2 changes: 1 addition & 1 deletion docs/cli/dsort.md
@@ -55,7 +55,7 @@ The following table describes JSON/YAML keys which can be used in the specification
| `algorithm.decreasing` | `bool` | determines if the algorithm should sort the records in decreasing or increasing order, used for `kind=alphanumeric` or `kind=content` | no | `false` |
| `algorithm.seed` | `string` | seed provided to random generator, used when `kind=shuffle` | no | `""` - `time.Now()` is used |
| `algorithm.extension` | `string` | content of the file with provided extension will be used as sorting key, used when `kind=content` | yes (only when `kind=content`) |
| `algorithm.format_type` | `string` | format type (`int`, `float` or `string`) describes how the content of the file should be interpreted, used when `kind=content` | yes (only when `kind=content`) |
| `algorithm.content_key_type` | `string` | content key type; may have one of the following values: "int", "float", or "string"; used exclusively with `kind=content` sorting | yes (only when `kind=content`) |
| `order_file` | `string` | URL to the file containing external key map (it should contain lines in format: `record_key[sep]shard-%d-fmt`) | yes (only when `output_format` not provided) | `""` |
| `order_file_sep` | `string` | separator used for splitting `record_key` and `shard-%d-fmt` in the lines in external key map | no | `\t` (TAB) |
| `max_mem_usage` | `string` | limits the amount of total system memory allocated by both dSort and other running processes. Once and if this threshold is crossed, dSort will continue extracting onto local drives. Can be in format 60% or 10GB | no | same as in `/deploy/dev/local/aisnode_config.sh` |
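
Note (not part of the commit): the renamed `algorithm.content_key_type` key maps to the `ContentKeyType` field of the `SortAlgorithm` struct, which this commit moves to `ext/dsort/sort.go` (see further down). A minimal sketch of how the documented `algorithm` keys serialize, assuming the struct is exported from the `ext/dsort` package exactly as shown in this diff:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/NVIDIA/aistore/ext/dsort"
)

func main() {
	// Sort by the content of each record's ".cls" file, interpreted as an integer.
	algo := dsort.SortAlgorithm{
		Kind:           "content", // SortKindContent
		Decreasing:     true,
		Extension:      ".cls",
		ContentKeyType: "int", // was `format_type` prior to this commit
	}
	b, err := json.MarshalIndent(algo, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// {
	//   "kind": "content",
	//   "decreasing": true,
	//   "seed": "",
	//   "extension": ".cls",
	//   "content_key_type": "int"
	// }
}

The omitted `seed` still serializes as an empty string because the struct declares no `omitempty` tags.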
6 changes: 3 additions & 3 deletions ext/dsort/dsort_test.go
@@ -148,7 +148,7 @@ func newTargetMock(daemonID string, smap *testSmap) *targetNodeMock {
rs := &ParsedRequestSpec{
Extension: archive.ExtTar,
Algorithm: &SortAlgorithm{
FormatType: extract.FormatTypeString,
ContentKeyType: extract.ContentKeyString,
},
MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0},
DSorterType: DSorterGeneralType,
@@ -455,8 +455,8 @@ var _ = Describe("Distributed Sort", func() {

rs := &ParsedRequestSpec{
Algorithm: &SortAlgorithm{
Decreasing: true,
FormatType: extract.FormatTypeString,
Decreasing: true,
ContentKeyType: extract.ContentKeyString,
},
Extension: archive.ExtTar,
MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0},
40 changes: 17 additions & 23 deletions ext/dsort/extract/key.go
@@ -14,19 +14,18 @@ import (
"strconv"

"github.com/NVIDIA/aistore/cmn/cos"
"github.com/pkg/errors"
)

const (
FormatTypeInt = "int"
FormatTypeFloat = "float"
FormatTypeString = "string"
ContentKeyInt = "int"
ContentKeyFloat = "float"
ContentKeyString = "string"

fmtErrInvalidSortingKeyType = "invalid content sorting key %q (expecting one of: %+v)"
)

var (
supportedFormatTypes = []string{FormatTypeInt, FormatTypeFloat, FormatTypeString}

errInvalidAlgorithmFormatTypes = fmt.Errorf("invalid algorithm format type provided, shoule be one of: %+v", supportedFormatTypes)
contentKeyTypes = []string{ContentKeyInt, ContentKeyFloat, ContentKeyString}
)

type (
@@ -48,8 +47,8 @@ type (

nameKeyExtractor struct{}
contentKeyExtractor struct {
ty string // type of key extracted, supported: supportedFormatTypes
ext string // extension of object record whose content will be read
ty string // one of contentKeyTypes: {"int", "string", ... } - see above
ext string // file with this extension provides sorting key (of the type `ty`)
}
)

@@ -80,51 +79,46 @@ func (*nameKeyExtractor) ExtractKey(ske *SingleKeyExtractor) (any, error) {
}

func NewContentKeyExtractor(ty, ext string) (KeyExtractor, error) {
if err := ValidateAlgorithmFormatType(ty); err != nil {
if err := ValidateContentKeyT(ty); err != nil {
return nil, err
}

return &contentKeyExtractor{ty: ty, ext: ext}, nil
}

func (ke *contentKeyExtractor) PrepareExtractor(name string, r cos.ReadSizer, ext string) (cos.ReadSizer, *SingleKeyExtractor, bool) {
if ke.ext != ext {
return r, nil, false
}

buf := &bytes.Buffer{}
tee := cos.NewSizedReader(io.TeeReader(r, buf), r.Size())
return tee, &SingleKeyExtractor{name: name, buf: buf}, true
}

func (ke *contentKeyExtractor) ExtractKey(ske *SingleKeyExtractor) (any, error) {
if ske == nil { // is not valid to be read
if ske == nil {
return nil, nil
}

b, err := io.ReadAll(ske.buf)
ske.buf = nil
if err != nil {
return nil, err
}

key := string(b)
switch ke.ty {
case FormatTypeInt:
case ContentKeyInt:
return strconv.ParseInt(key, 10, 64)
case FormatTypeFloat:
case ContentKeyFloat:
return strconv.ParseFloat(key, 64)
case FormatTypeString:
case ContentKeyString:
return key, nil
default:
return nil, errors.Errorf("not implemented extractor type: %s", ke.ty)
return nil, fmt.Errorf(fmtErrInvalidSortingKeyType, ke.ty, contentKeyTypes)
}
}

func ValidateAlgorithmFormatType(ty string) error {
if !cos.StringInSlice(ty, supportedFormatTypes) {
return errInvalidAlgorithmFormatTypes
func ValidateContentKeyT(ty string) error {
if !cos.StringInSlice(ty, contentKeyTypes) {
return fmt.Errorf(fmtErrInvalidSortingKeyType, ty, contentKeyTypes)
}

return nil
}
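
Note (illustration only): a hypothetical, self-contained usage of the renamed extractor, assuming the APIs exactly as they appear in this file — in particular that `cos.NewSizedReader` wraps any `io.Reader` together with a size, as its use inside `PrepareExtractor` suggests:

package main

import (
	"fmt"
	"io"
	"strings"

	"github.com/NVIDIA/aistore/cmn/cos"
	"github.com/NVIDIA/aistore/ext/dsort/extract"
)

func main() {
	// Keys come from each record's ".cls" file and are parsed as int64.
	ke, err := extract.NewContentKeyExtractor(extract.ContentKeyInt, ".cls")
	if err != nil {
		panic(err)
	}

	content := "42" // hypothetical content of shard-0.tar/record.cls
	r := cos.NewSizedReader(strings.NewReader(content), int64(len(content)))

	// PrepareExtractor tees the record's bytes into an internal buffer...
	tee, ske, ok := ke.PrepareExtractor("record", r, ".cls")
	if !ok {
		panic("extension mismatch")
	}
	// ...which fills only when the returned reader is consumed
	// (normally done by the extraction pipeline; emulated here).
	if _, err := io.Copy(io.Discard, tee); err != nil {
		panic(err)
	}

	key, err := ke.ExtractKey(ske) // strconv.ParseInt under the hood
	fmt.Println(key, err)          // 42 <nil>
}

Passing an unsupported key type — say "bool" — would instead make `NewContentKeyExtractor` return the new `fmtErrInvalidSortingKeyType` error via `ValidateContentKeyT`.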
10 changes: 5 additions & 5 deletions ext/dsort/extract/managers.go
@@ -113,16 +113,16 @@ func (rm *RecordManager) ExtractRecordWithBuffer(args extractRecordArgs) (size i
recordUniqueName = rm.genRecordUniqueName(args.shardName, args.recordName)
)

// If the content already exists we should skip it but set error (caller
// needs to handle it properly).
// If the content already exists we should skip it but set error
// (caller must handle it properly).
if rm.Records.Exists(recordUniqueName, ext) {
msg := fmt.Sprintf("record %q has been duplicated", args.recordName)
rm.Records.DeleteDup(recordUniqueName, ext)

// NOTE: There is no need to remove anything from `rm.extractionPaths`
// or `rm.contents` since it will be removed anyway in cleanup.
// Assumption is that there will be not much duplicates and we can live
// with a little bit more files/memory.
// or `rm.contents` since it'll be removed anyway during subsequent cleanup.
// The assumption is that there will not be too many duplicates and we can live
// with a few extra files/memory.
return 0, rm.onDuplicatedRecords(msg)
}

30 changes: 17 additions & 13 deletions ext/dsort/extract/record.go
@@ -222,36 +222,40 @@ func (r *Records) Len() int {

func (r *Records) Swap(i, j int) { r.arr[i], r.arr[j] = r.arr[j], r.arr[i] }

func (r *Records) Less(i, j int, formatType string) (bool, error) {
func (r *Records) Less(i, j int, keyType string) (bool, error) {
lhs, rhs := r.arr[i].Key, r.arr[j].Key
if lhs == nil {
return false, errors.Errorf("key is missing for %q", r.arr[i].Name)
} else if rhs == nil {
return false, errors.Errorf("key is missing for %q", r.arr[j].Name)
}

switch formatType {
case FormatTypeInt:
switch keyType {
case ContentKeyInt:
ilhs, lok := lhs.(int64)
irhs, rok := rhs.(int64)
if lok && rok {
return ilhs < irhs, nil
}

// One side was parsed as float64 - javascript does not support
// int64 type and it fallback to float64
// (motivation: javascript does not support int64 type)
if !lok {
ilhs = int64(lhs.(float64))
}
if !rok {
} else {
irhs = int64(rhs.(float64))
}

return ilhs < irhs, nil
case FormatTypeFloat:
return lhs.(float64) < rhs.(float64), nil
case FormatTypeString:
return lhs.(string) < rhs.(string), nil
case ContentKeyFloat:
flhs, lok := lhs.(float64)
frhs, rok := rhs.(float64)
debug.Assert(lok, lhs)
debug.Assert(rok, rhs)
return flhs < frhs, nil
case ContentKeyString:
slhs, lok := lhs.(string)
srhs, rok := rhs.(string)
debug.Assert(lok, lhs)
debug.Assert(rok, rhs)
return slhs < srhs, nil
}

debug.Assertf(false, "lhs: %v, rhs: %v, arr[i]: %v, arr[j]: %v", lhs, rhs, r.arr[i], r.arr[j])
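
Note on the int-key fallback above: if record keys travel as JSON at any point (presumably between targets), numbers decode into an untyped `any` as `float64`, never `int64` — hence the conversion. A quick standalone illustration (not from the repo):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// A numeric key decoded into an untyped interface becomes float64, not int64.
	var key any
	if err := json.Unmarshal([]byte(`42`), &key); err != nil {
		panic(err)
	}
	fmt.Printf("%T %v\n", key, key) // float64 42
}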
2 changes: 1 addition & 1 deletion ext/dsort/manager.go
@@ -459,7 +459,7 @@ func (m *Manager) setExtractCreator() (err error) {

switch m.rs.Algorithm.Kind {
case SortKindContent:
keyExtractor, err = extract.NewContentKeyExtractor(m.rs.Algorithm.FormatType, m.rs.Algorithm.Extension)
keyExtractor, err = extract.NewContentKeyExtractor(m.rs.Algorithm.ContentKeyType, m.rs.Algorithm.Extension)
case SortKindMD5:
keyExtractor, err = extract.NewMD5KeyExtractor()
default:
26 changes: 5 additions & 21 deletions ext/dsort/request_spec.go
@@ -19,14 +19,14 @@ import (
"github.com/NVIDIA/aistore/ext/dsort/extract"
)

// TODO: move and refactor
var (
errMissingBucket = errors.New("missing field 'bucket'")
errInvalidExtension = errors.New("extension must be one of '.tar', '.tar.gz', or '.tgz'")
errInvalidExtension = errors.New("extension must be one of: '.tar', '.tar.gz' or '.tgz', '.zip', '.tar.lz4'")
errNegOutputShardSize = errors.New("output shard size must be >= 0")
errEmptyOutputShardSize = errors.New("output shard size must be set (cannot be 0)")
errNegativeConcurrencyLimit = errors.New("concurrency max limit must be 0 (limits will be calculated) or > 0")
errInvalidOrderParam = errors.New("could not parse order format, required URL")
errInvalidAlgorithm = errors.New("invalid algorithm specified")
errInvalidSeed = errors.New("invalid seed provided, should be int")
errInvalidAlgorithmExtension = errors.New("invalid extension provided, should be in the format: .ext")
)
@@ -107,20 +107,6 @@ type ParsedRequestSpec struct {
cmn.DSortConf
}

type SortAlgorithm struct {
Kind string `json:"kind"`

// Kind: alphanumeric, content
Decreasing bool `json:"decreasing"`

// Kind: shuffle
Seed string `json:"seed"` // seed provided to random generator

// Kind: content
Extension string `json:"extension"`
FormatType string `json:"format_type"`
}

/////////////////
// RequestSpec //
/////////////////
@@ -177,7 +163,7 @@ func (rs *RequestSpec) Parse() (*ParsedRequestSpec, error) {

parsedRS.Algorithm, err = parseAlgorithm(rs.Algorithm)
if err != nil {
return nil, errInvalidAlgorithm
return nil, err
}

if empty, valid := validateOrderFileURL(rs.OrderFileURL); !valid {
@@ -274,16 +260,14 @@ func parseAlgorithm(algo SortAlgorithm) (parsedAlgo *SortAlgorithm, err error) {
if algo.Extension == "" {
return nil, errInvalidAlgorithmExtension
}

if algo.Extension[0] != '.' { // extension should begin with dot: .cls
return nil, errInvalidAlgorithmExtension
}

if err := extract.ValidateAlgorithmFormatType(algo.FormatType); err != nil {
if err := extract.ValidateContentKeyT(algo.ContentKeyType); err != nil {
return nil, err
}
} else {
algo.FormatType = extract.FormatTypeString
algo.ContentKeyType = extract.ContentKeyString
}

return &algo, nil
27 changes: 23 additions & 4 deletions ext/dsort/sort.go
@@ -29,11 +29,30 @@ const (

var supportedAlgorithms = []string{sortKindEmpty, SortKindAlphanumeric, SortKindMD5, SortKindShuffle, SortKindContent, SortKindNone}

type SortAlgorithm struct {
// one of the `supportedAlgorithms` (see above)
Kind string `json:"kind"`

// currently, used with two sorting alg-s: SortKindAlphanumeric and SortKindContent
Decreasing bool `json:"decreasing"`

// when sort is a random shuffle
Seed string `json:"seed"`

// exclusively with sorting alg. Kind = "content" (aka SortKindContent)
// used to select files that provide sorting keys - see next
Extension string `json:"extension"`

// ditto: SortKindContent only
// one of extract.contentKeyTypes, namely: {"int", "string", ... }
ContentKeyType string `json:"content_key_type"`
}

type (
alphaByKey struct {
err error
records *extract.Records
formatType string
keyType string
decreasing bool
}
)
@@ -50,9 +69,9 @@ func (s *alphaByKey) Less(i, j int) bool {
less bool
)
if s.decreasing {
less, err = s.records.Less(j, i, s.formatType)
less, err = s.records.Less(j, i, s.keyType)
} else {
less, err = s.records.Less(i, j, s.formatType)
less, err = s.records.Less(i, j, s.keyType)
}
if err != nil {
s.err = err
@@ -80,7 +99,7 @@ func sortRecords(r *extract.Records, algo *SortAlgorithm) (err error) {
r.Swap(i, j)
}
} else {
keys := &alphaByKey{records: r, decreasing: algo.Decreasing, formatType: algo.FormatType, err: nil}
keys := &alphaByKey{records: r, decreasing: algo.Decreasing, keyType: algo.ContentKeyType, err: nil}
sort.Sort(keys)

if keys.err != nil {
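
Note (illustration only): `alphaByKey` records the first comparison error in its `err` field because `sort.Interface.Less` cannot return one; `sortRecords` then checks `keys.err` after `sort.Sort`. A minimal standalone sketch of the same pattern, using hypothetical types that are not part of the repo:

package main

import (
	"errors"
	"fmt"
	"sort"
)

// byKey sorts string keys and remembers the first comparison failure,
// mirroring the alphaByKey pattern above.
type byKey struct {
	keys []*string // a nil entry stands in for a record with a missing key
	err  error
}

func (b *byKey) Len() int      { return len(b.keys) }
func (b *byKey) Swap(i, j int) { b.keys[i], b.keys[j] = b.keys[j], b.keys[i] }

func (b *byKey) Less(i, j int) bool {
	if b.keys[i] == nil || b.keys[j] == nil {
		if b.err == nil {
			b.err = errors.New("key is missing")
		}
		return false
	}
	return *b.keys[i] < *b.keys[j]
}

func main() {
	s := func(v string) *string { return &v }
	keys := &byKey{keys: []*string{s("b"), nil, s("a")}}
	sort.Sort(keys)
	fmt.Println(keys.err) // the caller checks the recorded error after sorting
}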