Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor code #12

Merged
merged 5 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# redis2asp
High-performance Aerospike proxy for the Redis protocol
High-performance Aerospike proxy for the Redis protocold

### support mode

Details can be found here: [redispike-proxy/src/test/java/icu/funkye/redispike/ServerTest.java at main · funky-eyes/redispike-proxy (github.com)](https://github.com/funky-eyes/redispike-proxy/blob/main/src/test/java/icu/funkye/redispike/ServerTest.java)

| feature | note |
| ------- | -------------------------------------------------------- |
| String | Perfect support |
| Hash | HSETNX only supports the key level, not the column level |
| List | Not support |
| pub/sub | Ready for support |
| Set | Ready for support |
| ZSet | |



### Performance Test Report
aerospike 3.x 2c4g redispike-proxy 2c4g:

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import com.alipay.remoting.CommandCode;
import com.alipay.remoting.CommandHandler;
import com.alipay.remoting.RemotingContext;
import com.alipay.remoting.RemotingProcessor;
import icu.funkye.redispike.handler.process.impl.GetRequestProcessor;
import icu.funkye.redispike.handler.process.impl.HDelRequestProcessor;
import icu.funkye.redispike.handler.process.impl.HSetRequestProcessor;
import icu.funkye.redispike.handler.process.impl.SetRequestProcessor;
import icu.funkye.redispike.handler.process.impl.CommandRequestProcessor;
import icu.funkye.redispike.handler.process.impl.DelRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequest;
import icu.funkye.redispike.protocol.response.BulkResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedisCommandHandler implements CommandHandler {

private final Logger logger = LoggerFactory.getLogger(getClass());

Map<Short, RemotingProcessor<RedisRequest>> processorMap = new HashMap<>();

public RedisCommandHandler() {
CommandRequestProcessor commandRequestProcessor = new CommandRequestProcessor();
processorMap.put(commandRequestProcessor.getCmdCode().value(), commandRequestProcessor);
DelRequestProcessor delRequestProcessor = new DelRequestProcessor();
processorMap.put(delRequestProcessor.getCmdCode().value(), delRequestProcessor);
GetRequestProcessor getRequestProcessor = new GetRequestProcessor();
processorMap.put(getRequestProcessor.getCmdCode().value(), getRequestProcessor);
HSetRequestProcessor hSetRequestProcessor = new HSetRequestProcessor();
processorMap.put(hSetRequestProcessor.getCmdCode().value(), hSetRequestProcessor);
HDelRequestProcessor hDelRequestProcessor = new HDelRequestProcessor();
processorMap.put(hDelRequestProcessor.getCmdCode().value(), hDelRequestProcessor);
SetRequestProcessor setRequestProcessor = new SetRequestProcessor();
processorMap.put(setRequestProcessor.getCmdCode().value(), setRequestProcessor);
}

@Override
public void handleCommand(RemotingContext ctx, Object msg) {
if (msg instanceof RedisRequest) {
RedisRequest request = (RedisRequest) msg;
try {
processorMap.get(request.getCmdCode().value()).process(ctx, request, getDefaultExecutor());
} catch (Exception e) {
logger.error(e.getMessage(), e);
ctx.writeAndFlush(new BulkResponse());
}
}
}

@Override
public void registerProcessor(CommandCode cmd, RemotingProcessor processor) {
processorMap.put(cmd.value(), processor);
}

@Override
public void registerDefaultExecutor(ExecutorService executor) {
}

@Override
public ExecutorService getDefaultExecutor() {
return ForkJoinPool.commonPool();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process;

import java.util.concurrent.ExecutorService;
import com.aerospike.client.IAerospikeClient;
import com.alipay.remoting.CommandCode;
import com.alipay.remoting.RemotingCommand;
import com.alipay.remoting.RemotingContext;
import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.protocol.RedisRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractRedisRequestProcessor<T extends RedisRequest<?>> implements RedisRequestProcessor<T> {

protected final IAerospikeClient client = AeroSpikeClientFactory.getClient();

protected final Logger logger = LoggerFactory.getLogger(getClass());

protected CommandCode cmdCode;

@Override
public void process(RemotingContext ctx, RemotingCommand msg, ExecutorService defaultExecutor) throws Exception {
if (defaultExecutor != null) {
defaultExecutor.submit(() -> this.handle(ctx, (T)msg));
} else {
this.handle(ctx, (T)msg);
}
}

@Override
public CommandCode getCmdCode() {
return this.cmdCode;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process;

import java.util.concurrent.ExecutorService;
import com.alipay.remoting.CommandCode;
import com.alipay.remoting.RemotingContext;
import com.alipay.remoting.RemotingProcessor;
import icu.funkye.redispike.protocol.RedisRequest;

public interface RedisRequestProcessor<T extends RedisRequest<?>> extends RemotingProcessor {

void handle(RemotingContext ctx, T request);

CommandCode getCmdCode();

@Override
default ExecutorService getExecutor() {
return null;
}

@Override
default void setExecutor(ExecutorService executor) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process.impl;

import java.nio.charset.StandardCharsets;
import com.alipay.remoting.RemotingContext;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
import icu.funkye.redispike.protocol.request.CommandRequest;
import icu.funkye.redispike.util.IntegerUtils;

public class CommandRequestProcessor extends AbstractRedisRequestProcessor<CommandRequest> {

public CommandRequestProcessor() {
this.cmdCode = new RedisRequestCommandCode(IntegerUtils.hashCodeToShort(CommandRequest.class.hashCode()));
}

@Override
public void handle(RemotingContext ctx, CommandRequest request) {
request.setResponse("OK".getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(request.getResponse());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process.impl;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.listener.DeleteListener;
import com.alipay.remoting.RemotingContext;
import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
import icu.funkye.redispike.protocol.request.DelRequest;
import icu.funkye.redispike.util.IntegerUtils;

public class DelRequestProcessor extends AbstractRedisRequestProcessor<DelRequest> {

public DelRequestProcessor() {
this.cmdCode = new RedisRequestCommandCode(IntegerUtils.hashCodeToShort(DelRequest.class.hashCode()));
}

@Override public void handle(RemotingContext ctx, DelRequest request) {
List<String> keys = request.getKey();
List<Key> list =
keys.stream().map(key -> new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, key))
.collect(Collectors.toList());
CountDownLatch countDownLatch = new CountDownLatch(list.size());
for (Key key : list) {
client.delete(AeroSpikeClientFactory.eventLoops.next(), new DeleteListener() {
@Override
public void onSuccess(Key key, boolean b) {
request.setResponse(String.valueOf(request.getCount().incrementAndGet())
.getBytes(StandardCharsets.UTF_8));
countDownLatch.countDown();
}

@Override
public void onFailure(AerospikeException e) {
countDownLatch.countDown();
}
}, client.getWritePolicyDefault(), key);
}
CompletableFuture.runAsync(() -> {
try {
countDownLatch.await(10, TimeUnit.SECONDS);
ctx.writeAndFlush(request.getResponse());
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
ctx.writeAndFlush(request.getResponse());
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.handler.process.impl;

import java.nio.charset.StandardCharsets;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.listener.RecordListener;
import com.alipay.remoting.RemotingContext;
import com.alipay.sofa.common.profile.StringUtil;
import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
import icu.funkye.redispike.protocol.request.GetRequest;
import icu.funkye.redispike.util.IntegerUtils;

public class GetRequestProcessor extends AbstractRedisRequestProcessor<GetRequest> {

public GetRequestProcessor() {
this.cmdCode = new RedisRequestCommandCode(IntegerUtils.hashCodeToShort(GetRequest.class.hashCode()));
}

@Override
public void handle(RemotingContext ctx, GetRequest request) {
Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, request.getKey());
client.get(AeroSpikeClientFactory.eventLoops.next(), new RecordListener() {
@Override
public void onSuccess(Key key, Record record) {
if (record == null) {
ctx.writeAndFlush(request.getResponse());
return;
}
String value = record.getString(request.getKey());
if (StringUtil.isNotBlank(value)) {
request.setResponse(value.getBytes(StandardCharsets.UTF_8));
}
ctx.writeAndFlush(request.getResponse());
}

@Override
public void onFailure(AerospikeException ae) {
logger.error(ae.getMessage(), ae);
ctx.writeAndFlush(request.getResponse());
}
}, client.getReadPolicyDefault(), key);
}
}
Loading