diff --git a/ais/htcommon.go b/ais/htcommon.go index de15276934..0f296ff02e 100644 --- a/ais/htcommon.go +++ b/ais/htcommon.go @@ -50,10 +50,10 @@ const ( fmtOutside = "%s is present (vs requested 'flt-outside'(%d))" ) -// intra-cluster JSON control +// intra-cluster control messages type ( // cluster-wide control information - replicated, versioned, and synchronized - // usages: primary election, join-cluster + // usage: elect new primary, join cluster, ... cluMeta struct { Smap *smapX `json:"smap"` BMD *bucketMD `json:"bmd"` @@ -61,6 +61,7 @@ type ( EtlMD *etlMD `json:"etlMD"` Config *globalConfig `json:"config"` SI *meta.Snode `json:"si"` + PrimeTime int64 `json:"prime_time"` VoteInProgress bool `json:"voting"` // target only RebInterrupted bool `json:"reb_interrupted"` @@ -189,6 +190,7 @@ type ( skipConfig bool skipEtlMD bool fillRebMarker bool + skipPrimeTime bool } getMaxCii struct { diff --git a/ais/htrun.go b/ais/htrun.go index cadfaa1088..21f5f99d84 100644 --- a/ais/htrun.go +++ b/ais/htrun.go @@ -135,8 +135,9 @@ func (h *htrun) cluMeta(opts cmetaFillOpt) (*cluMeta, error) { } } // don't send Smap when it is undergoing changes (and is about to get metasync-ed) + smap := h.owner.smap.get() if !opts.skipSmap { - cm.Smap = h.owner.smap.get() + cm.Smap = smap } if !opts.skipBMD { cm.BMD = h.owner.bmd.get() @@ -151,6 +152,9 @@ func (h *htrun) cluMeta(opts cmetaFillOpt) (*cluMeta, error) { rebMarked := opts.htext.rebMarked() cm.RebInterrupted, cm.Restarted = rebMarked.Interrupted, rebMarked.Restarted } + if !opts.skipPrimeTime && smap.IsPrimary(h.si) { + cm.PrimeTime = time.Now().UnixNano() + } return cm, nil } @@ -1048,7 +1052,7 @@ func (h *htrun) httpdaeget(w http.ResponseWriter, r *http.Request, query url.Val body = h.owner.bmd.get() case apc.WhatSmapVote: var err error - body, err = h.cluMeta(cmetaFillOpt{htext: htext}) + body, err = h.cluMeta(cmetaFillOpt{htext: htext, skipPrimeTime: true}) if err != nil { nlog.Errorf("failed to fetch cluster config, err: %v", err) } @@ -1794,6 +1798,7 @@ func (h *htrun) regTo(url string, psi *meta.Snode, tout time.Duration, q url.Val skipConfig: keepalive, skipEtlMD: keepalive, fillRebMarker: !keepalive, + skipPrimeTime: true, } ) cm, err := h.cluMeta(opts) diff --git a/ais/proxy.go b/ais/proxy.go index 147982225f..a71f23208a 100644 --- a/ais/proxy.go +++ b/ais/proxy.go @@ -287,6 +287,10 @@ func (p *proxy) recvCluMeta(cm *cluMeta, action, caller string) error { msg = p.newAmsgStr(action, cm.BMD) errs []error ) + if cm.PrimeTime != 0 { + xreg.PrimeTime.Store(cm.PrimeTime) + xreg.MyTime.Store(time.Now().UnixNano()) + } // Config debug.Assert(cm.Config != nil) if err := p.receiveConfig(cm.Config, msg, nil, caller); err != nil { @@ -2759,7 +2763,7 @@ func (p *proxy) daeSetPrimary(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() force := cos.IsParseBool(query.Get(apc.QparamForce)) - // forceful primary change + // force primary change if force && apiItems[0] == apc.Proxy { if smap := p.owner.smap.get(); !smap.isPrimary(p.si) { p.writeErr(w, r, newErrNotPrimary(p.si, smap)) diff --git a/ais/prxclu.go b/ais/prxclu.go index 880eea01c1..54208643ec 100644 --- a/ais/prxclu.go +++ b/ais/prxclu.go @@ -1735,7 +1735,7 @@ func (p *proxy) _setPrimary(w http.ResponseWriter, r *http.Request, npsi *meta.S args := allocBcArgs() args.req = cmn.HreqArgs{Method: http.MethodPut, Path: urlPath, Query: q} - cluMeta, errM := p.cluMeta(cmetaFillOpt{skipSmap: true}) + cluMeta, errM := p.cluMeta(cmetaFillOpt{skipSmap: true, skipPrimeTime: true}) if errM != nil { p.writeErr(w, r, errM) return diff --git a/ais/test/bucket_test.go b/ais/test/bucket_test.go index f2431e60fa..01569d12dd 100644 --- a/ais/test/bucket_test.go +++ b/ais/test/bucket_test.go @@ -2444,8 +2444,8 @@ func TestCopyBucket(t *testing.T) { panic(bckTest) } - xactIDs := make([]string, len(dstms)) - for idx, dstm := range dstms { + xactIDs := make([]string, 0, len(dstms)) + for _, dstm := range dstms { var ( uuid string err error @@ -2457,7 +2457,15 @@ func TestCopyBucket(t *testing.T) { } else { uuid, err = api.CopyBucket(baseParams, srcm.bck, dstm.bck, cmsg) } - xactIDs[idx] = uuid + if uuids := strings.Split(uuid, ","); len(uuids) > 1 { + for _, u := range uuids { + tassert.Fatalf(t, xact.IsValidUUID(u), "invalid UUID %q", u) + } + xactIDs = append(xactIDs, uuids...) + } else { + tassert.Fatalf(t, xact.IsValidUUID(uuid), "invalid UUID %q", uuid) + xactIDs = append(xactIDs, uuid) + } tassert.CheckFatal(t, err) } @@ -2526,6 +2534,7 @@ func TestCopyBucket(t *testing.T) { if test.dstRemote { msg.Flags = apc.LsObjCached } + dstBckList, err := api.ListObjects(baseParams, dstm.bck, msg, api.ListArgs{}) tassert.CheckFatal(t, err) if len(dstBckList.Entries) != expectedObjCount { diff --git a/ais/tgtcp.go b/ais/tgtcp.go index fd073bc52d..27b4c54d79 100644 --- a/ais/tgtcp.go +++ b/ais/tgtcp.go @@ -68,6 +68,11 @@ func (t *target) recvCluMetaBytes(action string, body []byte, caller string) err if err := jsoniter.Unmarshal(body, &cm); err != nil { return fmt.Errorf(cmn.FmtErrUnmarshal, t, "clumeta", cos.BHead(body), err) } + + debug.Assert(cm.PrimeTime != 0, t.String()) // expecting + xreg.PrimeTime.Store(cm.PrimeTime) + xreg.MyTime.Store(time.Now().UnixNano()) + msg := t.newAmsgStr(action, cm.BMD) // Config diff --git a/ais/tgttxn.go b/ais/tgttxn.go index 3cced0b610..226c37761c 100644 --- a/ais/tgttxn.go +++ b/ais/tgttxn.go @@ -641,14 +641,13 @@ func (t *target) tcobjs(c *txnServerCtx, msg *cmn.TCObjsMsg, dp cluster.DP) (str } // begin custom := &xreg.TCObjsArgs{BckFrom: bckFrom, BckTo: bckTo, DP: dp} - rns := xreg.RenewTCObjs(t, c.uuid, c.msg.Action /*kind*/, custom) + rns := xreg.RenewTCObjs(t, c.msg.Action /*kind*/, custom) if rns.Err != nil { nlog.Errorf("%s: %q %+v %v", t, c.uuid, c.msg, rns.Err) return xid, rns.Err } xctn := rns.Entry.Get() xid = xctn.ID() - debug.Assert((!rns.IsRunning() && xid == c.uuid) || (rns.IsRunning() && xid == rns.UUID)) xtco := xctn.(*xs.XactTCObjs) msg.TxnUUID = c.uuid @@ -789,14 +788,13 @@ func (t *target) createArchMultiObj(c *txnServerCtx) (string /*xaction uuid*/, e return xid, cs.Err } - rns := xreg.RenewPutArchive(c.uuid, t, bckFrom, bckTo) + rns := xreg.RenewPutArchive(t, bckFrom, bckTo) if rns.Err != nil { nlog.Errorf("%s: %q %+v %v", t, c.uuid, archMsg, rns.Err) return xid, rns.Err } xctn := rns.Entry.Get() xid = xctn.ID() - debug.Assert((!rns.IsRunning() && xid == c.uuid) || (rns.IsRunning() && xid == rns.UUID)) xarch := xctn.(*xs.XactArch) // finalize the message and begin local transaction diff --git a/cmn/cos/uuid.go b/cmn/cos/uuid.go index 045f919600..bacf2a388b 100644 --- a/cmn/cos/uuid.go +++ b/cmn/cos/uuid.go @@ -6,18 +6,18 @@ package cos import ( "fmt" - "math/rand" "github.com/NVIDIA/aistore/cmn/atomic" "github.com/teris-io/shortid" ) +const LenShortID = 9 // UUID length, as per https://github.com/teris-io/shortid#id-length + const ( // Alphabet for generating UUIDs similar to the shortid.DEFAULT_ABC // NOTE: len(uuidABC) > 0x3f - see GenTie() uuidABC = "-5nZJDft6LuzsjGNpPwY7rQa39vehq4i1cV2FROo8yHSlC0BUEdWbIxMmTgKXAk_" - lenShortID = 9 // UUID length, as per https://github.com/teris-io/shortid#id-length lenDaemonID = 8 // via cryptographic rand lenTooLongID = 32 // suspiciously long ) @@ -40,6 +40,7 @@ func InitShortID(seed uint64) { // UUID // +// compare with xreg.GenBeUID func GenUUID() (uuid string) { var h, t string uuid = sid.MustGenerate() @@ -56,7 +57,7 @@ func GenUUID() (uuid string) { } func IsValidUUID(uuid string) bool { - return len(uuid) >= lenShortID && IsAlphaNice(uuid) + return len(uuid) >= LenShortID && IsAlphaNice(uuid) } func ValidateNiceID(id string, minlen int, tag string) (err error) { @@ -72,20 +73,6 @@ func ValidateNiceID(id string, minlen int, tag string) (err error) { return } -// BeUID -func GenBeUID(div, val, slt int64) string { - rem := val % div - if rem > div>>1+div>>2 { - rem -= div - } - seed := val - rem + slt - if seed < 0 { - seed = val - rem - slt - } - rnd := rand.New(rand.NewSource(seed)) - return RandStringWithSrc(rnd, lenShortID) -} - // // Daemon ID // diff --git a/mirror/put_mirror.go b/mirror/put_copies.go similarity index 83% rename from mirror/put_mirror.go rename to mirror/put_copies.go index c62b03d2d3..355fc6245a 100644 --- a/mirror/put_mirror.go +++ b/mirror/put_copies.go @@ -15,6 +15,7 @@ import ( "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/cos" "github.com/NVIDIA/aistore/cmn/debug" + "github.com/NVIDIA/aistore/cmn/mono" "github.com/NVIDIA/aistore/cmn/nlog" "github.com/NVIDIA/aistore/fs" "github.com/NVIDIA/aistore/fs/mpather" @@ -72,16 +73,14 @@ func (p *putFactory) Start() error { } r := &XactPut{mirror: *mirror, workCh: make(chan cluster.LIF, mirror.Burst)} - // target-local best-effort to generate global UUID + // + // target-local generation of a global UUID + // div := int64(xact.IdleDefault) - now := time.Now().UnixNano() slt := xxhash.ChecksumString64S(bck.MakeUname(""), 3421170679 /*m.b per xkind*/) - uid := cos.GenBeUID(div, now, int64(slt)) - if xctn, err := xreg.GetXact(uid); err != nil /*unlikely*/ || xctn != nil /* idling away? */ { - uid = cos.GenUUID() - } + r.DemandBase.Init(xreg.GenBeUID(div, int64(slt)), apc.ActPutCopies, bck, xact.IdleDefault) - r.DemandBase.Init(uid, apc.ActPutCopies, bck, xact.IdleDefault) + // joggers r.workers = mpather.NewWorkerGroup(&mpather.WorkerGroupOpts{ Callback: r.do, Slab: slab, @@ -89,7 +88,7 @@ func (p *putFactory) Start() error { }) p.xctn = r - // Run + // run go r.Run(nil) return nil } @@ -137,6 +136,7 @@ loop: for { select { case <-r.IdleTimer(): + r.waitPending() break loop case <-r.ChanAbort(): break loop @@ -160,6 +160,29 @@ func (r *XactPut) Repl(lom *cluster.LOM) { } } +func (r *XactPut) waitPending() { + const minsleep, longtime = 4 * time.Second, 30 * time.Second + var ( + started int64 + cnt, iniCnt int + sleep = cos.MaxDuration(cmn.Timeout.MaxKeepalive(), minsleep) + ) + if cnt = len(r.workCh); cnt == 0 { + return + } + started, iniCnt = mono.NanoTime(), cnt + // keep sleeping until the very end + for cnt > 0 { + r.IncPending() + time.Sleep(sleep) + r.DecPending() + cnt = len(r.workCh) + } + if d := mono.Since(started); d > longtime { + nlog.Infof("%s: took a while to finish %d pending copies: %v", r, iniCnt, d) + } +} + func (r *XactPut) stop() (err error) { r.DemandBase.Stop() n := r.workers.Stop() diff --git a/xact/xreg/bucket.go b/xact/xreg/bucket.go index 665d1f6f1d..ae7589b901 100644 --- a/xact/xreg/bucket.go +++ b/xact/xreg/bucket.go @@ -123,15 +123,6 @@ func RenewTCB(t cluster.Target, uuid, kind string, custom *TCBArgs) RenewRes { ) } -func RenewTCObjs(t cluster.Target, uuid, kind string, custom *TCObjsArgs) RenewRes { - return RenewBucketXact( - kind, - custom.BckFrom, - Args{T: t, Custom: custom, UUID: uuid}, - custom.BckFrom, custom.BckTo, // (ditto) - ) -} - func RenewDsort(id string, custom *DsortArgs) RenewRes { return RenewBucketXact( apc.ActDsort, diff --git a/xact/xreg/multiobj.go b/xact/xreg/multiobj.go index 12b82cc1b6..a67305ba7c 100644 --- a/xact/xreg/multiobj.go +++ b/xact/xreg/multiobj.go @@ -10,13 +10,8 @@ import ( "github.com/NVIDIA/aistore/cluster/meta" ) -func RenewPutArchive(uuid string, t cluster.Target, bckFrom, bckTo *meta.Bck) RenewRes { - return RenewBucketXact( - apc.ActArchive, - bckFrom, - Args{T: t, UUID: uuid}, - bckFrom, bckTo, - ) +func RenewPutArchive(t cluster.Target, bckFrom, bckTo *meta.Bck) RenewRes { + return RenewBucketXact(apc.ActArchive, bckFrom, Args{T: t, Custom: bckTo}, bckFrom, bckTo) } func RenewEvictDelete(uuid string, t cluster.Target, kind string, bck *meta.Bck, msg *apc.ListRange) RenewRes { @@ -26,3 +21,8 @@ func RenewEvictDelete(uuid string, t cluster.Target, kind string, bck *meta.Bck, func RenewPrefetch(uuid string, t cluster.Target, bck *meta.Bck, msg *apc.ListRange) RenewRes { return RenewBucketXact(apc.ActPrefetchObjects, bck, Args{T: t, UUID: uuid, Custom: msg}) } + +// kind: (apc.ActCopyObjects | apc.ActETLObjects) +func RenewTCObjs(t cluster.Target, kind string, custom *TCObjsArgs) RenewRes { + return RenewBucketXact(kind, custom.BckFrom, Args{T: t, Custom: custom}, custom.BckFrom, custom.BckTo) +} diff --git a/xact/xreg/uuid.go b/xact/xreg/uuid.go new file mode 100644 index 0000000000..4849aeede8 --- /dev/null +++ b/xact/xreg/uuid.go @@ -0,0 +1,38 @@ +// Package xreg provides registry and (renew, find) functions for AIS eXtended Actions (xactions). +/* + * Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. + */ +package xreg + +import ( + "math/rand" + "time" + + "github.com/NVIDIA/aistore/cmn/atomic" + "github.com/NVIDIA/aistore/cmn/cos" +) + +var ( + PrimeTime atomic.Int64 + MyTime atomic.Int64 +) + +// see related: cmn/cos/uuid.go + +func GenBeUID(div, slt int64) (buid string) { + now := time.Now().UnixNano() - MyTime.Load() + PrimeTime.Load() + rem := now % div + + seed := now - rem + slt + if seed < 0 { + seed = now - rem - slt + } + rnd := rand.New(rand.NewSource(seed)) + buid = cos.RandStringWithSrc(rnd, cos.LenShortID) + + if xctn, err := GetXact(buid); err != nil /*unlikely*/ || xctn != nil /*idling away*/ { + // fallback + buid = cos.GenUUID() + } + return +} diff --git a/xact/xs/archive.go b/xact/xs/archive.go index cbf2a510ac..0fb4f5c8d5 100644 --- a/xact/xs/archive.go +++ b/xact/xs/archive.go @@ -28,6 +28,7 @@ import ( "github.com/NVIDIA/aistore/transport" "github.com/NVIDIA/aistore/xact" "github.com/NVIDIA/aistore/xact/xreg" + "github.com/OneOfOne/xxhash" ) // TODO (feature): one source multiple destinations (buckets) @@ -80,11 +81,26 @@ func (*archFactory) New(args xreg.Args, bck *meta.Bck) xreg.Renewable { } func (p *archFactory) Start() error { + // + // target-local generation of a global UUID + // + div := int64(xact.IdleDefault) + bckTo, ok := p.Args.Custom.(*meta.Bck) + debug.Assertf(ok, "%+v", bckTo) + if !ok || bckTo.IsEmpty() { + bckTo = &meta.Bck{Name: "any"} // local usage to gen uuid, see r.bckTo below + } + slt := xxhash.ChecksumString64S(p.Bck.MakeUname("")+"|"+bckTo.MakeUname(""), 8214808651 /*m.b per xkind*/) + p.Args.UUID = xreg.GenBeUID(div, int64(slt)) + + // + // new x-archive + // workCh := make(chan *cmn.ArchiveBckMsg, maxNumInParallel) r := &XactArch{streamingX: streamingX{p: &p.streamingF, config: cmn.GCO.Get()}, workCh: workCh} r.pending.m = make(map[string]*archwi, maxNumInParallel) p.xctn = r - r.DemandBase.Init(p.UUID(), apc.ActArchive, p.Bck /*from*/, 0 /*use default*/) + r.DemandBase.Init(p.UUID() /*== p.Args.UUID above*/, apc.ActArchive, p.Bck /*from*/, xact.IdleDefault) bmd := p.Args.T.Bowner().Get() trname := fmt.Sprintf("arch-%s%s-%s-%d", p.Bck.Provider, p.Bck.Ns, p.Bck.Name, bmd.Version) // NOTE: (bmd.Version) @@ -163,7 +179,7 @@ func (r *XactArch) Begin(msg *cmn.ArchiveBckMsg, archlom *cluster.LOM) (err erro } } - // most of the time there'll be a single dst bucket for the lifetime + // most of the time there'll be a single destination bucket for the lifetime if r.bckTo == nil { if from := r.Bck().Bucket(); !from.Equal(&wi.msg.ToBck) { r.bckTo = meta.CloneBck(&wi.msg.ToBck) diff --git a/xact/xs/tcobjs.go b/xact/xs/tcobjs.go index 5237f02b7f..e46b5bf7e4 100644 --- a/xact/xs/tcobjs.go +++ b/xact/xs/tcobjs.go @@ -24,6 +24,7 @@ import ( "github.com/NVIDIA/aistore/transport" "github.com/NVIDIA/aistore/xact" "github.com/NVIDIA/aistore/xact/xreg" + "github.com/OneOfOne/xxhash" ) type ( @@ -66,13 +67,28 @@ func (p *tcoFactory) New(args xreg.Args, bckFrom *meta.Bck) xreg.Renewable { } func (p *tcoFactory) Start() error { - var sizePDU int32 + // + // target-local generation of a global UUID + // + div := int64(xact.IdleDefault) + sed := uint64(3282306647) + if p.kind == apc.ActETLObjects { + sed = 5058223172 + } + slt := xxhash.ChecksumString64S(p.args.BckFrom.MakeUname("")+"|"+p.args.BckTo.MakeUname(""), sed) + p.Args.UUID = xreg.GenBeUID(div, int64(slt)) + + // new x-tco workCh := make(chan *cmn.TCObjsMsg, maxNumInParallel) r := &XactTCObjs{streamingX: streamingX{p: &p.streamingF, config: cmn.GCO.Get()}, args: p.args, workCh: workCh} r.pending.m = make(map[string]*tcowi, maxNumInParallel) p.xctn = r - r.DemandBase.Init(p.UUID(), p.Kind(), p.Bck, 0 /*use default*/) + r.DemandBase.Init(p.UUID(), p.Kind(), p.Bck, xact.IdleDefault) + + var sizePDU int32 if p.kind == apc.ActETLObjects { + // unlike apc.ActCopyObjects (where we know the size) + // apc.ActETLObjects (transform) generates arbitrary sizes where we use PDU-based transport sizePDU = memsys.DefaultBufSize } trname := "tco-" + p.UUID()