diff --git a/sdk-core/build.gradle.kts b/sdk-core/build.gradle.kts index 13dfbe96..7b87e852 100644 --- a/sdk-core/build.gradle.kts +++ b/sdk-core/build.gradle.kts @@ -17,6 +17,9 @@ dependencies { implementation(coreLibs.grpc.protobuf) implementation(coreLibs.log4j.api) + // We don't want a hard-dependency on it + compileOnly(coreLibs.log4j.core) + implementation(platform(coreLibs.opentelemetry.bom)) implementation(coreLibs.opentelemetry.api) implementation(coreLibs.opentelemetry.semconv) diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingWrappers.java b/sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingWrappers.java index b5f35c7b..0a8dd023 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingWrappers.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingWrappers.java @@ -11,7 +11,6 @@ import com.google.protobuf.ByteString; import com.google.protobuf.MessageLite; import dev.restate.sdk.common.TerminalException; -import dev.restate.sdk.common.syscalls.*; import dev.restate.sdk.common.syscalls.DeferredResult; import dev.restate.sdk.common.syscalls.EnterSideEffectSyscallCallback; import dev.restate.sdk.common.syscalls.ExitSideEffectSyscallCallback; @@ -48,13 +47,13 @@ private ExecutorSwitchingServerCallListener( } @Override - public void onMessageAndHalfClose(MessageLite message) { - userExecutor.execute(() -> listener.onMessageAndHalfClose(message)); + public void invoke(MessageLite message) { + userExecutor.execute(() -> listener.invoke(message)); } // A bit of explanation why the following methods are not executed on the user executor. // - // The listener methods onReady/onCancel/onComplete are used purely for notification reasons, + // The listener methods listenerReady/cancel/close are used purely for notification reasons, // they don't execute any user code. // // Running them in the userExecutor can also be problematic if the listener @@ -64,18 +63,18 @@ public void onMessageAndHalfClose(MessageLite message) { // as thread local. @Override - public void onCancel() { - listener.onCancel(); + public void cancel() { + listener.cancel(); } @Override - public void onComplete() { - listener.onComplete(); + public void close() { + listener.close(); } @Override - public void onReady() { - listener.onReady(); + public void listenerReady() { + listener.listenerReady(); } } @@ -182,6 +181,18 @@ public void resolveDeferred( syscallsExecutor.execute(() -> syscalls.resolveDeferred(deferredToResolve, callback)); } + @Override + public String getFullyQualifiedMethodName() { + // We can read this from another thread + return syscalls.getFullyQualifiedMethodName(); + } + + @Override + public InvocationState getInvocationState() { + // We can read this from another thread + return syscalls.getInvocationState(); + } + @Override public void close() { syscallsExecutor.execute(syscalls::close); diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/GrpcContextDataProvider.java b/sdk-core/src/main/java/dev/restate/sdk/core/GrpcContextDataProvider.java new file mode 100644 index 00000000..bc2bb057 --- /dev/null +++ b/sdk-core/src/main/java/dev/restate/sdk/core/GrpcContextDataProvider.java @@ -0,0 +1,48 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk.core; + +import dev.restate.sdk.common.InvocationId; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.logging.log4j.core.util.ContextDataProvider; + +/** + * Log4j2 ContextDataProvider inferring context from the Grpc context. + * + *

This is used to propagate the context to the user code, such that log statements from the user + * will contain the restate logging context variables. + * + *

This is based on grpc Context due to the fact that it's the only guaranteed thread + * local/context we can rely on that is always available in the user code. + */ +public class GrpcContextDataProvider implements ContextDataProvider { + @Override + public Map supplyContextData() { + InvocationId invocationId = InvocationId.INVOCATION_ID_KEY.get(); + SyscallsInternal syscalls = (SyscallsInternal) SyscallsInternal.SYSCALLS_KEY.get(); + + if (invocationId == null) { + return Collections.emptyMap(); + } + + // We can't use the immutable MapN implementation from Map.of because of + // https://github.com/apache/logging-log4j2/issues/2098 + HashMap m = new HashMap<>(); + m.put(RestateGrpcServer.LoggingContextSetter.INVOCATION_ID_KEY, invocationId.toString()); + m.put( + RestateGrpcServer.LoggingContextSetter.SERVICE_METHOD_KEY, + syscalls.getFullyQualifiedMethodName()); + m.put( + RestateGrpcServer.LoggingContextSetter.SERVICE_INVOCATION_STATUS_KEY, + syscalls.getInvocationState().toString()); + return m; + } +} diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/GrpcServerCallListenerAdaptor.java b/sdk-core/src/main/java/dev/restate/sdk/core/GrpcServerCallListenerAdaptor.java index 7eb00de7..f2d8b499 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/GrpcServerCallListenerAdaptor.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/GrpcServerCallListenerAdaptor.java @@ -8,9 +8,7 @@ // https://github.com/restatedev/sdk-java/blob/main/LICENSE package dev.restate.sdk.core; -import io.grpc.Metadata; -import io.grpc.ServerCall; -import io.grpc.Status; +import io.grpc.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -24,50 +22,75 @@ class GrpcServerCallListenerAdaptor implements RestateServerCallLis private static final Logger LOG = LogManager.getLogger(GrpcServerCallListenerAdaptor.class); + private final Context context; private final ServerCall serverCall; - private final ServerCall.Listener delegate; GrpcServerCallListenerAdaptor( - ServerCall.Listener delegate, ServerCall serverCall) { - this.delegate = delegate; + Context context, + ServerCall serverCall, + Metadata headers, + ServerCallHandler next) { + this.context = context; this.serverCall = serverCall; + + // This emulates Contexts.interceptCall. + // We need it because some code generator depends on the fact that startCall already has the + // context available + Context previous = this.context.attach(); + try { + this.delegate = next.startCall(serverCall, headers); + } finally { + this.context.detach(previous); + } } @Override - public void onMessageAndHalfClose(ReqT message) { + public void invoke(ReqT message) { + Context previous = context.attach(); try { delegate.onMessage(message); delegate.onHalfClose(); } catch (Throwable e) { closeWithException(e); + } finally { + context.detach(previous); } } @Override - public void onCancel() { + public void cancel() { + Context previous = context.attach(); try { delegate.onCancel(); } catch (Throwable e) { closeWithException(e); + } finally { + context.detach(previous); } } @Override - public void onComplete() { + public void close() { + Context previous = context.attach(); try { delegate.onComplete(); } catch (Throwable e) { closeWithException(e); + } finally { + context.detach(previous); } } @Override - public void onReady() { + public void listenerReady() { + Context previous = context.attach(); try { delegate.onReady(); } catch (Throwable e) { closeWithException(e); + } finally { + context.detach(previous); } } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/InvocationState.java b/sdk-core/src/main/java/dev/restate/sdk/core/InvocationState.java new file mode 100644 index 00000000..387d30fd --- /dev/null +++ b/sdk-core/src/main/java/dev/restate/sdk/core/InvocationState.java @@ -0,0 +1,16 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk.core; + +public enum InvocationState { + WAITING_START, + REPLAYING, + PROCESSING, + CLOSED; +} diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/InvocationStateMachine.java b/sdk-core/src/main/java/dev/restate/sdk/core/InvocationStateMachine.java index 31ba5b98..d31ebdd6 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/InvocationStateMachine.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/InvocationStateMachine.java @@ -33,18 +33,12 @@ class InvocationStateMachine implements InvocationFlow.InvocationProcessor { private static final Logger LOG = LogManager.getLogger(InvocationStateMachine.class); - enum State { - WAITING_START, - REPLAYING, - PROCESSING, - CLOSED; - } - private final String serviceName; + private final String fullyQualifiedMethodName; private final Span span; - private final Consumer transitionStateObserver; + private final Consumer transitionStateObserver; - private State state = State.WAITING_START; + private volatile InvocationState invocationState = InvocationState.WAITING_START; // Used for the side effect guard private boolean insideSideEffect = false; @@ -68,8 +62,13 @@ enum State { private Flow.Subscription inputSubscription; private final CallbackHandle> afterStartCallback; - InvocationStateMachine(String serviceName, Span span, Consumer transitionStateObserver) { + InvocationStateMachine( + String serviceName, + String fullyQualifiedMethodName, + Span span, + Consumer transitionStateObserver) { this.serviceName = serviceName; + this.fullyQualifiedMethodName = fullyQualifiedMethodName; this.span = span; this.transitionStateObserver = transitionStateObserver; @@ -94,6 +93,14 @@ public String debugId() { return debugId; } + public InvocationState getInvocationState() { + return this.invocationState; + } + + public String getFullyQualifiedMethodName() { + return this.fullyQualifiedMethodName; + } + // --- Output Publisher impl @Override @@ -122,7 +129,7 @@ public void onSubscribe(Flow.Subscription subscription) { public void onNext(InvocationFlow.InvocationInput invocationInput) { MessageLite msg = invocationInput.message(); LOG.trace("Received input message {} {}", msg.getClass(), msg); - if (this.state == State.WAITING_START) { + if (this.invocationState == InvocationState.WAITING_START) { this.onStart(msg); } else if (msg instanceof Protocol.CompletionMessage) { // We check the instance rather than the state, because the user code might still be @@ -189,9 +196,9 @@ void onStart(MessageLite msg) { } // Execute state transition - this.transitionState(State.REPLAYING); + this.transitionState(InvocationState.REPLAYING); if (this.entriesToReplay == 0) { - this.transitionState(State.PROCESSING); + this.transitionState(InvocationState.PROCESSING); } this.inputSubscription.request(Long.MAX_VALUE); @@ -201,8 +208,8 @@ void onStart(MessageLite msg) { } void close() { - if (this.state != State.CLOSED) { - this.transitionState(State.CLOSED); + if (this.invocationState != InvocationState.CLOSED) { + this.transitionState(InvocationState.CLOSED); LOG.debug("Closing state machine"); // Cancel inputSubscription and complete outputSubscriber @@ -223,8 +230,8 @@ void close() { } void fail(Throwable cause) { - if (this.state != State.CLOSED) { - this.transitionState(State.CLOSED); + if (this.invocationState != InvocationState.CLOSED) { + this.transitionState(InvocationState.CLOSED); LOG.debug("Closing state machine with failure", cause); // Cancel inputSubscription and complete outputSubscriber @@ -262,9 +269,9 @@ void processCompletableJournalEntry( Entries.CompletableJournalEntry journalEntry, SyscallCallback> callback) { checkInsideSideEffectGuard(); - if (this.state == State.CLOSED) { + if (this.invocationState == InvocationState.CLOSED) { callback.onCancel(AbortedExecutionException.INSTANCE); - } else if (this.state == State.REPLAYING) { + } else if (this.invocationState == InvocationState.REPLAYING) { // Retrieve the entry this.readEntry( (entryIndex, actualEntryMessage) -> { @@ -290,7 +297,7 @@ void processCompletableJournalEntry( } }, callback::onCancel); - } else if (this.state == State.PROCESSING) { + } else if (this.invocationState == InvocationState.PROCESSING) { // Try complete with local storage E entryToWrite = journalEntry.tryCompleteWithUserStateStorage(expectedEntryMessage, userStateStore); @@ -335,9 +342,9 @@ void processJournalEntry( Entries.JournalEntry journalEntry, SyscallCallback callback) { checkInsideSideEffectGuard(); - if (this.state == State.CLOSED) { + if (this.invocationState == InvocationState.CLOSED) { callback.onCancel(AbortedExecutionException.INSTANCE); - } else if (this.state == State.REPLAYING) { + } else if (this.invocationState == InvocationState.REPLAYING) { // Retrieve the entry this.readEntry( (entryIndex, actualEntryMessage) -> { @@ -346,7 +353,7 @@ void processJournalEntry( callback.onSuccess(null); }, callback::onCancel); - } else if (this.state == State.PROCESSING) { + } else if (this.invocationState == InvocationState.PROCESSING) { if (span.isRecording()) { journalEntry.trace(expectedEntryMessage, span); } @@ -367,9 +374,9 @@ void processJournalEntry( void enterSideEffectBlock(EnterSideEffectSyscallCallback callback) { checkInsideSideEffectGuard(); - if (this.state == State.CLOSED) { + if (this.invocationState == InvocationState.CLOSED) { callback.onCancel(AbortedExecutionException.INSTANCE); - } else if (this.state == State.REPLAYING) { + } else if (this.invocationState == InvocationState.REPLAYING) { // Retrieve the entry this.readEntry( (entryIndex, msg) -> { @@ -379,7 +386,7 @@ void enterSideEffectBlock(EnterSideEffectSyscallCallback callback) { completeSideEffectCallbackWithEntry((Java.SideEffectEntryMessage) msg, callback); }, callback::onCancel); - } else if (this.state == State.PROCESSING) { + } else if (this.invocationState == InvocationState.PROCESSING) { insideSideEffect = true; if (span.isRecording()) { span.addEvent("Enter SideEffect"); @@ -394,13 +401,13 @@ void enterSideEffectBlock(EnterSideEffectSyscallCallback callback) { void exitSideEffectBlock( Java.SideEffectEntryMessage sideEffectEntry, ExitSideEffectSyscallCallback callback) { this.insideSideEffect = false; - if (this.state == State.CLOSED) { + if (this.invocationState == InvocationState.CLOSED) { callback.onCancel(AbortedExecutionException.INSTANCE); - } else if (this.state == State.REPLAYING) { + } else if (this.invocationState == InvocationState.REPLAYING) { throw new IllegalStateException( "exitSideEffect has been invoked when the state machine is in replaying mode. " + "This is probably an SDK bug and might be caused by a missing enterSideEffectBlock invocation before exitSideEffectBlock."); - } else if (this.state == State.PROCESSING) { + } else if (this.invocationState == InvocationState.PROCESSING) { if (span.isRecording()) { span.addEvent("Exit SideEffect"); } @@ -541,7 +548,7 @@ private void resolveCombinatorDeferred( // Calling .await() on a combinator deferred within a side effect is not allowed // as resolving it creates or read a journal entry. checkInsideSideEffectGuard(); - if (Objects.equals(this.state, State.REPLAYING)) { + if (Objects.equals(this.invocationState, InvocationState.REPLAYING)) { // Retrieve the CombinatorAwaitableEntryMessage this.readEntry( (entryIndex, actualMsg) -> { @@ -554,7 +561,7 @@ private void resolveCombinatorDeferred( callback.onSuccess(null); }, callback::onCancel); - } else if (this.state == State.PROCESSING) { + } else if (this.invocationState == InvocationState.PROCESSING) { // Create map of singles to resolve Map> resolvableSingles = new HashMap<>(); @@ -657,24 +664,25 @@ private void writeCombinatorEntry(List resolvedList) { // --- Internal callback - private void transitionState(State newState) { - if (this.state == State.CLOSED) { + private void transitionState(InvocationState newInvocationState) { + if (this.invocationState == InvocationState.CLOSED) { // Cannot move out of the closed state return; } - LOG.debug("Transitioning {} to {}", this, newState); - this.state = newState; - this.transitionStateObserver.accept(newState); + LOG.debug("Transitioning {} to {}", this, newInvocationState); + this.invocationState = newInvocationState; + this.transitionStateObserver.accept(newInvocationState); } private void incrementCurrentIndex() { this.currentJournalIndex++; - if (currentJournalIndex >= entriesToReplay && this.state == State.REPLAYING) { + if (currentJournalIndex >= entriesToReplay + && this.invocationState == InvocationState.REPLAYING) { if (!this.incomingEntriesStateMachine.isEmpty()) { throw new IllegalStateException("Entries queue should be empty at this point"); } - this.transitionState(State.PROCESSING); + this.transitionState(InvocationState.PROCESSING); } } @@ -734,8 +742,8 @@ public String toString() { + "serviceName='" + serviceName + '\'' - + ", state=" - + state + + ", invocationState=" + + invocationState + ", id=" + debugId + '}'; diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/RestateGrpcServer.java b/sdk-core/src/main/java/dev/restate/sdk/core/RestateGrpcServer.java index 17a0f451..b3dae977 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/RestateGrpcServer.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/RestateGrpcServer.java @@ -32,8 +32,6 @@ public class RestateGrpcServer { private static final Logger LOG = LogManager.getLogger(RestateGrpcServer.class); - static final Context.Key SERVICE_METHOD = - Context.key("restate.dev/logging_service_method"); private final Map services; private final Tracer tracer; @@ -64,9 +62,10 @@ public InvocationHandler resolve( if (svc == null) { throw ProtocolException.methodNotFound(serviceName, methodName); } - String serviceMethodName = serviceName + "/" + methodName; + String fullyQualifiedServiceMethod = serviceName + "/" + methodName; ServerMethodDefinition method = - (ServerMethodDefinition) svc.getMethod(serviceMethodName); + (ServerMethodDefinition) + svc.getMethod(fullyQualifiedServiceMethod); if (method == null) { throw ProtocolException.methodNotFound(serviceName, methodName); } @@ -83,12 +82,15 @@ public InvocationHandler resolve( .startSpan(); // Setup logging context - loggingContextSetter.setServiceMethod(serviceMethodName); + loggingContextSetter.setServiceMethod(fullyQualifiedServiceMethod); // Instantiate state machine, syscall and grpc bridge InvocationStateMachine stateMachine = new InvocationStateMachine( - serviceName, span, s -> loggingContextSetter.setInvocationStatus(s.toString())); + serviceName, + fullyQualifiedServiceMethod, + span, + s -> loggingContextSetter.setInvocationStatus(s.toString())); SyscallsInternal syscalls = syscallExecutor != null ? ExecutorSwitchingWrappers.syscalls(new SyscallsImpl(stateMachine), syscallExecutor) @@ -115,18 +117,19 @@ public void start() { // Set invocation id in logging context loggingContextSetter.setInvocationId(invocationId.toString()); - // Create the listener and create the decorators chain - ServerCall.Listener grpcListener = - Contexts.interceptCall( - Context.current() - .withValue(SERVICE_METHOD, serviceMethodName) - .withValue(InvocationId.INVOCATION_ID_KEY, invocationId) - .withValue(Syscalls.SYSCALLS_KEY, syscalls), - bridge, - new Metadata(), - method.getServerCallHandler()); + // This gRPC context will be propagated to the user thread. + // Note: from now on we cannot modify this context anymore! + io.grpc.Context context = + Context.current() + .withValue(InvocationId.INVOCATION_ID_KEY, invocationId) + .withValue(Syscalls.SYSCALLS_KEY, syscalls); + + // Create the listener RestateServerCallListener restateListener = - new GrpcServerCallListenerAdaptor<>(grpcListener, bridge); + new GrpcServerCallListenerAdaptor<>( + context, bridge, new Metadata(), method.getServerCallHandler()); + + // Wrap in the executor switcher, if needed if (serverCallListenerExecutor != null) { restateListener = ExecutorSwitchingWrappers.serverCallListener( @@ -205,23 +208,23 @@ public interface LoggingContextSetter { String INVOCATION_ID_KEY = "restateInvocationId"; String SERVICE_METHOD_KEY = "restateServiceMethod"; - String SERVICE_INVOCATION_STATUS = "restateInvocationStatus"; + String SERVICE_INVOCATION_STATUS_KEY = "restateInvocationStatus"; LoggingContextSetter THREAD_LOCAL_INSTANCE = new LoggingContextSetter() { @Override public void setServiceMethod(String serviceMethod) { - ThreadContext.put(INVOCATION_ID_KEY, serviceMethod); + ThreadContext.put(SERVICE_METHOD_KEY, serviceMethod); } @Override public void setInvocationId(String id) { - ThreadContext.put(SERVICE_METHOD_KEY, id); + ThreadContext.put(INVOCATION_ID_KEY, id); } @Override public void setInvocationStatus(String invocationStatus) { - ThreadContext.put(SERVICE_INVOCATION_STATUS, invocationStatus); + ThreadContext.put(SERVICE_INVOCATION_STATUS_KEY, invocationStatus); } }; diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/RestateServerCall.java b/sdk-core/src/main/java/dev/restate/sdk/core/RestateServerCall.java index c3ac4d70..de936fbd 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/RestateServerCall.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/RestateServerCall.java @@ -55,7 +55,7 @@ class RestateServerCall extends ServerCall { void setListener(RestateServerCallListener listener) { this.listener = listener; - this.listener.onReady(); + this.listener.listenerReady(); if (requestCount > 0) { this.pollInput(); @@ -90,11 +90,11 @@ public void sendMessage(MessageLite message) { @Override public void close(Status status, Metadata trailers) { if (status.isOk() || Util.containsSuspendedException(status.getCause())) { - listener.onComplete(); + listener.close(); syscalls.close(); } else { // Let's cancel the listener first - listener.onCancel(); + listener.cancel(); if (Util.isTerminalException(status.getCause())) { syscalls.writeOutput( @@ -165,7 +165,7 @@ private void pollInput() { MessageLite message = deferredValue.toReadyResult().getResult(); LOG.trace("Read input message:\n{}", message); - listener.onMessageAndHalfClose(message); + listener.invoke(message); }, this::onError)), this::onError)); diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/RestateServerCallListener.java b/sdk-core/src/main/java/dev/restate/sdk/core/RestateServerCallListener.java index fb823b95..0cf94c28 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/RestateServerCallListener.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/RestateServerCallListener.java @@ -9,18 +9,22 @@ package dev.restate.sdk.core; /** - * Callbacks for incoming rpc messages. + * Interface to invoke generated service code. * *

This interface is strongly inspired by {@link io.grpc.ServerCall.Listener}. * * @param type of the incoming message */ public interface RestateServerCallListener { - void onMessageAndHalfClose(M message); + /** Invoke the service code. */ + void invoke(M message); - void onCancel(); + /** Send cancel signal to service code. */ + void cancel(); - void onComplete(); + /** Close the service code. */ + void close(); - void onReady(); + /** Set the underlying listener as ready. */ + void listenerReady(); } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/SyscallsImpl.java b/sdk-core/src/main/java/dev/restate/sdk/core/SyscallsImpl.java index 26380b1a..1d2f687d 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/SyscallsImpl.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/SyscallsImpl.java @@ -245,7 +245,7 @@ public void exitSideEffectBlockWithTerminalException( public void awakeable(SyscallCallback>> callback) { wrapAndPropagateExceptions( () -> { - LOG.trace("callback"); + LOG.trace("awakeable"); this.stateMachine.processCompletableJournalEntry( Protocol.AwakeableEntryMessage.getDefaultInstance(), AwakeableEntry.INSTANCE, @@ -314,10 +314,17 @@ private void completeAwakeable( public void resolveDeferred( DeferredResult deferredToResolve, SyscallCallback callback) { wrapAndPropagateExceptions( - () -> { - this.stateMachine.resolveDeferred(deferredToResolve, callback); - }, - callback); + () -> this.stateMachine.resolveDeferred(deferredToResolve, callback), callback); + } + + @Override + public String getFullyQualifiedMethodName() { + return this.stateMachine.getFullyQualifiedMethodName(); + } + + @Override + public InvocationState getInvocationState() { + return this.stateMachine.getInvocationState(); } @Override diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/SyscallsInternal.java b/sdk-core/src/main/java/dev/restate/sdk/core/SyscallsInternal.java index ee2c5db4..0387cef0 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/SyscallsInternal.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/SyscallsInternal.java @@ -14,7 +14,7 @@ import java.util.List; import java.util.stream.Collectors; -public interface SyscallsInternal extends Syscalls { +interface SyscallsInternal extends Syscalls { @Override default DeferredResult createAnyDeferred(List> children) { @@ -31,4 +31,13 @@ default DeferredResult createAllDeferred(List> children) // -- Lifecycle methods void close(); + + // -- State machine introspection (used by logging propagator) + + /** + * @return fully qualified method name in the form {fullyQualifiedServiceName}/{methodName} + */ + String getFullyQualifiedMethodName(); + + InvocationState getInvocationState(); } diff --git a/sdk-core/src/main/resources/META-INF/services/org.apache.logging.log4j.core.util.ContextDataProvider b/sdk-core/src/main/resources/META-INF/services/org.apache.logging.log4j.core.util.ContextDataProvider new file mode 100644 index 00000000..bbed00cc --- /dev/null +++ b/sdk-core/src/main/resources/META-INF/services/org.apache.logging.log4j.core.util.ContextDataProvider @@ -0,0 +1 @@ +dev.restate.sdk.core.GrpcContextDataProvider \ No newline at end of file diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/AwakeableIdTestSuite.java b/sdk-core/src/test/java/dev/restate/sdk/core/AwakeableIdTestSuite.java index 117ea6a1..a813ce6c 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/AwakeableIdTestSuite.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/AwakeableIdTestSuite.java @@ -52,7 +52,6 @@ public Stream definitions() { .setId(ByteString.copyFrom(serializedId)) .setKnownEntries(1), inputMessage(GreetingRequest.getDefaultInstance())) - .onlyUnbuffered() .assertingOutput( messages -> { assertThat(messages) diff --git a/sdk-core/src/test/resources/log4j2.properties b/sdk-core/src/test/resources/log4j2.properties index 97b4a95b..2b7c7a9d 100644 --- a/sdk-core/src/test/resources/log4j2.properties +++ b/sdk-core/src/test/resources/log4j2.properties @@ -5,4 +5,4 @@ appender.testlogger.name = TestLogger appender.testlogger.type = CONSOLE appender.testlogger.target = SYSTEM_ERR appender.testlogger.layout.type = PatternLayout -appender.testlogger.layout.pattern = %-4r %-5p %notEmpty{[%X{restateServiceMethod}]}%notEmpty{[%X{restateInvocationId}]} [%t] %c - %m%n \ No newline at end of file +appender.testlogger.layout.pattern = %-4r %-5p %X [%t] %c:%L - %m%n \ No newline at end of file diff --git a/sdk-http-vertx/src/main/java/dev/restate/sdk/http/vertx/RequestHttpServerHandler.java b/sdk-http-vertx/src/main/java/dev/restate/sdk/http/vertx/RequestHttpServerHandler.java index e8bd789d..5f0b1702 100644 --- a/sdk-http-vertx/src/main/java/dev/restate/sdk/http/vertx/RequestHttpServerHandler.java +++ b/sdk-http-vertx/src/main/java/dev/restate/sdk/http/vertx/RequestHttpServerHandler.java @@ -136,7 +136,7 @@ public void setInvocationId(String id) { @Override public void setInvocationStatus(String invocationStatus) { ContextualData.put( - RestateGrpcServer.LoggingContextSetter.SERVICE_INVOCATION_STATUS, + RestateGrpcServer.LoggingContextSetter.SERVICE_INVOCATION_STATUS_KEY, invocationStatus); } }, @@ -178,12 +178,7 @@ private Executor currentContextExecutor(Context currentContext) { } private Executor blockingExecutor(String serviceName) { - Executor userExecutor = this.blockingServices.get(serviceName); - return runnable -> { - // We need to propagate the gRPC context! - io.grpc.Context ctx = io.grpc.Context.current(); - userExecutor.execute(ctx.wrap(runnable)); - }; + return this.blockingServices.get(serviceName); } private void handleDiscoveryRequest(HttpServerRequest request) { diff --git a/settings.gradle.kts b/settings.gradle.kts index 80c502d6..1070d4e5 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -30,7 +30,7 @@ dependencyResolutionManagement { version("protobuf", "3.24.3") version("grpc", "1.58.0") version("grpckt", "1.4.0") - version("log4j", "2.20.0") + version("log4j", "2.22.0") version("opentelemetry", "1.30.1") library("protoc", "com.google.protobuf", "protoc").versionRef("protobuf")