From 0dd87027a822e7cc93507b250c8c8c7ba5a85ad5 Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Sun, 27 Aug 2023 12:15:54 -0400 Subject: [PATCH] local generation of global IDs * revise and simplify; add unit test * part three, prev. commit: 896aca0cc686be Signed-off-by: Alex Aizman --- cmn/cos/bytepack.go | 15 ++++++--------- cmn/cos/err.go | 10 ++++++++++ cmn/cos/quantity.go | 18 +++++------------ cmn/cos/uuid.go | 21 +++++++++++++++++--- ext/dsort/request_spec_test.go | 6 +++--- mirror/put_copies.go | 6 ++---- xact/xreg/uuid.go | 25 ++++++++++++------------ xact/xs/archive.go | 8 +++----- xact/xs/tcobjs.go | 10 ++-------- xact/xs/xaction_test.go | 35 ++++++++++++++++++++++++++++++++++ 10 files changed, 97 insertions(+), 57 deletions(-) diff --git a/cmn/cos/bytepack.go b/cmn/cos/bytepack.go index 6c4d2ae66a..ea43d7e97c 100644 --- a/cmn/cos/bytepack.go +++ b/cmn/cos/bytepack.go @@ -6,7 +6,6 @@ package cos import ( "encoding/binary" - "errors" "github.com/NVIDIA/aistore/cmn/debug" ) @@ -90,8 +89,6 @@ type ( // it is possible to use int32 instead of int64 to keep the length of the data. const SizeofLen = SizeofI32 -var ErrBufferUnderrun = errors.New("buffer underrun") - // PackedStrLen returns the size occupied by a given string in the output func PackedStrLen(s string) int { return SizeofLen + len(s) @@ -118,7 +115,7 @@ func (br *ByteUnpack) Bytes() []byte { func (br *ByteUnpack) ReadByte() (byte, error) { if br.off >= len(br.b) { - return 0, ErrBufferUnderrun + return 0, errBufferUnderrun } debug.Assert(br.off < len(br.b)) b := br.b[br.off] @@ -138,7 +135,7 @@ func (br *ByteUnpack) ReadInt64() (int64, error) { func (br *ByteUnpack) ReadUint64() (uint64, error) { if len(br.b)-br.off < SizeofI64 { - return 0, ErrBufferUnderrun + return 0, errBufferUnderrun } n := binary.BigEndian.Uint64(br.b[br.off:]) br.off += SizeofI64 @@ -152,7 +149,7 @@ func (br *ByteUnpack) ReadInt16() (int16, error) { func (br *ByteUnpack) ReadUint16() (uint16, error) { if len(br.b)-br.off < SizeofI16 { - return 0, ErrBufferUnderrun + return 0, errBufferUnderrun } n := binary.BigEndian.Uint16(br.b[br.off:]) br.off += SizeofI16 @@ -166,7 +163,7 @@ func (br *ByteUnpack) ReadInt32() (int32, error) { func (br *ByteUnpack) ReadUint32() (uint32, error) { if len(br.b)-br.off < SizeofI32 { - return 0, ErrBufferUnderrun + return 0, errBufferUnderrun } n := binary.BigEndian.Uint32(br.b[br.off:]) br.off += SizeofI32 @@ -175,14 +172,14 @@ func (br *ByteUnpack) ReadUint32() (uint32, error) { func (br *ByteUnpack) ReadBytes() ([]byte, error) { if len(br.b)-br.off < SizeofLen { - return nil, ErrBufferUnderrun + return nil, errBufferUnderrun } l, err := br.ReadUint32() if err != nil { return nil, err } if len(br.b)-br.off < int(l) { - return nil, ErrBufferUnderrun + return nil, errBufferUnderrun } start := br.off br.off += int(l) diff --git a/cmn/cos/err.go b/cmn/cos/err.go index 1e8fcbe1eb..953537735a 100644 --- a/cmn/cos/err.go +++ b/cmn/cos/err.go @@ -34,6 +34,16 @@ type ( } ) +var ( + ErrQuantityUsage = errors.New("invalid quantity, format should be '81%' or '1GB'") + ErrQuantityPercent = errors.New("percent must be in the range (0, 100)") + ErrQuantityBytes = errors.New("value (bytes) must be non-negative") + + errQuantityNonNegative = errors.New("quantity should not be negative") +) + +var errBufferUnderrun = errors.New("buffer underrun") + // ErrNotFound func NewErrNotFound(format string, a ...any) *ErrNotFound { diff --git a/cmn/cos/quantity.go b/cmn/cos/quantity.go index 6b527a1819..03e44d72e3 100644 --- a/cmn/cos/quantity.go +++ b/cmn/cos/quantity.go @@ -5,7 +5,6 @@ package cos import ( - "errors" "fmt" "strconv" "strings" @@ -24,13 +23,6 @@ type ( } ) -var ( - ErrInvalidQuantityUsage = errors.New("invalid quantity, format should be '81%' or '1GB'") - errInvalidQuantityNonNegative = errors.New("quantity should not be negative") - ErrInvalidQuantityPercent = errors.New("percent must be in the range (0, 100)") - ErrInvalidQuantityBytes = errors.New("value (bytes) must be non-negative") -) - /////////////////// // ParseQuantity // /////////////////// @@ -45,27 +37,27 @@ func ParseQuantity(quantity string) (ParsedQuantity, error) { parsedQ := ParsedQuantity{} if value, err := strconv.Atoi(number); err != nil { - return parsedQ, ErrInvalidQuantityUsage + return parsedQ, ErrQuantityUsage } else if value < 0 { - return parsedQ, errInvalidQuantityNonNegative + return parsedQ, errQuantityNonNegative } else { parsedQ.Value = uint64(value) } if len(quantity) <= idx { - return parsedQ, ErrInvalidQuantityUsage + return parsedQ, ErrQuantityUsage } suffix := quantity[idx:] if suffix == "%" { parsedQ.Type = QuantityPercent if parsedQ.Value == 0 || parsedQ.Value >= 100 { - return parsedQ, ErrInvalidQuantityPercent + return parsedQ, ErrQuantityPercent } } else if value, err := ParseSize(quantity, UnitsIEC); err != nil { return parsedQ, err } else if value < 0 { - return parsedQ, ErrInvalidQuantityBytes + return parsedQ, ErrQuantityBytes } else { parsedQ.Type = QuantityBytes parsedQ.Value = uint64(value) diff --git a/cmn/cos/uuid.go b/cmn/cos/uuid.go index bacf2a388b..4214b5436b 100644 --- a/cmn/cos/uuid.go +++ b/cmn/cos/uuid.go @@ -11,7 +11,7 @@ import ( "github.com/teris-io/shortid" ) -const LenShortID = 9 // UUID length, as per https://github.com/teris-io/shortid#id-length +const lenShortID = 9 // UUID length, as per https://github.com/teris-io/shortid#id-length const ( // Alphabet for generating UUIDs similar to the shortid.DEFAULT_ABC @@ -40,7 +40,7 @@ func InitShortID(seed uint64) { // UUID // -// compare with xreg.GenBeUID +// compare with xreg.GenBEID func GenUUID() (uuid string) { var h, t string uuid = sid.MustGenerate() @@ -56,8 +56,23 @@ func GenUUID() (uuid string) { return h + uuid + t } +// "best-effort ID" - to independently and locally generate globally unique ID +// called by xreg.GenBEID +func GenBEID(val uint64) string { + b := make([]byte, lenShortID) + for i := 0; i < lenShortID; i++ { + if idx := int(val & letterIdxMask); idx < LenRunes { + b[i] = LetterRunes[idx] + } else { + b[i] = LetterRunes[idx-LenRunes] + } + val >>= letterIdxBits + } + return UnsafeS(b) +} + func IsValidUUID(uuid string) bool { - return len(uuid) >= LenShortID && IsAlphaNice(uuid) + return len(uuid) >= lenShortID && IsAlphaNice(uuid) } func ValidateNiceID(id string, minlen int, tag string) (err error) { diff --git a/ext/dsort/request_spec_test.go b/ext/dsort/request_spec_test.go index 8b5365c6eb..bd70641d05 100644 --- a/ext/dsort/request_spec_test.go +++ b/ext/dsort/request_spec_test.go @@ -400,7 +400,7 @@ var _ = Describe("RequestSpec", func() { } _, err := rs.parse() Expect(err).Should(HaveOccurred()) - Expect(err).To(Equal(cos.ErrInvalidQuantityUsage)) + Expect(err).To(Equal(cos.ErrQuantityUsage)) }) It("should fail due to invalid mem usage percent specified", func() { @@ -415,7 +415,7 @@ var _ = Describe("RequestSpec", func() { } _, err := rs.parse() Expect(err).Should(HaveOccurred()) - Expect(err).To(Equal(cos.ErrInvalidQuantityPercent)) + Expect(err).To(Equal(cos.ErrQuantityPercent)) }) It("should fail due to invalid mem usage bytes specified", func() { @@ -430,7 +430,7 @@ var _ = Describe("RequestSpec", func() { } _, err := rs.parse() Expect(err).Should(HaveOccurred()) - Expect(err).To(Equal(cos.ErrInvalidQuantityUsage)) + Expect(err).To(Equal(cos.ErrQuantityUsage)) }) It("should fail due to invalid extract concurrency specified", func() { diff --git a/mirror/put_copies.go b/mirror/put_copies.go index 355fc6245a..bd7672951f 100644 --- a/mirror/put_copies.go +++ b/mirror/put_copies.go @@ -22,7 +22,6 @@ import ( "github.com/NVIDIA/aistore/memsys" "github.com/NVIDIA/aistore/xact" "github.com/NVIDIA/aistore/xact/xreg" - "github.com/OneOfOne/xxhash" ) type ( @@ -76,9 +75,8 @@ func (p *putFactory) Start() error { // // target-local generation of a global UUID // - div := int64(xact.IdleDefault) - slt := xxhash.ChecksumString64S(bck.MakeUname(""), 3421170679 /*m.b per xkind*/) - r.DemandBase.Init(xreg.GenBeUID(div, int64(slt)), apc.ActPutCopies, bck, xact.IdleDefault) + div := uint64(xact.IdleDefault) + r.DemandBase.Init(xreg.GenBEID(div, p.Kind()+"|"+bck.MakeUname("")), p.Kind(), bck, xact.IdleDefault) // joggers r.workers = mpather.NewWorkerGroup(&mpather.WorkerGroupOpts{ diff --git a/xact/xreg/uuid.go b/xact/xreg/uuid.go index 4849aeede8..5a03b9a298 100644 --- a/xact/xreg/uuid.go +++ b/xact/xreg/uuid.go @@ -5,11 +5,11 @@ package xreg import ( - "math/rand" "time" "github.com/NVIDIA/aistore/cmn/atomic" "github.com/NVIDIA/aistore/cmn/cos" + "github.com/OneOfOne/xxhash" ) var ( @@ -19,20 +19,21 @@ var ( // see related: cmn/cos/uuid.go -func GenBeUID(div, slt int64) (buid string) { - now := time.Now().UnixNano() - MyTime.Load() + PrimeTime.Load() +// "best-effort ID" - to independently and locally generate globally unique xaction ID +func GenBEID(div uint64, tag string) (beid string) { + now := uint64(time.Now().UnixNano() - MyTime.Load() + PrimeTime.Load()) + if div%2 == 0 { + div++ + } rem := now % div + val := now - rem + val ^= xxhash.ChecksumString64S(tag, val) - seed := now - rem + slt - if seed < 0 { - seed = now - rem - slt - } - rnd := rand.New(rand.NewSource(seed)) - buid = cos.RandStringWithSrc(rnd, cos.LenShortID) + beid = cos.GenBEID(val) - if xctn, err := GetXact(buid); err != nil /*unlikely*/ || xctn != nil /*idling away*/ { - // fallback - buid = cos.GenUUID() + if xctn, err := GetXact(beid); err != nil /*unlikely*/ || xctn != nil { + // idling away? fallback to common default + beid = cos.GenUUID() } return } diff --git a/xact/xs/archive.go b/xact/xs/archive.go index 0fb4f5c8d5..34cbff7de1 100644 --- a/xact/xs/archive.go +++ b/xact/xs/archive.go @@ -28,7 +28,6 @@ import ( "github.com/NVIDIA/aistore/transport" "github.com/NVIDIA/aistore/xact" "github.com/NVIDIA/aistore/xact/xreg" - "github.com/OneOfOne/xxhash" ) // TODO (feature): one source multiple destinations (buckets) @@ -84,14 +83,13 @@ func (p *archFactory) Start() error { // // target-local generation of a global UUID // - div := int64(xact.IdleDefault) + div := uint64(xact.IdleDefault) bckTo, ok := p.Args.Custom.(*meta.Bck) debug.Assertf(ok, "%+v", bckTo) if !ok || bckTo.IsEmpty() { bckTo = &meta.Bck{Name: "any"} // local usage to gen uuid, see r.bckTo below } - slt := xxhash.ChecksumString64S(p.Bck.MakeUname("")+"|"+bckTo.MakeUname(""), 8214808651 /*m.b per xkind*/) - p.Args.UUID = xreg.GenBeUID(div, int64(slt)) + p.Args.UUID = xreg.GenBEID(div, p.kind+"|"+p.Bck.MakeUname("")+"|"+bckTo.MakeUname("")) // // new x-archive @@ -100,7 +98,7 @@ func (p *archFactory) Start() error { r := &XactArch{streamingX: streamingX{p: &p.streamingF, config: cmn.GCO.Get()}, workCh: workCh} r.pending.m = make(map[string]*archwi, maxNumInParallel) p.xctn = r - r.DemandBase.Init(p.UUID() /*== p.Args.UUID above*/, apc.ActArchive, p.Bck /*from*/, xact.IdleDefault) + r.DemandBase.Init(p.UUID() /*== p.Args.UUID above*/, p.kind, p.Bck /*from*/, xact.IdleDefault) bmd := p.Args.T.Bowner().Get() trname := fmt.Sprintf("arch-%s%s-%s-%d", p.Bck.Provider, p.Bck.Ns, p.Bck.Name, bmd.Version) // NOTE: (bmd.Version) diff --git a/xact/xs/tcobjs.go b/xact/xs/tcobjs.go index e46b5bf7e4..3d67743d6f 100644 --- a/xact/xs/tcobjs.go +++ b/xact/xs/tcobjs.go @@ -24,7 +24,6 @@ import ( "github.com/NVIDIA/aistore/transport" "github.com/NVIDIA/aistore/xact" "github.com/NVIDIA/aistore/xact/xreg" - "github.com/OneOfOne/xxhash" ) type ( @@ -70,13 +69,8 @@ func (p *tcoFactory) Start() error { // // target-local generation of a global UUID // - div := int64(xact.IdleDefault) - sed := uint64(3282306647) - if p.kind == apc.ActETLObjects { - sed = 5058223172 - } - slt := xxhash.ChecksumString64S(p.args.BckFrom.MakeUname("")+"|"+p.args.BckTo.MakeUname(""), sed) - p.Args.UUID = xreg.GenBeUID(div, int64(slt)) + div := uint64(xact.IdleDefault) + p.Args.UUID = xreg.GenBEID(div, p.kind+"|"+p.args.BckFrom.MakeUname("")+"|"+p.args.BckTo.MakeUname("")) // new x-tco workCh := make(chan *cmn.TCObjsMsg, maxNumInParallel) diff --git a/xact/xs/xaction_test.go b/xact/xs/xaction_test.go index f34be68a90..0c8a937811 100644 --- a/xact/xs/xaction_test.go +++ b/xact/xs/xaction_test.go @@ -280,3 +280,38 @@ func TestXactionQueryFinished(t *testing.T) { f(t, test) } } + +func TestBeid(t *testing.T) { + num := 10000 + if testing.Short() { + num = 10 + } + results := make(map[string]uint64, num) + compare := make(map[uint64]string, num) + tags := []string{"tag1", "tag2"} + for i := 0; i < num; i++ { + val := uint64(time.Now().UnixNano()) + beid := xreg.GenBEID(val, tags[i%2]) + compare[val] = beid + if _, ok := results[beid]; ok { + t.Fatalf("%s duplicated", beid) + } + results[beid] = val + time.Sleep(time.Millisecond) + } + if len(compare) != len(results) { + t.Fatalf("lengths differ %d != %d", len(compare), len(results)) + } + // repro + for val, beid := range compare { + b1 := xreg.GenBEID(val, tags[0]) + b2 := xreg.GenBEID(val, tags[1]) + if b1 == beid && b2 != beid { + continue + } + if b2 == beid && b1 != beid { + continue + } + t.Fatalf("failed to repro for %x: %s, %s, %s", val, beid, b1, b2) + } +}