support cluster based on consistent hash #18

Open · wants to merge 6 commits into base: master
4 changes: 4 additions & 0 deletions .travis.yml
@@ -45,6 +45,7 @@ install:
- git clone https://github.com/openresty/echo-nginx-module.git ../echo-nginx-module
- git clone https://github.com/openresty/no-pool-nginx.git ../no-pool-nginx
- git clone -b v2.1-agentzh https://github.com/openresty/luajit2.git
- git clone https://github.com/openresty/lua-resty-balancer.git ../lua-resty-balancer
- git clone https://github.com/openresty/mockeagain.git

script:
@@ -58,6 +59,9 @@ script:
- make -j$JOBS > build.log 2>&1 || (cat build.log && exit 1)
- sudo make PATH=$PATH install_sw > build.log 2>&1 || (cat build.log && exit 1)
- cd ../mockeagain/ && make CC=$CC -j$JOBS && cd ..
- make -C ../lua-resty-balancer
- cp -r ../lua-resty-balancer/lib/resty ../lua-resty-balancer/librestychash.so lib
- memcached -p 11212 -d -m 1024
- export PATH=$PWD/work/nginx/sbin:$PWD/nginx-devel-utils:$PATH
- export LD_PRELOAD=$PWD/mockeagain/mockeagain.so
- export LD_LIBRARY_PATH=$PWD/mockeagain:$LD_LIBRARY_PATH
104 changes: 104 additions & 0 deletions README.markdown
@@ -34,6 +34,7 @@ Table of Contents
* [version](#version)
* [quit](#quit)
* [verbosity](#verbosity)
* [Cluster Support](#cluster-support)
* [Automatic Error Logging](#automatic-error-logging)
* [Limitations](#limitations)
* [TODO](#todo)
@@ -359,6 +360,7 @@ In case of success, returns `1`. In case of errors, returns `nil` with a string
get
---
`syntax: value, flags, err = memc:get(key)`

`syntax: results, err = memc:get(keys)`

Get a single entry or multiple entries in the memcached server via a single key or a table of keys.
@@ -501,6 +503,108 @@ Returns `1` in case of success and `nil` otherwise. In case of failures, another

[Back to TOC](#table-of-contents)

Cluster Support
===============

A new module is introduced to support a memcached cluster based on [consistent hashing](https://github.com/openresty/lua-resty-balancer). `hashmemcached` is a proxy class around `memcached`, implemented with a closure. Note that this module does not reconnect to a healthy server when it finds that the current one is down.

example
-------

```lua
local hashmemcached = require "resty.hashmemcached"
local hmemc, err = hashmemcached.new({
    {'127.0.0.1', 11211, 2},
    {'127.0.0.1', 11212, 1}
}, 'hashmemcached')

-- always flush_all first
local result = hmemc:flush_all()
for serv, res in pairs(result) do
    local ok, err = unpack(res)
    if not ok then
        ngx.say("failed to flush ", serv, ", ", err)
        return
    end
end

ngx.say("set")

local keys = {'dog', 'puppy', 'cat', 'kitten'}
local values = {32, "I am a little dog", 64, "I am a \nlittle cat\n"}
for i, key in ipairs(keys) do
    local ok, err = hmemc:set(key, values[i])
    if not ok then
        ngx.say("failed to set ", key, ": ", err)
    else
        ngx.say(key, " is stored in ", hmemc:which_server())
    end
end

ngx.say("\nget")

for i, key in ipairs(keys) do
    local res, flags, err = hmemc:get(key)
    if err then
        ngx.say("failed to get ", key, ": ", err)
    elseif not res then
        ngx.say(key, " not found")
    else
        ngx.say(key, ": ", res, " (flags: ", flags, ")")
    end
end

hmemc:set_keepalive(10000, 100)
```
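Note that the trailing `set_keepalive` call is delegated to the underlying `resty.memcached` client, so the connection to the most recently used server is put back into the keepalive pool.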

methods
-------
Below are the methods whose behavior has changed.

new
---
`syntax: hmemc, err = hashmemcached.new(cluster, shm, opts?)`

Creates a hashed memcached object based on the `cluster` parameter.

* cluster

    an array of tables containing every memcached server's information, in the form `{{ip, port, weight}, ...}`. Here is an example:

    ```lua
    {
        {'127.0.0.1', 11211, 2},
        {'127.0.0.1', 11212}
    }
    ```

    `weight` is optional; the default value is 1.

* shm

    the name of the shared memory zone used to share information between worker processes. By default it is `hashmemcached`.

* opts

    besides `key_transform`, it accepts the following options:

    * max_fails
    * fail_timeout

If a node fails `max_fails` times within `fail_timeout` seconds, it is considered unavailable for the duration of `fail_timeout` and its keys are remapped to the remaining nodes. This remapping policy imitates the `server` directive of the nginx http upstream module. A minimal sketch is shown below.
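For example, a minimal sketch of creating a client with these options (this assumes the shared memory zone has been declared in the nginx configuration with `lua_shared_dict hashmemcached 1m;`):

```lua
local hashmemcached = require "resty.hashmemcached"

-- mark a node as down for 30 seconds once it fails 5 times within that window
local hmemc, err = hashmemcached.new({
    {'127.0.0.1', 11211, 2},
    {'127.0.0.1', 11212}   -- weight defaults to 1
}, 'hashmemcached', {max_fails = 5, fail_timeout = 30})
if not hmemc then
    ngx.say("failed to create hashmemcached client: ", err)
    return
end
```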

which_server
------------
`syntax: server_id = hashmemcached:which_server()`

This method is added for debugging purposes. It returns the id of the currently connected server, which is a string in the form `ip:port`.

flush_all
---------
`syntax: results = hashmemcached:flush_all(time?)`

Calls the `flush_all` method on every memcached instance in the cluster.
`results` is a table keyed by `server_id`, where each value is `{1}` on success or `{nil, err}` on failure.

get(s)
-----------
`get` and `gets` can accept multiple keys, and these keys may be mapped to multiple memcached servers.
The return value is the same as in the original version, except that a multi-key call always returns a table whose values are either a table holding the **data**, `{nil, err}` on failure, or `nil`, which indicates that the key was not found. A usage sketch follows below.
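For instance, a minimal multi-key sketch following the result convention above (the keys and values here are illustrative):

```lua
local keys = {'dog', 'cat', 'bird'}
local results, err = hmemc:get(keys)
if not results then
    ngx.say("failed to get: ", err)  -- e.g. no available memcached server
    return
end
for _, key in ipairs(keys) do
    local res = results[key]
    if res == nil then
        ngx.say(key, " not found")
    elseif res[1] == nil then
        ngx.say("failed to get ", key, ": ", res[2])
    else
        ngx.say(key, ": ", res[1])
    end
end
```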

Automatic Error Logging
=======================

230 changes: 230 additions & 0 deletions lib/resty/hashmemcached.lua
@@ -0,0 +1,230 @@
-- Dependency
local memcached = require "resty.memcached"
local resty_chash = require "resty.chash"

local _M = {
    _VERSION = '0.01'
}

-- Global private methods
local function Set(list)
    local set = {}
    for _, l in ipairs(list) do set[l] = true end
    return set
end

-- Global private variables:
local log = ngx.log
local ERR = ngx.ERR
local INFO = ngx.INFO

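-- per-key memcached methods that are intercepted and routed through the
-- consistent hash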
local key_methods = Set{
    'get', 'gets',
    'set', 'add', 'replace', 'append', 'prepend',
    'cas',
    'delete',
    'incr', 'decr',
    'touch'
}

-- Class implemented by closure
function _M.new(cluster, shm, opts)
    local self = {}

    -- Private variables:
    local client, err = memcached:new(opts)
    if not client then
        return client, err
    end

    local max_fails = 3
    local fail_timeout = 10
    if opts then
        if opts.max_fails then
            max_fails = opts.max_fails
        end
        if opts.fail_timeout then
            fail_timeout = opts.fail_timeout
        end
    end

    local dict = ngx.shared[shm or 'hashmemcached']
    local serv_id -- current node (string in format 'ip:port')
    local chash_up
    local servers = {}

    -- Private methods:
    local function init_chash()
        local nodes = {}
        for i, node in ipairs(cluster) do
            local ip, port, weight = unpack(node)
            local id = ip..':'..port
            servers[id] = {ip, port}
            nodes[id] = weight or 1
        end
        local keys = dict:get_keys()
        for i, k in ipairs(keys) do
            local v = dict:get(k)
            -- 0 means this node is down
            if v == 0 then
                nodes[k] = nil
            end
        end
        chash_up = resty_chash:new(nodes)
    end

    local function down()
        -- if a node fails max_fails times in fail_timeout seconds
        -- then this node is considered unavailable in the duration of fail_timeout
        local fails = dict:get(serv_id)
        if fails then
            if fails > 0 then
                fails = fails + 1
                -- two requests may increase the same value
                fails = dict:incr(serv_id, 1)
                log(INFO, "hashmemcached: ", serv_id, " failed ", fails, " times")
            else
                -- in case this node is already marked down by another client
                chash_up:delete(serv_id)
            end
        else
            fails = 1
            log(INFO, "hashmemcached: ", serv_id, " failed the first time")
            dict:set(serv_id, 1, fail_timeout)
        end
        if fails >= max_fails then
            dict:set(serv_id, 0, fail_timeout)
            log(ERR, "hashmemcached: ", serv_id, " is turned down after ", fails, " failure(s)")
            chash_up:delete(serv_id)
        end
        serv_id = nil
    end

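    -- connect the underlying client to node `id`, releasing any previous
    -- connection into the keepalive pool first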
    local function connect(id)
        -- is connected already
        if serv_id then
            if serv_id == id then return 1 end
            -- ignore error
            client:set_keepalive()
        end
        serv_id = id
        local server = servers[id]
        local ok, err = client:connect(unpack(server))
        if not ok then
            down()
        end
        return ok, err
    end

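    -- build a wrapper for a per-key method: map each key to a server with
    -- the consistent hash, run the underlying method there and merge results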
    local function call(method)
        return function(self, key, ...)
            if type(key) == "table" then
                -- get or gets with multiple keys
                local servs = {}
                -- use an empty table as the default value
                setmetatable(servs, { __index = function(t, k) t[k] = {}; return t[k] end })
                for i, k in pairs(key) do
                    local id = chash_up:find(k)
                    if not id then
                        assert(next(servs) == nil) -- servs must be empty
                        return nil, 'no available memcached server'
                    end
                    table.insert(servs[id], k)
                end

                local results = {}
                for id, keys in pairs(servs) do
                    local ok, err = connect(id)
                    if ok then
                        local data, err = client[method](client, keys, ...)
                        if data then
                            -- data is a table keyed by the requested keys
                            for k, v in pairs(data) do
                                -- v is a table too; merge it into the result
                                results[k] = v
                            end
                        else
                            for i, k in ipairs(keys) do
                                results[k] = {nil, err}
                            end
                            if client.failed then
                                down()
                            end
                        end
                    else
                        for i, k in ipairs(keys) do
                            results[k] = {nil, err}
                        end
                    end
                end
                return results
            else
                -- single key
                local id = chash_up:find(key)
                local res1, res2, res3, res4
                local ok, err
                if id then
                    ok, err = connect(id)
                    if ok then
                        -- at most 4 return values
                        res1, res2, res3, res4 = client[method](client, key, ...)
                        if client.failed then
                            down()
                        end
                    end
                else
                    err = 'no available memcached server'
                end
                -- get returns (value, flags, err) and gets returns
                -- (value, flags, cas_unique, err); splice any connect or
                -- hashing error into the method's error slot
                if method == 'get' then
                    return res1, res2, err or res3
                elseif method == 'gets' then
                    return res1, res2, res3, err or res4
                else
                    return res1, err or res2
                end
            end
        end
    end


    -- Public methods:

    -- return the id ('ip:port') of the server used by the last operation
    function self.which_server()
        return serv_id
    end

    -- override flush_all: flush every memcached instance in the cluster
    function self.flush_all(self, time)
        local results = {}
        for id, serv in pairs(servers) do
            local ok, err = connect(id)
            if ok then
                results[id] = {client:flush_all(time)}
            else
                results[id] = {nil, err}
            end
        end
        return results
    end

    -- Apply some private methods
    init_chash()

    local mt = {}
    mt.__index = function(t, k)
        -- intercept the per-key methods
        if key_methods[k] then
            return call(k)
        else
            if k == 'connect' or k == 'close' or k == 'quit' or k == 'set_keepalive' then
                serv_id = nil
            end
            local f = client[k]
            if type(f) == 'function' then
                -- delegate to the underlying client, making sure it receives
                -- the real client (not this proxy) as `self`
                return function(_, ...)
                    return f(client, ...)
                end
            end
            return f
        end
    end
    setmetatable(self, mt)
    return self
end

return _M