Skip to content

Commit

Permalink
feature: support srem protocol (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Apr 10, 2024
1 parent 90e1e2f commit bcbc486
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 9 deletions.
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ Redis 3.x - latest

Details can be found here: [redispike-proxy/src/test/java/icu/funkye/redispike/ServerTest.java at main · funky-eyes/redispike-proxy (github.com)](https://github.com/funky-eyes/redispike-proxy/blob/main/src/test/java/icu/funkye/redispike/ServerTest.java)

| feature | support | note |
|---------|--------------------------------------------------------------------------------|----------------------------------------------------------|
| String | done | |
| Hash | done | hsetnx only supports the key level, not the column level |
| Scan | | |
| List | | |
| Set | sadd done<br/>spop done<br/>smembers done <br/>srandmember done<br/>other wait | |
| ZSet | wait | |
| keys | done | |
| feature | support | note |
|---------|-----------------------------------------------------------------------------------------------|----------------------------------------------------------|
| String | done | |
| Hash | done | hsetnx only supports the key level, not the column level |
| Scan | | |
| List | | |
| Set | srem done <br/>sadd done<br/>spop done<br/>smembers done <br/>srandmember done<br/>other wait | |
| ZSet | wait | |
| keys | done | |

### Performance Test Report
aerospike 3.x 2c4g redispike-proxy 2c4g:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import icu.funkye.redispike.handler.process.impl.KeysRequestProcessor;
import icu.funkye.redispike.handler.process.impl.SPopRequestProcessor;
import icu.funkye.redispike.handler.process.impl.SRandmemberRequestProcessor;
import icu.funkye.redispike.handler.process.impl.SRemRequestProcessor;
import icu.funkye.redispike.handler.process.impl.SetRequestProcessor;
import icu.funkye.redispike.handler.process.impl.CommandRequestProcessor;
import icu.funkye.redispike.handler.process.impl.DelRequestProcessor;
Expand Down Expand Up @@ -75,6 +76,8 @@ public RedisCommandHandler() {
processorMap.put(sPopRequestProcessor.getCmdCode().value(), sPopRequestProcessor);
SRandmemberRequestProcessor sRandmemberRequestProcessor = new SRandmemberRequestProcessor();
processorMap.put(sRandmemberRequestProcessor.getCmdCode().value(), sRandmemberRequestProcessor);
SRemRequestProcessor sRemRequestProcessor = new SRemRequestProcessor();
processorMap.put(sRemRequestProcessor.getCmdCode().value(), sRemRequestProcessor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Language;
import com.aerospike.client.Record;
import com.aerospike.client.Value;
import com.aerospike.client.listener.DeleteListener;
import com.aerospike.client.listener.ExecuteListener;
import com.aerospike.client.listener.RecordListener;
import com.aerospike.client.listener.WriteListener;
import com.aerospike.client.task.RegisterTask;
import com.alipay.remoting.RemotingContext;

import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
import icu.funkye.redispike.protocol.request.HDelRequest;
import icu.funkye.redispike.protocol.request.SRemRequest;
import icu.funkye.redispike.util.IntegerUtils;

public class SRemRequestProcessor extends AbstractRedisRequestProcessor<SRemRequest> {

public SRemRequestProcessor() {
this.cmdCode = new RedisRequestCommandCode(IntegerUtils.hashCodeToShort(SRemRequest.class.hashCode()));
RegisterTask task = client.register(null, SPopRequestProcessor.class.getClassLoader(), "lua/srem.lua",
"srem.lua", Language.LUA);
task.waitTillComplete();
}

@Override
public void handle(RemotingContext ctx, SRemRequest request) {
if (request.getBins().isEmpty()) {
ctx.writeAndFlush(request.getResponse());
return;
}
Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, request.getKey());
client.execute(AeroSpikeClientFactory.eventLoops.next(), new ExecuteListener() {
@Override
public void onSuccess(Key key, Object obj) {
request.setResponse(obj.toString());
ctx.writeAndFlush(request.getResponse());
}

@Override
public void onFailure(AerospikeException exception) {
logger.error(exception.getMessage(), exception);
ctx.writeAndFlush(request.getResponse());
}
}, client.getWritePolicyDefault(), key, "srem", "delete_bins_return_count",
Value.get(String.join(",", request.getBins())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import icu.funkye.redispike.protocol.request.SMembersRequest;
import icu.funkye.redispike.protocol.request.SPopRequest;
import icu.funkye.redispike.protocol.request.SRandmemberRequest;
import icu.funkye.redispike.protocol.request.SRemRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
Expand Down Expand Up @@ -93,6 +94,9 @@ private RedisRequest<?> convert2RedisRequest(List<String> params) {
return new SAddRequest(params);
case "smembers":
return new SMembersRequest(params.get(1));
case "srem":
params.remove(0);
return new SRemRequest(params);
case "srandmember":
return new SRandmemberRequest(params.get(1), params.size() > 2 ? Integer.parseInt(params.get(2)) : 1);
case "spop":
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.protocol.request;

import java.util.ArrayList;
import java.util.List;

import com.alipay.remoting.util.StringUtils;
import icu.funkye.redispike.protocol.RedisRequest;
import icu.funkye.redispike.protocol.RedisResponse;
import icu.funkye.redispike.protocol.response.BulkResponse;
import icu.funkye.redispike.protocol.response.IntegerResponse;

public class SRemRequest implements RedisRequest<String> {

String key;

List<String> bins;

RedisResponse<String> response;

public SRemRequest(List<String> params) {
this.key = params.remove(0);
if (params.isEmpty()) {
BulkResponse bulkResponse = new BulkResponse();
bulkResponse.setError("ERR wrong number of arguments for 'srem' command");
this.response = bulkResponse;
} else {
this.response = new IntegerResponse();
}
this.bins = params;
}

public String getKey() {
return key;
}

public void setKey(String key) {
this.key = key;
}

public List<String> getBins() {
return bins;
}

@Override
public void setResponse(String data) {
this.response.setData(data);
}

@Override
public RedisResponse<String> getResponse() {
return response;
}

}
21 changes: 21 additions & 0 deletions src/main/resources/lua/srem.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
function delete_bins_return_count(rec, bins_to_delete_str)
if not aerospike:exists(rec) then
return 0
end
if type(bins_to_delete_str) ~= 'string' then
error("bins_to_delete must be a string")
end
local bins_to_delete = {}
for bin in string.gmatch(bins_to_delete_str, '([^,]+)') do
table.insert(bins_to_delete, bin)
end
local delete_count = 0
for i, bin in ipairs(bins_to_delete) do
if rec[bin] ~= nil then
rec[bin] = nil
delete_count = delete_count + 1
end
end
aerospike:update(rec)
return delete_count
end
7 changes: 7 additions & 0 deletions src/test/java/icu/funkye/redispike/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ public void TestSet() {
Assertions.assertNotEquals(list.size(), 2);
list = jedis.smembers(key);
Assertions.assertEquals(list.size(), 0);
for (String value2 : keys) {
jedis.sadd(key, value2);
}
long result = jedis.srem(key, keys.remove(0));
Assertions.assertEquals(result, 1);
result = jedis.srem(key, keys.toArray(new String[0]));
Assertions.assertEquals(result, 2);
}
}

Expand Down

0 comments on commit bcbc486

Please sign in to comment.