dsort: [API change] revise request spec (major upd)
* (feature) reshard into a different format
  - default format
  - add output_extension
  - set the input and output extensions via their namesake fields
    or via the (input, output) format templates
* API: remove/revise stream multiplier
* API: rename "extension" => "input_extension"
* refactor: extract functions, reduce stutter, shorten names
* hide parsed request
* refactor dsort handler
* parts four & five, prev. commit: e7c82a3
* separately:
  - downloader: close body
  - nlog: println to always insert " "

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Aug 6, 2023
1 parent 4cdf732 commit 5994072
Showing 28 changed files with 469 additions and 407 deletions.
57 changes: 44 additions & 13 deletions ais/proxy.go
@@ -1095,18 +1095,14 @@ func (p *proxy) _bckpost(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg
 		}
 	}
 
-	// POST /bucket operations (this cluster)
-
-	// Initialize bucket; if doesn't exist try creating it on the fly
-	// but only if it's a remote bucket (and user did not explicitly disallowed).
-	bckArgs := bckInitArgs{p: p, w: w, r: r, bck: bck, msg: msg, query: query}
+	bckArgs := bckInitArgs{p: p, w: w, r: r, bck: bck, perms: apc.AceObjLIST | apc.AceGET, msg: msg, query: query}
 	bckArgs.createAIS = false
 	if bck, err = bckArgs.initAndTry(); err != nil {
 		return
 	}
 
 	//
-	// {action} on bucket
+	// POST {action} on bucket
 	//
 	var xid string
 	switch msg.Action {
@@ -1293,16 +1289,17 @@ func (p *proxy) _bckpost(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg
 
 // init existing or create remote
 // not calling `initAndTry` - delegating ais:from// props cloning to the separate method
-func (p *proxy) initBckTo(w http.ResponseWriter, r *http.Request, query url.Values,
-	bckTo *meta.Bck) (*meta.Bck, int, error) {
+func (p *proxy) initBckTo(w http.ResponseWriter, r *http.Request, query url.Values, bckTo *meta.Bck) (*meta.Bck, int, error) {
 	bckToArgs := bckInitArgs{p: p, w: w, r: r, bck: bckTo, perms: apc.AcePUT, query: query}
 	bckToArgs.createAIS = true
 
 	errCode, err := bckToArgs.init()
 	if err != nil && errCode != http.StatusNotFound {
 		p.writeErr(w, r, err, errCode)
 		return nil, 0, err
 	}
-	// creating (BMD-wise) remote destination on the fly
+
+	// remote bucket: create it (BMD-wise) on the fly
 	if errCode == http.StatusNotFound && bckTo.IsRemote() {
 		if bckTo, err = bckToArgs.try(); err != nil {
 			return nil, 0, err
@@ -2869,14 +2866,48 @@ func (p *proxy) dsortHandler(w http.ResponseWriter, r *http.Request) {
 
 	switch r.Method {
 	case http.MethodPost:
-		p.proxyStartSortHandler(w, r)
+		// - validate request, check input_bck and output_bck
+		// - start dsort
+		rs := &dsort.RequestSpec{}
+		if cmn.ReadJSON(w, r, rs) != nil {
+			return
+		}
+		parsc, err := rs.ParseCtx()
+		if err != nil {
+			p.writeErr(w, r, err)
+			return
+		}
+		bck := meta.CloneBck(&parsc.InputBck)
+		args := bckInitArgs{p: p, w: w, r: r, bck: bck, perms: apc.AceObjLIST | apc.AceGET}
+		if _, err = args.initAndTry(); err != nil {
+			return
+		}
+		if !parsc.OutputBck.Equal(&parsc.InputBck) {
+			bckTo := meta.CloneBck(&parsc.OutputBck)
+			bckTo, errCode, err := p.initBckTo(w, r, nil /*query*/, bckTo)
+			if err != nil {
+				return
+			}
+			if errCode == http.StatusNotFound {
+				// TODO -- FIXME: reuse common prxtxn logic to create
+				if true {
+					p.writeErr(w, r, cmn.NewErrRemoteBckNotFound(&parsc.OutputBck), http.StatusNotFound)
+					return
+				}
+				if p.forwardCP(w, r, nil /*msg*/, "dsort-create-output-bck") { // to create
+					return
+				}
+				nlog.Warningf("%s: output_bck %s will be created with the input_bck (%s) props", p, bckTo, bck)
+			}
+		}
+		dsort.PstartHandler(w, r, parsc)
 	case http.MethodGet:
-		dsort.ProxyGetHandler(w, r)
+		dsort.PgetHandler(w, r)
 	case http.MethodDelete:
 		if len(apiItems) == 1 && apiItems[0] == apc.Abort {
-			dsort.ProxyAbortSortHandler(w, r)
+			dsort.PabortHandler(w, r)
 		} else if len(apiItems) == 0 {
-			dsort.ProxyRemoveSortHandler(w, r)
+			dsort.PremoveHandler(w, r)
 		} else {
 			p.writeErrURL(w, r)
 		}
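
Note: with the handler above, starting a sort is now a plain POST of a JSON-encoded dsort.RequestSpec; the proxy parses it, checks input_bck (and output_bck, when different), and only then calls dsort.PstartHandler. A minimal client-side sketch — the proxy URL and the /v1/sort route are assumptions for illustration, not taken from this diff:

    package main

    import (
        "bytes"
        "fmt"
        "net/http"
    )

    func main() {
        // field names per the revised RequestSpec; bucket names are made up
        spec := []byte(`{
            "input_bck":         {"name": "src", "provider": "ais"},
            "input_extension":   ".tar",
            "input_format":      {"template": "shard-{0..9}"},
            "output_format":     "new-shard-{0000..1000}",
            "output_shard_size": "200KB"
        }`)
        // assumed endpoint: POST http://<proxy>/v1/sort
        resp, err := http.Post("http://localhost:8080/v1/sort", "application/json", bytes.NewReader(spec))
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()
        fmt.Println("status:", resp.Status)
    }
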
41 changes: 0 additions & 41 deletions ais/prxdsort.go

This file was deleted.

5 changes: 2 additions & 3 deletions ais/test/dsort_test.go
@@ -202,9 +202,8 @@ func (df *dsortFramework) init() {
 	if df.outputTempl == "" {
 		df.outputTempl = "output-{00000..10000}"
 	}
-	// NOTE: default extension/format/MIME
 	if df.extension == "" {
-		df.extension = archive.ExtTar
+		df.extension = dsort.DefaultExt
 	}
 
 	// Assumption is that all prefixes end with dash: "-"
@@ -243,7 +242,7 @@ func (df *dsortFramework) gen() dsort.RequestSpec {
 		Description:     generateDSortDesc(),
 		InputBck:        df.m.bck,
 		OutputBck:       df.outputBck,
-		Extension:       df.extension,
+		InputExtension:  df.extension,
 		InputFormat:     df.inputTempl,
 		OutputFormat:    df.outputTempl,
 		OutputShardSize: df.outputShardSize,
2 changes: 1 addition & 1 deletion ais/test/scripts/dsort-ex1-spec.json
@@ -1,5 +1,5 @@
 {
-    "extension": ".tar",
+    "input_extension": ".tar",
     "bck": {
         "name": "src"
     },
9 changes: 6 additions & 3 deletions ais/test/scripts/dsort-ex1.sh
@@ -13,6 +13,7 @@ while (( "$#" )); do
 	case "${1}" in
 		--srcbck) srcbck=$2; shift; shift;;
 		--dstbck) dstbck=$2; shift; shift;;
+		--nocleanup) nocleanup="true"; shift;;
 		*) echo "fatal: unknown argument '${1}'"; exit 1;;
 	esac
 done
@@ -35,6 +36,8 @@ num=$(ais ls $dstbck --summary --H | awk '{print $3}')
 echo "Successfully resharded $srcbck => $dstbck:"
 ais ls $dstbck
 
-## _not_ to remove test buckets comment out the following 2 lines
-echo "Cleanup: deleting $srcbck and $dstbck"
-ais rmb $srcbck $dstbck -y 2>/dev/null 1>&2
+## cleanup
+if [[ ${nocleanup} != "true" ]]; then
+  echo "Cleanup: deleting $srcbck and $dstbck"
+  ais rmb $srcbck $dstbck -y 2>/dev/null 1>&2
+fi
2 changes: 1 addition & 1 deletion cmd/aisfs/go.mod
@@ -4,7 +4,7 @@ go 1.20
 
 // direct
 require (
-	github.com/NVIDIA/aistore v1.3.19-0.20230803180608-b1c54338c1eb
+	github.com/NVIDIA/aistore v1.3.19-0.20230806172900-eea04958543e
 	github.com/OneOfOne/xxhash v1.2.8
 	github.com/jacobsa/daemonize v0.0.0-20160101105449-e460293e890f
 	github.com/jacobsa/fuse v0.0.0-20230124164109-5e0f2e6b432b
4 changes: 2 additions & 2 deletions cmd/aisfs/go.sum
@@ -3,8 +3,8 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
 code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
-github.com/NVIDIA/aistore v1.3.19-0.20230803180608-b1c54338c1eb h1:2M+63GS26FaYZRXEahMpGd+dZ8Od4IC4tssLRFaAFBY=
-github.com/NVIDIA/aistore v1.3.19-0.20230803180608-b1c54338c1eb/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4=
+github.com/NVIDIA/aistore v1.3.19-0.20230806172900-eea04958543e h1:41esHpnBnfdZPOyrS8fg0zbd4npVggfvyvUcmSZtK2Y=
+github.com/NVIDIA/aistore v1.3.19-0.20230806172900-eea04958543e/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
 github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
4 changes: 2 additions & 2 deletions cmd/cli/cli/dsort.go
@@ -32,7 +32,7 @@ import (
 
 const (
 	dsortExampleJ = `$ ais start dsort '{
-    "extension": ".tar",
+    "input_extension": ".tar",
     "input_bck": {"name": "dsort-testing"},
     "input_format": {"template": "shard-{0..9}"},
     "output_shard_size": "200KB",
@@ -41,7 +41,7 @@ const (
     "order_file_sep": " "
 }'`
 	dsortExampleY = `$ ais start dsort -f - <<EOM
-extension: .tar
+input_extension: .tar
 input_bck:
     name: dsort-testing
 input_format:
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
@@ -4,7 +4,7 @@ go 1.20
 
 // direct
 require (
-	github.com/NVIDIA/aistore v1.3.19-0.20230803180608-b1c54338c1eb
+	github.com/NVIDIA/aistore v1.3.19-0.20230806172900-eea04958543e
 	github.com/fatih/color v1.14.1
 	github.com/json-iterator/go v1.1.12
 	github.com/onsi/ginkgo v1.16.5
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
@@ -3,8 +3,8 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
 code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
-github.com/NVIDIA/aistore v1.3.19-0.20230803180608-b1c54338c1eb h1:2M+63GS26FaYZRXEahMpGd+dZ8Od4IC4tssLRFaAFBY=
-github.com/NVIDIA/aistore v1.3.19-0.20230803180608-b1c54338c1eb/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4=
+github.com/NVIDIA/aistore v1.3.19-0.20230806172900-eea04958543e h1:41esHpnBnfdZPOyrS8fg0zbd4npVggfvyvUcmSZtK2Y=
+github.com/NVIDIA/aistore v1.3.19-0.20230806172900-eea04958543e/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
 github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
8 changes: 4 additions & 4 deletions cmd/cli/test/dsort.in
@@ -2,18 +2,18 @@ ais job start dsort // FAIL "missing "[json"
 ais job start dsort '{}' -f "path" // FAIL "failed to parse"
 
 ais job start dsort -f /path/to/file.json // FAIL "open /path/to/file.json: no such file or directory"
-ais job start dsort '{extension: .tar' // FAIL "failed to determine the type of the job specification"
+ais job start dsort '{input_extension: .tar' // FAIL "failed to determine the type of the job specification"
 
 ais bucket create ais://$BUCKET_1 // IGNORE
 
 # JSON
-echo '{"extension": ".tar","input_bck": {name: "$BUCKET_1"},"input_format": {"template": "shard-{0..9}"},"output_format": "new-shard-{0000..1000}","output_shard_size": "10KB","description": "sort shards from 0 to 9","algorithm": {"kind": "alphanumeric"},"extended_metrics": true}' > /tmp/dsort.json
+echo '{"input_extension": ".tar","input_bck": {name: "$BUCKET_1"},"input_format": {"template": "shard-{0..9}"},"output_format": "new-shard-{0000..1000}","output_shard_size": "10KB","description": "sort shards from 0 to 9","algorithm": {"kind": "alphanumeric"},"extended_metrics": true}' > /tmp/dsort.json
 ais job start dsort -f /tmp/dsort.json // IGNORE
 rm -f /tmp/dsort.json
-ais job start dsort '{"extension": ".tar","input_bck": {name: "$BUCKET_1"},"input_format": {"template": "shard-{0..9}"},"output_format": "new-shard-{0000..1000}","output_shard_size": "10KB","description": "sort shards from 0 to 9","algorithm": {"kind": "alphanumeric"},"extended_metrics": true}' // SAVE_RESULT
+ais job start dsort '{"input_extension": ".tar","input_bck": {name: "$BUCKET_1"},"input_format": {"template": "shard-{0..9}"},"output_format": "new-shard-{0000..1000}","output_shard_size": "10KB","description": "sort shards from 0 to 9","algorithm": {"kind": "alphanumeric"},"extended_metrics": true}' // SAVE_RESULT
 ais wait $RESULT
 
 # YAML
-echo -e "extension: .tar\ninput_bck:\n name: $BUCKET_1\ninput_format:\n template: shard-{0..9}\noutput_format: new-shard-{0000..1000}\noutput_shard_size: 10KB\ndescription: sort shards from 0 to 9\nalgorithm:\n kind: alphanumeric\nextended_metrics: true\n" > /tmp/dsort.yaml
+echo -e "input_extension: .tar\ninput_bck:\n name: $BUCKET_1\ninput_format:\n template: shard-{0..9}\noutput_format: new-shard-{0000..1000}\noutput_shard_size: 10KB\ndescription: sort shards from 0 to 9\nalgorithm:\n kind: alphanumeric\nextended_metrics: true\n" > /tmp/dsort.yaml
 ais job start dsort -f /tmp/dsort.yaml // IGNORE
 rm -f /tmp/dsort.yaml
4 changes: 2 additions & 2 deletions cmn/nlog/nlog.go
@@ -283,11 +283,11 @@ func formatHdr(s severity, depth int, fb *fixed) {
 func sprintf(sev severity, depth int, format string, fb *fixed, args ...any) {
 	formatHdr(sev, depth+1, fb)
 	if format == "" {
-		fmt.Fprint(fb, args...)
+		fmt.Fprintln(fb, args...)
 	} else {
 		fmt.Fprintf(fb, format, args...)
+		fb.eol()
 	}
-	fb.eol()
 }
 
 // mem pool of additional buffers
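
Note: this is the "nlog: println to always insert \" \"" item from the commit message. fmt.Fprint separates operands with a space only when neither adjacent operand is a string, whereas fmt.Fprintln always inserts a single space between operands and appends a newline — which is also why fb.eol() now runs only on the Fprintf branch (Fprintln already terminates the line). A self-contained illustration:

    package main

    import (
        "fmt"
        "os"
    )

    func main() {
        fmt.Fprint(os.Stdout, "ready", "in", 3, "sec") // prints "readyin3sec" -- no separators
        fmt.Fprintln(os.Stdout)
        fmt.Fprintln(os.Stdout, "ready", "in", 3, "sec") // prints "ready in 3 sec" plus '\n'
    }
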
1 change: 0 additions & 1 deletion docs/cli/dsort.md
@@ -102,7 +102,6 @@ order_file_sep \t
 output_bck ais://dst
 output_format new-shard-{0000..1000}
 output_shard_size 10KB
-stream_multiplier 0
 
 Config override: none
 
13 changes: 9 additions & 4 deletions ext/dload/task.go
@@ -103,7 +103,7 @@ func (task *singleTask) download(lom *cluster.LOM, config *cmn.Config) {
 	task.xdl.ObjsAdd(1, task.currentSize.Load())
 }
 
-func (task *singleTask) tryDownloadLocal(lom *cluster.LOM, timeout time.Duration) (bool /*err is fatal*/, error) {
+func (task *singleTask) _dlocal(lom *cluster.LOM, timeout time.Duration) (bool /*err is fatal*/, error) {
 	ctx, cancel := context.WithTimeout(task.downloadCtx, timeout)
 	defer cancel()
 
@@ -120,12 +120,17 @@ func (task *singleTask) tryDownloadLocal(lom *cluster.LOM, timeout time.Duration
 		req.Header.Add("User-Agent", gcsUA)
 	}
 
-	resp, err := clientForURL(task.obj.link).Do(req)
+	resp, err := clientForURL(task.obj.link).Do(req) //nolint:bodyclose // cos.Close
 	if err != nil {
 		return false, err
 	}
-	defer resp.Body.Close()
 
+	fatal, err := task._dput(lom, req, resp)
+	cos.Close(resp.Body)
+	return fatal, err
+}
+
+func (task *singleTask) _dput(lom *cluster.LOM, req *http.Request, resp *http.Response) (bool /*err is fatal*/, error) {
 	if resp.StatusCode >= http.StatusBadRequest {
 		if resp.StatusCode == http.StatusNotFound {
 			return false, cmn.NewErrHTTP(req, fmt.Errorf("%q does not exist", task.obj.link), http.StatusNotFound)
@@ -164,7 +169,7 @@ func (task *singleTask) downloadLocal(lom *cluster.LOM) (err error) {
 		fatal bool
 	)
 	for i := 0; i < retryCnt; i++ {
-		fatal, err = task.tryDownloadLocal(lom, timeout)
+		fatal, err = task._dlocal(lom, timeout)
 		if err == nil || fatal {
 			return err
 		}
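
Note: the tryDownloadLocal => _dlocal/_dput split above ("downloader: close body" in the commit message) replaces defer resp.Body.Close() with one explicit close as soon as the helper returns, hence the //nolint:bodyclose suppression. A minimal sketch of the same ownership pattern in plain Go — cos.Close in the diff is AIStore's close-and-log helper; the plain Close here merely stands in for it:

    package main

    import (
        "fmt"
        "io"
        "net/http"
    )

    // consume reads the response; the caller still owns the body and closes it.
    func consume(resp *http.Response) (int64, error) {
        return io.Copy(io.Discard, resp.Body)
    }

    func main() {
        resp, err := http.Get("https://example.com") //nolint:bodyclose // closed below
        if err != nil {
            panic(err)
        }
        n, err := consume(resp)
        resp.Body.Close() // single explicit close, no defer
        if err != nil {
            panic(err)
        }
        fmt.Println("read", n, "bytes")
    }
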
4 changes: 2 additions & 2 deletions ext/dload/utils.go
@@ -240,11 +240,11 @@ func headLink(link string) (*http.Response, error) {
 func CompareObjects(lom *cluster.LOM, dst *DstElement) (equal bool, err error) {
 	var oa *cmn.ObjAttrs
 	if dst.Link != "" {
-		resp, errHead := headLink(dst.Link)
+		resp, errHead := headLink(dst.Link) //nolint:bodyclose // cos.Close
 		if errHead != nil {
 			return false, errHead
 		}
-		resp.Body.Close()
+		cos.Close(resp.Body)
 		oa = &cmn.ObjAttrs{}
 		oa.Size = attrsFromLink(dst.Link, resp, oa)
 	} else {
17 changes: 12 additions & 5 deletions ext/dsort/api.go
@@ -7,8 +7,11 @@ package dsort
 import (
 	"github.com/NVIDIA/aistore/api/apc"
 	"github.com/NVIDIA/aistore/cmn"
+	"github.com/NVIDIA/aistore/cmn/archive"
 )
 
+const DefaultExt = archive.ExtTar // default shard extension/format/MIME when spec's input_extension is empty
+
 const (
 	algDefault   = ""              // default (alphanumeric, increasing)
 	Alphanumeric = "alphanumeric"  // string comparison (decreasing or increasing)
@@ -30,8 +33,9 @@ type Algorithm struct {
 	// when sort is a random shuffle
 	Seed string `json:"seed"`
 
-	// exclusively with Content sorting
-	// e.g. usage: ".cls" to provide sorting key for each record (sample) - see next
+	// usage: exclusively for Content sorting
+	// e.g.: ".cls" containing sorting key for each record (sample) - see next
+	// NOTE: not to confuse with shards "input_extension"
 	Ext string `json:"extension"`
 
 	// ditto: Content only
@@ -43,12 +47,17 @@ type Algorithm struct {
 type RequestSpec struct {
 	// Required
 	InputBck        cmn.Bck       `json:"input_bck" yaml:"input_bck"`
-	Extension       string        `json:"extension" yaml:"extension"`
 	InputFormat     apc.ListRange `json:"input_format" yaml:"input_format"`
 	OutputFormat    string        `json:"output_format" yaml:"output_format"`
 	OutputShardSize string        `json:"output_shard_size" yaml:"output_shard_size"`
 
+	// Desirable
+	InputExtension string `json:"input_extension" yaml:"input_extension"`
+
 	// Optional
+	// Default: InputExtension
+	OutputExtension string `json:"output_extension" yaml:"output_extension"`
+	// Default: ""
 	Description string `json:"description" yaml:"description"`
 	// Default: same as `bck` field
 	OutputBck cmn.Bck `json:"output_bck" yaml:"output_bck"`
@@ -64,8 +73,6 @@ type RequestSpec struct {
 	ExtractConcMaxLimit int `json:"extract_concurrency_max_limit" yaml:"extract_concurrency_max_limit"`
 	// Default: calcMaxLimit()
 	CreateConcMaxLimit int `json:"create_concurrency_max_limit" yaml:"create_concurrency_max_limit"`
-	// Default: bundle.Multiplier
-	StreamMultiplier int `json:"stream_multiplier" yaml:"stream_multiplier"`
 	// Default: false
 	ExtendedMetrics bool `json:"extended_metrics" yaml:"extended_metrics"`
 
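
Note: putting the revised spec together — a sketch of resharding .tar input into a different output format. Field names and the stated defaults come from the diff above; bucket names, templates, and the apc.AIS/ListRange.Template identifiers are my assumptions and should be checked against the actual packages:

    package main

    import (
        "fmt"

        "github.com/NVIDIA/aistore/api/apc"
        "github.com/NVIDIA/aistore/cmn"
        "github.com/NVIDIA/aistore/ext/dsort"
    )

    func main() {
        rs := dsort.RequestSpec{
            InputBck:        cmn.Bck{Name: "src", Provider: apc.AIS},
            InputFormat:     apc.ListRange{Template: "shard-{0..9}"},
            InputExtension:  ".tar", // when empty, dsort falls back to DefaultExt
            OutputBck:       cmn.Bck{Name: "dst", Provider: apc.AIS},
            OutputFormat:    "new-shard-{0000..1000}",
            OutputShardSize: "200KB",
            OutputExtension: ".tgz", // reshard into a different format; defaults to InputExtension
        }
        fmt.Printf("%+v\n", rs)
    }
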