From 5994072ca79b5851e98b9f46566f90cac1fc9b38 Mon Sep 17 00:00:00 2001
From: Alex Aizman
Date: Sun, 6 Aug 2023 14:19:02 -0400
Subject: [PATCH] dsort: [API change] revise request spec (major upd)

* (feature) reshard into a different format
  - default format
  - add output_extension
  - set input and output extensions via the namesake fields or the
    (input, output) templates, respectively
* API: remove/revise stream multiplier
* API: rename "extension" => "input_extension"
* refactor: extract functions, reduce stutter, shorten names
* hide parsed request
* refactor dsort handler
* parts four & five, prev. commit: e7c82a392e28f9f
* separately:
  - downloader: close body
  - nlog println to always insert " "

Signed-off-by: Alex Aizman
---
 ais/proxy.go                                |  57 ++-
 ais/prxdsort.go                             |  41 ---
 ais/test/dsort_test.go                      |   5 +-
 ais/test/scripts/dsort-ex1-spec.json        |   2 +-
 ais/test/scripts/dsort-ex1.sh               |   9 +-
 cmd/aisfs/go.mod                            |   2 +-
 cmd/aisfs/go.sum                            |   4 +-
 cmd/cli/cli/dsort.go                        |   4 +-
 cmd/cli/go.mod                              |   2 +-
 cmd/cli/go.sum                              |   4 +-
 cmd/cli/test/dsort.in                       |   8 +-
 cmn/nlog/nlog.go                            |   4 +-
 docs/cli/dsort.md                           |   1 -
 ext/dload/task.go                           |  13 +-
 ext/dload/utils.go                          |   4 +-
 ext/dsort/api.go                            |  17 +-
 ext/dsort/dsort.go                          |  30 +-
 ext/dsort/dsort_general.go                  |   6 +-
 ext/dsort/dsort_mem.go                      |   8 +-
 ext/dsort/dsort_test.go                     |  12 +-
 ext/dsort/extract/record.go                 |   2 +-
 ext/dsort/handler.go                        | 327 +++++++++---------
 ext/dsort/manager.go                        |  10 +-
 ext/dsort/manager_group_test.go             |   3 +-
 ext/dsort/manager_test.go                   |  12 +-
 ext/dsort/request_spec.go                   | 130 +++++--
 ext/dsort/request_spec_test.go              | 157 ++++-----
 .../tests/integration/sdk/test_dsort_ops.py |   2 +-
 28 files changed, 469 insertions(+), 407 deletions(-)
 delete mode 100644 ais/prxdsort.go

diff --git a/ais/proxy.go b/ais/proxy.go
index 4f052f602e..ffee58e0ea 100644
--- a/ais/proxy.go
+++ b/ais/proxy.go
@@ -1095,18 +1095,14 @@ func (p *proxy) _bckpost(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg
 		}
 	}
 
