Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] Rewrite gRPC read Service to align with Netty-Based HTTP Server #1163

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ ext.libraries = [
grpcProtobuf: "io.grpc:grpc-protobuf:${grpcVersion}",
grpcServices: "io.grpc:grpc-services:${grpcVersion}",
grpcStub: "io.grpc:grpc-stub:${grpcVersion}",
grpcTesting: "io.grpc:grpc-testing:${grpcVersion}",
hadoopCommon: "org.apache.hadoop:hadoop-common:${hadoopVersion}",
httpAsyncClient: 'org.apache.httpcomponents:httpasyncclient:4.1.5',
httpClient5: 'org.apache.httpcomponents.client5:httpclient5:5.3',
Expand Down Expand Up @@ -486,6 +487,12 @@ subprojects {
doLast {
parseJacocoXml("$buildDir/reports/jacoco/test/jacocoTestReport.xml")
}
// This is required to remove generated code from the report but it fails build in all-modules
// afterEvaluate {
// classDirectories.setFrom(files(classDirectories.files.collect { fileTree(dir: it, exclude: [
// '**/com/linkedin/venice/protocols/**',
// ])}))
// }
}

afterEvaluate {
Expand All @@ -501,6 +508,12 @@ subprojects {
value = 'COVEREDRATIO'
minimum = threshold
}

afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect { fileTree(dir: it, exclude: [
'**/com/linkedin/venice/protocols/**',
])}))
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,17 +267,24 @@ public VeniceGrpcStreamObserver(CompletableFuture<TransportClientResponse> respo

@Override
public void onNext(VeniceServerResponse value) {
if (value.getErrorCode() != VeniceReadResponseStatus.OK) {
handleResponseError(value);
int statusCode = value.getErrorCode();
// Successful response
if (statusCode == VeniceReadResponseStatus.OK.getCode()) {
complete(
new TransportClientResponse(
value.getSchemaId(),
CompressionStrategy.valueOf(value.getCompressionStrategy()),
value.getData().toByteArray()),
null);
return;
}

complete(
new TransportClientResponse(
value.getSchemaId(),
CompressionStrategy.valueOf(value.getCompressionStrategy()),
value.getData().toByteArray()),
null);
// Key not found is a valid response
if (statusCode == VeniceReadResponseStatus.KEY_NOT_FOUND.getCode()) {
complete(null, null);
return;
}
// Handle the cases where the status code doesn't match healthy response codes
handleResponseError(value);
}

@Override
Expand Down Expand Up @@ -308,30 +315,29 @@ void handleResponseError(VeniceServerResponse response) {
int statusCode = response.getErrorCode();
String errorMessage = response.getErrorMessage();
Exception exception;

switch (statusCode) {
case VeniceReadResponseStatus.BAD_REQUEST:
exception = new VeniceClientHttpException(errorMessage, statusCode);
break;
case VeniceReadResponseStatus.TOO_MANY_REQUESTS:
exception = new VeniceClientRateExceededException(errorMessage);
break;
case VeniceReadResponseStatus.KEY_NOT_FOUND:
exception = null;
break;
default:
exception = new VeniceClientException(
String
.format("An unexpected error occurred with status code: %d, message: %s", statusCode, errorMessage));
break;
}

if (exception != null) {
LOGGER.error("Got error in response due to", exception);
try {
switch (VeniceReadResponseStatus.fromCode(statusCode)) {
case BAD_REQUEST:
exception = new VeniceClientHttpException(errorMessage, statusCode);
break;
case TOO_MANY_REQUESTS:
exception = new VeniceClientRateExceededException(errorMessage);
break;
default:
exception = new VeniceClientException(
String.format(
"An unexpected error occurred with status code: %d, message: %s",
statusCode,
errorMessage));
break;
}
} catch (IllegalArgumentException e) {
// Handle the case where the status code doesn't match any known values
exception = new VeniceClientException(
String.format("Unknown status code: %d, message: %s", statusCode, errorMessage),
e);
}

// In the event of record not found, we treat that as a successful response and complete the future with a null
// value and the exception is set to null as well.
LOGGER.error("Received error response with status code: {}, message: {}", statusCode, errorMessage);
complete(null, exception);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.linkedin.venice.grpc;

import com.google.protobuf.ByteString;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.security.SSLConfig;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.utils.SslUtils;
import io.grpc.Grpc;
import io.grpc.ServerCall;
import io.grpc.Status;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -92,4 +94,23 @@ private static KeyStore loadStore(String path, char[] password, String type)
}
return keyStore;
}

/**
* Converts a Netty ByteBuf to a ByteString, checking if it has a backing array to avoid manual copying.
*
* @param body The ByteBuf to be converted to ByteString.
* @return The resulting ByteString.
*/
public static ByteString toByteString(ByteBuf body) {
if (body.hasArray()) {
// Directly use the backing array to avoid copying
return ByteString.copyFrom(body.array(), body.arrayOffset() + body.readerIndex(), body.readableBytes());
}
// Fallback to nioBuffer() to handle the conversion efficiently
return ByteString.copyFrom(body.nioBuffer());
FelixGV marked this conversation as resolved.
Show resolved Hide resolved
}

public static ByteString toByteString(byte[] bytes) {
return ByteString.copyFrom(bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ public class HttpShortcutResponse {
private final String message;
private final HttpResponseStatus status;

private boolean misroutedStoreVersion = false;

public HttpShortcutResponse(String message, HttpResponseStatus status) {
this.message = message;
this.status = status;
Expand All @@ -28,12 +26,4 @@ public String getMessage() {
public HttpResponseStatus getStatus() {
return status;
}

public boolean isMisroutedStoreVersion() {
return misroutedStoreVersion;
}

public void setMisroutedStoreVersion(boolean misroutedStoreVersion) {
this.misroutedStoreVersion = misroutedStoreVersion;
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
package com.linkedin.venice.meta;

import java.util.HashMap;
import java.util.Map;


public enum ServerAdminAction {
DUMP_INGESTION_STATE(0), DUMP_SERVER_CONFIGS(1);

private static final Map<Integer, ServerAdminAction> ADMIN_ACTION_MAP = new HashMap<>(2);

static {
for (ServerAdminAction action: values()) {
ADMIN_ACTION_MAP.put(action.getValue(), action);
}
}
Comment on lines 7 to +16
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, for the common case of enum codes that range from 0..N monotonically, we have a cleaner solution than using a map... See EnumUtils::getEnumValuesArray and VeniceEnumValue. It is very easy to use, more performant, and will require a bit fewer lines of code than you have here...

private final int value;

ServerAdminAction(int value) {
Expand All @@ -12,4 +23,12 @@ public enum ServerAdminAction {
public int getValue() {
return this.value;
}

public static ServerAdminAction fromValue(int value) {
ServerAdminAction action = ADMIN_ACTION_MAP.get(value);
if (action == null) {
throw new IllegalArgumentException("Unknown server admin action value: " + value);
}
return action;
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,49 @@
package com.linkedin.venice.response;

import io.netty.handler.codec.http.HttpResponseStatus;
import java.util.HashMap;
import java.util.Map;


/**
* Enumeration of response status codes for Venice read requests.
* <p>
* **Positive values** correspond to standard HTTP status codes and can be used directly in HTTP responses.
* **Negative values** represent custom Venice-specific error codes.
* <p>
* For example, a status code of `200` indicates a successful read, while a status code of `-100` might indicate a specific Venice-related error.
* Defines response status codes for Venice read requests. This wrapper around {@link HttpResponseStatus} allows
* for the inclusion of custom status codes that extend beyond the standard HTTP status codes.
*/
public class VeniceReadResponseStatus {
public static final int KEY_NOT_FOUND = -420;

public static final int OK = 200;
public static final int BAD_REQUEST = 400;
public static final int INTERNAL_ERROR = 500;
public static final int TOO_MANY_REQUESTS = 429;
public static final int SERVICE_UNAVAILABLE = 503;
public enum VeniceReadResponseStatus {
sushantmane marked this conversation as resolved.
Show resolved Hide resolved
KEY_NOT_FOUND(HttpResponseStatus.NOT_FOUND), OK(HttpResponseStatus.OK), BAD_REQUEST(HttpResponseStatus.BAD_REQUEST),
FORBIDDEN(HttpResponseStatus.FORBIDDEN), METHOD_NOT_ALLOWED(HttpResponseStatus.METHOD_NOT_ALLOWED),
REQUEST_TIMEOUT(HttpResponseStatus.REQUEST_TIMEOUT), TOO_MANY_REQUESTS(HttpResponseStatus.TOO_MANY_REQUESTS),
INTERNAL_SERVER_ERROR(HttpResponseStatus.INTERNAL_SERVER_ERROR),
SERVICE_UNAVAILABLE(HttpResponseStatus.SERVICE_UNAVAILABLE),
MISROUTED_STORE_VERSION(new HttpResponseStatus(570, "Misrouted request"));

private static final Map<Integer, VeniceReadResponseStatus> STATUS_MAP = new HashMap<>(16);

static {
for (VeniceReadResponseStatus status: values()) {
STATUS_MAP.put(status.getCode(), status);
}
}

private final HttpResponseStatus httpResponseStatus;

VeniceReadResponseStatus(HttpResponseStatus httpResponseStatus) {
this.httpResponseStatus = httpResponseStatus;
}

public HttpResponseStatus getHttpResponseStatus() {
return httpResponseStatus;
}

public int getCode() {
return httpResponseStatus.code();
}

public static VeniceReadResponseStatus fromCode(int code) {
VeniceReadResponseStatus status = STATUS_MAP.get(code);
if (status == null) {
throw new IllegalArgumentException("Unknown status venice read response status code: " + code);
}
return status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -38,4 +39,8 @@ public static void setupResponseAndFlush(
response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
ctx.writeAndFlush(response);
}

public static boolean containRetryHeader(HttpRequest request) {
return request.headers().contains(HttpConstants.VENICE_RETRY);
}
}
Loading
Loading