dsort: create destination on the fly
* consistent with transform&copy bucket and transform&copy (multi-)object operations
* part six; prev. commit: 5994072
* TCO dry-run option
* testing script: check bucket existence and clean up accordingly
* with minor refactoring

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Aug 7, 2023
1 parent fa2d065 commit dedf578
Showing 8 changed files with 96 additions and 66 deletions.
4 changes: 2 additions & 2 deletions ais/plstcx.go
@@ -77,7 +77,7 @@ func (c *lstcx) do() (string, error) {
cnt := cos.Min(len(names), 10)
nlog.Infof("(%s => %s): %s => %s %v...", c.amsg.Action, c.altmsg.Action, c.bckFrom, c.bckTo, names[:cnt])

c.tcomsg.TxnUUID, err = p.tcobjs(c.bckFrom, c.bckTo, &c.altmsg)
c.tcomsg.TxnUUID, err = p.tcobjs(c.bckFrom, c.bckTo, &c.altmsg, c.tcbmsg.DryRun)
if lst.ContinuationToken != "" {
c.lsmsg.ContinuationToken = lst.ContinuationToken
go c.pages(smap)
@@ -108,7 +108,7 @@ func (c *lstcx) pages(smap *smapX) {

// next tco action
c.altmsg.Value = &c.tcomsg
xid, err := p.tcobjs(c.bckFrom, c.bckTo, &c.altmsg)
xid, err := p.tcobjs(c.bckFrom, c.bckTo, &c.altmsg, c.tcbmsg.DryRun)
if err != nil {
nlog.Errorln(err)
return
33 changes: 25 additions & 8 deletions ais/proxy.go
@@ -1242,7 +1242,7 @@ func (p *proxy) _bckpost(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg
}

nlog.Infof("multi-obj %s %s => %s", msg.Action, bck, bckTo)
if xid, err = p.tcobjs(bck, bckTo, msg); err != nil {
if xid, err = p.tcobjs(bck, bckTo, msg, tcomsg.TCBMsg.CopyBckMsg.DryRun); err != nil {
p.writeErr(w, r, err)
return
}
@@ -2857,7 +2857,6 @@ func (p *proxy) dsortHandler(w http.ResponseWriter, r *http.Request) {
if err := p.checkAccess(w, r, nil, apc.AceAdmin); err != nil {
return
}

apiItems, err := cmn.ParseURL(r.URL.Path, 0, true, apc.URLPathdSort.L)
if err != nil {
p.writeErrURL(w, r)
@@ -2868,8 +2867,15 @@ func (p *proxy) dsortHandler(w http.ResponseWriter, r *http.Request) {
case http.MethodPost:
// - validate request, check input_bck and output_bck
// - start dsort
body, err := io.ReadAll(r.Body)
if err != nil {
p.writeErrStatusf(w, r, http.StatusInternalServerError, "failed to receive dsort request: %v", err)
return
}
rs := &dsort.RequestSpec{}
if cmn.ReadJSON(w, r, rs) != nil {
if err := jsoniter.Unmarshal(body, rs); err != nil {
err = fmt.Errorf(cmn.FmtErrUnmarshal, p, "dsort request", cos.BHead(body), err)
p.writeErr(w, r, err)
return
}
parsc, err := rs.ParseCtx()
@@ -2889,15 +2895,26 @@ func (p *proxy) dsortHandler(w http.ResponseWriter, r *http.Request) {
return
}
if errCode == http.StatusNotFound {
// TODO -- FIXME: reuse common prxtxn logic to create
if true {
p.writeErr(w, r, cmn.NewErrRemoteBckNotFound(&parsc.OutputBck), http.StatusNotFound)
naction := "dsort-create-output-bck"
warnfmt := "%s: %screate 'output_bck' %s with the 'input_bck' (%s) props"
if p.forwardCP(w, r, nil /*msg*/, naction, body /*orig body*/) { // to create
return
}
if p.forwardCP(w, r, nil /*msg*/, "dsort-create-output-bck") { // to create
ctx := &bmdModifier{
pre: bmodCpProps,
final: p.bmodSync,
msg: &apc.ActMsg{Action: naction},
txnID: "",
bcks: []*meta.Bck{bck, bckTo},
wait: true,
}
if _, err = p.owner.bmd.modify(ctx); err != nil {
debug.AssertNoErr(err)
err = fmt.Errorf(warnfmt+": %w", p, "failed to ", bckTo, bck, err)
p.writeErr(w, r, err)
return
}
nlog.Warningf("%s: output_bck %s will be created with the input_bck (%s) props", p, bckTo, bck)
nlog.Warningf(warnfmt, p, "", bckTo, bck)
}
}
dsort.PstartHandler(w, r, parsc)
9 changes: 0 additions & 9 deletions ais/prxclu.go
@@ -1677,15 +1677,6 @@ func (p *proxy) _newRMD(ctx *smapModifier, clone *smapX) {
ctx.rmdCtx = rmdCtx
}

func (p *proxy) bmodSync(ctx *bmdModifier, clone *bucketMD) {
debug.Assert(clone._sgl != nil)
msg := p.newAmsg(ctx.msg, clone, ctx.txnID)
wg := p.metasyncer.sync(revsPair{clone, msg})
if ctx.wait {
wg.Wait()
}
}

func (p *proxy) cluSetPrimary(w http.ResponseWriter, r *http.Request) {
apiItems, err := p.parseURL(w, r, 1, false, apc.URLPathCluProxy.L)
if err != nil {
6 changes: 3 additions & 3 deletions ais/prxdl.go
@@ -89,10 +89,10 @@ func (p *proxy) httpdlpost(w http.ResponseWriter, r *http.Request) {

body, err := io.ReadAll(r.Body)
if err != nil {
p.writeErrStatusf(w, r, http.StatusInternalServerError, "Error starting download: %v", err)
p.writeErrStatusf(w, r, http.StatusInternalServerError, "failed to receive download request: %v", err)
return
}
dlb, dlBase, ok := p.validateStartDownload(w, r, body)
dlb, dlBase, ok := p.validateDownload(w, r, body)
if !ok {
return
}
@@ -268,7 +268,7 @@ func (p *proxy) dlstart(r *http.Request, xid, jobID string, body []byte) (errCod
return http.StatusOK, nil
}

func (p *proxy) validateStartDownload(w http.ResponseWriter, r *http.Request, body []byte) (dlb dload.Body, dlBase dload.Base, ok bool) {
func (p *proxy) validateDownload(w http.ResponseWriter, r *http.Request, body []byte) (dlb dload.Body, dlBase dload.Base, ok bool) {
if err := jsoniter.Unmarshal(body, &dlb); err != nil {
err = fmt.Errorf(cmn.FmtErrUnmarshal, p, "download request", cos.BHead(body), err)
p.writeErr(w, r, err)
73 changes: 42 additions & 31 deletions ais/prxtxn.go
@@ -563,7 +563,7 @@ func (p *proxy) tcb(bckFrom, bckTo *meta.Bck, msg *apc.ActMsg, dryRun bool) (xid
// 3. create dst bucket if doesn't exist - clone bckFrom props
if !dryRun && !existsTo {
ctx := &bmdModifier{
pre: bmodTCB,
pre: bmodCpProps,
final: p.bmodSync,
msg: msg,
txnID: c.uuid,
@@ -605,7 +605,8 @@ }
}

// transform or copy a list or a range of objects
func (p *proxy) tcobjs(bckFrom, bckTo *meta.Bck, msg *apc.ActMsg) (xid string, err error) {
// TODO: check dry-run
func (p *proxy) tcobjs(bckFrom, bckTo *meta.Bck, msg *apc.ActMsg, dryRun bool) (xid string, err error) {
// 1. confirm existence
bmd := p.owner.bmd.get()
if _, present := bmd.Get(bckFrom); !present {
@@ -615,7 +616,7 @@ func (p *proxy) tcobjs(bckFrom, bckTo *meta.Bck, msg *apc.ActMsg) (xid string, e
_, existsTo := bmd.Get(bckTo)
// 2. begin
var (
waitmsync = !existsTo
waitmsync = !dryRun && !existsTo
c = p.prepTxnClient(msg, bckFrom, waitmsync)
)
_ = bckTo.AddUnameToQuery(c.req.Query, apc.QparamBckTo)
@@ -624,9 +625,9 @@ func (p *proxy) tcobjs(bckFrom, bckTo *meta.Bck, msg *apc.ActMsg) (xid string, e
}

// 3. create dst bucket if doesn't exist - clone bckFrom props
if !existsTo {
if !dryRun && !existsTo {
ctx := &bmdModifier{
pre: bmodTCB,
pre: bmodCpProps,
final: p.bmodSync,
msg: msg,
txnID: c.uuid,
@@ -662,32 +663,6 @@ func (p *proxy) tcobjs(bckFrom, bckTo *meta.Bck, msg *apc.ActMsg) (xid string, e
return strings.Join(all, xact.UUIDSepa), nil
}

func bmodTCB(ctx *bmdModifier, clone *bucketMD) error {
var (
bckFrom, bckTo = ctx.bcks[0], ctx.bcks[1]
bprops, present = clone.Get(bckFrom) // TODO: Bucket could be removed during begin.
)
debug.Assert(present)

// Skip destination bucket creation if it's dry run or it's already present.
if _, present = clone.Get(bckTo); present {
ctx.terminate = true
return nil
}

debug.Assert(bckTo.IsAIS())
bckFrom.Props = bprops.Clone()
// replicate bucket props - but only if the source is ais as well
if bckFrom.IsAIS() || bckFrom.IsRemoteAIS() {
bckTo.Props = bprops.Clone()
} else {
bckTo.Props = defaultBckProps(bckPropsArgs{bck: bckTo})
}
added := clone.add(bckTo, bckTo.Props)
debug.Assert(added)
return nil
}

func parseECConf(value any) (*cmn.ECConfToUpdate, error) {
switch v := value.(type) {
case string:
@@ -1000,6 +975,42 @@ func (p *proxy) prepTxnClient(msg *apc.ActMsg, bck *meta.Bck, waitmsync bool) *t
return c
}

// two helpers to create ais:// destination on the fly, copy source bucket props
func bmodCpProps(ctx *bmdModifier, clone *bucketMD) error {
var (
bckFrom, bckTo = ctx.bcks[0], ctx.bcks[1]
bprops, present = clone.Get(bckFrom) // TODO: Bucket could be removed during begin.
)
debug.Assert(present)

// Skip destination bucket creation if it's already present.
if _, present = clone.Get(bckTo); present {
ctx.terminate = true
return nil
}

debug.Assert(bckTo.IsAIS())
bckFrom.Props = bprops.Clone()
// replicate bucket props - but only if the source is ais as well
if bckFrom.IsAIS() || bckFrom.IsRemoteAIS() {
bckTo.Props = bprops.Clone()
} else {
bckTo.Props = defaultBckProps(bckPropsArgs{bck: bckTo})
}
added := clone.add(bckTo, bckTo.Props)
debug.Assert(added)
return nil
}

func (p *proxy) bmodSync(ctx *bmdModifier, clone *bucketMD) {
debug.Assert(clone._sgl != nil)
msg := p.newAmsg(ctx.msg, clone, ctx.txnID)
wg := p.metasyncer.sync(revsPair{clone, msg})
if ctx.wait {
wg.Wait()
}
}

// rollback create-bucket
func (p *proxy) undoCreateBucket(msg *apc.ActMsg, bck *meta.Bck) {
ctx := &bmdModifier{
11 changes: 6 additions & 5 deletions ais/test/dsort_test.go
@@ -709,22 +709,23 @@ func TestDistributedSortNonExistingBuckets(t *testing.T) {

df.init()

// Create local output bucket
// Create ais:// output
tools.CreateBucketWithCleanup(t, m.proxyURL, df.outputBck, nil)

tlog.Logln(startingDS)
spec := df.gen()
tlog.Logf("dsort %s(-) => %s\n", m.bck, df.outputBck)
if _, err := api.StartDSort(df.baseParams, &spec); err == nil {
t.Errorf("expected %s job to fail when input bucket does not exist", dsort.DSortName)
t.Error("expected dsort to fail when input bucket doesn't exist")
}

// Now destroy output bucket and create input bucket
tools.DestroyBucket(t, m.proxyURL, df.outputBck)
tools.CreateBucketWithCleanup(t, m.proxyURL, m.bck, nil)

tlog.Logln("starting second distributed sort...")
if _, err := api.StartDSort(df.baseParams, &spec); err == nil {
t.Errorf("expected %s job to fail when output bucket does not exist", dsort.DSortName)
tlog.Logf("dsort %s => %s(-)\n", m.bck, df.outputBck)
if _, err := api.StartDSort(df.baseParams, &spec); err != nil {
t.Errorf("expected dsort to create output bucket on the fly, got: %v", err)
}
},
)
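For context, a minimal sketch of the behavior the updated test now asserts, using only identifiers that appear in the hunk above (df.gen, api.StartDSort, tlog.Logf); the surrounding test harness is assumed:

	// input bucket exists; the ais:// output bucket intentionally does not
	spec := df.gen()
	tlog.Logf("dsort %s => %s(-)\n", m.bck, df.outputBck)
	if xid, err := api.StartDSort(df.baseParams, &spec); err != nil {
		// previously, a missing output bucket failed the job;
		// with this commit the proxy creates the destination on the fly,
		// cloning input_bck props (see bmodCpProps in ais/prxtxn.go)
		t.Fatalf("dsort failed to create output bucket on the fly: %v", err)
	} else {
		tlog.Logf("dsort job started: %s\n", xid)
	}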
24 changes: 17 additions & 7 deletions ais/test/scripts/dsort-ex1.sh
@@ -18,9 +18,15 @@ while (( "$#" )); do
esac
done

## generate 10 tar shards, each containing 5 files
## (50 files total)
ais create $srcbck $dstbck 2>/dev/null
## establish existence
ais show bucket $srcbck -c 1>/dev/null 2>&1
srcexists=$?
ais show bucket $dstbck -c 1>/dev/null 2>&1
dstexists=$?

## generate 10 tar shards, each containing 5 files (50 files total)
## note that dstbck, if doesn't exist, will be created on the fly
[[ $srcexists == 0 ]] || ais create $srcbck || exit 1
ais archive gen-shards "$srcbck/shard-{0..9}.tar" || \
exit 1

Expand All @@ -36,8 +42,12 @@ num=$(ais ls $dstbck --summary --H | awk '{print $3}')
echo "Successfully resharded $srcbck => $dstbck:"
ais ls $dstbck

## cleanup
if [[ ${nocleanup} != "true" ]]; then
echo "Cleanup: deleting $srcbck and $dstbck"
ais rmb $srcbck $dstbck -y 2>/dev/null 1>&2
## cleanup: rmb buckets created during this run
if [[ ${nocleanup} != "true" && $srcexists != 0 ]]; then
echo "Deleting source: $srcbck"
ais rmb $srcbck -y 2>/dev/null 1>&2
fi
if [[ ${nocleanup} != "true" && $dstexists != 0 ]]; then
echo "Deleting destination: $dstbck"
ais rmb $dstbck -y 2>/dev/null 1>&2
fi
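
A possible invocation of the updated script; the --nocleanup switch is an assumption suggested by the $nocleanup variable above (the argument-parsing loop is elided from this hunk):

	# first run creates the source bucket, reshards into the
	# (created-on-the-fly) destination, and removes both
	ais/test/scripts/dsort-ex1.sh

	# keep the buckets for inspection; a subsequent run then detects them
	# via `ais show bucket ... -c` and skips both `ais create` and `ais rmb`
	ais/test/scripts/dsort-ex1.sh --nocleanup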
2 changes: 1 addition & 1 deletion api/apc/tcb.go
@@ -16,7 +16,7 @@ type (
CopyBckMsg struct {
Prepend string `json:"prepend"` // destination naming, as in: dest-obj-name = Prepend + source-obj-name
Prefix string `json:"prefix"` // prefix to select matching _source_ objects or virtual directories
DryRun bool `json:"dry_run"` // traverse the source, skip writing destination
DryRun bool `json:"dry_run"` // visit all source objects, don't make any modifications
Force bool `json:"force"` // force running in presence of "limited coexistence" conflict
}
Transform struct {
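To illustrate the dry_run semantics above — a sketch, not definitive API usage; it assumes apc.TCBMsg carries CopyBckMsg under that field name, as suggested by tcomsg.TCBMsg.CopyBckMsg.DryRun in the ais/proxy.go hunk:

	package example

	import "github.com/NVIDIA/aistore/api/apc"

	// dryRunTCB builds a transform/copy message that visits every matching
	// source object while making no modifications at the destination
	func dryRunTCB() *apc.TCBMsg {
		return &apc.TCBMsg{
			CopyBckMsg: apc.CopyBckMsg{
				Prefix: "shard-", // hypothetical prefix to select source objects
				DryRun: true,     // visit all source objects, don't modify anything
			},
		}
	}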
