Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adding support for jedis
Browse files Browse the repository at this point in the history
tramchamploo committed Feb 9, 2018
1 parent 27ccea2 commit ac7f66a
Showing 10 changed files with 2,511 additions and 1 deletion.
2 changes: 2 additions & 0 deletions circle.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
machine:
java:
version: oraclejdk8
services:
- redis

dependencies:
pre:
38 changes: 38 additions & 0 deletions jedis/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bufferslayer-parent</artifactId>
<groupId>io.github.tramchamploo</groupId>
<version>1.4.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>buffer-jedis</artifactId>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>bufferslayer-core</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>bufferslayer-boundedqueue</artifactId>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
2,098 changes: 2,098 additions & 0 deletions jedis/src/main/java/io/github/tramchamploo/bufferslayer/BatchJedis.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.github.tramchamploo.bufferslayer;

/**
* Builder that build response to type T
* @param <T> the result type
*/
abstract class Builder<T> {

abstract T build(Object data);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.github.tramchamploo.bufferslayer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import redis.clients.util.SafeEncoder;

/**
* Factory for {@link Builder} that builds result
*/
final class BuilderFactory {

@SuppressWarnings("unchecked")
static final Builder<List<byte[]>> BYTE_ARRAY_LIST = new Builder<List<byte[]>>() {
@Override
List<byte[]> build(Object data) {
if (null == data) {
return Collections.emptyList();
}
List<String> l = (List<String>) data;
final ArrayList<byte[]> result = new ArrayList<>(l.size());
for (final String s: l) {
if (s == null) {
result.add(null);
} else {
result.add(SafeEncoder.encode(s));
}
}
return result;
}
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.github.tramchamploo.bufferslayer;

import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

/**
* Send buffered commands in delegate's pipeline
*/
final class JedisSender implements Sender<RedisCommand, Object> {

private final Jedis delegate;

JedisSender(Jedis delegate) {
this.delegate = delegate;
}

@Override
public CheckResult check() {
try {
String ping = delegate.ping();
if ("PONG".equalsIgnoreCase(ping)) {
return CheckResult.OK;
}
return CheckResult.failed(new RuntimeException("PING doesn't get PONG."));
} catch (Exception e) {
return CheckResult.failed(e);
}
}

@Override
public void close() {
delegate.close();
}

@Override
public List<Object> send(List<RedisCommand> messages) {
Pipeline pipeline = delegate.pipelined();

for (RedisCommand command: messages) {
command.apply(pipeline);
}
return pipeline.syncAndReturnAll();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package io.github.tramchamploo.bufferslayer;

import redis.clients.jedis.Pipeline;

/**
* A redis command to be executed
*/
abstract class RedisCommand extends Message {

final byte[] key;

RedisCommand(byte[] key) {
this.key = key;
}

/**
* This implies how redis pipeline execute this command.
*
* @param pipeline pipeline to behave on.
*/
protected abstract void apply(Pipeline pipeline);

@Override
public MessageKey asMessageKey() {
return Message.SINGLE_KEY;
}

abstract static class MultiKeyCommand extends RedisCommand {

final byte[][] keys;

MultiKeyCommand(byte[][] keys) {
super(keys[0]);
this.keys = keys;
}

String keysString() {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < keys.length; i++) {
builder.append(new String(keys[i]));
}
return builder.toString();
}
}

// Commands start

final static class Append extends RedisCommand {

final byte[] value;

Append(byte[] key, byte[] value) {
super(key);
this.value = value;
}

@Override
protected void apply(Pipeline pipeline) {
pipeline.append(key, value);
}

@Override
public String toString() {
return "Append(" + new String(key) + ", " + new String(value) + ")";
}
}

final static class Blpop extends MultiKeyCommand {

final int timeout;

Blpop(int timeout, byte[]... keys) {
super(keys);
this.timeout = timeout;
}

@Override
protected void apply(Pipeline pipeline) {
if (timeout == 0) {
pipeline.blpop(keys);
} else {
pipeline.blpop(timeout, keys);
}
}

@Override
public String toString() {
return "Blpop(" + keysString() + " " + timeout + ")";
}
}

final static class Brpop extends MultiKeyCommand {

final int timeout;

Brpop(int timeout, byte[]... keys) {
super(keys);
this.timeout = timeout;
}

@Override
protected void apply(Pipeline pipeline) {
if (timeout == 0) {
pipeline.brpop(keys);
} else {
pipeline.brpop(timeout, keys);
}
}

@Override
public String toString() {
return "Brpop(" + keysString() + " " + timeout + ")";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.github.tramchamploo.bufferslayer;

import io.github.tramchamploo.bufferslayer.internal.DefaultMessagePromise;
import io.github.tramchamploo.bufferslayer.internal.Future;
import io.github.tramchamploo.bufferslayer.internal.FutureListener;
import io.github.tramchamploo.bufferslayer.internal.MessageFuture;

final class ResponseUtil {

@SuppressWarnings("unchecked")
static <T> MessageFuture<T> transformResponse(MessageFuture<?> future, final Builder<T> builder) {
final DefaultMessagePromise<T> promise = new DefaultMessagePromise<>(future.message());
future.addListener(new FutureListener<Object>() {

@Override
public void operationComplete(Future<Object> future) throws Exception {
if (future.isSuccess()) {
promise.setSuccess(builder.build(future.get()));
} else {
promise.setFailure(future.cause());
}
}
});

return promise;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package io.github.tramchamploo.bufferslayer;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.github.tramchamploo.bufferslayer.internal.MessageFuture;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.util.SafeEncoder;

@SuppressWarnings("unchecked")
public class BatchJedisTest {

final byte[] bfoo = {0x01, 0x02, 0x03, 0x04};
final byte[] bbar = {0x05, 0x06, 0x07, 0x08};

@Rule
public ExpectedException thrown = ExpectedException.none();

private Jedis jedis;
private BatchJedis batchJedis;
private AsyncReporter reporter;

@Before
public void setup() {
jedis = new Jedis("localhost", 6379);
jedis.flushAll();

reporter = AsyncReporter.builder(new JedisSender(jedis))
.messageTimeout(0, TimeUnit.MILLISECONDS)
.build();
batchJedis = new BatchJedis(jedis, reporter);
}

@Test
public void pipelined() {
Jedis delegate = mock(Jedis.class);
reporter = AsyncReporter.builder(new JedisSender(delegate))
.messageTimeout(0, TimeUnit.MILLISECONDS)
.build();

batchJedis = new BatchJedis(delegate, reporter);

Pipeline pipeline = mock(Pipeline.class);
when(delegate.pipelined()).thenReturn(pipeline);

batchJedis.append("foo", "bar");
batchJedis.append("foo", "bar");
reporter.flush();

verify(delegate).pipelined();
verify(pipeline, times(2))
.append(SafeEncoder.encode("foo"), SafeEncoder.encode("bar"));
verify(pipeline).syncAndReturnAll();
}

@Test
public void append() {
long value = blocking(batchJedis.append("foo", "bar"));
assertEquals(3L, value);
assertEquals("bar", jedis.get("foo"));

value = blocking(batchJedis.append("foo", "bar"));
assertEquals(6L, value);
assertEquals("barbar", jedis.get("foo"));
}

@Test
public void blpop() {
List<String> value = blocking(batchJedis.blpop(1, "foo"));
assertNull(value);

jedis.lpush("foo", "bar");
value = blocking(batchJedis.blpop(1, "foo"));
assertEquals(2, value.size());
assertEquals("foo", value.get(0));
assertEquals("bar", value.get(1));

// Binary
jedis.lpush(bfoo, bbar);
List<byte[]> value2 = blocking(batchJedis.blpop(1, bfoo));
assertEquals(2, value2.size());
assertArrayEquals(bfoo, value2.get(0));
assertArrayEquals(bbar, value2.get(1));
}

@Test
public void brpop() {
List<String> value = blocking(batchJedis.brpop(1, "foo"));
assertNull(value);

jedis.lpush("foo", "bar");
value = blocking(batchJedis.brpop(1, "foo"));
assertEquals(2, value.size());
assertEquals("foo", value.get(0));
assertEquals("bar", value.get(1));

// Binary
jedis.lpush(bfoo, bbar);
List<byte[]> value2 = blocking(batchJedis.brpop(1, bfoo));
assertEquals(2, value2.size());
assertArrayEquals(bfoo, value2.get(0));
assertArrayEquals(bbar, value2.get(1));
}

private <T> T blocking(MessageFuture<?> future) {
reporter.flush();

try {
return (T) future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
19 changes: 18 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
<module>jdbc</module>
<module>benchmark</module>
<module>rx</module>
<module>jedis</module>
</modules>

<properties>
@@ -168,6 +169,12 @@
<artifactId>h2</artifactId>
<version>1.4.196</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.7.22</version>
</dependency>
</dependencies>
</dependencyManagement>

@@ -237,6 +244,16 @@
</pluginManagement>

<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<testSource>${maven.compiler.testSource}</testSource>
<testTarget>${maven.compiler.testTarget}</testTarget>
</configuration>
</plugin>

<plugin>
<artifactId>maven-release-plugin</artifactId>
<version>${maven-release-plugin.version}</version>
@@ -331,4 +348,4 @@
</properties>
</profile>
</profiles>
</project>
</project>

0 comments on commit ac7f66a

Please sign in to comment.