Skip to content

Commit

Permalink
Stats cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantmane committed Sep 11, 2024
1 parent d706dbd commit 65e8341
Show file tree
Hide file tree
Showing 16 changed files with 184 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ public class HttpShortcutResponse {
private final String message;
private final HttpResponseStatus status;

private boolean misroutedStoreVersion = false;

public HttpShortcutResponse(String message, HttpResponseStatus status) {
this.message = message;
this.status = status;
Expand All @@ -28,12 +26,4 @@ public String getMessage() {
public HttpResponseStatus getStatus() {
return status;
}

public boolean isMisroutedStoreVersion() {
return misroutedStoreVersion;
}

public void setMisroutedStoreVersion(boolean misroutedStoreVersion) {
this.misroutedStoreVersion = misroutedStoreVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import static com.linkedin.venice.listener.QuotaEnforcementHandler.QuotaEnforcementResult.ALLOWED;
import static com.linkedin.venice.listener.ReadQuotaEnforcementHandler.INVALID_REQUEST_RESOURCE_MSG;
import static com.linkedin.venice.listener.ReadQuotaEnforcementHandler.SERVER_OVER_CAPACITY_MSG;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;

import com.google.protobuf.ByteString;
import com.linkedin.davinci.listener.response.ReadResponse;
Expand All @@ -22,7 +20,6 @@
import com.linkedin.venice.stats.ServerHttpRequestStats;
import com.linkedin.venice.utils.LatencyUtils;
import io.grpc.stub.StreamObserver;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -93,28 +90,31 @@ public static <T> void sendResponse(GrpcRequestContext<T> requestContext) {
public static void sendSingleGetResponse(GrpcRequestContext<SingleGetResponse> requestContext) {
ReadResponse readResponse = requestContext.getReadResponse();
SingleGetResponse.Builder builder = SingleGetResponse.newBuilder();
VeniceReadResponseStatus responseStatus = requestContext.getReadResponseStatus();
RequestStatsRecorder requestStatsRecorder = requestContext.getRequestStatsRecorder();
requestStatsRecorder.setResponseStatus(responseStatus);

if (readResponse == null) {
builder.setStatusCode(requestContext.getReadResponseStatus().getCode());
builder.setErrorMessage(requestContext.getErrorMessage());
requestStatsRecorder.setResponseStatus(requestContext.getReadResponseStatus());
} else if (readResponse.isFound()) {
builder.setRcu(readResponse.getRCU())
.setStatusCode(VeniceReadResponseStatus.OK.getCode())
.setStatusCode(responseStatus.getCode())
.setSchemaId(readResponse.getResponseSchemaIdHeader())
.setCompressionStrategy(readResponse.getCompressionStrategy().getValue())
.setContentLength(readResponse.getResponseBody().readableBytes())
.setContentType(HttpConstants.AVRO_BINARY)
.setValue(GrpcUtils.toByteString(readResponse.getResponseBody()));
requestContext.getStatsContext().setResponseStatus(OK);
} else {
requestContext.setError();
requestContext.getStatsContext().setResponseStatus(NOT_FOUND);
builder.setStatusCode(VeniceReadResponseStatus.KEY_NOT_FOUND.getCode())
builder.setStatusCode(responseStatus.getCode())
.setRcu(readResponse.getRCU())
.setErrorMessage("Key not found")
.setContentLength(0);
}
StreamObserver<SingleGetResponse> responseObserver = requestContext.getResponseObserver();

StreamObserver<SingleGetResponse> responseObserver = requestContext.getResponseObserver();
synchronized (responseObserver) {
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
Expand All @@ -126,21 +126,24 @@ public static void sendSingleGetResponse(GrpcRequestContext<SingleGetResponse> r
public static void sendMultiKeyResponse(GrpcRequestContext<MultiKeyResponse> requestContext) {
ReadResponse readResponse = requestContext.getReadResponse();
MultiKeyResponse.Builder builder = MultiKeyResponse.newBuilder();

VeniceReadResponseStatus responseStatus = requestContext.getReadResponseStatus();
RequestStatsRecorder requestStatsRecorder = requestContext.getRequestStatsRecorder();
requestStatsRecorder.setResponseStatus(responseStatus);

if (readResponse == null) {
builder.setStatusCode(requestContext.getReadResponseStatus().getCode());
builder.setStatusCode(responseStatus.getCode());
builder.setErrorMessage(requestContext.getErrorMessage());
} else if (readResponse.isFound()) {
builder.setStatusCode(VeniceReadResponseStatus.OK.getCode())
builder.setStatusCode(responseStatus.getCode())
.setRcu(readResponse.getRCU())
.setCompressionStrategy(readResponse.getCompressionStrategy().getValue())
.setContentLength(readResponse.getResponseBody().readableBytes())
.setContentType(HttpConstants.AVRO_BINARY)
.setValue(GrpcUtils.toByteString(readResponse.getResponseBody()));
requestContext.getStatsContext().setResponseStatus(OK);
} else {
requestContext.setError();
requestContext.getStatsContext().setResponseStatus(NOT_FOUND);
builder.setStatusCode(VeniceReadResponseStatus.KEY_NOT_FOUND.getCode())
builder.setStatusCode(responseStatus.getCode())
.setRcu(readResponse.getRCU())
.setErrorMessage("Key not found")
.setContentLength(0);
Expand All @@ -158,23 +161,23 @@ public static void sendVeniceServerResponse(GrpcRequestContext<VeniceServerRespo
ReadResponse readResponse = requestContext.getReadResponse();
VeniceServerResponse.Builder builder = VeniceServerResponse.newBuilder();

VeniceReadResponseStatus responseStatus = requestContext.getReadResponseStatus();
RequestStatsRecorder requestStatsRecorder = requestContext.getRequestStatsRecorder();
requestStatsRecorder.setResponseStatus(responseStatus);

if (readResponse == null) {
builder.setErrorCode(requestContext.getReadResponseStatus().getCode());
builder.setErrorCode(responseStatus.getCode());
builder.setErrorMessage(requestContext.getErrorMessage());
} else if (readResponse.isFound()) {
builder.setErrorCode(VeniceReadResponseStatus.OK.getCode())
builder.setErrorCode(responseStatus.getCode())
.setResponseRCU(readResponse.getRCU())
.setCompressionStrategy(readResponse.getCompressionStrategy().getValue())
.setIsStreamingResponse(readResponse.isStreamingResponse())
.setSchemaId(readResponse.getResponseSchemaIdHeader())
.setData(GrpcUtils.toByteString(readResponse.getResponseBody()));
requestContext.getStatsContext().setResponseStatus(OK);
} else {
builder.setErrorCode(VeniceReadResponseStatus.KEY_NOT_FOUND.getCode())
.setErrorMessage("Key not found")
.setData(ByteString.EMPTY);
requestContext.setError();
requestContext.getStatsContext().setResponseStatus(NOT_FOUND);
builder.setErrorCode(responseStatus.getCode()).setErrorMessage("Key not found").setData(ByteString.EMPTY);
}

StreamObserver<VeniceServerResponse> responseObserver = requestContext.getResponseObserver();
Expand All @@ -186,30 +189,33 @@ public static void sendVeniceServerResponse(GrpcRequestContext<VeniceServerRespo
reportRequestStats(requestContext);
}

/**
* TODO: Fix stats recording as this is incomplete and broken
*/
public static void reportRequestStats(GrpcRequestContext requestContext) {
RequestStatsRecorder statsContext = requestContext.getStatsContext();
HttpResponseStatus responseStatus = statsContext.getResponseStatus();
if (statsContext.getResponseStatus() == null) {
RequestStatsRecorder requestStatsRecorder = requestContext.getRequestStatsRecorder();

VeniceReadResponseStatus responseStatus = requestContext.getReadResponseStatus();
if (responseStatus == null) {
LOGGER.error("Received error in outbound gRPC Stats Handler: response status could not be null");
return;
}

String storeName = statsContext.getStoreName();
String storeName = requestStatsRecorder.getStoreName();
ServerHttpRequestStats serverHttpRequestStats;

if (statsContext.getStoreName() == null) {
if (requestStatsRecorder.getStoreName() == null) {
LOGGER.error("Received error in outbound gRPC Stats Handler: store name could not be null");
return;
} else {
serverHttpRequestStats = statsContext.getCurrentStats().getStoreStats(storeName);
statsContext.recordBasicMetrics(serverHttpRequestStats);
}

double elapsedTime = LatencyUtils.getElapsedTimeFromNSToMS(statsContext.getRequestStartTimeInNS());
if (!requestContext.hasError() && !responseStatus.equals(OK) || responseStatus.equals(NOT_FOUND)) {
statsContext.successRequest(serverHttpRequestStats, elapsedTime);
} else {
statsContext.errorRequest(serverHttpRequestStats, elapsedTime);
serverHttpRequestStats = requestStatsRecorder.getCurrentStats().getStoreStats(storeName);
requestStatsRecorder.recordBasicMetrics(serverHttpRequestStats);
double elapsedTime = LatencyUtils.getElapsedTimeFromNSToMS(requestStatsRecorder.getRequestStartTimeInNS());
if (responseStatus == VeniceReadResponseStatus.OK || responseStatus == VeniceReadResponseStatus.KEY_NOT_FOUND) {
requestStatsRecorder.successRequest(serverHttpRequestStats, elapsedTime);
return;
}

requestStatsRecorder.errorRequest(serverHttpRequestStats, elapsedTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class GrpcRequestContext<T> {

private RouterRequest routerRequest;
private ReadResponse readResponse = null;
private VeniceReadResponseStatus readResponseStatus;
private VeniceReadResponseStatus readResponseStatus = null;
private String errorMessage;

enum GrpcRequestType {
Expand All @@ -45,7 +45,7 @@ public static <T> GrpcRequestContext<T> create(
grpcRequestType);
}

public RequestStatsRecorder getStatsContext() {
public RequestStatsRecorder getRequestStatsRecorder() {
return requestStatsRecorder;
}

Expand Down Expand Up @@ -78,7 +78,23 @@ public void setError() {
}

public VeniceReadResponseStatus getReadResponseStatus() {
return readResponseStatus;
// If the readResponseStatus is set, return it.
if (readResponseStatus != null) {
return readResponseStatus;
}

// If the readResponse is set, return the appropriate status based on the response.
if (readResponse != null && readResponse.isFound()) {
return VeniceReadResponseStatus.OK;
}

// If the readResponse is set and the key is not found, return the appropriate status.
if (readResponse != null && !readResponse.isFound()) {
return VeniceReadResponseStatus.KEY_NOT_FOUND;
}

// If the readResponse is not set, return an internal server error.
return VeniceReadResponseStatus.INTERNAL_SERVER_ERROR;
}

public void setReadResponseStatus(VeniceReadResponseStatus readResponseStatus) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package com.linkedin.venice.grpc;

import static com.linkedin.venice.response.VeniceReadResponseStatus.MISROUTED_STORE_VERSION;

import com.linkedin.davinci.listener.response.ReadResponse;
import com.linkedin.venice.listener.RequestStatsRecorder;
import com.linkedin.venice.listener.StorageResponseHandlerCallback;
import com.linkedin.venice.listener.VeniceRequestEarlyTerminationException;
import com.linkedin.venice.listener.response.AbstractReadResponse;
import com.linkedin.venice.response.VeniceReadResponseStatus;


Expand All @@ -25,12 +30,37 @@ public static GrpcStorageResponseHandlerCallback create(GrpcRequestContext reque

@Override
public void onReadResponse(ReadResponse readResponse) {
// TODO: move this stats reporting to {@link GrpcIoRequestProcessor#reportRequestStats}
RequestStatsRecorder statsRecorder = requestContext.getRequestStatsRecorder();
AbstractReadResponse abstractReadResponse = (AbstractReadResponse) readResponse;
if (readResponse.isFound()) {
requestContext.setReadResponseStatus(VeniceReadResponseStatus.OK);
statsRecorder.setResponseStatus(VeniceReadResponseStatus.OK)
.setReadResponseStats(abstractReadResponse.getStatsRecorder())
.setResponseSize(abstractReadResponse.getResponseBody().readableBytes());
} else {
requestContext.setReadResponseStatus(VeniceReadResponseStatus.KEY_NOT_FOUND);
statsRecorder.setResponseStatus(VeniceReadResponseStatus.KEY_NOT_FOUND)
.setReadResponseStats(abstractReadResponse.getStatsRecorder())
.setResponseSize(0);
}

requestContext.setReadResponse(readResponse);
GrpcIoRequestProcessor.sendResponse(requestContext);
}

@Override
public void onError(VeniceReadResponseStatus readResponseStatus, String message) {
// TODO: move this stats reporting to {@link GrpcIoRequestProcessor#reportRequestStats}
RequestStatsRecorder statsRecorder = requestContext.getRequestStatsRecorder().setResponseStatus(readResponseStatus);

if (readResponseStatus == VeniceRequestEarlyTerminationException.getResponseStatusCode()) {
statsRecorder.setRequestTerminatedEarly();
}
if (readResponseStatus == MISROUTED_STORE_VERSION) {
statsRecorder.setMisroutedStoreVersion(true);
}

requestContext.setError();
requestContext.setReadResponseStatus(readResponseStatus);
requestContext.setErrorMessage(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void get(VeniceClientRequest singleGetRequest, StreamObserver<VeniceServe
GrpcRequestContext.create(dependencies, streamObserver, LEGACY);
try {
RouterRequest routerRequest = GetRouterRequest.parseSingleGetGrpcRequest(singleGetRequest);
clientRequestCtx.getStatsContext().setRequestInfo(routerRequest);
clientRequestCtx.getRequestStatsRecorder().setRequestInfo(routerRequest);
clientRequestCtx.setRouterRequest(routerRequest);
requestProcessor.processRequest(clientRequestCtx);
} catch (Exception e) {
Expand All @@ -108,7 +108,7 @@ public void batchGet(VeniceClientRequest batchGetRequest, StreamObserver<VeniceS
GrpcRequestContext.create(dependencies, streamObserver, LEGACY);
try {
RouterRequest routerRequest = MultiGetRouterRequestWrapper.parseMultiGetGrpcRequest(batchGetRequest);
requestContext.getStatsContext().setRequestInfo(routerRequest);
requestContext.getRequestStatsRecorder().setRequestInfo(routerRequest);
requestContext.setRouterRequest(routerRequest);
requestProcessor.processRequest(requestContext);
} catch (Exception e) {
Expand All @@ -129,7 +129,7 @@ public void singleGet(SingleGetRequest singleGetRequest, StreamObserver<SingleGe
GrpcRequestContext.create(dependencies, streamObserver, SINGLE_GET);
try {
RouterRequest routerRequest = GetRouterRequest.parseSingleGetGrpcRequest(singleGetRequest);
requestContext.getStatsContext().setRequestInfo(routerRequest);
requestContext.getRequestStatsRecorder().setRequestInfo(routerRequest);
requestContext.setRouterRequest(routerRequest);
requestProcessor.processRequest(requestContext);
} catch (Exception e) {
Expand All @@ -150,7 +150,7 @@ public void multiGet(MultiGetRequest request, StreamObserver<MultiKeyResponse> s
GrpcRequestContext.create(dependencies, streamObserver, MULTI_GET);
try {
RouterRequest routerRequest = MultiGetRouterRequestWrapper.parseMultiGetGrpcRequest(request);
requestContext.getStatsContext().setRequestInfo(routerRequest);
requestContext.getRequestStatsRecorder().setRequestInfo(routerRequest);
requestContext.setRouterRequest(routerRequest);
requestProcessor.processRequest(requestContext);
} catch (Exception e) {
Expand All @@ -171,7 +171,7 @@ public void compute(ComputeRequest request, StreamObserver<MultiKeyResponse> res
GrpcRequestContext.create(dependencies, responseObserver, COMPUTE);
try {
RouterRequest routerRequest = ComputeRouterRequestWrapper.parseComputeGrpcRequest(request);
requestContext.getStatsContext().setRequestInfo(routerRequest);
requestContext.getRequestStatsRecorder().setRequestInfo(routerRequest);
requestContext.setRouterRequest(routerRequest);
requestProcessor.processRequest(requestContext);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.linkedin.venice.listener;

import static com.linkedin.venice.response.VeniceReadResponseStatus.INTERNAL_SERVER_ERROR;

import com.linkedin.venice.listener.response.HttpShortcutResponse;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpResponseStatus;


/***
Expand All @@ -16,7 +17,7 @@
public class ErrorCatchingHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.writeAndFlush(new HttpShortcutResponse(cause.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR));
ctx.writeAndFlush(new HttpShortcutResponse(cause.getMessage(), INTERNAL_SERVER_ERROR.getHttpResponseStatus()));
ctx.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ public void onReadResponse(ReadResponse readResponse) {
@Override
public void onError(VeniceReadResponseStatus readResponseStatus, String message) {
HttpShortcutResponse response = new HttpShortcutResponse(message, readResponseStatus.getHttpResponseStatus());
if (readResponseStatus == VeniceReadResponseStatus.MISROUTED_STORE_VERSION) {
response.setMisroutedStoreVersion(true);
}
context.writeAndFlush(response);
}
}
Loading

0 comments on commit 65e8341

Please sign in to comment.