Skip to content

Add setReadbufferSize API to CronetChannelBuilder #12199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public static CronetChannelBuilder forAddress(String name, int port) {
private int trafficStatsTag;
private boolean trafficStatsUidSet;
private int trafficStatsUid;
private int readBufferSize = 4 * 1024;
private Network network;

private CronetChannelBuilder(String host, int port, CronetEngine cronetEngine) {
Expand Down Expand Up @@ -194,6 +195,17 @@ CronetChannelBuilder setTrafficStatsUid(int uid) {
return this;
}

/**
* Sets the read buffer size which the GRPC layer will use to read data from Cronet. Higher buffer
* size leads to less overhead but more memory consumption. The current default value is 4KB.
* @param size Buffer size in bytes.
* @return the builder to facilitate chaining.
*/
CronetChannelBuilder setReadBufferSize(int size) {
readBufferSize = size;
return this;
}

/** Sets the network ID to use for this channel traffic. */
@CanIgnoreReturnValue
CronetChannelBuilder bindToNetwork(@Nullable Network network) {
Expand Down Expand Up @@ -233,7 +245,8 @@ ClientTransportFactory buildTransportFactory() {
alwaysUsePut,
transportTracerFactory.create(),
useGetForSafeMethods,
usePutForIdempotentMethods);
usePutForIdempotentMethods,
readBufferSize);
}

