Skip to content

Commit

Permalink
fix: always re-register rueidislock csc notifications
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Nov 12, 2024
1 parent adffb5f commit d27df62
Showing 1 changed file with 39 additions and 85 deletions.
124 changes: 39 additions & 85 deletions rueidislock/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,62 +184,16 @@ func (m *locker) script(ctx context.Context, script *rueidis.Lua, key, val strin
return ErrNotLocked
}

func (m *locker) waitgate(ctx context.Context, name string) (g *gate, err error) {
func (m *locker) getgate(name string) (g *gate) {
m.mu.Lock()
g, ok := m.gates[name]
if !ok {
if m.gates == nil {
m.mu.Unlock()
return nil, ErrLockerClosed
defer m.mu.Unlock()
if m.gates != nil {
if g = m.gates[name]; g == nil {
g = makegate(m.totalcnt)
m.gates[name] = g
}
g = makegate(m.totalcnt)
g.w++
m.gates[name] = g
m.mu.Unlock()
return g, nil
} else {
g.w++
m.mu.Unlock()
}
var timeout <-chan time.Time
if m.nocsc {
timeout = time.After(m.timeout)
}
select {
case <-ctx.Done():
m.removegate(g, name)
return nil, ctx.Err()
case _, ok = <-g.ch:
if ok {
return g, nil
}
return nil, ErrLockerClosed
case <-timeout:
return g, nil
}
}

func (m *locker) trygate(name string) (g *gate) {
m.mu.Lock()
if _, ok := m.gates[name]; !ok && m.gates != nil {
g = makegate(m.totalcnt)
g.w++
m.gates[name] = g
}
m.mu.Unlock()
return g
}

func (m *locker) forcegate(name string) (g *gate) {
m.mu.Lock()
if g = m.gates[name]; g == nil && m.gates != nil {
g = makegate(m.totalcnt)
m.gates[name] = g
}
if g != nil {
g.w++
}
m.mu.Unlock()
return g
}

Expand Down Expand Up @@ -403,53 +357,53 @@ func (m *locker) try(ctx context.Context, cancel context.CancelFunc, name string
return cancel, err
}

func (m *locker) ForceWithContext(ctx context.Context, name string) (context.Context, context.CancelFunc, error) {
var err error
func (m *locker) tryonce(ctx context.Context, name string, force bool) (context.Context, context.CancelFunc, error) {
g := m.getgate(name)
if g == nil {
return nil, nil, ErrLockerClosed
}
ctx, cancel := context.WithCancel(ctx)
if g := m.forcegate(name); g != nil {
if cancel, err = m.try(ctx, cancel, name, g, true); err == nil {
return ctx, cancel, nil
}
cancel, err := m.try(ctx, cancel, name, g, force)
if err != nil {
m.removegate(g, name)
}
cancel()
if err == nil {
err = ErrLockerClosed
cancel()
}
return ctx, cancel, err
}

func (m *locker) ForceWithContext(ctx context.Context, name string) (context.Context, context.CancelFunc, error) {
return m.tryonce(ctx, name, true)
}

func (m *locker) TryWithContext(ctx context.Context, name string) (context.Context, context.CancelFunc, error) {
var err error
ctx, cancel := context.WithCancel(ctx)
if g := m.trygate(name); g != nil {
if cancel, err = m.try(ctx, cancel, name, g, false); err == nil {
return ctx, cancel, nil
}
m.removegate(g, name)
}
cancel()
if err == nil {
err = fmt.Errorf("%w: the lock is held by others or the locker is closed", ErrNotLocked)
}
return ctx, cancel, err
return m.tryonce(ctx, name, false)
}

func (m *locker) WithContext(src context.Context, name string) (context.Context, context.CancelFunc, error) {
for {
g := m.getgate(name)
if g == nil {
return nil, nil, ErrLockerClosed
}
ctx, cancel := context.WithCancel(src)
g, err := m.waitgate(ctx, name)
if g != nil {
if cancel, err := m.try(ctx, cancel, name, g, false); err == nil {
return ctx, cancel, nil
}
m.mu.Lock()
g.w-- // do not delete g from m.gates here.
m.mu.Unlock()
if cancel, err := m.try(ctx, cancel, name, g, false); err == nil {
return ctx, cancel, nil
}
if cancel(); err != nil {
return ctx, cancel, err
cancel()
var timeout <-chan time.Time
if m.nocsc {
timeout = time.After(m.timeout)
}
select {
case <-src.Done():
m.removegate(g, name)
return nil, nil, src.Err()
case <-g.ch:
case <-timeout:
}
m.mu.Lock()
g.w--
m.mu.Unlock()
}
}

Expand Down

0 comments on commit d27df62

Please sign in to comment.