Skip to content

Commit

Permalink
Move responseObserver ops in a mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantmane committed Sep 10, 2024
1 parent 662cd5d commit 4205a50
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

import static com.linkedin.venice.listener.ReadQuotaEnforcementHandler.INVALID_REQUEST_RESOURCE_MSG;
import static com.linkedin.venice.listener.ReadQuotaEnforcementHandler.QuotaEnforcementResult.ALLOWED;
import static com.linkedin.venice.listener.ReadQuotaEnforcementHandler.QuotaEnforcementResult.BAD_REQUEST;
import static com.linkedin.venice.listener.ReadQuotaEnforcementHandler.QuotaEnforcementResult.OVER_CAPACITY;
import static com.linkedin.venice.listener.ReadQuotaEnforcementHandler.QuotaEnforcementResult.REJECTED;
import static com.linkedin.venice.listener.ReadQuotaEnforcementHandler.SERVER_OVER_CAPACITY_MSG;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
Expand Down Expand Up @@ -47,7 +44,7 @@ public void processRequest(GrpcRequestContext requestContext) {
// If the request is allowed, hand it off to the storage read request handler
if (result == null || result == ALLOWED) {
GrpcStorageResponseHandlerCallback callback = GrpcStorageResponseHandlerCallback.create(requestContext);
storageReadRequestHandler.processIoRequestAsync(request, callback);
storageReadRequestHandler.queueIoRequestForAsyncProcessing(request, callback);
return;
}

Expand Down Expand Up @@ -92,85 +89,96 @@ public static <T> void sendResponse(GrpcRequestContext<T> requestContext) {

public static void sendSingleGetResponse(GrpcRequestContext<SingleGetResponse> requestContext) {
ReadResponse readResponse = requestContext.getReadResponse();
SingleGetResponse.Builder builder = SingleGetResponse.newBuilder()
.setRcu(readResponse.getRCU())
.setSchemaId(readResponse.getResponseSchemaIdHeader())
.setCompressionStrategy(readResponse.getCompressionStrategy().getValue());
SingleGetResponse.Builder builder = SingleGetResponse.newBuilder();
if (readResponse == null) {
builder.setStatusCode(requestContext.getReadResponseStatus().getCode());
builder.setErrorMessage(requestContext.getErrorMessage());
} else if (readResponse.isFound()) {
builder.setContentLength(readResponse.getResponseBody().readableBytes());
builder.setContentType(HttpConstants.AVRO_BINARY);
builder.setValue(GrpcUtils.toByteString(readResponse.getResponseBody()));
builder.setRcu(readResponse.getRCU())
.setStatusCode(VeniceReadResponseStatus.OK.getCode())
.setSchemaId(readResponse.getResponseSchemaIdHeader())
.setCompressionStrategy(readResponse.getCompressionStrategy().getValue())
.setContentLength(readResponse.getResponseBody().readableBytes())
.setContentType(HttpConstants.AVRO_BINARY)
.setValue(GrpcUtils.toByteString(readResponse.getResponseBody()));
requestContext.getStatsContext().setResponseStatus(OK);
} else {
requestContext.setError();
requestContext.getStatsContext().setResponseStatus(NOT_FOUND);
builder.setStatusCode(VeniceReadResponseStatus.KEY_NOT_FOUND.getCode());
builder.setErrorMessage("Key not found");
builder.setContentLength(0);
builder.setStatusCode(VeniceReadResponseStatus.KEY_NOT_FOUND.getCode())
.setRcu(readResponse.getRCU())
.setErrorMessage("Key not found")
.setContentLength(0);
}
StreamObserver<SingleGetResponse> responseObserver = requestContext.getResponseObserver();

responseObserver.onNext(builder.build());
responseObserver.onCompleted();
synchronized (responseObserver) {
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}

reportRequestStats(requestContext);
}

public static void sendMultiGetResponse(GrpcRequestContext<MultiGetResponse> requestContext) {
ReadResponse readResponse = requestContext.getReadResponse();
MultiGetResponse.Builder builder = MultiGetResponse.newBuilder();
builder.setRcu(readResponse.getRCU());
builder.setCompressionStrategy(readResponse.getCompressionStrategy().getValue());
if (readResponse == null) {
builder.setStatusCode(requestContext.getReadResponseStatus().getCode());
builder.setErrorMessage(requestContext.getErrorMessage());
} else if (readResponse.isFound()) {
builder.setContentLength(readResponse.getResponseBody().readableBytes());
builder.setContentType(HttpConstants.AVRO_BINARY);
builder.setValue(GrpcUtils.toByteString(readResponse.getResponseBody()));
builder.setStatusCode(VeniceReadResponseStatus.OK.getCode())
.setRcu(readResponse.getRCU())
.setCompressionStrategy(readResponse.getCompressionStrategy().getValue())
.setContentLength(readResponse.getResponseBody().readableBytes())
.setContentType(HttpConstants.AVRO_BINARY)
.setValue(GrpcUtils.toByteString(readResponse.getResponseBody()));
requestContext.getStatsContext().setResponseStatus(OK);
} else {
requestContext.setError();
requestContext.getStatsContext().setResponseStatus(NOT_FOUND);
builder.setStatusCode(VeniceReadResponseStatus.KEY_NOT_FOUND.getCode());
builder.setErrorMessage("Key not found");
builder.setContentLength(0);
builder.setStatusCode(VeniceReadResponseStatus.KEY_NOT_FOUND.getCode())
.setRcu(readResponse.getRCU())
.setErrorMessage("Key not found")
.setContentLength(0);
}

StreamObserver<MultiGetResponse> responseObserver = requestContext.getResponseObserver();
responseObserver.onNext(builder.build());
responseObserver.onCompleted();

synchronized (responseObserver) {
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
reportRequestStats(requestContext);
}

public static void sendVeniceServerResponse(GrpcRequestContext<VeniceServerResponse> requestContext) {
ReadResponse readResponse = requestContext.getReadResponse();
VeniceServerResponse.Builder builder = VeniceServerResponse.newBuilder();
builder.setResponseRCU(readResponse.getRCU());
builder.setCompressionStrategy(readResponse.getCompressionStrategy().getValue());
builder.setIsStreamingResponse(readResponse.isStreamingResponse());

if (readResponse == null) {
builder.setErrorCode(requestContext.getReadResponseStatus().getCode());
builder.setErrorMessage(requestContext.getErrorMessage());
} else if (!readResponse.isFound()) {
} else if (readResponse.isFound()) {
builder.setErrorCode(VeniceReadResponseStatus.OK.getCode())
.setResponseRCU(readResponse.getRCU())
.setCompressionStrategy(readResponse.getCompressionStrategy().getValue())
.setIsStreamingResponse(readResponse.isStreamingResponse())
.setSchemaId(readResponse.getResponseSchemaIdHeader())
.setData(GrpcUtils.toByteString(readResponse.getResponseBody()));
requestContext.getStatsContext().setResponseStatus(OK);
} else {
builder.setErrorCode(VeniceReadResponseStatus.KEY_NOT_FOUND.getCode())
.setErrorMessage("Key not found")
.setData(ByteString.EMPTY);
requestContext.setError();
requestContext.getStatsContext().setResponseStatus(NOT_FOUND);
builder.setErrorCode(VeniceReadResponseStatus.KEY_NOT_FOUND.getCode());
builder.setErrorMessage("Key not found");
builder.setData(ByteString.EMPTY);
} else {
builder.setData(GrpcUtils.toByteString(readResponse.getResponseBody()));
builder.setSchemaId(readResponse.getResponseSchemaIdHeader());
requestContext.getStatsContext().setResponseStatus(OK);
}

StreamObserver<VeniceServerResponse> responseObserver = requestContext.getResponseObserver();
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
synchronized (responseObserver) {
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}

reportRequestStats(requestContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


/**
* This is used in REST/HTTP Netty handlers to handle the response from the {@link StorageReadRequestHandler#processIoRequestAsync} method.
* This is used in REST/HTTP Netty handlers to handle the response from the {@link StorageReadRequestHandler#queueIoRequestForAsyncProcessing} method.
*/
public class HttpStorageResponseHandlerCallback implements StorageResponseHandlerCallback {
private final ChannelHandlerContext context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public StorageReadRequestHandler(
public void channelRead(ChannelHandlerContext context, Object message) throws Exception {
if (message instanceof RouterRequest) {
// IO requests are processed in a separate thread pool
processIoRequestAsync((RouterRequest) message, HttpStorageResponseHandlerCallback.create(context));
queueIoRequestForAsyncProcessing((RouterRequest) message, HttpStorageResponseHandlerCallback.create(context));
return;
}

Expand Down Expand Up @@ -339,7 +339,7 @@ public void channelRead(ChannelHandlerContext context, Object message) throws Ex
/**
* Handles requests that require a storage engine lookup.
*/
public void processIoRequestAsync(RouterRequest request, StorageResponseHandlerCallback responseCallback) {
public void queueIoRequestForAsyncProcessing(RouterRequest request, StorageResponseHandlerCallback responseCallback) {
this.resourceReadUsageTracker.accept(request.getResourceName());

// Check if timeout has occurred before processing the request; if so, return early with an error response
Expand Down

0 comments on commit 4205a50

Please sign in to comment.