Skip to content

Commit

Permalink
Merge pull request redis#687 from HeartSaVioR/cluster-support-multi-k…
Browse files Browse the repository at this point in the history
…ey-new

Supports Multi Key commands to JedisCluster (revised of redis#673)
  • Loading branch information
HeartSaVioR committed Apr 20, 2015
2 parents 18308d1 + d584951 commit 04e94e6
Show file tree
Hide file tree
Showing 8 changed files with 957 additions and 101 deletions.
333 changes: 328 additions & 5 deletions src/main/java/redis/clients/jedis/BinaryJedisCluster.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import redis.clients.util.KeyMergeUtil;
import redis.clients.util.SafeEncoder;

import java.io.Closeable;
Expand All @@ -11,7 +12,7 @@

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class BinaryJedisCluster implements BinaryJedisCommands,
public class BinaryJedisCluster implements BinaryJedisCommands, MultiKeyBinaryJedisClusterCommands,
JedisClusterBinaryScriptingCommands, Closeable {

public static final short HASHSLOTS = 16384;
Expand Down Expand Up @@ -52,6 +53,10 @@ public void close() throws IOException {
}
}

public Map<String, JedisPool> getClusterNodes() {
return connectionHandler.getNodes();
}

@Override
public String set(final byte[] key, final byte[] value) {
return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {
Expand Down Expand Up @@ -1194,10 +1199,6 @@ public Long execute(Jedis connection) {
}.runBinary(key);
}

public Map<String, JedisPool> getClusterNodes() {
return connectionHandler.getNodes();
}

@Override
public Object eval(final byte[] script, final byte[] keyCount, final byte[]... params) {
return new JedisClusterCommand<Object>(connectionHandler, maxRedirections) {
Expand Down Expand Up @@ -1308,4 +1309,326 @@ public String execute(Jedis connection) {
}.runBinary(key);
}

@Override
public Long del(final byte[]... keys) {
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.del(keys);
}
}.runBinary(keys.length, keys);
}

@Override
public List<byte[]> blpop(final int timeout, final byte[]... keys) {
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxRedirections) {
@Override
public List<byte[]> execute(Jedis connection) {
return connection.blpop(timeout, keys);
}
}.runBinary(keys.length, keys);
}

@Override
public List<byte[]> brpop(final int timeout, final byte[]... keys) {
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxRedirections) {
@Override
public List<byte[]> execute(Jedis connection) {
return connection.brpop(timeout, keys);
}
}.runBinary(keys.length, keys);
}

@Override
public List<byte[]> mget(final byte[]... keys) {
return new JedisClusterCommand<List<byte[]>>(connectionHandler, maxRedirections) {
@Override
public List<byte[]> execute(Jedis connection) {
return connection.mget(keys);
}
}.runBinary(keys.length - 1, keys);
}

@Override
public String mset(final byte[]... keysvalues) {
byte[][] keys = new byte[keysvalues.length / 2][];

for (int keyIdx = 0; keyIdx < keys.length; keyIdx++) {
keys[keyIdx] = keysvalues[keyIdx * 2];
}

return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {
@Override
public String execute(Jedis connection) {
return connection.mset(keysvalues);
}
}.runBinary(keys.length, keys);
}

@Override
public Long msetnx(final byte[]... keysvalues) {
byte[][] keys = new byte[keysvalues.length / 2][];

for (int keyIdx = 0; keyIdx < keys.length; keyIdx++) {
keys[keyIdx] = keysvalues[keyIdx * 2];
}

return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.msetnx(keysvalues);
}
}.runBinary(keys.length, keys);
}

@Override
public String rename(final byte[] oldkey, final byte[] newkey) {
return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {
@Override
public String execute(Jedis connection) {
return connection.rename(oldkey, newkey);
}
}.runBinary(2, oldkey, newkey);
}

@Override
public Long renamenx(final byte[] oldkey, final byte[] newkey) {
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.renamenx(oldkey, newkey);
}
}.runBinary(2, oldkey, newkey);
}

@Override
public byte[] rpoplpush(final byte[] srckey, final byte[] dstkey) {
return new JedisClusterCommand<byte[]>(connectionHandler, maxRedirections) {
@Override
public byte[] execute(Jedis connection) {
return connection.rpoplpush(srckey, dstkey);
}
}.runBinary(2, srckey, dstkey);
}

@Override
public Set<byte[]> sdiff(final byte[]... keys) {
return new JedisClusterCommand<Set<byte[]>>(connectionHandler, maxRedirections) {
@Override
public Set<byte[]> execute(Jedis connection) {
return connection.sdiff(keys);
}
}.runBinary(keys.length, keys);
}

@Override
public Long sdiffstore(final byte[] dstkey, final byte[]... keys) {
byte[][] wholeKeys = KeyMergeUtil.merge(dstkey, keys);

return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.sdiffstore(dstkey, keys);
}
}.runBinary(wholeKeys.length, wholeKeys);
}

