From 48ddc1a03b6a287e4f7850c3cb337134bb157537 Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Mon, 16 Dec 2024 11:58:33 +0100 Subject: [PATCH] use slices for buffer handling in Mina --- .../conquery/io/mina/ChunkingFilter.java | 25 +++++++------------ .../io/mina/JacksonProtocolEncoder.java | 19 +++++++------- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkingFilter.java b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkingFilter.java index 48f588421d..e31841a814 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkingFilter.java +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/ChunkingFilter.java @@ -1,7 +1,5 @@ package com.bakdata.conquery.io.mina; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import io.dropwizard.util.DataSize; @@ -29,8 +27,6 @@ public class ChunkingFilter extends IoFilterAdapter { private final int socketSendBufferSize; - - @Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { if (!(writeRequest.getMessage() instanceof IoBuffer ioBuffer)) { @@ -40,7 +36,6 @@ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest w // The first 4 bytes hold the object length in bytes int objectLength = ioBuffer.getInt(ioBuffer.position()); - if (objectLength < socketSendBufferSize) { // IoBuffer is shorter than socket buffer, we can just send it. log.trace("Sending buffer without chunking: {} (limit = {})", DataSize.bytes(objectLength), DataSize.bytes(socketSendBufferSize)); @@ -75,9 +70,7 @@ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest w do { // Size a new Buffer int nextBufSize = Math.min(remainingBytes, socketSendBufferSize); - IoBuffer nextBuffer = ioBuffer.duplicate(); - nextBuffer.limit(newLimit + nextBufSize); - nextBuffer.position(newLimit); + IoBuffer nextBuffer = ioBuffer.getSlice(newLimit, nextBufSize); // Write chunked buffer chunkCount++; @@ -89,10 +82,15 @@ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest w future.addListener(listener); // Recalculate for next iteration - newLimit = newLimit + nextBufSize; - remainingBytes = remainingBytes - nextBufSize; + newLimit += nextBufSize; + remainingBytes -= nextBufSize; - } while(remainingBytes > 0); + } while (remainingBytes > 0); + } + + public static int divideAndRoundUp(int num, int divisor) { + // only for positive values + return (num + divisor - 1) / divisor; } private static @NotNull IoFutureListener handleWrittenChunk(WriteRequest writeRequest, AtomicInteger writtenChunks, int totalChunks) { @@ -115,9 +113,4 @@ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest w } }; } - - public static int divideAndRoundUp(int num, int divisor) { - // only for positive values - return (num + divisor - 1) / divisor; - } } diff --git a/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java b/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java index 838738f8b3..4aa3444843 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java +++ b/backend/src/main/java/com/bakdata/conquery/io/mina/JacksonProtocolEncoder.java @@ -25,27 +25,26 @@ public class JacksonProtocolEncoder extends ObjectSerializationEncoder { @Override public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { - IoBuffer buf = IoBuffer.allocate(initialBufferCapacityBytes, false); + final IoBuffer buf = IoBuffer.allocate(initialBufferCapacityBytes, false); buf.setAutoExpand(true); - - int oldPos = buf.position(); + buf.position(); buf.skip(4); // Make a room for the length field. + final IoBuffer slice = buf.slice(); - Stopwatch stopwatch = Stopwatch.createStarted(); + final Stopwatch stopwatch = Stopwatch.createStarted(); log.trace("BEGIN Encoding message"); - objectWriter.writeValue(buf.asOutputStream(), message); - int objectSize = buf.position() - 4; + objectWriter.writeValue(slice.asOutputStream(), message); + + final int objectSize = slice.position(); + if (objectSize > getMaxObjectSize()) { throw new IllegalArgumentException("The encoded object is too big: " + objectSize + " (> " + getMaxObjectSize() + ')'); } // Fill the length field - int newPos = buf.position(); - buf.position(oldPos); - buf.putInt(newPos - oldPos - 4); - buf.position(newPos); + buf.putInt(0, objectSize); buf.flip(); log.trace("FINISHED Encoding message in {}. Buffer size: {}. Message: {}", stopwatch, DataSize.bytes(buf.remaining()), message);