Skip to content

Commit

Permalink
feat: keyReadWrite (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Jan 3, 2025
1 parent 69ada29 commit 0c72246
Show file tree
Hide file tree
Showing 37 changed files with 1,419 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ public enum RedisType {
//仅camellia-redis-proxy支持
RedisKV("redis-kv://", false),

//格式:embedded-storage:/home/data
//仅camellia-redis-proxy支持
EmbeddedStorage("embedded-storage:", false),

;
private final String prefix;
private final boolean tlsEnable;
Expand Down
21 changes: 16 additions & 5 deletions camellia-redis-proxy/camellia-redis-proxy-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@
</parent>

<dependencies>
<dependency>
<groupId>com.netease.nim</groupId>
<artifactId>camellia-redis-base</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.netease.nim</groupId>
<artifactId>camellia-codec</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
Expand All @@ -33,11 +43,6 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>com.netease.nim</groupId>
<artifactId>camellia-redis-base</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
<artifactId>concurrentlinkedhashmap-lru</artifactId>
Expand All @@ -48,6 +53,12 @@
<artifactId>camellia-http-console</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.5.6-9</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class ErrorReply implements Reply {
public static final ErrorReply CROSS_SLOT_ERROR = new ErrorReply("CROSSSLOT Keys in request don't hash to the same slot");
public static final ErrorReply ILLEGAL_CLUSTER_HEATBEAT = new ErrorReply("ERR illegal cluster heatbeat");

public static final ErrorReply KEY_TOO_LONG = new ErrorReply("ERR key too long");

private static final char MARKER = Marker.ErrorReply.getMarker();
private final String error;
private final byte[] raw;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage;

import com.netease.nim.camellia.core.model.Resource;
import com.netease.nim.camellia.redis.proxy.command.Command;
import com.netease.nim.camellia.redis.proxy.reply.ErrorReply;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.upstream.IUpstreamClient;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Created by caojiajun on 2025/1/3
*/
public class EmbeddedClient implements IUpstreamClient {

@Override
public void sendCommand(int db, List<Command> commands, List<CompletableFuture<Reply>> futureList) {
if (db > 0) {
for (CompletableFuture<Reply> future : futureList) {
future.complete(ErrorReply.DB_INDEX_OUT_OF_RANGE);
}
return;
}
}

@Override
public void start() {

}

@Override
public void preheat() {
}

@Override
public boolean isValid() {
return true;
}

@Override
public void shutdown() {
}

@Override
public Resource getResource() {
return null;
}

@Override
public void renew() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.codec;

import com.netease.nim.camellia.codec.Pack;
import com.netease.nim.camellia.codec.Unpack;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress.CompressType;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress.CompressUtils;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.compress.ICompressor;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.constants.EmbeddedStorageConstants;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.kv.utils.BytesUtils;
import com.netease.nim.camellia.redis.proxy.util.RedisClusterCRC16Utils;
import com.netease.nim.camellia.tools.utils.BytesKey;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
* Created by caojiajun on 2025/1/3
*/
public class KeyCodec {

public static Map<BytesKey, KeyInfo> decodeSlot(byte[] all) {
ByteBuffer buffer = ByteBuffer.wrap(all);
int bucketSize = all.length / EmbeddedStorageConstants._4k;
Map<BytesKey, KeyInfo> result = new HashMap<>();
for (int i=0; i<bucketSize; i++) {
byte[] bytes = new byte[EmbeddedStorageConstants._4k];
buffer.get(bytes);
Map<BytesKey, KeyInfo> map = KeyCodec.decodeBucket(bytes);
result.putAll(map);
}
return result;
}

/**
* 解码bucket
* @param bytes 固定为4k输入
* @return 解码结果
*/
public static Map<BytesKey, KeyInfo> decodeBucket(byte[] bytes) {
int crc1 = BytesUtils.toInt(bytes, 0);//0,1,2,3
int crc2 = RedisClusterCRC16Utils.getCRC16(bytes, 5, bytes.length);
if (crc1 != crc2) {
return new HashMap<>();
}
int decompressLen = BytesUtils.toShort(bytes, 4);//4,5
byte compressType = bytes[6];//6
ICompressor compressor = CompressUtils.get(CompressType.getByValue(compressType));
byte[] decompressData = compressor.decompress(bytes, 7, bytes.length - 7, decompressLen);
Unpack unpack = new Unpack(decompressData);
int size = unpack.popVarUint();
Map<BytesKey, KeyInfo> map = new HashMap<>();
for (int i=0; i<size; i++) {
KeyInfo key = new KeyInfo();
unpack.popMarshallable(key);
map.put(new BytesKey(key.getKey()), key);
}
return map;
}

/**
* 编码bucket,可能会压缩
* 如果编码后超过了4k,则返回null,上层自行拆分;如果不足4k,则补0
* @param keys keys
* @return 编码结果,固定为4k
*/
public static byte[] encodeBucket(Map<BytesKey, KeyInfo> keys) {
Pack pack = new Pack();
pack.putVarUint(keys.size());
for (Map.Entry<BytesKey, KeyInfo> entry : keys.entrySet()) {
pack.putMarshallable(entry.getValue());
}
pack.getBuffer().capacity(pack.getBuffer().readableBytes());
byte[] array = pack.getBuffer().array();
short decompressLen = (short) array.length;
CompressType compressType = CompressType.zstd;
ICompressor compressor = CompressUtils.get(compressType);
byte[] compressed = compressor.compress(array, 0, array.length);
if (compressed.length > array.length) {
compressType = CompressType.none;
compressed = array;
}
if (compressed.length + 5 > EmbeddedStorageConstants._4k) {
return null;
}
int crc = RedisClusterCRC16Utils.getCRC16(compressed, 0, compressed.length);
ByteBuffer buffer = ByteBuffer.allocate(EmbeddedStorageConstants._4k);
buffer.putInt(crc);
buffer.putShort(decompressLen);
buffer.put(compressType.getType());
buffer.put(compressed);
return buffer.array();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command;

import com.netease.nim.camellia.redis.proxy.command.Command;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.string.StringReadWrite;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.wal.WalGroup;

/**
* Created by caojiajun on 2025/1/3
*/
public abstract class CommandOnEmbeddedStorage {

protected WalGroup walGroup;

protected KeyReadWrite keyReadWrite;
protected StringReadWrite stringReadWrite;

/**
* redis command of commander
* @return redis-command
*/
public abstract RedisCommand redisCommand();

/**
* check param
* @param command command
* @return success or fail
*/
protected abstract boolean parse(Command command);

/**
* execute command
* @param slot slot
* @param command command
* @return reply
*/
protected abstract Reply execute(short slot, Command command) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command;


import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf;
import com.netease.nim.camellia.redis.proxy.util.MpscSlotHashExecutor;
import com.netease.nim.camellia.tools.utils.SysUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by caojiajun on 2025/1/3
*/
public class EmbeddedStorageExecutors {

private static final Logger logger = LoggerFactory.getLogger(EmbeddedStorageExecutors.class);

private static volatile EmbeddedStorageExecutors INSTANCE;

private final MpscSlotHashExecutor commandExecutor;

private EmbeddedStorageExecutors() {
int threads = ProxyDynamicConf.getInt("embedded.storage.command.executor.threads", SysUtils.getCpuNum() * 4);
int queueSize = ProxyDynamicConf.getInt("embedded.storage.command.executor.queue.size", 1024*128);
commandExecutor = new MpscSlotHashExecutor("embedded-storage-command-executor", threads, queueSize, new MpscSlotHashExecutor.AbortPolicy());
logger.info("EmbeddedStorageExecutors init success, threads = {}, queueSize = {}", threads, queueSize);
}

public static EmbeddedStorageExecutors getInstance() {
if (INSTANCE == null) {
synchronized (EmbeddedStorageExecutors.class) {
if (INSTANCE == null) {
INSTANCE = new EmbeddedStorageExecutors();
}
}
}
return INSTANCE;
}

public MpscSlotHashExecutor getCommandExecutor() {
return commandExecutor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command.string;

import com.netease.nim.camellia.redis.proxy.command.Command;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.reply.BulkReply;
import com.netease.nim.camellia.redis.proxy.reply.ErrorReply;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.command.CommandOnEmbeddedStorage;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.enums.DataType;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.ValueLocation;
import com.netease.nim.camellia.tools.utils.BytesKey;

/**
* GET key
* <p>
* Created by caojiajun on 2025/1/3
*/
public class Get extends CommandOnEmbeddedStorage {

@Override
public RedisCommand redisCommand() {
return RedisCommand.GET;
}

@Override
protected boolean parse(Command command) {
byte[][] objects = command.getObjects();
return objects.length == 2;
}

@Override
protected Reply execute(short slot, Command command) throws Exception {
byte[][] objects = command.getObjects();
BytesKey key = new BytesKey(objects[1]);
KeyInfo keyInfo = keyReadWrite.get(slot, key);
return execute0(keyInfo);
}

private Reply execute0(KeyInfo keyInfo) {
if (keyInfo == null) {
return BulkReply.NIL_REPLY;
}
if (keyInfo.getDataType() != DataType.string) {
return ErrorReply.WRONG_TYPE;
}
if (keyInfo.containsExtra()) {
return new BulkReply(keyInfo.getExtra());
}
ValueLocation valueLocation = keyInfo.getValueLocation();
byte[] bytes = stringReadWrite.get(valueLocation);
if (bytes == null) {
return BulkReply.NIL_REPLY;
}
return new BulkReply(bytes);
}
}
Loading

0 comments on commit 0c72246

Please sign in to comment.