Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename AwakeableHandle#complete() to AwakeableHandle#resolve(). Introduce AwakeableHandle#reject() #106

Merged
merged 1 commit into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,14 @@ public interface AwakeableHandle {
* configured {@link Serde}. MUST NOT be null.
* @see Awakeable
*/
<T> void complete(TypeTag<T> typeTag, @Nonnull T payload);
<T> void resolve(TypeTag<T> typeTag, @Nonnull T payload);

/**
* Complete with failure the {@link Awakeable} identified by the provided {@link
* AwakeableIdentifier}.
*
* @param reason the rejection reason.
* @see Awakeable
*/
void reject(String reason);
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ default <T> Awakeable<T> awakeable(Class<T> type) {

/**
* Create a new {@link AwakeableHandle} for the provided identifier. You can use it to {@link
* AwakeableHandle#complete(TypeTag, Object)} the linked {@link Awakeable}.
* AwakeableHandle#resolve(TypeTag, Object)} the linked {@link Awakeable}.
*
* @see Awakeable
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,13 @@ public <T> Awakeable<T> awakeable(TypeTag<T> typeTag) throws StatusRuntimeExcept
public AwakeableHandle awakeableHandle(String id) {
return new AwakeableHandle() {
@Override
public <T> void complete(TypeTag<T> typeTag, @Nonnull T payload) {
Util.<Void>blockOnSyscall(cb -> syscalls.completeAwakeable(id, typeTag, payload, cb));
public <T> void resolve(TypeTag<T> typeTag, @Nonnull T payload) {
Util.<Void>blockOnSyscall(cb -> syscalls.resolveAwakeable(id, typeTag, payload, cb));
}

@Override
public void reject(String reason) {
Util.<Void>blockOnSyscall(cb -> syscalls.rejectAwakeable(id, reason, cb));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,15 @@ public <T> void awakeable(
}

@Override
public <T> void completeAwakeable(
public <T> void resolveAwakeable(
String id, TypeTag<T> typeTag, @Nonnull T payload, SyscallCallback<Void> requestCallback) {
syscallsExecutor.execute(
() -> syscalls.completeAwakeable(id, typeTag, payload, requestCallback));
() -> syscalls.resolveAwakeable(id, typeTag, payload, requestCallback));
}

@Override
public void rejectAwakeable(String id, String reason, SyscallCallback<Void> requestCallback) {
syscallsExecutor.execute(() -> syscalls.rejectAwakeable(id, reason, requestCallback));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.rpc.Code;
import dev.restate.generated.core.AwakeableIdentifier;
import dev.restate.generated.sdk.java.Java;
import dev.restate.generated.service.protocol.Protocol;
Expand Down Expand Up @@ -246,27 +247,48 @@ public <T> void awakeable(
}

@Override
public <T> void completeAwakeable(
public <T> void resolveAwakeable(
String serializedId, TypeTag<T> ty, @Nonnull T payload, SyscallCallback<Void> callback) {
LOG.trace("completeAwakeable");
LOG.trace("resolveAwakeable");

Objects.requireNonNull(payload);
ByteString serialized = serialize(ty, payload);

completeAwakeable(
serializedId,
Protocol.CompleteAwakeableEntryMessage.newBuilder().setValue(serialized),
callback);
}

@Override
public void rejectAwakeable(String serializedId, String reason, SyscallCallback<Void> callback) {
LOG.trace("rejectAwakeable");

completeAwakeable(
serializedId,
Protocol.CompleteAwakeableEntryMessage.newBuilder()
.setFailure(
Protocol.Failure.newBuilder().setCode(Code.UNKNOWN_VALUE).setMessage(reason)),
callback);
}

private void completeAwakeable(
String serializedId,
Protocol.CompleteAwakeableEntryMessage.Builder builder,
SyscallCallback<Void> callback) {
Protocol.AwakeableIdentifier id;
try {
id = Protocol.AwakeableIdentifier.parseFrom(Base64.getUrlDecoder().decode(serializedId));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Cannot decode AwakeableIdentifier", e);
}

Objects.requireNonNull(payload);
ByteString serialized = serialize(ty, payload);

Protocol.CompleteAwakeableEntryMessage expectedEntry =
Protocol.CompleteAwakeableEntryMessage.newBuilder()
builder
.setServiceName(id.getServiceName())
.setInstanceKey(id.getInstanceKey())
.setInvocationId(id.getInvocationId())
.setEntryIndex(id.getEntryIndex())
.setValue(serialized)
.build();
this.stateMachine.processJournalEntry(expectedEntry, CompleteAwakeableEntry.INSTANCE, callback);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@ void exitSideEffectBlockWithException(
<T> void awakeable(
TypeTag<T> typeTag, SyscallCallback<Map.Entry<String, DeferredResult<T>>> callback);

<T> void completeAwakeable(
<T> void resolveAwakeable(
String id, TypeTag<T> ty, @Nonnull T payload, SyscallCallback<Void> requestCallback);

void rejectAwakeable(String id, String reason, SyscallCallback<Void> requestCallback);

// ----- Deferred

<T> void resolveDeferred(DeferredResult<T> deferredToResolve, SyscallCallback<Void> callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ internal constructor(
internal class AwakeableHandleImpl(val syscalls: Syscalls, val id: String) : AwakeableHandle {
override suspend fun <T : Any> complete(typeTag: TypeTag<T>, payload: T) {
return suspendCancellableCoroutine { cont: CancellableContinuation<Unit> ->
syscalls.completeAwakeable(id, typeTag, payload, completingUnitContinuation(cont))
syscalls.resolveAwakeable(id, typeTag, payload, completingUnitContinuation(cont))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void awake(AwakeServiceRequest request, StreamObserver<Empty> responseObs
RestateContext ctx = restateContext();

AwakeableHandle awakeableHandle = ctx.awakeableHandle(request.getId());
awakeableHandle.complete(TypeTag.STRING_UTF8, "Wake up!");
awakeableHandle.resolve(TypeTag.STRING_UTF8, "Wake up!");

responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
Expand Down
Loading