Skip to content

Commit

Permalink
Add support for upsert memo (#2202)
Browse files Browse the repository at this point in the history
Add support for upsert memo
  • Loading branch information
Quinn-With-Two-Ns authored Sep 5, 2024
1 parent ecd26b7 commit add6c4e
Show file tree
Hide file tree
Showing 18 changed files with 368 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,8 @@ <R> R mutableSideEffect(

void upsertTypedSearchAttributes(SearchAttributeUpdate<?>... searchAttributeUpdates);

void upsertMemo(Map<String, Object> memo);

/**
* Intercepts creation of a workflow child thread.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ public void upsertTypedSearchAttributes(SearchAttributeUpdate<?>... searchAttrib
next.upsertTypedSearchAttributes(searchAttributeUpdates);
}

@Override
public void upsertMemo(Map<String, Object> memo) {
next.upsertMemo(memo);
}

@Override
public Object newChildThread(Runnable runnable, boolean detached, String name) {
return next.newChildThread(runnable, detached, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ public static boolean isCommandEvent(HistoryEvent event) {
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_REJECTED:
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED:
case EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED:
return true;
default:
return false;
Expand Down Expand Up @@ -334,6 +335,8 @@ public static EventType getEventTypeForCommand(CommandType commandType) {
return EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED;
case COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
return EventType.EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES;
case COMMAND_TYPE_MODIFY_WORKFLOW_PROPERTIES:
return EventType.EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED;
}
throw new IllegalArgumentException("Unknown commandType");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,6 @@ public Map<String, Payload> getHeader() {
return startedAttributes.getHeader().getFieldsMap();
}

public Payload getMemo(String key) {
return startedAttributes.getMemo().getFieldsMap().get(key);
}

int getAttempt() {
return startedAttributes.getAttempt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ boolean getVersion(
/** Updates or inserts search attributes used to index workflows. */
void upsertSearchAttributes(@Nonnull SearchAttributes searchAttributes);

/** Updates or inserts memos. */
void upsertMemo(@Nonnull Memo memo);

/**
* @return true if this flag may currently be used.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public long getRunStartedTimestampMillis() {

@Override
public Payload getMemo(String key) {
return basicWorkflowContext.getMemo(key);
return mutableState.getMemo(key);
}

@Override
Expand Down Expand Up @@ -335,6 +335,12 @@ public void upsertSearchAttributes(@Nonnull SearchAttributes searchAttributes) {
mutableState.upsertSearchAttributes(searchAttributes);
}

@Override
public void upsertMemo(@Nonnull Memo memo) {
workflowStateMachines.upsertMemo(memo);
mutableState.upsertMemo(memo);
}

@Override
public int getAttempt() {
return basicWorkflowContext.getAttempt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
package io.temporal.internal.replay;

import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.Memo;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

class WorkflowMutableState {
Expand All @@ -31,11 +34,17 @@ class WorkflowMutableState {
private boolean workflowMethodCompleted;
private Throwable workflowTaskFailureThrowable;
private SearchAttributes.Builder searchAttributes;
private Memo.Builder memo;

WorkflowMutableState(WorkflowExecutionStartedEventAttributes startedAttributes) {
if (startedAttributes.hasSearchAttributes()) {
this.searchAttributes = startedAttributes.getSearchAttributes().toBuilder();
}
if (startedAttributes.hasMemo()) {
this.memo = startedAttributes.getMemo().toBuilder();
} else {
this.memo = Memo.newBuilder();
}
}

boolean isCancelRequested() {
Expand Down Expand Up @@ -76,6 +85,10 @@ SearchAttributes getSearchAttributes() {
: searchAttributes.build();
}

public Payload getMemo(String key) {
return memo.build().getFieldsMap().get(key);
}

void upsertSearchAttributes(@Nullable SearchAttributes searchAttributes) {
if (searchAttributes == null || searchAttributes.getIndexedFieldsCount() == 0) {
return;
Expand All @@ -85,4 +98,8 @@ void upsertSearchAttributes(@Nullable SearchAttributes searchAttributes) {
}
this.searchAttributes.putAllIndexedFields(searchAttributes.getIndexedFieldsMap());
}

public void upsertMemo(@Nonnull Memo memo) {
this.memo.putAllFields(memo.getFieldsMap());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.statemachines;

import io.temporal.api.command.v1.Command;
import io.temporal.api.command.v1.ModifyWorkflowPropertiesCommandAttributes;
import io.temporal.api.enums.v1.CommandType;
import io.temporal.api.enums.v1.EventType;
import io.temporal.workflow.Functions;

final class WorkflowPropertiesModifiedStateMachine
extends EntityStateMachineInitialCommand<
WorkflowPropertiesModifiedStateMachine.State,
WorkflowPropertiesModifiedStateMachine.ExplicitEvent,
WorkflowPropertiesModifiedStateMachine> {

private ModifyWorkflowPropertiesCommandAttributes modifiedPropertiesAttributes;

public static void newInstance(
ModifyWorkflowPropertiesCommandAttributes modifiedPropertiesAttributes,
Functions.Proc1<CancellableCommand> commandSink,
Functions.Proc1<StateMachine> stateMachineSink) {
new WorkflowPropertiesModifiedStateMachine(
modifiedPropertiesAttributes, commandSink, stateMachineSink);
}

private WorkflowPropertiesModifiedStateMachine(
ModifyWorkflowPropertiesCommandAttributes modifiedPropertiesAttributes,
Functions.Proc1<CancellableCommand> commandSink,
Functions.Proc1<StateMachine> stateMachineSink) {
super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink);
this.modifiedPropertiesAttributes = modifiedPropertiesAttributes;
explicitEvent(ExplicitEvent.SCHEDULE);
}

enum ExplicitEvent {
SCHEDULE
}

enum State {
CREATED,
MODIFY_COMMAND_CREATED,
MODIFY_COMMAND_RECORDED,
}

public static final StateMachineDefinition<
State, ExplicitEvent, WorkflowPropertiesModifiedStateMachine>
STATE_MACHINE_DEFINITION =
StateMachineDefinition
.<State, ExplicitEvent, WorkflowPropertiesModifiedStateMachine>newInstance(
"WorkflowPropertiesModified", State.CREATED, State.MODIFY_COMMAND_RECORDED)
.add(
State.CREATED,
ExplicitEvent.SCHEDULE,
State.MODIFY_COMMAND_CREATED,
WorkflowPropertiesModifiedStateMachine::createModifyCommand)
.add(
State.MODIFY_COMMAND_CREATED,
CommandType.COMMAND_TYPE_MODIFY_WORKFLOW_PROPERTIES,
State.MODIFY_COMMAND_CREATED)
.add(
State.MODIFY_COMMAND_CREATED,
EventType.EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED,
State.MODIFY_COMMAND_RECORDED);

private void createModifyCommand() {
addCommand(
Command.newBuilder()
.setCommandType(CommandType.COMMAND_TYPE_MODIFY_WORKFLOW_PROPERTIES)
.setModifyWorkflowPropertiesCommandAttributes(modifiedPropertiesAttributes)
.build());
modifiedPropertiesAttributes = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,14 @@ public void upsertSearchAttributes(SearchAttributes attributes) {
UpsertSearchAttributesStateMachine.newInstance(attributes, commandSink, stateMachineSink);
}

public void upsertMemo(Memo memo) {
checkEventLoopExecuting();
WorkflowPropertiesModifiedStateMachine.newInstance(
ModifyWorkflowPropertiesCommandAttributes.newBuilder().setUpsertedMemo(memo).build(),
commandSink,
stateMachineSink);
}

public void completeWorkflow(Optional<Payloads> workflowOutput) {
checkEventLoopExecuting();
CompleteWorkflowStateMachine.newInstance(workflowOutput, commandSink, stateMachineSink);
Expand Down Expand Up @@ -1188,6 +1196,7 @@ private void validateCommand(Command command, HistoryEvent event) {
case COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION:
case COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION:
case COMMAND_TYPE_PROTOCOL_MESSAGE:
case COMMAND_TYPE_MODIFY_WORKFLOW_PROPERTIES:
break;
case UNRECOGNIZED:
case COMMAND_TYPE_UNSPECIFIED:
Expand Down Expand Up @@ -1343,6 +1352,7 @@ private OptionalLong getInitialCommandEventId(HistoryEvent event) {
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
case EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED:
return OptionalLong.of(event.getEventId());

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,16 @@ public void upsertTypedSearchAttributes(SearchAttributeUpdate<?>... searchAttrib
replayContext.upsertSearchAttributes(attr);
}

@Override
public void upsertMemo(Map<String, Object> memo) {
Preconditions.checkArgument(memo != null, "null memo");
Preconditions.checkArgument(!memo.isEmpty(), "empty memo");
replayContext.upsertMemo(
Memo.newBuilder()
.putAllFields(intoPayloadMap(dataConverterWithCurrentWorkflowContext, memo))
.build());
}

@Nonnull
public Object newWorkflowMethodThreadIntercepted(Runnable runnable, @Nullable String name) {
return runner.newWorkflowThread(runnable, false, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,11 @@ public static void upsertTypedSearchAttributes(
getWorkflowOutboundInterceptor().upsertTypedSearchAttributes(searchAttributeUpdates);
}

public static void upsertMemo(Map<String, Object> memo) {
assertNotReadOnly("upsert memo");
getWorkflowOutboundInterceptor().upsertMemo(memo);
}

public static DataConverter getDataConverter() {
return getRootWorkflowContext().getDataConverter();
}
Expand Down
11 changes: 11 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,17 @@ public static void upsertTypedSearchAttributes(
WorkflowInternal.upsertTypedSearchAttributes(searchAttributeUpdates);
}

/**
* Updates a Workflow Memos by applying {@code memoUpdates} to the existing Memos set attached to
* the workflow. Memos are additional non-indexed information attributed to workflow and can
* return by describing or listing a workflow. The type of value can be any object that are
* serializable by {@link io.temporal.common.converter.DataConverter}. To remove a memo set the
* value null.
*/
public static void upsertMemo(Map<String, Object> memo) {
WorkflowInternal.upsertMemo(memo);
}

/**
* Sets the default activity options that will be used for activity stubs that have no {@link
* ActivityOptions} specified.<br>
Expand Down
Loading

0 comments on commit add6c4e

Please sign in to comment.