@VisibleForTesting
Expand All @@ -247,6 +260,7 @@ static class CronetTransportFactory implements ClientTransportFactory {
private final boolean usingSharedScheduler;
private final boolean useGetForSafeMethods;
private final boolean usePutForIdempotentMethods;
private final int readBufferSize;

private CronetTransportFactory(
StreamBuilderFactory streamFactory,
Expand All @@ -256,7 +270,8 @@ private CronetTransportFactory(
boolean alwaysUsePut,
TransportTracer transportTracer,
boolean useGetForSafeMethods,
boolean usePutForIdempotentMethods) {
boolean usePutForIdempotentMethods,
int readBufferSize) {
usingSharedScheduler = timeoutService == null;
this.timeoutService = usingSharedScheduler
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : timeoutService;
Expand All @@ -267,6 +282,7 @@ private CronetTransportFactory(
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
this.useGetForSafeMethods = useGetForSafeMethods;
this.usePutForIdempotentMethods = usePutForIdempotentMethods;
this.readBufferSize = readBufferSize;
}

@Override
Expand All @@ -275,7 +291,7 @@ public ConnectionClientTransport newClientTransport(
InetSocketAddress inetSocketAddr = (InetSocketAddress) addr;
return new CronetClientTransport(streamFactory, inetSocketAddr, options.getAuthority(),
options.getUserAgent(), options.getEagAttributes(), executor, maxMessageSize,
alwaysUsePut, transportTracer, useGetForSafeMethods, usePutForIdempotentMethods);
alwaysUsePut, transportTracer, useGetForSafeMethods, usePutForIdempotentMethods, readBufferSize);
}

@Override
Expand Down
11 changes: 7 additions & 4 deletions cronet/src/main/java/io/grpc/cronet/CronetClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
* Client stream for the cronet transport.
*/
class CronetClientStream extends AbstractClientStream {
private static final int READ_BUFFER_CAPACITY = 4 * 1024;
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
private static final String LOG_TAG = "grpc-java-cronet";

Expand All @@ -85,6 +84,8 @@ class CronetClientStream extends AbstractClientStream {
private final Collection<Object> annotations;
private final TransportState state;
private final Sink sink = new Sink();
@VisibleForTesting
final int readBufferSize;
private StreamBuilderFactory streamFactory;

CronetClientStream(
Expand All @@ -102,7 +103,8 @@ class CronetClientStream extends AbstractClientStream {
CallOptions callOptions,
TransportTracer transportTracer,
boolean useGetForSafeMethods,
boolean usePutForIdempotentMethods) {
boolean usePutForIdempotentMethods,
int readBufferSize) {
super(
new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers, callOptions,
useGetForSafeMethods && method.isSafe());
Expand All @@ -120,6 +122,7 @@ class CronetClientStream extends AbstractClientStream {
this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY);
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer,
callOptions);
this.readBufferSize = readBufferSize;

// Tests expect the "plain" deframer behavior, not MigratingDeframer
// https://github.com/grpc/grpc-java/issues/7140
Expand Down Expand Up @@ -309,7 +312,7 @@ public void bytesRead(int processedBytes) {
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
Log.v(LOG_TAG, "BidirectionalStream.read");
}
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
stream.read(ByteBuffer.allocateDirect(readBufferSize));
}
}

Expand Down Expand Up @@ -429,7 +432,7 @@ public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInf
Log.v(LOG_TAG, "BidirectionalStream.read");
}
reportHeaders(info.getAllHeadersAsList(), false);
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
stream.read(ByteBuffer.allocateDirect(readBufferSize));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class CronetClientTransport implements ConnectionClientTransport {
private Attributes attrs;
private final boolean useGetForSafeMethods;
private final boolean usePutForIdempotentMethods;
private final int readBufferSize;
private final StreamBuilderFactory streamFactory;
// Indicates the transport is in go-away state: no new streams will be processed,
// but existing streams may continue.
Expand Down Expand Up @@ -92,7 +93,8 @@ class CronetClientTransport implements ConnectionClientTransport {
boolean alwaysUsePut,
TransportTracer transportTracer,
boolean useGetForSafeMethods,
boolean usePutForIdempotentMethods) {
boolean usePutForIdempotentMethods,
int readBufferSize) {
this.address = Preconditions.checkNotNull(address, "address");
this.logId = InternalLogId.allocate(getClass(), address.toString());
this.authority = authority;
Expand All @@ -108,6 +110,7 @@ class CronetClientTransport implements ConnectionClientTransport {
.build();
this.useGetForSafeMethods = useGetForSafeMethods;
this.usePutForIdempotentMethods = usePutForIdempotentMethods;
this.readBufferSize = readBufferSize;
}

@Override
Expand All @@ -132,7 +135,7 @@ class StartCallback implements Runnable {
final CronetClientStream clientStream = new CronetClientStream(
url, userAgent, executor, headers, CronetClientTransport.this, this, lock, maxMessageSize,
alwaysUsePut, method, statsTraceCtx, callOptions, transportTracer, useGetForSafeMethods,
usePutForIdempotentMethods);
usePutForIdempotentMethods, readBufferSize);

@Override
public void run() {
Expand Down
35 changes: 35 additions & 0 deletions cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -92,6 +93,40 @@ public void alwaysUsePut_defaultsToFalse() throws Exception {
assertFalse(stream.idempotent);
}

@Test
public void channelBuilderReadBufferSize_defaultsTo4Kb() throws Exception {
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
CronetTransportFactory transportFactory =
(CronetTransportFactory) builder.buildTransportFactory();
CronetClientTransport transport =
(CronetClientTransport)
transportFactory.newClientTransport(
new InetSocketAddress("localhost", 443),
new ClientTransportOptions(),
channelLogger);
CronetClientStream stream = transport.newStream(
method, new Metadata(), CallOptions.DEFAULT, tracers);

assertEquals(4 * 1024, stream.readBufferSize);
}

@Test
public void channelBuilderReadBufferSize_changeReflected() throws Exception {
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
CronetTransportFactory transportFactory =
(CronetTransportFactory) builder.setReadBufferSize(1024 * 1024).buildTransportFactory();
CronetClientTransport transport =
(CronetClientTransport)
transportFactory.newClientTransport(
new InetSocketAddress("localhost", 443),
new ClientTransportOptions(),
channelLogger);
CronetClientStream stream = transport.newStream(
method, new Metadata(), CallOptions.DEFAULT, tracers);

assertEquals(1024 * 1024, stream.readBufferSize);
}

@Test
public void scheduledExecutorService_default() {
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
Expand Down
21 changes: 14 additions & 7 deletions cronet/src/test/java/io/grpc/cronet/CronetClientStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void setUp() {
CallOptions.DEFAULT,
transportTracer,
false,
false);
false,
4 * 1024);
callback.setStream(clientStream);
when(factory.newBidirectionalStreamBuilder(
any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
Expand Down Expand Up @@ -591,7 +592,8 @@ public void addCronetRequestAnnotation_deprecated() {
CallOptions.DEFAULT.withOption(CronetClientStream.CRONET_ANNOTATION_KEY, annotation),
transportTracer,
false,
false);
false,
4 * 1024);
callback.setStream(stream);
when(factory.newBidirectionalStreamBuilder(
any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
Expand Down Expand Up @@ -626,7 +628,8 @@ public void withAnnotation() {
callOptions,
transportTracer,
false,
false);
false,
4 * 1024);
callback.setStream(stream);
when(factory.newBidirectionalStreamBuilder(
any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
Expand Down Expand Up @@ -666,7 +669,8 @@ public void getUnaryRequest() {
CallOptions.DEFAULT,
transportTracer,
true,
false);
false,
4 * 1024);
callback.setStream(stream);
BidirectionalStream.Builder getBuilder =
mock(BidirectionalStream.Builder.class);
Expand Down Expand Up @@ -723,7 +727,8 @@ public void idempotentMethod_usesHttpPut() {
CallOptions.DEFAULT,
transportTracer,
true,
true);
true,
4 * 1024);
callback.setStream(stream);
BidirectionalStream.Builder builder =
mock(BidirectionalStream.Builder.class);
Expand Down Expand Up @@ -755,7 +760,8 @@ public void alwaysUsePutOption_usesHttpPut() {
CallOptions.DEFAULT,
transportTracer,
true,
true);
true,
4 * 1024);
callback.setStream(stream);
BidirectionalStream.Builder builder =
mock(BidirectionalStream.Builder.class);
Expand Down Expand Up @@ -795,7 +801,8 @@ public void reservedHeadersStripped() {
CallOptions.DEFAULT,
transportTracer,
false,
false);
false,
4 * 1024);
callback.setStream(stream);
BidirectionalStream.Builder builder =
mock(BidirectionalStream.Builder.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public void setUp() {
false,
TransportTracer.getDefaultFactory().create(),
false,
false);
false,
4 * 1024);
Runnable callback = transport.start(clientTransportListener);
assertNotNull(callback);
callback.run();
Expand Down