Skip to content

Commit

Permalink
handle error case for source exception
Browse files Browse the repository at this point in the history
Signed-off-by: adarsh0728 <[email protected]>
  • Loading branch information
adarsh0728 committed Feb 6, 2025
1 parent cd6fd96 commit fd1f19a
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 27 deletions.
24 changes: 24 additions & 0 deletions src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.numaproj.numaflow.shared;

import java.io.PrintWriter;
import java.io.StringWriter;

public class ExceptionUtils {

Check warning on line 6 in src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java#L6

Added line #L6 was not covered by tests
/**
* Formalized exception error strings
*/
public static final String ERR_SOURCE_EXCEPTION = "UDF_EXECUTION_ERROR(source)";
public static final String ERR_TRANSFORMER_EXCEPTION = "UDF_EXECUTION_ERROR(transformer)";

/**
* Converts the stack trace of an exception into a String.
*
* @param e the exception to extract the stack trace from
* @return the stack trace as a String
*/
public static String getStackTrace(Exception e) {
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
return sw.toString();
}
}
32 changes: 22 additions & 10 deletions src/main/java/io/numaproj/numaflow/sourcer/Service.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package io.numaproj.numaflow.sourcer;

import com.google.protobuf.Any;
import com.google.protobuf.Empty;
import com.google.rpc.Code;
import com.google.rpc.DebugInfo;
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.shared.ExceptionUtils;
import io.numaproj.numaflow.source.v1.SourceGrpc;
import io.numaproj.numaflow.source.v1.SourceOuterClass;
import lombok.AllArgsConstructor;
Expand All @@ -15,7 +20,6 @@

import static io.numaproj.numaflow.source.v1.SourceGrpc.getPendingFnMethod;


/**
* Implementation of the gRPC service for the sourcer.
*/
Expand All @@ -31,7 +35,8 @@ class Service extends SourceGrpc.SourceImplBase {
* @param responseObserver the response observer
*/
@Override
public StreamObserver<SourceOuterClass.ReadRequest> readFn(final StreamObserver<SourceOuterClass.ReadResponse> responseObserver) {
public StreamObserver<SourceOuterClass.ReadRequest> readFn(
final StreamObserver<SourceOuterClass.ReadResponse> responseObserver) {
return new StreamObserver<>() {
private boolean handshakeDone = false;

Expand Down Expand Up @@ -77,12 +82,18 @@ public void onNext(SourceOuterClass.ReadRequest request) {

responseObserver.onNext(response);
} catch (Exception e) {
log.error("Encountered error in readFn onNext - {}", e.getMessage());
String stackTrace = ExceptionUtils.getStackTrace(e);
log.error("Encountered error in readFn onNext - {} {}", e.getMessage(), stackTrace);

Check warning on line 86 in src/main/java/io/numaproj/numaflow/sourcer/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcer/Service.java#L85-L86

Added lines #L85 - L86 were not covered by tests
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
// Build gRPC Status [error]
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())

Check warning on line 90 in src/main/java/io/numaproj/numaflow/sourcer/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcer/Service.java#L89-L90

Added lines #L89 - L90 were not covered by tests
.setMessage(ExceptionUtils.ERR_SOURCE_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(stackTrace)
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));

Check warning on line 96 in src/main/java/io/numaproj/numaflow/sourcer/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/sourcer/Service.java#L92-L96

Added lines #L92 - L96 were not covered by tests
}
}

Expand Down Expand Up @@ -200,7 +211,8 @@ public void pendingFn(
SourceOuterClass.PendingResponse.Result
.newBuilder()
.setCount(this.sourcer.getPending())
.build()).build());
.build())
.build());
responseObserver.onCompleted();
}

Expand Down Expand Up @@ -235,8 +247,8 @@ public void partitionsFn(
responseObserver.onNext(SourceOuterClass.PartitionsResponse.newBuilder()
.setResult(
SourceOuterClass.PartitionsResponse.Result.newBuilder()
.addAllPartitions(partitions)).
build());
.addAllPartitions(partitions))
.build());
responseObserver.onCompleted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
import com.google.rpc.Code;
import com.google.rpc.DebugInfo;
import io.grpc.protobuf.StatusProto;
import io.numaproj.numaflow.shared.ExceptionUtils;
import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer;
import lombok.extern.slf4j.Slf4j;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -55,7 +54,6 @@ class TransformSupervisorActor extends AbstractActor {
private final CompletableFuture<Void> shutdownSignal;
private int activeTransformersCount;
private Exception userException;
private static final String ERR_TRANSFORMER_EXCEPTION = "UDF_EXECUTION_ERROR(transformer)";

/**
* Constructor for TransformSupervisorActor.
Expand Down Expand Up @@ -143,7 +141,7 @@ public Receive createReceive() {
* @param e The exception to be handled.
*/
private void handleFailure(Exception e) {
String stackTrace = getStackTrace(e);
String stackTrace = ExceptionUtils.getStackTrace(e);
log.error("Exception in sourceTransformFn: {} {}", e.getMessage(), stackTrace);
if (userException == null) {
userException = e;
Expand All @@ -153,7 +151,7 @@ private void handleFailure(Exception e) {
// Build gRPC Status [error]
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ERR_TRANSFORMER_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.setMessage(ExceptionUtils.ERR_TRANSFORMER_EXCEPTION + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(stackTrace)
.build()))
Expand All @@ -163,18 +161,6 @@ private void handleFailure(Exception e) {
activeTransformersCount--;
}

/**
* Converts the stack trace of an exception into a String.
*
* @param e the exception to extract the stack trace from
* @return the stack trace as a String
*/
private String getStackTrace(Exception e) {
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
return sw.toString();
}

/**
* Sends the SourceTransformResponse back to the client.
*
Expand Down

0 comments on commit fd1f19a

Please sign in to comment.