diff --git a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/AwakeableHandle.java b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/AwakeableHandle.java index 5350147b..04561863 100644 --- a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/AwakeableHandle.java +++ b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/AwakeableHandle.java @@ -17,5 +17,14 @@ public interface AwakeableHandle { * configured {@link Serde}. MUST NOT be null. * @see Awakeable */ - void complete(TypeTag typeTag, @Nonnull T payload); + void resolve(TypeTag typeTag, @Nonnull T payload); + + /** + * Complete with failure the {@link Awakeable} identified by the provided {@link + * AwakeableIdentifier}. + * + * @param reason the rejection reason. + * @see Awakeable + */ + void reject(String reason); } diff --git a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java index 807b87ed..0354a1d8 100644 --- a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java +++ b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java @@ -137,7 +137,7 @@ default Awakeable awakeable(Class type) { /** * Create a new {@link AwakeableHandle} for the provided identifier. You can use it to {@link - * AwakeableHandle#complete(TypeTag, Object)} the linked {@link Awakeable}. + * AwakeableHandle#resolve(TypeTag, Object)} the linked {@link Awakeable}. * * @see Awakeable */ diff --git a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java index cb381399..2e6f6697 100644 --- a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java +++ b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java @@ -153,8 +153,13 @@ public Awakeable awakeable(TypeTag typeTag) throws StatusRuntimeExcept public AwakeableHandle awakeableHandle(String id) { return new AwakeableHandle() { @Override - public void complete(TypeTag typeTag, @Nonnull T payload) { - Util.blockOnSyscall(cb -> syscalls.completeAwakeable(id, typeTag, payload, cb)); + public void resolve(TypeTag typeTag, @Nonnull T payload) { + Util.blockOnSyscall(cb -> syscalls.resolveAwakeable(id, typeTag, payload, cb)); + } + + @Override + public void reject(String reason) { + Util.blockOnSyscall(cb -> syscalls.rejectAwakeable(id, reason, cb)); } }; } diff --git a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ExecutorSwitchingWrappers.java b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ExecutorSwitchingWrappers.java index b9f549cc..57558e54 100644 --- a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ExecutorSwitchingWrappers.java +++ b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ExecutorSwitchingWrappers.java @@ -168,10 +168,15 @@ public void awakeable( } @Override - public void completeAwakeable( + public void resolveAwakeable( String id, TypeTag typeTag, @Nonnull T payload, SyscallCallback requestCallback) { syscallsExecutor.execute( - () -> syscalls.completeAwakeable(id, typeTag, payload, requestCallback)); + () -> syscalls.resolveAwakeable(id, typeTag, payload, requestCallback)); + } + + @Override + public void rejectAwakeable(String id, String reason, SyscallCallback requestCallback) { + syscallsExecutor.execute(() -> syscalls.rejectAwakeable(id, reason, requestCallback)); } @Override diff --git a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsImpl.java b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsImpl.java index 0f288f16..da810b47 100644 --- a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsImpl.java +++ b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsImpl.java @@ -6,6 +6,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.MessageLite; +import com.google.rpc.Code; import dev.restate.generated.core.AwakeableIdentifier; import dev.restate.generated.sdk.java.Java; import dev.restate.generated.service.protocol.Protocol; @@ -246,10 +247,35 @@ public void awakeable( } @Override - public void completeAwakeable( + public void resolveAwakeable( String serializedId, TypeTag ty, @Nonnull T payload, SyscallCallback callback) { - LOG.trace("completeAwakeable"); + LOG.trace("resolveAwakeable"); + Objects.requireNonNull(payload); + ByteString serialized = serialize(ty, payload); + + completeAwakeable( + serializedId, + Protocol.CompleteAwakeableEntryMessage.newBuilder().setValue(serialized), + callback); + } + + @Override + public void rejectAwakeable(String serializedId, String reason, SyscallCallback callback) { + LOG.trace("rejectAwakeable"); + + completeAwakeable( + serializedId, + Protocol.CompleteAwakeableEntryMessage.newBuilder() + .setFailure( + Protocol.Failure.newBuilder().setCode(Code.UNKNOWN_VALUE).setMessage(reason)), + callback); + } + + private void completeAwakeable( + String serializedId, + Protocol.CompleteAwakeableEntryMessage.Builder builder, + SyscallCallback callback) { Protocol.AwakeableIdentifier id; try { id = Protocol.AwakeableIdentifier.parseFrom(Base64.getUrlDecoder().decode(serializedId)); @@ -257,16 +283,12 @@ public void completeAwakeable( throw new RuntimeException("Cannot decode AwakeableIdentifier", e); } - Objects.requireNonNull(payload); - ByteString serialized = serialize(ty, payload); - Protocol.CompleteAwakeableEntryMessage expectedEntry = - Protocol.CompleteAwakeableEntryMessage.newBuilder() + builder .setServiceName(id.getServiceName()) .setInstanceKey(id.getInstanceKey()) .setInvocationId(id.getInvocationId()) .setEntryIndex(id.getEntryIndex()) - .setValue(serialized) .build(); this.stateMachine.processJournalEntry(expectedEntry, CompleteAwakeableEntry.INSTANCE, callback); } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java b/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java index c41f01fd..e3a23081 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java @@ -79,9 +79,11 @@ void exitSideEffectBlockWithException( void awakeable( TypeTag typeTag, SyscallCallback>> callback); - void completeAwakeable( + void resolveAwakeable( String id, TypeTag ty, @Nonnull T payload, SyscallCallback requestCallback); + void rejectAwakeable(String id, String reason, SyscallCallback requestCallback); + // ----- Deferred void resolveDeferred(DeferredResult deferredToResolve, SyscallCallback callback); diff --git a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Awaitables.kt b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Awaitables.kt index dc8c4db7..4b2b082e 100644 --- a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Awaitables.kt +++ b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Awaitables.kt @@ -66,7 +66,7 @@ internal constructor( internal class AwakeableHandleImpl(val syscalls: Syscalls, val id: String) : AwakeableHandle { override suspend fun complete(typeTag: TypeTag, payload: T) { return suspendCancellableCoroutine { cont: CancellableContinuation -> - syscalls.completeAwakeable(id, typeTag, payload, completingUnitContinuation(cont)) + syscalls.resolveAwakeable(id, typeTag, payload, completingUnitContinuation(cont)) } } } diff --git a/sdk-testing/src/test/java/dev/restate/sdk/testing/services/AwakeService.java b/sdk-testing/src/test/java/dev/restate/sdk/testing/services/AwakeService.java index 469103ca..ceb952a4 100644 --- a/sdk-testing/src/test/java/dev/restate/sdk/testing/services/AwakeService.java +++ b/sdk-testing/src/test/java/dev/restate/sdk/testing/services/AwakeService.java @@ -22,7 +22,7 @@ public void awake(AwakeServiceRequest request, StreamObserver responseObs RestateContext ctx = restateContext(); AwakeableHandle awakeableHandle = ctx.awakeableHandle(request.getId()); - awakeableHandle.complete(TypeTag.STRING_UTF8, "Wake up!"); + awakeableHandle.resolve(TypeTag.STRING_UTF8, "Wake up!"); responseObserver.onNext(Empty.getDefaultInstance()); responseObserver.onCompleted();