Skip to content

Commit

Permalink
feature: support pipeline protocol (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Apr 11, 2024
1 parent 2ba0a01 commit 4b588f0
Show file tree
Hide file tree
Showing 39 changed files with 357 additions and 158 deletions.
19 changes: 10 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ Redis 3.x - latest

Details can be found here: [redispike-proxy/src/test/java/icu/funkye/redispike/ServerTest.java at main · funky-eyes/redispike-proxy (github.com)](https://github.com/funky-eyes/redispike-proxy/blob/main/src/test/java/icu/funkye/redispike/ServerTest.java)

| feature | support | note |
|---------|--------------------------------------------------------------------------------------------------------------|----------------------------------------------------------|
| String | done | |
| Hash | done | hsetnx only supports the key level, not the column level |
| Scan | | |
| List | | |
| Set | scard done<br/>srem done <br/>sadd done<br/>spop done<br/>smembers done <br/>srandmember done<br/>other wait | |
| ZSet | wait | |
| keys | done | |
| feature | support | note |
|----------|--------------------------------------------------------------------------------------------------------------|----------------------------------------------------------|
| String | done | |
| Hash | done | hsetnx only supports the key level, not the column level |
| Scan | | |
| List | | |
| Set | scard done<br/>srem done <br/>sadd done<br/>spop done<br/>smembers done <br/>srandmember done<br/>other wait | |
| ZSet | wait | |
| keys | done | |
| pipeline | done | |

### Performance Test Report
aerospike 3.x 2c4g redispike-proxy 2c4g:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
*/
package icu.funkye.redispike.handler;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import com.alipay.remoting.CommandCode;
import com.alipay.remoting.CommandHandler;
Expand Down Expand Up @@ -85,8 +89,19 @@ public RedisCommandHandler() {

@Override
public void handleCommand(RemotingContext ctx, Object msg) {
if (msg instanceof RedisRequest) {
RedisRequest request = (RedisRequest) msg;
if (msg instanceof List) {
List<Object> list = (List<Object>) msg;
for (Object object : list) {
processSingleRequest(ctx, object);
}
} else {
processSingleRequest(ctx, msg);
}
}

private void processSingleRequest(RemotingContext ctx, Object object) {
if (object instanceof RedisRequest) {
RedisRequest request = (RedisRequest) object;
try {
processorMap.get(request.getCmdCode().value()).process(ctx, request, getDefaultExecutor());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,30 @@
*/
package icu.funkye.redispike.handler.process;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import com.aerospike.client.IAerospikeClient;
import com.alipay.remoting.CommandCode;
import com.alipay.remoting.RemotingCommand;
import com.alipay.remoting.RemotingContext;
import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.protocol.AbstractRedisRequest;
import icu.funkye.redispike.protocol.RedisRequest;
import icu.funkye.redispike.util.ThreadPoolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractRedisRequestProcessor<T extends RedisRequest<?>> implements RedisRequestProcessor<T> {

protected final IAerospikeClient client = AeroSpikeClientFactory.getClient();
protected final IAerospikeClient client = AeroSpikeClientFactory.getClient();

protected final Logger logger = LoggerFactory.getLogger(getClass());
protected final Logger logger = LoggerFactory.getLogger(getClass());

protected CommandCode cmdCode;
protected CommandCode cmdCode;

protected static final ExecutorService EXECUTOR_SERVICE = ThreadPoolFactory.newVirtualThreadPerTaskExecutor();

@Override
public void process(RemotingContext ctx, RemotingCommand msg, ExecutorService defaultExecutor) throws Exception {
Expand All @@ -53,4 +60,29 @@ public void process(RemotingContext ctx, RemotingCommand msg, ExecutorService de
public CommandCode getCmdCode() {
return this.cmdCode;
}

public void write(RemotingContext ctx, AbstractRedisRequest request) {
CompletableFuture.runAsync(() -> {
CountDownLatch countDownLatch = request.getCountDownLatch();
if (request.isFlush()) {
if (countDownLatch != null) {
try {
countDownLatch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
if (logger.isDebugEnabled()) {
logger.debug("writeAndFlush response:{}", request.getResponse());
}
ctx.writeAndFlush(request.getResponse());
} else {
ctx.getChannelContext().write(request.getResponse());
countDownLatch.countDown();
if (logger.isDebugEnabled()) {
logger.debug("write response:{}", request.getResponse());
}
}
}, EXECUTOR_SERVICE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ public CommandRequestProcessor() {
@Override
public void handle(RemotingContext ctx, CommandRequest request) {
request.setResponse("OK");
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ public void onFailure(AerospikeException e) {
CompletableFuture.runAsync(() -> {
try {
countDownLatch.await(10, TimeUnit.SECONDS);
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,20 @@ public void handle(RemotingContext ctx, GetRequest request) {
@Override
public void onSuccess(Key key, Record record) {
if (record == null) {
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
return;
}
String value = record.getString(" ");
if (StringUtil.isNotBlank(value)) {
request.setResponse(value);
}
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
}

@Override
public void onFailure(AerospikeException ae) {
logger.error(ae.getMessage(), ae);
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
}
}, client.getReadPolicyDefault(), key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ public void onSuccess(Key key, Record record) {
public void onSuccess(Key key, boolean b) {
request.setResponse(
String.valueOf(request.getFields().size()));
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
}

@Override
public void onFailure(AerospikeException exception) {
logger.error(exception.getMessage(), exception);
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
}
}, client.getWritePolicyDefault(), key);
} else {
Expand All @@ -71,13 +71,13 @@ public void onFailure(AerospikeException exception) {
public void onSuccess(Key key) {
request.setResponse(
String.valueOf(request.getFields().size()));
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
}

@Override
public void onFailure(AerospikeException exception) {
logger.error(exception.getMessage(), exception);
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
}
}, client.getWritePolicyDefault(), key, newBins.toArray(new Bin[0]));
}
Expand All @@ -86,7 +86,7 @@ public void onFailure(AerospikeException exception) {
@Override
public void onFailure(AerospikeException exception) {
logger.error(exception.getMessage(), exception);
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
}
}, client.getReadPolicyDefault(), key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,20 @@ public void handle(RemotingContext ctx, HGetAllRequest request) {
@Override
public void onSuccess(Key key, Record record) {
if (record == null) {
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
return;
}
record.bins.forEach((k,v)-> {
request.setResponse(k);
request.setResponse(v.toString());
});
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
}

@Override
public void onFailure(AerospikeException ae) {
logger.error(ae.getMessage(), ae);
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
}
}, client.getReadPolicyDefault(), key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,28 @@ public HGetRequestProcessor() {
@Override
public void handle(RemotingContext ctx, HGetRequest request) {
if (request.getField() == null) {
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
return;
}
Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, request.getKey());
client.get(AeroSpikeClientFactory.eventLoops.next(), new RecordListener() {
@Override
public void onSuccess(Key key, Record record) {
if (record == null) {
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
return;
}
String value = record.getString(request.getField());
if (StringUtil.isNotBlank(value)) {
request.setResponse(value);
}
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
}

@Override
public void onFailure(AerospikeException ae) {
logger.error(ae.getMessage(), ae);
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
}
}, client.getReadPolicyDefault(), key, request.getField());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ public void handle(RemotingContext ctx, HSetRequest request) {
@Override
public void onSuccess(Key key) {
request.setResponse(String.valueOf(request.getKv().size()));
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
}

@Override
public void onFailure(AerospikeException ae) {
logger.error(ae.getMessage(), ae);
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
}
}, writePolicy, key, list.toArray(new Bin[0]));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public KeysRequestProcessor() {
@Override
public void handle(RemotingContext ctx, KeysRequest request) {
if (StringUtils.isBlank(request.getPattern())) {
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
return;
}
boolean all = StringUtils.equals(request.getPattern(), "*");
Expand Down Expand Up @@ -84,13 +84,13 @@ public void onRecord(Key key, Record record) throws AerospikeException {

@Override
public void onSuccess() {
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
}

@Override
public void onFailure(AerospikeException exception) {
logger.error(exception.getMessage(), exception);
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
}
}, scanPolicy, AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ public void handle(RemotingContext ctx, SAddRequest request) {
@Override
public void onSuccess(Key key) {
request.setResponse(String.valueOf(request.getFields().size()));
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
}

@Override
public void onFailure(AerospikeException ae) {
logger.error(ae.getMessage(), ae);
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
}
}, defaultWritePolicy, key, list.toArray(new Bin[0]));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ public void handle(RemotingContext ctx, SCardRequest request) {
@Override
public void onSuccess(Key key, Object obj) {
request.setResponse(obj.toString());
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
}

@Override
public void onFailure(AerospikeException exception) {
logger.error(exception.getMessage(), exception);
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
}
}, client.getWritePolicyDefault(), key, "scard", "count_bins");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@ public void handle(RemotingContext ctx, SMembersRequest request) {
@Override
public void onSuccess(Key key, Record record) {
if (record == null) {
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
return;
}
record.bins.keySet().forEach(request::setResponse);
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
}

@Override
public void onFailure(AerospikeException ae) {
logger.error(ae.getMessage(), ae);
ctx.writeAndFlush(request.getResponse());
write(ctx,request);
}
}, client.getReadPolicyDefault(), key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ public void onSuccess(Key key, Object obj) {
request.setResponse(s);
}
}
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
}

@Override
public void onFailure(AerospikeException exception) {
logger.error(exception.getMessage(), exception);
ctx.writeAndFlush(request.getResponse());
write(ctx, request);
}
}, client.getWritePolicyDefault(), key, "spop", "random_delete_bins", request.getCount() == null ? Value.get(1)
: Value.get(request.getCount()));
}, client.getWritePolicyDefault(), key, "spop", "random_delete_bins", request.getSum() == null ? Value.get(1)
: Value.get(request.getSum()));
}
}
Loading

0 comments on commit 4b588f0

Please sign in to comment.