Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adding support for jedis
Browse files Browse the repository at this point in the history
tramchamploo committed Mar 8, 2018
1 parent c13f439 commit f17853d
Showing 12 changed files with 2,683 additions and 1 deletion.
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ jobs:

docker:
- image: circleci/openjdk:8-jdk-browsers
- image: circleci/redis:latest

steps:
- checkout
Original file line number Diff line number Diff line change
@@ -67,6 +67,8 @@ public void flushIfExceedMaxSize() throws InterruptedException {
assertTrue(countDown.await(300, TimeUnit.MILLISECONDS));
assertEquals(50, sender.sent.size());

// wait for the queue to be released
Thread.sleep(100);
// make sure the queue is released
assertEquals(0, reporter.synchronizer.deque.size());
assertEquals(0, reporter.synchronizer.keyToReady.size());
19 changes: 19 additions & 0 deletions circle.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
machine:
java:
version: oraclejdk8
services:
- redis

dependencies:
pre:
- openssl aes-256-cbc -d -in .buildscript/secret-env-cipher -k $KEY >> ~/.circlerc
override:
- mvn --fail-never dependency:go-offline || true
cache_directories:
- "~/.m2"

test:
post:
- mkdir -p $CIRCLE_TEST_REPORTS/junit/
- find . -type f -regex ".*/target/surefire-reports/.*xml" -exec cp {} $CIRCLE_TEST_REPORTS/junit/ \;
- .buildscript/release.sh
38 changes: 38 additions & 0 deletions jedis/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bufferslayer-parent</artifactId>
<groupId>io.github.tramchamploo</groupId>
<version>1.4.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>buffer-jedis</artifactId>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>bufferslayer-core</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>bufferslayer-boundedqueue</artifactId>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
2,091 changes: 2,091 additions & 0 deletions jedis/src/main/java/io/github/tramchamploo/bufferslayer/BatchJedis.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.github.tramchamploo.bufferslayer;

/**
* Builder that build response to type T
* @param <T> the result type
*/
abstract class Builder<T> {

abstract T build(Object data);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.github.tramchamploo.bufferslayer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import redis.clients.util.SafeEncoder;

/**
* Factory for {@link Builder} that builds result
*/
final class BuilderFactory {

@SuppressWarnings("unchecked")
static final Builder<List<byte[]>> BYTE_ARRAY_LIST = new Builder<List<byte[]>>() {
@Override
List<byte[]> build(Object data) {
if (null == data) {
return Collections.emptyList();
}
List<String> l = (List<String>) data;
final ArrayList<byte[]> result = new ArrayList<>(l.size());
for (final String s: l) {
if (s == null) {
result.add(null);
} else {
result.add(SafeEncoder.encode(s));
}
}
return result;
}
};

static final Builder<String> STRING = new Builder<String>() {
public String build(Object data) {
return data == null ? null : SafeEncoder.encode((byte[]) data);
}
};

public static final Builder<Boolean> BOOLEAN = new Builder<Boolean>() {
public Boolean build(Object data) {
return ((Long) data) == 1;
}
};

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.github.tramchamploo.bufferslayer;

import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

/**
* Send buffered commands in delegate's pipeline
*/
final class JedisSender implements Sender<RedisCommand, Object> {

private final Jedis delegate;

JedisSender(Jedis delegate) {
this.delegate = delegate;
}

@Override
public CheckResult check() {
try {
String ping = delegate.ping();
if ("PONG".equalsIgnoreCase(ping)) {
return CheckResult.OK;
}
return CheckResult.failed(new RuntimeException("PING doesn't get PONG."));
} catch (Exception e) {
return CheckResult.failed(e);
}
}

@Override
public void close() {
delegate.close();
}

@Override
public List<Object> send(List<RedisCommand> messages) {
Pipeline pipeline = delegate.pipelined();

for (RedisCommand command: messages) {
command.apply(pipeline);
}
return pipeline.syncAndReturnAll();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package io.github.tramchamploo.bufferslayer;

import redis.clients.jedis.Pipeline;

/**
* A redis command to be executed
*/
abstract class RedisCommand extends Message {

final byte[] key;

RedisCommand(byte[] key) {
this.key = key;
}

/**
* This implies how redis pipeline execute this command.
*
* @param pipeline pipeline to behave on.
*/
protected abstract void apply(Pipeline pipeline);

@Override
public MessageKey asMessageKey() {
return Message.SINGLE_KEY;
}

String keysString() {
return new String(key);
}

abstract static class MultiKeyCommand extends RedisCommand {

final byte[][] keys;

MultiKeyCommand(byte[][] keys) {
super(keys[0]);
this.keys = keys;
}

String keysString() {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < keys.length; i++) {
builder.append(new String(keys[i]));
}
return builder.toString();
}
}

// Commands start

final static class Append extends RedisCommand {

final byte[] value;

Append(byte[] key, byte[] value) {
super(key);
this.value = value;
}

@Override
protected void apply(Pipeline pipeline) {
pipeline.append(key, value);
}

@Override
public String toString() {
return "Append(" + new String(key) + ", " + new String(value) + ")";
}
}

final static class Blpop extends MultiKeyCommand {

final int timeout;

Blpop(int timeout, byte[]... keys) {
super(keys);
this.timeout = timeout;
}

@Override
protected void apply(Pipeline pipeline) {
if (timeout == 0) {
pipeline.blpop(keys);
} else {
pipeline.blpop(timeout, keys);
}
}

@Override
public String toString() {
return "Blpop(" + keysString() + ", " + timeout + ")";
}
}

final static class Brpop extends MultiKeyCommand {

final int timeout;

Brpop(int timeout, byte[]... keys) {
super(keys);
this.timeout = timeout;
}

@Override
protected void apply(Pipeline pipeline) {
if (timeout == 0) {
pipeline.brpop(keys);
} else {
pipeline.brpop(timeout, keys);
}
}

@Override
public String toString() {
return "Brpop(" + keysString() + ", " + timeout + ")";
}
}

final static class Decr extends RedisCommand {

Decr(byte[] key) {
super(key);
}

@Override
protected void apply(Pipeline pipeline) {
pipeline.decr(key);
}

@Override
public String toString() {
return "Decr(" + keysString() + ")";
}
}

final static class DecrBy extends RedisCommand {

final long value;

DecrBy(byte[] key, long value) {
super(key);
this.value = value;
}

@Override
protected void apply(Pipeline pipeline) {
pipeline.decrBy(key, value);
}

@Override
public String toString() {
return "DecrBy(" + keysString() + ", " + + value + ")";
}
}

final static class Del extends MultiKeyCommand {

Del(byte[]... keys) {
super(keys);
}

@Override
protected void apply(Pipeline pipeline) {
pipeline.del(keys);
}

@Override
public String toString() {
return "Del(" + keysString() + ")";
}
}

final static class Echo extends RedisCommand {

Echo(byte[] key) {
super(key);
}

@Override
protected void apply(Pipeline pipeline) {
pipeline.echo(key);
}

@Override
public String toString() {
return "Echo(" + keysString() + ")";
}
}

final static class Exists extends MultiKeyCommand {

Exists(byte[]... keys) {
super(keys);
}

@Override
protected void apply(Pipeline pipeline) {
pipeline.exists(keys);
}

@Override
public String toString() {
return "Exists(" + keysString() + ")";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.github.tramchamploo.bufferslayer;

import io.github.tramchamploo.bufferslayer.internal.DefaultMessagePromise;
import io.github.tramchamploo.bufferslayer.internal.Future;
import io.github.tramchamploo.bufferslayer.internal.FutureListener;
import io.github.tramchamploo.bufferslayer.internal.MessageFuture;

final class ResponseUtil {

@SuppressWarnings("unchecked")
static <T> MessageFuture<T> transformResponse(MessageFuture<?> future, final Builder<T> builder) {
final DefaultMessagePromise<T> promise = new DefaultMessagePromise<>(future.message());
future.addListener(new FutureListener<Object>() {

@Override
public void operationComplete(Future<Object> future) throws Exception {
if (future.isSuccess()) {
promise.setSuccess(builder.build(future.get()));
} else {
promise.setFailure(future.cause());
}
}
});

return promise;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package io.github.tramchamploo.bufferslayer;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.github.tramchamploo.bufferslayer.internal.MessageFuture;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.util.SafeEncoder;

@SuppressWarnings("unchecked")
public class BatchJedisTest {

final byte[] bfoo = {0x01, 0x02, 0x03, 0x04};
final byte[] bbar = {0x05, 0x06, 0x07, 0x08};

@Rule
public ExpectedException thrown = ExpectedException.none();

private Jedis jedis;
private BatchJedis batchJedis;
private AsyncReporter reporter;

@Before
public void setup() {
jedis = new Jedis("localhost", 6379);
jedis.flushAll();

reporter = AsyncReporter.builder(new JedisSender(jedis))
.messageTimeout(0, TimeUnit.MILLISECONDS)
.build();
batchJedis = new BatchJedis(jedis, reporter);
}

@Test
public void pipelined() {
Jedis delegate = mock(Jedis.class);
reporter = AsyncReporter.builder(new JedisSender(delegate))
.messageTimeout(0, TimeUnit.MILLISECONDS)
.build();

batchJedis = new BatchJedis(delegate, reporter);

Pipeline pipeline = mock(Pipeline.class);
when(delegate.pipelined()).thenReturn(pipeline);

batchJedis.append("foo", "bar");
batchJedis.append("foo", "bar");
reporter.flush();

verify(delegate).pipelined();
verify(pipeline, times(2))
.append(SafeEncoder.encode("foo"), SafeEncoder.encode("bar"));
verify(pipeline).syncAndReturnAll();
}

@Test
public void append() {
long value = blocking(batchJedis.append("foo", "bar"));
assertEquals(3L, value);
assertEquals("bar", jedis.get("foo"));

value = blocking(batchJedis.append("foo", "bar"));
assertEquals(6L, value);
assertEquals("barbar", jedis.get("foo"));
}

@Test
public void blpop() {
List<String> value = blocking(batchJedis.blpop(1, "foo"));
assertNull(value);

jedis.lpush("foo", "bar");
value = blocking(batchJedis.blpop(1, "foo"));
assertEquals(2, value.size());
assertEquals("foo", value.get(0));
assertEquals("bar", value.get(1));

// Binary
jedis.lpush(bfoo, bbar);
List<byte[]> value2 = blocking(batchJedis.blpop(1, bfoo));
assertEquals(2, value2.size());
assertArrayEquals(bfoo, value2.get(0));
assertArrayEquals(bbar, value2.get(1));
}

@Test
public void brpop() {
List<String> value = blocking(batchJedis.brpop(1, "foo"));
assertNull(value);

jedis.lpush("foo", "bar");
value = blocking(batchJedis.brpop(1, "foo"));
assertEquals(2, value.size());
assertEquals("foo", value.get(0));
assertEquals("bar", value.get(1));

// Binary
jedis.lpush(bfoo, bbar);
List<byte[]> value2 = blocking(batchJedis.brpop(1, bfoo));
assertEquals(2, value2.size());
assertArrayEquals(bfoo, value2.get(0));
assertArrayEquals(bbar, value2.get(1));
}

@Test
public void decr() {
assertEquals(new Long(1L), jedis.incr("foo"));
assertEquals(new Long(0L), blocking(batchJedis.decr("foo")));

// Binary
assertEquals(new Long(1L), jedis.incr(bfoo));
assertEquals(new Long(0L), blocking(batchJedis.decr(bfoo)));
}

@Test
public void decrBy() {
assertEquals(new Long(2L), jedis.incrBy("foo", 2));
assertEquals(new Long(0L), blocking(batchJedis.decrBy("foo", 2)));

// Binary
assertEquals(new Long(2L), jedis.incrBy(bfoo, 2));
assertEquals(new Long(0L), blocking(batchJedis.decrBy(bfoo, 2)));
}

@Test
public void del() {
assertEquals("OK", jedis.set("foo", "bar"));
assertEquals(new Long(1L), blocking(batchJedis.del("foo")));

// Binary
assertEquals("OK", jedis.set(bfoo, bbar));
assertEquals(new Long(1L), blocking(batchJedis.del(bfoo)));
}

@Test
public void echo() {
assertEquals("foo", blocking(batchJedis.echo("foo")));

// Binary
assertArrayEquals(bfoo, blocking(batchJedis.echo(bfoo)));
}

@Test
public void exists() {
assertEquals("OK", jedis.set("foo", "bar"));
assertEquals(true, blocking(batchJedis.exists("foo")));

assertEquals("OK", jedis.set("bar", "bar"));
assertEquals(new Long(2L), blocking(batchJedis.exists("foo", "bar")));

// Binary
assertEquals("OK", jedis.set(bfoo, bbar));
assertEquals(true, blocking(batchJedis.exists(bfoo)));

assertEquals("OK", jedis.set(bbar, bbar));
assertEquals(new Long(2L), blocking(batchJedis.exists(bfoo, bbar)));
}

private <T> T blocking(MessageFuture<?> future) {
reporter.flush();

try {
return (T) future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
19 changes: 18 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
<module>jdbc</module>
<module>benchmark</module>
<module>rx</module>
<module>jedis</module>
</modules>

<properties>
@@ -168,6 +169,12 @@
<artifactId>h2</artifactId>
<version>1.4.196</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.7.22</version>
</dependency>
</dependencies>
</dependencyManagement>

@@ -237,6 +244,16 @@
</pluginManagement>

<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<testSource>${maven.compiler.testSource}</testSource>
<testTarget>${maven.compiler.testTarget}</testTarget>
</configuration>
</plugin>

<plugin>
<artifactId>maven-release-plugin</artifactId>
<version>${maven-release-plugin.version}</version>
@@ -331,4 +348,4 @@
</properties>
</profile>
</profiles>
</project>
</project>

0 comments on commit f17853d

Please sign in to comment.