Skip to content

Commit

Permalink
Update the pointer of Continuation's headerBuffer after copying the p…
Browse files Browse the repository at this point in the history
…ayload (#507)
  • Loading branch information
CoolTomatos authored and ok2c committed Nov 27, 2024
1 parent 957a823 commit a1ae74d
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1342,8 +1342,11 @@ void copyPayload(final ByteBuffer payload) {
if (payload == null) {
return;
}
headerBuffer.ensureCapacity(payload.remaining());
payload.get(headerBuffer.array(), headerBuffer.length(), payload.remaining());
final int originalLength = headerBuffer.length();
final int toCopy = payload.remaining();
headerBuffer.ensureCapacity(toCopy);
payload.get(headerBuffer.array(), originalLength, toCopy);
headerBuffer.setLength(originalLength + toCopy);
}

ByteBuffer getContent() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,16 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
import org.apache.hc.core5.http.impl.CharCodingSupport;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.command.ExecutableCommand;
Expand All @@ -45,23 +52,33 @@
import org.apache.hc.core5.http2.frame.FrameType;
import org.apache.hc.core5.http2.frame.RawFrame;
import org.apache.hc.core5.http2.frame.StreamIdGenerator;
import org.apache.hc.core5.http2.hpack.HPackEncoder;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.util.ByteArrayBuffer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

class TestAbstractH2StreamMultiplexer {

private static final FrameFactory FRAME_FACTORY = DefaultFrameFactory.INSTANCE;

@Mock
ProtocolIOSession protocolIOSession;
@Mock
HttpProcessor httpProcessor;
@Mock
H2StreamListener h2StreamListener;
@Mock
H2StreamHandler streamHandler;
@Captor
ArgumentCaptor<List<Header>> headersCaptor;

@BeforeEach
void prepareMocks() {
Expand All @@ -70,15 +87,19 @@ void prepareMocks() {

static class H2StreamMultiplexerImpl extends AbstractH2StreamMultiplexer {

private Supplier<H2StreamHandler> streamHandlerSupplier;

public H2StreamMultiplexerImpl(
final ProtocolIOSession ioSession,
final FrameFactory frameFactory,
final StreamIdGenerator idGenerator,
final HttpProcessor httpProcessor,
final CharCodingConfig charCodingConfig,
final H2Config h2Config,
final H2StreamListener streamListener) {
final H2StreamListener streamListener,
final Supplier<H2StreamHandler> streamHandlerSupplier) {
super(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener);
this.streamHandlerSupplier = streamHandlerSupplier;
}

@Override
Expand All @@ -99,7 +120,7 @@ H2StreamHandler createRemotelyInitiatedStream(
final HttpProcessor httpProcessor,
final BasicHttpConnectionMetrics connMetrics,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException {
return null;
return streamHandlerSupplier.get();
}

@Override
Expand Down Expand Up @@ -128,14 +149,15 @@ void testInputOneFrame() throws Exception {

final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl(
protocolIOSession,
DefaultFrameFactory.INSTANCE,
FRAME_FACTORY,
StreamIdGenerator.ODD,
httpProcessor,
CharCodingConfig.DEFAULT,
H2Config.custom()
.setMaxFrameSize(FrameConsts.MIN_FRAME_SIZE)
.build(),
h2StreamListener);
h2StreamListener,
() -> streamHandler);

Assertions.assertThrows(H2ConnectionException.class, () ->
streamMultiplexer.onInput(ByteBuffer.wrap(bytes)));
Expand Down Expand Up @@ -179,14 +201,15 @@ void testInputMultipleFrames() throws Exception {

final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl(
protocolIOSession,
DefaultFrameFactory.INSTANCE,
FRAME_FACTORY,
StreamIdGenerator.ODD,
httpProcessor,
CharCodingConfig.DEFAULT,
H2Config.custom()
.setMaxFrameSize(FrameConsts.MIN_FRAME_SIZE)
.build(),
h2StreamListener);
h2StreamListener,
() -> streamHandler);

Assertions.assertThrows(H2ConnectionException.class, () ->
streamMultiplexer.onInput(ByteBuffer.wrap(bytes)));
Expand All @@ -212,5 +235,40 @@ void testInputMultipleFrames() throws Exception {
});
}

@Test
void testInputHeaderContinuationFrame() throws IOException, HttpException {
final H2Config h2Config = H2Config.custom().setMaxFrameSize(FrameConsts.MIN_FRAME_SIZE)
.build();

final ByteArrayBuffer buf = new ByteArrayBuffer(19);
final HPackEncoder encoder = new HPackEncoder(H2Config.INIT.getHeaderTableSize(), CharCodingSupport.createEncoder(CharCodingConfig.DEFAULT));
final List<Header> headers = new ArrayList<>();
headers.add(new BasicHeader("test-header-key", "value"));
headers.add(new BasicHeader(":status", "200"));
encoder.encodeHeaders(buf, headers, h2Config.isCompressionEnabled());

final WritableByteChannelMock writableChannel = new WritableByteChannelMock(1024);
final FrameOutputBuffer outBuffer = new FrameOutputBuffer(16 * 1024);

final RawFrame headerFrame = FRAME_FACTORY.createHeaders(2, ByteBuffer.wrap(buf.array(), 0, 10), false, false);
outBuffer.write(headerFrame, writableChannel);
final RawFrame continuationFrame = FRAME_FACTORY.createContinuation(2, ByteBuffer.wrap(buf.array(), 10, 9), true);
outBuffer.write(continuationFrame, writableChannel);
final byte[] bytes = writableChannel.toByteArray();

final AbstractH2StreamMultiplexer streamMultiplexer = new H2StreamMultiplexerImpl(
protocolIOSession,
FRAME_FACTORY,
StreamIdGenerator.ODD,
httpProcessor,
CharCodingConfig.DEFAULT,
h2Config,
h2StreamListener,
() -> streamHandler);

streamMultiplexer.onInput(ByteBuffer.wrap(bytes));
Mockito.verify(streamHandler).consumeHeader(headersCaptor.capture(), ArgumentMatchers.eq(false));
Assertions.assertFalse(headersCaptor.getValue().isEmpty());
}
}

0 comments on commit a1ae74d

Please sign in to comment.