-	// POST /bucket operations (this cluster)
-
-	// Initialize bucket; if doesn't exist try creating it on the fly
-	// but only if it's a remote bucket (and user did not explicitly disallowed). 
- bckArgs := bckInitArgs{p: p, w: w, r: r, bck: bck, msg: msg, query: query} + bckArgs := bckInitArgs{p: p, w: w, r: r, bck: bck, perms: apc.AceObjLIST | apc.AceGET, msg: msg, query: query} bckArgs.createAIS = false if bck, err = bckArgs.initAndTry(); err != nil { return } // - // {action} on bucket + // POST {action} on bucket // var xid string switch msg.Action { @@ -1293,16 +1289,17 @@ func (p *proxy) _bckpost(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg // init existing or create remote // not calling `initAndTry` - delegating ais:from// props cloning to the separate method -func (p *proxy) initBckTo(w http.ResponseWriter, r *http.Request, query url.Values, - bckTo *meta.Bck) (*meta.Bck, int, error) { +func (p *proxy) initBckTo(w http.ResponseWriter, r *http.Request, query url.Values, bckTo *meta.Bck) (*meta.Bck, int, error) { bckToArgs := bckInitArgs{p: p, w: w, r: r, bck: bckTo, perms: apc.AcePUT, query: query} bckToArgs.createAIS = true + errCode, err := bckToArgs.init() if err != nil && errCode != http.StatusNotFound { p.writeErr(w, r, err, errCode) return nil, 0, err } - // creating (BMD-wise) remote destination on the fly + + // remote bucket: create it (BMD-wise) on the fly if errCode == http.StatusNotFound && bckTo.IsRemote() { if bckTo, err = bckToArgs.try(); err != nil { return nil, 0, err @@ -2869,14 +2866,48 @@ func (p *proxy) dsortHandler(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodPost: - p.proxyStartSortHandler(w, r) + // - validate request, check input_bck and output_bck + // - start dsort + rs := &dsort.RequestSpec{} + if cmn.ReadJSON(w, r, rs) != nil { + return + } + parsc, err := rs.ParseCtx() + if err != nil { + p.writeErr(w, r, err) + return + } + bck := meta.CloneBck(&parsc.InputBck) + args := bckInitArgs{p: p, w: w, r: r, bck: bck, perms: apc.AceObjLIST | apc.AceGET} + if _, err = args.initAndTry(); err != nil { + return + } + if !parsc.OutputBck.Equal(&parsc.InputBck) { + bckTo := meta.CloneBck(&parsc.OutputBck) + bckTo, errCode, err := p.initBckTo(w, r, nil /*query*/, bckTo) + if err != nil { + return + } + if errCode == http.StatusNotFound { + // TODO -- FIXME: reuse common prxtxn logic to create + if true { + p.writeErr(w, r, cmn.NewErrRemoteBckNotFound(&parsc.OutputBck), http.StatusNotFound) + return + } + if p.forwardCP(w, r, nil /*msg*/, "dsort-create-output-bck") { // to create + return + } + nlog.Warningf("%s: output_bck %s will be created with the input_bck (%s) props", p, bckTo, bck) + } + } + dsort.PstartHandler(w, r, parsc) case http.MethodGet: - dsort.ProxyGetHandler(w, r) + dsort.PgetHandler(w, r) case http.MethodDelete: if len(apiItems) == 1 && apiItems[0] == apc.Abort { - dsort.ProxyAbortSortHandler(w, r) + dsort.PabortHandler(w, r) } else if len(apiItems) == 0 { - dsort.ProxyRemoveSortHandler(w, r) + dsort.PremoveHandler(w, r) } else { p.writeErrURL(w, r) } diff --git a/ais/prxdsort.go b/ais/prxdsort.go deleted file mode 100644 index 22dbb74218..0000000000 --- a/ais/prxdsort.go +++ /dev/null @@ -1,41 +0,0 @@ -// Package ais provides core functionality for the AIStore object storage. -/* - * Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved. 
- */ -package ais - -import ( - "net/http" - - "github.com/NVIDIA/aistore/api/apc" - "github.com/NVIDIA/aistore/cluster/meta" - "github.com/NVIDIA/aistore/cmn" - "github.com/NVIDIA/aistore/ext/dsort" -) - -// POST /v1/sort -func (p *proxy) proxyStartSortHandler(w http.ResponseWriter, r *http.Request) { - rs := &dsort.RequestSpec{} - if cmn.ReadJSON(w, r, rs) != nil { - return - } - parsedRS, err := rs.Parse() - if err != nil { - p.writeErr(w, r, err) - return - } - - bck := meta.CloneBck(&parsedRS.Bck) - args := bckInitArgs{p: p, w: w, r: r, bck: bck, perms: apc.AceObjLIST | apc.AceGET} - if _, err = args.initAndTry(); err != nil { - return - } - - bck = meta.CloneBck(&parsedRS.OutputBck) - args = bckInitArgs{p: p, w: w, r: r, bck: bck, perms: apc.AcePUT} - if _, err = args.initAndTry(); err != nil { - return - } - - dsort.ProxyStartSortHandler(w, r, parsedRS) -} diff --git a/ais/test/dsort_test.go b/ais/test/dsort_test.go index ad5c790502..10d6a5369e 100644 --- a/ais/test/dsort_test.go +++ b/ais/test/dsort_test.go @@ -202,9 +202,8 @@ func (df *dsortFramework) init() { if df.outputTempl == "" { df.outputTempl = "output-{00000..10000}" } - // NOTE: default extension/format/MIME if df.extension == "" { - df.extension = archive.ExtTar + df.extension = dsort.DefaultExt } // Assumption is that all prefixes end with dash: "-" @@ -243,7 +242,7 @@ func (df *dsortFramework) gen() dsort.RequestSpec { Description: generateDSortDesc(), InputBck: df.m.bck, OutputBck: df.outputBck, - Extension: df.extension, + InputExtension: df.extension, InputFormat: df.inputTempl, OutputFormat: df.outputTempl, OutputShardSize: df.outputShardSize, diff --git a/ais/test/scripts/dsort-ex1-spec.json b/ais/test/scripts/dsort-ex1-spec.json index bb849bd8e9..5b6baa2aa0 100644 --- a/ais/test/scripts/dsort-ex1-spec.json +++ b/ais/test/scripts/dsort-ex1-spec.json @@ -1,5 +1,5 @@ { - "extension": ".tar", + "input_extension": ".tar", "bck": { "name": "src" }, diff --git a/ais/test/scripts/dsort-ex1.sh b/ais/test/scripts/dsort-ex1.sh index eac946f056..372ab9d67f 100755 --- a/ais/test/scripts/dsort-ex1.sh +++ b/ais/test/scripts/dsort-ex1.sh @@ -13,6 +13,7 @@ while (( "$#" )); do case "${1}" in --srcbck) srcbck=$2; shift; shift;; --dstbck) dstbck=$2; shift; shift;; + --nocleanup) nocleanup="true"; shift;; *) echo "fatal: unknown argument '${1}'"; exit 1;; esac done @@ -35,6 +36,8 @@ num=$(ais ls $dstbck --summary --H | awk '{print $3}') echo "Successfully resharded $srcbck => $dstbck:" ais ls $dstbck -## _not_ to remove test buckets comment out the following 2 lines -echo "Cleanup: deleting $srcbck and $dstbck" -ais rmb $srcbck $dstbck -y 2>/dev/null 1>&2 +## cleanup +if [[ ${nocleanup} != "true" ]]; then + echo "Cleanup: deleting $srcbck and $dstbck" + ais rmb $srcbck $dstbck -y 2>/dev/null 1>&2 +fi diff --git a/cmd/aisfs/go.mod b/cmd/aisfs/go.mod index 1d59182878..73bcc81070 100644 --- a/cmd/aisfs/go.mod +++ b/cmd/aisfs/go.mod @@ -4,7 +4,7 @@ go 1.20 // direct require ( - github.com/NVIDIA/aistore v1.3.19-0.20230803180608-b1c54338c1eb + github.com/NVIDIA/aistore v1.3.19-0.20230806172900-eea04958543e github.com/OneOfOne/xxhash v1.2.8 github.com/jacobsa/daemonize v0.0.0-20160101105449-e460293e890f github.com/jacobsa/fuse v0.0.0-20230124164109-5e0f2e6b432b diff --git a/cmd/aisfs/go.sum b/cmd/aisfs/go.sum index 08b6179fbf..b436920ed3 100644 --- a/cmd/aisfs/go.sum +++ b/cmd/aisfs/go.sum @@ -3,8 +3,8 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT code.cloudfoundry.org/bytefmt 
v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/NVIDIA/aistore v1.3.19-0.20230803180608-b1c54338c1eb h1:2M+63GS26FaYZRXEahMpGd+dZ8Od4IC4tssLRFaAFBY= -github.com/NVIDIA/aistore v1.3.19-0.20230803180608-b1c54338c1eb/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4= +github.com/NVIDIA/aistore v1.3.19-0.20230806172900-eea04958543e h1:41esHpnBnfdZPOyrS8fg0zbd4npVggfvyvUcmSZtK2Y= +github.com/NVIDIA/aistore v1.3.19-0.20230806172900-eea04958543e/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= diff --git a/cmd/cli/cli/dsort.go b/cmd/cli/cli/dsort.go index 25a37b925e..0eee6bdb65 100644 --- a/cmd/cli/cli/dsort.go +++ b/cmd/cli/cli/dsort.go @@ -32,7 +32,7 @@ import ( const ( dsortExampleJ = `$ ais start dsort '{ - "extension": ".tar", + "input_extension": ".tar", "input_bck": {"name": "dsort-testing"}, "input_format": {"template": "shard-{0..9}"}, "output_shard_size": "200KB", @@ -41,7 +41,7 @@ const ( "order_file_sep": " " }'` dsortExampleY = `$ ais start dsort -f - < /tmp/dsort.json +echo '{"input_extension": ".tar","input_bck": {name: "$BUCKET_1"},"input_format": {"template": "shard-{0..9}"},"output_format": "new-shard-{0000..1000}","output_shard_size": "10KB","description": "sort shards from 0 to 9","algorithm": {"kind": "alphanumeric"},"extended_metrics": true}' > /tmp/dsort.json ais job start dsort -f /tmp/dsort.json // IGNORE rm -f /tmp/dsort.json -ais job start dsort '{"extension": ".tar","input_bck": {name: "$BUCKET_1"},"input_format": {"template": "shard-{0..9}"},"output_format": "new-shard-{0000..1000}","output_shard_size": "10KB","description": "sort shards from 0 to 9","algorithm": {"kind": "alphanumeric"},"extended_metrics": true}' // SAVE_RESULT +ais job start dsort '{"input_extension": ".tar","input_bck": {name: "$BUCKET_1"},"input_format": {"template": "shard-{0..9}"},"output_format": "new-shard-{0000..1000}","output_shard_size": "10KB","description": "sort shards from 0 to 9","algorithm": {"kind": "alphanumeric"},"extended_metrics": true}' // SAVE_RESULT ais wait $RESULT # YAML -echo -e "extension: .tar\ninput_bck:\n name: $BUCKET_1\ninput_format:\n template: shard-{0..9}\noutput_format: new-shard-{0000..1000}\noutput_shard_size: 10KB\ndescription: sort shards from 0 to 9\nalgorithm:\n kind: alphanumeric\nextended_metrics: true\n" > /tmp/dsort.yaml +echo -e "input_extension: .tar\ninput_bck:\n name: $BUCKET_1\ninput_format:\n template: shard-{0..9}\noutput_format: new-shard-{0000..1000}\noutput_shard_size: 10KB\ndescription: sort shards from 0 to 9\nalgorithm:\n kind: alphanumeric\nextended_metrics: true\n" > /tmp/dsort.yaml ais job start dsort -f /tmp/dsort.yaml // IGNORE rm -f /tmp/dsort.yaml diff --git a/cmn/nlog/nlog.go b/cmn/nlog/nlog.go index 21b666c159..1df580064e 100644 --- a/cmn/nlog/nlog.go +++ b/cmn/nlog/nlog.go @@ -283,11 +283,11 @@ func formatHdr(s severity, depth int, fb *fixed) { func sprintf(sev severity, depth int, format string, fb *fixed, args ...any) { formatHdr(sev, depth+1, fb) if format == "" { - fmt.Fprint(fb, args...) + fmt.Fprintln(fb, args...) 
} else { fmt.Fprintf(fb, format, args...) + fb.eol() } - fb.eol() } // mem pool of additional buffers diff --git a/docs/cli/dsort.md b/docs/cli/dsort.md index e6b64661d4..13511f8769 100644 --- a/docs/cli/dsort.md +++ b/docs/cli/dsort.md @@ -102,7 +102,6 @@ order_file_sep \t output_bck ais://dst output_format new-shard-{0000..1000} output_shard_size 10KB -stream_multiplier 0 Config override: none diff --git a/ext/dload/task.go b/ext/dload/task.go index 38b5376cda..41b3a6baad 100644 --- a/ext/dload/task.go +++ b/ext/dload/task.go @@ -103,7 +103,7 @@ func (task *singleTask) download(lom *cluster.LOM, config *cmn.Config) { task.xdl.ObjsAdd(1, task.currentSize.Load()) } -func (task *singleTask) tryDownloadLocal(lom *cluster.LOM, timeout time.Duration) (bool /*err is fatal*/, error) { +func (task *singleTask) _dlocal(lom *cluster.LOM, timeout time.Duration) (bool /*err is fatal*/, error) { ctx, cancel := context.WithTimeout(task.downloadCtx, timeout) defer cancel() @@ -120,12 +120,17 @@ func (task *singleTask) tryDownloadLocal(lom *cluster.LOM, timeout time.Duration req.Header.Add("User-Agent", gcsUA) } - resp, err := clientForURL(task.obj.link).Do(req) + resp, err := clientForURL(task.obj.link).Do(req) //nolint:bodyclose // cos.Close if err != nil { return false, err } - defer resp.Body.Close() + fatal, err := task._dput(lom, req, resp) + cos.Close(resp.Body) + return fatal, err +} + +func (task *singleTask) _dput(lom *cluster.LOM, req *http.Request, resp *http.Response) (bool /*err is fatal*/, error) { if resp.StatusCode >= http.StatusBadRequest { if resp.StatusCode == http.StatusNotFound { return false, cmn.NewErrHTTP(req, fmt.Errorf("%q does not exist", task.obj.link), http.StatusNotFound) @@ -164,7 +169,7 @@ func (task *singleTask) downloadLocal(lom *cluster.LOM) (err error) { fatal bool ) for i := 0; i < retryCnt; i++ { - fatal, err = task.tryDownloadLocal(lom, timeout) + fatal, err = task._dlocal(lom, timeout) if err == nil || fatal { return err } diff --git a/ext/dload/utils.go b/ext/dload/utils.go index 71069112c4..6c183cf623 100644 --- a/ext/dload/utils.go +++ b/ext/dload/utils.go @@ -240,11 +240,11 @@ func headLink(link string) (*http.Response, error) { func CompareObjects(lom *cluster.LOM, dst *DstElement) (equal bool, err error) { var oa *cmn.ObjAttrs if dst.Link != "" { - resp, errHead := headLink(dst.Link) + resp, errHead := headLink(dst.Link) //nolint:bodyclose // cos.Close if errHead != nil { return false, errHead } - resp.Body.Close() + cos.Close(resp.Body) oa = &cmn.ObjAttrs{} oa.Size = attrsFromLink(dst.Link, resp, oa) } else { diff --git a/ext/dsort/api.go b/ext/dsort/api.go index 1a1db03b6d..20eae1e915 100644 --- a/ext/dsort/api.go +++ b/ext/dsort/api.go @@ -7,8 +7,11 @@ package dsort import ( "github.com/NVIDIA/aistore/api/apc" "github.com/NVIDIA/aistore/cmn" + "github.com/NVIDIA/aistore/cmn/archive" ) +const DefaultExt = archive.ExtTar // default shard extension/format/MIME when spec's input_extension is empty + const ( algDefault = "" // default (alphanumeric, increasing) Alphanumeric = "alphanumeric" // string comparison (decreasing or increasing) @@ -30,8 +33,9 @@ type Algorithm struct { // when sort is a random shuffle Seed string `json:"seed"` - // exclusively with Content sorting - // e.g. 
usage: ".cls" to provide sorting key for each record (sample) - see next + // usage: exclusively for Content sorting + // e.g.: ".cls" containing sorting key for each record (sample) - see next + // NOTE: not to confuse with shards "input_extension" Ext string `json:"extension"` // ditto: Content only @@ -43,12 +47,17 @@ type Algorithm struct { type RequestSpec struct { // Required InputBck cmn.Bck `json:"input_bck" yaml:"input_bck"` - Extension string `json:"extension" yaml:"extension"` InputFormat apc.ListRange `json:"input_format" yaml:"input_format"` OutputFormat string `json:"output_format" yaml:"output_format"` OutputShardSize string `json:"output_shard_size" yaml:"output_shard_size"` + // Desirable + InputExtension string `json:"input_extension" yaml:"input_extension"` + // Optional + // Default: InputExtension + OutputExtension string `json:"output_extension" yaml:"output_extension"` + // Default: "" Description string `json:"description" yaml:"description"` // Default: same as `bck` field OutputBck cmn.Bck `json:"output_bck" yaml:"output_bck"` @@ -64,8 +73,6 @@ type RequestSpec struct { ExtractConcMaxLimit int `json:"extract_concurrency_max_limit" yaml:"extract_concurrency_max_limit"` // Default: calcMaxLimit() CreateConcMaxLimit int `json:"create_concurrency_max_limit" yaml:"create_concurrency_max_limit"` - // Default: bundle.Multiplier - StreamMultiplier int `json:"stream_multiplier" yaml:"stream_multiplier"` // Default: false ExtendedMetrics bool `json:"extended_metrics" yaml:"extended_metrics"` diff --git a/ext/dsort/dsort.go b/ext/dsort/dsort.go index 752aa49c4a..ab1d03164d 100644 --- a/ext/dsort/dsort.go +++ b/ext/dsort/dsort.go @@ -302,11 +302,9 @@ func (m *Manager) createShard(s *extract.Shard, lom *cluster.LOM) (err error) { }() ec := m.ec - if m.pars.Extension == "" { - ext, err := archive.Mime("", lom.FQN) - debug.AssertNoErr(err) - // NOTE: extract-creator for _this_ output shard (compare with extractShard._do) - ec = newExtractCreator(m.ctx.t, ext) + if m.pars.InputExtension != m.pars.OutputExtension { + // NOTE: resharding into a different format + ec = newExtractCreator(m.ctx.t, m.pars.OutputExtension) } _, err = ec.Create(s, w, m.dsorter) @@ -578,7 +576,13 @@ func (m *Manager) generateShardsWithTemplate(maxSize int64) ([]*extract.Shard, e return nil, errors.Errorf("number of shards to be created exceeds expected number of shards (%d)", shardCount) } shard := &extract.Shard{ - Name: name + m.pars.Extension, + Name: name, + } + ext, err := archive.Mime("", name) + if err == nil { + debug.Assert(m.pars.OutputExtension == ext) + } else { + shard.Name = name + m.pars.OutputExtension } shard.Size = curShardSize @@ -839,7 +843,7 @@ func (m *Manager) _dist(si *meta.Snode, s []*extract.Shard, order map[string]*ex return err }) group.Go(func() error { - query := m.pars.Bck.AddToQuery(nil) + query := m.pars.InputBck.AddToQuery(nil) reqArgs := &cmn.HreqArgs{ Method: http.MethodPost, Base: si.URL(cmn.NetIntraData), @@ -893,18 +897,18 @@ type extractShard struct { func (es *extractShard) do() (err error) { m := es.m shardName := es.name - if es.isRange && m.pars.Extension != "" { + if es.isRange && m.pars.InputExtension != "" { ext, errV := archive.Mime("", es.name) // from filename if errV == nil { - if !archive.EqExt(ext, m.pars.Extension) { + if !archive.EqExt(ext, m.pars.InputExtension) { if m.config.FastV(4, cos.SmoduleDsort) { nlog.Infof("%s: %s skipping %s: %q vs %q", m.ctx.t, m.ManagerUUID, - es.name, ext, m.pars.Extension) + es.name, ext, m.pars.InputExtension) } return 
} } else { - shardName = es.name + m.pars.Extension + shardName = es.name + m.pars.InputExtension } } lom := cluster.AllocLOM(shardName) @@ -923,7 +927,7 @@ func (es *extractShard) _do(lom *cluster.LOM) error { estimateTotalRecordsSize uint64 warnPossibleOOM bool ) - if err := lom.InitBck(&m.pars.Bck); err != nil { + if err := lom.InitBck(&m.pars.InputBck); err != nil { return err } if _, local, err := lom.HrwTarget(m.smap); err != nil || !local { @@ -938,7 +942,7 @@ func (es *extractShard) _do(lom *cluster.LOM) error { } ec := m.ec - if m.pars.Extension == "" { + if m.pars.InputExtension == "" { ext, err := archive.Mime("", lom.FQN) if err != nil { return nil // skip diff --git a/ext/dsort/dsort_general.go b/ext/dsort/dsort_general.go index ea3edb7eeb..614d43a233 100644 --- a/ext/dsort/dsort_general.go +++ b/ext/dsort/dsort_general.go @@ -146,12 +146,8 @@ func (ds *dsorterGeneral) start() error { } trname = fmt.Sprintf(recvRespStreamNameFmt, ds.m.ManagerUUID) - streamMultiplier := config.DSort.SbundleMult - if ds.m.pars.StreamMultiplier != 0 { - streamMultiplier = ds.m.pars.StreamMultiplier - } respSbArgs := bundle.Args{ - Multiplier: streamMultiplier, + Multiplier: ds.m.pars.SbundleMult, Net: respNetwork, Trname: trname, Ntype: cluster.Targets, diff --git a/ext/dsort/dsort_mem.go b/ext/dsort/dsort_mem.go index dac81dd46d..6383ae2af7 100644 --- a/ext/dsort/dsort_mem.go +++ b/ext/dsort/dsort_mem.go @@ -208,13 +208,9 @@ func (ds *dsorterMem) start() error { client := transport.NewIntraDataClient() - streamMultiplier := config.DSort.SbundleMult - if ds.m.pars.StreamMultiplier != 0 { - streamMultiplier = ds.m.pars.StreamMultiplier - } trname := fmt.Sprintf(recvReqStreamNameFmt, ds.m.ManagerUUID) reqSbArgs := bundle.Args{ - Multiplier: streamMultiplier, + Multiplier: ds.m.pars.SbundleMult, Net: reqNetwork, Trname: trname, Ntype: cluster.Targets, @@ -225,7 +221,7 @@ func (ds *dsorterMem) start() error { trname = fmt.Sprintf(recvRespStreamNameFmt, ds.m.ManagerUUID) respSbArgs := bundle.Args{ - Multiplier: streamMultiplier, + Multiplier: ds.m.pars.SbundleMult, Net: respNetwork, Trname: trname, Ntype: cluster.Targets, diff --git a/ext/dsort/dsort_test.go b/ext/dsort/dsort_test.go index 984ee6d077..8848dc51ad 100644 --- a/ext/dsort/dsort_test.go +++ b/ext/dsort/dsort_test.go @@ -146,8 +146,8 @@ type testContext struct { func newTargetMock(daemonID string, smap *testSmap) *targetNodeMock { // Initialize dSort manager - rs := &ParsedRequestSpec{ - Extension: archive.ExtTar, + rs := &parsedReqSpec{ + InputExtension: archive.ExtTar, Algorithm: &Algorithm{ ContentKeyType: extract.ContentKeyString, }, @@ -454,14 +454,14 @@ var _ = Describe("Distributed Sort", func() { manager, exists := target.managers.Get(globalManagerUUID) Expect(exists).To(BeTrue()) - rs := &ParsedRequestSpec{ + rs := &parsedReqSpec{ Algorithm: &Algorithm{ Decreasing: true, ContentKeyType: extract.ContentKeyString, }, - Extension: archive.ExtTar, - MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, - DSorterType: DSorterGeneralType, + InputExtension: archive.ExtTar, + MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, + DSorterType: DSorterGeneralType, } ctx.node = ctx.smapOwner.Get().Tmap[target.daemonID] manager.lock() diff --git a/ext/dsort/extract/record.go b/ext/dsort/extract/record.go index 272a286948..24fb67057c 100644 --- a/ext/dsort/extract/record.go +++ b/ext/dsort/extract/record.go @@ -65,7 +65,7 @@ type ( // Record represents the metadata corresponding to a single file from a shard. 
Record struct { Key any `msg:"k" json:"k"` // Used to determine the sorting order. - Name string `msg:"n" json:"n"` // Name which uniquely identifies record across all shards. + Name string `msg:"n" json:"n"` // Name that uniquely identifies record across all shards. DaemonID string `msg:"d" json:"d"` // ID of the target which maintains the contents for this record. // All objects associated with given record. Record can be composed of // multiple objects which have the same name but different extension. diff --git a/ext/dsort/handler.go b/ext/dsort/handler.go index b3f7701f84..bd13a7b125 100644 --- a/ext/dsort/handler.go +++ b/ext/dsort/handler.go @@ -38,8 +38,11 @@ type response struct { ////////////////// // POST /v1/sort -func ProxyStartSortHandler(w http.ResponseWriter, r *http.Request, pars *ParsedRequestSpec) { - var err error +func PstartHandler(w http.ResponseWriter, r *http.Request, parsc *ParsedReq) { + var ( + err error + pars = parsc.pars + ) pars.TargetOrderSalt = []byte(cos.FormatNowStamp()) // TODO: handle case when bucket was removed during dSort job - this should @@ -47,7 +50,7 @@ func ProxyStartSortHandler(w http.ResponseWriter, r *http.Request, pars *ParsedR // This would also be helpful for Downloader (in the middle of downloading // large file the bucket can be easily deleted). - pars.DSorterType, err = determineDSorterType(pars) + pars.DSorterType, err = dsorterType(pars) if err != nil { cmn.WriteErr(w, r, err) return @@ -64,25 +67,6 @@ func ProxyStartSortHandler(w http.ResponseWriter, r *http.Request, pars *ParsedR managerUUID = PrefixJobID + cos.GenUUID() // compare w/ p.httpdlpost smap = ctx.smapOwner.Get() ) - checkResponses := func(responses []response) error { - for _, resp := range responses { - if resp.err == nil { - continue - } - nlog.Errorf("[%s] start sort request failed to be broadcast, err: %s", - managerUUID, resp.err.Error()) - - path := apc.URLPathdSortAbort.Join(managerUUID) - broadcastTargets(http.MethodDelete, path, nil, nil, smap) - - s := fmt.Sprintf("failed to execute start sort, err: %s, status: %d", - resp.err.Error(), resp.statusCode) - cmn.WriteErrMsg(w, r, s, http.StatusInternalServerError) - return resp.err - } - - return nil - } // Starting dSort has two phases: // 1. Initialization, ensures that all targets successfully initialized all @@ -95,47 +79,63 @@ func ProxyStartSortHandler(w http.ResponseWriter, r *http.Request, pars *ParsedR // given dSort job. Also bug where we could send abort (which triggers cleanup) // to not yet initialized target. 
+ // phase 1 config := cmn.GCO.Get() if config.FastV(4, cos.SmoduleDsort) { nlog.Infof("[dsort] %s broadcasting init request to all targets", managerUUID) } path := apc.URLPathdSortInit.Join(managerUUID) - responses := broadcastTargets(http.MethodPost, path, nil, b, smap) - if err := checkResponses(responses); err != nil { + responses := bcast(http.MethodPost, path, nil, b, smap) + if err := _handleResp(w, r, smap, managerUUID, responses); err != nil { return } + // phase 2 if config.FastV(4, cos.SmoduleDsort) { nlog.Infof("[dsort] %s broadcasting start request to all targets", managerUUID) } path = apc.URLPathdSortStart.Join(managerUUID) - responses = broadcastTargets(http.MethodPost, path, nil, nil, smap) - if err := checkResponses(responses); err != nil { + responses = bcast(http.MethodPost, path, nil, nil, smap) + if err := _handleResp(w, r, smap, managerUUID, responses); err != nil { return } w.Write([]byte(managerUUID)) } +func _handleResp(w http.ResponseWriter, r *http.Request, smap *meta.Smap, managerUUID string, responses []response) error { + for _, resp := range responses { + if resp.err == nil { + continue + } + // cleanup + path := apc.URLPathdSortAbort.Join(managerUUID) + _ = bcast(http.MethodDelete, path, nil, nil, smap) + + msg := fmt.Sprintf("failed to start [dsort] %s: %v(%d)", managerUUID, resp.err, resp.statusCode) + cmn.WriteErrMsg(w, r, msg, http.StatusInternalServerError) + return resp.err + } + return nil +} + // GET /v1/sort -func ProxyGetHandler(w http.ResponseWriter, r *http.Request) { +func PgetHandler(w http.ResponseWriter, r *http.Request) { if !checkHTTPMethod(w, r, http.MethodGet) { return } - query := r.URL.Query() managerUUID := query.Get(apc.QparamUUID) - if managerUUID == "" { - proxyListSortHandler(w, r, query) + plistHandler(w, r, query) return } - proxyMetricsSortHandler(w, r, query) + pmetricsHandler(w, r, query) } // GET /v1/sort?regex=... -func proxyListSortHandler(w http.ResponseWriter, r *http.Request, query url.Values) { +func plistHandler(w http.ResponseWriter, r *http.Request, query url.Values) { var ( path = apc.URLPathdSortList.S regexStr = query.Get(apc.QparamRegex) @@ -146,7 +146,7 @@ func proxyListSortHandler(w http.ResponseWriter, r *http.Request, query url.Valu return } } - responses := broadcastTargets(http.MethodGet, path, query, nil, ctx.smapOwner.Get()) + responses := bcast(http.MethodGet, path, query, nil, ctx.smapOwner.Get()) resultList := make([]*JobInfo, 0) for _, r := range responses { @@ -173,24 +173,22 @@ func proxyListSortHandler(w http.ResponseWriter, r *http.Request, query url.Valu } } } - body := cos.MustMarshal(resultList) if _, err := w.Write(body); err != nil { nlog.Errorln(err) // When we fail write we cannot call InvalidHandler since it will be // double header write. - return } } // GET /v1/sort?id=... 
-func proxyMetricsSortHandler(w http.ResponseWriter, r *http.Request, query url.Values) {
+func pmetricsHandler(w http.ResponseWriter, r *http.Request, query url.Values) {
 	var (
 		smap        = ctx.smapOwner.Get()
 		allMetrics  = make(map[string]*Metrics, smap.CountActiveTs())
 		managerUUID = query.Get(apc.QparamUUID)
 		path        = apc.URLPathdSortMetrics.Join(managerUUID)
-		responses   = broadcastTargets(http.MethodGet, path, nil, nil, smap)
+		responses   = bcast(http.MethodGet, path, nil, nil, smap)
 		notFound    int
 	)
 	for _, resp := range responses {
@@ -226,7 +224,7 @@ func proxyMetricsSortHandler(w http.ResponseWriter, r *http.Request, query url.V
 }
 
 // DELETE /v1/sort/abort
-func ProxyAbortSortHandler(w http.ResponseWriter, r *http.Request) {
+func PabortHandler(w http.ResponseWriter, r *http.Request) {
 	if !checkHTTPMethod(w, r, http.MethodDelete) {
 		return
 	}
@@ -239,7 +237,7 @@ func ProxyAbortSortHandler(w http.ResponseWriter, r *http.Request) {
 		query       = r.URL.Query()
 		managerUUID = query.Get(apc.QparamUUID)
 		path        = apc.URLPathdSortAbort.Join(managerUUID)
-		responses   = broadcastTargets(http.MethodDelete, path, nil, nil, ctx.smapOwner.Get())
+		responses   = bcast(http.MethodDelete, path, nil, nil, ctx.smapOwner.Get())
 	)
 	allNotFound := true
 	for _, resp := range responses {
@@ -261,7 +259,7 @@ func ProxyAbortSortHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 // DELETE /v1/sort
-func ProxyRemoveSortHandler(w http.ResponseWriter, r *http.Request) {
+func PremoveHandler(w http.ResponseWriter, r *http.Request) {
 	if !checkHTTPMethod(w, r, http.MethodDelete) {
 		return
 	}
@@ -275,7 +273,7 @@ func ProxyRemoveSortHandler(w http.ResponseWriter, r *http.Request) {
 		query       = r.URL.Query()
 		managerUUID = query.Get(apc.QparamUUID)
 		path        = apc.URLPathdSortMetrics.Join(managerUUID)
-		responses   = broadcastTargets(http.MethodGet, path, nil, nil, smap)
+		responses   = bcast(http.MethodGet, path, nil, nil, smap)
 	)
 
 	// First, broadcast to see if process is cleaned up first
@@ -309,7 +307,7 @@ func ProxyRemoveSortHandler(w http.ResponseWriter, r *http.Request) {
 
 	// Next, broadcast the remove once we've checked that all targets have run cleanup
 	path = apc.URLPathdSortRemove.Join(managerUUID)
-	responses = broadcastTargets(http.MethodDelete, path, nil, nil, smap)
+	responses = bcast(http.MethodDelete, path, nil, nil, smap)
 	var failed []string //nolint:prealloc // will remain not allocated when no errors
 	for _, r := range responses {
 		if r.statusCode == http.StatusOK {
@@ -323,6 +321,80 @@ func ProxyRemoveSortHandler(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
+// Determine dsorter type. We need to make this decision based on (e.g.) the size of targets' memory. 
+func dsorterType(pars *parsedReqSpec) (string, error) { + if pars.DSorterType != "" { + return pars.DSorterType, nil // in case the dsorter type is already set, we need to respect it + } + + // Get memory stats from targets + var ( + totalAvailMemory uint64 + err error + path = apc.URLPathDae.S + moreThanThreshold = true + ) + + dsorterMemThreshold, err := cos.ParseSize(pars.DSorterMemThreshold, cos.UnitsIEC) + debug.AssertNoErr(err) + + query := make(url.Values) + query.Add(apc.QparamWhat, apc.WhatNodeStatsAndStatus) + responses := bcast(http.MethodGet, path, query, nil, ctx.smapOwner.Get()) + for _, response := range responses { + if response.err != nil { + return "", response.err + } + + daemonStatus := stats.NodeStatus{} + if err := jsoniter.Unmarshal(response.res, &daemonStatus); err != nil { + return "", err + } + + memStat := sys.MemStat{Total: daemonStatus.MemCPUInfo.MemAvail + daemonStatus.MemCPUInfo.MemUsed} + dsortAvailMemory := calcMaxMemoryUsage(pars.MaxMemUsage, &memStat) + totalAvailMemory += dsortAvailMemory + moreThanThreshold = moreThanThreshold && dsortAvailMemory > uint64(dsorterMemThreshold) + } + + // TODO: currently, we have import cycle: dsort -> api -> dsort. Need to + // think of a way to get the total size of bucket without copy-and-paste + // the API code. + // + // baseParams := &api.BaseParams{ + // Client: http.DefaultClient, + // URL: ctx.smap.Get().Primary.URL(cmn.NetIntraControl), + // } + // msg := &apc.LsoMsg{Props: "size,status"} + // objList, err := api.ListObjects(baseParams, pars.Bucket, msg, 0) + // if err != nil { + // return "", err + // } + // + // totalBucketSize := uint64(0) + // for _, obj := range objList.Entries { + // if obj.IsStatusOK() { + // totalBucketSize += uint64(obj.Size) + // } + // } + // + // if totalBucketSize < totalAvailMemory { + // // "general type" is capable of extracting whole dataset into memory + // // In this case the creation phase is super fast. + // return DSorterGeneralType, nil + // } + + if moreThanThreshold { + // If there is enough memory to use "memory type", we should do that. + // It behaves better for cases when we have a lot of memory available. + return DSorterMemType, nil + } + + // For all other cases we should use "general type", as we don't know + // exactly what to expect, so we should prepare for the worst. + return DSorterGeneralType, nil +} + /////////////////// ///// TARGET ////// /////////////////// @@ -336,32 +408,32 @@ func TargetHandler(w http.ResponseWriter, r *http.Request) { switch apiItems[0] { case apc.Init: - initSortHandler(w, r) + tinitHandler(w, r) case apc.Start: - startSortHandler(w, r) + tstartHandler(w, r) case apc.Records: Managers.recordsHandler(w, r) case apc.Shards: Managers.shardsHandler(w, r) case apc.Abort: - abortSortHandler(w, r) + tabortHandler(w, r) case apc.Remove: - removeSortHandler(w, r) + tremoveHandler(w, r) case apc.List: - listSortHandler(w, r) + tlistHandler(w, r) case apc.Metrics: - metricsHandler(w, r) + tmetricsHandler(w, r) case apc.FinishedAck: - finishedAckHandler(w, r) + tfiniHandler(w, r) default: cmn.WriteErrMsg(w, r, "invalid path") } } -// initSortHandler is the handler called for the HTTP endpoint /v1/sort/init. +// /v1/sort/init. // It is responsible for initializing the dSort manager so it will be ready // to start receiving requests. 
-func initSortHandler(w http.ResponseWriter, r *http.Request) {
+func tinitHandler(w http.ResponseWriter, r *http.Request) {
 	if !checkHTTPMethod(w, r, http.MethodPost) {
 		return
 	}
@@ -370,7 +442,7 @@ func initSortHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	var (
-		rs     *ParsedRequestSpec
+		rs     *parsedReqSpec
 		b, err = io.ReadAll(r.Body)
 	)
 	if err != nil {
@@ -378,7 +450,7 @@ func initSortHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	if err = js.Unmarshal(b, &rs); err != nil {
-		err := fmt.Errorf(cmn.FmtErrUnmarshal, DSortName, "ParsedRequestSpec", cos.BHead(b), err)
+		err := fmt.Errorf(cmn.FmtErrUnmarshal, DSortName, "parsedReqSpec", cos.BHead(b), err)
 		cmn.WriteErr(w, r, err)
 		return
 	}
@@ -395,12 +467,12 @@ func initSortHandler(w http.ResponseWriter, r *http.Request) {
 	dsortManager.unlock()
 }
 
-// startSortHandler is the handler called for the HTTP endpoint /v1/sort/start.
+// /v1/sort/start.
 // There are three major phases to this function:
 //  1. extractLocalShards
 //  2. participateInRecordDistribution
 //  3. distributeShardRecords
-func startSortHandler(w http.ResponseWriter, r *http.Request) {
+func tstartHandler(w http.ResponseWriter, r *http.Request) {
 	if !checkHTTPMethod(w, r, http.MethodPost) {
 		return
 	}
@@ -421,35 +493,35 @@ func startSortHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 func (m *Manager) startDSort() {
-	errHandler := func(err error) {
-		nlog.Errorf("%+v", err) // print error with stack trace
-
-		// If we were aborted by some other process this means that we do not
-		// broadcast abort (we assume that daemon aborted us, aborted also others).
-		if !m.aborted() {
-			// Self-abort: better do it before sending broadcast to avoid
-			// inconsistent state: other have aborted but we didn't due to some
-			// problem.
-			if isReportableError(err) {
-				m.abort(err)
-			} else {
-				m.abort()
-			}
-
-			nlog.Warningln("broadcasting abort to other targets")
-			path := apc.URLPathdSortAbort.Join(m.ManagerUUID)
-			broadcastTargets(http.MethodDelete, path, nil, nil, ctx.smapOwner.Get(), ctx.node)
-		}
-	}
-
 	if err := m.start(); err != nil {
-		errHandler(err)
+		m.errHandler(err)
 		return
 	}
 	nlog.Infof("[dsort] %s broadcasting finished ack to other targets", m.ManagerUUID)
 	path := apc.URLPathdSortAck.Join(m.ManagerUUID, m.ctx.node.ID())
-	broadcastTargets(http.MethodPut, path, nil, nil, ctx.smapOwner.Get(), ctx.node)
+	bcast(http.MethodPut, path, nil, nil, ctx.smapOwner.Get(), ctx.node)
+}
+
+func (m *Manager) errHandler(err error) {
+	nlog.Errorf("%+v", err)
+
+	// If we were aborted by some other process this means that we do not
+	// broadcast abort (we assume that daemon aborted us, aborted also others).
+	if !m.aborted() {
+		// Self-abort: better do it before sending broadcast to avoid
+		// inconsistent state: other have aborted but we didn't due to some
+		// problem.
+		if isReportableError(err) {
+			m.abort(err)
+		} else {
+			m.abort()
+		}
+
+		nlog.Warningln("broadcasting abort to other targets")
+		path := apc.URLPathdSortAbort.Join(m.ManagerUUID)
+		bcast(http.MethodDelete, path, nil, nil, ctx.smapOwner.Get(), ctx.node)
+	}
 }
 
 // shardsHandler is the handler for the HTTP endpoint /v1/sort/shards.
@@ -501,7 +573,7 @@ func (managers *ManagerGroup) shardsHandler(w http.ResponseWriter, r *http.Reque
 	dsortManager.startShardCreation <- struct{}{}
 }
 
-// recordsHandler is the handler called for the HTTP endpoint /v1/sort/records.
+// recordsHandler is the handler for /v1/sort/records. 
// A valid POST to this endpoint updates this target's dsortManager.Records with the // []Records from the request body, along with some related state variables. func (managers *ManagerGroup) recordsHandler(w http.ResponseWriter, r *http.Request) { @@ -580,10 +652,10 @@ func (managers *ManagerGroup) recordsHandler(w http.ResponseWriter, r *http.Requ } } -// abortSortHandler is the handler called for the HTTP endpoint /v1/sort/abort. +// /v1/sort/abort. // A valid DELETE to this endpoint aborts currently running sort job and cleans // up the state. -func abortSortHandler(w http.ResponseWriter, r *http.Request) { +func tabortHandler(w http.ResponseWriter, r *http.Request) { if !checkHTTPMethod(w, r, http.MethodDelete) { return } @@ -608,7 +680,7 @@ func abortSortHandler(w http.ResponseWriter, r *http.Request) { dsortManager.abort(fmt.Errorf("%s has been aborted via API (remotely)", DSortName)) } -func removeSortHandler(w http.ResponseWriter, r *http.Request) { +func tremoveHandler(w http.ResponseWriter, r *http.Request) { if !checkHTTPMethod(w, r, http.MethodDelete) { return } @@ -624,7 +696,7 @@ func removeSortHandler(w http.ResponseWriter, r *http.Request) { } } -func listSortHandler(w http.ResponseWriter, r *http.Request) { +func tlistHandler(w http.ResponseWriter, r *http.Request) { var ( query = r.URL.Query() regexStr = query.Get(apc.QparamRegex) @@ -651,9 +723,9 @@ func listSortHandler(w http.ResponseWriter, r *http.Request) { } } -// metricsHandler is the handler called for the HTTP endpoint /v1/sort/metrics. +// /v1/sort/metrics. // A valid GET to this endpoint sends response with sort metrics. -func metricsHandler(w http.ResponseWriter, r *http.Request) { +func tmetricsHandler(w http.ResponseWriter, r *http.Request) { if !checkHTTPMethod(w, r, http.MethodGet) { return } @@ -680,9 +752,9 @@ func metricsHandler(w http.ResponseWriter, r *http.Request) { } } -// finishedAckHandler is the handler called for the HTTP endpoint /v1/sort/finished-ack. +// /v1/sort/finished-ack. // A valid PUT to this endpoint acknowledges that daemonID has finished dSort operation. -func finishedAckHandler(w http.ResponseWriter, r *http.Request) { +func tfiniHandler(w http.ResponseWriter, r *http.Request) { if !checkHTTPMethod(w, r, http.MethodPut) { return } @@ -702,7 +774,11 @@ func finishedAckHandler(w http.ResponseWriter, r *http.Request) { dsortManager.updateFinishedAck(daemonID) } -func broadcastTargets(method, path string, urlParams url.Values, body []byte, smap *meta.Smap, ignore ...*meta.Snode) []response { +// +// common: PROXY and TARGET +// + +func bcast(method, path string, urlParams url.Values, body []byte, smap *meta.Smap, ignore ...*meta.Snode) []response { var ( responses = make([]response, smap.CountActiveTs()) wg = &sync.WaitGroup{} @@ -787,78 +863,3 @@ func checkRESTItems(w http.ResponseWriter, r *http.Request, itemsAfter int, item return items, err } - -// Determine what dsorter type we should use. We need to make this decision -// based on eg. how much memory targets have. 
-func determineDSorterType(pars *ParsedRequestSpec) (string, error) { - if pars.DSorterType != "" { - return pars.DSorterType, nil // in case the dsorter type is already set, we need to respect it - } - - // Get memory stats from targets - var ( - totalAvailMemory uint64 - err error - path = apc.URLPathDae.S - moreThanThreshold = true - ) - - dsorterMemThreshold, err := cos.ParseSize(pars.DSorterMemThreshold, cos.UnitsIEC) - debug.AssertNoErr(err) - - query := make(url.Values) - query.Add(apc.QparamWhat, apc.WhatNodeStatsAndStatus) - responses := broadcastTargets(http.MethodGet, path, query, nil, ctx.smapOwner.Get()) - for _, response := range responses { - if response.err != nil { - return "", response.err - } - - daemonStatus := stats.NodeStatus{} - if err := jsoniter.Unmarshal(response.res, &daemonStatus); err != nil { - return "", err - } - - memStat := sys.MemStat{Total: daemonStatus.MemCPUInfo.MemAvail + daemonStatus.MemCPUInfo.MemUsed} - dsortAvailMemory := calcMaxMemoryUsage(pars.MaxMemUsage, &memStat) - totalAvailMemory += dsortAvailMemory - moreThanThreshold = moreThanThreshold && dsortAvailMemory > uint64(dsorterMemThreshold) - } - - // TODO: currently, we have import cycle: dsort -> api -> dsort. Need to - // think of a way to get the total size of bucket without copy-and-paste - // the API code. - // - // baseParams := &api.BaseParams{ - // Client: http.DefaultClient, - // URL: ctx.smap.Get().Primary.URL(cmn.NetIntraControl), - // } - // msg := &apc.LsoMsg{Props: "size,status"} - // objList, err := api.ListObjects(baseParams, pars.Bucket, msg, 0) - // if err != nil { - // return "", err - // } - // - // totalBucketSize := uint64(0) - // for _, obj := range objList.Entries { - // if obj.IsStatusOK() { - // totalBucketSize += uint64(obj.Size) - // } - // } - // - // if totalBucketSize < totalAvailMemory { - // // "general type" is capable of extracting whole dataset into memory - // // In this case the creation phase is super fast. - // return DSorterGeneralType, nil - // } - - if moreThanThreshold { - // If there is enough memory to use "memory type", we should do that. - // It behaves better for cases when we have a lot of memory available. - return DSorterMemType, nil - } - - // For all other cases we should use "general type", as we don't know - // exactly what to expect, so we should prepare for the worst. - return DSorterGeneralType, nil -} diff --git a/ext/dsort/manager.go b/ext/dsort/manager.go index d6cb010d46..3d1f09085e 100644 --- a/ext/dsort/manager.go +++ b/ext/dsort/manager.go @@ -107,7 +107,7 @@ type ( ec extract.Creator startShardCreation chan struct{} - pars *ParsedRequestSpec + pars *parsedReqSpec client *http.Client // Client for sending records metadata compression struct { @@ -175,7 +175,7 @@ func (m *Manager) unlock() { m.mu.Unlock() } // init initializes all necessary fields. // PRECONDITION: `m.mu` must be locked. 
-func (m *Manager) init(pars *ParsedRequestSpec) error { +func (m *Manager) init(pars *parsedReqSpec) error { debug.AssertMutexLocked(&m.mu) m.ctx = ctx @@ -467,9 +467,9 @@ func (m *Manager) setRW() (err error) { return errors.WithStack(err) } - m.ec = newExtractCreator(m.ctx.t, m.pars.Extension) + m.ec = newExtractCreator(m.ctx.t, m.pars.InputExtension) if m.ec == nil { - debug.Assert(m.pars.Extension == "", m.pars.Extension) + debug.Assert(m.pars.InputExtension == "", m.pars.InputExtension) // NOTE: [feature] allow non-specified extension; assign default extract-creator; // handle all shards we encounter - all supported formats m.ec = extract.NewTarRW(m.ctx.t) @@ -478,7 +478,7 @@ func (m *Manager) setRW() (err error) { debug.Assert(m.ec != nil, "dry-run in combination with _any_ shard extension is not supported yet") m.ec = extract.NopRW(m.ec) } - m.recManager = extract.NewRecordManager(m.ctx.t, m.pars.Bck, m.pars.Extension, m.ec, ke, m.onDupRecs) + m.recManager = extract.NewRecordManager(m.ctx.t, m.pars.InputBck, m.pars.InputExtension, m.ec, ke, m.onDupRecs) return nil } diff --git a/ext/dsort/manager_group_test.go b/ext/dsort/manager_group_test.go index 4da42fb659..e47b8da5b3 100644 --- a/ext/dsort/manager_group_test.go +++ b/ext/dsort/manager_group_test.go @@ -25,7 +25,8 @@ const ( var _ = Describe("ManagerGroup", func() { var ( mgrp *ManagerGroup - validRS = &ParsedRequestSpec{Extension: archive.ExtTar, Algorithm: &Algorithm{Kind: None}, MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} + validRS = &parsedReqSpec{InputExtension: archive.ExtTar, Algorithm: &Algorithm{Kind: None}, + MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} ) BeforeEach(func() { diff --git a/ext/dsort/manager_test.go b/ext/dsort/manager_test.go index 0d483192e0..2b835ce7c3 100644 --- a/ext/dsort/manager_test.go +++ b/ext/dsort/manager_test.go @@ -35,7 +35,8 @@ var _ = Describe("Init", func() { m := &Manager{ctx: dsortContext{t: mock.NewTarget(nil)}} m.lock() defer m.unlock() - sr := &ParsedRequestSpec{Extension: archive.ExtTar, Algorithm: &Algorithm{Kind: None}, MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} + sr := &parsedReqSpec{InputExtension: archive.ExtTar, Algorithm: &Algorithm{Kind: None}, + MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} Expect(m.init(sr)).NotTo(HaveOccurred()) Expect(m.ec.UsingCompression()).To(BeFalse()) }) @@ -44,7 +45,8 @@ var _ = Describe("Init", func() { m := &Manager{ctx: dsortContext{t: mock.NewTarget(nil)}} m.lock() defer m.unlock() - sr := &ParsedRequestSpec{Extension: archive.ExtTarGz, Algorithm: &Algorithm{Kind: None}, MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} + sr := &parsedReqSpec{InputExtension: archive.ExtTarGz, Algorithm: &Algorithm{Kind: None}, + MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} Expect(m.init(sr)).NotTo(HaveOccurred()) Expect(m.ec.UsingCompression()).To(BeTrue()) }) @@ -53,7 +55,8 @@ var _ = Describe("Init", func() { m := &Manager{ctx: dsortContext{t: mock.NewTarget(nil)}} m.lock() defer m.unlock() - sr := &ParsedRequestSpec{Extension: archive.ExtTgz, Algorithm: &Algorithm{Kind: None}, MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} + sr := &parsedReqSpec{InputExtension: 
archive.ExtTgz, Algorithm: &Algorithm{Kind: None}, + MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} Expect(m.init(sr)).NotTo(HaveOccurred()) Expect(m.ec.UsingCompression()).To(BeTrue()) }) @@ -62,7 +65,8 @@ var _ = Describe("Init", func() { m := &Manager{ctx: dsortContext{t: mock.NewTarget(nil)}} m.lock() defer m.unlock() - sr := &ParsedRequestSpec{Extension: archive.ExtZip, Algorithm: &Algorithm{Kind: None}, MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} + sr := &parsedReqSpec{InputExtension: archive.ExtZip, Algorithm: &Algorithm{Kind: None}, + MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} Expect(m.init(sr)).NotTo(HaveOccurred()) Expect(m.ec.UsingCompression()).To(BeTrue()) }) diff --git a/ext/dsort/request_spec.go b/ext/dsort/request_spec.go index a505fa4a2b..b0d133468f 100644 --- a/ext/dsort/request_spec.go +++ b/ext/dsort/request_spec.go @@ -29,11 +29,18 @@ type parsedOutputTemplate struct { Template cos.ParsedTemplate } -type ParsedRequestSpec struct { - Bck cmn.Bck `json:"bck"` +type ParsedReq struct { + InputBck cmn.Bck + OutputBck cmn.Bck + pars *parsedReqSpec +} + +type parsedReqSpec struct { + InputBck cmn.Bck `json:"input_bck"` Description string `json:"description"` OutputBck cmn.Bck `json:"output_bck"` - Extension string `json:"extension"` + InputExtension string `json:"input_extension"` + OutputExtension string `json:"output_extension"` OutputShardSize int64 `json:"output_shard_size,string"` Pit *parsedInputTemplate `json:"pit"` Pot *parsedOutputTemplate `json:"pot"` @@ -44,7 +51,7 @@ type ParsedRequestSpec struct { TargetOrderSalt []byte `json:"target_order_salt"` ExtractConcMaxLimit int `json:"extract_concurrency_max_limit"` CreateConcMaxLimit int `json:"create_concurrency_max_limit"` - StreamMultiplier int `json:"stream_multiplier"` // TODO: remove + SbundleMult int `json:"bundle_multiplier"` ExtendedMetrics bool `json:"extended_metrics"` // debug @@ -58,68 +65,95 @@ type ParsedRequestSpec struct { // RequestSpec // ///////////////// -// Parse returns a non-nil error if a RequestSpec is invalid. When RequestSpec -// is valid it parses all the fields, sets the values and returns ParsedRequestSpec. 
-func (rs *RequestSpec) Parse() (*ParsedRequestSpec, error) {
+func specErr(s string, err error) error { return fmt.Errorf("[dsort] parse-spec: %q %w", s, err) }
+
+func (rs *RequestSpec) ParseCtx() (*ParsedReq, error) {
+	pars, err := rs.parse()
+	return &ParsedReq{pars.InputBck, pars.OutputBck, pars}, err
+}
+
+func (rs *RequestSpec) parse() (*parsedReqSpec, error) {
 	var (
 		cfg  = cmn.GCO.Get().DSort
-		pars = &ParsedRequestSpec{}
+		pars = &parsedReqSpec{}
 	)
 
-	if rs.InputBck.Name == "" {
-		return pars, errMissingSrcBucket
+	// src bck
+	if rs.InputBck.IsEmpty() {
+		return pars, specErr("input_bck", errMissingSrcBucket)
 	}
+	pars.InputBck = rs.InputBck
 	if rs.InputBck.Provider == "" {
-		rs.InputBck.Provider = apc.AIS
-	}
-	if _, err := cmn.NormalizeProvider(rs.InputBck.Provider); err != nil {
-		return pars, err
+		pars.InputBck.Provider = apc.AIS // NOTE: ais:// is the default
+	} else {
+		normp, err := cmn.NormalizeProvider(rs.InputBck.Provider)
+		if err != nil {
+			return pars, specErr("input_bck_provider", err)
+		}
+		pars.InputBck.Provider = normp
 	}
 	if err := rs.InputBck.Validate(); err != nil {
-		return pars, err
+		return pars, specErr("input_bck", err)
 	}
+
 	pars.Description = rs.Description
-	pars.Bck = rs.InputBck
+
+	// dst bck
 	pars.OutputBck = rs.OutputBck
 	if pars.OutputBck.IsEmpty() {
-		pars.OutputBck = pars.Bck
-	} else if _, err := cmn.NormalizeProvider(rs.OutputBck.Provider); err != nil {
-		return pars, err
-	} else if err := rs.OutputBck.Validate(); err != nil {
-		return pars, err
+		pars.OutputBck = pars.InputBck // NOTE: source can be the destination as well
+	} else {
+		normp, err := cmn.NormalizeProvider(rs.OutputBck.Provider)
+		if err != nil {
+			return pars, specErr("output_bck_provider", err)
+		}
+		pars.OutputBck.Provider = normp
+		if err := rs.OutputBck.Validate(); err != nil {
+			return pars, specErr("output_bck", err)
+		}
 	}
 
+	// input format
 	var err error
 	pars.Pit, err = parseInputFormat(rs.InputFormat)
 	if err != nil {
-		return nil, err
+		return nil, specErr("input_format", err)
 	}
-
-	if rs.Extension != "" {
-		pars.Extension, err = archive.Mime(rs.Extension, "")
+	if rs.InputFormat.Template != "" {
+		// template is not a filename but all we do here is
+		// checking the template's suffix for specific supported extensions
+		if ext, err := archive.Mime("", rs.InputFormat.Template); err == nil {
+			if rs.InputExtension != "" && rs.InputExtension != ext {
+				return nil, fmt.Errorf("input_extension: %q vs %q", rs.InputExtension, ext)
+			}
+			rs.InputExtension = ext
+		}
+	}
+	if rs.InputExtension != "" {
+		pars.InputExtension, err = archive.Mime(rs.InputExtension, "")
 		if err != nil {
-			return nil, err
+			return nil, specErr("input_extension", err)
 		}
 	}
 
+	// output format
 	pars.OutputShardSize, err = cos.ParseSize(rs.OutputShardSize, cos.UnitsIEC)
 	if err != nil {
-		return nil, err
+		return nil, specErr("output_shard_size", err)
 	}
 	if pars.OutputShardSize < 0 {
 		return nil, fmt.Errorf(fmtErrNegOutputSize, pars.OutputShardSize)
 	}
-
 	pars.Algorithm, err = parseAlgorithm(rs.Algorithm)
 	if err != nil {
-		return nil, err
+		return nil, specErr("algorithm", err)
 	}
 
-	empty, err := validateOrderFileURL(rs.OrderFileURL)
-	if err != nil {
+	var isOrder bool
+	if isOrder, err = validateOrderFileURL(rs.OrderFileURL); err != nil {
 		return nil, fmt.Errorf(fmtErrOrderURL, rs.OrderFileURL, err)
 	}
-	if empty {
+	if !isOrder {
 		if pars.Pot, err = parseOutputFormat(rs.OutputFormat); err != nil {
 			return nil, err
 		}
@@ -129,20 +163,36 @@ func (rs *RequestSpec) Parse() (*ParsedRequestSpec, error) {
 			return nil, 
errMissingOutputSize } } - } else { // Valid and not empty. + if rs.OutputFormat != "" { + // (ditto) + if ext, err := archive.Mime("", rs.OutputFormat); err == nil { + if rs.OutputExtension != "" && rs.OutputExtension != ext { + return nil, fmt.Errorf("output_extension: %q vs %q", rs.OutputExtension, ext) + } + rs.OutputExtension = ext + } + } + } else { // For the order file the output shard size must be set. if pars.OutputShardSize == 0 { return nil, errMissingOutputSize } - pars.OrderFileURL = rs.OrderFileURL - pars.OrderFileSep = rs.OrderFileSep if pars.OrderFileSep == "" { pars.OrderFileSep = "\t" } } + if rs.OutputExtension == "" { + pars.OutputExtension = pars.InputExtension + } else { + pars.OutputExtension, err = archive.Mime(rs.OutputExtension, "") + if err != nil { + return nil, specErr("output_extension", err) + } + } + // mem & conc if rs.MaxMemUsage == "" { rs.MaxMemUsage = cfg.DefaultMaxMemUsage } @@ -159,16 +209,22 @@ func (rs *RequestSpec) Parse() (*ParsedRequestSpec, error) { pars.ExtractConcMaxLimit = rs.ExtractConcMaxLimit pars.CreateConcMaxLimit = rs.CreateConcMaxLimit - pars.StreamMultiplier = rs.StreamMultiplier pars.ExtendedMetrics = rs.ExtendedMetrics pars.DSorterType = rs.DSorterType pars.DryRun = rs.DryRun - // Check for values that override the global config. + // `cfg` here contains inherited (aka global) part of the dsort config - + // apply this request's rs.Config values to override or assign defaults + if err := rs.Config.ValidateWithOpts(true); err != nil { return nil, err } pars.DSortConf = rs.Config + + pars.SbundleMult = rs.Config.SbundleMult + if pars.SbundleMult == 0 { + pars.SbundleMult = cfg.SbundleMult + } if pars.MissingShards == "" { pars.MissingShards = cfg.MissingShards } diff --git a/ext/dsort/request_spec_test.go b/ext/dsort/request_spec_test.go index 9e0b483624..8b5365c6eb 100644 --- a/ext/dsort/request_spec_test.go +++ b/ext/dsort/request_spec_test.go @@ -31,21 +31,21 @@ var _ = Describe("RequestSpec", func() { It("should parse minimal spec", func() { rs := RequestSpec{ InputBck: cmn.Bck{Name: "test"}, - Extension: archive.ExtTar, + InputExtension: archive.ExtTar, InputFormat: newInputFormat("prefix-{0010..0111..2}-suffix"), OutputFormat: "prefix-{10..111}-suffix", OutputShardSize: "10KB", MaxMemUsage: "80%", Algorithm: Algorithm{Kind: None}, } - pars, err := rs.Parse() + pars, err := rs.parse() Expect(err).ShouldNot(HaveOccurred()) - Expect(pars.Bck.Name).To(Equal("test")) - Expect(pars.Bck.Provider).To(Equal(apc.AIS)) + Expect(pars.InputBck.Name).To(Equal("test")) + Expect(pars.InputBck.Provider).To(Equal(apc.AIS)) Expect(pars.OutputBck.Name).To(Equal("test")) Expect(pars.OutputBck.Provider).To(Equal(apc.AIS)) - Expect(pars.Extension).To(Equal(archive.ExtTar)) + Expect(pars.InputExtension).To(Equal(archive.ExtTar)) Expect(pars.Pit.Template).To(Equal(cos.ParsedTemplate{ Prefix: "prefix-", @@ -79,18 +79,18 @@ var _ = Describe("RequestSpec", func() { rs := RequestSpec{ InputBck: cmn.Bck{Provider: apc.AWS, Name: "test"}, OutputBck: cmn.Bck{Provider: apc.AWS, Name: "testing"}, - Extension: archive.ExtTar, + InputExtension: archive.ExtTar, InputFormat: newInputFormat("prefix-{0010..0111..2}-suffix"), OutputFormat: "prefix-{10..111}-suffix", OutputShardSize: "10KB", MaxMemUsage: "80%", Algorithm: Algorithm{Kind: None}, } - pars, err := rs.Parse() + pars, err := rs.parse() Expect(err).ShouldNot(HaveOccurred()) - Expect(pars.Bck.Name).To(Equal("test")) - Expect(pars.Bck.Provider).To(Equal(apc.AWS)) + Expect(pars.InputBck.Name).To(Equal("test")) + 
Expect(pars.InputBck.Provider).To(Equal(apc.AWS)) Expect(pars.OutputBck.Name).To(Equal("testing")) Expect(pars.OutputBck.Provider).To(Equal(apc.AWS)) }) @@ -99,14 +99,14 @@ var _ = Describe("RequestSpec", func() { rs := RequestSpec{ InputBck: cmn.Bck{Name: "test"}, - Extension: archive.ExtTar, + InputExtension: archive.ExtTar, InputFormat: newInputFormat("prefix-{0010..0111}-suffix"), OutputFormat: "prefix-{0010..0111}-suffix", OutputShardSize: "10KB", MaxMemUsage: "80 GB", Algorithm: Algorithm{Kind: None}, } - pars, err := rs.Parse() + pars, err := rs.parse() Expect(err).ShouldNot(HaveOccurred()) Expect(pars.MaxMemUsage.Type).To(Equal(cos.QuantityBytes)) @@ -116,58 +116,58 @@ var _ = Describe("RequestSpec", func() { It("should parse spec with .tgz extension", func() { rs := RequestSpec{ InputBck: cmn.Bck{Name: "test"}, - Extension: archive.ExtTgz, + InputExtension: archive.ExtTgz, InputFormat: newInputFormat("prefix-{0010..0111}-suffix"), OutputFormat: "prefix-{0010..0111}-suffix", OutputShardSize: "10KB", Algorithm: Algorithm{Kind: None}, } - pars, err := rs.Parse() + pars, err := rs.parse() Expect(err).ShouldNot(HaveOccurred()) - Expect(pars.Extension).To(Equal(archive.ExtTgz)) + Expect(pars.InputExtension).To(Equal(archive.ExtTgz)) }) It("should parse spec with .tar.gz extension", func() { rs := RequestSpec{ InputBck: cmn.Bck{Name: "test"}, - Extension: archive.ExtTarGz, + InputExtension: archive.ExtTarGz, InputFormat: newInputFormat("prefix-{0010..0111}-suffix"), OutputFormat: "prefix-{0010..0111}-suffix", OutputShardSize: "10KB", Algorithm: Algorithm{Kind: None}, } - pars, err := rs.Parse() + pars, err := rs.parse() Expect(err).ShouldNot(HaveOccurred()) - Expect(pars.Extension).To(Equal(archive.ExtTarGz)) + Expect(pars.InputExtension).To(Equal(archive.ExtTarGz)) }) It("should parse spec with .tar.gz extension", func() { rs := RequestSpec{ InputBck: cmn.Bck{Name: "test"}, - Extension: archive.ExtZip, + InputExtension: archive.ExtZip, InputFormat: newInputFormat("prefix-{0010..0111}-suffix"), OutputFormat: "prefix-{0010..0111}-suffix", OutputShardSize: "10KB", Algorithm: Algorithm{Kind: None}, } - pars, err := rs.Parse() + pars, err := rs.parse() Expect(err).ShouldNot(HaveOccurred()) - Expect(pars.Extension).To(Equal(archive.ExtZip)) + Expect(pars.InputExtension).To(Equal(archive.ExtZip)) }) It("should parse spec with %06d syntax", func() { rs := RequestSpec{ InputBck: cmn.Bck{Name: "test"}, - Extension: archive.ExtTgz, + InputExtension: archive.ExtTgz, InputFormat: newInputFormat("prefix-{0010..0111}-suffix"), OutputFormat: "prefix-%06d-suffix", OutputShardSize: "10KB", Algorithm: Algorithm{Kind: None}, } - pars, err := rs.Parse() + pars, err := rs.parse() Expect(err).ShouldNot(HaveOccurred()) Expect(pars.Pot.Template).To(Equal(cos.ParsedTemplate{ @@ -185,13 +185,13 @@ var _ = Describe("RequestSpec", func() { It("should parse spec with @ syntax", func() { rs := RequestSpec{ InputBck: cmn.Bck{Name: "test"}, - Extension: archive.ExtTgz, + InputExtension: archive.ExtTgz, InputFormat: newInputFormat("prefix@0111-suffix"), OutputFormat: "prefix-@000111-suffix", OutputShardSize: "10KB", Algorithm: Algorithm{Kind: None}, } - pars, err := rs.Parse() + pars, err := rs.parse() Expect(err).ShouldNot(HaveOccurred()) Expect(pars.Pit.Template).To(Equal(cos.ParsedTemplate{ @@ -220,7 +220,7 @@ var _ = Describe("RequestSpec", func() { It("should parse spec and set default conc limits", func() { rs := RequestSpec{ InputBck: cmn.Bck{Name: "test"}, - Extension: archive.ExtTar, + InputExtension: 
 				InputFormat:         newInputFormat("prefix-{0010..0111}-suffix"),
 				OutputFormat:        "prefix-{0010..0111}-suffix",
 				OutputShardSize:     "10KB",
@@ -228,7 +228,7 @@ var _ = Describe("RequestSpec", func() {
 				ExtractConcMaxLimit: 0,
 				Algorithm:           Algorithm{Kind: None},
 			}
-			pars, err := rs.Parse()
+			pars, err := rs.parse()
 			Expect(err).ShouldNot(HaveOccurred())
 
 			Expect(pars.CreateConcMaxLimit).To(BeEquivalentTo(0))
@@ -243,7 +243,7 @@ var _ = Describe("RequestSpec", func() {
 			rs := RequestSpec{
 				InputBck:        cmn.Bck{Name: "test"},
-				Extension:       archive.ExtTar,
+				InputExtension:  archive.ExtTar,
 				InputFormat:     newInputFormat("prefix-{0010..0111}-suffix"),
 				OutputFormat:    "prefix-{0010..0111}-suffix",
 				OutputShardSize: "10KB",
@@ -259,7 +259,7 @@ var _ = Describe("RequestSpec", func() {
 					DSorterMemThreshold: "",
 				},
 			}
-			pars, err := rs.Parse()
+			pars, err := rs.parse()
 			Expect(err).ShouldNot(HaveOccurred())
 
 			Expect(pars.DuplicatedRecords).To(Equal(cmn.AbortReaction))
@@ -271,23 +271,23 @@ var _ = Describe("RequestSpec", func() {
 
 		It("should pass when output shard is zero and bash or @ template is used for output format", func() {
 			rs := RequestSpec{
-				InputBck:    cmn.Bck{Name: "test"},
-				Extension:   archive.ExtTar,
-				InputFormat: newInputFormat("prefix-{0010..0111..2}-suffix"),
-				OutputFormat: "prefix-{10..111}-suffix",
-				MaxMemUsage: "80%",
+				InputBck:       cmn.Bck{Name: "test"},
+				InputExtension: archive.ExtTar,
+				InputFormat:    newInputFormat("prefix-{0010..0111..2}-suffix"),
+				OutputFormat:   "prefix-{10..111}-suffix",
+				MaxMemUsage:    "80%",
 			}
-			_, err := rs.Parse()
+			_, err := rs.parse()
 			Expect(err).ShouldNot(HaveOccurred())
 
 			rs = RequestSpec{
-				InputBck:    cmn.Bck{Name: "test"},
-				Extension:   archive.ExtTar,
-				InputFormat: newInputFormat("prefix-{0010..0111..2}-suffix"),
-				OutputFormat: "prefix-@111-suffix",
-				MaxMemUsage: "80%",
+				InputBck:       cmn.Bck{Name: "test"},
+				InputExtension: archive.ExtTar,
+				InputFormat:    newInputFormat("prefix-{0010..0111..2}-suffix"),
+				OutputFormat:   "prefix-@111-suffix",
+				MaxMemUsage:    "80%",
 			}
-			_, err = rs.Parse()
+			_, err = rs.parse()
 			Expect(err).ShouldNot(HaveOccurred())
 		})
 	})
@@ -295,34 +295,34 @@ var _ = Describe("RequestSpec", func() {
 	Context("request specs which shall NOT pass", func() {
 		It("should fail due to missing bucket property", func() {
 			rs := RequestSpec{
-				Extension:       ".txt",
+				InputExtension:  ".txt",
 				OutputShardSize: "10KB",
 				Algorithm:       Algorithm{Kind: None},
 			}
-			_, err := rs.Parse()
+			_, err := rs.parse()
 			Expect(err).Should(HaveOccurred())
-			Expect(err).To(Equal(errMissingSrcBucket))
+			Expect(errors.Is(err, errMissingSrcBucket)).To(BeTrue())
 		})
 
 		It("should fail due to invalid bucket provider", func() {
 			rs := RequestSpec{
-				InputBck:  cmn.Bck{Provider: "invalid", Name: "test"},
-				Extension: ".txt",
-				Algorithm: Algorithm{Kind: None},
+				InputBck:       cmn.Bck{Provider: "invalid", Name: "test"},
+				InputExtension: ".txt",
+				Algorithm:      Algorithm{Kind: None},
 			}
-			_, err := rs.Parse()
+			_, err := rs.parse()
 			Expect(err).Should(HaveOccurred())
 			Expect(err).Should(MatchError(&cmn.ErrInvalidBackendProvider{}))
 		})
 
 		It("should fail due to invalid output bucket provider", func() {
 			rs := RequestSpec{
-				InputBck:  cmn.Bck{Provider: apc.AIS, Name: "test"},
-				OutputBck: cmn.Bck{Provider: "invalid", Name: "test"},
-				Extension: ".txt",
-				Algorithm: Algorithm{Kind: None},
+				InputBck:       cmn.Bck{Provider: apc.AIS, Name: "test"},
+				OutputBck:      cmn.Bck{Provider: "invalid", Name: "test"},
+				InputExtension: ".txt",
+				Algorithm:      Algorithm{Kind: None},
 			}
-			_, err := rs.Parse()
+			_, err := rs.parse()
 			Expect(err).Should(HaveOccurred())
 			Expect(err).Should(MatchError(&cmn.ErrInvalidBackendProvider{}))
 		})
@@ -330,13 +330,13 @@ var _ = Describe("RequestSpec", func() {
 		It("should fail due to start after end in input format", func() {
 			rs := RequestSpec{
 				InputBck:        cmn.Bck{Name: "test"},
-				Extension:       archive.ExtTar,
+				InputExtension:  archive.ExtTar,
 				OutputShardSize: "10KB",
 				InputFormat:     newInputFormat("prefix-{0112..0111}-suffix"),
 				OutputFormat:    "prefix-{0010..0111}-suffix",
 				Algorithm:       Algorithm{Kind: None},
 			}
-			_, err := rs.Parse()
+			_, err := rs.parse()
 			Expect(err).Should(HaveOccurred())
 			contains := strings.Contains(err.Error(), "start")
 			Expect(contains).To(BeTrue())
@@ -345,13 +345,13 @@ var _ = Describe("RequestSpec", func() {
 		It("should fail due to start after end in output format", func() {
 			rs := RequestSpec{
 				InputBck:        cmn.Bck{Name: "test"},
-				Extension:       archive.ExtTar,
+				InputExtension:  archive.ExtTar,
 				OutputShardSize: "10KB",
 				InputFormat:     newInputFormat("prefix-{0010..0111}-suffix"),
 				OutputFormat:    "prefix-{0112..0111}-suffix",
 				Algorithm:       Algorithm{Kind: None},
 			}
-			_, err := rs.Parse()
+			_, err := rs.parse()
 			Expect(err).Should(HaveOccurred())
 			contains := strings.Contains(err.Error(), "start")
 			Expect(contains).To(BeTrue())
@@ -360,13 +360,13 @@ var _ = Describe("RequestSpec", func() {
 		It("should fail due invalid parentheses", func() {
 			rs := RequestSpec{
 				InputBck:        cmn.Bck{Name: "test"},
-				Extension:       archive.ExtTar,
+				InputExtension:  archive.ExtTar,
 				OutputShardSize: "10KB",
 				InputFormat:     newInputFormat("prefix-}{0001..0111}-suffix"),
 				OutputFormat:    "prefix-}{0010..0111}-suffix",
 				Algorithm:       Algorithm{Kind: None},
 			}
-			_, err := rs.Parse()
+			_, err := rs.parse()
 			Expect(err).Should(HaveOccurred())
 			contains := strings.Contains(err.Error(), "invalid")
 			Expect(contains).To(BeTrue())
@@ -375,14 +375,15 @@ var _ = Describe("RequestSpec", func() {
 		It("should fail due to invalid extension", func() {
 			rs := RequestSpec{
 				InputBck:        cmn.Bck{Name: "test"},
-				Extension:       ".jpg",
+				InputExtension:  ".jpg",
 				InputFormat:     newInputFormat("prefix-{0010..0111}-suffix"),
 				OutputFormat:    "prefix-{0010..0111}-suffix",
 				OutputShardSize: "10KB",
 				Algorithm:       Algorithm{Kind: None},
 			}
-			_, err := rs.Parse()
+			_, err := rs.parse()
 			Expect(err).Should(HaveOccurred())
+			err = errors.Unwrap(err)
 			check := archive.IsErrUnknownMime(err)
 			Expect(check).To(BeTrue())
 		})
@@ -390,14 +391,14 @@ var _ = Describe("RequestSpec", func() {
 		It("should fail due to invalid mem usage specification", func() {
 			rs := RequestSpec{
 				InputBck:        cmn.Bck{Name: "test"},
-				Extension:       archive.ExtTar,
+				InputExtension:  archive.ExtTar,
 				InputFormat:     newInputFormat("prefix-{0010..0111}-suffix"),
 				OutputFormat:    "prefix-{0010..0111}-suffix",
 				OutputShardSize: "10KB",
 				MaxMemUsage:     "80",
 				Algorithm:       Algorithm{Kind: None},
 			}
-			_, err := rs.Parse()
+			_, err := rs.parse()
 			Expect(err).Should(HaveOccurred())
 			Expect(err).To(Equal(cos.ErrInvalidQuantityUsage))
 		})
@@ -405,14 +406,14 @@ var _ = Describe("RequestSpec", func() {
 		It("should fail due to invalid mem usage percent specified", func() {
 			rs := RequestSpec{
 				InputBck:        cmn.Bck{Name: "test"},
-				Extension:       archive.ExtTar,
+				InputExtension:  archive.ExtTar,
 				InputFormat:     newInputFormat("prefix-{0010..0111}-suffix"),
 				OutputFormat:    "prefix-{0010..0111}-suffix",
 				OutputShardSize: "10KB",
 				MaxMemUsage:     "120%",
 				Algorithm:       Algorithm{Kind: None},
 			}
-			_, err := rs.Parse()
+			_, err := rs.parse()
 			Expect(err).Should(HaveOccurred())
 			Expect(err).To(Equal(cos.ErrInvalidQuantityPercent))
 		})
@@ -420,14 +421,14 @@ var _ = Describe("RequestSpec", func() {
It("should fail due to invalid mem usage bytes specified", func() { rs := RequestSpec{ InputBck: cmn.Bck{Name: "test"}, - Extension: archive.ExtTar, + InputExtension: archive.ExtTar, InputFormat: newInputFormat("prefix-{0010..0111}-suffix"), OutputFormat: "prefix-{0010..0111}-suffix", OutputShardSize: "10KB", MaxMemUsage: "-1 GB", Algorithm: Algorithm{Kind: None}, } - _, err := rs.Parse() + _, err := rs.parse() Expect(err).Should(HaveOccurred()) Expect(err).To(Equal(cos.ErrInvalidQuantityUsage)) }) @@ -435,14 +436,14 @@ var _ = Describe("RequestSpec", func() { It("should fail due to invalid extract concurrency specified", func() { rs := RequestSpec{ InputBck: cmn.Bck{Name: "test"}, - Extension: archive.ExtTar, + InputExtension: archive.ExtTar, InputFormat: newInputFormat("prefix-{0010..0111}-suffix"), OutputFormat: "prefix-{0010..0111}-suffix", OutputShardSize: "10KB", ExtractConcMaxLimit: -1, Algorithm: Algorithm{Kind: None}, } - _, err := rs.Parse() + _, err := rs.parse() Expect(err).Should(HaveOccurred()) Expect(errors.Is(err, errNegConcLimit)).To(BeTrue()) }) @@ -450,14 +451,14 @@ var _ = Describe("RequestSpec", func() { It("should fail due to invalid create concurrency specified", func() { rs := RequestSpec{ InputBck: cmn.Bck{Name: "test"}, - Extension: archive.ExtTar, + InputExtension: archive.ExtTar, InputFormat: newInputFormat("prefix-{0010..0111}-suffix"), OutputFormat: "prefix-{0010..0111}-suffix", OutputShardSize: "10KB", CreateConcMaxLimit: -1, Algorithm: Algorithm{Kind: None}, } - _, err := rs.Parse() + _, err := rs.parse() Expect(err).Should(HaveOccurred()) Expect(errors.Is(err, errNegConcLimit)).To(BeTrue()) }) @@ -465,7 +466,7 @@ var _ = Describe("RequestSpec", func() { It("should fail due to invalid dsort config value", func() { rs := RequestSpec{ InputBck: cmn.Bck{Name: "test"}, - Extension: archive.ExtTar, + InputExtension: archive.ExtTar, InputFormat: newInputFormat("prefix-{0010..0111..2}-suffix"), OutputFormat: "prefix-{10..111}-suffix", OutputShardSize: "10KB", @@ -473,19 +474,19 @@ var _ = Describe("RequestSpec", func() { Algorithm: Algorithm{Kind: None}, Config: cmn.DSortConf{DuplicatedRecords: "something"}, } - _, err := rs.Parse() + _, err := rs.parse() Expect(err).Should(HaveOccurred()) }) It("should fail when output shard size is empty and output format is %06d", func() { rs := RequestSpec{ - InputBck: cmn.Bck{Name: "test"}, - Extension: archive.ExtTar, - InputFormat: newInputFormat("prefix-{0010..0111..2}-suffix"), - OutputFormat: "prefix-%06d-suffix", - MaxMemUsage: "80%", + InputBck: cmn.Bck{Name: "test"}, + InputExtension: archive.ExtTar, + InputFormat: newInputFormat("prefix-{0010..0111..2}-suffix"), + OutputFormat: "prefix-%06d-suffix", + MaxMemUsage: "80%", } - _, err := rs.Parse() + _, err := rs.parse() Expect(err).Should(HaveOccurred()) }) }) diff --git a/python/tests/integration/sdk/test_dsort_ops.py b/python/tests/integration/sdk/test_dsort_ops.py index 07b62baf7e..0900d6c7ce 100644 --- a/python/tests/integration/sdk/test_dsort_ops.py +++ b/python/tests/integration/sdk/test_dsort_ops.py @@ -71,7 +71,7 @@ def _get_object_content_map(self, bucket_name, object_names): def _start_with_spec(self, input_bck_name, out_bck_name, input_object_prefix): spec = { - "extension": ".tar", + "input_extension": ".tar", "input_bck": {"name": input_bck_name}, "output_bck": {"name": out_bck_name}, "input_format": {"template": input_object_prefix + "-{0..1}"},