-
Notifications
You must be signed in to change notification settings - Fork 39
/
shcache.lua
440 lines (344 loc) · 11.5 KB
/
shcache.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
-- Copyright (C) 2013 Matthieu Tourne
-- @author Matthieu Tourne <[email protected]>
-- small overlay over shdict, smart cache load mechanism
local M = {}
local resty_lock = require("resty.lock")
local DEBUG = false
-- defaults in secs
local DEFAULT_POSITIVE_TTL = 10 -- cache for, successful lookup
local DEFAULT_NEGATIVE_TTL = 2 -- cache for, failed lookup
local DEFAULT_ACTUALIZE_TTL = 2 -- stale data, actualize data for
-- default lock options, in secs
local function _get_default_lock_options()
return {
exptime = 1, -- max wait if failing to call unlock()
timeout = 0.5, -- max waiting time of lock()
max_step = 0.1, -- max sleeping interval
}
end
local function prequire(m)
local ok, err_or_module = pcall(require, m)
if not ok then
return nil, err_or_module
end
return err_or_module
end
local conf = prequire("conf")
if conf then
DEFAULT_NEGATIVE_TTL = conf.DEFAULT_NEGATIVE_TTL or DEFAULT_NEGATIVE_TTL
DEFAULT_ACTUALIZE_TTL = conf.DEFAULT_ACTUALIZE_TTL or DEFAULT_ACTUALIZE_TTL
end
local band = bit.band
local bor = bit.bor
local st_format = string.format
-- there are only really 5 states total
-- is_stale is_neg is_from_cache
local MISS_STATE = 0 -- 0 0 0
local HIT_POSITIVE_STATE = 1 -- 0 0 1
local HIT_NEGATIVE_STATE = 3 -- 0 1 1
local STALE_POSITIVE_STATE = 5 -- 1 0 1
-- stale negative doesn't really make sense, use HIT_NEGATIVE instead
-- local STALE_NEGATIVE_STATE = 7 -- 1 1 1
-- xor to set
local NEGATIVE_FLAG = 2
local STALE_FLAG = 4
local STATES = {
[MISS_STATE] = 'MISS',
[HIT_POSITIVE_STATE] = 'HIT',
[HIT_NEGATIVE_STATE] = 'HIT_NEGATIVE',
[STALE_POSITIVE_STATE] = 'STALE',
-- [STALE_NEGATIVE_STATE] = 'STALE_NEGATIVE',
}
local function get_status(flags)
return STATES[flags] or st_format('UNDEF (0x%x)', flags)
end
local EMPTY_DATA = '_EMPTY_'
-- install debug functions
if DEBUG then
local resty_lock_lock = resty_lock.lock
resty_lock.lock = function (...)
local _, key = unpack({...})
print("lock key: ", tostring(key))
return resty_lock_lock(...)
end
local resty_lock_unlock = resty_lock.unlock
resty_lock.unlock = function (...)
print("unlock")
return resty_lock_unlock(...)
end
end
-- store the object in the context
-- useful for debugging and tracking cache status
local function _store_object(self, name)
if DEBUG then
print('storing shcache: ', name, ' into ngx.ctx')
end
local ngx_ctx = ngx.ctx
if not ngx_ctx.shcache then
ngx_ctx.shcache = {}
end
ngx_ctx.shcache[name] = self
end
local obj_mt = {
__index = M,
}
-- default function for callbacks.encode / decode.
local function _identity(data)
return data
end
-- shdict: ngx.shared.DICT, created by the lua_shared_dict directive
-- callbacks: see shcache state machine for user defined functions
-- * callbacks.external_lookup is required
-- * callbacks.encode : optional encoding before saving to shmem
-- * callbacks.decode : optional decoding when retreiving from shmem
-- opts:
-- * opts.positive_ttl : save a valid external loookup for, in seconds
-- * opts.positive_ttl : save a invalid loookup for, in seconds
-- * opts.actualize_ttl : re-actualize a stale record for, in seconds
-- * opts.lock_options : set option to lock see : http://github.com/agentzh/lua-resty-lock
-- for more details.
-- * opts.locks_shdict : specificy the name of the shdict containing the locks
-- (useful if you might have locks key collisions)
-- uses "locks" by default.
-- * opts.name : if shcache object is named, it will automatically
-- register itself in ngx.ctx.shcache (useful for logging).
local function new(self, shdict, callbacks, opts)
if not shdict then
return nil, "shdict does not exist"
end
-- check that callbacks.external_lookup is set
if not callbacks or not callbacks.external_lookup then
return nil, "no external_lookup function defined"
end
if not callbacks.encode then
callbacks.encode = _identity
end
if not callbacks.decode then
callbacks.decode = _identity
end
local opts = opts or {}
-- merge default lock options with the ones passed to new()
local lock_options = _get_default_lock_options()
if opts.lock_options then
for k, v in pairs(opts.lock_options) do
lock_options[k] = v
end
end
local name = opts.name
local obj = {
shdict = shdict,
callbacks = callbacks,
positive_ttl = opts.positive_ttl or DEFAULT_POSITIVE_TTL,
negative_ttl = opts.negative_ttl or DEFAULT_NEGATIVE_TTL,
-- ttl to actualize stale data to
actualize_ttl = opts.actualize_ttl or DEFAULT_ACTUALIZE_TTL,
lock_options = lock_options,
locks_shdict = opts.lock_shdict or "locks",
-- STATUS --
from_cache = false,
cache_status = 'UNDEF',
cache_state = MISS_STATE,
lock_status = 'NO_LOCK',
-- shdict:set() pushed out another value
forcible_set = false,
-- cache hit on second attempt (post lock)
hit2 = false,
name = name,
}
local locks = ngx.shared[obj.locks_shdict]
-- check for existence, locks is not directly used
if not locks then
ngx.log(ngx.CRIT, 'shared mem locks is missing.\n',
'## add to you lua conf: lua_shared_dict locks 5M; ##')
return nil
end
local self = setmetatable(obj, obj_mt)
-- if the shcache object is named
-- keep track of the object in the context
-- (useful for gathering stats at log phase)
if name then
_store_object(self, name)
end
return self
end
M.new = new
-- acquire a lock
local function _get_lock(self)
local lock = self.lock
if not lock then
lock = resty_lock:new(self.locks_shdict, self.lock_options)
self.lock = lock
end
return lock
end
-- remove the lock if there is any
local function _unlock(self)
local lock = self.lock
if lock then
local ok, err = lock:unlock()
if not ok then
ngx.log(ngx.ERR, "failed to unlock :" , err)
end
self.lock = nil
end
end
local function _return(self, data, flags)
-- make sure we remove the locks if any before returning data
_unlock(self)
-- set cache status
local cache_status = get_status(self.cache_state)
if cache_status == 'MISS' and not data then
cache_status = 'NO_DATA'
end
self.cache_status = cache_status
return data, self.from_cache
end
local function _set(self, ...)
if DEBUG then
local key, data, ttl, flags = unpack({...})
print("saving key: ", key, ", for: ", ttl)
end
local ok, err, forcible = self.shdict:set(...)
self.forcible_set = forcible
if not ok then
local key, data, ttl, flags = unpack({...})
ngx.log(ngx.ERR, 'failed to set key: ', key, ', err: ', err)
end
return ok
end
-- check if the data returned by :get() is considered empty
local function _is_empty(data, flags)
return flags and band(flags, NEGATIVE_FLAG) and data == EMPTY_DATA
end
-- save positive, encode the data if needed before :set()
local function _save_positive(self, key, data)
if DEBUG then
print("key: ", key, ". save positive, ttl: ", self.positive_ttl)
end
data = self.callbacks.encode(data)
return _set(self, key, data, self.positive_ttl, HIT_POSITIVE_STATE)
end
-- save negative, no encoding required (no data actually saved)
local function _save_negative(self, key)
if DEBUG then
print("key: ", key, ". save negative, ttl: ", self.negative_ttl)
end
return _set(self, key, EMPTY_DATA, self.negative_ttl, HIT_NEGATIVE_STATE)
end
-- save actualize, will boost a stale record to a live one
local function _save_actualize(self, key, data, flags)
local new_flags = bor(flags, STALE_FLAG)
if DEBUG then
print("key: ", key, ". save actualize, ttl: ", self.actualize_ttl,
". new state: ", get_status(new_flags))
end
_set(self, key, data, self.actualize_ttl, new_flags)
return new_flags
end
local function _process_cached_data(self, data, flags)
if DEBUG then
print("data: ", data, st_format(", flags: %x", flags))
end
self.cache_state = flags
self.from_cache = true
if _is_empty(data, flags) then
-- empty cached data
return nil
else
return self.callbacks.decode(data)
end
end
-- wrapper to get data from the shdict
local function _get(self, key)
-- always call get_stale() as it does not free element
-- like get does on each call
local data, flags, stale = self.shdict:get_stale(key)
if data and stale then
if DEBUG then
print("found stale data for key : ", key)
end
self.stale_data = { data, flags }
return nil, nil
end
return data, flags
end
local function _get_stale(self)
local stale_data = self.stale_data
if stale_data then
return unpack(stale_data)
end
return nil, nil
end
local function load(self, key)
-- start: check for existing cache
local data, flags = _get(self, key)
-- hit: process_cache_hit
if data then
data = _process_cached_data(self, data, flags)
return _return(self, data)
end
-- miss: set lock
-- lock: set a lock before performing external lookup
local lock = _get_lock(self)
local elapsed, err = lock:lock(key)
if not elapsed then
-- failed to acquire lock, still proceed normally to external_lookup
-- unlock() might fail.
ngx.log(ngx.ERR, "failed to acquire the lock: ", err)
self.lock_status = 'ERROR'
-- _unlock won't try to unlock() without a valid lock
self.lock = nil
else
-- lock acquired successfuly
if elapsed > 0 then
-- elapsed > 0 => waited lock (other thread might have :set() the data)
-- (more likely to get a HIT on cache_load 2)
self.lock_status = 'WAITED'
else
-- elapsed == 0 => immediate lock
-- it is less likely to get a HIT on cache_load 2
-- but still perform it (race condition cases)
self.lock_status = 'IMMEDIATE'
end
-- perform cache_load 2
data, flags = _get(self, key)
if data then
-- hit2 : process cache hit
self.hit2 = true
-- unlock before de-serializing cached data
_unlock(self)
data = _process_cached_data(self, data, flags)
return _return(self, data)
end
-- continue to external lookup
end
-- perform external lookup
data, err = self.callbacks.external_lookup()
if data then
-- succ: save positive and return the data
_save_positive(self, key, data)
return _return(self, data)
else
ngx.log(ngx.WARN, 'external lookup failed: ', err)
end
-- external lookup failed
-- attempt to load stale data
data, flags = _get_stale(self)
if data and not _is_empty(data, flags) then
-- hit_stale + valid (positive) data
flags = _save_actualize(self, key, data, flags)
-- unlock before de-serializing data
_unlock(self)
data = _process_cached_data(self, data, flags)
return _return(self, data)
end
if DEBUG and data then
-- there is data, but it failed _is_empty() => stale negative data
print('STALE_NEGATIVE data => cache as a new HIT_NEGATIVE')
end
-- nothing has worked, save negative and return empty
_save_negative(self, key)
return _return(self, nil)
end
M.load = load
return M