Skip to content

Commit

Permalink
Nexus error rehydration
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Jan 13, 2025
1 parent b848aec commit 107efdb
Show file tree
Hide file tree
Showing 19 changed files with 528 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

package io.temporal.opentracing.internal;

import io.nexusrpc.OperationUnsuccessfulException;
import io.nexusrpc.OperationException;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
Expand Down Expand Up @@ -49,8 +49,7 @@ public OpenTracingNexusOperationInboundCallsInterceptor(
}

@Override
public StartOperationOutput startOperation(StartOperationInput input)
throws OperationUnsuccessfulException {
public StartOperationOutput startOperation(StartOperationInput input) throws OperationException {
SpanContext rootSpanContext =
contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@

package io.temporal.common.interceptors;

import io.nexusrpc.OperationUnsuccessfulException;
import io.nexusrpc.OperationException;
import io.nexusrpc.handler.*;
import io.temporal.common.Experimental;

/**
* Intercepts inbound calls to a Nexus operation on the worker side.
*
* <p>An instance should be created in {@link
* WorkerInterceptor#interceptNexusOperation(NexusOperationInboundCallsInterceptor)}.
* WorkerInterceptor#interceptNexusOperation(OperationContext,
* NexusOperationInboundCallsInterceptor)}.
*
* <p>Prefer extending {@link NexusOperationInboundCallsInterceptorBase} and overriding only the
* methods you need instead of implementing this interface directly. {@link
Expand Down Expand Up @@ -102,10 +103,9 @@ final class CancelOperationOutput {}
*
* @param input input to the operation start.
* @return result of the operation start.
* @throws OperationUnsuccessfulException if the operation start failed.
* @throws io.nexusrpc.OperationException if the operation start failed.
*/
StartOperationOutput startOperation(StartOperationInput input)
throws OperationUnsuccessfulException;
StartOperationOutput startOperation(StartOperationInput input) throws OperationException;

/**
* Intercepts a call to cancel a Nexus operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

package io.temporal.common.interceptors;

import io.nexusrpc.OperationUnsuccessfulException;
import io.nexusrpc.OperationException;
import io.temporal.common.Experimental;

/** Convenience base class for {@link NexusOperationInboundCallsInterceptor} implementations. */
Expand All @@ -39,8 +39,7 @@ public void init(NexusOperationOutboundCallsInterceptor outboundCalls) {
}

@Override
public StartOperationOutput startOperation(StartOperationInput input)
throws OperationUnsuccessfulException {
public StartOperationOutput startOperation(StartOperationInput input) throws OperationException {
return next.startOperation(input);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import io.nexusrpc.handler.OperationHandlerException;
import io.nexusrpc.handler.HandlerException;
import io.temporal.api.common.v1.ActivityType;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.WorkflowType;
Expand Down Expand Up @@ -193,8 +193,7 @@ private RuntimeException failureToExceptionImpl(Failure failure, DataConverter d
case NEXUS_HANDLER_FAILURE_INFO:
{
NexusHandlerFailureInfo info = failure.getNexusHandlerFailureInfo();
return new OperationHandlerException(
OperationHandlerException.ErrorType.valueOf(info.getType()), cause);
return new HandlerException(HandlerException.ErrorType.valueOf(info.getType()), cause);
}
case FAILUREINFO_NOT_SET:
default:
Expand Down Expand Up @@ -319,8 +318,8 @@ private Failure exceptionToFailure(Throwable throwable) {
.setOperation(no.getOperation())
.setOperationId(no.getOperationId());
failure.setNexusOperationExecutionFailureInfo(op);
} else if (throwable instanceof OperationHandlerException) {
OperationHandlerException oe = (OperationHandlerException) throwable;
} else if (throwable instanceof HandlerException) {
HandlerException oe = (HandlerException) throwable;
NexusHandlerFailureInfo.Builder info =
NexusHandlerFailureInfo.newBuilder().setType(oe.getErrorType().toString());
failure.setNexusHandlerFailureInfo(info);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@

import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.uber.m3.tally.Scope;
import io.nexusrpc.Header;
import io.nexusrpc.OperationUnsuccessfulException;
import io.nexusrpc.OperationException;
import io.nexusrpc.handler.*;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.nexus.v1.*;
Expand All @@ -51,6 +54,11 @@

public class NexusTaskHandlerImpl implements NexusTaskHandler {
private static final Logger log = LoggerFactory.getLogger(NexusTaskHandlerImpl.class);
private static final JsonFormat.Printer JSON_PRINTER = JsonFormat.printer();
private static final String TEMPORAL_FAILURE_TYPE_STRING =
io.temporal.api.failure.v1.Failure.getDescriptor().getFullName();
private static final Map<String, String> NEXUS_FAILURE_METADATA =
Collections.singletonMap("type", TEMPORAL_FAILURE_TYPE_STRING);
private final DataConverter dataConverter;
private final String namespace;
private final String taskQueue;
Expand Down Expand Up @@ -122,7 +130,7 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
} catch (IllegalArgumentException e) {
return new Result(
HandlerError.newBuilder()
.setErrorType(OperationHandlerException.ErrorType.BAD_REQUEST.toString())
.setErrorType(HandlerException.ErrorType.BAD_REQUEST.toString())
.setFailure(
Failure.newBuilder().setMessage("cannot parse request timeout").build())
.build());
Expand All @@ -148,20 +156,20 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
default:
return new Result(
HandlerError.newBuilder()
.setErrorType(OperationHandlerException.ErrorType.NOT_IMPLEMENTED.toString())
.setErrorType(HandlerException.ErrorType.NOT_IMPLEMENTED.toString())
.setFailure(Failure.newBuilder().setMessage("unknown request type").build())
.build());
}
} catch (OperationHandlerException e) {
} catch (HandlerException e) {
return new Result(
HandlerError.newBuilder()
.setErrorType(e.getErrorType().toString())
.setFailure(createFailure(e.getCause()))
.setFailure(exceptionToNexusFailure(e.getCause()))
.build());
} catch (Throwable e) {
return new Result(
HandlerError.newBuilder()
.setErrorType(OperationHandlerException.ErrorType.INTERNAL.toString())
.setErrorType(HandlerException.ErrorType.INTERNAL.toString())
.setFailure(Failure.newBuilder().setMessage(e.toString()).build())
.build());
} finally {
Expand All @@ -177,12 +185,21 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
}
}

private Failure createFailure(Throwable exception) {
private Failure exceptionToNexusFailure(Throwable exception) {
io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception);
String details;
try {
details =
JSON_PRINTER
.omittingInsignificantWhitespace()
.print(failure.toBuilder().setMessage("").build());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
return Failure.newBuilder()
.setMessage(failure.getMessage())
.setDetails(failure.toByteString())
.putAllMetadata(Collections.singletonMap("type", "NexusFailureType"))
.setDetails(ByteString.copyFromUtf8(details))
.putAllMetadata(NEXUS_FAILURE_METADATA)
.build();
}

Expand Down Expand Up @@ -210,25 +227,22 @@ private CancelOperationResponse handleCancelledOperation(
try {
cancelOperation(ctx.build(), operationCancelDetails);
} catch (Throwable failure) {
convertKnownFailures(failure);
// convertKnownFailures(failure);
throw failure;
}

return CancelOperationResponse.newBuilder().build();
}

private void convertKnownFailures(Throwable e) {
private void convertKnownFailures(Throwable e) throws OperationException {
Throwable failure = CheckedExceptionWrapper.unwrap(e);
if (failure instanceof WorkflowException) {
throw OperationException.failure(failure);
}
if (failure instanceof ApplicationFailure) {
if (((ApplicationFailure) failure).isNonRetryable()) {
throw new OperationHandlerException(
OperationHandlerException.ErrorType.BAD_REQUEST, failure.getMessage());
throw OperationException.failure(failure);
}
throw new OperationHandlerException(
OperationHandlerException.ErrorType.INTERNAL, failure.getMessage());
}
if (failure instanceof WorkflowException) {
throw new OperationHandlerException(
OperationHandlerException.ErrorType.BAD_REQUEST, failure.getMessage());
}
if (failure instanceof Error) {
throw (Error) failure;
Expand All @@ -240,7 +254,7 @@ private void convertKnownFailures(Throwable e) {

private OperationStartResult<HandlerResultContent> startOperation(
OperationContext context, OperationStartDetails details, HandlerInputContent input)
throws OperationUnsuccessfulException {
throws OperationException {
try {
return serviceHandler.startOperation(context, details, input);
} catch (Throwable e) {
Expand Down Expand Up @@ -271,8 +285,8 @@ private StartOperationResponse handleStartOperation(
operationStartDetails.addLink(nexusProtoLinkToLink(link));
} catch (URISyntaxException e) {
log.error("failed to parse link url: " + link.getUrl(), e);
throw new OperationHandlerException(
OperationHandlerException.ErrorType.BAD_REQUEST,
throw new HandlerException(
HandlerException.ErrorType.BAD_REQUEST,
new RuntimeException("Invalid link URL: " + link.getUrl(), e));
}
});
Expand All @@ -282,36 +296,40 @@ private StartOperationResponse handleStartOperation(

StartOperationResponse.Builder startResponseBuilder = StartOperationResponse.newBuilder();
try {
OperationStartResult<HandlerResultContent> result =
startOperation(ctx.build(), operationStartDetails.build(), input.build());
if (result.isSync()) {
startResponseBuilder.setSyncSuccess(
StartOperationResponse.Sync.newBuilder()
.setPayload(Payload.parseFrom(result.getSyncResult().getDataBytes()))
.build());
} else {
startResponseBuilder.setAsyncSuccess(
StartOperationResponse.Async.newBuilder()
.setOperationId(result.getAsyncOperationId())
.addAllLinks(
result.getLinks().stream()
.map(
link ->
io.temporal.api.nexus.v1.Link.newBuilder()
.setType(link.getType())
.setUrl(link.getUri().toString())
.build())
.collect(Collectors.toList()))
.build());
try {
OperationStartResult<HandlerResultContent> result =
startOperation(ctx.build(), operationStartDetails.build(), input.build());
if (result.isSync()) {
startResponseBuilder.setSyncSuccess(
StartOperationResponse.Sync.newBuilder()
.setPayload(Payload.parseFrom(result.getSyncResult().getDataBytes()))
.build());
} else {
startResponseBuilder.setAsyncSuccess(
StartOperationResponse.Async.newBuilder()
.setOperationId(result.getAsyncOperationId())
.addAllLinks(
result.getLinks().stream()
.map(
link ->
io.temporal.api.nexus.v1.Link.newBuilder()
.setType(link.getType())
.setUrl(link.getUri().toString())
.build())
.collect(Collectors.toList()))
.build());
}
} catch (OperationException e) {
throw e;
} catch (Throwable failure) {
convertKnownFailures(failure);
}
} catch (OperationUnsuccessfulException e) {
} catch (OperationException e) {
startResponseBuilder.setOperationError(
UnsuccessfulOperationError.newBuilder()
.setOperationState(e.getState().toString().toLowerCase())
.setFailure(createFailure(e.getCause()))
.setFailure(exceptionToNexusFailure(e.getCause()))
.build());
} catch (Throwable failure) {
convertKnownFailures(failure);
}
return startResponseBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

package io.temporal.internal.nexus;

import io.nexusrpc.OperationUnsuccessfulException;
import io.nexusrpc.OperationException;
import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationStartResult;
import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor;
Expand All @@ -40,8 +40,7 @@ public void init(NexusOperationOutboundCallsInterceptor outboundCalls) {
}

@Override
public StartOperationOutput startOperation(StartOperationInput input)
throws OperationUnsuccessfulException {
public StartOperationOutput startOperation(StartOperationInput input) throws OperationException {
OperationStartResult result =
operationInterceptor.start(
input.getOperationContext(), input.getStartDetails(), input.getInput());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

package io.temporal.internal.nexus;

import io.nexusrpc.OperationException;
import io.nexusrpc.OperationInfo;
import io.nexusrpc.OperationUnsuccessfulException;
import io.nexusrpc.handler.*;
import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor;
import io.temporal.common.interceptors.WorkerInterceptor;
Expand Down Expand Up @@ -60,7 +60,7 @@ public OperationInterceptorConverter(NexusOperationInboundCallsInterceptor next)
@Override
public OperationStartResult<Object> start(
OperationContext operationContext, OperationStartDetails operationStartDetails, Object o)
throws OperationUnsuccessfulException {
throws OperationException {
return next.startOperation(
new NexusOperationInboundCallsInterceptor.StartOperationInput(
operationContext, operationStartDetails, o))
Expand All @@ -70,14 +70,14 @@ public OperationStartResult<Object> start(
@Override
public Object fetchResult(
OperationContext operationContext, OperationFetchResultDetails operationFetchResultDetails)
throws OperationHandlerException {
throws OperationException {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public OperationInfo fetchInfo(
OperationContext operationContext, OperationFetchInfoDetails operationFetchInfoDetails)
throws OperationHandlerException {
throws HandlerException {
throw new UnsupportedOperationException("Not implemented");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

package io.temporal.nexus;

import io.nexusrpc.OperationUnsuccessfulException;
import io.nexusrpc.OperationException;
import io.nexusrpc.handler.OperationContext;
import io.nexusrpc.handler.OperationStartDetails;
import io.temporal.client.WorkflowClient;
Expand All @@ -37,5 +37,5 @@ public interface SynchronousWorkflowClientOperationFunction<T, R> {
@Nullable
R apply(
OperationContext ctx, OperationStartDetails details, WorkflowClient client, @Nullable T input)
throws OperationUnsuccessfulException;
throws OperationException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public OperationStartResult<R> start(
return result.build();
} catch (URISyntaxException e) {
// Not expected as the link is constructed by the SDK.
throw new OperationHandlerException(OperationHandlerException.ErrorType.INTERNAL, e);
throw new HandlerException(HandlerException.ErrorType.INTERNAL, e);
}
}

Expand Down
Loading

0 comments on commit 107efdb

Please sign in to comment.