Skip to content

Commit

Permalink
Rename AwakeableHandle#complete() to AwakeableHandle#resolve(). Intro…
Browse files Browse the repository at this point in the history
…duce AwakeableHandle#reject()
  • Loading branch information
slinkydeveloper committed Aug 21, 2023
1 parent ab4b604 commit ed52127
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,14 @@ public interface AwakeableHandle {
* configured {@link Serde}. MUST NOT be null.
* @see Awakeable
*/
<T> void complete(TypeTag<T> typeTag, @Nonnull T payload);
<T> void resolve(TypeTag<T> typeTag, @Nonnull T payload);

/**
* Complete with failure the {@link Awakeable} identified by the provided {@link
* AwakeableIdentifier}.
*
* @param reason the rejection reason.
* @see Awakeable
*/
void reject(String reason);
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ default <T> Awakeable<T> awakeable(Class<T> type) {

/**
* Create a new {@link AwakeableHandle} for the provided identifier. You can use it to {@link
* AwakeableHandle#complete(TypeTag, Object)} the linked {@link Awakeable}.
* AwakeableHandle#resolve(TypeTag, Object)} the linked {@link Awakeable}.
*
* @see Awakeable
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,13 @@ public <T> Awakeable<T> awakeable(TypeTag<T> typeTag) throws StatusRuntimeExcept
public AwakeableHandle awakeableHandle(String id) {
return new AwakeableHandle() {
@Override
public <T> void complete(TypeTag<T> typeTag, @Nonnull T payload) {
Util.<Void>blockOnSyscall(cb -> syscalls.completeAwakeable(id, typeTag, payload, cb));
public <T> void resolve(TypeTag<T> typeTag, @Nonnull T payload) {
Util.<Void>blockOnSyscall(cb -> syscalls.resolveAwakeable(id, typeTag, payload, cb));
}

@Override
public void reject(String reason) {
Util.<Void>blockOnSyscall(cb -> syscalls.rejectAwakeable(id, reason, cb));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,15 @@ public <T> void awakeable(
}

@Override
public <T> void completeAwakeable(
public <T> void resolveAwakeable(
String id, TypeTag<T> typeTag, @Nonnull T payload, SyscallCallback<Void> requestCallback) {
syscallsExecutor.execute(
() -> syscalls.completeAwakeable(id, typeTag, payload, requestCallback));
() -> syscalls.resolveAwakeable(id, typeTag, payload, requestCallback));
}

@Override
public void rejectAwakeable(String id, String reason, SyscallCallback<Void> requestCallback) {
syscallsExecutor.execute(() -> syscalls.rejectAwakeable(id, reason, requestCallback));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.rpc.Code;
import dev.restate.generated.core.AwakeableIdentifier;
import dev.restate.generated.sdk.java.Java;
import dev.restate.generated.service.protocol.Protocol;
Expand Down Expand Up @@ -246,27 +247,48 @@ public <T> void awakeable(
}

@Override
public <T> void completeAwakeable(
public <T> void resolveAwakeable(
String serializedId, TypeTag<T> ty, @Nonnull T payload, SyscallCallback<Void> callback) {
LOG.trace("completeAwakeable");
LOG.trace("resolveAwakeable");

Objects.requireNonNull(payload);
ByteString serialized = serialize(ty, payload);

completeAwakeable(
serializedId,
Protocol.CompleteAwakeableEntryMessage.newBuilder().setValue(serialized),
callback);
}

@Override
public void rejectAwakeable(String serializedId, String reason, SyscallCallback<Void> callback) {
LOG.trace("rejectAwakeable");

completeAwakeable(
serializedId,
Protocol.CompleteAwakeableEntryMessage.newBuilder()
.setFailure(
Protocol.Failure.newBuilder().setCode(Code.UNKNOWN_VALUE).setMessage(reason)),
callback);
}

private void completeAwakeable(
String serializedId,
Protocol.CompleteAwakeableEntryMessage.Builder builder,
SyscallCallback<Void> callback) {
Protocol.AwakeableIdentifier id;
try {
id = Protocol.AwakeableIdentifier.parseFrom(Base64.getUrlDecoder().decode(serializedId));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Cannot decode AwakeableIdentifier", e);
}

Objects.requireNonNull(payload);
ByteString serialized = serialize(ty, payload);

Protocol.CompleteAwakeableEntryMessage expectedEntry =
Protocol.CompleteAwakeableEntryMessage.newBuilder()
builder
.setServiceName(id.getServiceName())
.setInstanceKey(id.getInstanceKey())
.setInvocationId(id.getInvocationId())
.setEntryIndex(id.getEntryIndex())
.setValue(serialized)
.build();
this.stateMachine.processJournalEntry(expectedEntry, CompleteAwakeableEntry.INSTANCE, callback);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@ void exitSideEffectBlockWithException(
<T> void awakeable(
TypeTag<T> typeTag, SyscallCallback<Map.Entry<String, DeferredResult<T>>> callback);

<T> void completeAwakeable(
<T> void resolveAwakeable(
String id, TypeTag<T> ty, @Nonnull T payload, SyscallCallback<Void> requestCallback);

void rejectAwakeable(String id, String reason, SyscallCallback<Void> requestCallback);

// ----- Deferred

<T> void resolveDeferred(DeferredResult<T> deferredToResolve, SyscallCallback<Void> callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ internal constructor(
internal class AwakeableHandleImpl(val syscalls: Syscalls, val id: String) : AwakeableHandle {
override suspend fun <T : Any> complete(typeTag: TypeTag<T>, payload: T) {
return suspendCancellableCoroutine { cont: CancellableContinuation<Unit> ->
syscalls.completeAwakeable(id, typeTag, payload, completingUnitContinuation(cont))
syscalls.resolveAwakeable(id, typeTag, payload, completingUnitContinuation(cont))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void awake(AwakeServiceRequest request, StreamObserver<Empty> responseObs
RestateContext ctx = restateContext();

AwakeableHandle awakeableHandle = ctx.awakeableHandle(request.getId());
awakeableHandle.complete(TypeTag.STRING_UTF8, "Wake up!");
awakeableHandle.resolve(TypeTag.STRING_UTF8, "Wake up!");

responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
Expand Down

0 comments on commit ed52127

Please sign in to comment.