Skip to content

Commit

Permalink
dsort: rename order_file to EKM and improve EKM file parsing logic
Browse files Browse the repository at this point in the history
* The term order_file was misleading, as it suggested functionality related to "ordering," whereas its purpose is only to provide rules for categorizing source records without any specific order. Renaming it to EKM clarifies its role and makes the code and API spec more intuitive.
* Enhanced the EKM file parsing logic by removing the reliance on file extensions. The new logic now auto-detects the file type by first attempting to parse it as JSON, and then falls back to line-based parsing if fails.

Signed-off-by: Tony Chen <[email protected]>
  • Loading branch information
Nahemah1022 committed Aug 22, 2024
1 parent 0722946 commit ccbeefe
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 122 deletions.
68 changes: 34 additions & 34 deletions ais/test/dsort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type (

inputTempl apc.ListRange
outputTempl string
orderFileURL string
EKMFileURL string
shardCnt int
shardCntToSkip int
filesPerShard int
Expand Down Expand Up @@ -257,7 +257,7 @@ func (df *dsortFramework) gen() dsort.RequestSpec {
OutputFormat: df.outputTempl,
OutputShardSize: df.outputShardSize,
Algorithm: *df.alg,
OrderFileURL: df.orderFileURL,
EKMFileURL: df.EKMFileURL,
ExtractConcMaxLimit: 10,
CreateConcMaxLimit: 10,
MaxMemUsage: df.maxMemUsage,
Expand Down Expand Up @@ -1977,7 +1977,7 @@ func TestDsortDuplications(t *testing.T) {
}
}

func TestDsortOrderFile(t *testing.T) {
func TestDsortEKMFile(t *testing.T) {
runDsortTest(
t, dsortTestSpec{p: true, types: dsorterTypes},
func(dsorterType string, t *testing.T) {
Expand All @@ -1997,9 +1997,9 @@ func TestDsortOrderFile(t *testing.T) {
filesPerShard: 10,
}

orderFileName = "orderFileName"
ekm = shard.NewExternalKeyMap(8)
shardFmts = []string{
EKMFileName = "ekm_file_name"
ekm = shard.NewExternalKeyMap(8)
shardFmts = []string{
"shard-%d-suf",
"input-%d-pref",
"smth-%d",
Expand All @@ -2011,10 +2011,10 @@ func TestDsortOrderFile(t *testing.T) {
m.initAndSaveState(true /*cleanup*/)
m.expectTargets(3)

// Set URL for order file (points to the object in cluster).
df.orderFileURL = fmt.Sprintf(
// Set URL for the ekm file (points to the object in cluster).
df.EKMFileURL = fmt.Sprintf(
"%s/%s/%s/%s/%s?%s=%s",
proxyURL, apc.Version, apc.Objects, m.bck.Name, orderFileName,
proxyURL, apc.Version, apc.Objects, m.bck.Name, EKMFileName,
apc.QparamProvider, apc.AIS,
)

Expand All @@ -2027,8 +2027,8 @@ func TestDsortOrderFile(t *testing.T) {

df.createInputShards()

// Generate content for the orderFile
tlog.Logln("generating and putting order file into cluster...")
// Generate content for the ekm file
tlog.Logln("generating and putting ekm file into cluster...")
var (
buffer bytes.Buffer
shardRecords = df.getRecordNames(m.bck)
Expand All @@ -2042,7 +2042,7 @@ func TestDsortOrderFile(t *testing.T) {
args := api.PutArgs{
BaseParams: baseParams,
Bck: m.bck,
ObjName: orderFileName,
ObjName: EKMFileName,
Reader: readers.NewBytes(buffer.Bytes()),
}
_, err = api.PutObject(&args)
Expand All @@ -2067,7 +2067,7 @@ func TestDsortOrderFile(t *testing.T) {
)
}

func TestDsortRegexOrderFile(t *testing.T) {
func TestDsortRegexEKMFile(t *testing.T) {
runDsortTest(
t, dsortTestSpec{p: true, types: dsorterTypes},
func(dsorterType string, t *testing.T) {
Expand All @@ -2083,18 +2083,18 @@ func TestDsortRegexOrderFile(t *testing.T) {
recordNames: []string{"n01440764.JPEG", "n02097658.JPEG", "n03495258.JPEG", "n02965783.JPEG", "n01631663.JPEG"},
}

orderFileName = "orderFileName.json"
proxyURL = tools.RandomProxyURL()
baseParams = tools.BaseAPIParams(proxyURL)
EKMFileName = "ekm_file_name.json"
proxyURL = tools.RandomProxyURL()
baseParams = tools.BaseAPIParams(proxyURL)
)

m.initAndSaveState(true /*cleanup*/)
m.expectTargets(3)

// Set URL for order file (points to the object in cluster).
df.orderFileURL = fmt.Sprintf(
// Set URL for ekm file (points to the object in cluster).
df.EKMFileURL = fmt.Sprintf(
"%s/%s/%s/%s/%s?%s=%s",
proxyURL, apc.Version, apc.Objects, m.bck.Name, orderFileName,
proxyURL, apc.Version, apc.Objects, m.bck.Name, EKMFileName,
apc.QparamProvider, apc.AIS,
)
df.init()
Expand All @@ -2113,8 +2113,8 @@ func TestDsortRegexOrderFile(t *testing.T) {
// Create local output bucket
tools.CreateBucket(t, m.proxyURL, df.outputBck, nil, true /*cleanup*/)

// Generate content for the orderFile
tlog.Logln("generating and putting order file into cluster...")
// Generate content for the ekm file
tlog.Logln("generating and putting ekm file into cluster...")

jsonContent := map[string][]string{
"shard-%d.tar": {".*string_dont_match.*"},
Expand All @@ -2124,7 +2124,7 @@ func TestDsortRegexOrderFile(t *testing.T) {
args := api.PutArgs{
BaseParams: baseParams,
Bck: m.bck,
ObjName: orderFileName,
ObjName: EKMFileName,
Reader: readers.NewBytes(jsonBytes),
}
_, err = api.PutObject(&args)
Expand Down Expand Up @@ -2163,8 +2163,8 @@ func TestDsortRegexOrderFile(t *testing.T) {
// Create local output bucket
tools.CreateBucket(t, m.proxyURL, df.outputBck, nil, true /*cleanup*/)

// Generate content for the orderFile
tlog.Logln("generating and putting order file into cluster...")
// Generate content for the ekm file
tlog.Logln("generating and putting ekm file into cluster...")
ekm := shard.NewExternalKeyMap(8)
jsonContent := map[string][]string{
"tench-shard-%d.tar": {".*n01440764.*"},
Expand All @@ -2184,7 +2184,7 @@ func TestDsortRegexOrderFile(t *testing.T) {
args := api.PutArgs{
BaseParams: baseParams,
Bck: m.bck,
ObjName: orderFileName,
ObjName: EKMFileName,
Reader: readers.NewBytes(jsonBytes),
}
_, err = api.PutObject(&args)
Expand Down Expand Up @@ -2231,9 +2231,9 @@ func TestDsortOrderJSONFile(t *testing.T) {
filesPerShard: 10,
}

orderFileName = "order_file_name.json"
ekm = shard.NewExternalKeyMap(8)
shardFmts = []string{
EKMFileName = "ekm_file_name.json"
ekm = shard.NewExternalKeyMap(8)
shardFmts = []string{
"prefix-{0..100}-suffix.tar",
"prefix-@[email protected]",
"prefix-%06d-suffix.tar",
Expand All @@ -2245,10 +2245,10 @@ func TestDsortOrderJSONFile(t *testing.T) {
m.initAndSaveState(true /*cleanup*/)
m.expectTargets(3)

// Set URL for order file (points to the object in cluster).
df.orderFileURL = fmt.Sprintf(
// Set URL for the ekm file (points to the object in cluster).
df.EKMFileURL = fmt.Sprintf(
"%s/%s/%s/%s/%s?%s=%s",
proxyURL, apc.Version, apc.Objects, m.bck.Name, orderFileName,
proxyURL, apc.Version, apc.Objects, m.bck.Name, EKMFileName,
apc.QparamProvider, apc.AIS,
)

Expand All @@ -2261,8 +2261,8 @@ func TestDsortOrderJSONFile(t *testing.T) {

df.createInputShards()

// Generate content for the orderFile
tlog.Logln("generating and putting order file into cluster...")
// Generate content for the ekm file
tlog.Logln("generating and putting ekm file into cluster...")
var (
content = make(map[string][]string, 10)
shardRecords = df.getRecordNames(m.bck)
Expand All @@ -2279,7 +2279,7 @@ func TestDsortOrderJSONFile(t *testing.T) {
args := api.PutArgs{
BaseParams: baseParams,
Bck: m.bck,
ObjName: orderFileName,
ObjName: EKMFileName,
Reader: readers.NewBytes(jsonBytes),
}
_, err = api.PutObject(&args)
Expand Down
6 changes: 3 additions & 3 deletions cmd/cli/cli/dsort.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ const (
"input_format": {"template": "shard-{0..9}"},
"output_shard_size": "200KB",
"description": "pack records into categorized shards",
"order_file": "http://website.web/static/order_file.txt",
"order_file_sep": " "
"ekm_file": "http://website.web/static/ekm_file.txt",
"ekm_file_sep": " "
}'`
dsortExampleY = `$ ais start dsort -f - <<EOM
input_extension: .tar
Expand Down Expand Up @@ -246,7 +246,7 @@ func _flattenSpec(spec *dsort.RequestSpec) (flat, config nvpairList) {
if v == "" {
v = dsort.Alphanumeric
}
case "order_file_sep":
case "ekm_file_sep":
if v == "" {
v = `\t`
}
Expand Down
26 changes: 13 additions & 13 deletions docs/cli/dsort.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ NAME:
"input_format": {"template": "shard-{0..9}"},
"output_shard_size": "200KB",
"description": "pack records into categorized shards",
"order_file": "http://website.web/static/order_file.txt",
"order_file_sep": " "
"ekm_file": "http://website.web/static/ekm_file.txt",
"ekm_file_sep": " "
}'
E.g. inline YAML spec:
$ ais start dsort -f - <<EOM
Expand Down Expand Up @@ -96,8 +96,8 @@ input_bck ais://src
input_format.objnames -
input_format.template shard-{0..9}
max_mem_usage -
order_file -
order_file_sep \t
ekm_file -
ekm_file_sep \t
output_bck ais://dst
output_format new-shard-{0000..1000}
output_shard_size 10KB
Expand Down Expand Up @@ -144,8 +144,8 @@ The following table describes JSON/YAML keys which can be used in the specificat
| `algorithm.seed` | `string` | seed provided to random generator, used when `kind=shuffle` | no | `""` - `time.Now()` is used |
| `algorithm.extension` | `string` | content of the file with provided extension will be used as sorting key, used when `kind=content` | yes (only when `kind=content`) |
| `algorithm.content_key_type` | `string` | content key type; may have one of the following values: "int", "float", or "string"; used exclusively with `kind=content` sorting | yes (only when `kind=content`) |
| `order_file` | `string` | URL to the file containing external key map (it should contain lines in format: `record_key[sep]shard-%d-fmt`) | yes (only when `output_format` not provided) | `""` |
| `order_file_sep` | `string` | separator used for splitting `record_key` and `shard-%d-fmt` in the lines in external key map | no | `\t` (TAB) |
| `ekm_file` | `string` | URL to the file containing external key map (it should contain lines in format: `record_key[sep]shard-%d-fmt`) | yes (only when `output_format` not provided) | `""` |
| `ekm_file_sep` | `string` | separator used for splitting `record_key` and `shard-%d-fmt` in the lines in external key map | no | `\t` (TAB) |
| `max_mem_usage` | `string` | limits the amount of total system memory allocated by both dSort and other running processes. Once and if this threshold is crossed, dSort will continue extracting onto local drives. Can be in format 60% or 10GB | no | same as in `/deploy/dev/local/aisnode_config.sh` |
| `extract_concurrency_max_limit` | `int` | limits maximum number of concurrent shards extracted per disk | no | (calculated based on different factors) ~50 |
| `create_concurrency_max_limit` | `int` | limits maximum number of concurrent shards created per disk| no | (calculated based on different factors) ~50 |
Expand Down Expand Up @@ -218,10 +218,10 @@ JGHEoo89gg
#### Pack records into shards with different categories - EKM (External Key Map)

One of the key features of the dSort is that user can specify the exact mapping from the record key to the output shard.
To use this feature `output_format` should be empty and `order_file`, as well as `order_file_sep`, must be set.
To use this feature `output_format` should be empty and `ekm_file`, as well as `ekm_file_sep`, must be set.
The output shards will be created with provided [template format](/docs/batch.md#operations-on-multiple-selected-objects).

Assuming that `order_file` (URL: `http://website.web/static/order_file.txt`) has content:
Assuming that `ekm_file` (URL: `http://website.web/static/ekm_file.txt`) has content:

```
cat_0.txt shard-cats-%d
Expand All @@ -235,7 +235,7 @@ car_1.txt shard-car-%d
...
```

or if `order_file` (URL: `http://website.web/static/order_file.json`, notice `.json` extension) and has content:
or if `ekm_file` (URL: `http://website.web/static/ekm_file.json`, notice `.json` extension) and has content:

```json
{
Expand All @@ -258,7 +258,7 @@ or if `order_file` (URL: `http://website.web/static/order_file.json`, notice `.j
}
```

or, you can also use regex as the record identifier. The `order_file` can contain regex patterns as keys to match multiple records that fit the regex pattern to provided format.
or, you can also use regex as the record identifier. The `ekm_file` can contain regex patterns as keys to match multiple records that fit the regex pattern to provided format.

```json
{
Expand Down Expand Up @@ -299,8 +299,8 @@ $ ais start dsort '{
"input_format": {"template": "shard-{0..9}"},
"output_shard_size": "200KB",
"description": "pack records into categorized shards",
"order_file": "http://website.web/static/order_file.txt",
"order_file_sep": " "
"ekm_file": "http://website.web/static/ekm_file.txt",
"ekm_file_sep": " "
}'
JGHEoo89gg
```
Expand All @@ -322,7 +322,7 @@ shard-dogs-0.tar:
```

EKM also supports [template syntax](/docs/batch.md#operations-on-multiple-selected-objects) to express output shard names.
For example, if `order_file` has content:
For example, if `ekm_file` has content:

```json
{
Expand Down
4 changes: 4 additions & 0 deletions docs/dsort.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ Eg. if we have a tarball which contains files named: `file1.txt`, `file1.png`,
`file2.png`, then we would have 2 *records*: one for `file1` and one for
`file2`.

**Algorithm** - the sorting algorithm applied during the sorting phase of dSort. After dSort execution, all records within a shard, or across shards with adjacent indices, are guaranteed to be sorted according to the specified algorithm's order.

**External Key Map (EKM)** - a dSort feature that allows users to precisely control how records are packed into output shards. EKM provides a flexible mechanism to map each individual record to a specific shard based on rules defined in an external file.

**Extraction phase** - dSort has multiple phases in which it does the whole
operation. The first of them is **extraction**. In this phase, dSort is reading
input shards and looks inside them to get to the objects and metadata. Objects
Expand Down
4 changes: 2 additions & 2 deletions ext/dsort/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ type RequestSpec struct {
// Default: alphanumeric, increasing
Algorithm Algorithm `json:"algorithm" yaml:"algorithm"`
// Default: ""
OrderFileURL string `json:"order_file" yaml:"order_file"`
EKMFileURL string `json:"ekm_file" yaml:"ekm_file"`
// Default: "\t"
OrderFileSep string `json:"order_file_sep" yaml:"order_file_sep"`
EKMFileSep string `json:"ekm_file_sep" yaml:"ekm_file_sep"`
// Default: "80%"
MaxMemUsage string `json:"max_mem_usage" yaml:"max_mem_usage"`
// Default: calcMaxLimit()
Expand Down
Loading

0 comments on commit ccbeefe

Please sign in to comment.