Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nexus WF Machines & Lang Support #857

Open
wants to merge 9 commits into
base: nexus
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use temporal_sdk_core_protos::{
grpc::health::v1::health_client::HealthClient,
temporal::api::{
cloud::cloudservice::v1::cloud_service_client::CloudServiceClient,
common,
common::v1::{Header, Payload, Payloads, RetryPolicy, WorkflowExecution, WorkflowType},
enums::v1::{TaskQueueKind, WorkflowIdConflictPolicy, WorkflowIdReusePolicy},
failure::v1::Failure,
Expand Down Expand Up @@ -1086,6 +1087,13 @@ pub struct WorkflowOptions {

/// Optionally set a retry policy for the workflow
pub retry_policy: Option<RetryPolicy>,

/// Links to associate with the workflow. Ex: References to a nexus operation.
pub links: Vec<common::v1::Link>,

/// Callbacks that will be invoked upon workflow completion. For, ex, completing nexus
/// operations.
pub completion_callbacks: Vec<common::v1::Callback>,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -1125,6 +1133,8 @@ impl WorkflowClientTrait for Client {
cron_schedule: options.cron_schedule.unwrap_or_default(),
request_eager_execution: options.enable_eager_workflow_start,
retry_policy: options.retry_policy,
links: options.links,
completion_callbacks: options.completion_callbacks,
..Default::default()
},
)
Expand Down
15 changes: 14 additions & 1 deletion client/src/workflow_handle/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{InterceptedMetricsSvc, RawClientLike, WorkflowService};
use anyhow::{anyhow, bail};
use std::marker::PhantomData;
use std::{fmt::Debug, marker::PhantomData};
use temporal_sdk_core_protos::{
coresdk::FromPayloadsExt,
temporal::api::{
Expand Down Expand Up @@ -31,6 +31,19 @@ pub enum WorkflowExecutionResult<T> {
ContinuedAsNew,
}

impl<T> WorkflowExecutionResult<T>
where
T: Debug,
{
/// Unwrap the result, panicking if it was not a success
pub fn unwrap_success(self) -> T {
match self {
Self::Succeeded(t) => t,
o => panic!("Expected success, got {:?}", o),
}
}
}

/// Options for fetching workflow results
#[derive(Debug, Clone, Copy)]
pub struct GetWorkflowResultOpts {
Expand Down
4 changes: 1 addition & 3 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,13 @@ temporal-sdk-core-test-utils = { path = "../test-utils" }
temporal-sdk = { path = "../sdk" }
tokio-stream = { version = "0.1", features = ["net"] }

[build-dependencies]
tonic-build = { workspace = true }

[[test]]
name = "integ_tests"
path = "../tests/main.rs"
# Prevents autodiscovery, and hence these getting run with `cargo test`. Run with
# `cargo test --test integ_tests`
test = false
required-features = ["temporal-sdk-core-protos/serde_serialize"]

[[test]]
name = "heavy_tests"
Expand Down
117 changes: 117 additions & 0 deletions core/src/worker/workflow/machines/cancel_nexus_op_state_machine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
use super::{
workflow_machines::MachineResponse, Cancellable, EventInfo, NewMachineWithCommand,
WFMachinesAdapter, WFMachinesError,
};
use crate::worker::workflow::machines::HistEventData;
use rustfsm::{fsm, StateMachine, TransitionResult};
use std::convert::TryFrom;
use temporal_sdk_core_protos::{
coresdk::workflow_activation::ResolveCancelNexusOperation,
temporal::api::{
command::v1::{command, RequestCancelNexusOperationCommandAttributes},
enums::v1::{CommandType, EventType},
},
};

fsm! {
pub(super)
name CancelNexusOpMachine;
command CancelNexusOpCommand;
error WFMachinesError;
shared_state SharedState;

RequestCancelNexusOpCommandCreated --(CommandRequestCancelNexusOpWorkflowExecution)
--> RequestCancelNexusOpCommandCreated;

RequestCancelNexusOpCommandCreated --(NexusOpCancelRequested, on_cancel_requested)
--> CancelRequested;
}

#[derive(Default, Clone)]
pub(super) struct SharedState {
seq: u32,
}

#[derive(Debug, derive_more::Display)]
pub(super) enum CancelNexusOpCommand {
Requested,
}

pub(super) fn new_nexus_op_cancel(
seq: u32,
nexus_op_scheduled_event_id: i64,
) -> NewMachineWithCommand {
let s = CancelNexusOpMachine::from_parts(
RequestCancelNexusOpCommandCreated {}.into(),
SharedState { seq },
);
let cmd_attrs = command::Attributes::RequestCancelNexusOperationCommandAttributes(
RequestCancelNexusOperationCommandAttributes {
scheduled_event_id: nexus_op_scheduled_event_id,
},
);
NewMachineWithCommand {
command: cmd_attrs,
machine: s.into(),
}
}

#[derive(Default, Clone)]
pub(super) struct CancelRequested {}

#[derive(Default, Clone)]
pub(super) struct RequestCancelNexusOpCommandCreated {}

impl RequestCancelNexusOpCommandCreated {
pub(super) fn on_cancel_requested(self) -> CancelNexusOpMachineTransition<CancelRequested> {
TransitionResult::commands(vec![CancelNexusOpCommand::Requested])
}
}

impl TryFrom<HistEventData> for CancelNexusOpMachineEvents {
type Error = WFMachinesError;

fn try_from(e: HistEventData) -> Result<Self, Self::Error> {
let e = e.event;
Ok(match e.event_type() {
EventType::NexusOperationCancelRequested => Self::NexusOpCancelRequested,
_ => {
return Err(WFMachinesError::Nondeterminism(format!(
"Cancel external WF machine does not handle this event: {e}"
)))
}
})
}
}

impl TryFrom<CommandType> for CancelNexusOpMachineEvents {
type Error = ();

fn try_from(c: CommandType) -> Result<Self, Self::Error> {
Ok(match c {
CommandType::RequestCancelNexusOperation => {
Self::CommandRequestCancelNexusOpWorkflowExecution
}
_ => return Err(()),
})
}
}

impl WFMachinesAdapter for CancelNexusOpMachine {
fn adapt_response(
&self,
my_command: Self::Command,
_event_info: Option<EventInfo>,
) -> Result<Vec<MachineResponse>, WFMachinesError> {
Ok(match my_command {
CancelNexusOpCommand::Requested => {
vec![ResolveCancelNexusOperation {
seq: self.shared_state.seq,
}
.into()]
}
})
}
}

impl Cancellable for CancelNexusOpMachine {}
12 changes: 8 additions & 4 deletions core/src/worker/workflow/machines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ mod workflow_machines;

mod activity_state_machine;
mod cancel_external_state_machine;
mod cancel_nexus_op_state_machine;
mod cancel_workflow_state_machine;
mod child_workflow_state_machine;
mod complete_workflow_state_machine;
mod continue_as_new_workflow_state_machine;
mod fail_workflow_state_machine;
mod local_activity_state_machine;
mod modify_workflow_properties_state_machine;
mod nexus_operation_state_machine;
mod patch_state_machine;
mod signal_external_state_machine;
mod timer_state_machine;
Expand All @@ -21,19 +23,18 @@ mod transition_coverage;

pub(crate) use workflow_machines::{MachinesWFTResponseContent, WorkflowMachines};

use crate::{
telemetry::VecDisplayer,
worker::workflow::{machines::update_state_machine::UpdateMachine, WFMachinesError},
};
use crate::{telemetry::VecDisplayer, worker::workflow::WFMachinesError};
use activity_state_machine::ActivityMachine;
use cancel_external_state_machine::CancelExternalMachine;
use cancel_nexus_op_state_machine::CancelNexusOpMachine;
use cancel_workflow_state_machine::CancelWorkflowMachine;
use child_workflow_state_machine::ChildWorkflowMachine;
use complete_workflow_state_machine::CompleteWorkflowMachine;
use continue_as_new_workflow_state_machine::ContinueAsNewWorkflowMachine;
use fail_workflow_state_machine::FailWorkflowMachine;
use local_activity_state_machine::LocalActivityMachine;
use modify_workflow_properties_state_machine::ModifyWorkflowPropertiesMachine;
use nexus_operation_state_machine::NexusOperationMachine;
use patch_state_machine::PatchMachine;
use rustfsm::{MachineError, StateMachine};
use signal_external_state_machine::SignalExternalMachine;
Expand All @@ -46,6 +47,7 @@ use temporal_sdk_core_protos::temporal::api::{
history::v1::HistoryEvent,
};
use timer_state_machine::TimerMachine;
use update_state_machine::UpdateMachine;
use upsert_search_attributes_state_machine::UpsertSearchAttributesMachine;
use workflow_machines::MachineResponse;
use workflow_task_state_machine::WorkflowTaskMachine;
Expand All @@ -71,6 +73,8 @@ enum Machines {
UpsertSearchAttributesMachine,
ModifyWorkflowPropertiesMachine,
UpdateMachine,
NexusOperationMachine,
CancelNexusOpMachine,
}

/// Extends [rustfsm::StateMachine] with some functionality specific to the temporal SDK.
Expand Down
Loading
Loading