Skip to content

Commit

Permalink
use slices for buffer handling in Mina
Browse files Browse the repository at this point in the history
  • Loading branch information
awildturtok committed Dec 16, 2024
1 parent ef752fc commit 7f9f72f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.bakdata.conquery.io.mina;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import io.dropwizard.util.DataSize;
Expand Down Expand Up @@ -29,8 +27,6 @@ public class ChunkingFilter extends IoFilterAdapter {
private final int socketSendBufferSize;




@Override
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
if (!(writeRequest.getMessage() instanceof IoBuffer ioBuffer)) {
Expand All @@ -40,7 +36,6 @@ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest w
// The first 4 bytes hold the object length in bytes
int objectLength = ioBuffer.getInt(ioBuffer.position());


if (objectLength < socketSendBufferSize) {
// IoBuffer is shorter than socket buffer, we can just send it.
log.trace("Sending buffer without chunking: {} (limit = {})", DataSize.bytes(objectLength), DataSize.bytes(socketSendBufferSize));
Expand Down Expand Up @@ -75,9 +70,7 @@ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest w
do {
// Size a new Buffer
int nextBufSize = Math.min(remainingBytes, socketSendBufferSize);
IoBuffer nextBuffer = ioBuffer.duplicate();
nextBuffer.limit(newLimit + nextBufSize);
nextBuffer.position(newLimit);
IoBuffer nextBuffer = ioBuffer.getSlice(newLimit, nextBufSize);

// Write chunked buffer
chunkCount++;
Expand All @@ -89,10 +82,15 @@ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest w
future.addListener(listener);

// Recalculate for next iteration
newLimit = newLimit + nextBufSize;
remainingBytes = remainingBytes - nextBufSize;
newLimit += nextBufSize;
remainingBytes -= nextBufSize;

} while(remainingBytes > 0);
} while (remainingBytes > 0);
}

public static int divideAndRoundUp(int num, int divisor) {
// only for positive values
return (num + divisor - 1) / divisor;
}

private static @NotNull IoFutureListener<IoFuture> handleWrittenChunk(WriteRequest writeRequest, AtomicInteger writtenChunks, int totalChunks) {
Expand All @@ -115,9 +113,4 @@ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest w
}
};
}

public static int divideAndRoundUp(int num, int divisor) {
// only for positive values
return (num + divisor - 1) / divisor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
@RequiredArgsConstructor
public class JacksonProtocolEncoder extends ObjectSerializationEncoder {

private final int SIZE_PREFIX_LENGTH = Integer.BYTES;

private final ObjectWriter objectWriter;

Expand All @@ -25,27 +26,25 @@ public class JacksonProtocolEncoder extends ObjectSerializationEncoder {

@Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
IoBuffer buf = IoBuffer.allocate(initialBufferCapacityBytes, false);
final IoBuffer buf = IoBuffer.allocate(initialBufferCapacityBytes, false);
buf.setAutoExpand(true);

buf.position();
buf.skip(SIZE_PREFIX_LENGTH); // Make a room for the length field.

int oldPos = buf.position();
buf.skip(4); // Make a room for the length field.

Stopwatch stopwatch = Stopwatch.createStarted();
final Stopwatch stopwatch = Stopwatch.createStarted();
log.trace("BEGIN Encoding message");

objectWriter.writeValue(buf.asOutputStream(), message);

int objectSize = buf.position() - 4;
final int objectSize = buf.position() - SIZE_PREFIX_LENGTH;

if (objectSize > getMaxObjectSize()) {
throw new IllegalArgumentException("The encoded object is too big: " + objectSize + " (> " + getMaxObjectSize() + ')');
}

// Fill the length field
int newPos = buf.position();
buf.position(oldPos);
buf.putInt(newPos - oldPos - 4);
buf.position(newPos);
buf.putInt(0, objectSize);

buf.flip();
log.trace("FINISHED Encoding message in {}. Buffer size: {}. Message: {}", stopwatch, DataSize.bytes(buf.remaining()), message);
Expand Down

0 comments on commit 7f9f72f

Please sign in to comment.