diff --git a/.circleci/config.yml b/.circleci/config.yml
index a4a79e5..df55ffa 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -5,6 +5,7 @@ jobs:
working_directory: ~/buffer-slayer
docker:
- image: circleci/openjdk:8-jdk-browsers
+ - image: circleci/redis:latest
steps:
- checkout
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index 11534ec..c46c956 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -65,6 +65,11 @@
true
+
+ ${project.groupId}
+ bufferslayer-jedis
+
+
ch.qos.logback
logback-classic
diff --git a/benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractBatchJdbcTemplateBenchmark.java b/benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractBatchJdbcTemplateBenchmark.java
index f1dc6d1..443aed2 100644
--- a/benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractBatchJdbcTemplateBenchmark.java
+++ b/benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractBatchJdbcTemplateBenchmark.java
@@ -25,7 +25,7 @@ public abstract class AbstractBatchJdbcTemplateBenchmark {
private BatchJdbcTemplate batch;
private JdbcTemplate unbatch;
private Reporter reporter;
- private static SenderProxy proxy;
+ private static SenderProxy proxy;
private static AtomicLong counter = new AtomicLong();
private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS test";
@@ -51,7 +51,7 @@ public void setup() {
JdbcTemplate delegate = new JdbcTemplate(dataSource);
delegate.setDataSource(dataSource);
- proxy = new SenderProxy(new JdbcTemplateSender(delegate));
+ proxy = new SenderProxy<>(new JdbcTemplateSender(delegate));
proxy.onMessages(updated -> counter.addAndGet(updated.size()));
reporter = reporter(proxy);
diff --git a/benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractBatchJedisBenchmark.java b/benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractBatchJedisBenchmark.java
new file mode 100644
index 0000000..06571d0
--- /dev/null
+++ b/benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractBatchJedisBenchmark.java
@@ -0,0 +1,116 @@
+package io.github.tramchamploo.bufferslayer;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.openjdk.jmh.annotations.AuxCounters;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+
+/**
+ * Jedis benchmark that executing jedis commands.
+ */
+public abstract class AbstractBatchJedisBenchmark {
+
+ private JedisPool unbatch;
+ private BatchJedis batch;
+ private Reporter reporter;
+ private static SenderProxy proxy;
+ private static AtomicLong counter = new AtomicLong();
+
+ static String propertyOr(String key, String fallback) {
+ return System.getProperty(key, fallback);
+ }
+
+ protected abstract Reporter reporter(Sender sender);
+
+ @Setup
+ public void setup() {
+ unbatch = new JedisPool(propertyOr("redisHost", "127.0.0.1"),
+ Integer.parseInt(propertyOr("redisPort", "6379")));
+
+ proxy = new SenderProxy<>(new JedisSender(unbatch));
+ proxy.onMessages(updated -> counter.addAndGet(updated.size()));
+
+ reporter = reporter(proxy);
+ batch = new BatchJedis(unbatch, reporter);
+ }
+
+ @TearDown(Level.Iteration)
+ public void flushDB() {
+ try (Jedis jedis = unbatch.getResource()) {
+ jedis.flushDB();
+ }
+ }
+
+ @AuxCounters
+ @State(Scope.Thread)
+ public static class AtomicLongCounter {
+
+ public long updated() {
+ return counter.get();
+ }
+
+ @Setup(Level.Iteration)
+ public void clean() {
+ counter.set(0);
+ }
+ }
+
+ @State(Scope.Benchmark)
+ public static class Lagging {
+
+ @Setup(Level.Iteration)
+ public void lag() throws InterruptedException {
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ static String randomString() {
+ return String.valueOf(ThreadLocalRandom.current().nextLong());
+ }
+
+ @Benchmark @Group("no_contention_batched") @GroupThreads(1)
+ public void no_contention_batched_set(Lagging l, AtomicLongCounter counters) {
+ batch.set(randomString(), randomString());
+ }
+
+ @Benchmark @Group("no_contention_unbatched") @GroupThreads(1)
+ public void no_contention_unbatched_set(Lagging l) {
+ try (Jedis jedis = unbatch.getResource()) {
+ jedis.set(randomString(), randomString());
+ }
+ }
+
+ @Benchmark @Group("mild_contention_batched") @GroupThreads(2)
+ public void mild_contention_batched_set(Lagging l, AtomicLongCounter counters) {
+ batch.set(randomString(), randomString());
+ }
+
+ @Benchmark @Group("mild_contention_unbatched") @GroupThreads(2)
+ public void mild_contention_unbatched_set(Lagging l) {
+ try (Jedis jedis = unbatch.getResource()) {
+ jedis.set(randomString(), randomString());
+ }
+ }
+
+ @Benchmark @Group("high_contention_batched") @GroupThreads(8)
+ public void high_contention_batched_set(Lagging l, AtomicLongCounter counters) {
+ batch.set(randomString(), randomString());
+ }
+
+ @Benchmark @Group("high_contention_unbatched") @GroupThreads(8)
+ public void high_contention_unbatched_set(Lagging l) {
+ try (Jedis jedis = unbatch.getResource()) {
+ jedis.set(randomString(), randomString());
+ }
+ }
+}
diff --git a/benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractTimeUsedComparison.java b/benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractTimeUsedComparison.java
index 7158e5d..f414328 100644
--- a/benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractTimeUsedComparison.java
+++ b/benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AbstractTimeUsedComparison.java
@@ -21,7 +21,7 @@ static String propertyOr(String key, String fallback) {
protected void run() throws Exception {
BatchJdbcTemplate batch;
JdbcTemplate unbatch;
- SenderProxy proxy;
+ SenderProxy proxy;
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
@@ -42,7 +42,7 @@ protected void run() throws Exception {
CountDownLatch countDown = new CountDownLatch(1);
- proxy = new SenderProxy(new JdbcTemplateSender(delegate));
+ proxy = new SenderProxy<>(new JdbcTemplateSender(delegate));
proxy.onMessages(updated -> {
if (counter.addAndGet(updated.size()) == 5050) {
countDown.countDown();
diff --git a/benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AsyncBatchJedisBenchmark.java b/benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AsyncBatchJedisBenchmark.java
new file mode 100644
index 0000000..5ad8fc8
--- /dev/null
+++ b/benchmark/src/main/java/io/github/tramchamploo/bufferslayer/AsyncBatchJedisBenchmark.java
@@ -0,0 +1,38 @@
+package io.github.tramchamploo.bufferslayer;
+
+import java.util.concurrent.TimeUnit;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@Measurement(iterations = 5, time = 1)
+@Warmup(iterations = 3, time = 1)
+@Fork(3)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@State(Scope.Group)
+public class AsyncBatchJedisBenchmark extends AbstractBatchJedisBenchmark {
+
+ protected Reporter reporter(Sender sender) {
+ return AsyncReporter.builder(sender)
+ .pendingKeepalive(1, TimeUnit.SECONDS)
+ .build();
+ }
+
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(".*" + AsyncBatchJedisBenchmark.class.getSimpleName() + ".*")
+ .build();
+
+ new Runner(opt).run();
+ }
+}
diff --git a/boundedqueue/src/test/java/io/github/tramchamploo/bufferslayer/AsyncReporterTest.java b/boundedqueue/src/test/java/io/github/tramchamploo/bufferslayer/AsyncReporterTest.java
index d6410ed..a385671 100644
--- a/boundedqueue/src/test/java/io/github/tramchamploo/bufferslayer/AsyncReporterTest.java
+++ b/boundedqueue/src/test/java/io/github/tramchamploo/bufferslayer/AsyncReporterTest.java
@@ -70,6 +70,8 @@ public void flushIfExceedMaxSize() throws InterruptedException {
assertTrue(countDown.await(200, TimeUnit.MILLISECONDS));
assertEquals(50, sender.sent.size());
+ // wait for the queue to be released
+ Thread.sleep(100);
// make sure the queue is released
assertEquals(0, reporter.synchronizer.queue.size());
diff --git a/circle.yml b/circle.yml
new file mode 100644
index 0000000..662724d
--- /dev/null
+++ b/circle.yml
@@ -0,0 +1,19 @@
+machine:
+ java:
+ version: oraclejdk8
+ services:
+ - redis
+
+dependencies:
+ pre:
+ - openssl aes-256-cbc -d -in .buildscript/secret-env-cipher -k $KEY >> ~/.circlerc
+ override:
+ - mvn --fail-never dependency:go-offline || true
+ cache_directories:
+ - "~/.m2"
+
+test:
+ post:
+ - mkdir -p $CIRCLE_TEST_REPORTS/junit/
+ - find . -type f -regex ".*/target/surefire-reports/.*xml" -exec cp {} $CIRCLE_TEST_REPORTS/junit/ \;
+ - .buildscript/release.sh
\ No newline at end of file
diff --git a/core/src/main/java/io/github/tramchamploo/bufferslayer/internal/CompositeFuture.java b/core/src/main/java/io/github/tramchamploo/bufferslayer/internal/CompositeFuture.java
index 7940f63..4bafb5b 100644
--- a/core/src/main/java/io/github/tramchamploo/bufferslayer/internal/CompositeFuture.java
+++ b/core/src/main/java/io/github/tramchamploo/bufferslayer/internal/CompositeFuture.java
@@ -14,7 +14,7 @@ public abstract class CompositeFuture extends AbstractFuture {
* When the list is empty, the returned future will be already completed.
*/
public static CompositeFuture all(List extends Future>> futures) {
- return DefaultCompositeFuture.all(futures.toArray(new Future[futures.size()]));
+ return DefaultCompositeFuture.all(futures.toArray(new Future[0]));
}
/**
diff --git a/core/src/main/java/io/github/tramchamploo/bufferslayer/internal/Promises.java b/core/src/main/java/io/github/tramchamploo/bufferslayer/internal/Promises.java
index ca9004b..92058d3 100644
--- a/core/src/main/java/io/github/tramchamploo/bufferslayer/internal/Promises.java
+++ b/core/src/main/java/io/github/tramchamploo/bufferslayer/internal/Promises.java
@@ -6,6 +6,7 @@
import io.github.tramchamploo.bufferslayer.Message;
import io.github.tramchamploo.bufferslayer.MessageDroppedException;
import io.github.tramchamploo.bufferslayer.OverflowStrategy.Strategy;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -19,7 +20,12 @@ public static void allSuccess(List result, List> promis
for (int i = 0; i < result.size(); i++) {
MessagePromise promise = promises.get(i);
R ret = result.get(i);
- promise.setSuccess(ret);
+ if (ret instanceof Throwable) {
+ promise.setFailure(MessageDroppedException.dropped((Throwable) ret,
+ Collections.singletonList(promise.message())));
+ } else {
+ promise.setSuccess(ret);
+ }
}
}
diff --git a/jdbc/src/test/java/io/github/tramchamploo/bufferslayer/SenderProxy.java b/core/src/test/java/io/github/tramchamploo/bufferslayer/SenderProxy.java
similarity index 53%
rename from jdbc/src/test/java/io/github/tramchamploo/bufferslayer/SenderProxy.java
rename to core/src/test/java/io/github/tramchamploo/bufferslayer/SenderProxy.java
index 3d38d58..ac55876 100644
--- a/jdbc/src/test/java/io/github/tramchamploo/bufferslayer/SenderProxy.java
+++ b/core/src/test/java/io/github/tramchamploo/bufferslayer/SenderProxy.java
@@ -7,13 +7,13 @@
/**
* Delegate sending and trigger onMessages afterwards
*/
-public class SenderProxy implements Sender {
+public class SenderProxy implements Sender {
- private AtomicBoolean closed = new AtomicBoolean(false);
- private Consumer> onMessages = messages -> { };
- final Sender delegate;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private Consumer> onMessages = messages -> { };
+ private final Sender delegate;
- public SenderProxy(Sender delegate) {
+ SenderProxy(Sender delegate) {
this.delegate = delegate;
}
@@ -28,16 +28,16 @@ public void close() {
}
@Override
- public List send(List messages) {
+ public List send(List messages) {
if (closed.get()) {
throw new IllegalStateException("Closed!");
}
- List sent = delegate.send(messages);
- onMessages.accept(sent);
+ List sent = delegate.send(messages);
+ onMessages.accept(messages);
return sent;
}
- public void onMessages(Consumer> onMessages) {
+ public void onMessages(Consumer> onMessages) {
this.onMessages = onMessages;
}
}
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 18b52b0..b53c11b 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -15,6 +15,13 @@
bufferslayer-core
+
+ ${project.groupId}
+ bufferslayer-core
+ test-jar
+ test
+
+
${project.groupId}
bufferslayer-boundedqueue
diff --git a/jdbc/src/test/java/io/github/tramchamploo/bufferslayer/BatchJdbcTemplateTest.java b/jdbc/src/test/java/io/github/tramchamploo/bufferslayer/BatchJdbcTemplateTest.java
index 6e52cf1..04d9371 100644
--- a/jdbc/src/test/java/io/github/tramchamploo/bufferslayer/BatchJdbcTemplateTest.java
+++ b/jdbc/src/test/java/io/github/tramchamploo/bufferslayer/BatchJdbcTemplateTest.java
@@ -129,7 +129,7 @@ public void singleKey() {
@Test
public void updateFailed() throws InterruptedException {
- SenderProxy sender = new SenderProxy(new JdbcTemplateSender(underlying));
+ SenderProxy sender = new SenderProxy<>(new JdbcTemplateSender(underlying));
RuntimeException ex = new RuntimeException();
sender.onMessages(messages -> { throw ex; });
reporter = AsyncReporter.builder(sender).messageTimeout(10, TimeUnit.MILLISECONDS).build();
diff --git a/jedis/pom.xml b/jedis/pom.xml
new file mode 100644
index 0000000..78d1c88
--- /dev/null
+++ b/jedis/pom.xml
@@ -0,0 +1,38 @@
+
+
+
+ bufferslayer-parent
+ io.github.tramchamploo
+ 2.0.5-SNAPSHOT
+
+ 4.0.0
+
+ bufferslayer-jedis
+
+
+
+ ${project.groupId}
+ bufferslayer-core
+
+
+
+ ${project.groupId}
+ bufferslayer-boundedqueue
+
+
+
+ redis.clients
+ jedis
+ 2.9.0
+
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+
+
\ No newline at end of file
diff --git a/jedis/src/main/java/io/github/tramchamploo/bufferslayer/BatchJedis.java b/jedis/src/main/java/io/github/tramchamploo/bufferslayer/BatchJedis.java
new file mode 100644
index 0000000..17ea699
--- /dev/null
+++ b/jedis/src/main/java/io/github/tramchamploo/bufferslayer/BatchJedis.java
@@ -0,0 +1,2177 @@
+package io.github.tramchamploo.bufferslayer;
+
+import static io.github.tramchamploo.bufferslayer.ResponseUtil.transformResponse;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.github.tramchamploo.bufferslayer.MultiKeyRedisCommand.Blpop;
+import io.github.tramchamploo.bufferslayer.MultiKeyRedisCommand.Brpop;
+import io.github.tramchamploo.bufferslayer.MultiKeyRedisCommand.Del;
+import io.github.tramchamploo.bufferslayer.MultiKeyRedisCommand.Exists;
+import io.github.tramchamploo.bufferslayer.MultiKeyRedisCommand.MGet;
+import io.github.tramchamploo.bufferslayer.MultiKeyRedisCommand.MSet;
+import io.github.tramchamploo.bufferslayer.MultiKeyRedisCommand.MSetNX;
+import io.github.tramchamploo.bufferslayer.RedisCommand.Append;
+import io.github.tramchamploo.bufferslayer.RedisCommand.BitPos;
+import io.github.tramchamploo.bufferslayer.RedisCommand.Decr;
+import io.github.tramchamploo.bufferslayer.RedisCommand.DecrBy;
+import io.github.tramchamploo.bufferslayer.RedisCommand.Echo;
+import io.github.tramchamploo.bufferslayer.RedisCommand.Expire;
+import io.github.tramchamploo.bufferslayer.RedisCommand.ExpireAt;
+import io.github.tramchamploo.bufferslayer.RedisCommand.Get;
+import io.github.tramchamploo.bufferslayer.RedisCommand.GetBit;
+import io.github.tramchamploo.bufferslayer.RedisCommand.GetRange;
+import io.github.tramchamploo.bufferslayer.RedisCommand.GetSet;
+import io.github.tramchamploo.bufferslayer.RedisCommand.HDel;
+import io.github.tramchamploo.bufferslayer.RedisCommand.HExists;
+import io.github.tramchamploo.bufferslayer.RedisCommand.HGet;
+import io.github.tramchamploo.bufferslayer.RedisCommand.HGetAll;
+import io.github.tramchamploo.bufferslayer.RedisCommand.HIncrBy;
+import io.github.tramchamploo.bufferslayer.RedisCommand.HKeys;
+import io.github.tramchamploo.bufferslayer.RedisCommand.HLen;
+import io.github.tramchamploo.bufferslayer.RedisCommand.HMGet;
+import io.github.tramchamploo.bufferslayer.RedisCommand.HMSet;
+import io.github.tramchamploo.bufferslayer.RedisCommand.HSet;
+import io.github.tramchamploo.bufferslayer.RedisCommand.HSetNX;
+import io.github.tramchamploo.bufferslayer.RedisCommand.HVals;
+import io.github.tramchamploo.bufferslayer.RedisCommand.Incr;
+import io.github.tramchamploo.bufferslayer.RedisCommand.IncrBy;
+import io.github.tramchamploo.bufferslayer.RedisCommand.LIndex;
+import io.github.tramchamploo.bufferslayer.RedisCommand.LInsert;
+import io.github.tramchamploo.bufferslayer.RedisCommand.LLen;
+import io.github.tramchamploo.bufferslayer.RedisCommand.LPop;
+import io.github.tramchamploo.bufferslayer.RedisCommand.LPush;
+import io.github.tramchamploo.bufferslayer.RedisCommand.LPushX;
+import io.github.tramchamploo.bufferslayer.RedisCommand.LRange;
+import io.github.tramchamploo.bufferslayer.RedisCommand.LRem;
+import io.github.tramchamploo.bufferslayer.RedisCommand.LSet;
+import io.github.tramchamploo.bufferslayer.RedisCommand.LTrim;
+import io.github.tramchamploo.bufferslayer.RedisCommand.Move;
+import io.github.tramchamploo.bufferslayer.RedisCommand.Persist;
+import io.github.tramchamploo.bufferslayer.RedisCommand.RPop;
+import io.github.tramchamploo.bufferslayer.RedisCommand.RPopLpush;
+import io.github.tramchamploo.bufferslayer.RedisCommand.RPush;
+import io.github.tramchamploo.bufferslayer.RedisCommand.RPushX;
+import io.github.tramchamploo.bufferslayer.RedisCommand.Rename;
+import io.github.tramchamploo.bufferslayer.RedisCommand.RenameNX;
+import io.github.tramchamploo.bufferslayer.internal.MessageFuture;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import redis.clients.jedis.BinaryClient.LIST_POSITION;
+import redis.clients.jedis.BinaryJedisPubSub;
+import redis.clients.jedis.BitOP;
+import redis.clients.jedis.Client;
+import redis.clients.jedis.DebugParams;
+import redis.clients.jedis.GeoCoordinate;
+import redis.clients.jedis.GeoRadiusResponse;
+import redis.clients.jedis.GeoUnit;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCluster.Reset;
+import redis.clients.jedis.JedisMonitor;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPubSub;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.PipelineBlock;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
+import redis.clients.jedis.SortingParams;
+import redis.clients.jedis.Transaction;
+import redis.clients.jedis.TransactionBlock;
+import redis.clients.jedis.Tuple;
+import redis.clients.jedis.ZParams;
+import redis.clients.jedis.params.geo.GeoRadiusParam;
+import redis.clients.jedis.params.sortedset.ZAddParams;
+import redis.clients.jedis.params.sortedset.ZIncrByParams;
+import redis.clients.util.Pool;
+import redis.clients.util.SafeEncoder;
+import redis.clients.util.Slowlog;
+
+/**
+ * {@link Jedis} that execute commands in pipeline
+ */
+@SuppressWarnings("unchecked")
+public class BatchJedis {
+
+ private final JedisPool jedisPool;
+ private final Reporter reporter;
+
+ @VisibleForTesting
+ BatchJedis(JedisPool jedisPool, Reporter reporter) {
+ this.jedisPool = jedisPool;
+ this.reporter = reporter;
+ }
+
+ public BatchJedis(JedisPool jedisPool) {
+ this.jedisPool = jedisPool;
+ this.reporter = AsyncReporter.builder(new JedisSender(jedisPool)).build();
+ }
+
+ public MessageFuture set(String key, String value) {
+ return (MessageFuture) reporter.report(
+ new RedisCommand.Set(SafeEncoder.encode(key), SafeEncoder.encode(value), null, null, 0L));
+ }
+
+ public MessageFuture set(String key, String value, String nxxx, String expx, long time) {
+ return (MessageFuture) reporter.report(
+ new RedisCommand.Set(SafeEncoder.encode(key), SafeEncoder.encode(value), SafeEncoder.encode(nxxx), SafeEncoder.encode(expx), time));
+ }
+
+ public MessageFuture get(String key) {
+ return transformResponse(
+ reporter.report(new Get(SafeEncoder.encode(key))), BuilderFactory.STRING);
+ }
+
+ public MessageFuture exists(String... keys) {
+ return (MessageFuture) reporter.report(new Exists(SafeEncoder.encodeMany(keys)));
+ }
+
+ public MessageFuture exists(String key) {
+ return transformResponse(
+ reporter.report(new Exists(SafeEncoder.encode(key))), BuilderFactory.BOOLEAN);
+ }
+
+ public MessageFuture del(String... keys) {
+ return (MessageFuture) reporter.report(new Del(SafeEncoder.encodeMany(keys)));
+ }
+
+ public MessageFuture del(String key) {
+ return (MessageFuture) reporter.report(new Del(SafeEncoder.encode(key)));
+ }
+
+ public String type(String key) {
+ return null; // TODO
+ }
+
+ public Set keys(String pattern) {
+ return null; // TODO
+ }
+
+ public String randomKey() {
+ return null; // TODO
+ }
+
+ public MessageFuture rename(String oldkey, String newkey) {
+ return (MessageFuture) reporter.report(
+ new Rename(SafeEncoder.encode(oldkey), SafeEncoder.encode(newkey)));
+ }
+
+ public MessageFuture renamenx(String oldkey, String newkey) {
+ return (MessageFuture) reporter.report(
+ new RenameNX(SafeEncoder.encode(oldkey), SafeEncoder.encode(newkey)));
+ }
+
+ public MessageFuture expire(String key, int seconds) {
+ return (MessageFuture) reporter.report(new Expire(SafeEncoder.encode(key), seconds));
+ }
+
+ public MessageFuture expireAt(String key, long unixTime) {
+ return (MessageFuture) reporter.report(new ExpireAt(SafeEncoder.encode(key), unixTime));
+ }
+
+ public Long ttl(String key) {
+ return null; // TODO
+ }
+
+ public MessageFuture move(String key, int dbIndex) {
+ return (MessageFuture) reporter.report(new Move(SafeEncoder.encode(key), dbIndex));
+ }
+
+ public MessageFuture getSet(String key, String value) {
+ return transformResponse(reporter.report(
+ new GetSet(SafeEncoder.encode(key), SafeEncoder.encode(value))), BuilderFactory.STRING);
+ }
+
+ public MessageFuture> mget(String... keys) {
+ return transformResponse(reporter.report(
+ new MGet(SafeEncoder.encodeMany(keys))), BuilderFactory.STRING_LIST);
+ }
+
+ public Long setnx(String key, String value) {
+ return null; // TODO
+ }
+
+ public String setex(String key, int seconds, String value) {
+ return null; // TODO
+ }
+
+ public MessageFuture mset(String... keysvalues) {
+ return (MessageFuture) reporter.report(new MSet(SafeEncoder.encodeMany(keysvalues)));
+ }
+
+ public MessageFuture msetnx(String... keysvalues) {
+ return (MessageFuture) reporter.report(new MSetNX(SafeEncoder.encodeMany(keysvalues)));
+ }
+
+ public MessageFuture decrBy(String key, long integer) {
+ return (MessageFuture) reporter.report(new DecrBy(SafeEncoder.encode(key), integer));
+ }
+
+ public MessageFuture decr(String key) {
+ return (MessageFuture) reporter.report(new Decr(SafeEncoder.encode(key)));
+ }
+
+ public MessageFuture incrBy(String key, long integer) {
+ return (MessageFuture) reporter.report(
+ new IncrBy(SafeEncoder.encode(key), integer));
+ }
+
+ public Double incrByFloat(String key, double value) {
+ return null; // TODO
+ }
+
+ public MessageFuture incr(String key) {
+ return (MessageFuture) reporter.report(new Incr(SafeEncoder.encode(key)));
+ }
+
+ public MessageFuture append(String key, String value) {
+ return (MessageFuture) reporter.report(
+ new Append(SafeEncoder.encode(key), SafeEncoder.encode(value)));
+ }
+
+ public String substr(String key, int start, int end) {
+ return null; // TODO
+ }
+
+ public MessageFuture hset(String key, String field, String value) {
+ return hset(SafeEncoder.encode(key), SafeEncoder.encode(field), SafeEncoder.encode(value));
+ }
+
+ public MessageFuture hget(String key, String field) {
+ return transformResponse(reporter.report(
+ new HGet(SafeEncoder.encode(key), SafeEncoder.encode(field))), BuilderFactory.STRING);
+ }
+
+ public MessageFuture hsetnx(String key, String field, String value) {
+ return hsetnx(SafeEncoder.encode(key), SafeEncoder.encode(field), SafeEncoder.encode(value));
+ }
+
+ public MessageFuture hmset(String key, Map hash) {
+ final Map bhash = new HashMap<>(hash.size());
+ for (final Entry entry : hash.entrySet()) {
+ bhash.put(SafeEncoder.encode(entry.getKey()), SafeEncoder.encode(entry.getValue()));
+ }
+ return hmset(SafeEncoder.encode(key), bhash);
+ }
+
+ public MessageFuture> hmget(String key, String... fields) {
+ return transformResponse(reporter.report(
+ new HMGet(SafeEncoder.encode(key), SafeEncoder.encodeMany(fields))), BuilderFactory.STRING_LIST);
+ }
+
+ public MessageFuture hincrBy(String key, String field, long value) {
+ return (MessageFuture) reporter.report(
+ new HIncrBy(SafeEncoder.encode(key), SafeEncoder.encode(field), value));
+ }
+
+ public Double hincrByFloat(String key, String field, double value) {
+ return null; // TODO
+ }
+
+ public MessageFuture hexists(String key, String field) {
+ return (MessageFuture) reporter.report(
+ new HExists(SafeEncoder.encode(key), SafeEncoder.encode(field)));
+ }
+
+ public MessageFuture hdel(String key, String... fields) {
+ return (MessageFuture) reporter.report(
+ new HDel(SafeEncoder.encode(key), SafeEncoder.encodeMany(fields)));
+ }
+
+ public MessageFuture hlen(String key) {
+ return (MessageFuture) reporter.report(new HLen(SafeEncoder.encode(key)));
+ }
+
+ public MessageFuture> hkeys(String key) {
+ return transformResponse(
+ reporter.report(new HKeys(SafeEncoder.encode(key))), BuilderFactory.STRING_SET);
+ }
+
+ public MessageFuture> hvals(String key) {
+ return transformResponse(reporter.report(new HVals(SafeEncoder.encode(key))),
+ BuilderFactory.STRING_LIST);
+ }
+
+ public MessageFuture