Skip to content

Commit

Permalink
feat: shorten rueidislock validity if there is a shorter deadline in …
Browse files Browse the repository at this point in the history
…the context

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Nov 12, 2024
1 parent d27df62 commit 9023aa7
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 6 deletions.
19 changes: 13 additions & 6 deletions rueidislock/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,18 @@ func keyname(prefix, name string, i int32) string {
return sb.String()
}

func (m *locker) acquire(ctx context.Context, key, val string, deadline time.Time, force bool) (err error) {
func (m *locker) acquire(ctx context.Context, key, val string, duration time.Duration, deadline time.Time, force bool) (err error) {
ctx, cancel := context.WithTimeout(ctx, m.timeout)
var resp rueidis.RedisResult
if force {
if m.setpx {
resp = fcqms.Exec(ctx, m.client, []string{key}, []string{val, strconv.FormatInt(m.validity.Milliseconds(), 10)})
resp = fcqms.Exec(ctx, m.client, []string{key}, []string{val, strconv.FormatInt(duration.Milliseconds(), 10)})
} else {
resp = fcqat.Exec(ctx, m.client, []string{key}, []string{val, strconv.FormatInt(deadline.UnixMilli(), 10)})
}
} else {
if m.setpx {
resp = acqms.Exec(ctx, m.client, []string{key}, []string{val, strconv.FormatInt(m.validity.Milliseconds(), 10)})
resp = acqms.Exec(ctx, m.client, []string{key}, []string{val, strconv.FormatInt(duration.Milliseconds(), 10)})
} else {
resp = acqat.Exec(ctx, m.client, []string{key}, []string{val, strconv.FormatInt(deadline.UnixMilli(), 10)})
}
Expand Down Expand Up @@ -249,8 +249,15 @@ func (m *locker) try(ctx context.Context, cancel context.CancelFunc, name string
var err error

val := random()
deadline := time.Now().Add(m.validity)
cacneltm := time.AfterFunc(m.validity, cancel)
now := time.Now()
duration := m.validity
if dl, ok := ctx.Deadline(); ok {
if dur := dl.Sub(now); dur < duration {
duration = dur
}
}
deadline := now.Add(duration)
cacneltm := time.AfterFunc(duration, cancel)
released := int32(0)
acquired := int32(0)
failures := int32(0)
Expand Down Expand Up @@ -314,7 +321,7 @@ func (m *locker) try(ctx context.Context, cancel context.CancelFunc, name string
default:
}
if !errors.Is(err, ErrNotLocked) {
if err = m.acquire(ctx, key, val, deadline, force); force && err == nil {
if err = m.acquire(ctx, key, val, duration, deadline, force); force && err == nil {
m.mu.RLock()
if m.gates != nil {
select {
Expand Down
39 changes: 39 additions & 0 deletions rueidislock/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,45 @@ func TestLocker_WithContext_CancelContext(t *testing.T) {
}
}

func TestLocker_WithContext_TimeoutContext(t *testing.T) {
test := func(t *testing.T, noLoop, setpx, nocsc bool) {
locker := newLocker(t, noLoop, setpx, nocsc)
defer locker.Close()

lck := strconv.Itoa(rand.Int())
ctx, cancel, err := locker.WithContext(context.Background(), lck)

wg := sync.WaitGroup{}
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if _, _, err := locker.WithContext(ctx, lck); !errors.Is(err, context.DeadlineExceeded) {
t.Error(err)
}
wg.Done()
}()
}
wg.Wait()
cancel()
if !errors.Is(ctx.Err(), context.Canceled) {
t.Fatal(err)
}
}
for _, nocsc := range []bool{false, true} {
t.Run("Tracking Loop", func(t *testing.T) {
test(t, false, false, nocsc)
})
t.Run("Tracking NoLoop", func(t *testing.T) {
test(t, true, false, nocsc)
})
t.Run("SET PX", func(t *testing.T) {
test(t, true, true, nocsc)
})
}
}

func TestLocker_TryWithContext(t *testing.T) {
test := func(t *testing.T, noLoop, setpx, nocsc bool) {
locker := newLocker(t, noLoop, setpx, nocsc)
Expand Down

0 comments on commit 9023aa7

Please sign in to comment.