diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkingFilter.java b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkingFilter.java index 48f588421d..e31841a814 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkingFilter.java +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkingFilter.java @@ -1,7 +1,5 @@ package com.bakdata.conquery.io.mina; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import io.dropwizard.util.DataSize; @@ -29,8 +27,6 @@ public class ChunkingFilter extends IoFilterAdapter { private final int socketSendBufferSize; - - @Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { if (!(writeRequest.getMessage() instanceof IoBuffer ioBuffer)) { @@ -40,7 +36,6 @@ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest w // The first 4 bytes hold the object length in bytes int objectLength = ioBuffer.getInt(ioBuffer.position()); - if (objectLength < socketSendBufferSize) { // IoBuffer is shorter than socket buffer, we can just send it. log.trace("Sending buffer without chunking: {} (limit = {})", DataSize.bytes(objectLength), DataSize.bytes(socketSendBufferSize)); @@ -75,9 +70,7 @@ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest w do { // Size a new Buffer int nextBufSize = Math.min(remainingBytes, socketSendBufferSize); - IoBuffer nextBuffer = ioBuffer.duplicate(); - nextBuffer.limit(newLimit + nextBufSize); - nextBuffer.position(newLimit); + IoBuffer nextBuffer = ioBuffer.getSlice(newLimit, nextBufSize); // Write chunked buffer chunkCount++; @@ -89,10 +82,15 @@ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest w future.addListener(listener); // Recalculate for next iteration - newLimit = newLimit + nextBufSize; - remainingBytes = remainingBytes - nextBufSize; + newLimit += nextBufSize; + remainingBytes -= nextBufSize; - } while(remainingBytes > 0); + } while (remainingBytes > 0); + } + + public static int divideAndRoundUp(int num, int divisor) { + // only for positive values + return (num + divisor - 1) / divisor; } private static @NotNull IoFutureListener handleWrittenChunk(WriteRequest writeRequest, AtomicInteger writtenChunks, int totalChunks) { @@ -115,9 +113,4 @@ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest w } }; } - - public static int divideAndRoundUp(int num, int divisor) { - // only for positive values - return (num + divisor - 1) / divisor; - } } diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java b/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java index 838738f8b3..3847f4bfd4 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java @@ -16,6 +16,7 @@ @RequiredArgsConstructor public class JacksonProtocolEncoder extends ObjectSerializationEncoder { + private final int SIZE_PREFIX_LENGTH = Integer.BYTES; private final ObjectWriter objectWriter; @@ -25,27 +26,25 @@ public class JacksonProtocolEncoder extends ObjectSerializationEncoder { @Override public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { - IoBuffer buf = IoBuffer.allocate(initialBufferCapacityBytes, false); + final IoBuffer buf = IoBuffer.allocate(initialBufferCapacityBytes, false); buf.setAutoExpand(true); + buf.position(); + buf.skip(SIZE_PREFIX_LENGTH); // Make a room for the length field. - int oldPos = buf.position(); - buf.skip(4); // Make a room for the length field. - - Stopwatch stopwatch = Stopwatch.createStarted(); + final Stopwatch stopwatch = Stopwatch.createStarted(); log.trace("BEGIN Encoding message"); + objectWriter.writeValue(buf.asOutputStream(), message); - int objectSize = buf.position() - 4; + final int objectSize = buf.position() - SIZE_PREFIX_LENGTH; + if (objectSize > getMaxObjectSize()) { throw new IllegalArgumentException("The encoded object is too big: " + objectSize + " (> " + getMaxObjectSize() + ')'); } // Fill the length field - int newPos = buf.position(); - buf.position(oldPos); - buf.putInt(newPos - oldPos - 4); - buf.position(newPos); + buf.putInt(0, objectSize); buf.flip(); log.trace("FINISHED Encoding message in {}. Buffer size: {}. Message: {}", stopwatch, DataSize.bytes(buf.remaining()), message);