diff --git a/client/src/lib.rs b/client/src/lib.rs index 778257e5d..917fd22f0 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -57,6 +57,7 @@ use temporal_sdk_core_protos::{ grpc::health::v1::health_client::HealthClient, temporal::api::{ cloud::cloudservice::v1::cloud_service_client::CloudServiceClient, + common, common::v1::{Header, Payload, Payloads, RetryPolicy, WorkflowExecution, WorkflowType}, enums::v1::{TaskQueueKind, WorkflowIdConflictPolicy, WorkflowIdReusePolicy}, failure::v1::Failure, @@ -1086,6 +1087,13 @@ pub struct WorkflowOptions { /// Optionally set a retry policy for the workflow pub retry_policy: Option, + + /// Links to associate with the workflow, e.g. references to a nexus operation. + pub links: Vec, + + /// Callbacks that will be invoked upon workflow completion. For example, completing nexus + /// operations. + pub completion_callbacks: Vec, } #[async_trait::async_trait] @@ -1125,6 +1133,8 @@ impl WorkflowClientTrait for Client { cron_schedule: options.cron_schedule.unwrap_or_default(), request_eager_execution: options.enable_eager_workflow_start, retry_policy: options.retry_policy, + links: options.links, + completion_callbacks: options.completion_callbacks, ..Default::default() }, ) diff --git a/client/src/workflow_handle/mod.rs b/client/src/workflow_handle/mod.rs index 8752fcb7e..fff49c9fd 100644 --- a/client/src/workflow_handle/mod.rs +++ b/client/src/workflow_handle/mod.rs @@ -1,6 +1,6 @@ use crate::{InterceptedMetricsSvc, RawClientLike, WorkflowService}; use anyhow::{anyhow, bail}; -use std::marker::PhantomData; +use std::{fmt::Debug, marker::PhantomData}; use temporal_sdk_core_protos::{ coresdk::FromPayloadsExt, temporal::api::{ @@ -31,6 +31,19 @@ pub enum WorkflowExecutionResult { ContinuedAsNew, } +impl WorkflowExecutionResult +where + T: Debug, +{ + /// Unwrap the result, panicking if it was not a success + pub fn unwrap_success(self) -> T { + match self { + Self::Succeeded(t) => t, + o => panic!("Expected success, got {:?}", 
o), + } + } +} + /// Options for fetching workflow results #[derive(Debug, Clone, Copy)] pub struct GetWorkflowResultOpts { diff --git a/core/Cargo.toml b/core/Cargo.toml index 5eeb2cedd..bcd623662 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -97,15 +97,13 @@ temporal-sdk-core-test-utils = { path = "../test-utils" } temporal-sdk = { path = "../sdk" } tokio-stream = { version = "0.1", features = ["net"] } -[build-dependencies] -tonic-build = { workspace = true } - [[test]] name = "integ_tests" path = "../tests/main.rs" # Prevents autodiscovery, and hence these getting run with `cargo test`. Run with # `cargo test --test integ_tests` test = false +required-features = ["temporal-sdk-core-protos/serde_serialize"] [[test]] name = "heavy_tests" diff --git a/core/src/worker/workflow/machines/cancel_nexus_op_state_machine.rs b/core/src/worker/workflow/machines/cancel_nexus_op_state_machine.rs new file mode 100644 index 000000000..6323678fd --- /dev/null +++ b/core/src/worker/workflow/machines/cancel_nexus_op_state_machine.rs @@ -0,0 +1,117 @@ +use super::{ + workflow_machines::MachineResponse, Cancellable, EventInfo, NewMachineWithCommand, + WFMachinesAdapter, WFMachinesError, +}; +use crate::worker::workflow::machines::HistEventData; +use rustfsm::{fsm, StateMachine, TransitionResult}; +use std::convert::TryFrom; +use temporal_sdk_core_protos::{ + coresdk::workflow_activation::ResolveCancelNexusOperation, + temporal::api::{ + command::v1::{command, RequestCancelNexusOperationCommandAttributes}, + enums::v1::{CommandType, EventType}, + }, +}; + +fsm! 
{ + pub(super) + name CancelNexusOpMachine; + command CancelNexusOpCommand; + error WFMachinesError; + shared_state SharedState; + + RequestCancelNexusOpCommandCreated --(CommandRequestCancelNexusOpWorkflowExecution) + --> RequestCancelNexusOpCommandCreated; + + RequestCancelNexusOpCommandCreated --(NexusOpCancelRequested, on_cancel_requested) + --> CancelRequested; +} + +#[derive(Default, Clone)] +pub(super) struct SharedState { + seq: u32, +} + +#[derive(Debug, derive_more::Display)] +pub(super) enum CancelNexusOpCommand { + Requested, +} + +pub(super) fn new_nexus_op_cancel( + seq: u32, + nexus_op_scheduled_event_id: i64, +) -> NewMachineWithCommand { + let s = CancelNexusOpMachine::from_parts( + RequestCancelNexusOpCommandCreated {}.into(), + SharedState { seq }, + ); + let cmd_attrs = command::Attributes::RequestCancelNexusOperationCommandAttributes( + RequestCancelNexusOperationCommandAttributes { + scheduled_event_id: nexus_op_scheduled_event_id, + }, + ); + NewMachineWithCommand { + command: cmd_attrs, + machine: s.into(), + } +} + +#[derive(Default, Clone)] +pub(super) struct CancelRequested {} + +#[derive(Default, Clone)] +pub(super) struct RequestCancelNexusOpCommandCreated {} + +impl RequestCancelNexusOpCommandCreated { + pub(super) fn on_cancel_requested(self) -> CancelNexusOpMachineTransition { + TransitionResult::commands(vec![CancelNexusOpCommand::Requested]) + } +} + +impl TryFrom for CancelNexusOpMachineEvents { + type Error = WFMachinesError; + + fn try_from(e: HistEventData) -> Result { + let e = e.event; + Ok(match e.event_type() { + EventType::NexusOperationCancelRequested => Self::NexusOpCancelRequested, + _ => { + return Err(WFMachinesError::Nondeterminism(format!( + "Cancel nexus operation machine does not handle this event: {e}" + ))) + } + }) + } +} + +impl TryFrom for CancelNexusOpMachineEvents { + type Error = (); + + fn try_from(c: CommandType) -> Result { + Ok(match c { + CommandType::RequestCancelNexusOperation => { + 
Self::CommandRequestCancelNexusOpWorkflowExecution + } + _ => return Err(()), + }) + } +} + +impl WFMachinesAdapter for CancelNexusOpMachine { + fn adapt_response( + &self, + my_command: Self::Command, + _event_info: Option, + ) -> Result, WFMachinesError> { + Ok(match my_command { + CancelNexusOpCommand::Requested => { + vec![ResolveCancelNexusOperation { + seq: self.shared_state.seq, + } + .into()] + } + }) + } +} + +impl Cancellable for CancelNexusOpMachine {} diff --git a/core/src/worker/workflow/machines/mod.rs b/core/src/worker/workflow/machines/mod.rs index dcbe9d439..bdb5fc3e2 100644 --- a/core/src/worker/workflow/machines/mod.rs +++ b/core/src/worker/workflow/machines/mod.rs @@ -2,6 +2,9 @@ mod workflow_machines; mod activity_state_machine; mod cancel_external_state_machine; +// This machine is kept commented out until cancelling externally started nexus operations is +// supported +// mod cancel_nexus_op_state_machine; mod cancel_workflow_state_machine; mod child_workflow_state_machine; mod complete_workflow_state_machine; @@ -9,6 +12,7 @@ mod continue_as_new_workflow_state_machine; mod fail_workflow_state_machine; mod local_activity_state_machine; mod modify_workflow_properties_state_machine; +mod nexus_operation_state_machine; mod patch_state_machine; mod signal_external_state_machine; mod timer_state_machine; @@ -21,10 +25,7 @@ mod transition_coverage; pub(crate) use workflow_machines::{MachinesWFTResponseContent, WorkflowMachines}; -use crate::{ - telemetry::VecDisplayer, - worker::workflow::{machines::update_state_machine::UpdateMachine, WFMachinesError}, -}; +use crate::{telemetry::VecDisplayer, worker::workflow::WFMachinesError}; use activity_state_machine::ActivityMachine; use cancel_external_state_machine::CancelExternalMachine; use cancel_workflow_state_machine::CancelWorkflowMachine; @@ -34,6 +35,7 @@ use continue_as_new_workflow_state_machine::ContinueAsNewWorkflowMachine; use fail_workflow_state_machine::FailWorkflowMachine; use 
local_activity_state_machine::LocalActivityMachine; use modify_workflow_properties_state_machine::ModifyWorkflowPropertiesMachine; +use nexus_operation_state_machine::NexusOperationMachine; use patch_state_machine::PatchMachine; use rustfsm::{MachineError, StateMachine}; use signal_external_state_machine::SignalExternalMachine; @@ -46,6 +48,7 @@ use temporal_sdk_core_protos::temporal::api::{ history::v1::HistoryEvent, }; use timer_state_machine::TimerMachine; +use update_state_machine::UpdateMachine; use upsert_search_attributes_state_machine::UpsertSearchAttributesMachine; use workflow_machines::MachineResponse; use workflow_task_state_machine::WorkflowTaskMachine; @@ -71,6 +74,7 @@ enum Machines { UpsertSearchAttributesMachine, ModifyWorkflowPropertiesMachine, UpdateMachine, + NexusOperationMachine, } /// Extends [rustfsm::StateMachine] with some functionality specific to the temporal SDK. diff --git a/core/src/worker/workflow/machines/nexus_operation_state_machine.rs b/core/src/worker/workflow/machines/nexus_operation_state_machine.rs new file mode 100644 index 000000000..5db650690 --- /dev/null +++ b/core/src/worker/workflow/machines/nexus_operation_state_machine.rs @@ -0,0 +1,534 @@ +use crate::worker::workflow::{ + machines::{ + workflow_machines::MachineResponse, Cancellable, EventInfo, HistEventData, + NewMachineWithCommand, OnEventWrapper, WFMachinesAdapter, + }, + WFMachinesError, +}; +use itertools::Itertools; +use rustfsm::{fsm, MachineError, StateMachine, TransitionResult}; +use temporal_sdk_core_protos::{ + coresdk::{ + nexus::{nexus_operation_result, NexusOperationResult}, + workflow_activation::{ + resolve_nexus_operation_start, ResolveNexusOperation, ResolveNexusOperationStart, + }, + workflow_commands::ScheduleNexusOperation, + }, + temporal::api::{ + command::v1::{command, RequestCancelNexusOperationCommandAttributes}, + common::v1::Payload, + enums::v1::{CommandType, EventType}, + failure::v1::{self as failure, failure::FailureInfo, Failure}, + 
history::v1::{ + history_event, NexusOperationCanceledEventAttributes, + NexusOperationCompletedEventAttributes, NexusOperationFailedEventAttributes, + NexusOperationStartedEventAttributes, NexusOperationTimedOutEventAttributes, + }, + }, +}; + +fsm! { + pub(super) name NexusOperationMachine; + command NexusOperationCommand; + error WFMachinesError; + shared_state SharedState; + + ScheduleCommandCreated --(CommandScheduleNexusOperation)--> ScheduleCommandCreated; + ScheduleCommandCreated + --(NexusOperationScheduled(NexusOpScheduledData), shared on_scheduled)--> ScheduledEventRecorded; + ScheduleCommandCreated --(Cancel, shared on_cancelled)--> Cancelled; + + ScheduledEventRecorded --(Cancel, shared on_issue_cancel)--> ScheduledEventRecorded; + ScheduledEventRecorded --(CommandRequestCancelNexusOperation)--> ScheduledEventRecorded; + ScheduledEventRecorded --(NexusOperationCancelRequested)--> ScheduledEventRecorded; + ScheduledEventRecorded + --(NexusOperationCompleted(NexusOperationCompletedEventAttributes), on_completed)--> Completed; + ScheduledEventRecorded + --(NexusOperationFailed(NexusOperationFailedEventAttributes), on_failed)--> Failed; + ScheduledEventRecorded + --(NexusOperationCanceled(NexusOperationCanceledEventAttributes), on_canceled)--> Cancelled; + ScheduledEventRecorded + --(NexusOperationTimedOut(NexusOperationTimedOutEventAttributes), on_timed_out)--> TimedOut; + ScheduledEventRecorded + --(NexusOperationStarted(NexusOperationStartedEventAttributes), on_started)--> Started; + + Started --(Cancel, shared on_issue_cancel)--> Started; + Started --(CommandRequestCancelNexusOperation)--> Started; + Started --(NexusOperationCancelRequested)--> Started; + Started + --(NexusOperationCompleted(NexusOperationCompletedEventAttributes), on_completed)--> Completed; + Started + --(NexusOperationFailed(NexusOperationFailedEventAttributes), on_failed)--> Failed; + Started + --(NexusOperationCanceled(NexusOperationCanceledEventAttributes), on_canceled)--> 
Cancelled; + Started + --(NexusOperationTimedOut(NexusOperationTimedOutEventAttributes), on_timed_out)--> TimedOut; + + // Ignore cancels in all terminal states + Completed --(Cancel)--> Completed; + Failed --(Cancel)--> Failed; + Cancelled --(Cancel)--> Cancelled; + TimedOut --(Cancel)--> TimedOut; +} + +#[derive(Debug, derive_more::Display)] +pub(super) enum NexusOperationCommand { + #[display("Start")] + Start { operation_id: String }, + #[display("CancelBeforeStart")] + CancelBeforeStart, + #[display("Complete")] + Complete(Option), + #[display("Fail")] + Fail(Failure), + #[display("Cancel")] + Cancel(Failure), + #[display("TimedOut")] + TimedOut(Failure), + #[display("IssueCancel")] + IssueCancel, +} + +#[derive(Clone, Debug)] +pub(super) struct SharedState { + lang_seq_num: u32, + pub(super) scheduled_event_id: i64, + endpoint: String, + service: String, + operation: String, + + cancelled_before_sent: bool, + cancel_sent: bool, +} + +impl NexusOperationMachine { + pub(super) fn new_scheduled(attribs: ScheduleNexusOperation) -> NewMachineWithCommand { + let s = Self::from_parts( + ScheduleCommandCreated.into(), + SharedState { + lang_seq_num: attribs.seq, + scheduled_event_id: 0, + endpoint: attribs.endpoint.clone(), + service: attribs.service.clone(), + operation: attribs.operation.clone(), + cancelled_before_sent: false, + cancel_sent: false, + }, + ); + NewMachineWithCommand { + command: attribs.into(), + machine: s.into(), + } + } +} + +#[derive(Default, Clone)] +pub(super) struct ScheduleCommandCreated; + +pub(super) struct NexusOpScheduledData { + event_id: i64, +} + +impl ScheduleCommandCreated { + pub(super) fn on_scheduled( + self, + state: &mut SharedState, + event_dat: NexusOpScheduledData, + ) -> NexusOperationMachineTransition { + state.scheduled_event_id = event_dat.event_id; + NexusOperationMachineTransition::default() + } + + pub(super) fn on_cancelled( + self, + state: &mut SharedState, + ) -> NexusOperationMachineTransition { + 
state.cancelled_before_sent = true; + NexusOperationMachineTransition::commands([NexusOperationCommand::CancelBeforeStart]) + } +} + +#[derive(Default, Clone)] +pub(super) struct ScheduledEventRecorded; + +impl ScheduledEventRecorded { + pub(crate) fn on_issue_cancel( + &self, + ss: &mut SharedState, + ) -> NexusOperationMachineTransition { + if !ss.cancel_sent { + ss.cancel_sent = true; + NexusOperationMachineTransition::commands([NexusOperationCommand::IssueCancel]) + } else { + NexusOperationMachineTransition::default() + } + } + + pub(super) fn on_completed( + self, + ca: NexusOperationCompletedEventAttributes, + ) -> NexusOperationMachineTransition { + NexusOperationMachineTransition::commands([ + NexusOperationCommand::Start { + operation_id: String::default(), + }, + NexusOperationCommand::Complete(ca.result), + ]) + } + + pub(super) fn on_failed( + self, + fa: NexusOperationFailedEventAttributes, + ) -> NexusOperationMachineTransition { + NexusOperationMachineTransition::commands([ + NexusOperationCommand::Start { + operation_id: String::default(), + }, + NexusOperationCommand::Fail(fa.failure.unwrap_or_else(|| Failure { + message: "Nexus operation failed but failure field was not populated".to_owned(), + ..Default::default() + })), + ]) + } + + pub(super) fn on_canceled( + self, + ca: NexusOperationCanceledEventAttributes, + ) -> NexusOperationMachineTransition { + NexusOperationMachineTransition::commands([ + NexusOperationCommand::Start { + operation_id: String::default(), + }, + NexusOperationCommand::Cancel(ca.failure.unwrap_or_else(|| Failure { + message: + "Nexus operation was cancelled but failure field was not populated".to_owned(), + ..Default::default() + })), + ]) + } + + pub(super) fn on_timed_out( + self, + toa: NexusOperationTimedOutEventAttributes, + ) -> NexusOperationMachineTransition { + NexusOperationMachineTransition::commands([ + NexusOperationCommand::Start { + operation_id: String::default(), + }, + 
NexusOperationCommand::TimedOut(toa.failure.unwrap_or_else(|| Failure { + message: "Nexus operation timed out but failure field was not populated".to_owned(), + ..Default::default() + })), + ]) + } + + pub(super) fn on_started( + self, + sa: NexusOperationStartedEventAttributes, + ) -> NexusOperationMachineTransition { + NexusOperationMachineTransition::commands([NexusOperationCommand::Start { + operation_id: sa.operation_id, + }]) + } +} + +#[derive(Default, Clone)] +pub(super) struct Started; + +impl Started { + pub(crate) fn on_issue_cancel( + &self, + ss: &mut SharedState, + ) -> NexusOperationMachineTransition { + if !ss.cancel_sent { + ss.cancel_sent = true; + NexusOperationMachineTransition::commands([NexusOperationCommand::IssueCancel]) + } else { + NexusOperationMachineTransition::default() + } + } + + pub(super) fn on_completed( + self, + ca: NexusOperationCompletedEventAttributes, + ) -> NexusOperationMachineTransition { + NexusOperationMachineTransition::commands([NexusOperationCommand::Complete(ca.result)]) + } + + pub(super) fn on_failed( + self, + fa: NexusOperationFailedEventAttributes, + ) -> NexusOperationMachineTransition { + NexusOperationMachineTransition::commands([NexusOperationCommand::Fail( + fa.failure.unwrap_or_else(|| Failure { + message: "Nexus operation failed but failure field was not populated".to_owned(), + ..Default::default() + }), + )]) + } + + pub(super) fn on_canceled( + self, + ca: NexusOperationCanceledEventAttributes, + ) -> NexusOperationMachineTransition { + NexusOperationMachineTransition::commands([NexusOperationCommand::Cancel( + ca.failure.unwrap_or_else(|| Failure { + message: "Nexus operation was cancelled but failure field was not populated" + .to_owned(), + ..Default::default() + }), + )]) + } + + pub(super) fn on_timed_out( + self, + toa: NexusOperationTimedOutEventAttributes, + ) -> NexusOperationMachineTransition { + NexusOperationMachineTransition::commands([NexusOperationCommand::TimedOut( + 
toa.failure.unwrap_or_else(|| Failure { + message: "Nexus operation timed out but failure field was not populated".to_owned(), + ..Default::default() + }), + )]) + } +} + +#[derive(Default, Clone)] +pub(super) struct Completed; + +#[derive(Default, Clone)] +pub(super) struct Failed; + +#[derive(Default, Clone)] +pub(super) struct TimedOut; + +#[derive(Default, Clone)] +pub(super) struct Cancelled; + +impl TryFrom for NexusOperationMachineEvents { + type Error = WFMachinesError; + + fn try_from(e: HistEventData) -> Result { + let e = e.event; + Ok(match EventType::try_from(e.event_type) { + Ok(EventType::NexusOperationScheduled) => { + if let Some(history_event::Attributes::NexusOperationScheduledEventAttributes(_)) = + e.attributes + { + Self::NexusOperationScheduled(NexusOpScheduledData { + event_id: e.event_id, + }) + } else { + return Err(WFMachinesError::Nondeterminism( + "NexusOperationScheduled attributes were unset or malformed".to_string(), + )); + } + } + Ok(EventType::NexusOperationStarted) => { + if let Some(history_event::Attributes::NexusOperationStartedEventAttributes(sa)) = + e.attributes + { + Self::NexusOperationStarted(sa) + } else { + return Err(WFMachinesError::Nondeterminism( + "NexusOperationStarted attributes were unset or malformed".to_string(), + )); + } + } + Ok(EventType::NexusOperationCompleted) => { + if let Some(history_event::Attributes::NexusOperationCompletedEventAttributes(ca)) = + e.attributes + { + Self::NexusOperationCompleted(ca) + } else { + return Err(WFMachinesError::Nondeterminism( + "NexusOperationCompleted attributes were unset or malformed".to_string(), + )); + } + } + Ok(EventType::NexusOperationFailed) => { + if let Some(history_event::Attributes::NexusOperationFailedEventAttributes(fa)) = + e.attributes + { + Self::NexusOperationFailed(fa) + } else { + return Err(WFMachinesError::Nondeterminism( + "NexusOperationFailed attributes were unset or malformed".to_string(), + )); + } + } + 
Ok(EventType::NexusOperationCanceled) => { + if let Some(history_event::Attributes::NexusOperationCanceledEventAttributes(ca)) = + e.attributes + { + Self::NexusOperationCanceled(ca) + } else { + return Err(WFMachinesError::Nondeterminism( + "NexusOperationCanceled attributes were unset or malformed".to_string(), + )); + } + } + Ok(EventType::NexusOperationTimedOut) => { + if let Some(history_event::Attributes::NexusOperationTimedOutEventAttributes(toa)) = + e.attributes + { + Self::NexusOperationTimedOut(toa) + } else { + return Err(WFMachinesError::Nondeterminism( + "NexusOperationTimedOut attributes were unset or malformed".to_string(), + )); + } + } + Ok(EventType::NexusOperationCancelRequested) => Self::NexusOperationCancelRequested, + _ => { + return Err(WFMachinesError::Nondeterminism(format!( + "Nexus operation machine does not handle this event: {e:?}" + ))) + } + }) + } +} + +impl WFMachinesAdapter for NexusOperationMachine { + fn adapt_response( + &self, + my_command: Self::Command, + _: Option, + ) -> Result, WFMachinesError> { + Ok(match my_command { + NexusOperationCommand::Start { operation_id } => { + vec![ResolveNexusOperationStart { + seq: self.shared_state.lang_seq_num, + status: Some(resolve_nexus_operation_start::Status::OperationId( + operation_id, + )), + } + .into()] + } + NexusOperationCommand::CancelBeforeStart => { + vec![ + ResolveNexusOperationStart { + seq: self.shared_state.lang_seq_num, + status: Some(resolve_nexus_operation_start::Status::CancelledBeforeStart( + self.cancelled_failure( + "Nexus Operation cancelled before scheduled".to_owned(), + ), + )), + } + .into(), + ResolveNexusOperation { + seq: self.shared_state.lang_seq_num, + result: Some(NexusOperationResult { + status: Some(nexus_operation_result::Status::Cancelled( + self.cancelled_failure( + "Nexus Operation cancelled before scheduled".to_owned(), + ), + )), + }), + } + .into(), + ] + } + NexusOperationCommand::Complete(c) => { + vec![ResolveNexusOperation { + seq: 
self.shared_state.lang_seq_num, + result: Some(NexusOperationResult { + status: Some(nexus_operation_result::Status::Completed( + c.unwrap_or_default(), + )), + }), + } + .into()] + } + NexusOperationCommand::Fail(f) => { + vec![ResolveNexusOperation { + seq: self.shared_state.lang_seq_num, + result: Some(NexusOperationResult { + status: Some(nexus_operation_result::Status::Failed(f)), + }), + } + .into()] + } + NexusOperationCommand::Cancel(f) => { + vec![ResolveNexusOperation { + seq: self.shared_state.lang_seq_num, + result: Some(NexusOperationResult { + status: Some(nexus_operation_result::Status::Cancelled(f)), + }), + } + .into()] + } + NexusOperationCommand::TimedOut(f) => { + vec![ResolveNexusOperation { + seq: self.shared_state.lang_seq_num, + result: Some(NexusOperationResult { + status: Some(nexus_operation_result::Status::TimedOut(f)), + }), + } + .into()] + } + NexusOperationCommand::IssueCancel => { + vec![MachineResponse::IssueNewCommand( + command::Attributes::RequestCancelNexusOperationCommandAttributes( + RequestCancelNexusOperationCommandAttributes { + scheduled_event_id: self.shared_state.scheduled_event_id, + }, + ) + .into(), + )] + } + }) + } +} + +impl TryFrom for NexusOperationMachineEvents { + type Error = (); + + fn try_from(c: CommandType) -> Result { + Ok(match c { + CommandType::ScheduleNexusOperation => Self::CommandScheduleNexusOperation, + CommandType::RequestCancelNexusOperation => Self::CommandRequestCancelNexusOperation, + _ => return Err(()), + }) + } +} + +impl Cancellable for NexusOperationMachine { + fn cancel(&mut self) -> Result, MachineError> { + let event = NexusOperationMachineEvents::Cancel; + let cmds = OnEventWrapper::on_event_mut(self, event)?; + let mach_resps = cmds + .into_iter() + .map(|mc| self.adapt_response(mc, None)) + .flatten_ok() + .try_collect()?; + Ok(mach_resps) + } + + fn was_cancelled_before_sent_to_server(&self) -> bool { + self.shared_state.cancelled_before_sent + } +} + +impl NexusOperationMachine 
{ + fn cancelled_failure(&self, message: String) -> Failure { + Failure { + message, + cause: Some(Box::new(Failure { + failure_info: Some(FailureInfo::CanceledFailureInfo(Default::default())), + ..Default::default() + })), + failure_info: Some(FailureInfo::NexusOperationExecutionFailureInfo( + failure::NexusOperationFailureInfo { + scheduled_event_id: self.shared_state.scheduled_event_id, + endpoint: self.shared_state.endpoint.clone(), + service: self.shared_state.service.clone(), + operation: self.shared_state.operation.clone(), + operation_id: "".to_string(), + }, + )), + ..Default::default() + } + } +} diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index eeaed89ae..9545391ae 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -27,6 +27,7 @@ use crate::{ activity_state_machine::ActivityMachine, child_workflow_state_machine::ChildWorkflowMachine, modify_workflow_properties_state_machine::modify_workflow_properties, + nexus_operation_state_machine::NexusOperationMachine, patch_state_machine::VERSION_SEARCH_ATTR_KEY, update_state_machine::UpdateMachine, upsert_search_attributes_state_machine::upsert_search_attrs_internal, HistEventData, @@ -1463,6 +1464,17 @@ impl WorkflowMachines { ))); } } + WFCommandVariant::ScheduleNexusOperation(attrs) => { + let seq = attrs.seq; + self.add_cmd_to_wf_task( + NexusOperationMachine::new_scheduled(attrs), + cmd.metadata, + CommandID::NexusOperation(seq).into(), + ); + } + WFCommandVariant::RequestCancelNexusOperation(attrs) => { + self.process_cancellation(CommandID::NexusOperation(attrs.seq))? 
+ } WFCommandVariant::NoCommandsFromLang => (), } } @@ -1482,7 +1494,7 @@ impl WorkflowMachines { fn get_machine_key(&self, id: CommandID) -> Result { Ok(*self.id_to_machine.get(&id).ok_or_else(|| { - WFMachinesError::Fatal(format!("Missing associated machine for {id:?}")) + WFMachinesError::Nondeterminism(format!("Missing associated machine for {id:?}")) })?) } diff --git a/core/src/worker/workflow/managed_run.rs b/core/src/worker/workflow/managed_run.rs index 311b4960d..4b4f87010 100644 --- a/core/src/worker/workflow/managed_run.rs +++ b/core/src/worker/workflow/managed_run.rs @@ -373,6 +373,7 @@ impl ManagedRun { mut commands: Vec, used_flags: Vec, resp_chan: Option>, + is_forced_failure: bool, ) -> Result> { let activation_was_only_eviction = self.activation_is_eviction(); let (task_token, has_pending_query, start_time) = if let Some(entry) = self.wft.as_ref() { @@ -446,6 +447,7 @@ impl ManagedRun { query_responses, used_flags, resp_chan, + is_forced_failure, }; // Verify we can actually apply the next workflow task, which will happen as part of @@ -617,6 +619,7 @@ impl ManagedRun { }], vec![], resp_chan, + true, ) .unwrap_or_else(|e| { dbg_panic!("Got next page request when auto-failing workflow: {e:?}"); @@ -686,6 +689,7 @@ impl ManagedRun { query_responses: completion.query_responses, has_pending_query: completion.has_pending_query, activation_was_eviction: completion.activation_was_eviction, + is_forced_failure: completion.is_forced_failure, }; self.wfm.machines.add_lang_used_flags(completion.used_flags); @@ -708,7 +712,8 @@ impl ManagedRun { self.wfm.feed_history_from_new_page(update)?; } // Don't bother applying the next task if we're evicting at the end of this activation - if !completion.activation_was_eviction { + // or are otherwise broken. 
+ if !completion.activation_was_eviction && !self.am_broken { self.wfm.apply_next_task_if_ready()?; } let new_local_acts = self.wfm.drain_queued_local_activities(); @@ -1083,7 +1088,7 @@ impl ManagedRun { // fulfilling a query. If the activation we sent was *only* an eviction, don't send that // either. let should_respond = !(machines_wft_response.has_pending_jobs - || machines_wft_response.replaying + || (machines_wft_response.replaying && !data.is_forced_failure) || is_query_playback || data.activation_was_eviction || machines_wft_response.have_seen_terminal_event); @@ -1331,6 +1336,7 @@ struct CompletionDataForWFT { query_responses: Vec, has_pending_query: bool, activation_was_eviction: bool, + is_forced_failure: bool, } /// Manages an instance of a [WorkflowMachines], which is not thread-safe, as well as other data @@ -1405,13 +1411,11 @@ impl WorkflowManager { self.machines.ready_to_apply_next_wft() } - /// If there are no pending jobs for the workflow, apply the next workflow task and check - /// again if there are any jobs. Importantly, does not *drain* jobs. - /// - /// Returns true if there are jobs (before or after applying the next WFT). - fn apply_next_task_if_ready(&mut self) -> Result { + /// If there are no pending jobs for the workflow apply the next workflow task and check again + /// if there are any jobs. Importantly, does not *drain* jobs. 
+ fn apply_next_task_if_ready(&mut self) -> Result<()> { if self.machines.has_pending_jobs() { - return Ok(true); + return Ok(()); } loop { let consumed_events = self.machines.apply_next_wft_from_history()?; @@ -1423,7 +1427,7 @@ impl WorkflowManager { break; } } - Ok(self.machines.has_pending_jobs()) + Ok(()) } /// Must be called when we're ready to respond to a WFT after handling catching up on replay @@ -1473,6 +1477,7 @@ struct RunActivationCompletion { has_pending_query: bool, query_responses: Vec, used_flags: Vec, + is_forced_failure: bool, /// Used to notify the worker when the completion is done processing and the completion can /// unblock. Must always be `Some` when initialized. resp_chan: Option>, diff --git a/core/src/worker/workflow/mod.rs b/core/src/worker/workflow/mod.rs index 6817e1c9a..1dfd53157 100644 --- a/core/src/worker/workflow/mod.rs +++ b/core/src/worker/workflow/mod.rs @@ -1168,6 +1168,8 @@ enum WFCommandVariant { UpsertSearchAttributes(UpsertWorkflowSearchAttributes), ModifyWorkflowProperties(ModifyWorkflowProperties), UpdateResponse(UpdateResponse), + ScheduleNexusOperation(ScheduleNexusOperation), + RequestCancelNexusOperation(RequestCancelNexusOperation), } impl TryFrom for WFCommand { @@ -1223,6 +1225,12 @@ impl TryFrom for WFCommand { WFCommandVariant::ModifyWorkflowProperties(s) } workflow_command::Variant::UpdateResponse(s) => WFCommandVariant::UpdateResponse(s), + workflow_command::Variant::ScheduleNexusOperation(s) => { + WFCommandVariant::ScheduleNexusOperation(s) + } + workflow_command::Variant::RequestCancelNexusOperation(s) => { + WFCommandVariant::RequestCancelNexusOperation(s) + } }; Ok(Self { variant, @@ -1256,6 +1264,7 @@ enum CommandID { ChildWorkflowStart(u32), SignalExternal(u32), CancelExternal(u32), + NexusOperation(u32), } /// Details remembered from the workflow execution started event that we may need to recall later. 
diff --git a/core/src/worker/workflow/workflow_stream.rs b/core/src/worker/workflow/workflow_stream.rs index a1babfdfb..50afaa1c5 100644 --- a/core/src/worker/workflow/workflow_stream.rs +++ b/core/src/worker/workflow/workflow_stream.rs @@ -264,7 +264,12 @@ impl WFStream { commands, used_flags, .. - } => match rh.successful_completion(commands, used_flags, complete.response_tx) { + } => match rh.successful_completion( + commands, + used_flags, + complete.response_tx, + false, + ) { Ok(acts) => acts, Err(npr) => { self.runs_needing_fetching diff --git a/sdk-core-protos/protos/local/temporal/sdk/core/nexus/nexus.proto b/sdk-core-protos/protos/local/temporal/sdk/core/nexus/nexus.proto new file mode 100644 index 000000000..3789ecb89 --- /dev/null +++ b/sdk-core-protos/protos/local/temporal/sdk/core/nexus/nexus.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package coresdk.nexus; +option ruby_package = "Temporalio::Internal::Bridge::Api::Nexus"; + +import "temporal/api/common/v1/message.proto"; +import "temporal/api/failure/v1/message.proto"; +import "temporal/sdk/core/common/common.proto"; + +// Used by core to resolve nexus operations. 
+message NexusOperationResult { + oneof status { + temporal.api.common.v1.Payload completed = 1; + temporal.api.failure.v1.Failure failed = 2; + temporal.api.failure.v1.Failure cancelled = 3; + temporal.api.failure.v1.Failure timed_out = 4; + } +} diff --git a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto index a2b084202..469b754d0 100644 --- a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto +++ b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto @@ -9,6 +9,7 @@ option ruby_package = "Temporalio::Internal::Bridge::Api::WorkflowActivation"; import "google/protobuf/timestamp.proto"; import "google/protobuf/duration.proto"; +import "google/protobuf/empty.proto"; import "temporal/api/failure/v1/message.proto"; import "temporal/api/update/v1/message.proto"; import "temporal/api/common/v1/message.proto"; @@ -16,6 +17,7 @@ import "temporal/api/enums/v1/workflow.proto"; import "temporal/sdk/core/activity_result/activity_result.proto"; import "temporal/sdk/core/child_workflow/child_workflow.proto"; import "temporal/sdk/core/common/common.proto"; +import "temporal/sdk/core/nexus/nexus.proto"; // An instruction to the lang sdk to run some workflow code, whether for the first time or from // a cached state. @@ -119,6 +121,10 @@ message WorkflowActivationJob { ResolveRequestCancelExternalWorkflow resolve_request_cancel_external_workflow = 13; // A request to handle a workflow update. DoUpdate do_update = 14; + // A nexus operation started. + ResolveNexusOperationStart resolve_nexus_operation_start = 15; + // A nexus operation resolved. + ResolveNexusOperation resolve_nexus_operation = 16; // Remove the workflow identified by the [WorkflowActivation] containing this job from the // cache after performing the activation. 
It is guaranteed that this will be the only job // in the activation if present. @@ -316,6 +322,30 @@ message DoUpdate { bool run_validator = 7; } +message ResolveNexusOperationStart { + // Sequence number as provided by lang in the corresponding ScheduleNexusOperation command + uint32 seq = 1; + oneof status { + // The operation started asynchronously. Contains an ID that can be used to perform + // operations on the started operation by, ex, clients. A `ResolveNexusOperation` job will + // follow at some point. + string operation_id = 2; + // If true the operation "started" but only because it's also already resolved. A + // `ResolveNexusOperation` job will be in the same activation. + bool started_sync = 3; + // The operation was cancelled before it was ever sent to server (same WFT). + // Note that core will still send a `ResolveNexusOperation` job in the same activation, so + // there does not need to be an exceptional case for this in lang. + temporal.api.failure.v1.Failure cancelled_before_start = 4; + } +} + +message ResolveNexusOperation { + // Sequence number as provided by lang in the corresponding ScheduleNexusOperation command + uint32 seq = 1; + nexus.NexusOperationResult result = 2; +} + message RemoveFromCache { string message = 1; diff --git a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto index 482566ea1..e7f10b82f 100644 --- a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto +++ b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto @@ -45,6 +45,8 @@ message WorkflowCommand { UpsertWorkflowSearchAttributes upsert_workflow_search_attributes = 18; ModifyWorkflowProperties modify_workflow_properties = 19; UpdateResponse update_response = 20; + ScheduleNexusOperation schedule_nexus_operation = 21; + RequestCancelNexusOperation 
request_cancel_nexus_operation = 22; } } @@ -337,4 +339,38 @@ message UpdateResponse { // Must be sent once the update handler completes successfully. temporal.api.common.v1.Payload completed = 4; } +} + +// A request to begin a Nexus operation +message ScheduleNexusOperation { + // Lang's incremental sequence number, used as the operation identifier + uint32 seq = 1; + // Endpoint name, must exist in the endpoint registry or this command will fail. + string endpoint = 2; + // Service name. + string service = 3; + // Operation name. + string operation = 4; + // Input for the operation. The server converts this into Nexus request content and the + // appropriate content headers internally when sending the StartOperation request. On the + // handler side, if it is also backed by Temporal, the content is transformed back to the + // original Payload sent in this command. + temporal.api.common.v1.Payload input = 5; + // Schedule-to-close timeout for this operation. + // Indicates how long the caller is willing to wait for operation completion. + // Calls are retried internally by the server. + google.protobuf.Duration schedule_to_close_timeout = 6; + // Header to attach to the Nexus request. + // Users are responsible for encrypting sensitive data in this header as it is stored in + // workflow history and transmitted to external services as-is. This is useful for propagating + // tracing information. Note these headers are not the same as Temporal headers on internal + // activities and child workflows, these are transmitted to Nexus operations that may be + // external and are not traditional payloads. 
+ map nexus_header = 7; +} + +// Request cancellation of a nexus operation started via `ScheduleNexusOperation` +message RequestCancelNexusOperation { + // Lang's incremental sequence number as passed to `ScheduleNexusOperation` + uint32 seq = 1; } \ No newline at end of file diff --git a/sdk-core-protos/src/lib.rs b/sdk-core-protos/src/lib.rs index 82e158b44..0ca48a52e 100644 --- a/sdk-core-protos/src/lib.rs +++ b/sdk-core-protos/src/lib.rs @@ -629,6 +629,12 @@ pub mod coresdk { workflow_activation_job::Variant::DoUpdate(_) => { write!(f, "DoUpdate") } + workflow_activation_job::Variant::ResolveNexusOperationStart(_) => { + write!(f, "ResolveNexusOperationStart") + } + workflow_activation_job::Variant::ResolveNexusOperation(_) => { + write!(f, "ResolveNexusOperation") + } } } } @@ -750,6 +756,10 @@ pub mod coresdk { tonic::include_proto!("coresdk.child_workflow"); } + pub mod nexus { + tonic::include_proto!("coresdk.nexus"); + } + pub mod workflow_commands { tonic::include_proto!("coresdk.workflow_commands"); @@ -909,6 +919,18 @@ pub mod coresdk { } } + impl Display for ScheduleNexusOperation { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "ScheduleNexusOperation({})", self.seq) + } + } + + impl Display for RequestCancelNexusOperation { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "RequestCancelNexusOperation({})", self.seq) + } + } + impl QueryResult { /// Helper to construct the Temporal API query result types. 
pub fn into_components(self) -> (String, QueryResultType, Option, String) { @@ -1563,6 +1585,13 @@ pub mod temporal { attributes: Some(a), user_metadata: Default::default(), }, + a @ Attributes::RequestCancelNexusOperationCommandAttributes(_) => { + Self { + command_type: CommandType::RequestCancelNexusOperation as i32, + attributes: Some(a), + user_metadata: Default::default(), + } + } _ => unimplemented!(), } } @@ -1790,6 +1819,21 @@ pub mod temporal { ) } } + + impl From for command::Attributes { + fn from(c: workflow_commands::ScheduleNexusOperation) -> Self { + Self::ScheduleNexusOperationCommandAttributes( + ScheduleNexusOperationCommandAttributes { + endpoint: c.endpoint, + service: c.service, + operation: c.operation, + input: c.input, + schedule_to_close_timeout: c.schedule_to_close_timeout, + nexus_header: c.nexus_header, + }, + ) + } + } } } pub mod cloud { @@ -1979,7 +2023,17 @@ pub mod temporal { } pub mod enums { pub mod v1 { + use crate::camel_case_to_screaming_snake; + tonic::include_proto!("temporal.api.enums.v1"); + + impl EventType { + pub fn from_json_str(val: &str) -> Option { + let with_prefix = + format!("EVENT_TYPE_{}", camel_case_to_screaming_snake(val)); + EventType::from_str_name(&with_prefix) + } + } } } pub mod failure { @@ -2041,6 +2095,8 @@ pub mod temporal { | EventType::TimerStarted | EventType::UpsertWorkflowSearchAttributes | EventType::WorkflowPropertiesModified + | EventType::NexusOperationScheduled + | EventType::NexusOperationCancelRequested | EventType::WorkflowExecutionCanceled | EventType::WorkflowExecutionCompleted | EventType::WorkflowExecutionContinuedAsNew @@ -2085,6 +2141,12 @@ pub mod temporal { Attributes::WorkflowTaskCompletedEventAttributes(a) => Some(a.scheduled_event_id), Attributes::WorkflowTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id), Attributes::WorkflowTaskFailedEventAttributes(a) => Some(a.scheduled_event_id), + Attributes::NexusOperationStartedEventAttributes(a) => 
Some(a.scheduled_event_id), + Attributes::NexusOperationCompletedEventAttributes(a) => Some(a.scheduled_event_id), + Attributes::NexusOperationFailedEventAttributes(a) => Some(a.scheduled_event_id), + Attributes::NexusOperationTimedOutEventAttributes(a) => Some(a.scheduled_event_id), + Attributes::NexusOperationCanceledEventAttributes(a) => Some(a.scheduled_event_id), + Attributes::NexusOperationCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id), _ => None } }) @@ -2288,7 +2350,90 @@ pub mod temporal { } pub mod nexus { pub mod v1 { + use crate::temporal::api::{ + common, + common::v1::link::{workflow_event, WorkflowEvent}, + enums::v1::EventType, + }; + use anyhow::{anyhow, bail}; + use tonic::transport::Uri; + tonic::include_proto!("temporal.api.nexus.v1"); + + static SCHEME_PREFIX: &str = "temporal://"; + + /// Attempt to parse a nexus lint into a workflow event link + pub fn workflow_event_link_from_nexus( + l: &Link, + ) -> Result { + if !l.url.starts_with(SCHEME_PREFIX) { + bail!("Invalid scheme for nexus link: {:?}", l.url); + } + // We strip the scheme/authority portion because of + // https://github.com/hyperium/http/issues/696 + let no_authority_url = l.url.strip_prefix(SCHEME_PREFIX).unwrap(); + let uri = Uri::try_from(no_authority_url)?; + let parts = uri.into_parts(); + let path = parts.path_and_query.ok_or_else(|| { + anyhow!("Failed to parse nexus link, invalid path: {:?}", l) + })?; + let path_parts = path.path().split('/').collect::>(); + if path_parts.get(1) != Some(&"namespaces") { + bail!("Invalid path for nexus link: {:?}", l); + } + let namespace = path_parts.get(2).ok_or_else(|| { + anyhow!("Failed to parse nexus link, no namespace: {:?}", l) + })?; + if path_parts.get(3) != Some(&"workflows") { + bail!("Invalid path for nexus link, no workflows segment: {:?}", l); + } + let workflow_id = path_parts.get(4).ok_or_else(|| { + anyhow!("Failed to parse nexus link, no workflow id: {:?}", l) + })?; + let run_id = path_parts + .get(5) + 
.ok_or_else(|| anyhow!("Failed to parse nexus link, no run id: {:?}", l))?; + if path_parts.get(6) != Some(&"history") { + bail!("Invalid path for nexus link, no history segment: {:?}", l); + } + let reference = if let Some(query) = path.query() { + let mut eventref = workflow_event::EventReference::default(); + let query_parts = query.split('&').collect::>(); + for qp in query_parts { + let mut kv = qp.split('='); + let key = kv.next().ok_or_else(|| { + anyhow!("Failed to parse nexus link query parameter: {:?}", l) + })?; + let val = kv.next().ok_or_else(|| { + anyhow!("Failed to parse nexus link query parameter: {:?}", l) + })?; + match key { + "eventID" => { + eventref.event_id = val.parse().map_err(|_| { + anyhow!("Failed to parse nexus link event id: {:?}", l) + })?; + } + "eventType" => { + eventref.event_type = + EventType::from_json_str(val).unwrap_or_default().into() + } + _ => continue, + } + } + Some(workflow_event::Reference::EventRef(eventref)) + } else { + None + }; + + Ok(common::v1::Link { + variant: Some(common::v1::link::Variant::WorkflowEvent(WorkflowEvent { + namespace: namespace.to_string(), + workflow_id: workflow_id.to_string(), + run_id: run_id.to_string(), + reference, + })), + }) + } } } pub mod workflowservice { @@ -2391,6 +2536,25 @@ pub mod grpc { } } +/// Case conversion, used for json -> proto enum string conversion +pub fn camel_case_to_screaming_snake(val: &str) -> String { + let mut out = String::new(); + let mut last_was_upper = true; + for c in val.chars() { + if c.is_uppercase() { + if !last_was_upper { + out.push('_'); + } + out.push(c.to_ascii_uppercase()); + last_was_upper = true; + } else { + out.push(c.to_ascii_uppercase()); + last_was_upper = false; + } + } + out +} + #[cfg(test)] mod tests { use crate::temporal::api::failure::v1::Failure; diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index c47675978..b529b439a 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -56,11 +56,14 @@ pub use temporal_client::Namespace; use 
tracing::{field, Instrument, Span}; pub use workflow_context::{ ActivityOptions, CancellableFuture, ChildWorkflow, ChildWorkflowOptions, LocalActivityOptions, - PendingChildWorkflow, Signal, SignalData, SignalWorkflowOptions, StartedChildWorkflow, - TimerOptions, WfContext, + NexusOperationOptions, PendingChildWorkflow, Signal, SignalData, SignalWorkflowOptions, + StartedChildWorkflow, TimerOptions, WfContext, }; -use crate::{interceptors::WorkerInterceptor, workflow_context::ChildWfCommon}; +use crate::{ + interceptors::WorkerInterceptor, + workflow_context::{ChildWfCommon, NexusUnblockData, StartedNexusOperation}, +}; use anyhow::{anyhow, bail, Context}; use app_data::AppData; use futures_util::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; @@ -87,9 +90,10 @@ use temporal_sdk_core_protos::{ activity_task::{activity_task, ActivityTask}, child_workflow::ChildWorkflowResult, common::NamespacedWorkflowExecution, + nexus::NexusOperationResult, workflow_activation::{ resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus, - workflow_activation_job::Variant, WorkflowActivation, + resolve_nexus_operation_start, workflow_activation_job::Variant, WorkflowActivation, }, workflow_commands::{workflow_command, ContinueAsNewWorkflowExecution, WorkflowCommand}, workflow_completion::WorkflowActivationCompletion, @@ -601,6 +605,8 @@ enum UnblockEvent { WorkflowComplete(u32, Box), SignalExternal(u32, Option), CancelExternal(u32, Option), + NexusOperationStart(u32, Box), + NexusOperationComplete(u32, Box), } /// Result of awaiting on a timer @@ -698,6 +704,42 @@ impl Unblockable for CancelExternalWfResult { } } +type NexusStartResult = Result; +impl Unblockable for NexusStartResult { + type OtherDat = NexusUnblockData; + fn unblock(ue: UnblockEvent, od: Self::OtherDat) -> Self { + match ue { + UnblockEvent::NexusOperationStart(_, result) => match *result { + resolve_nexus_operation_start::Status::OperationId(op_id) => { + 
Ok(StartedNexusOperation { + operation_id: Some(op_id), + unblock_dat: od, + }) + } + resolve_nexus_operation_start::Status::StartedSync(_) => { + Ok(StartedNexusOperation { + operation_id: None, + unblock_dat: od, + }) + } + resolve_nexus_operation_start::Status::CancelledBeforeStart(f) => Err(f), + }, + _ => panic!("Invalid unblock event for nexus operation"), + } + } +} + +impl Unblockable for NexusOperationResult { + type OtherDat = (); + + fn unblock(ue: UnblockEvent, _: Self::OtherDat) -> Self { + match ue { + UnblockEvent::NexusOperationComplete(_, result) => *result, + _ => panic!("Invalid unblock event for nexus operation complete"), + } + } +} + /// Identifier for cancellable operations #[derive(Debug, Clone)] pub enum CancellableID { @@ -718,6 +760,8 @@ pub enum CancellableID { /// Identifying information about the workflow to be cancelled execution: NamespacedWorkflowExecution, }, + /// A nexus operation (waiting for start) + NexusOp(u32), } impl CancellableID { @@ -730,6 +774,7 @@ impl CancellableID { CancellableID::ChildWorkflow(seq) => *seq, CancellableID::SignalExternalWorkflow(seq) => *seq, CancellableID::ExternalWorkflow { seqnum, .. 
} => *seqnum, + CancellableID::NexusOp(seq) => *seq, } } } @@ -745,6 +790,10 @@ enum RustWfCmd { SubscribeChildWorkflowCompletion(CommandSubscribeChildWorkflowCompletion), SubscribeSignal(String, UnboundedSender), RegisterUpdate(String, UpdateFunctions), + SubscribeNexusOperationCompletion { + seq: u32, + unblocker: oneshot::Sender, + }, } struct CommandCreateRequest { diff --git a/sdk/src/workflow_context.rs b/sdk/src/workflow_context.rs index 4c51d6fb7..5b8587092 100644 --- a/sdk/src/workflow_context.rs +++ b/sdk/src/workflow_context.rs @@ -1,17 +1,17 @@ mod options; pub use options::{ - ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, Signal, SignalData, - SignalWorkflowOptions, TimerOptions, + ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, NexusOperationOptions, Signal, + SignalData, SignalWorkflowOptions, TimerOptions, }; use crate::{ workflow_context::options::IntoWorkflowCommand, CancelExternalWfResult, CancellableID, CommandCreateRequest, CommandSubscribeChildWorkflowCompletion, IntoUpdateHandlerFunc, - IntoUpdateValidatorFunc, RustWfCmd, SignalExternalWfResult, TimerResult, UnblockEvent, - Unblockable, UpdateFunctions, + IntoUpdateValidatorFunc, NexusStartResult, RustWfCmd, SignalExternalWfResult, TimerResult, + UnblockEvent, Unblockable, UpdateFunctions, }; -use futures_util::{task::Context, FutureExt, Stream, StreamExt}; +use futures_util::{future::Shared, task::Context, FutureExt, Stream, StreamExt}; use parking_lot::{RwLock, RwLockReadGuard}; use std::{ collections::HashMap, @@ -32,6 +32,7 @@ use temporal_sdk_core_protos::{ activity_result::{activity_resolution, ActivityResolution}, child_workflow::ChildWorkflowResult, common::NamespacedWorkflowExecution, + nexus::NexusOperationResult, workflow_activation::resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus, workflow_commands::{ signal_external_workflow_execution as sig_we, workflow_command, @@ -90,6 +91,7 @@ impl WfContext { 
next_child_workflow_sequence_number: 1, next_cancel_external_wf_sequence_number: 1, next_signal_external_wf_sequence_number: 1, + next_nexus_op_sequence_number: 1, })), }, rx, @@ -360,6 +362,31 @@ impl WfContext { )) } + /// Start a nexus operation + pub fn start_nexus_operation( + &self, + opts: NexusOperationOptions, + ) -> impl CancellableFuture { + let seq = self.seq_nums.write().next_nexus_op_seq(); + let (result_future, unblocker) = WFCommandFut::new(); + self.send(RustWfCmd::SubscribeNexusOperationCompletion { seq, unblocker }); + let (cmd, unblocker) = CancellableWFCommandFut::new_with_dat( + CancellableID::NexusOp(seq), + NexusUnblockData { + result_future: result_future.shared(), + schedule_seq: seq, + }, + ); + self.send( + CommandCreateRequest { + cmd: opts.into_command(seq), + unblocker, + } + .into(), + ); + cmd + } + /// Buffer a command to be sent in the activation reply pub(crate) fn send(&self, c: RustWfCmd) { self.chan.send(c).expect("command channel intact"); @@ -407,6 +434,7 @@ struct WfCtxProtectedDat { next_child_workflow_sequence_number: u32, next_cancel_external_wf_sequence_number: u32, next_signal_external_wf_sequence_number: u32, + next_nexus_op_sequence_number: u32, } impl WfCtxProtectedDat { @@ -435,6 +463,11 @@ impl WfCtxProtectedDat { self.next_signal_external_wf_sequence_number += 1; seq } + fn next_nexus_op_seq(&mut self) -> u32 { + let seq = self.next_nexus_op_sequence_number; + self.next_nexus_op_sequence_number += 1; + seq + } } #[derive(Clone, Debug, Default)] @@ -774,3 +807,26 @@ impl StartedChildWorkflow { cx.send_signal_wf(target, data.into()) } } + +#[derive(derive_more::Debug)] +#[debug("StartedNexusOperation{{ operation_id: {operation_id:?} }}")] +pub struct StartedNexusOperation { + /// The operation id, if the operation started asynchronously + pub operation_id: Option, + pub(crate) unblock_dat: NexusUnblockData, +} + +pub(crate) struct NexusUnblockData { + result_future: Shared>, + schedule_seq: u32, +} + +impl 
StartedNexusOperation { + pub async fn result(&self) -> NexusOperationResult { + self.unblock_dat.result_future.clone().await + } + + pub fn cancel(&self, cx: &WfContext) { + cx.cancel(CancellableID::NexusOp(self.unblock_dat.schedule_seq)); + } +} diff --git a/sdk/src/workflow_context/options.rs b/sdk/src/workflow_context/options.rs index 6dcafceac..9fed52826 100644 --- a/sdk/src/workflow_context/options.rs +++ b/sdk/src/workflow_context/options.rs @@ -6,7 +6,7 @@ use temporal_sdk_core_protos::{ child_workflow::ChildWorkflowCancellationType, workflow_commands::{ ActivityCancellationType, ScheduleActivity, ScheduleLocalActivity, - StartChildWorkflowExecution, WorkflowCommand, + ScheduleNexusOperation, StartChildWorkflowExecution, WorkflowCommand, }, }, temporal::api::{ @@ -364,3 +364,52 @@ impl From for TimerOptions { } } } + +/// Options for Nexus Operations +#[derive(Default, Debug, Clone)] +pub struct NexusOperationOptions { + /// Endpoint name, must exist in the endpoint registry or this command will fail. + pub endpoint: String, + /// Service name. + pub service: String, + /// Operation name. + pub operation: String, + /// Input for the operation. The server converts this into Nexus request content and the + /// appropriate content headers internally when sending the StartOperation request. On the + /// handler side, if it is also backed by Temporal, the content is transformed back to the + /// original Payload sent in this command. + pub input: Option, + /// Schedule-to-close timeout for this operation. + /// Indicates how long the caller is willing to wait for operation completion. + /// Calls are retried internally by the server. + pub schedule_to_close_timeout: Option, + /// Header to attach to the Nexus request. + /// Users are responsible for encrypting sensitive data in this header as it is stored in + /// workflow history and transmitted to external services as-is. This is useful for propagating + /// tracing information. 
Note these headers are not the same as Temporal headers on internal + /// activities and child workflows, these are transmitted to Nexus operations that may be + /// external and are not traditional payloads. + pub nexus_header: HashMap, +} + +impl IntoWorkflowCommand for NexusOperationOptions { + fn into_command(self, seq: u32) -> WorkflowCommand { + WorkflowCommand { + user_metadata: None, + variant: Some( + ScheduleNexusOperation { + seq, + endpoint: self.endpoint, + service: self.service, + operation: self.operation, + input: self.input, + schedule_to_close_timeout: self + .schedule_to_close_timeout + .and_then(|t| t.try_into().ok()), + nexus_header: self.nexus_header, + } + .into(), + ), + } + } +} diff --git a/sdk/src/workflow_future.rs b/sdk/src/workflow_future.rs index 3ae387bcf..9be6a86af 100644 --- a/sdk/src/workflow_future.rs +++ b/sdk/src/workflow_future.rs @@ -25,8 +25,8 @@ use temporal_sdk_core_protos::{ update_response, workflow_command, CancelChildWorkflowExecution, CancelSignalWorkflow, CancelTimer, CancelWorkflowExecution, CompleteWorkflowExecution, FailWorkflowExecution, RequestCancelActivity, RequestCancelExternalWorkflowExecution, - RequestCancelLocalActivity, ScheduleActivity, ScheduleLocalActivity, StartTimer, - UpdateResponse, WorkflowCommand, + RequestCancelLocalActivity, RequestCancelNexusOperation, ScheduleActivity, + ScheduleLocalActivity, StartTimer, UpdateResponse, WorkflowCommand, }, workflow_completion, workflow_completion::{workflow_activation_completion, WorkflowActivationCompletion}, @@ -130,6 +130,8 @@ impl WorkflowFuture { UnblockEvent::WorkflowComplete(seq, _) => CommandID::ChildWorkflowComplete(seq), UnblockEvent::SignalExternal(seq, _) => CommandID::SignalExternal(seq), UnblockEvent::CancelExternal(seq, _) => CommandID::CancelExternal(seq), + UnblockEvent::NexusOperationStart(seq, _) => CommandID::NexusOpStart(seq), + UnblockEvent::NexusOperationComplete(seq, _) => CommandID::NexusOpComplete(seq), }; let unblocker = 
self.command_status.remove(&cmd_id); let _ = unblocker @@ -209,7 +211,7 @@ impl WorkflowFuture { Variant::QueryWorkflow(q) => { error!( "Queries are not implemented in the Rust SDK. Got query '{}'", - q.query_id + q.query_type ); } Variant::CancelWorkflow(_) => { @@ -306,7 +308,22 @@ impl WorkflowFuture { ); } } - + Variant::ResolveNexusOperationStart(attrs) => { + self.unblock(UnblockEvent::NexusOperationStart( + attrs.seq, + Box::new( + attrs + .status + .context("Nexus operation start must have status")?, + ), + ))? + } + Variant::ResolveNexusOperation(attrs) => { + self.unblock(UnblockEvent::NexusOperationComplete( + attrs.seq, + Box::new(attrs.result.context("Nexus operation must have result")?), + ))? + } Variant::RemoveFromCache(_) => { // TODO: Need to abort any user-spawned tasks, etc. See also cancel WF. // How best to do this in executor agnostic way? Is that possible? @@ -531,6 +548,11 @@ impl WorkflowFuture { }, ) } + CancellableID::NexusOp(seq) => { + workflow_command::Variant::RequestCancelNexusOperation( + RequestCancelNexusOperation { seq }, + ) + } }; activation_cmds.push(cmd_variant.into()); } @@ -561,6 +583,9 @@ impl WorkflowFuture { workflow_command::Variant::RequestCancelExternalWorkflowExecution(req) => { CommandID::CancelExternal(req.seq) } + workflow_command::Variant::ScheduleNexusOperation(req) => { + CommandID::NexusOpStart(req.seq) + } _ => unimplemented!("Command type not implemented"), }; activation_cmds.push(cmd.cmd); @@ -602,6 +627,12 @@ impl WorkflowFuture { RustWfCmd::RegisterUpdate(name, impls) => { self.updates.insert(name, impls); } + RustWfCmd::SubscribeNexusOperationCompletion { seq, unblocker } => { + self.command_status.insert( + CommandID::NexusOpComplete(seq), + WFCommandFutInfo { unblocker }, + ); + } } } @@ -648,6 +679,8 @@ enum CommandID { ChildWorkflowComplete(u32), SignalExternal(u32), CancelExternal(u32), + NexusOpStart(u32), + NexusOpComplete(u32), } fn update_response( diff --git a/test-utils/src/lib.rs 
b/test-utils/src/lib.rs index 1d696376d..e9b7619b8 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -13,11 +13,10 @@ pub use temporal_sdk_core::replay::HistoryForReplay; use crate::stream::{Stream, TryStreamExt}; use anyhow::{Context, Error}; use assert_matches::assert_matches; -use base64::{prelude::BASE64_STANDARD, Engine}; use futures_util::{future, stream, stream::FuturesUnordered, StreamExt}; use parking_lot::Mutex; use prost::Message; -use rand::{distributions::Standard, Rng}; +use rand::{distributions, Rng}; use std::{ convert::TryFrom, env, future::Future, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration, @@ -196,8 +195,7 @@ impl CoreWfStarter { } fn _new(test_name: &str, runtime_override: Option) -> Self { - let rand_bytes: Vec = rand::thread_rng().sample_iter(&Standard).take(6).collect(); - let task_q_salt = BASE64_STANDARD.encode(rand_bytes); + let task_q_salt = rand_6_chars(); let task_queue = format!("{test_name}_{task_q_salt}"); let mut worker_config = integ_worker_config(&task_queue); worker_config @@ -369,7 +367,7 @@ pub struct TestWorker { inner: Worker, pub core_worker: Arc, client: Option>>, - pub started_workflows: Mutex>, + pub started_workflows: Arc>>, /// If set true (default), and a client is available, we will fetch workflow results to /// determine when they have all completed. pub fetch_results: bool, @@ -382,7 +380,7 @@ impl TestWorker { inner, core_worker, client: None, - started_workflows: Mutex::new(vec![]), + started_workflows: Arc::new(Mutex::new(vec![])), fetch_results: true, } } @@ -408,12 +406,22 @@ impl TestWorker { self.inner.register_activity(activity_type, act_function) } + /// Create a handle that can be used to submit workflows. Useful when workflows need to be + /// started concurrently with running the worker. 
+ pub fn get_submitter_handle(&self) -> TestWorkerSubmitterHandle { + TestWorkerSubmitterHandle { + client: self.client.clone().expect("client must be set"), + tq: self.inner.task_queue().to_string(), + started_workflows: self.started_workflows.clone(), + } + } + /// Create a workflow, asking the server to start it with the provided workflow ID and using the /// provided workflow function. /// /// Increments the expected Workflow run count. /// - /// Returns the run id of the started workflow + /// Returns the run id of the started workflow (if no client has initialized returns a fake id) pub async fn submit_wf( &self, workflow_id: impl Into, @@ -421,27 +429,12 @@ impl TestWorker { input: Vec, options: WorkflowOptions, ) -> Result { - if let Some(c) = self.client.as_ref() { - let wfid = workflow_id.into(); - let res = c - .start_workflow( - input, - self.inner.task_queue().to_string(), - wfid.clone(), - workflow_type.into(), - None, - options, - ) - .await?; - self.started_workflows.lock().push(WorkflowExecutionInfo { - namespace: c.namespace().to_string(), - workflow_id: wfid, - run_id: Some(res.run_id.clone()), - }); - Ok(res.run_id) - } else { - Ok("fake_run_id".to_string()) + if self.client.is_none() { + return Ok("fake_run_id".to_string()); } + self.get_submitter_handle() + .submit_wf(workflow_id, workflow_type, input, options) + .await } /// Similar to `submit_wf` but checking that the server returns the first @@ -522,6 +515,46 @@ impl TestWorker { } } +pub struct TestWorkerSubmitterHandle { + client: Arc>, + tq: String, + started_workflows: Arc>>, +} +impl TestWorkerSubmitterHandle { + /// Create a workflow, asking the server to start it with the provided workflow ID and using the + /// provided workflow function. + /// + /// Increments the expected Workflow run count. 
+ /// + /// Returns the run id of the started workflow + pub async fn submit_wf( + &self, + workflow_id: impl Into, + workflow_type: impl Into, + input: Vec, + options: WorkflowOptions, + ) -> Result { + let wfid = workflow_id.into(); + let res = self + .client + .start_workflow( + input, + self.tq.clone(), + wfid.clone(), + workflow_type.into(), + None, + options, + ) + .await?; + self.started_workflows.lock().push(WorkflowExecutionInfo { + namespace: self.client.namespace().to_string(), + workflow_id: wfid, + run_id: Some(res.run_id.clone()), + }); + Ok(res.run_id) + } +} + pub type BoxDynActivationHook = Box; pub enum TestWorkerShutdownCond { @@ -631,7 +664,7 @@ pub fn get_integ_tls_config() -> Option { pub fn get_integ_telem_options() -> TelemetryOptions { let mut ob = TelemetryOptionsBuilder::default(); let filter_string = - env::var("RUST_LOG").unwrap_or_else(|_| "temporal_sdk_core=INFO".to_string()); + env::var("RUST_LOG").unwrap_or_else(|_| "INFO,temporal_sdk_core=INFO".to_string()); if let Some(url) = env::var(OTEL_URL_ENV_VAR) .ok() .map(|x| x.parse::().unwrap()) @@ -823,3 +856,11 @@ pub async fn drain_pollers_and_shutdown(worker: &Arc) { ); worker.shutdown().await; } + +pub fn rand_6_chars() -> String { + rand::thread_rng() + .sample_iter(&distributions::Alphanumeric) + .take(6) + .map(char::from) + .collect() +} diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs index df173847c..ae4d9e237 100644 --- a/tests/integ_tests/workflow_tests.rs +++ b/tests/integ_tests/workflow_tests.rs @@ -8,6 +8,7 @@ mod determinism; mod eager; mod local_activities; mod modify_wf_properties; +mod nexus; mod patches; mod replay; mod resets; @@ -23,8 +24,11 @@ use std::{ sync::atomic::{AtomicUsize, Ordering}, time::Duration, }; -use temporal_client::{WorkflowClientTrait, WorkflowOptions}; -use temporal_sdk::{interceptors::WorkerInterceptor, ActivityOptions, WfContext, WorkflowResult}; +use temporal_client::{WfClientExt, WorkflowClientTrait, 
WorkflowExecutionResult, WorkflowOptions}; +use temporal_sdk::{ + interceptors::WorkerInterceptor, ActivityOptions, LocalActivityOptions, WfContext, + WorkflowResult, +}; use temporal_sdk_core::{replay::HistoryForReplay, CoreRuntime}; use temporal_sdk_core_api::errors::{PollWfError, WorkflowErrorType}; use temporal_sdk_core_protos::{ @@ -46,9 +50,8 @@ use temporal_sdk_core_test_utils::{ drain_pollers_and_shutdown, history_from_proto_binary, init_core_and_create_wf, init_core_replay_preloaded, schedule_activity_cmd, CoreWfStarter, WorkerTestHelpers, }; -use tokio::{join, time::sleep}; +use tokio::{join, sync::Notify, time::sleep}; use uuid::Uuid; - // TODO: We should get expected histories for these tests and confirm that the history at the end // matches. @@ -769,3 +772,96 @@ async fn nondeterminism_errors_fail_workflow_when_configured_to( ); assert!(body.contains(&match_this)); } + +#[tokio::test] +async fn history_out_of_order_on_restart() { + let wf_name = "history_out_of_order_on_restart"; + let mut starter = CoreWfStarter::new(wf_name); + starter + .worker_config + .workflow_failure_errors([WorkflowErrorType::Nondeterminism]); + let mut worker = starter.worker().await; + let mut starter2 = starter.clone_no_worker(); + let mut worker2 = starter2.worker().await; + + static HIT_SLEEP: Notify = Notify::const_new(); + + worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { + ctx.local_activity(LocalActivityOptions { + activity_type: "echo".to_owned(), + input: "hi".as_json_payload().unwrap(), + start_to_close_timeout: Some(Duration::from_secs(5)), + ..Default::default() + }) + .await; + ctx.activity(ActivityOptions { + activity_type: "echo".to_owned(), + input: "hi".as_json_payload().unwrap(), + start_to_close_timeout: Some(Duration::from_secs(5)), + ..Default::default() + }) + .await; + // Interrupt this sleep on first go + HIT_SLEEP.notify_one(); + ctx.timer(Duration::from_secs(5)).await; + Ok(().into()) + }); + worker.register_activity("echo", 
echo); + + worker2.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { + ctx.local_activity(LocalActivityOptions { + activity_type: "echo".to_owned(), + input: "hi".as_json_payload().unwrap(), + start_to_close_timeout: Some(Duration::from_secs(5)), + ..Default::default() + }) + .await; + // Timer is added after restarting workflow + ctx.timer(Duration::from_secs(1)).await; + ctx.activity(ActivityOptions { + activity_type: "echo".to_owned(), + input: "hi".as_json_payload().unwrap(), + start_to_close_timeout: Some(Duration::from_secs(5)), + ..Default::default() + }) + .await; + ctx.timer(Duration::from_secs(2)).await; + Ok(().into()) + }); + worker2.register_activity("echo", echo); + worker + .submit_wf( + wf_name.to_owned(), + wf_name.to_owned(), + vec![], + WorkflowOptions { + execution_timeout: Some(Duration::from_secs(20)), + ..Default::default() + }, + ) + .await + .unwrap(); + + let w1 = async { + worker.run_until_done().await.unwrap(); + }; + let w2 = async { + // wait to hit sleep + HIT_SLEEP.notified().await; + starter.shutdown().await; + // start new worker + worker2.expect_workflow_completion(wf_name, None); + worker2.run_until_done().await.unwrap(); + }; + join!(w1, w2); + // The workflow should fail with the nondeterminism error + let handle = starter + .get_client() + .await + .get_untyped_workflow_handle(wf_name, ""); + let res = handle + .get_workflow_result(Default::default()) + .await + .unwrap(); + assert_matches!(res, WorkflowExecutionResult::Failed(_)); +} diff --git a/tests/integ_tests/workflow_tests/nexus.rs b/tests/integ_tests/workflow_tests/nexus.rs new file mode 100644 index 000000000..72a31cf28 --- /dev/null +++ b/tests/integ_tests/workflow_tests/nexus.rs @@ -0,0 +1,441 @@ +use anyhow::bail; +use assert_matches::assert_matches; +use std::time::Duration; +use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowOptions, WorkflowService}; +use temporal_sdk::{CancellableFuture, NexusOperationOptions, WfContext, WfExitValue}; 
+use temporal_sdk_core_protos::{ + coresdk::{ + nexus::{nexus_operation_result, NexusOperationResult}, + FromJsonPayloadExt, + }, + temporal::api::{ + common::v1::{callback, Callback}, + enums::v1::TaskQueueKind, + failure::v1::failure::FailureInfo, + nexus, + nexus::v1::{ + endpoint_target, request, start_operation_response, workflow_event_link_from_nexus, + EndpointSpec, EndpointTarget, HandlerError, StartOperationResponse, + }, + operatorservice::v1::CreateNexusEndpointRequest, + taskqueue::v1::TaskQueue, + workflowservice::v1::{ + PollNexusTaskQueueRequest, RespondNexusTaskCompletedRequest, + RespondNexusTaskFailedRequest, + }, + }, +}; +use temporal_sdk_core_test_utils::{rand_6_chars, CoreWfStarter}; +use tokio::join; + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +enum Outcome { + Succeed, + Fail, + Cancel, + CancelAfterRecordedBeforeStarted, + Timeout, +} + +#[rstest::rstest] +#[tokio::test] +async fn nexus_basic( + #[values(Outcome::Succeed, Outcome::Fail, Outcome::Timeout)] outcome: Outcome, +) { + let wf_name = "nexus_basic"; + let mut starter = CoreWfStarter::new(wf_name); + starter.worker_config.no_remote_activities(true); + let mut worker = starter.worker().await; + + let endpoint = mk_endpoint(&mut starter).await; + + worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| { + let endpoint = endpoint.clone(); + async move { + let started = ctx + .start_nexus_operation(NexusOperationOptions { + endpoint, + service: "svc".to_string(), + operation: "op".to_string(), + schedule_to_close_timeout: Some(Duration::from_secs(3)), + ..Default::default() + }) + .await + .unwrap(); + let res = started.result().await; + Ok(res.into()) + } + }); + starter.start_with_worker(wf_name, &mut worker).await; + + let mut client = starter.get_client().await.get_client().clone(); + let nexus_task_handle = async { + let nt = client + .poll_nexus_task_queue(PollNexusTaskQueueRequest { + namespace: client.namespace().to_owned(), + task_queue: Some(TaskQueue { + name: 
starter.get_task_queue().to_owned(), + kind: TaskQueueKind::Normal.into(), + normal_name: "".to_string(), + }), + ..Default::default() + }) + .await + .unwrap() + .into_inner(); + match outcome { + Outcome::Succeed => { + client + .respond_nexus_task_completed(RespondNexusTaskCompletedRequest { + namespace: client.namespace().to_owned(), + task_token: nt.task_token, + response: Some(nexus::v1::Response { + variant: Some(nexus::v1::response::Variant::StartOperation( + StartOperationResponse { + variant: Some(start_operation_response::Variant::SyncSuccess( + start_operation_response::Sync { + payload: Some("yay".into()), + }, + )), + }, + )), + }), + ..Default::default() + }) + .await + .unwrap(); + } + Outcome::Fail => { + client + .respond_nexus_task_failed(RespondNexusTaskFailedRequest { + namespace: client.namespace().to_owned(), + task_token: nt.task_token, + error: Some(HandlerError { + error_type: "BAD_REQUEST".to_string(), // bad req is non-retryable + failure: Some(nexus::v1::Failure { + message: "busted".to_string(), + ..Default::default() + }), + }), + identity: "whatever".to_string(), + }) + .await + .unwrap(); + } + Outcome::Timeout => {} + Outcome::Cancel | Outcome::CancelAfterRecordedBeforeStarted => unimplemented!(), + } + }; + + join!(nexus_task_handle, async { + worker.run_until_done().await.unwrap(); + }); + + let res = client + .get_untyped_workflow_handle(starter.get_task_queue().to_owned(), "") + .get_workflow_result(Default::default()) + .await + .unwrap(); + let res = NexusOperationResult::from_json_payload(&res.unwrap_success()[0]).unwrap(); + match outcome { + Outcome::Succeed => { + let p = assert_matches!( + res.status, + Some(nexus_operation_result::Status::Completed(p)) => p + ); + assert_eq!(p.data, b"yay"); + } + Outcome::Fail => { + let f = assert_matches!( + res.status, + Some(nexus_operation_result::Status::Failed(f)) => f + ); + assert_eq!(f.message, "nexus operation completed unsuccessfully"); + } + Outcome::Timeout => { + let f = 
assert_matches!( + res.status, + Some(nexus_operation_result::Status::TimedOut(f)) => f + ); + assert_eq!(f.message, "nexus operation completed unsuccessfully"); + assert_eq!(f.cause.unwrap().message, "operation timed out"); + } + Outcome::Cancel | Outcome::CancelAfterRecordedBeforeStarted => unimplemented!(), + } +} + +#[rstest::rstest] +#[tokio::test] +async fn nexus_async( + #[values( + Outcome::Succeed, + Outcome::Fail, + Outcome::Cancel, + Outcome::CancelAfterRecordedBeforeStarted, + Outcome::Timeout + )] + outcome: Outcome, +) { + let wf_name = "nexus_async"; + let mut starter = CoreWfStarter::new(wf_name); + starter.worker_config.no_remote_activities(true); + let mut worker = starter.worker().await; + + let endpoint = mk_endpoint(&mut starter).await; + let schedule_to_close_timeout = if outcome == Outcome::CancelAfterRecordedBeforeStarted { + // There is some internal timer on the server that won't record cancel in this case until + // after some elapsed period, so, don't time out first then. 
+ None + } else { + Some(Duration::from_secs(5)) + }; + + worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| { + let endpoint = endpoint.clone(); + async move { + let started = ctx.start_nexus_operation(NexusOperationOptions { + endpoint, + service: "svc".to_string(), + operation: "op".to_string(), + schedule_to_close_timeout, + ..Default::default() + }); + if outcome == Outcome::CancelAfterRecordedBeforeStarted { + ctx.timer(Duration::from_millis(1)).await; + started.cancel(&ctx); + } + let started = started.await.unwrap(); + let result = started.result(); + if matches!(outcome, Outcome::Cancel) { + started.cancel(&ctx); + // Make sure double-cancel doesn't cause problems + started.cancel(&ctx); + } + let res = result.await; + // Make sure cancel after completion doesn't cause problems + started.cancel(&ctx); + Ok(res.into()) + } + }); + worker.register_wf( + "async_completer".to_owned(), + move |ctx: WfContext| async move { + match outcome { + Outcome::Succeed => Ok("completed async".into()), + Outcome::Cancel => { + ctx.cancelled().await; + Ok(WfExitValue::Cancelled) + } + _ => bail!("broken"), + } + }, + ); + let submitter = worker.get_submitter_handle(); + starter.start_with_worker(wf_name, &mut worker).await; + + let mut client = starter.get_client().await.get_client().clone(); + let nexus_task_handle = async { + let nt = client + .poll_nexus_task_queue(PollNexusTaskQueueRequest { + namespace: client.namespace().to_owned(), + task_queue: Some(TaskQueue { + name: starter.get_task_queue().to_owned(), + kind: TaskQueueKind::Normal.into(), + normal_name: "".to_string(), + }), + ..Default::default() + }) + .await + .unwrap() + .into_inner(); + let start_req = assert_matches!( + nt.request.unwrap().variant.unwrap(), + request::Variant::StartOperation(sr) => sr + ); + let completer_id = format!("completer-{}", rand_6_chars()); + if !matches!( + outcome, + Outcome::Timeout | Outcome::CancelAfterRecordedBeforeStarted + ) { + // Start the workflow which will 
act like the nexus handler and complete the async + // operation + submitter + .submit_wf( + completer_id.clone(), + "async_completer", + vec![], + WorkflowOptions { + links: start_req + .links + .iter() + .map(workflow_event_link_from_nexus) + .collect::<Result<Vec<_>, _>>() + .unwrap(), + completion_callbacks: vec![Callback { + variant: Some(callback::Variant::Nexus(callback::Nexus { + url: start_req.callback, + header: start_req.callback_header, + })), + }], + ..Default::default() + }, + ) + .await + .unwrap(); + } + if outcome != Outcome::CancelAfterRecordedBeforeStarted { + // Do not say the operation started if we are trying to test this type of cancel + client + .respond_nexus_task_completed(RespondNexusTaskCompletedRequest { + namespace: client.namespace().to_owned(), + task_token: nt.task_token, + response: Some(nexus::v1::Response { + variant: Some(nexus::v1::response::Variant::StartOperation( + StartOperationResponse { + variant: Some(start_operation_response::Variant::AsyncSuccess( + start_operation_response::Async { + operation_id: "op-1".to_string(), + links: vec![], + }, + )), + }, + )), + }), + ..Default::default() + }) + .await + .unwrap(); + } + if outcome == Outcome::Cancel { + let nt = client + .poll_nexus_task_queue(PollNexusTaskQueueRequest { + namespace: client.namespace().to_owned(), + task_queue: Some(TaskQueue { + name: starter.get_task_queue().to_owned(), + kind: TaskQueueKind::Normal.into(), + normal_name: "".to_string(), + }), + ..Default::default() + }) + .await + .unwrap() + .into_inner(); + assert_matches!( + nt.request.unwrap().variant.unwrap(), + request::Variant::CancelOperation(_) + ); + client + .cancel_workflow_execution(completer_id, None, "nexus cancel".to_string(), None) + .await + .unwrap(); + } + }; + + join!(nexus_task_handle, async { + worker.run_until_done().await.unwrap(); + }); + + let res = client + .get_untyped_workflow_handle(starter.get_task_queue().to_owned(), "") + .get_workflow_result(Default::default()) + .await +
.unwrap(); + let res = NexusOperationResult::from_json_payload(&res.unwrap_success()[0]).unwrap(); + match outcome { + Outcome::Succeed => { + let p = assert_matches!( + res.status, + Some(nexus_operation_result::Status::Completed(p)) => p + ); + assert_eq!(p.data, b"\"completed async\""); + } + Outcome::Fail => { + let f = assert_matches!( + res.status, + Some(nexus_operation_result::Status::Failed(f)) => f + ); + assert_eq!(f.message, "nexus operation completed unsuccessfully"); + assert_eq!(f.cause.unwrap().message, "broken"); + } + Outcome::Cancel | Outcome::CancelAfterRecordedBeforeStarted => { + let f = assert_matches!( + res.status, + Some(nexus_operation_result::Status::Cancelled(f)) => f + ); + assert_eq!(f.message, "nexus operation completed unsuccessfully"); + let msg = if outcome == Outcome::CancelAfterRecordedBeforeStarted { + "operation canceled before it was started" + } else { + "operation canceled" + }; + assert_eq!(f.cause.unwrap().message, msg); + } + Outcome::Timeout => { + let f = assert_matches!( + res.status, + Some(nexus_operation_result::Status::TimedOut(f)) => f + ); + assert_eq!(f.message, "nexus operation completed unsuccessfully"); + assert_eq!(f.cause.unwrap().message, "operation timed out"); + } + } +} +#[tokio::test] +async fn nexus_cancel_before_start() { + let wf_name = "nexus_cancel_before_start"; + let mut starter = CoreWfStarter::new(wf_name); + starter.worker_config.no_remote_activities(true); + let mut worker = starter.worker().await; + + let endpoint = mk_endpoint(&mut starter).await; + + worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| { + let endpoint = endpoint.clone(); + async move { + let started = ctx.start_nexus_operation(NexusOperationOptions { + endpoint: endpoint.clone(), + service: "svc".to_string(), + operation: "op".to_string(), + ..Default::default() + }); + started.cancel(&ctx); + let res = started.await.unwrap_err(); + assert_eq!(res.message, "Nexus Operation cancelled before scheduled"); + if let 
FailureInfo::NexusOperationExecutionFailureInfo(fi) = res.failure_info.unwrap() { + assert_eq!(fi.endpoint, endpoint); + assert_eq!(fi.service, "svc"); + assert_eq!(fi.operation, "op"); + } else { + panic!("unexpected failure info"); + } + Ok(().into()) + } + }); + starter.start_with_worker(wf_name, &mut worker).await; + + worker.run_until_done().await.unwrap(); +} + +async fn mk_endpoint(starter: &mut CoreWfStarter) -> String { + let client = starter.get_client().await; + let endpoint = format!("mycoolendpoint-{}", rand_6_chars()); + let mut op_client = client.get_client().inner().operator_svc().clone(); + op_client + .create_nexus_endpoint(CreateNexusEndpointRequest { + spec: Some(EndpointSpec { + name: endpoint.to_owned(), + description: None, + target: Some(EndpointTarget { + variant: Some(endpoint_target::Variant::Worker(endpoint_target::Worker { + namespace: client.namespace().to_owned(), + task_queue: starter.get_task_queue().to_owned(), + })), + }), + }), + }) + .await + .unwrap(); + // Endpoint creation can (as of server 1.25.2 at least) return before the endpoint is usable.
+ tokio::time::sleep(Duration::from_millis(800)).await; + endpoint +} diff --git a/tests/runner.rs b/tests/runner.rs index cf895a6b8..40111490d 100644 --- a/tests/runner.rs +++ b/tests/runner.rs @@ -53,11 +53,17 @@ async fn main() -> Result<(), anyhow::Error> { } = Cli::parse(); let cargo = env::var("CARGO").unwrap_or_else(|_| "cargo".to_string()); // Try building first, so that we error early on build failures & don't start server - let test_args_preamble = ["test", "--test", &test_name] - .into_iter() - .map(ToString::to_string) - .chain(cargo_test_args) - .collect::<Vec<_>>(); + let test_args_preamble = [ + "test", + "--features", + "temporal-sdk-core-protos/serde_serialize", + "--test", + &test_name, + ] + .into_iter() + .map(ToString::to_string) + .chain(cargo_test_args) + .collect::<Vec<_>>(); let status = Command::new(&cargo) .args([test_args_preamble.as_slice(), &["--no-run".to_string()]].concat()) .status() @@ -74,6 +80,10 @@ async fn main() -> Result<(), anyhow::Error> { // TODO: Delete when temporalCLI enables it by default. "--dynamic-config-value".to_string(), "system.enableEagerWorkflowStart=true".to_string(), + "--dynamic-config-value".to_string(), + "system.enableNexus=true".to_string(), + "--http-port".to_string(), + "7243".to_string(), "--search-attribute".to_string(), format!("{SEARCH_ATTR_TXT}=Text"), "--search-attribute".to_string(),