From d340ecd104b338eafa86861e821f5617b86be608 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 28 Nov 2024 10:54:58 -0800 Subject: [PATCH 1/3] Add operation Id to callback headers --- .../io/temporal/internal/common/InternalUtils.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java index e37d65f58..c00580f17 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java @@ -28,6 +28,8 @@ import io.temporal.client.WorkflowStub; import io.temporal.internal.client.NexusStartWorkflowRequest; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +80,13 @@ public static WorkflowStub createNexusBoundStub( throw new IllegalArgumentException( "WorkflowId is expected to be set on WorkflowOptions when used with Nexus"); } + // Add the Nexus operation ID to the headers if it is not already present to support fabricating + // a NexusOperationStarted event if the completion is received before the response to a + // StartOperation request. + Map headers = new HashMap<>(request.getCallbackHeaders()); + if (!headers.containsKey("nexus-operation-id")) { + headers.put("nexus-operation-id", options.getWorkflowId()); + } WorkflowOptions.Builder nexusWorkflowOptions = WorkflowOptions.newBuilder(options) .setRequestId(request.getRequestId()) @@ -87,7 +96,7 @@ public static WorkflowStub createNexusBoundStub( .setNexus( Callback.Nexus.newBuilder() .setUrl(request.getCallbackUrl()) - .putAllHeader(request.getCallbackHeaders()) + .putAllHeader(headers) .build()) .build())); if (options.getTaskQueue() == null) { From 9b4889373c8be15ac5cf9b43e2135882556f7778 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Fri, 6 Dec 2024 08:52:30 -0800 Subject: [PATCH 2/3] Update Nexus SDK --- build.gradle | 2 +- .../temporal/internal/common/InternalUtils.java | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/build.gradle b/build.gradle index 2257ce1ca..4fce4e12d 100644 --- a/build.gradle +++ b/build.gradle @@ -31,7 +31,7 @@ ext { // Platforms grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager jacksonVersion = '2.14.2' // [2.9.0,) - nexusVersion = '0.3.0-alpha' // [0.1.0,) + nexusVersion = '0.3.0-alpha' // we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though. micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java index c00580f17..753e75e77 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java @@ -21,6 +21,7 @@ package io.temporal.internal.common; import com.google.common.base.Defaults; +import io.nexusrpc.Header; import io.temporal.api.common.v1.Callback; import io.temporal.api.enums.v1.TaskQueueKind; import io.temporal.api.taskqueue.v1.TaskQueue; @@ -28,8 +29,8 @@ import io.temporal.client.WorkflowStub; import io.temporal.internal.client.NexusStartWorkflowRequest; import java.util.Arrays; -import java.util.HashMap; import java.util.Map; +import java.util.TreeMap; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,9 +84,16 @@ public static WorkflowStub createNexusBoundStub( // Add the Nexus operation ID to the headers if it is not already present to support fabricating // a NexusOperationStarted event if the completion is received before the response to a // StartOperation request. - Map headers = new HashMap<>(request.getCallbackHeaders()); - if (!headers.containsKey("nexus-operation-id")) { - headers.put("nexus-operation-id", options.getWorkflowId()); + Map headers = + request.getCallbackHeaders().entrySet().stream() + .collect( + Collectors.toMap( + (k) -> k.getKey().toLowerCase(), + Map.Entry::getValue, + (a, b) -> a, + () -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER))); + if (!headers.containsKey(Header.OPERATION_ID)) { + headers.put(Header.OPERATION_ID, options.getWorkflowId()); } WorkflowOptions.Builder nexusWorkflowOptions = WorkflowOptions.newBuilder(options) From 62d57435298df0e599fb0d0599dbfd82bc9df64e Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Fri, 6 Dec 2024 11:17:11 -0800 Subject: [PATCH 3/3] Make sure header is sent in lower case --- .../main/java/io/temporal/internal/common/InternalUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java index 753e75e77..f9fa4c637 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java @@ -93,7 +93,7 @@ public static WorkflowStub createNexusBoundStub( (a, b) -> a, () -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER))); if (!headers.containsKey(Header.OPERATION_ID)) { - headers.put(Header.OPERATION_ID, options.getWorkflowId()); + headers.put(Header.OPERATION_ID.toLowerCase(), options.getWorkflowId()); } WorkflowOptions.Builder nexusWorkflowOptions = WorkflowOptions.newBuilder(options)