Skip to content

Commit

Permalink
Adding support for jedis
Browse files Browse the repository at this point in the history
  • Loading branch information
tramchamploo committed Jun 13, 2018
1 parent 5d8d00c commit 51fff53
Show file tree
Hide file tree
Showing 29 changed files with 5,215 additions and 17 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ jobs:
working_directory: ~/buffer-slayer
docker:
- image: circleci/openjdk:8-jdk-browsers
- image: circleci/redis:latest

steps:
- checkout
Expand Down
5 changes: 5 additions & 0 deletions benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>bufferslayer-jedis</artifactId>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public abstract class AbstractBatchJdbcTemplateBenchmark {
private BatchJdbcTemplate batch;
private JdbcTemplate unbatch;
private Reporter<Sql, Integer> reporter;
private static SenderProxy proxy;
private static SenderProxy<Sql, Integer> proxy;
private static AtomicLong counter = new AtomicLong();

private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS test";
Expand All @@ -51,7 +51,7 @@ public void setup() {
JdbcTemplate delegate = new JdbcTemplate(dataSource);
delegate.setDataSource(dataSource);

proxy = new SenderProxy(new JdbcTemplateSender(delegate));
proxy = new SenderProxy<>(new JdbcTemplateSender(delegate));
proxy.onMessages(updated -> counter.addAndGet(updated.size()));

reporter = reporter(proxy);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package io.github.tramchamploo.bufferslayer;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
* Jedis benchmark that executing jedis commands.
*/
public abstract class AbstractBatchJedisBenchmark {

private JedisPool unbatch;
private BatchJedis batch;
private Reporter<RedisCommand, Object> reporter;
private static SenderProxy<RedisCommand, Object> proxy;
private static AtomicLong counter = new AtomicLong();

static String propertyOr(String key, String fallback) {
return System.getProperty(key, fallback);
}

protected abstract Reporter<RedisCommand, Object> reporter(Sender<RedisCommand, Object> sender);

@Setup
public void setup() {
unbatch = new JedisPool(propertyOr("redisHost", "127.0.0.1"),
Integer.parseInt(propertyOr("redisPort", "6379")));

proxy = new SenderProxy<>(new JedisSender(unbatch));
proxy.onMessages(updated -> counter.addAndGet(updated.size()));

reporter = reporter(proxy);
batch = new BatchJedis(unbatch, reporter);
}

@TearDown(Level.Iteration)
public void flushDB() {
try (Jedis jedis = unbatch.getResource()) {
jedis.flushDB();
}
}

@AuxCounters
@State(Scope.Thread)
public static class AtomicLongCounter {

public long updated() {
return counter.get();
}

@Setup(Level.Iteration)
public void clean() {
counter.set(0);
}
}

@State(Scope.Benchmark)
public static class Lagging {

@Setup(Level.Iteration)
public void lag() throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
}
}

static String randomString() {
return String.valueOf(ThreadLocalRandom.current().nextLong());
}

@Benchmark @Group("no_contention_batched") @GroupThreads(1)
public void no_contention_batched_set(Lagging l, AtomicLongCounter counters) {
batch.set(randomString(), randomString());
}

@Benchmark @Group("no_contention_unbatched") @GroupThreads(1)
public void no_contention_unbatched_set(Lagging l) {
try (Jedis jedis = unbatch.getResource()) {
jedis.set(randomString(), randomString());
}
}

@Benchmark @Group("mild_contention_batched") @GroupThreads(2)
public void mild_contention_batched_set(Lagging l, AtomicLongCounter counters) {
batch.set(randomString(), randomString());
}

@Benchmark @Group("mild_contention_unbatched") @GroupThreads(2)
public void mild_contention_unbatched_set(Lagging l) {
try (Jedis jedis = unbatch.getResource()) {
jedis.set(randomString(), randomString());
}
}

@Benchmark @Group("high_contention_batched") @GroupThreads(8)
public void high_contention_batched_set(Lagging l, AtomicLongCounter counters) {
batch.set(randomString(), randomString());
}

@Benchmark @Group("high_contention_unbatched") @GroupThreads(8)
public void high_contention_unbatched_set(Lagging l) {
try (Jedis jedis = unbatch.getResource()) {
jedis.set(randomString(), randomString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ static String propertyOr(String key, String fallback) {
protected void run() throws Exception {
BatchJdbcTemplate batch;
JdbcTemplate unbatch;
SenderProxy proxy;
SenderProxy<Sql, Integer> proxy;

DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
Expand All @@ -42,7 +42,7 @@ protected void run() throws Exception {

CountDownLatch countDown = new CountDownLatch(1);

proxy = new SenderProxy(new JdbcTemplateSender(delegate));
proxy = new SenderProxy<>(new JdbcTemplateSender(delegate));
proxy.onMessages(updated -> {
if (counter.addAndGet(updated.size()) == 5050) {
countDown.countDown();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.github.tramchamploo.bufferslayer;

import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 3, time = 1)
@Fork(3)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Group)
public class AsyncBatchJedisBenchmark extends AbstractBatchJedisBenchmark {

protected Reporter<RedisCommand, Object> reporter(Sender<RedisCommand, Object> sender) {
return AsyncReporter.builder(sender)
.pendingKeepalive(1, TimeUnit.SECONDS)
.build();
}

public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(".*" + AsyncBatchJedisBenchmark.class.getSimpleName() + ".*")
.build();

new Runner(opt).run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public void flushIfExceedMaxSize() throws InterruptedException {
assertTrue(countDown.await(200, TimeUnit.MILLISECONDS));
assertEquals(50, sender.sent.size());

// wait for the queue to be released
Thread.sleep(100);
// make sure the queue is released
assertEquals(0, reporter.synchronizer.queue.size());

Expand Down
19 changes: 19 additions & 0 deletions circle.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
machine:
java:
version: oraclejdk8
services:
- redis

dependencies:
pre:
- openssl aes-256-cbc -d -in .buildscript/secret-env-cipher -k $KEY >> ~/.circlerc
override:
- mvn --fail-never dependency:go-offline || true
cache_directories:
- "~/.m2"

test:
post:
- mkdir -p $CIRCLE_TEST_REPORTS/junit/
- find . -type f -regex ".*/target/surefire-reports/.*xml" -exec cp {} $CIRCLE_TEST_REPORTS/junit/ \;
- .buildscript/release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public abstract class CompositeFuture extends AbstractFuture<CompositeFuture> {
* When the list is empty, the returned future will be already completed.
*/
public static CompositeFuture all(List<? extends Future<?>> futures) {
return DefaultCompositeFuture.all(futures.toArray(new Future[futures.size()]));
return DefaultCompositeFuture.all(futures.toArray(new Future[0]));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.github.tramchamploo.bufferslayer.Message;
import io.github.tramchamploo.bufferslayer.MessageDroppedException;
import io.github.tramchamploo.bufferslayer.OverflowStrategy.Strategy;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

Expand All @@ -19,7 +20,12 @@ public static <R> void allSuccess(List<R> result, List<MessagePromise<R>> promis
for (int i = 0; i < result.size(); i++) {
MessagePromise<R> promise = promises.get(i);
R ret = result.get(i);
promise.setSuccess(ret);
if (ret instanceof Throwable) {
promise.setFailure(MessageDroppedException.dropped((Throwable) ret,
Collections.singletonList(promise.message())));
} else {
promise.setSuccess(ret);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
/**
* Delegate sending and trigger onMessages afterwards
*/
public class SenderProxy implements Sender<Sql, Integer> {
public class SenderProxy<M extends Message, R> implements Sender<M, R> {

private AtomicBoolean closed = new AtomicBoolean(false);
private Consumer<List<Integer>> onMessages = messages -> { };
final Sender<Sql, Integer> delegate;
private final AtomicBoolean closed = new AtomicBoolean(false);
private Consumer<List<M>> onMessages = messages -> { };
private final Sender<M, R> delegate;

public SenderProxy(Sender<Sql, Integer> delegate) {
SenderProxy(Sender<M, R> delegate) {
this.delegate = delegate;
}

Expand All @@ -28,16 +28,16 @@ public void close() {
}

@Override
public List<Integer> send(List<Sql> messages) {
public List<R> send(List<M> messages) {
if (closed.get()) {
throw new IllegalStateException("Closed!");
}
List<Integer> sent = delegate.send(messages);
onMessages.accept(sent);
List<R> sent = delegate.send(messages);
onMessages.accept(messages);
return sent;
}

public void onMessages(Consumer<List<Integer>> onMessages) {
public void onMessages(Consumer<List<M>> onMessages) {
this.onMessages = onMessages;
}
}
7 changes: 7 additions & 0 deletions jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
<artifactId>bufferslayer-core</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>bufferslayer-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>bufferslayer-boundedqueue</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void singleKey() {

@Test
public void updateFailed() throws InterruptedException {
SenderProxy sender = new SenderProxy(new JdbcTemplateSender(underlying));
SenderProxy<Sql, Integer> sender = new SenderProxy<>(new JdbcTemplateSender(underlying));
RuntimeException ex = new RuntimeException();
sender.onMessages(messages -> { throw ex; });
reporter = AsyncReporter.builder(sender).messageTimeout(10, TimeUnit.MILLISECONDS).build();
Expand Down
38 changes: 38 additions & 0 deletions jedis/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bufferslayer-parent</artifactId>
<groupId>io.github.tramchamploo</groupId>
<version>2.0.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>bufferslayer-jedis</artifactId>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>bufferslayer-core</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>bufferslayer-boundedqueue</artifactId>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Loading

0 comments on commit 51fff53

Please sign in to comment.