Skip to content

Commit

Permalink
local generation of global IDs
Browse files Browse the repository at this point in the history
* revise and simplify; add unit test
* part three, prev. commit: 896aca0

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Aug 27, 2023
1 parent 896aca0 commit 0dd8702
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 57 deletions.
15 changes: 6 additions & 9 deletions cmn/cos/bytepack.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package cos

import (
"encoding/binary"
"errors"

"github.com/NVIDIA/aistore/cmn/debug"
)
Expand Down Expand Up @@ -90,8 +89,6 @@ type (
// it is possible to use int32 instead of int64 to keep the length of the data.
const SizeofLen = SizeofI32

var ErrBufferUnderrun = errors.New("buffer underrun")

// PackedStrLen returns the size occupied by a given string in the output
func PackedStrLen(s string) int {
return SizeofLen + len(s)
Expand All @@ -118,7 +115,7 @@ func (br *ByteUnpack) Bytes() []byte {

func (br *ByteUnpack) ReadByte() (byte, error) {
if br.off >= len(br.b) {
return 0, ErrBufferUnderrun
return 0, errBufferUnderrun
}
debug.Assert(br.off < len(br.b))
b := br.b[br.off]
Expand All @@ -138,7 +135,7 @@ func (br *ByteUnpack) ReadInt64() (int64, error) {

func (br *ByteUnpack) ReadUint64() (uint64, error) {
if len(br.b)-br.off < SizeofI64 {
return 0, ErrBufferUnderrun
return 0, errBufferUnderrun
}
n := binary.BigEndian.Uint64(br.b[br.off:])
br.off += SizeofI64
Expand All @@ -152,7 +149,7 @@ func (br *ByteUnpack) ReadInt16() (int16, error) {

func (br *ByteUnpack) ReadUint16() (uint16, error) {
if len(br.b)-br.off < SizeofI16 {
return 0, ErrBufferUnderrun
return 0, errBufferUnderrun
}
n := binary.BigEndian.Uint16(br.b[br.off:])
br.off += SizeofI16
Expand All @@ -166,7 +163,7 @@ func (br *ByteUnpack) ReadInt32() (int32, error) {

func (br *ByteUnpack) ReadUint32() (uint32, error) {
if len(br.b)-br.off < SizeofI32 {
return 0, ErrBufferUnderrun
return 0, errBufferUnderrun
}
n := binary.BigEndian.Uint32(br.b[br.off:])
br.off += SizeofI32
Expand All @@ -175,14 +172,14 @@ func (br *ByteUnpack) ReadUint32() (uint32, error) {

func (br *ByteUnpack) ReadBytes() ([]byte, error) {
if len(br.b)-br.off < SizeofLen {
return nil, ErrBufferUnderrun
return nil, errBufferUnderrun
}
l, err := br.ReadUint32()
if err != nil {
return nil, err
}
if len(br.b)-br.off < int(l) {
return nil, ErrBufferUnderrun
return nil, errBufferUnderrun
}
start := br.off
br.off += int(l)
Expand Down
10 changes: 10 additions & 0 deletions cmn/cos/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ type (
}
)

var (
ErrQuantityUsage = errors.New("invalid quantity, format should be '81%' or '1GB'")
ErrQuantityPercent = errors.New("percent must be in the range (0, 100)")
ErrQuantityBytes = errors.New("value (bytes) must be non-negative")

errQuantityNonNegative = errors.New("quantity should not be negative")
)

var errBufferUnderrun = errors.New("buffer underrun")

// ErrNotFound

func NewErrNotFound(format string, a ...any) *ErrNotFound {
Expand Down
18 changes: 5 additions & 13 deletions cmn/cos/quantity.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package cos

import (
"errors"
"fmt"
"strconv"
"strings"
Expand All @@ -24,13 +23,6 @@ type (
}
)

var (
ErrInvalidQuantityUsage = errors.New("invalid quantity, format should be '81%' or '1GB'")
errInvalidQuantityNonNegative = errors.New("quantity should not be negative")
ErrInvalidQuantityPercent = errors.New("percent must be in the range (0, 100)")
ErrInvalidQuantityBytes = errors.New("value (bytes) must be non-negative")
)

///////////////////
// ParseQuantity //
///////////////////
Expand All @@ -45,27 +37,27 @@ func ParseQuantity(quantity string) (ParsedQuantity, error) {

parsedQ := ParsedQuantity{}
if value, err := strconv.Atoi(number); err != nil {
return parsedQ, ErrInvalidQuantityUsage
return parsedQ, ErrQuantityUsage
} else if value < 0 {
return parsedQ, errInvalidQuantityNonNegative
return parsedQ, errQuantityNonNegative
} else {
parsedQ.Value = uint64(value)
}

if len(quantity) <= idx {
return parsedQ, ErrInvalidQuantityUsage
return parsedQ, ErrQuantityUsage
}

suffix := quantity[idx:]
if suffix == "%" {
parsedQ.Type = QuantityPercent
if parsedQ.Value == 0 || parsedQ.Value >= 100 {
return parsedQ, ErrInvalidQuantityPercent
return parsedQ, ErrQuantityPercent
}
} else if value, err := ParseSize(quantity, UnitsIEC); err != nil {
return parsedQ, err
} else if value < 0 {
return parsedQ, ErrInvalidQuantityBytes
return parsedQ, ErrQuantityBytes
} else {
parsedQ.Type = QuantityBytes
parsedQ.Value = uint64(value)
Expand Down
21 changes: 18 additions & 3 deletions cmn/cos/uuid.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/teris-io/shortid"
)

const LenShortID = 9 // UUID length, as per https://github.com/teris-io/shortid#id-length
const lenShortID = 9 // UUID length, as per https://github.com/teris-io/shortid#id-length

const (
// Alphabet for generating UUIDs similar to the shortid.DEFAULT_ABC
Expand Down Expand Up @@ -40,7 +40,7 @@ func InitShortID(seed uint64) {
// UUID
//

// compare with xreg.GenBeUID
// compare with xreg.GenBEID
func GenUUID() (uuid string) {
var h, t string
uuid = sid.MustGenerate()
Expand All @@ -56,8 +56,23 @@ func GenUUID() (uuid string) {
return h + uuid + t
}

// "best-effort ID" - to independently and locally generate globally unique ID
// called by xreg.GenBEID
func GenBEID(val uint64) string {
b := make([]byte, lenShortID)
for i := 0; i < lenShortID; i++ {
if idx := int(val & letterIdxMask); idx < LenRunes {
b[i] = LetterRunes[idx]
} else {
b[i] = LetterRunes[idx-LenRunes]
}
val >>= letterIdxBits
}
return UnsafeS(b)
}

func IsValidUUID(uuid string) bool {
return len(uuid) >= LenShortID && IsAlphaNice(uuid)
return len(uuid) >= lenShortID && IsAlphaNice(uuid)
}

func ValidateNiceID(id string, minlen int, tag string) (err error) {
Expand Down
6 changes: 3 additions & 3 deletions ext/dsort/request_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ var _ = Describe("RequestSpec", func() {
}
_, err := rs.parse()
Expect(err).Should(HaveOccurred())
Expect(err).To(Equal(cos.ErrInvalidQuantityUsage))
Expect(err).To(Equal(cos.ErrQuantityUsage))
})

It("should fail due to invalid mem usage percent specified", func() {
Expand All @@ -415,7 +415,7 @@ var _ = Describe("RequestSpec", func() {
}
_, err := rs.parse()
Expect(err).Should(HaveOccurred())
Expect(err).To(Equal(cos.ErrInvalidQuantityPercent))
Expect(err).To(Equal(cos.ErrQuantityPercent))
})

It("should fail due to invalid mem usage bytes specified", func() {
Expand All @@ -430,7 +430,7 @@ var _ = Describe("RequestSpec", func() {
}
_, err := rs.parse()
Expect(err).Should(HaveOccurred())
Expect(err).To(Equal(cos.ErrInvalidQuantityUsage))
Expect(err).To(Equal(cos.ErrQuantityUsage))
})

It("should fail due to invalid extract concurrency specified", func() {
Expand Down
6 changes: 2 additions & 4 deletions mirror/put_copies.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/NVIDIA/aistore/memsys"
"github.com/NVIDIA/aistore/xact"
"github.com/NVIDIA/aistore/xact/xreg"
"github.com/OneOfOne/xxhash"
)

type (
Expand Down Expand Up @@ -76,9 +75,8 @@ func (p *putFactory) Start() error {
//
// target-local generation of a global UUID
//
div := int64(xact.IdleDefault)
slt := xxhash.ChecksumString64S(bck.MakeUname(""), 3421170679 /*m.b per xkind*/)
r.DemandBase.Init(xreg.GenBeUID(div, int64(slt)), apc.ActPutCopies, bck, xact.IdleDefault)
div := uint64(xact.IdleDefault)
r.DemandBase.Init(xreg.GenBEID(div, p.Kind()+"|"+bck.MakeUname("")), p.Kind(), bck, xact.IdleDefault)

// joggers
r.workers = mpather.NewWorkerGroup(&mpather.WorkerGroupOpts{
Expand Down
25 changes: 13 additions & 12 deletions xact/xreg/uuid.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
package xreg

import (
"math/rand"
"time"

"github.com/NVIDIA/aistore/cmn/atomic"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/OneOfOne/xxhash"
)

var (
Expand All @@ -19,20 +19,21 @@ var (

// see related: cmn/cos/uuid.go

func GenBeUID(div, slt int64) (buid string) {
now := time.Now().UnixNano() - MyTime.Load() + PrimeTime.Load()
// "best-effort ID" - to independently and locally generate globally unique xaction ID
func GenBEID(div uint64, tag string) (beid string) {
now := uint64(time.Now().UnixNano() - MyTime.Load() + PrimeTime.Load())
if div%2 == 0 {
div++
}
rem := now % div
val := now - rem
val ^= xxhash.ChecksumString64S(tag, val)

seed := now - rem + slt
if seed < 0 {
seed = now - rem - slt
}
rnd := rand.New(rand.NewSource(seed))
buid = cos.RandStringWithSrc(rnd, cos.LenShortID)
beid = cos.GenBEID(val)

if xctn, err := GetXact(buid); err != nil /*unlikely*/ || xctn != nil /*idling away*/ {
// fallback
buid = cos.GenUUID()
if xctn, err := GetXact(beid); err != nil /*unlikely*/ || xctn != nil {
// idling away? fallback to common default
beid = cos.GenUUID()
}
return
}
8 changes: 3 additions & 5 deletions xact/xs/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/NVIDIA/aistore/transport"
"github.com/NVIDIA/aistore/xact"
"github.com/NVIDIA/aistore/xact/xreg"
"github.com/OneOfOne/xxhash"
)

// TODO (feature): one source multiple destinations (buckets)
Expand Down Expand Up @@ -84,14 +83,13 @@ func (p *archFactory) Start() error {
//
// target-local generation of a global UUID
//
div := int64(xact.IdleDefault)
div := uint64(xact.IdleDefault)
bckTo, ok := p.Args.Custom.(*meta.Bck)
debug.Assertf(ok, "%+v", bckTo)
if !ok || bckTo.IsEmpty() {
bckTo = &meta.Bck{Name: "any"} // local usage to gen uuid, see r.bckTo below
}
slt := xxhash.ChecksumString64S(p.Bck.MakeUname("")+"|"+bckTo.MakeUname(""), 8214808651 /*m.b per xkind*/)
p.Args.UUID = xreg.GenBeUID(div, int64(slt))
p.Args.UUID = xreg.GenBEID(div, p.kind+"|"+p.Bck.MakeUname("")+"|"+bckTo.MakeUname(""))

//
// new x-archive
Expand All @@ -100,7 +98,7 @@ func (p *archFactory) Start() error {
r := &XactArch{streamingX: streamingX{p: &p.streamingF, config: cmn.GCO.Get()}, workCh: workCh}
r.pending.m = make(map[string]*archwi, maxNumInParallel)
p.xctn = r
r.DemandBase.Init(p.UUID() /*== p.Args.UUID above*/, apc.ActArchive, p.Bck /*from*/, xact.IdleDefault)
r.DemandBase.Init(p.UUID() /*== p.Args.UUID above*/, p.kind, p.Bck /*from*/, xact.IdleDefault)

bmd := p.Args.T.Bowner().Get()
trname := fmt.Sprintf("arch-%s%s-%s-%d", p.Bck.Provider, p.Bck.Ns, p.Bck.Name, bmd.Version) // NOTE: (bmd.Version)
Expand Down
10 changes: 2 additions & 8 deletions xact/xs/tcobjs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/NVIDIA/aistore/transport"
"github.com/NVIDIA/aistore/xact"
"github.com/NVIDIA/aistore/xact/xreg"
"github.com/OneOfOne/xxhash"
)

type (
Expand Down Expand Up @@ -70,13 +69,8 @@ func (p *tcoFactory) Start() error {
//
// target-local generation of a global UUID
//
div := int64(xact.IdleDefault)
sed := uint64(3282306647)
if p.kind == apc.ActETLObjects {
sed = 5058223172
}
slt := xxhash.ChecksumString64S(p.args.BckFrom.MakeUname("")+"|"+p.args.BckTo.MakeUname(""), sed)
p.Args.UUID = xreg.GenBeUID(div, int64(slt))
div := uint64(xact.IdleDefault)
p.Args.UUID = xreg.GenBEID(div, p.kind+"|"+p.args.BckFrom.MakeUname("")+"|"+p.args.BckTo.MakeUname(""))

// new x-tco
workCh := make(chan *cmn.TCObjsMsg, maxNumInParallel)
Expand Down
35 changes: 35 additions & 0 deletions xact/xs/xaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,38 @@ func TestXactionQueryFinished(t *testing.T) {
f(t, test)
}
}

func TestBeid(t *testing.T) {
num := 10000
if testing.Short() {
num = 10
}
results := make(map[string]uint64, num)
compare := make(map[uint64]string, num)
tags := []string{"tag1", "tag2"}
for i := 0; i < num; i++ {
val := uint64(time.Now().UnixNano())
beid := xreg.GenBEID(val, tags[i%2])
compare[val] = beid
if _, ok := results[beid]; ok {
t.Fatalf("%s duplicated", beid)
}
results[beid] = val
time.Sleep(time.Millisecond)
}
if len(compare) != len(results) {
t.Fatalf("lengths differ %d != %d", len(compare), len(results))
}
// repro
for val, beid := range compare {
b1 := xreg.GenBEID(val, tags[0])
b2 := xreg.GenBEID(val, tags[1])
if b1 == beid && b2 != beid {
continue
}
if b2 == beid && b1 != beid {
continue
}
t.Fatalf("failed to repro for %x: %s, %s, %s", val, beid, b1, b2)
}
}

0 comments on commit 0dd8702

Please sign in to comment.