diff --git a/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java b/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java index f42dabdd55a..86631f46298 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java @@ -108,6 +108,7 @@ public static CronetChannelBuilder forAddress(String name, int port) { private int trafficStatsTag; private boolean trafficStatsUidSet; private int trafficStatsUid; + private int readBufferSize = 4 * 1024; private Network network; private CronetChannelBuilder(String host, int port, CronetEngine cronetEngine) { @@ -194,6 +195,17 @@ CronetChannelBuilder setTrafficStatsUid(int uid) { return this; } + /** + * Sets the read buffer size which the GRPC layer will use to read data from Cronet. Higher buffer + * size leads to less overhead but more memory consumption. The current default value is 4KB. + * @param size Buffer size in bytes. + * @return the builder to facilitate chaining. + */ + CronetChannelBuilder setReadBufferSize(int size) { + readBufferSize = size; + return this; + } + /** Sets the network ID to use for this channel traffic. */ @CanIgnoreReturnValue CronetChannelBuilder bindToNetwork(@Nullable Network network) { @@ -233,7 +245,8 @@ ClientTransportFactory buildTransportFactory() { alwaysUsePut, transportTracerFactory.create(), useGetForSafeMethods, - usePutForIdempotentMethods); + usePutForIdempotentMethods, + readBufferSize); } @VisibleForTesting @@ -247,6 +260,7 @@ static class CronetTransportFactory implements ClientTransportFactory { private final boolean usingSharedScheduler; private final boolean useGetForSafeMethods; private final boolean usePutForIdempotentMethods; + private final int readBufferSize; private CronetTransportFactory( StreamBuilderFactory streamFactory, @@ -256,7 +270,8 @@ private CronetTransportFactory( boolean alwaysUsePut, TransportTracer transportTracer, boolean useGetForSafeMethods, - boolean usePutForIdempotentMethods) { + boolean usePutForIdempotentMethods, + int readBufferSize) { usingSharedScheduler = timeoutService == null; this.timeoutService = usingSharedScheduler ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : timeoutService; @@ -267,6 +282,7 @@ private CronetTransportFactory( this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); this.useGetForSafeMethods = useGetForSafeMethods; this.usePutForIdempotentMethods = usePutForIdempotentMethods; + this.readBufferSize = readBufferSize; } @Override @@ -275,7 +291,7 @@ public ConnectionClientTransport newClientTransport( InetSocketAddress inetSocketAddr = (InetSocketAddress) addr; return new CronetClientTransport(streamFactory, inetSocketAddr, options.getAuthority(), options.getUserAgent(), options.getEagAttributes(), executor, maxMessageSize, - alwaysUsePut, transportTracer, useGetForSafeMethods, usePutForIdempotentMethods); + alwaysUsePut, transportTracer, useGetForSafeMethods, usePutForIdempotentMethods, readBufferSize); } @Override diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java index fcba49a7ae1..33b7450cb90 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java @@ -59,7 +59,6 @@ * Client stream for the cronet transport. */ class CronetClientStream extends AbstractClientStream { - private static final int READ_BUFFER_CAPACITY = 4 * 1024; private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0); private static final String LOG_TAG = "grpc-java-cronet"; @@ -85,6 +84,8 @@ class CronetClientStream extends AbstractClientStream { private final Collection annotations; private final TransportState state; private final Sink sink = new Sink(); + @VisibleForTesting + final int readBufferSize; private StreamBuilderFactory streamFactory; CronetClientStream( @@ -102,7 +103,8 @@ class CronetClientStream extends AbstractClientStream { CallOptions callOptions, TransportTracer transportTracer, boolean useGetForSafeMethods, - boolean usePutForIdempotentMethods) { + boolean usePutForIdempotentMethods, + int readBufferSize) { super( new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers, callOptions, useGetForSafeMethods && method.isSafe()); @@ -120,6 +122,7 @@ class CronetClientStream extends AbstractClientStream { this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY); this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer, callOptions); + this.readBufferSize = readBufferSize; // Tests expect the "plain" deframer behavior, not MigratingDeframer // https://github.com/grpc/grpc-java/issues/7140 @@ -309,7 +312,7 @@ public void bytesRead(int processedBytes) { if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { Log.v(LOG_TAG, "BidirectionalStream.read"); } - stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY)); + stream.read(ByteBuffer.allocateDirect(readBufferSize)); } } @@ -429,7 +432,7 @@ public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInf Log.v(LOG_TAG, "BidirectionalStream.read"); } reportHeaders(info.getAllHeadersAsList(), false); - stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY)); + stream.read(ByteBuffer.allocateDirect(readBufferSize)); } @Override diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java index 465df8b2cc9..8509c74da0a 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java @@ -64,6 +64,7 @@ class CronetClientTransport implements ConnectionClientTransport { private Attributes attrs; private final boolean useGetForSafeMethods; private final boolean usePutForIdempotentMethods; + private final int readBufferSize; private final StreamBuilderFactory streamFactory; // Indicates the transport is in go-away state: no new streams will be processed, // but existing streams may continue. @@ -92,7 +93,8 @@ class CronetClientTransport implements ConnectionClientTransport { boolean alwaysUsePut, TransportTracer transportTracer, boolean useGetForSafeMethods, - boolean usePutForIdempotentMethods) { + boolean usePutForIdempotentMethods, + int readBufferSize) { this.address = Preconditions.checkNotNull(address, "address"); this.logId = InternalLogId.allocate(getClass(), address.toString()); this.authority = authority; @@ -108,6 +110,7 @@ class CronetClientTransport implements ConnectionClientTransport { .build(); this.useGetForSafeMethods = useGetForSafeMethods; this.usePutForIdempotentMethods = usePutForIdempotentMethods; + this.readBufferSize = readBufferSize; } @Override @@ -132,7 +135,7 @@ class StartCallback implements Runnable { final CronetClientStream clientStream = new CronetClientStream( url, userAgent, executor, headers, CronetClientTransport.this, this, lock, maxMessageSize, alwaysUsePut, method, statsTraceCtx, callOptions, transportTracer, useGetForSafeMethods, - usePutForIdempotentMethods); + usePutForIdempotentMethods, readBufferSize); @Override public void run() { diff --git a/cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java b/cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java index 41f48bc03bb..8a9a6141ec1 100644 --- a/cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java +++ b/cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java @@ -18,6 +18,7 @@ import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -92,6 +93,40 @@ public void alwaysUsePut_defaultsToFalse() throws Exception { assertFalse(stream.idempotent); } + @Test + public void channelBuilderReadBufferSize_defaultsTo4Kb() throws Exception { + CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine); + CronetTransportFactory transportFactory = + (CronetTransportFactory) builder.buildTransportFactory(); + CronetClientTransport transport = + (CronetClientTransport) + transportFactory.newClientTransport( + new InetSocketAddress("localhost", 443), + new ClientTransportOptions(), + channelLogger); + CronetClientStream stream = transport.newStream( + method, new Metadata(), CallOptions.DEFAULT, tracers); + + assertEquals(4 * 1024, stream.readBufferSize); + } + + @Test + public void channelBuilderReadBufferSize_changeReflected() throws Exception { + CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine); + CronetTransportFactory transportFactory = + (CronetTransportFactory) builder.setReadBufferSize(1024 * 1024).buildTransportFactory(); + CronetClientTransport transport = + (CronetClientTransport) + transportFactory.newClientTransport( + new InetSocketAddress("localhost", 443), + new ClientTransportOptions(), + channelLogger); + CronetClientStream stream = transport.newStream( + method, new Metadata(), CallOptions.DEFAULT, tracers); + + assertEquals(1024 * 1024, stream.readBufferSize); + } + @Test public void scheduledExecutorService_default() { CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine); diff --git a/cronet/src/test/java/io/grpc/cronet/CronetClientStreamTest.java b/cronet/src/test/java/io/grpc/cronet/CronetClientStreamTest.java index e2b0e0b26ca..32e391dfa9f 100644 --- a/cronet/src/test/java/io/grpc/cronet/CronetClientStreamTest.java +++ b/cronet/src/test/java/io/grpc/cronet/CronetClientStreamTest.java @@ -127,7 +127,8 @@ public void setUp() { CallOptions.DEFAULT, transportTracer, false, - false); + false, + 4 * 1024); callback.setStream(clientStream); when(factory.newBidirectionalStreamBuilder( any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class))) @@ -591,7 +592,8 @@ public void addCronetRequestAnnotation_deprecated() { CallOptions.DEFAULT.withOption(CronetClientStream.CRONET_ANNOTATION_KEY, annotation), transportTracer, false, - false); + false, + 4 * 1024); callback.setStream(stream); when(factory.newBidirectionalStreamBuilder( any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class))) @@ -626,7 +628,8 @@ public void withAnnotation() { callOptions, transportTracer, false, - false); + false, + 4 * 1024); callback.setStream(stream); when(factory.newBidirectionalStreamBuilder( any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class))) @@ -666,7 +669,8 @@ public void getUnaryRequest() { CallOptions.DEFAULT, transportTracer, true, - false); + false, + 4 * 1024); callback.setStream(stream); BidirectionalStream.Builder getBuilder = mock(BidirectionalStream.Builder.class); @@ -723,7 +727,8 @@ public void idempotentMethod_usesHttpPut() { CallOptions.DEFAULT, transportTracer, true, - true); + true, + 4 * 1024); callback.setStream(stream); BidirectionalStream.Builder builder = mock(BidirectionalStream.Builder.class); @@ -755,7 +760,8 @@ public void alwaysUsePutOption_usesHttpPut() { CallOptions.DEFAULT, transportTracer, true, - true); + true, + 4 * 1024); callback.setStream(stream); BidirectionalStream.Builder builder = mock(BidirectionalStream.Builder.class); @@ -795,7 +801,8 @@ public void reservedHeadersStripped() { CallOptions.DEFAULT, transportTracer, false, - false); + false, + 4 * 1024); callback.setStream(stream); BidirectionalStream.Builder builder = mock(BidirectionalStream.Builder.class); diff --git a/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java b/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java index 03c31f93329..cffda6361db 100644 --- a/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java +++ b/cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java @@ -89,7 +89,8 @@ public void setUp() { false, TransportTracer.getDefaultFactory().create(), false, - false); + false, + 4 * 1024); Runnable callback = transport.start(clientTransportListener); assertNotNull(callback); callback.run();