Skip to content

Commit

Permalink
local generation of global IDs; archive, etl, mirror
Browse files Browse the repository at this point in the history
* part two, prev. commit c0698c2
* add prime-time and xact/xreg/uuid.go
* cover multi-object archive, multi-object copy, and etl
* PUT => mirrored bucket won't be dropping pending work anymore
  - via x-put-copies
* tests: copy-bucket _may_ return multiple UUIDs

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Aug 27, 2023
1 parent c0698c2 commit 896aca0
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 58 deletions.
6 changes: 4 additions & 2 deletions ais/htcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,18 @@ const (
fmtOutside = "%s is present (vs requested 'flt-outside'(%d))"
)

// intra-cluster JSON control
// intra-cluster control messages
type (
// cluster-wide control information - replicated, versioned, and synchronized
// usages: primary election, join-cluster
// usage: elect new primary, join cluster, ...
cluMeta struct {
Smap *smapX `json:"smap"`
BMD *bucketMD `json:"bmd"`
RMD *rebMD `json:"rmd"`
EtlMD *etlMD `json:"etlMD"`
Config *globalConfig `json:"config"`
SI *meta.Snode `json:"si"`
PrimeTime int64 `json:"prime_time"`
VoteInProgress bool `json:"voting"`
// target only
RebInterrupted bool `json:"reb_interrupted"`
Expand Down Expand Up @@ -189,6 +190,7 @@ type (
skipConfig bool
skipEtlMD bool
fillRebMarker bool
skipPrimeTime bool
}

getMaxCii struct {
Expand Down
9 changes: 7 additions & 2 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ func (h *htrun) cluMeta(opts cmetaFillOpt) (*cluMeta, error) {
}
}
// don't send Smap when it is undergoing changes (and is about to get metasync-ed)
smap := h.owner.smap.get()
if !opts.skipSmap {
cm.Smap = h.owner.smap.get()
cm.Smap = smap
}
if !opts.skipBMD {
cm.BMD = h.owner.bmd.get()
Expand All @@ -151,6 +152,9 @@ func (h *htrun) cluMeta(opts cmetaFillOpt) (*cluMeta, error) {
rebMarked := opts.htext.rebMarked()
cm.RebInterrupted, cm.Restarted = rebMarked.Interrupted, rebMarked.Restarted
}
if !opts.skipPrimeTime && smap.IsPrimary(h.si) {
cm.PrimeTime = time.Now().UnixNano()
}
return cm, nil
}

Expand Down Expand Up @@ -1048,7 +1052,7 @@ func (h *htrun) httpdaeget(w http.ResponseWriter, r *http.Request, query url.Val
body = h.owner.bmd.get()
case apc.WhatSmapVote:
var err error
body, err = h.cluMeta(cmetaFillOpt{htext: htext})
body, err = h.cluMeta(cmetaFillOpt{htext: htext, skipPrimeTime: true})
if err != nil {
nlog.Errorf("failed to fetch cluster config, err: %v", err)
}
Expand Down Expand Up @@ -1794,6 +1798,7 @@ func (h *htrun) regTo(url string, psi *meta.Snode, tout time.Duration, q url.Val
skipConfig: keepalive,
skipEtlMD: keepalive,
fillRebMarker: !keepalive,
skipPrimeTime: true,
}
)
cm, err := h.cluMeta(opts)
Expand Down
6 changes: 5 additions & 1 deletion ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ func (p *proxy) recvCluMeta(cm *cluMeta, action, caller string) error {
msg = p.newAmsgStr(action, cm.BMD)
errs []error
)
if cm.PrimeTime != 0 {
xreg.PrimeTime.Store(cm.PrimeTime)
xreg.MyTime.Store(time.Now().UnixNano())
}
// Config
debug.Assert(cm.Config != nil)
if err := p.receiveConfig(cm.Config, msg, nil, caller); err != nil {
Expand Down Expand Up @@ -2759,7 +2763,7 @@ func (p *proxy) daeSetPrimary(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
force := cos.IsParseBool(query.Get(apc.QparamForce))

// forceful primary change
// force primary change
if force && apiItems[0] == apc.Proxy {
if smap := p.owner.smap.get(); !smap.isPrimary(p.si) {
p.writeErr(w, r, newErrNotPrimary(p.si, smap))
Expand Down
2 changes: 1 addition & 1 deletion ais/prxclu.go
Original file line number Diff line number Diff line change
Expand Up @@ -1735,7 +1735,7 @@ func (p *proxy) _setPrimary(w http.ResponseWriter, r *http.Request, npsi *meta.S
args := allocBcArgs()
args.req = cmn.HreqArgs{Method: http.MethodPut, Path: urlPath, Query: q}

cluMeta, errM := p.cluMeta(cmetaFillOpt{skipSmap: true})
cluMeta, errM := p.cluMeta(cmetaFillOpt{skipSmap: true, skipPrimeTime: true})
if errM != nil {
p.writeErr(w, r, errM)
return
Expand Down
15 changes: 12 additions & 3 deletions ais/test/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2444,8 +2444,8 @@ func TestCopyBucket(t *testing.T) {
panic(bckTest)
}

xactIDs := make([]string, len(dstms))
for idx, dstm := range dstms {
xactIDs := make([]string, 0, len(dstms))
for _, dstm := range dstms {
var (
uuid string
err error
Expand All @@ -2457,7 +2457,15 @@ func TestCopyBucket(t *testing.T) {
} else {
uuid, err = api.CopyBucket(baseParams, srcm.bck, dstm.bck, cmsg)
}
xactIDs[idx] = uuid
if uuids := strings.Split(uuid, ","); len(uuids) > 1 {
for _, u := range uuids {
tassert.Fatalf(t, xact.IsValidUUID(u), "invalid UUID %q", u)
}
xactIDs = append(xactIDs, uuids...)
} else {
tassert.Fatalf(t, xact.IsValidUUID(uuid), "invalid UUID %q", uuid)
xactIDs = append(xactIDs, uuid)
}
tassert.CheckFatal(t, err)
}

Expand Down Expand Up @@ -2526,6 +2534,7 @@ func TestCopyBucket(t *testing.T) {
if test.dstRemote {
msg.Flags = apc.LsObjCached
}

dstBckList, err := api.ListObjects(baseParams, dstm.bck, msg, api.ListArgs{})
tassert.CheckFatal(t, err)
if len(dstBckList.Entries) != expectedObjCount {
Expand Down
5 changes: 5 additions & 0 deletions ais/tgtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func (t *target) recvCluMetaBytes(action string, body []byte, caller string) err
if err := jsoniter.Unmarshal(body, &cm); err != nil {
return fmt.Errorf(cmn.FmtErrUnmarshal, t, "clumeta", cos.BHead(body), err)
}

debug.Assert(cm.PrimeTime != 0, t.String()) // expecting
xreg.PrimeTime.Store(cm.PrimeTime)
xreg.MyTime.Store(time.Now().UnixNano())

msg := t.newAmsgStr(action, cm.BMD)

// Config
Expand Down
6 changes: 2 additions & 4 deletions ais/tgttxn.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,14 +641,13 @@ func (t *target) tcobjs(c *txnServerCtx, msg *cmn.TCObjsMsg, dp cluster.DP) (str
}
// begin
custom := &xreg.TCObjsArgs{BckFrom: bckFrom, BckTo: bckTo, DP: dp}
rns := xreg.RenewTCObjs(t, c.uuid, c.msg.Action /*kind*/, custom)
rns := xreg.RenewTCObjs(t, c.msg.Action /*kind*/, custom)
if rns.Err != nil {
nlog.Errorf("%s: %q %+v %v", t, c.uuid, c.msg, rns.Err)
return xid, rns.Err
}
xctn := rns.Entry.Get()
xid = xctn.ID()
debug.Assert((!rns.IsRunning() && xid == c.uuid) || (rns.IsRunning() && xid == rns.UUID))

xtco := xctn.(*xs.XactTCObjs)
msg.TxnUUID = c.uuid
Expand Down Expand Up @@ -789,14 +788,13 @@ func (t *target) createArchMultiObj(c *txnServerCtx) (string /*xaction uuid*/, e
return xid, cs.Err
}

rns := xreg.RenewPutArchive(c.uuid, t, bckFrom, bckTo)
rns := xreg.RenewPutArchive(t, bckFrom, bckTo)
if rns.Err != nil {
nlog.Errorf("%s: %q %+v %v", t, c.uuid, archMsg, rns.Err)
return xid, rns.Err
}
xctn := rns.Entry.Get()
xid = xctn.ID()
debug.Assert((!rns.IsRunning() && xid == c.uuid) || (rns.IsRunning() && xid == rns.UUID))

xarch := xctn.(*xs.XactArch)
// finalize the message and begin local transaction
Expand Down
21 changes: 4 additions & 17 deletions cmn/cos/uuid.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ package cos

import (
"fmt"
"math/rand"

"github.com/NVIDIA/aistore/cmn/atomic"
"github.com/teris-io/shortid"
)

const LenShortID = 9 // UUID length, as per https://github.com/teris-io/shortid#id-length

const (
// Alphabet for generating UUIDs similar to the shortid.DEFAULT_ABC
// NOTE: len(uuidABC) > 0x3f - see GenTie()
uuidABC = "-5nZJDft6LuzsjGNpPwY7rQa39vehq4i1cV2FROo8yHSlC0BUEdWbIxMmTgKXAk_"

lenShortID = 9 // UUID length, as per https://github.com/teris-io/shortid#id-length
lenDaemonID = 8 // via cryptographic rand
lenTooLongID = 32 // suspiciously long
)
Expand All @@ -40,6 +40,7 @@ func InitShortID(seed uint64) {
// UUID
//

// compare with xreg.GenBeUID
func GenUUID() (uuid string) {
var h, t string
uuid = sid.MustGenerate()
Expand All @@ -56,7 +57,7 @@ func GenUUID() (uuid string) {
}

func IsValidUUID(uuid string) bool {
return len(uuid) >= lenShortID && IsAlphaNice(uuid)
return len(uuid) >= LenShortID && IsAlphaNice(uuid)
}

func ValidateNiceID(id string, minlen int, tag string) (err error) {
Expand All @@ -72,20 +73,6 @@ func ValidateNiceID(id string, minlen int, tag string) (err error) {
return
}

// BeUID
func GenBeUID(div, val, slt int64) string {
rem := val % div
if rem > div>>1+div>>2 {
rem -= div
}
seed := val - rem + slt
if seed < 0 {
seed = val - rem - slt
}
rnd := rand.New(rand.NewSource(seed))
return RandStringWithSrc(rnd, lenShortID)
}

//
// Daemon ID
//
Expand Down
39 changes: 31 additions & 8 deletions mirror/put_mirror.go → mirror/put_copies.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/mono"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/fs"
"github.com/NVIDIA/aistore/fs/mpather"
Expand Down Expand Up @@ -72,24 +73,22 @@ func (p *putFactory) Start() error {
}
r := &XactPut{mirror: *mirror, workCh: make(chan cluster.LIF, mirror.Burst)}

// target-local best-effort to generate global UUID
//
// target-local generation of a global UUID
//
div := int64(xact.IdleDefault)
now := time.Now().UnixNano()
slt := xxhash.ChecksumString64S(bck.MakeUname(""), 3421170679 /*m.b per xkind*/)
uid := cos.GenBeUID(div, now, int64(slt))
if xctn, err := xreg.GetXact(uid); err != nil /*unlikely*/ || xctn != nil /* idling away? */ {
uid = cos.GenUUID()
}
r.DemandBase.Init(xreg.GenBeUID(div, int64(slt)), apc.ActPutCopies, bck, xact.IdleDefault)

r.DemandBase.Init(uid, apc.ActPutCopies, bck, xact.IdleDefault)
// joggers
r.workers = mpather.NewWorkerGroup(&mpather.WorkerGroupOpts{
Callback: r.do,
Slab: slab,
QueueSize: mirror.Burst,
})
p.xctn = r

// Run
// run
go r.Run(nil)
return nil
}
Expand Down Expand Up @@ -137,6 +136,7 @@ loop:
for {
select {
case <-r.IdleTimer():
r.waitPending()
break loop
case <-r.ChanAbort():
break loop
Expand All @@ -160,6 +160,29 @@ func (r *XactPut) Repl(lom *cluster.LOM) {
}
}

func (r *XactPut) waitPending() {
const minsleep, longtime = 4 * time.Second, 30 * time.Second
var (
started int64
cnt, iniCnt int
sleep = cos.MaxDuration(cmn.Timeout.MaxKeepalive(), minsleep)
)
if cnt = len(r.workCh); cnt == 0 {
return
}
started, iniCnt = mono.NanoTime(), cnt
// keep sleeping until the very end
for cnt > 0 {
r.IncPending()
time.Sleep(sleep)
r.DecPending()
cnt = len(r.workCh)
}
if d := mono.Since(started); d > longtime {
nlog.Infof("%s: took a while to finish %d pending copies: %v", r, iniCnt, d)
}
}

func (r *XactPut) stop() (err error) {
r.DemandBase.Stop()
n := r.workers.Stop()
Expand Down
9 changes: 0 additions & 9 deletions xact/xreg/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,6 @@ func RenewTCB(t cluster.Target, uuid, kind string, custom *TCBArgs) RenewRes {
)
}

func RenewTCObjs(t cluster.Target, uuid, kind string, custom *TCObjsArgs) RenewRes {
return RenewBucketXact(
kind,
custom.BckFrom,
Args{T: t, Custom: custom, UUID: uuid},
custom.BckFrom, custom.BckTo, // (ditto)
)
}

func RenewDsort(id string, custom *DsortArgs) RenewRes {
return RenewBucketXact(
apc.ActDsort,
Expand Down
14 changes: 7 additions & 7 deletions xact/xreg/multiobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,8 @@ import (
"github.com/NVIDIA/aistore/cluster/meta"
)

func RenewPutArchive(uuid string, t cluster.Target, bckFrom, bckTo *meta.Bck) RenewRes {
return RenewBucketXact(
apc.ActArchive,
bckFrom,
Args{T: t, UUID: uuid},
bckFrom, bckTo,
)
func RenewPutArchive(t cluster.Target, bckFrom, bckTo *meta.Bck) RenewRes {
return RenewBucketXact(apc.ActArchive, bckFrom, Args{T: t, Custom: bckTo}, bckFrom, bckTo)
}

func RenewEvictDelete(uuid string, t cluster.Target, kind string, bck *meta.Bck, msg *apc.ListRange) RenewRes {
Expand All @@ -26,3 +21,8 @@ func RenewEvictDelete(uuid string, t cluster.Target, kind string, bck *meta.Bck,
func RenewPrefetch(uuid string, t cluster.Target, bck *meta.Bck, msg *apc.ListRange) RenewRes {
return RenewBucketXact(apc.ActPrefetchObjects, bck, Args{T: t, UUID: uuid, Custom: msg})
}

// kind: (apc.ActCopyObjects | apc.ActETLObjects)
func RenewTCObjs(t cluster.Target, kind string, custom *TCObjsArgs) RenewRes {
return RenewBucketXact(kind, custom.BckFrom, Args{T: t, Custom: custom}, custom.BckFrom, custom.BckTo)
}
38 changes: 38 additions & 0 deletions xact/xreg/uuid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Package xreg provides registry and (renew, find) functions for AIS eXtended Actions (xactions).
/*
* Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
*/
package xreg

import (
"math/rand"
"time"

"github.com/NVIDIA/aistore/cmn/atomic"
"github.com/NVIDIA/aistore/cmn/cos"
)

var (
PrimeTime atomic.Int64
MyTime atomic.Int64
)

// see related: cmn/cos/uuid.go

func GenBeUID(div, slt int64) (buid string) {
now := time.Now().UnixNano() - MyTime.Load() + PrimeTime.Load()
rem := now % div

seed := now - rem + slt
if seed < 0 {
seed = now - rem - slt
}
rnd := rand.New(rand.NewSource(seed))
buid = cos.RandStringWithSrc(rnd, cos.LenShortID)

if xctn, err := GetXact(buid); err != nil /*unlikely*/ || xctn != nil /*idling away*/ {
// fallback
buid = cos.GenUUID()
}
return
}
Loading

0 comments on commit 896aca0

Please sign in to comment.