-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathinmem_dlm.go
155 lines (120 loc) · 2.66 KB
/
inmem_dlm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package dlm
import (
"sync"
"time"
)
// InMemDLM is a local implementation of a DLM that is intended to be used
// during development. All lock state is kept in a map owned by this value, so
// locks are only shared between callers that use the same InMemDLM.
type InMemDLM struct {
// mutex is held by acquire and release around every access to keys.
mutex sync.Mutex
// keys maps a namespaced lock key to the entry describing its current holder.
keys map[string]inMemEntry
// namespace is prepended to every key before it is looked up in keys.
namespace string
}
// NewInMemDLM returns a DLM backed by an empty in-memory key table.
// A nil opts behaves like a zero-valued Options; only opts.Namespace is read.
func NewInMemDLM(opts *Options) DLM {
	var namespace string
	if opts != nil {
		namespace = opts.Namespace
	}
	return &InMemDLM{
		keys:      map[string]inMemEntry{},
		namespace: namespace,
	}
}
// NewLock builds a lock handle for key. The handle starts out not held; call
// Lock on it to actually acquire the key.
//
// A nil opts is replaced by an empty LockOptions, and WithDefaults is applied
// in either case. Each handle gets its own token from randstr(32); if that
// fails, the error is returned unchanged and no lock is created.
func (d *InMemDLM) NewLock(key string, opts *LockOptions) (Locker, error) {
	if opts == nil {
		opts = new(LockOptions)
	}
	cfg := opts.WithDefaults()

	token, err := randstr(32)
	if err != nil {
		return nil, err
	}

	return &inMemLock{
		dlm:       d,
		namespace: d.namespace,
		key:       key,
		token:     token,
		ttl:       cfg.TTL,
		waitTime:  cfg.WaitTime,
		retryTime: cfg.RetryTime,
	}, nil
}
// acquire makes a single, non-blocking attempt to take key on behalf of token.
//
// The attempt fails (returns false) when an entry for the namespaced key
// exists and its validUntil is still after the current time. Otherwise the
// entry is created or overwritten with token and an expiry of now+ttl, and
// true is returned.
func (d *InMemDLM) acquire(key, token string, ttl time.Duration) bool {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	now := time.Now()
	fullKey := d.namespace + key

	if current, found := d.keys[fullKey]; found && current.validUntil.After(now) {
		return false
	}

	d.keys[fullKey] = inMemEntry{token: token, validUntil: now.Add(ttl)}
	return true
}
// release removes the entry for key if, and only if, it is currently owned by
// token. It returns true when an entry was deleted and false when the key is
// absent or owned by a different token. The entry's expiry is not consulted.
func (d *InMemDLM) release(key, token string) bool {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	fullKey := d.namespace + key

	if current, found := d.keys[fullKey]; found && current.token == token {
		delete(d.keys, fullKey)
		return true
	}
	return false
}
// inMemEntry is the value stored in InMemDLM.keys for a key that has been
// acquired. Field order matters: acquire builds it with a positional literal.
type inMemEntry struct {
// token is the token passed to acquire by the holder; release deletes the
// entry only when called with the same token.
token string
// validUntil is the instant up to which acquire refuses to overwrite the entry.
validUntil time.Time
}
// inMemLock is the lock handle created by InMemDLM.NewLock. It carries the
// per-lock settings and token, and delegates the actual bookkeeping to dlm.
type inMemLock struct {
mutex sync.Mutex // Guards isHeld; held for the full duration of Lock and Unlock
// dlm is the manager whose acquire/release methods this lock calls.
dlm *InMemDLM
// ttl is passed to acquire as the lifetime of the entry once taken.
ttl time.Duration
// waitTime is how long Lock keeps retrying before giving up.
waitTime time.Duration
// retryTime is the interval between acquire attempts inside Lock.
retryTime time.Duration
// namespace is a copy of the manager's namespace, reported by Namespace().
namespace string
// key is the un-namespaced key this lock was created for, reported by Key().
key string
token string // A random string used to safely release the lock
// isHeld is set by a successful Lock and cleared by a successful Unlock.
isHeld bool
}
// Key returns the key this lock was created for, without the namespace prefix.
func (l *inMemLock) Key() string {
return l.key
}
// Namespace returns the namespace copied from the InMemDLM that created this lock.
func (l *inMemLock) Namespace() string {
return l.namespace
}
// Lock acquires the key, blocking for at most waitTime.
//
// It returns ErrLockHeld immediately if this handle is already marked as
// held. Otherwise it tries once right away and, if that fails, retries every
// retryTime until either an attempt succeeds (nil is returned) or the
// waitTime timer fires (ErrCannotLock is returned).
//
// The handle's own mutex is held for the whole call, so concurrent Lock or
// Unlock calls on the same handle wait until this one returns.
func (l *inMemLock) Lock() error {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	if l.isHeld {
		return ErrLockHeld
	}

	// attempt makes one acquire call and records success on the handle.
	attempt := func() bool {
		if !l.dlm.acquire(l.key, l.token, l.ttl) {
			return false
		}
		l.isHeld = true
		return true
	}

	// Fast path: no timers are created when the key is free.
	if attempt() {
		return nil
	}

	deadline := time.NewTimer(l.waitTime)
	defer deadline.Stop()
	ticker := time.NewTicker(l.retryTime)
	defer ticker.Stop()

	for {
		select {
		case <-deadline.C:
			return ErrCannotLock
		case <-ticker.C:
			if attempt() {
				return nil
			}
		}
	}
}
// Unlock releases the key if this handle still owns it.
//
// It returns ErrLockNotHeld when the handle is not marked as held, or when
// the manager refuses the release because the entry is gone or now belongs to
// a different token (for example, the TTL expired and another lock acquired
// the key). It returns nil when the entry was removed.
//
// In every case where the handle was marked as held, the mark is cleared, so
// the handle can be used for a fresh Lock call afterwards.
func (l *inMemLock) Unlock() error {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	if !l.isHeld {
		return ErrLockNotHeld
	}

	// Clear the flag before checking the release result. If ownership was
	// lost, leaving isHeld set would make every later Lock return ErrLockHeld
	// and every later Unlock return ErrLockNotHeld, wedging the handle forever.
	l.isHeld = false

	if !l.dlm.release(l.key, l.token) {
		return ErrLockNotHeld
	}
	return nil
}