@Override
public Set<byte[]> sinter(final byte[]... keys) {
return new JedisClusterCommand<Set<byte[]>>(connectionHandler, maxRedirections) {
@Override
public Set<byte[]> execute(Jedis connection) {
return connection.sinter(keys);
}
}.runBinary(keys.length, keys);
}

@Override
public Long sinterstore(final byte[] dstkey, final byte[]... keys) {
byte[][] wholeKeys = KeyMergeUtil.merge(dstkey, keys);

return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.sinterstore(dstkey, keys);
}
}.runBinary(wholeKeys.length, wholeKeys);
}

@Override
public Long smove(final byte[] srckey, final byte[] dstkey, final byte[] member) {
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.smove(srckey, dstkey, member);
}
}.runBinary(2, srckey, dstkey);
}

@Override
public Long sort(final byte[] key, final SortingParams sortingParameters, final byte[] dstkey) {
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.sort(key, sortingParameters, dstkey);
}
}.runBinary(2, key, dstkey);
}

@Override
public Long sort(final byte[] key, final byte[] dstkey) {
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.sort(key, dstkey);
}
}.runBinary(2, key, dstkey);
}

@Override
public Set<byte[]> sunion(final byte[]... keys) {
return new JedisClusterCommand<Set<byte[]>>(connectionHandler, maxRedirections) {
@Override
public Set<byte[]> execute(Jedis connection) {
return connection.sunion(keys);
}
}.runBinary(keys.length, keys);
}

@Override
public Long sunionstore(final byte[] dstkey, final byte[]... keys) {
byte[][] wholeKeys = KeyMergeUtil.merge(dstkey, keys);

return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.sunionstore(dstkey, keys);
}
}.runBinary(wholeKeys.length, wholeKeys);
}

@Override
public Long zinterstore(final byte[] dstkey, final byte[]... sets) {
byte[][] wholeKeys = KeyMergeUtil.merge(dstkey, sets);

return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.zinterstore(dstkey, sets);
}
}.runBinary(wholeKeys.length, wholeKeys);
}

@Override
public Long zinterstore(final byte[] dstkey, final ZParams params, final byte[]... sets) {
byte[][] wholeKeys = KeyMergeUtil.merge(dstkey, sets);

return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.zinterstore(dstkey, params, sets);
}
}.runBinary(wholeKeys.length, wholeKeys);
}

@Override
public Long zunionstore(final byte[] dstkey, final byte[]... sets) {
byte[][] wholeKeys = KeyMergeUtil.merge(dstkey, sets);

return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.zunionstore(dstkey, sets);
}
}.runBinary(wholeKeys.length, wholeKeys);
}

@Override
public Long zunionstore(final byte[] dstkey, final ZParams params, final byte[]... sets) {
byte[][] wholeKeys = KeyMergeUtil.merge(dstkey, sets);

return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.zunionstore(dstkey, params, sets);
}
}.runBinary(wholeKeys.length, wholeKeys);
}

@Override
public byte[] brpoplpush(final byte[] source, final byte[] destination, final int timeout) {
return new JedisClusterCommand<byte[]>(connectionHandler, maxRedirections) {
@Override
public byte[] execute(Jedis connection) {
return connection.brpoplpush(source, destination, timeout);
}
}.runBinary(2, source, destination);
}

@Override
public Long publish(final byte[] channel, final byte[] message) {
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.publish(channel, message);
}
}.runWithAnyNode();
}

@Override
public void subscribe(final BinaryJedisPubSub jedisPubSub, final byte[]... channels) {
new JedisClusterCommand<Integer>(connectionHandler, maxRedirections) {
@Override
public Integer execute(Jedis connection) {
connection.subscribe(jedisPubSub, channels);
return 0;
}
}.runWithAnyNode();
}

@Override
public void psubscribe(final BinaryJedisPubSub jedisPubSub, final byte[]... patterns) {
new JedisClusterCommand<Integer>(connectionHandler, maxRedirections) {
@Override
public Integer execute(Jedis connection) {
connection.subscribe(jedisPubSub, patterns);
return 0;
}
}.runWithAnyNode();
}

@Override
public Long bitop(final BitOP op, final byte[] destKey, final byte[]... srcKeys) {
byte[][] wholeKeys = KeyMergeUtil.merge(destKey, srcKeys);

return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.bitop(op, destKey, srcKeys);
}
}.runBinary(wholeKeys.length, wholeKeys);
}

@Override
public String pfmerge(final byte[] destkey, final byte[]... sourcekeys) {
byte[][] wholeKeys = KeyMergeUtil.merge(destkey, sourcekeys);

return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {
@Override
public String execute(Jedis connection) {
return connection.pfmerge(destkey, sourcekeys);
}
}.runBinary(wholeKeys.length, wholeKeys);
}

@Override
public Long pfcount(final byte[]... keys) {
return new JedisClusterCommand<Long>(connectionHandler, maxRedirections) {
@Override
public Long execute(Jedis connection) {
return connection.pfcount(keys);
}
}.runBinary(keys.length, keys);
}

}
Loading

0 comments on commit 04e94e6

Please sign in to comment.