Skip to content

Commit a980aba

Browse files
Access to workflow/activity instance from context
1 parent b187644 commit a980aba

17 files changed

+220
-10
lines changed

temporal-sdk/src/main/java/io/temporal/activity/ActivityExecutionContext.java

+3
Original file line numberDiff line numberDiff line change
@@ -147,4 +147,7 @@ public interface ActivityExecutionContext {
147147
* an activity.
148148
*/
149149
WorkflowClient getWorkflowClient();
150+
151+
/** Get the currently running activity instance. */
152+
Object getInstance();
150153
}

temporal-sdk/src/main/java/io/temporal/common/interceptors/ActivityExecutionContextBase.java

+5
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,9 @@ public Scope getMetricsScope() {
9191
public WorkflowClient getWorkflowClient() {
9292
return next.getWorkflowClient();
9393
}
94+
95+
@Override
96+
public Object getInstance() {
97+
return next.getInstance();
98+
}
9499
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@
2323
import com.uber.m3.tally.Scope;
2424

2525
public interface ActivityExecutionContextFactory {
26-
InternalActivityExecutionContext createContext(ActivityInfoInternal info, Scope metricsScope);
26+
InternalActivityExecutionContext createContext(
27+
ActivityInfoInternal info, Object activity, Scope metricsScope);
2728
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactoryImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,11 @@ public ActivityExecutionContextFactoryImpl(
6161

6262
@Override
6363
public InternalActivityExecutionContext createContext(
64-
ActivityInfoInternal info, Scope metricsScope) {
64+
ActivityInfoInternal info, Object activity, Scope metricsScope) {
6565
return new ActivityExecutionContextImpl(
6666
client,
6767
namespace,
68+
activity,
6869
info,
6970
dataConverter,
7071
heartbeatExecutor,

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java

+8
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
class ActivityExecutionContextImpl implements InternalActivityExecutionContext {
4949
private final Lock lock = new ReentrantLock();
5050
private final WorkflowClient client;
51+
private final Object activity;
5152
private final ManualActivityCompletionClientFactory manualCompletionClientFactory;
5253
private final Functions.Proc completionHandle;
5354
private final HeartbeatContext heartbeatContext;
@@ -61,6 +62,7 @@ class ActivityExecutionContextImpl implements InternalActivityExecutionContext {
6162
ActivityExecutionContextImpl(
6263
WorkflowClient client,
6364
String namespace,
65+
Object activity,
6466
ActivityInfo info,
6567
DataConverter dataConverter,
6668
ScheduledExecutorService heartbeatExecutor,
@@ -71,6 +73,7 @@ class ActivityExecutionContextImpl implements InternalActivityExecutionContext {
7173
Duration maxHeartbeatThrottleInterval,
7274
Duration defaultHeartbeatThrottleInterval) {
7375
this.client = client;
76+
this.activity = activity;
7477
this.metricsScope = metricsScope;
7578
this.info = info;
7679
this.completionHandle = completionHandle;
@@ -177,4 +180,9 @@ public Object getLastHeartbeatValue() {
177180
public WorkflowClient getWorkflowClient() {
178181
return client;
179182
}
183+
184+
@Override
185+
public Object getInstance() {
186+
return activity;
187+
}
180188
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public BaseActivityTaskExecutor(
7676
@Override
7777
public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metricsScope) {
7878
InternalActivityExecutionContext context =
79-
executionContextFactory.createContext(info, metricsScope);
79+
executionContextFactory.createContext(info, getActivity(), metricsScope);
8080
ActivityInfo activityInfo = context.getInfo();
8181
ActivitySerializationContext serializationContext =
8282
new ActivitySerializationContext(
@@ -144,6 +144,8 @@ public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metri
144144

145145
abstract ActivityInboundCallsInterceptor createRootInboundInterceptor();
146146

147+
abstract Object getActivity();
148+
147149
abstract Object[] provideArgs(
148150
Optional<Payloads> input, DataConverter dataConverterWithActivityContext);
149151

@@ -203,6 +205,11 @@ ActivityInboundCallsInterceptor createRootInboundInterceptor() {
203205
activity, method);
204206
}
205207

208+
@Override
209+
Object getActivity() {
210+
return activity;
211+
}
212+
206213
@Override
207214
Object[] provideArgs(Optional<Payloads> input, DataConverter dataConverterWithActivityContext) {
208215
return dataConverterWithActivityContext.fromPayloads(
@@ -241,6 +248,11 @@ ActivityInboundCallsInterceptor createRootInboundInterceptor() {
241248
activity);
242249
}
243250

251+
@Override
252+
Object getActivity() {
253+
return activity;
254+
}
255+
244256
@Override
245257
Object[] provideArgs(Optional<Payloads> input, DataConverter dataConverterWithActivityContext) {
246258
EncodedValues encodedValues = new EncodedValues(input, dataConverterWithActivityContext);

temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextFactoryImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public LocalActivityExecutionContextFactoryImpl(WorkflowClient client) {
3232

3333
@Override
3434
public InternalActivityExecutionContext createContext(
35-
ActivityInfoInternal info, Scope metricsScope) {
36-
return new LocalActivityExecutionContextImpl(client, info, metricsScope);
35+
ActivityInfoInternal info, Object activity, Scope metricsScope) {
36+
return new LocalActivityExecutionContextImpl(client, activity, info, metricsScope);
3737
}
3838
}

temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,14 @@
3030

3131
class LocalActivityExecutionContextImpl implements InternalActivityExecutionContext {
3232
private final WorkflowClient client;
33+
private final Object activity;
3334
private final ActivityInfo info;
3435
private final Scope metricsScope;
3536

36-
LocalActivityExecutionContextImpl(WorkflowClient client, ActivityInfo info, Scope metricsScope) {
37+
LocalActivityExecutionContextImpl(
38+
WorkflowClient client, Object activity, ActivityInfo info, Scope metricsScope) {
3739
this.client = client;
40+
this.activity = activity;
3841
this.info = info;
3942
this.metricsScope = metricsScope;
4043
}
@@ -100,4 +103,9 @@ public Object getLastHeartbeatValue() {
100103
public WorkflowClient getWorkflowClient() {
101104
return client;
102105
}
106+
107+
@Override
108+
public Object getInstance() {
109+
return activity;
110+
}
103111
}

temporal-sdk/src/main/java/io/temporal/internal/sync/DynamicSyncWorkflowDefinition.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,14 @@
3030
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
3131
import io.temporal.workflow.DynamicWorkflow;
3232
import io.temporal.workflow.Functions;
33+
import java.util.Objects;
3334
import java.util.Optional;
35+
import javax.annotation.Nullable;
3436

3537
final class DynamicSyncWorkflowDefinition implements SyncWorkflowDefinition {
3638

3739
private final Functions.Func1<EncodedValues, ? extends DynamicWorkflow> factory;
40+
private RootWorkflowInboundCallsInterceptor rootWorkflowInvoker;
3841
private final WorkerInterceptor[] workerInterceptors;
3942
// don't pass it down to other classes, it's a "cached" instance for internal usage only
4043
private final DataConverter dataConverterWithWorkflowContext;
@@ -52,7 +55,9 @@ public DynamicSyncWorkflowDefinition(
5255
@Override
5356
public void initialize(Optional<Payloads> input) {
5457
SyncWorkflowContext workflowContext = WorkflowInternal.getRootWorkflowContext();
55-
workflowInvoker = new RootWorkflowInboundCallsInterceptor(workflowContext, input);
58+
RootWorkflowInboundCallsInterceptor rootWorkflowInvoker =
59+
new RootWorkflowInboundCallsInterceptor(workflowContext, input);
60+
workflowInvoker = rootWorkflowInvoker;
5661
for (WorkerInterceptor workerInterceptor : workerInterceptors) {
5762
workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker);
5863
}
@@ -69,6 +74,13 @@ public Optional<Payloads> execute(Header header, Optional<Payloads> input) {
6974
return dataConverterWithWorkflowContext.toPayloads(result.getResult());
7075
}
7176

77+
@Nullable
78+
@Override
79+
public Object getInstance() {
80+
Objects.requireNonNull(rootWorkflowInvoker, "getInstance called before initialize.");
81+
return rootWorkflowInvoker.getInstance();
82+
}
83+
7284
class RootWorkflowInboundCallsInterceptor extends BaseRootWorkflowInboundCallsInterceptor {
7385
private DynamicWorkflow workflow;
7486
private Optional<Payloads> input;
@@ -79,6 +91,10 @@ public RootWorkflowInboundCallsInterceptor(
7991
this.input = input;
8092
}
8193

94+
public DynamicWorkflow getInstance() {
95+
return workflow;
96+
}
97+
8298
@Override
8399
public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
84100
super.init(outboundCalls);

temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.util.Objects;
6161
import java.util.Optional;
6262
import javax.annotation.Nonnull;
63+
import javax.annotation.Nullable;
6364
import org.slf4j.Logger;
6465
import org.slf4j.LoggerFactory;
6566

@@ -315,6 +316,7 @@ private class POJOWorkflowImplementation implements SyncWorkflowDefinition {
315316
private final Class<?> workflowImplementationClass;
316317
private final Method workflowMethod;
317318
private final Constructor<?> ctor;
319+
private RootWorkflowInboundCallsInterceptor rootWorkflowInvoker;
318320
private WorkflowInboundCallsInterceptor workflowInvoker;
319321
// don't pass it down to other classes, it's a "cached" instance for internal usage only
320322
private final DataConverter dataConverterWithWorkflowContext;
@@ -333,7 +335,8 @@ public POJOWorkflowImplementation(
333335
@Override
334336
public void initialize(Optional<Payloads> input) {
335337
SyncWorkflowContext workflowContext = WorkflowInternal.getRootWorkflowContext();
336-
workflowInvoker = new RootWorkflowInboundCallsInterceptor(workflowContext, input);
338+
rootWorkflowInvoker = new RootWorkflowInboundCallsInterceptor(workflowContext, input);
339+
workflowInvoker = rootWorkflowInvoker;
337340
for (WorkerInterceptor workerInterceptor : workerInterceptors) {
338341
workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker);
339342
}
@@ -357,6 +360,13 @@ public Optional<Payloads> execute(Header header, Optional<Payloads> input)
357360
return dataConverterWithWorkflowContext.toPayloads(result.getResult());
358361
}
359362

363+
@Nullable
364+
@Override
365+
public Object getInstance() {
366+
Objects.requireNonNull(rootWorkflowInvoker, "getInstance called before initialize.");
367+
return rootWorkflowInvoker.getInstance();
368+
}
369+
360370
private class RootWorkflowInboundCallsInterceptor
361371
extends BaseRootWorkflowInboundCallsInterceptor {
362372
private Object workflow;
@@ -368,6 +378,10 @@ public RootWorkflowInboundCallsInterceptor(
368378
this.input = input;
369379
}
370380

381+
public Object getInstance() {
382+
return workflow;
383+
}
384+
371385
@Override
372386
public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
373387
super.init(outboundCalls);

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java

+1
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public SyncWorkflow(
101101
new SyncWorkflowContext(
102102
namespace,
103103
workflowExecution,
104+
workflow,
104105
signalDispatcher,
105106
queryDispatcher,
106107
updateDispatcher,

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall
101101

102102
private final String namespace;
103103
private final WorkflowExecution workflowExecution;
104+
private final SyncWorkflowDefinition workflowDefinition;
104105
private final WorkflowImplementationOptions workflowImplementationOptions;
105106
private final DataConverter dataConverter;
106107
// to be used in this class, should not be passed down. Pass the original #dataConverter instead
@@ -125,15 +126,16 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall
125126
private Map<String, NexusServiceOptions> nexusServiceOptionsMap;
126127
private boolean readOnly = false;
127128
private final WorkflowThreadLocal<UpdateInfo> currentUpdateInfo = new WorkflowThreadLocal<>();
128-
// Map of all running update handlers. Key is the update Id of the update request.
129+
// Map of all running update handlers. Key is the update ID of the update request.
129130
private Map<String, UpdateHandlerInfo> runningUpdateHandlers = new HashMap<>();
130-
// Map of all running signal handlers. Key is the event Id of the signal event.
131+
// Map of all running signal handlers. Key is the event ID of the signal event.
131132
private Map<Long, SignalHandlerInfo> runningSignalHandlers = new HashMap<>();
132133
@Nullable private String currentDetails;
133134

134135
public SyncWorkflowContext(
135136
@Nonnull String namespace,
136137
@Nonnull WorkflowExecution workflowExecution,
138+
@Nullable SyncWorkflowDefinition workflowDefinition,
137139
SignalDispatcher signalDispatcher,
138140
QueryDispatcher queryDispatcher,
139141
UpdateDispatcher updateDispatcher,
@@ -142,6 +144,7 @@ public SyncWorkflowContext(
142144
List<ContextPropagator> contextPropagators) {
143145
this.namespace = namespace;
144146
this.workflowExecution = workflowExecution;
147+
this.workflowDefinition = workflowDefinition;
145148
this.dataConverter = dataConverter;
146149
this.dataConverterWithCurrentWorkflowContext =
147150
dataConverter.withContext(
@@ -1492,6 +1495,11 @@ public void setCurrentDetails(String details) {
14921495
currentDetails = details;
14931496
}
14941497

1498+
@Nullable
1499+
public Object getInstance() {
1500+
return workflowDefinition.getInstance();
1501+
}
1502+
14951503
@Nullable
14961504
public String getCurrentDetails() {
14971505
return currentDetails;

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowDefinition.java

+8
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,20 @@
2323
import io.temporal.api.common.v1.Payloads;
2424
import io.temporal.common.interceptors.Header;
2525
import java.util.Optional;
26+
import javax.annotation.Nullable;
2627

2728
/** Workflow wrapper used by the workflow thread to start a workflow */
2829
interface SyncWorkflowDefinition {
2930

3031
/** Always called first. */
3132
void initialize(Optional<Payloads> input);
3233

34+
/**
35+
* Returns the workflow instance that is executing this code. Must be called after {@link
36+
* #initialize(Optional)}.
37+
*/
38+
@Nullable
39+
Object getInstance();
40+
3341
Optional<Payloads> execute(Header header, Optional<Payloads> input);
3442
}

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java

+5
Original file line numberDiff line numberDiff line change
@@ -837,6 +837,11 @@ public static String getCurrentDetails() {
837837
return getRootWorkflowContext().getCurrentDetails();
838838
}
839839

840+
@Nullable
841+
public static Object getInstance() {
842+
return getRootWorkflowContext().getInstance();
843+
}
844+
840845
static WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() {
841846
return getRootWorkflowContext().getWorkflowOutboundInterceptor();
842847
}

temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java

+15
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.temporal.common.SearchAttributeUpdate;
3030
import io.temporal.common.SearchAttributes;
3131
import io.temporal.common.converter.DataConverter;
32+
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
3233
import io.temporal.failure.ActivityFailure;
3334
import io.temporal.failure.CanceledFailure;
3435
import io.temporal.failure.ChildWorkflowFailure;
@@ -1385,6 +1386,20 @@ public static String getCurrentDetails() {
13851386
return WorkflowInternal.getCurrentDetails();
13861387
}
13871388

1389+
/**
1390+
* Get the currently running workflow instance.
1391+
*
1392+
* @apiNote The instance is only available after it has been initialized. This function will
1393+
* return null if called before the workflow has been initialized. For example, this could
1394+
* happen if the function is called from a {@link WorkflowInit} constructor or {@link
1395+
* io.temporal.common.interceptors.WorkflowInboundCallsInterceptor#init(WorkflowOutboundCallsInterceptor)}.
1396+
*/
1397+
@Experimental
1398+
@Nullable
1399+
public static Object getInstance() {
1400+
return WorkflowInternal.getInstance();
1401+
}
1402+
13881403
/** Prohibit instantiation. */
13891404
private Workflow() {}
13901405
}

0 commit comments

Comments
 (0)