diff --git a/core/src/main/java/dev/keva/core/aof/AOFContainer.java b/core/src/main/java/dev/keva/core/aof/AOFContainer.java index 9cce1605..de696e2a 100644 --- a/core/src/main/java/dev/keva/core/aof/AOFContainer.java +++ b/core/src/main/java/dev/keva/core/aof/AOFContainer.java @@ -81,10 +81,13 @@ public void sync() throws IOException { bufferLock.lock(); try { for (Command command : buffer) { - output.writeObject(command); + output.writeObject(command.getObjects()); } } finally { output.flush(); + for (Command command : buffer) { + command.recycle(); + } buffer.clear(); bufferLock.unlock(); } @@ -92,10 +95,12 @@ public void sync() throws IOException { public void syncPerMutation(Command command) { try { - output.writeObject(command); + output.writeObject(command.getObjects()); output.flush(); } catch (IOException e) { log.error("Error writing AOF file", e); + } finally { + command.recycle(); } } @@ -106,8 +111,8 @@ public List read() throws IOException { ObjectInputStream input = new ObjectInputStream(fis); while (true) { try { - Command command = (Command) input.readObject(); - commands.add(command); + byte[][] objects = (byte[][]) input.readObject(); + commands.add(Command.newInstance(objects, false)); } catch (EOFException e) { fis.close(); return commands; diff --git a/core/src/main/java/dev/keva/core/command/impl/key/manager/ExpirationManager.java b/core/src/main/java/dev/keva/core/command/impl/key/manager/ExpirationManager.java index f82b7b3f..f15e9137 100644 --- a/core/src/main/java/dev/keva/core/command/impl/key/manager/ExpirationManager.java +++ b/core/src/main/java/dev/keva/core/command/impl/key/manager/ExpirationManager.java @@ -70,7 +70,7 @@ public void executeExpire(byte[] key) { byte[][] data = new byte[2][]; data[0] = "delete".getBytes(); data[1] = key; - Command command = new Command(data, false); + Command command = Command.newInstance(data, false); Lock lock = database.getLock(); lock.lock(); try { diff --git a/core/src/main/java/dev/keva/core/command/mapping/CommandMapper.java b/core/src/main/java/dev/keva/core/command/mapping/CommandMapper.java index 07ef8eaf..69d6334f 100644 --- a/core/src/main/java/dev/keva/core/command/mapping/CommandMapper.java +++ b/core/src/main/java/dev/keva/core/command/mapping/CommandMapper.java @@ -108,6 +108,11 @@ public void init() { } Object[] objects = new Object[types.length]; command.toArguments(objects, types, ctx); + // If not in AOF mode, then recycle(), + // else, the command will be recycled in the AOF dump + if (!kevaConfig.getAof()) { + command.recycle(); + } return (Reply) method.invoke(instance, objects); } finally { lock.unlock(); diff --git a/resp-protocol/src/main/java/dev/keva/protocol/resp/Command.java b/resp-protocol/src/main/java/dev/keva/protocol/resp/Command.java index 7099bd3b..60a81053 100644 --- a/resp-protocol/src/main/java/dev/keva/protocol/resp/Command.java +++ b/resp-protocol/src/main/java/dev/keva/protocol/resp/Command.java @@ -1,6 +1,8 @@ package dev.keva.protocol.resp; import io.netty.channel.ChannelHandlerContext; +import io.netty.util.Recycler; +import lombok.Getter; import java.io.Serializable; import java.nio.charset.StandardCharsets; @@ -8,18 +10,47 @@ public class Command implements Serializable { private static final byte LOWER_DIFF = 'a' - 'A'; - private final Object[] objects; + private static final Recycler RECYCLER = new Recycler() { + protected Command newObject(Recycler.Handle handle) { + return new Command(handle); + } + }; + + private final Recycler.Handle handle; + + @Getter + private byte[][] objects; + + private Command(Recycler.Handle handle) { + this.handle = handle; + } - public Command(Object[] objects, boolean inline) { + public static Command newInstance(byte[][] objects, boolean inline) { + if (objects == null) { + throw new IllegalArgumentException("objects must not be null"); + } + if (objects.length == 0) { + throw new IllegalArgumentException("objects must not be empty"); + } + + Command command = RECYCLER.get(); if (inline) { - byte[] objs = ByteUtil.getBytes(objects[0]); + byte[] objs = objects[0]; String[] strings = new String(objs, StandardCharsets.UTF_8).trim().split("\\s+"); - objects = new Object[strings.length]; + objects = new byte[strings.length][]; for (int i = 0; i < strings.length; i++) { objects[i] = ByteUtil.getBytes(strings[i]); } } - this.objects = objects; + command.objects = objects; + // LowerCase bytes + for (int i = 0; i < objects[0].length; i++) { + byte b = objects[0][i]; + if (b >= 'A' && b <= 'Z') { + objects[0][i] = (byte) (b + LOWER_DIFF); + } + } + return command; } public int getLength() { @@ -31,15 +62,7 @@ public int getLength() { } public byte[] getName() { - byte[] name = ByteUtil.getBytes(objects[0]); - // LowerCase bytes - for (int i = 0; i < name.length; i++) { - byte b = name[i]; - if (b >= 'A' && b <= 'Z') { - name[i] = (byte) (b + LOWER_DIFF); - } - } - return name; + return objects[0]; } public void toArguments(Object[] arguments, Class[] types, ChannelHandlerContext ctx) { @@ -60,11 +83,16 @@ public void toArguments(Object[] arguments, Class[] types, ChannelHandlerCont int left = isFirstVararg ? (objects.length - position - 1) : (objects.length - 1); byte[][] lastArgument = new byte[left][]; for (int i = 0; i < left; i++) { - lastArgument[i] = isFirstVararg ? (byte[]) (objects[i + position + 1]) : (byte[]) (objects[i + position]); + lastArgument[i] = isFirstVararg ? objects[i + position + 1] : objects[i + position]; } arguments[position] = lastArgument; } position++; } } + + public void recycle() { + objects = null; + handle.recycle(this); + } } diff --git a/resp-protocol/src/main/java/dev/keva/protocol/resp/RedisCommandDecoder.java b/resp-protocol/src/main/java/dev/keva/protocol/resp/RedisCommandDecoder.java index a75ac689..6d5a81f5 100644 --- a/resp-protocol/src/main/java/dev/keva/protocol/resp/RedisCommandDecoder.java +++ b/resp-protocol/src/main/java/dev/keva/protocol/resp/RedisCommandDecoder.java @@ -34,7 +34,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } } try { - out.add(new Command(bytes, false)); + out.add(Command.newInstance(bytes, false)); } finally { bytes = null; arguments = 0; @@ -60,7 +60,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t b[0] = new byte[buf.readableBytes()]; buf.getBytes(0, b[0]); in.skipBytes(isCRLF ? 2 : 1); - out.add(new Command(b, true)); + out.add(Command.newInstance(b, true)); } } }