Skip to content

Commit

Permalink
don'Ãt allocate new buffers in chunking filter
Browse files Browse the repository at this point in the history
  • Loading branch information
thoniTUB committed Dec 11, 2024
1 parent ece3909 commit 7a80eba
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.buffer.BufferDataException;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.future.DefaultWriteFuture;
Expand All @@ -30,37 +29,29 @@ public class ChunkingFilter extends IoFilterAdapter {
@Override
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
if (!(writeRequest.getMessage() instanceof IoBuffer ioBuffer)) {
throw new IllegalStateException("Filter was added at the wrong place in the filterchain. Only expecting IoBuffers here. Got: " + writeRequest.getMessage());
throw new IllegalStateException("Filter was added at the wrong place in the filter chain. Only expecting IoBuffers here. Got: " + writeRequest.getMessage());
}

try {
if (ioBuffer.prefixedDataAvailable(4, socketSendBufferSize)) {
// IoBuffer is shorter than socket buffer, we can just send it.
log.trace("Sending buffer without chunking");
super.filterWrite(nextFilter, session, writeRequest);
return;
}
// The first 4 bytes hold the object length in bytes
int objectLength = ioBuffer.getInt(ioBuffer.position());

throw new IllegalArgumentException("Got an incomplete IoBuffer on the sender side.");
}
catch (BufferDataException e) {
// The IoBuffer is larger than the socketSendBuffer
log.trace("Sending buffer with chunking: {}", e.getMessage());

if (objectLength < socketSendBufferSize) {
// IoBuffer is shorter than socket buffer, we can just send it.
log.info("Sending buffer without chunking");
super.filterWrite(nextFilter, session, writeRequest);
return;
}

// Split buffers

byte[] bufferArray = ioBuffer.array();
int arrayOffset = ioBuffer.arrayOffset();

int oldPos = ioBuffer.position();
int oldLimit = ioBuffer.limit();


ioBuffer.limit(oldPos + socketSendBufferSize);
int newLimit = ioBuffer.limit();

// Send the first resized (original) buffer, do not work on this ioBuffer pos and limit from here on
// Send the first resized (original) buffer
int chunkCount = 1;
log.trace("Sending {}. chunk: {} byte", chunkCount, newLimit - oldPos);
DefaultWriteFuture future = new DefaultWriteFuture(session);
Expand All @@ -72,12 +63,11 @@ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest w
int remainingBytes = oldLimit - newLimit;

do {
int newArrayPos = arrayOffset + (newLimit - oldPos);

// Size and allocate new Buffer
// Size a new Buffer
int nextBufSize = Math.min(remainingBytes, socketSendBufferSize);
IoBuffer nextBuffer = IoBuffer.allocate(nextBufSize);
System.arraycopy(bufferArray, newArrayPos, nextBuffer.array(), nextBuffer.arrayOffset(), nextBufSize);
IoBuffer nextBuffer = ioBuffer.duplicate();
nextBuffer.limit(newLimit + nextBufSize);
nextBuffer.position(newLimit);

// Write chunked buffer
log.trace("Sending {}. chunk: {} byte", chunkCount, nextBufSize);
Expand All @@ -92,13 +82,26 @@ public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest w

} while(remainingBytes > 0);

// Wait for our self produced write request to get through
futures.forEach(WriteFuture::awaitUninterruptibly);
try {
// Wait for our self produced write request to get through
futures.forEach(WriteFuture::awaitUninterruptibly);

for (WriteFuture writeFuture : futures) {
Throwable exception = writeFuture.getException();
if (exception == null) {
continue;
}
writeRequest.getFuture().setException(new IllegalStateException("Failed to write a chunked ioBuffer", exception));
break;
}

// Set the original request as written
writeRequest.getFuture().setWritten();
ioBuffer.free();
// Set the original request as written
writeRequest.getFuture().setWritten();
}
finally {
ioBuffer.free();
}

log.trace("Send all chunks");
log.info("Sent all chunks");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public static void beforeAll() throws IOException {
CLUSTER_CONFIG.setPort(0);
CLUSTER_CONFIG.setMaxIoBufferSizeBytes(toIntExact(DataSize.mebibytes(10).toBytes()));

// This enables the Chunking filter, which triggers for messages > 1 MebiByte
CLUSTER_CONFIG.getMina().setSendBufferSize(toIntExact(DataSize.mebibytes(1).toBytes()));

// Server
SERVER = CLUSTER_CONFIG.getClusterAcceptor(OM, new IoHandlerAdapter() {
@Override
Expand Down Expand Up @@ -199,8 +202,7 @@ private static Stream<Arguments> dataSizes() {
Arguments.of(DataSize.bytes(10), true),
Arguments.of(DataSize.kibibytes(10), true),
Arguments.of(DataSize.mebibytes(9), true),
Arguments.of(DataSize.mebibytes(10), false), // See beforeAll() setting
Arguments.of(DataSize.mebibytes(1100), false) // See beforeAll() setting
Arguments.of(DataSize.mebibytes(10), false) // See beforeAll() setting
);
}

Expand Down

0 comments on commit 7a80eba

Please sign in to comment.