-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstore.js
113 lines (99 loc) · 3.4 KB
/
store.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
var redis = require("redis");
var extend = require("util")._extend;
var genId = require("./genId.js");
var ReadWriteLock = require("rwlock");
module.exports = function(redisConfig) {
// Build a private copy of the caller's options with return_buffers forced
// to false (the rest of this module treats list entries as strings).
// util._extend(target, source) copies from a single source only, so the
// override has to be applied with a second call; passing it as a third
// argument would be silently ignored.
var redisConfig = extend(extend({}, redisConfig), {return_buffers: false});
var redisClient = redis.createClient(redisConfig);
// Keyed lock used by remove() to serialise its "check head, then pop" step.
var lock = new ReadWriteLock();
/**
 * Builds the Redis key under which the entries for one event are stored.
 * The layout is "<siteName>:socket.io:<listName>:<event>".
 * @param {String} siteName - the site name the store belongs to
 * @param {String} listName - the name of the list
 * @param {String} event - the event name (an empty string yields the key prefix)
 * @returns {String} the composed key
 */
function keyName(siteName, listName, event) {
  var sitePart = siteName + ":socket.io:";
  var listPart = listName + ":" + event;
  return sitePart + listPart;
}
/**
 * Tells whether a stored list entry belongs to the given id.
 * Entries are written as "<id>:<json>", so the id is compared together with
 * its ":" delimiter; a bare prefix test would let id "1" match an entry
 * whose id is "12".
 * @param {String} item - the raw list entry (may be undefined for an empty list)
 * @param {String} dataId - the id to look for
 * @returns {Boolean} true when the entry starts with "<dataId>:"
 */
function itemMatchId(item, dataId) {
  if (!item) {
    return false;
  }
  return item.indexOf(dataId + ":") === 0;
}
/**
 * Looks up every Redis key belonging to a list (all events) and invokes
 * func once per key.
 * If the lookup fails or returns nothing, func is never called.
 * @param {String} siteName - the site name the store belongs to
 * @param {String} listName - the name of the list
 * @param {Function} func - called as func(key, keys) for each matching key
 */
function forEachKey(siteName, listName, func) {
  var redisKeyPrefix = keyName(siteName, listName, "") + "*";
  redisClient.keys(redisKeyPrefix, function(err, keys) {
    if (err || !keys) {
      return;
    }
    // Indexed loop: for...in would also visit any enumerable property
    // that happens to be added to Array.prototype.
    for (var i = 0; i < keys.length; i++) {
      func(keys[i], keys);
    }
  });
}
return {
add: function(siteName, listName, event, data, ttl) {
var dataId = genId();
var redisKey = keyName(siteName, listName, event);
redisClient.rpush(redisKey, dataId + ":" + JSON.stringify(data));
redisClient.expire(redisKey, ttl);
return dataId;
},
/**
* Removes a single item from the store
* @param {String} siteName - the site name this store belongs to
* @param {String} listName - the name of the list
* @param {String} dataId - the id of the item in the list to remove
* @param {String} event - the event name used for notFirstCallback
* @param {Function} notFirstCallback - if provided if item is not first in list for the specified event will call this function instead
*/
remove: function(siteName, listName, event, dataId, notFirstCallback) {
var redisKey = keyName(siteName, listName, event);
if (notFirstCallback) {
lock.writeLock(redisKey, function(release) {
redisClient.lrange(redisKey, 0, 0, function(err, res) {
if (itemMatchId(res[0], dataId)) {
redisClient.lpop(redisKey);
release();
} else {
release();
notFirstCallback();
}
});
});
} else {
redisClient.lrange(redisKey, 0, -1, function(err, res) {
for (var idx in res) {
if (itemMatchId(res[idx], dataId)) {
redisClient.lrem(redisKey, 1, res[idx]);
break;
}
}
});
}
},
clear: function(siteName, listName, /*optional*/ event) {
if (arguments.length === 2) {
forEachKey(siteName, listName, function(key) {
redisClient.del(key);
});
} else {
redisClient.del(keyName(siteName, listName, event));
}
},
get: function(siteName, listName, callback) {
var returnedData = [];
var results = 0;
forEachKey(siteName, listName, function(key, keys) {
redisClient.lrange(key, 0, -1, function(err, res) {
res.forEach(function(item) {
returnedData.push(JSON.parse(item.substring(item.indexOf(":") + 1)));
});
results++;
if (results === keys.length) {
callback(returnedData);
}
});
});
},
expire: function(siteName, listName, ttl) {
forEachKey(siteName, listName, function(key) {
redisClient.expire(key, ttl);
});
},
persist: function(siteName, listName, ttl) {
forEachKey(siteName, listName, function(key) {
redisClient.persist(key);
});
}
};
}