From 4b588f0d0bd485f5b7ae27d242e0d52f6e28149c Mon Sep 17 00:00:00 2001 From: funkye Date: Thu, 11 Apr 2024 17:47:16 +0800 Subject: [PATCH] feature: support pipeline protocol (#22) --- README.md | 19 ++--- .../handler/RedisCommandHandler.java | 19 ++++- .../AbstractRedisRequestProcessor.java | 38 +++++++++- .../process/impl/CommandRequestProcessor.java | 2 +- .../process/impl/DelRequestProcessor.java | 4 +- .../process/impl/GetRequestProcessor.java | 6 +- .../process/impl/HDelRequestProcessor.java | 10 +-- .../process/impl/HGetAllRequestProcessor.java | 6 +- .../process/impl/HGetRequestProcessor.java | 8 +-- .../process/impl/HSetRequestProcessor.java | 4 +- .../process/impl/KeysRequestProcessor.java | 6 +- .../process/impl/SAddRequestProcessor.java | 4 +- .../process/impl/SCardRequestProcessor.java | 4 +- .../impl/SMembersRequestProcessor.java | 6 +- .../process/impl/SPopRequestProcessor.java | 8 +-- .../impl/SRandmemberRequestProcessor.java | 10 +-- .../process/impl/SRemRequestProcessor.java | 6 +- .../process/impl/SetRequestProcessor.java | 12 ++-- .../protocol/AbstractRedisRequest.java | 46 ++++++++++++ .../protocol/RedisCommandDecoder.java | 72 ++++++++++++------- .../protocol/request/CommandRequest.java | 8 ++- .../protocol/request/DelRequest.java | 6 +- .../protocol/request/GetRequest.java | 6 +- .../protocol/request/HDelRequest.java | 8 +-- .../protocol/request/HGetAllRequest.java | 7 +- .../protocol/request/HGetRequest.java | 6 +- .../protocol/request/HSetRequest.java | 7 +- .../protocol/request/KeysRequest.java | 10 ++- .../protocol/request/SAddRequest.java | 7 +- .../protocol/request/SCardRequest.java | 10 ++- .../protocol/request/SMembersRequest.java | 7 +- .../protocol/request/SPopRequest.java | 18 ++--- .../protocol/request/SRandmemberRequest.java | 22 +++--- .../protocol/request/SRemRequest.java | 9 ++- .../protocol/request/SetRequest.java | 8 +-- .../protocol/response/BulkResponse.java | 2 +- .../redispike/util/ThreadPoolFactory.java | 63 ++++++++++++++++ .../funkye/redispike/JedisPooledFactory.java | 7 +- .../java/icu/funkye/redispike/ServerTest.java | 14 ++-- 39 files changed, 357 insertions(+), 158 deletions(-) create mode 100644 src/main/java/icu/funkye/redispike/protocol/AbstractRedisRequest.java create mode 100644 src/main/java/icu/funkye/redispike/util/ThreadPoolFactory.java diff --git a/README.md b/README.md index d278376..01ceb9d 100644 --- a/README.md +++ b/README.md @@ -11,15 +11,16 @@ Redis 3.x - latest Details can be found here: [redispike-proxy/src/test/java/icu/funkye/redispike/ServerTest.java at main ยท funky-eyes/redispike-proxy (github.com)](https://github.com/funky-eyes/redispike-proxy/blob/main/src/test/java/icu/funkye/redispike/ServerTest.java) -| feature | support | note | -|---------|--------------------------------------------------------------------------------------------------------------|----------------------------------------------------------| -| String | done | | -| Hash | done | hsetnx only supports the key level, not the column level | -| Scan | | | -| List | | | -| Set | scard done
srem done
sadd done
spop done
smembers done
srandmember done
other wait | | -| ZSet | wait | | -| keys | done | | +| feature | support | note | +|----------|--------------------------------------------------------------------------------------------------------------|----------------------------------------------------------| +| String | done | | +| Hash | done | hsetnx only supports the key level, not the column level | +| Scan | | | +| List | | | +| Set | scard done
srem done
sadd done
spop done
smembers done
srandmember done
other wait | | +| ZSet | wait | | +| keys | done | | +| pipeline | done | | ### Performance Test Report aerospike 3.x 2c4g redispike-proxy 2c4g: diff --git a/src/main/java/icu/funkye/redispike/handler/RedisCommandHandler.java b/src/main/java/icu/funkye/redispike/handler/RedisCommandHandler.java index dc51d3b..cd92335 100644 --- a/src/main/java/icu/funkye/redispike/handler/RedisCommandHandler.java +++ b/src/main/java/icu/funkye/redispike/handler/RedisCommandHandler.java @@ -16,9 +16,13 @@ */ package icu.funkye.redispike.handler; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import com.alipay.remoting.CommandCode; import com.alipay.remoting.CommandHandler; @@ -85,8 +89,19 @@ public RedisCommandHandler() { @Override public void handleCommand(RemotingContext ctx, Object msg) { - if (msg instanceof RedisRequest) { - RedisRequest request = (RedisRequest) msg; + if (msg instanceof List) { + List list = (List) msg; + for (Object object : list) { + processSingleRequest(ctx, object); + } + } else { + processSingleRequest(ctx, msg); + } + } + + private void processSingleRequest(RemotingContext ctx, Object object) { + if (object instanceof RedisRequest) { + RedisRequest request = (RedisRequest) object; try { processorMap.get(request.getCmdCode().value()).process(ctx, request, getDefaultExecutor()); } catch (Exception e) { diff --git a/src/main/java/icu/funkye/redispike/handler/process/AbstractRedisRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/AbstractRedisRequestProcessor.java index cbdd615..67b47a5 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/AbstractRedisRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/AbstractRedisRequestProcessor.java @@ -16,23 +16,30 @@ */ package icu.funkye.redispike.handler.process; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import com.aerospike.client.IAerospikeClient; import com.alipay.remoting.CommandCode; import com.alipay.remoting.RemotingCommand; import com.alipay.remoting.RemotingContext; import icu.funkye.redispike.factory.AeroSpikeClientFactory; +import icu.funkye.redispike.protocol.AbstractRedisRequest; import icu.funkye.redispike.protocol.RedisRequest; +import icu.funkye.redispike.util.ThreadPoolFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class AbstractRedisRequestProcessor> implements RedisRequestProcessor { - protected final IAerospikeClient client = AeroSpikeClientFactory.getClient(); + protected final IAerospikeClient client = AeroSpikeClientFactory.getClient(); - protected final Logger logger = LoggerFactory.getLogger(getClass()); + protected final Logger logger = LoggerFactory.getLogger(getClass()); - protected CommandCode cmdCode; + protected CommandCode cmdCode; + + protected static final ExecutorService EXECUTOR_SERVICE = ThreadPoolFactory.newVirtualThreadPerTaskExecutor(); @Override public void process(RemotingContext ctx, RemotingCommand msg, ExecutorService defaultExecutor) throws Exception { @@ -53,4 +60,29 @@ public void process(RemotingContext ctx, RemotingCommand msg, ExecutorService de public CommandCode getCmdCode() { return this.cmdCode; } + + public void write(RemotingContext ctx, AbstractRedisRequest request) { + CompletableFuture.runAsync(() -> { + CountDownLatch countDownLatch = request.getCountDownLatch(); + if (request.isFlush()) { + if (countDownLatch != null) { + try { + countDownLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + } + if (logger.isDebugEnabled()) { + logger.debug("writeAndFlush response:{}", request.getResponse()); + } + ctx.writeAndFlush(request.getResponse()); + } else { + ctx.getChannelContext().write(request.getResponse()); + countDownLatch.countDown(); + if (logger.isDebugEnabled()) { + logger.debug("write response:{}", request.getResponse()); + } + } + }, EXECUTOR_SERVICE); + } } diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/CommandRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/CommandRequestProcessor.java index 1b3b811..f574ccb 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/CommandRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/CommandRequestProcessor.java @@ -31,6 +31,6 @@ public CommandRequestProcessor() { @Override public void handle(RemotingContext ctx, CommandRequest request) { request.setResponse("OK"); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } } diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/DelRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/DelRequestProcessor.java index ee3078d..38a3b55 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/DelRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/DelRequestProcessor.java @@ -61,10 +61,10 @@ public void onFailure(AerospikeException e) { CompletableFuture.runAsync(() -> { try { countDownLatch.await(10, TimeUnit.SECONDS); - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); } catch (InterruptedException e) { logger.error(e.getMessage(), e); - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); } }); } diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/GetRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/GetRequestProcessor.java index 386b168..e156e28 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/GetRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/GetRequestProcessor.java @@ -41,20 +41,20 @@ public void handle(RemotingContext ctx, GetRequest request) { @Override public void onSuccess(Key key, Record record) { if (record == null) { - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); return; } String value = record.getString(" "); if (StringUtil.isNotBlank(value)) { request.setResponse(value); } - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } @Override public void onFailure(AerospikeException ae) { logger.error(ae.getMessage(), ae); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } }, client.getReadPolicyDefault(), key); } diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/HDelRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/HDelRequestProcessor.java index 6b0ced4..62dd2d0 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/HDelRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/HDelRequestProcessor.java @@ -54,13 +54,13 @@ public void onSuccess(Key key, Record record) { public void onSuccess(Key key, boolean b) { request.setResponse( String.valueOf(request.getFields().size())); - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); } @Override public void onFailure(AerospikeException exception) { logger.error(exception.getMessage(), exception); - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); } }, client.getWritePolicyDefault(), key); } else { @@ -71,13 +71,13 @@ public void onFailure(AerospikeException exception) { public void onSuccess(Key key) { request.setResponse( String.valueOf(request.getFields().size())); - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); } @Override public void onFailure(AerospikeException exception) { logger.error(exception.getMessage(), exception); - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); } }, client.getWritePolicyDefault(), key, newBins.toArray(new Bin[0])); } @@ -86,7 +86,7 @@ public void onFailure(AerospikeException exception) { @Override public void onFailure(AerospikeException exception) { logger.error(exception.getMessage(), exception); - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); } }, client.getReadPolicyDefault(), key); } diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/HGetAllRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/HGetAllRequestProcessor.java index 809046b..a7e8300 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/HGetAllRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/HGetAllRequestProcessor.java @@ -43,20 +43,20 @@ public void handle(RemotingContext ctx, HGetAllRequest request) { @Override public void onSuccess(Key key, Record record) { if (record == null) { - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); return; } record.bins.forEach((k,v)-> { request.setResponse(k); request.setResponse(v.toString()); }); - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); } @Override public void onFailure(AerospikeException ae) { logger.error(ae.getMessage(), ae); - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); } }, client.getReadPolicyDefault(), key); } diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/HGetRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/HGetRequestProcessor.java index 782aea1..3419dc7 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/HGetRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/HGetRequestProcessor.java @@ -38,7 +38,7 @@ public HGetRequestProcessor() { @Override public void handle(RemotingContext ctx, HGetRequest request) { if (request.getField() == null) { - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); return; } Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, request.getKey()); @@ -46,20 +46,20 @@ public void handle(RemotingContext ctx, HGetRequest request) { @Override public void onSuccess(Key key, Record record) { if (record == null) { - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); return; } String value = record.getString(request.getField()); if (StringUtil.isNotBlank(value)) { request.setResponse(value); } - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } @Override public void onFailure(AerospikeException ae) { logger.error(ae.getMessage(), ae); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } }, client.getReadPolicyDefault(), key, request.getField()); } diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/HSetRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/HSetRequestProcessor.java index a2c3b7e..c049b96 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/HSetRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/HSetRequestProcessor.java @@ -57,13 +57,13 @@ public void handle(RemotingContext ctx, HSetRequest request) { @Override public void onSuccess(Key key) { request.setResponse(String.valueOf(request.getKv().size())); - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); } @Override public void onFailure(AerospikeException ae) { logger.error(ae.getMessage(), ae); - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); } }, writePolicy, key, list.toArray(new Bin[0])); } diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/KeysRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/KeysRequestProcessor.java index e55d637..511cb7d 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/KeysRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/KeysRequestProcessor.java @@ -41,7 +41,7 @@ public KeysRequestProcessor() { @Override public void handle(RemotingContext ctx, KeysRequest request) { if (StringUtils.isBlank(request.getPattern())) { - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); return; } boolean all = StringUtils.equals(request.getPattern(), "*"); @@ -84,13 +84,13 @@ public void onRecord(Key key, Record record) throws AerospikeException { @Override public void onSuccess() { - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } @Override public void onFailure(AerospikeException exception) { logger.error(exception.getMessage(), exception); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } }, scanPolicy, AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set); } diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/SAddRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/SAddRequestProcessor.java index 5dd1f4f..dc86731 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/SAddRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/SAddRequestProcessor.java @@ -52,13 +52,13 @@ public void handle(RemotingContext ctx, SAddRequest request) { @Override public void onSuccess(Key key) { request.setResponse(String.valueOf(request.getFields().size())); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } @Override public void onFailure(AerospikeException ae) { logger.error(ae.getMessage(), ae); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } }, defaultWritePolicy, key, list.toArray(new Bin[0])); } diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/SCardRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/SCardRequestProcessor.java index 5bc755c..4cb7595 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/SCardRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/SCardRequestProcessor.java @@ -47,13 +47,13 @@ public void handle(RemotingContext ctx, SCardRequest request) { @Override public void onSuccess(Key key, Object obj) { request.setResponse(obj.toString()); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } @Override public void onFailure(AerospikeException exception) { logger.error(exception.getMessage(), exception); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } }, client.getWritePolicyDefault(), key, "scard", "count_bins"); } diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/SMembersRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/SMembersRequestProcessor.java index ea9a250..3983d7b 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/SMembersRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/SMembersRequestProcessor.java @@ -41,17 +41,17 @@ public void handle(RemotingContext ctx, SMembersRequest request) { @Override public void onSuccess(Key key, Record record) { if (record == null) { - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); return; } record.bins.keySet().forEach(request::setResponse); - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); } @Override public void onFailure(AerospikeException ae) { logger.error(ae.getMessage(), ae); - ctx.writeAndFlush(request.getResponse()); + write(ctx,request); } }, client.getReadPolicyDefault(), key); } diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/SPopRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/SPopRequestProcessor.java index 68956eb..993edd7 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/SPopRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/SPopRequestProcessor.java @@ -51,15 +51,15 @@ public void onSuccess(Key key, Object obj) { request.setResponse(s); } } - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } @Override public void onFailure(AerospikeException exception) { logger.error(exception.getMessage(), exception); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } - }, client.getWritePolicyDefault(), key, "spop", "random_delete_bins", request.getCount() == null ? Value.get(1) - : Value.get(request.getCount())); + }, client.getWritePolicyDefault(), key, "spop", "random_delete_bins", request.getSum() == null ? Value.get(1) + : Value.get(request.getSum())); } } diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/SRandmemberRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/SRandmemberRequestProcessor.java index 1b35d55..bb67e71 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/SRandmemberRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/SRandmemberRequestProcessor.java @@ -19,18 +19,14 @@ import com.aerospike.client.AerospikeException; import com.aerospike.client.Key; import com.aerospike.client.Language; -import com.aerospike.client.Record; import com.aerospike.client.Value; import com.aerospike.client.listener.ExecuteListener; -import com.aerospike.client.listener.RecordListener; -import com.aerospike.client.policy.QueryPolicy; import com.aerospike.client.task.RegisterTask; import com.alipay.remoting.RemotingContext; import icu.funkye.redispike.factory.AeroSpikeClientFactory; import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor; import icu.funkye.redispike.protocol.RedisRequestCommandCode; -import icu.funkye.redispike.protocol.request.SMembersRequest; import icu.funkye.redispike.protocol.request.SRandmemberRequest; import icu.funkye.redispike.util.IntegerUtils; @@ -55,14 +51,14 @@ public void onSuccess(Key key, Object obj) { request.setResponse(s); } } - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } @Override public void onFailure(AerospikeException exception) { logger.error(exception.getMessage(), exception); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } - }, client.getWritePolicyDefault(), key, "srandmember", "getBinNames", Value.get(request.getCount())); + }, client.getWritePolicyDefault(), key, "srandmember", "getBinNames", Value.get(request.getSum())); } } diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/SRemRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/SRemRequestProcessor.java index f4764a1..ea735cc 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/SRemRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/SRemRequestProcessor.java @@ -52,7 +52,7 @@ public SRemRequestProcessor() { @Override public void handle(RemotingContext ctx, SRemRequest request) { if (request.getBins().isEmpty()) { - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); return; } Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, request.getKey()); @@ -60,13 +60,13 @@ public void handle(RemotingContext ctx, SRemRequest request) { @Override public void onSuccess(Key key, Object obj) { request.setResponse(obj.toString()); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } @Override public void onFailure(AerospikeException exception) { logger.error(exception.getMessage(), exception); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } }, client.getWritePolicyDefault(), key, "srem", "delete_bins_return_count", Value.get(String.join(",", request.getBins()))); diff --git a/src/main/java/icu/funkye/redispike/handler/process/impl/SetRequestProcessor.java b/src/main/java/icu/funkye/redispike/handler/process/impl/SetRequestProcessor.java index c6abff1..d41d64b 100644 --- a/src/main/java/icu/funkye/redispike/handler/process/impl/SetRequestProcessor.java +++ b/src/main/java/icu/funkye/redispike/handler/process/impl/SetRequestProcessor.java @@ -64,19 +64,19 @@ public void handle(RemotingContext ctx, SetRequest request) { @Override public void onSuccess(Key key, Record record) { if (record == null) { - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } else { client.put(AeroSpikeClientFactory.eventLoops.next(), new WriteListener() { @Override public void onSuccess(Key key) { request.setResponse("OK"); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } @Override public void onFailure(AerospikeException ae) { logger.error(ae.getMessage(), ae); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } }, client.getWritePolicyDefault(), key, bin); } @@ -85,7 +85,7 @@ public void onFailure(AerospikeException ae) { @Override public void onFailure(AerospikeException ae) { logger.error(ae.getMessage(), ae); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } }, client.getReadPolicyDefault(), key); return; @@ -99,13 +99,13 @@ public void onSuccess(Key key) { } else { request.setResponse("OK"); } - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } @Override public void onFailure(AerospikeException ae) { logger.error(ae.getMessage(), ae); - ctx.writeAndFlush(request.getResponse()); + write(ctx, request); } }, writePolicy, key, bin); } diff --git a/src/main/java/icu/funkye/redispike/protocol/AbstractRedisRequest.java b/src/main/java/icu/funkye/redispike/protocol/AbstractRedisRequest.java new file mode 100644 index 0000000..af4f17f --- /dev/null +++ b/src/main/java/icu/funkye/redispike/protocol/AbstractRedisRequest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package icu.funkye.redispike.protocol; + +import java.util.concurrent.CountDownLatch; + +/** + * @author jianbin@apache.org + */ +public abstract class AbstractRedisRequest implements RedisRequest { + + protected boolean flush; + + protected CountDownLatch countDownLatch; + + public CountDownLatch getCountDownLatch() { + return countDownLatch; + } + + public void setCountDownLatch(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + public boolean isFlush() { + return flush; + } + + public void setFlush(boolean flush) { + this.flush = flush; + } + +} diff --git a/src/main/java/icu/funkye/redispike/protocol/RedisCommandDecoder.java b/src/main/java/icu/funkye/redispike/protocol/RedisCommandDecoder.java index ce91167..82c3086 100644 --- a/src/main/java/icu/funkye/redispike/protocol/RedisCommandDecoder.java +++ b/src/main/java/icu/funkye/redispike/protocol/RedisCommandDecoder.java @@ -19,6 +19,8 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; import com.alipay.remoting.CommandDecoder; import icu.funkye.redispike.protocol.request.HDelRequest; import icu.funkye.redispike.protocol.request.HGetAllRequest; @@ -52,59 +54,79 @@ public class RedisCommandDecoder implements CommandDecoder { @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { - int length = readParamsLen(in); - List params = new ArrayList<>(length); - for (int i = 0; i < length; i++) { - String param = readParam(in); - params.add(param.toLowerCase()); + List> paramsList = new ArrayList<>(); + while (in.isReadable()) { + int length = readParamsLen(in); + List params = new ArrayList<>(); + for (int i = 0; i < length; i++) { + String param = readParam(in); + params.add(param.toLowerCase()); + } + paramsList.add(params); + } + int size = paramsList.size() - 1; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("cmds: {}", paramsList); + } + CountDownLatch countDownLatch = null; + if(size>0){ + countDownLatch = new CountDownLatch(size); } // convert to RedisRequest - out.add(convert2RedisRequest(params)); + for (int i = 0; i < paramsList.size(); i++) { + AbstractRedisRequest request = convert2RedisRequest(paramsList.get(i), size == i); + if (request != null) { + Optional.ofNullable(countDownLatch).ifPresent(request::setCountDownLatch); + } + out.add(request); + } } - private RedisRequest convert2RedisRequest(List params) { + private AbstractRedisRequest convert2RedisRequest(List params, boolean flush) { String cmd = params.get(0); - if (LOGGER.isInfoEnabled()) { - LOGGER.info("cmd: {}", params); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("cmd: {}", params); } switch (cmd) { case "hdel": - return new HDelRequest(params); + return new HDelRequest(params, flush); case "get": - return new GetRequest(params.get(1)); + return new GetRequest(params.get(1), flush); case "command": - return new CommandRequest(); + return new CommandRequest(flush); case "hset": case "hsetnx": - return new HSetRequest(params); + return new HSetRequest(params, flush); case "setnx": params.add("nx"); - return new SetRequest(params); + return new SetRequest(params, flush); case "set": - return new SetRequest(params); + return new SetRequest(params, flush); case "keys": - return new KeysRequest(params); + return new KeysRequest(params, flush); case "del": params.remove(0); - return new DelRequest(params); + return new DelRequest(params, flush); case "hget": - return new HGetRequest(params.get(1), params.size() > 2 ? params.get(2) : null); + return new HGetRequest(params.get(1), params.size() > 2 ? params.get(2) : null, flush); case "hgetall": - return new HGetAllRequest(params.get(1)); + return new HGetAllRequest(params.get(1), flush); case "scard": - return new SCardRequest(params.get(1)); + return new SCardRequest(params.get(1), flush); case "sadd": - return new SAddRequest(params); + return new SAddRequest(params, flush); case "smembers": - return new SMembersRequest(params.get(1)); + return new SMembersRequest(params.get(1), flush); case "srem": params.remove(0); - return new SRemRequest(params); + return new SRemRequest(params, flush); case "srandmember": - return new SRandmemberRequest(params.get(1), params.size() > 2 ? Integer.parseInt(params.get(2)) : 1); + return new SRandmemberRequest(params.get(1), params.size() > 2 ? Integer.parseInt(params.get(2)) : 1, + flush); case "spop": params.remove(0); - return new SPopRequest(params.remove(0), params.size() > 0 ? Integer.parseInt(params.get(0)) : null); + return new SPopRequest(params.remove(0), params.size() > 0 ? Integer.parseInt(params.get(0)) : null, + flush); default: return null; } diff --git a/src/main/java/icu/funkye/redispike/protocol/request/CommandRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/CommandRequest.java index 397f493..fd0902f 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/CommandRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/CommandRequest.java @@ -16,14 +16,18 @@ */ package icu.funkye.redispike.protocol.request; -import icu.funkye.redispike.protocol.RedisRequest; +import icu.funkye.redispike.protocol.AbstractRedisRequest; import icu.funkye.redispike.protocol.RedisResponse; import icu.funkye.redispike.protocol.response.BulkResponse; -public class CommandRequest implements RedisRequest { +public class CommandRequest extends AbstractRedisRequest { private final BulkResponse response = new BulkResponse(); + public CommandRequest(boolean flush) { + this.flush = flush; + } + @Override public RedisResponse getResponse() { return response; diff --git a/src/main/java/icu/funkye/redispike/protocol/request/DelRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/DelRequest.java index 1545123..838b1af 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/DelRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/DelRequest.java @@ -18,11 +18,12 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import icu.funkye.redispike.protocol.AbstractRedisRequest; import icu.funkye.redispike.protocol.RedisRequest; import icu.funkye.redispike.protocol.RedisResponse; import icu.funkye.redispike.protocol.response.IntegerResponse; -public class DelRequest implements RedisRequest { +public class DelRequest extends AbstractRedisRequest { List key; @@ -30,7 +31,8 @@ public class DelRequest implements RedisRequest { IntegerResponse response = new IntegerResponse(); - public DelRequest(List key) { + public DelRequest(List key, boolean flush) { + this.flush = flush; this.key = key; } diff --git a/src/main/java/icu/funkye/redispike/protocol/request/GetRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/GetRequest.java index 387c5a9..4280678 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/GetRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/GetRequest.java @@ -16,17 +16,19 @@ */ package icu.funkye.redispike.protocol.request; +import icu.funkye.redispike.protocol.AbstractRedisRequest; import icu.funkye.redispike.protocol.RedisRequest; import icu.funkye.redispike.protocol.RedisResponse; import icu.funkye.redispike.protocol.response.BulkResponse; -public class GetRequest implements RedisRequest { +public class GetRequest extends AbstractRedisRequest { String key; BulkResponse response = new BulkResponse(); - public GetRequest(String key) { + public GetRequest(String key, boolean flush) { + this.flush = flush; this.key = key; } diff --git a/src/main/java/icu/funkye/redispike/protocol/request/HDelRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/HDelRequest.java index 53d186b..d72dd47 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/HDelRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/HDelRequest.java @@ -19,13 +19,12 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import icu.funkye.redispike.protocol.RedisRequest; +import icu.funkye.redispike.protocol.AbstractRedisRequest; import icu.funkye.redispike.protocol.RedisResponse; import icu.funkye.redispike.protocol.response.IntegerResponse; -public class HDelRequest implements RedisRequest { +public class HDelRequest extends AbstractRedisRequest { String key; @@ -33,7 +32,8 @@ public class HDelRequest implements RedisRequest { IntegerResponse response = new IntegerResponse(); - public HDelRequest(List params) { + public HDelRequest(List params, boolean flush) { + this.flush = flush; params.remove(0); this.key = params.remove(0); this.fields = new HashSet<>(params); diff --git a/src/main/java/icu/funkye/redispike/protocol/request/HGetAllRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/HGetAllRequest.java index 6d049dd..d276944 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/HGetAllRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/HGetAllRequest.java @@ -17,17 +17,18 @@ package icu.funkye.redispike.protocol.request; import java.util.ArrayList; -import icu.funkye.redispike.protocol.RedisRequest; +import icu.funkye.redispike.protocol.AbstractRedisRequest; import icu.funkye.redispike.protocol.RedisResponse; import icu.funkye.redispike.protocol.response.BulkResponse; -public class HGetAllRequest implements RedisRequest { +public class HGetAllRequest extends AbstractRedisRequest { String key; BulkResponse response = new BulkResponse(new ArrayList<>()); - public HGetAllRequest(String key) { + public HGetAllRequest(String key, boolean flush) { + this.flush = flush; this.key = key; } diff --git a/src/main/java/icu/funkye/redispike/protocol/request/HGetRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/HGetRequest.java index fa5f47f..0f0deb9 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/HGetRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/HGetRequest.java @@ -17,11 +17,12 @@ package icu.funkye.redispike.protocol.request; import com.alipay.remoting.util.StringUtils; +import icu.funkye.redispike.protocol.AbstractRedisRequest; import icu.funkye.redispike.protocol.RedisRequest; import icu.funkye.redispike.protocol.RedisResponse; import icu.funkye.redispike.protocol.response.BulkResponse; -public class HGetRequest implements RedisRequest { +public class HGetRequest extends AbstractRedisRequest { final String key; @@ -29,7 +30,8 @@ public class HGetRequest implements RedisRequest { BulkResponse response = new BulkResponse(); - public HGetRequest(String key, String field) { + public HGetRequest(String key, String field, boolean flush) { + this.flush = flush; this.key = key; if (StringUtils.isBlank(field)) { response.setError("ERR wrong number of arguments for 'hget' command"); diff --git a/src/main/java/icu/funkye/redispike/protocol/request/HSetRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/HSetRequest.java index 265d1a2..d9d0802 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/HSetRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/HSetRequest.java @@ -19,13 +19,13 @@ import java.util.List; import java.util.Map; -import icu.funkye.redispike.protocol.RedisRequest; +import icu.funkye.redispike.protocol.AbstractRedisRequest; import icu.funkye.redispike.protocol.RedisResponse; import icu.funkye.redispike.protocol.request.conts.Operate; import icu.funkye.redispike.protocol.response.IntegerResponse; import icu.funkye.redispike.util.CollectionUtils; -public class HSetRequest implements RedisRequest { +public class HSetRequest extends AbstractRedisRequest { final String originalCommand; @@ -37,7 +37,8 @@ public class HSetRequest implements RedisRequest { RedisResponse response; - public HSetRequest(List params) { + public HSetRequest(List params, boolean flush) { + this.flush = flush; this.originalCommand = params.remove(0); this.key = params.remove(0); this.kv = CollectionUtils.arrayToMap(params); diff --git a/src/main/java/icu/funkye/redispike/protocol/request/KeysRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/KeysRequest.java index 39240c0..51bca6a 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/KeysRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/KeysRequest.java @@ -19,14 +19,11 @@ import java.util.ArrayList; import java.util.List; -import icu.funkye.redispike.protocol.RedisRequest; +import icu.funkye.redispike.protocol.AbstractRedisRequest; import icu.funkye.redispike.protocol.RedisResponse; -import icu.funkye.redispike.protocol.request.conts.Operate; -import icu.funkye.redispike.protocol.request.conts.TtlType; import icu.funkye.redispike.protocol.response.BulkResponse; -import icu.funkye.redispike.protocol.response.IntegerResponse; -public class KeysRequest implements RedisRequest { +public class KeysRequest extends AbstractRedisRequest { final String originalCommand; @@ -34,7 +31,8 @@ public class KeysRequest implements RedisRequest { BulkResponse response; - public KeysRequest(List params) { + public KeysRequest(List params, boolean flush) { + this.flush = flush; this.originalCommand = params.get(0); if (params.size() != 2) { this.response = new BulkResponse(); diff --git a/src/main/java/icu/funkye/redispike/protocol/request/SAddRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/SAddRequest.java index 4596dac..2ef1bdc 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/SAddRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/SAddRequest.java @@ -20,11 +20,11 @@ import java.util.List; import java.util.Set; -import icu.funkye.redispike.protocol.RedisRequest; +import icu.funkye.redispike.protocol.AbstractRedisRequest; import icu.funkye.redispike.protocol.RedisResponse; import icu.funkye.redispike.protocol.response.IntegerResponse; -public class SAddRequest implements RedisRequest { +public class SAddRequest extends AbstractRedisRequest { String key; @@ -32,7 +32,8 @@ public class SAddRequest implements RedisRequest { IntegerResponse response = new IntegerResponse(); - public SAddRequest(List params) { + public SAddRequest(List params, boolean flush) { + this.flush = flush; params.remove(0); this.key = params.remove(0); this.fields = new HashSet<>(params); diff --git a/src/main/java/icu/funkye/redispike/protocol/request/SCardRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/SCardRequest.java index 8f941ac..fdc6d06 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/SCardRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/SCardRequest.java @@ -16,20 +16,18 @@ */ package icu.funkye.redispike.protocol.request; -import java.util.List; - -import icu.funkye.redispike.protocol.RedisRequest; +import icu.funkye.redispike.protocol.AbstractRedisRequest; import icu.funkye.redispike.protocol.RedisResponse; -import icu.funkye.redispike.protocol.response.BulkResponse; import icu.funkye.redispike.protocol.response.IntegerResponse; -public class SCardRequest implements RedisRequest { +public class SCardRequest extends AbstractRedisRequest { final String key; RedisResponse response; - public SCardRequest(String key) { + public SCardRequest(String key, boolean flush) { + this.flush = flush; this.key = key; this.response = new IntegerResponse(); } diff --git a/src/main/java/icu/funkye/redispike/protocol/request/SMembersRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/SMembersRequest.java index d421426..73a7be3 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/SMembersRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/SMembersRequest.java @@ -17,17 +17,18 @@ package icu.funkye.redispike.protocol.request; import java.util.ArrayList; -import icu.funkye.redispike.protocol.RedisRequest; +import icu.funkye.redispike.protocol.AbstractRedisRequest; import icu.funkye.redispike.protocol.RedisResponse; import icu.funkye.redispike.protocol.response.BulkResponse; -public class SMembersRequest implements RedisRequest { +public class SMembersRequest extends AbstractRedisRequest { String key; BulkResponse response = new BulkResponse(new ArrayList<>()); - public SMembersRequest(String key) { + public SMembersRequest(String key, boolean flush) { + this.flush = flush; this.key = key; } diff --git a/src/main/java/icu/funkye/redispike/protocol/request/SPopRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/SPopRequest.java index ed3921b..e1372e5 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/SPopRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/SPopRequest.java @@ -17,23 +17,25 @@ package icu.funkye.redispike.protocol.request; import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; -import icu.funkye.redispike.protocol.RedisRequest; +import icu.funkye.redispike.protocol.AbstractRedisRequest; import icu.funkye.redispike.protocol.RedisResponse; import icu.funkye.redispike.protocol.response.BulkResponse; -public class SPopRequest implements RedisRequest { +public class SPopRequest extends AbstractRedisRequest { String key; - Integer count; + Integer sum; BulkResponse response; - public SPopRequest(String key, Integer count) { + public SPopRequest(String key, Integer sum, boolean flush) { + this.flush = flush; this.key = key; - this.count = count; - if (count != null) { + this.sum = sum; + if (sum != null) { this.response = new BulkResponse(new ArrayList<>()); } else { this.response = new BulkResponse(); @@ -48,8 +50,8 @@ public void setKey(String key) { this.key = key; } - public Integer getCount() { - return count; + public Integer getSum() { + return sum; } @Override diff --git a/src/main/java/icu/funkye/redispike/protocol/request/SRandmemberRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/SRandmemberRequest.java index 6985d2d..38d403b 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/SRandmemberRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/SRandmemberRequest.java @@ -17,23 +17,25 @@ package icu.funkye.redispike.protocol.request; import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; -import icu.funkye.redispike.protocol.RedisRequest; import icu.funkye.redispike.protocol.RedisResponse; import icu.funkye.redispike.protocol.response.BulkResponse; +import icu.funkye.redispike.protocol.AbstractRedisRequest; -public class SRandmemberRequest implements RedisRequest { +public class SRandmemberRequest extends AbstractRedisRequest { String key; - int count; + int sum; - BulkResponse response = new BulkResponse(new ArrayList<>()); + BulkResponse response; - public SRandmemberRequest(String key, int count) { + public SRandmemberRequest(String key, int sum, boolean flush) { + this.flush = flush; this.key = key; - this.count = count; - if (count > 1) { + this.sum = sum; + if (sum > 1) { this.response = new BulkResponse(new ArrayList<>()); } else { this.response = new BulkResponse(); @@ -54,13 +56,13 @@ public RedisResponse getResponse() { return response; } - public int getCount() { - return count; + public int getSum() { + return sum; } @Override public String toString() { - return "SRandmemberRequest{" + "key='" + key + '\'' + ", count=" + count + ", response=" + response + '}'; + return "SRandmemberRequest{" + "key='" + key + '\'' + ", count=" + sum + ", response=" + response + '}'; } } diff --git a/src/main/java/icu/funkye/redispike/protocol/request/SRemRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/SRemRequest.java index a36e977..ed4da13 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/SRemRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/SRemRequest.java @@ -16,16 +16,14 @@ */ package icu.funkye.redispike.protocol.request; -import java.util.ArrayList; import java.util.List; -import com.alipay.remoting.util.StringUtils; -import icu.funkye.redispike.protocol.RedisRequest; +import icu.funkye.redispike.protocol.AbstractRedisRequest; import icu.funkye.redispike.protocol.RedisResponse; import icu.funkye.redispike.protocol.response.BulkResponse; import icu.funkye.redispike.protocol.response.IntegerResponse; -public class SRemRequest implements RedisRequest { +public class SRemRequest extends AbstractRedisRequest { String key; @@ -33,7 +31,8 @@ public class SRemRequest implements RedisRequest { RedisResponse response; - public SRemRequest(List params) { + public SRemRequest(List params, boolean flush) { + this.flush = flush; this.key = params.remove(0); if (params.isEmpty()) { BulkResponse bulkResponse = new BulkResponse(); diff --git a/src/main/java/icu/funkye/redispike/protocol/request/SetRequest.java b/src/main/java/icu/funkye/redispike/protocol/request/SetRequest.java index 1f9cd8d..4e4c27a 100644 --- a/src/main/java/icu/funkye/redispike/protocol/request/SetRequest.java +++ b/src/main/java/icu/funkye/redispike/protocol/request/SetRequest.java @@ -16,16 +16,15 @@ */ package icu.funkye.redispike.protocol.request; -import java.nio.charset.StandardCharsets; import java.util.List; -import icu.funkye.redispike.protocol.RedisRequest; import icu.funkye.redispike.protocol.RedisResponse; import icu.funkye.redispike.protocol.request.conts.Operate; import icu.funkye.redispike.protocol.request.conts.TtlType; import icu.funkye.redispike.protocol.response.BulkResponse; import icu.funkye.redispike.protocol.response.IntegerResponse; +import icu.funkye.redispike.protocol.AbstractRedisRequest; -public class SetRequest implements RedisRequest { +public class SetRequest extends AbstractRedisRequest { final String originalCommand; @@ -41,7 +40,8 @@ public class SetRequest implements RedisRequest { RedisResponse response; - public SetRequest(List params) { + public SetRequest(List params, boolean flush) { + this.flush = flush; this.originalCommand = params.get(0); this.key = params.get(1); this.value = params.get(2); diff --git a/src/main/java/icu/funkye/redispike/protocol/response/BulkResponse.java b/src/main/java/icu/funkye/redispike/protocol/response/BulkResponse.java index 6fe4b38..12ed695 100644 --- a/src/main/java/icu/funkye/redispike/protocol/response/BulkResponse.java +++ b/src/main/java/icu/funkye/redispike/protocol/response/BulkResponse.java @@ -100,6 +100,6 @@ public void setError(String error) { @Override public String toString() { - return "BulkResponse{" + "list=" + list + '}'; + return "BulkResponse{" + "list=" + list + ", data='" + data + '\'' + ", error='" + error + '\'' + '}'; } } diff --git a/src/main/java/icu/funkye/redispike/util/ThreadPoolFactory.java b/src/main/java/icu/funkye/redispike/util/ThreadPoolFactory.java new file mode 100644 index 0000000..7034282 --- /dev/null +++ b/src/main/java/icu/funkye/redispike/util/ThreadPoolFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package icu.funkye.redispike.util; + +import java.lang.reflect.Method; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import com.aerospike.client.cluster.ThreadDaemonFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ThreadPoolFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolFactory.class); + + public static boolean isJDK19OrAbove() { + String version = System.getProperty("java.version"); + if (version.startsWith("1.")) { + version = version.substring(2, 3); + } else { + int dot = version.indexOf("."); + if (dot != -1) { + version = version.substring(0, dot); + } + } + int v = Integer.parseInt(version); + return v >= 19; + } + + public static ExecutorService newVirtualThreadPerTaskExecutor() { + if (isJDK19OrAbove()) { + try { + Class clz = (Class) Class.forName("java.util.concurrent.Executors"); + Method method = clz.getMethod("newVirtualThreadPerTaskExecutor"); + LOGGER.info("use jdk 19+ newVirtualThreadPerTaskExecutor"); + return (ExecutorService) method.invoke(null); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + } + return new ThreadPoolExecutor(0, 200, 120L, TimeUnit.SECONDS, new SynchronousQueue(), + new ThreadDaemonFactory()); + } + +} diff --git a/src/test/java/icu/funkye/redispike/JedisPooledFactory.java b/src/test/java/icu/funkye/redispike/JedisPooledFactory.java index 5fb051a..07796c0 100644 --- a/src/test/java/icu/funkye/redispike/JedisPooledFactory.java +++ b/src/test/java/icu/funkye/redispike/JedisPooledFactory.java @@ -16,6 +16,7 @@ */ package icu.funkye.redispike; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +43,11 @@ public static JedisPoolAbstract getJedisPoolInstance(String ip, int port) { if (jedisPool == null) { synchronized (JedisPooledFactory.class) { if (jedisPool == null) { - jedisPool = new JedisPool(ip, port); + GenericObjectPoolConfig config = new GenericObjectPoolConfig(); + config.setMaxTotal(100); + config.setMinIdle(2); + config.setMaxIdle(10); + jedisPool = new JedisPool(config, ip, port, 10000); } } } diff --git a/src/test/java/icu/funkye/redispike/ServerTest.java b/src/test/java/icu/funkye/redispike/ServerTest.java index a62fcc4..e25c61d 100644 --- a/src/test/java/icu/funkye/redispike/ServerTest.java +++ b/src/test/java/icu/funkye/redispike/ServerTest.java @@ -67,16 +67,22 @@ public void TestPippline() { try (Pipeline pipeline = jedis.pipelined()) { for (String value : keys) { pipeline.hset(key, value, "b"); - pipeline.syncAndReturnAll(); } + pipeline.syncAndReturnAll(); } jedis.del(key); try (Pipeline pipeline = jedis.pipelined()) { for (String value : keys) { - pipeline.set(key, value); - pipeline.sync(); + pipeline.set(value, value); + } + pipeline.sync(); + for (String value : keys) { + pipeline.get(value); + } + List list = pipeline.syncAndReturnAll(); + for (Object object : list) { + Assertions.assertTrue(keys.contains(object.toString())); } - } jedis.del(key); }