Skip to content

Commit

Permalink
feat: add max key/value limit (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Jan 10, 2025
1 parent d26da8a commit 22c8f97
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class ErrorReply implements Reply {
public static final ErrorReply ILLEGAL_CLUSTER_HEATBEAT = new ErrorReply("ERR illegal cluster heatbeat");

public static final ErrorReply KEY_TOO_LONG = new ErrorReply("ERR key too long");
public static final ErrorReply VALUE_TOO_LONG = new ErrorReply("ERR value too long");

private static final char MARKER = Marker.ErrorReply.getMarker();
private final String error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ public void sendCommand(int db, List<Command> commands, List<CompletableFuture<R
sendNoneKeyCommand(redisCommand, command, future);
} else {
List<byte[]> keys = command.getKeys();
for (byte[] key : keys) {
if (key.length > 1024) {
future.complete(ErrorReply.KEY_TOO_LONG);
return;
}
}
if (keys.size() == 1) {
byte[] key = keys.getFirst();
sendCommand(redisCommand, key, command, future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.netease.nim.camellia.redis.proxy.command.Command;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.reply.ErrorReply;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.reply.StatusReply;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.CommandConfig;
Expand All @@ -10,6 +11,8 @@
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.util.Utils;

import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants._1024k;

/**
* PSETEX key milliseconds value
* <p>
Expand Down Expand Up @@ -39,6 +42,10 @@ protected Reply execute(short slot, Command command) throws Exception {
long millis = Utils.bytesToNum(objects[2]);
byte[] value = objects[3];

if (value.length > _1024k) {
return ErrorReply.VALUE_TOO_LONG;
}

long expireTime = System.currentTimeMillis() + millis;
KeyInfo keyInfo = new KeyInfo(DataType.string, key);
keyInfo.setExpireTime(expireTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import com.netease.nim.camellia.redis.proxy.util.ErrorLogCollector;
import com.netease.nim.camellia.redis.proxy.util.Utils;

import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants._1024k;

/**
* SET key value [NX | XX] [GET] [EX seconds | PX milliseconds | EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL]
* Created by caojiajun on 2025/1/3
Expand Down Expand Up @@ -44,6 +46,9 @@ protected Reply execute(short slot, Command command) throws Exception {
byte[][] objects = command.getObjects();
Key key = new Key(objects[1]);
byte[] value = objects[2];
if (value.length > _1024k) {
return ErrorReply.VALUE_TOO_LONG;
}
int nxxx = -1;
long expireTime = -1;
boolean get = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.netease.nim.camellia.redis.proxy.command.Command;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.reply.ErrorReply;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.reply.StatusReply;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.CommandConfig;
Expand All @@ -10,6 +11,8 @@
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.util.Utils;

import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants._1024k;

/**
* SETEX key seconds value
* <p>
Expand Down Expand Up @@ -39,6 +42,10 @@ protected Reply execute(short slot, Command command) throws Exception {
long seconds = Utils.bytesToNum(objects[2]);
byte[] value = objects[3];

if (value.length > _1024k) {
return ErrorReply.VALUE_TOO_LONG;
}

long expireTime = System.currentTimeMillis() + seconds * 1000L;
KeyInfo keyInfo = new KeyInfo(DataType.string, key);
keyInfo.setExpireTime(expireTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.netease.nim.camellia.redis.proxy.command.Command;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.reply.ErrorReply;
import com.netease.nim.camellia.redis.proxy.reply.IntegerReply;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.CommandConfig;
Expand All @@ -10,6 +11,8 @@
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;

import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants._1024k;

/**
* SETNX key value
* <p>
Expand Down Expand Up @@ -38,6 +41,10 @@ protected Reply execute(short slot, Command command) throws Exception {
Key key = new Key(objects[1]);
byte[] value = objects[2];

if (value.length > _1024k) {
return ErrorReply.VALUE_TOO_LONG;
}

KeyInfo keyInfo = keyReadWrite.get(slot, key);
if (keyInfo == null) {
keyInfo = new KeyInfo(DataType.string, key.key());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public void compact(short slot) {
recycle = decodeResult.remaining() > BlockType._32k.getBlockSize();
} else if (blockType == BlockType._1024k) {
recycle = decodeResult.remaining() > BlockType._256k.getBlockSize();
} else if (blockType == BlockType._10m) {
recycle = decodeResult.remaining() > BlockType._1024k.getBlockSize();
}
}
if (recycle) {
Expand Down Expand Up @@ -149,7 +151,8 @@ private BlockType nextBlockType(short slot) {
case _4k -> nextBlockTypeMap.put(slot, BlockType._32k);
case _32k -> nextBlockTypeMap.put(slot, BlockType._256k);
case _256k -> nextBlockTypeMap.put(slot, BlockType._1024k);
case _1024k -> nextBlockTypeMap.put(slot, BlockType._4k);
case _1024k -> nextBlockTypeMap.put(slot, BlockType._10m);
case _10m -> nextBlockTypeMap.put(slot, BlockType._4k);
}
return blockType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class LocalStorageConstants {
public static final int _64k = 64*1024;
public static final int _256k = 256*1024;
public static final int _1024k = 1024*1024;
public static final int _10m = 10*1024*1024;
public static final int key_manifest_bit_size = (int)(16*1024*1024*1024L / _64k);//16Gib
public static final long data_file_size = 192*1024*1024*1024L;//128Gib
public static final int block_header_len = 4+4+2+1+4+4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public enum BlockType {
_32k(2, 32*1024, 2),
_256k(3, 256*1024, 4),
_1024k(4, 1024*1024, 4),
_10m(5, 10*1024*1024, 4),
;

private final int type;
Expand Down Expand Up @@ -53,6 +54,8 @@ public static BlockType fromData(byte[] data) {
return BlockType._256k;
} else if (data.length + 4 + 4 < LocalStorageConstants._1024k) {
return BlockType._1024k;
} else if (data.length + 4 + 4 < LocalStorageConstants._10m) {
return BlockType._10m;
} else {
throw new IllegalArgumentException("data too long");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string;

import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.EstimateSizeValueCalculator;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.LRUCache;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.SizeCalculator;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.enums.FlushResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.Key;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.KeyInfo;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.persist.ValueFlushExecutor;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.string.block.StringBlockReadWrite;
Expand All @@ -10,26 +14,53 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;


/**
* Created by caojiajun on 2025/1/3
*/
public class StringReadWrite {

private static final String READ_CACHE_CONFIG_KEY = "local.storage.string.read.cache.capacity";
private static final String WRITE_CACHE_CONFIG_KEY = "local.storage.string.write.cache.capacity";

private final ConcurrentHashMap<Short, SlotStringReadWrite> map = new ConcurrentHashMap<>();

private final ValueFlushExecutor flushExecutor;
private final StringBlockReadWrite stringBlockReadWrite;

private final LRUCache<Key, byte[]> readCache;
private final LRUCache<Key, byte[]> writeCache;

public StringReadWrite(ValueFlushExecutor flushExecutor, StringBlockReadWrite stringBlockReadWrite) {
this.flushExecutor = flushExecutor;
this.stringBlockReadWrite = stringBlockReadWrite;
this.readCache = new LRUCache<>("string-read-cache", READ_CACHE_CONFIG_KEY, "32M", 1024, new EstimateSizeValueCalculator<>(), SizeCalculator.BYTES_INSTANCE);
this.writeCache = new LRUCache<>("string-write-cache", WRITE_CACHE_CONFIG_KEY, "32M", 1024, new EstimateSizeValueCalculator<>(), SizeCalculator.BYTES_INSTANCE);
}

public void put(short slot, KeyInfo keyInfo, byte[] data) throws IOException {
Key key = new Key(keyInfo.getKey());
byte[] bytes = readCache.get(key);
if (bytes != null) {
readCache.put(key, data);
} else {
writeCache.put(key, data);
}
get(slot).put(keyInfo, data);
}

public byte[] get(short slot, KeyInfo keyInfo) throws IOException {
Key key = new Key(keyInfo.getKey());
byte[] bytes = readCache.get(key);
if (bytes != null) {
return bytes;
}
bytes = writeCache.get(key);
if (bytes != null) {
readCache.put(key, bytes);
writeCache.delete(key);
return bytes;
}
return get(slot).get(keyInfo);
}

Expand All @@ -41,10 +72,6 @@ public CompletableFuture<FlushResult> flush(short slot) throws IOException {
return slotStringReadWrite.flush();
}

private SlotStringReadWrite get(short slot) {
return CamelliaMapUtils.computeIfAbsent(map, slot, s -> new SlotStringReadWrite(slot, flushExecutor, stringBlockReadWrite));
}

public boolean needFlush(short slot) {
SlotStringReadWrite slotStringReadWrite = get(slot);
if (slotStringReadWrite == null) {
Expand All @@ -53,4 +80,8 @@ public boolean needFlush(short slot) {
return slotStringReadWrite.needFlush();
}

private SlotStringReadWrite get(short slot) {
return CamelliaMapUtils.computeIfAbsent(map, slot, s -> new SlotStringReadWrite(slot, flushExecutor, stringBlockReadWrite));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.LRUCache;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.SizeCalculator;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValue;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValueCodec;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValueDecodeResult;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.file.FileNames;
Expand All @@ -12,6 +13,7 @@
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.block.IValueManifest;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants._4k;
Expand Down Expand Up @@ -66,10 +68,15 @@ public byte[] get(KeyInfo keyInfo) throws IOException {
if (list.isEmpty()) {
return null;
}
if (list.size() > keyInfo.getValueLocation().offset()) {
if (list.size() <= keyInfo.getValueLocation().offset()) {
return null;
}
return list.get(keyInfo.getValueLocation().offset());
byte[] bytes = list.get(keyInfo.getValueLocation().offset());
StringValue stringValue = StringValue.decode(bytes);
if (Arrays.equals(stringValue.key(), keyInfo.getKey())) {
return stringValue.value();
}
return null;
}

@Override
Expand Down

0 comments on commit 22c8f97

Please sign in to comment.